You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/12/24 18:07:17 UTC
[pulsar] branch master updated: [pulsar-flink] add streaming
connectors as a Pulsar stream that serializes data in Avro format (#3231)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7a1a148 [pulsar-flink] add streaming connectors as a Pulsar stream that serializes data in Avro format (#3231)
7a1a148 is described below
commit 7a1a14855b7a08a4dd22c2f97b4bb79de33731b7
Author: wpl <12...@qq.com>
AuthorDate: Tue Dec 25 02:07:12 2018 +0800
[pulsar-flink] add streaming connectors as a Pulsar stream that serializes data in Avro format (#3231)
### Motivation
Add Flink streaming connectors for the Avro data format.
### Modifications
add class PulsarAvroTableSink
### Result
Add an append-only table sink that serializes data in Avro format.
---
.gitignore | 4 +-
examples/flink-consumer-source/pom.xml | 12 ++
.../example/FlinkPulsarBatchAvroSinkExample.java | 2 +-
...lsarConsumerSourceWordCountToAvroTableSink.java | 120 +++++++++++++++
...lsarConsumerSourceWordCountToJsonTableSink.java | 133 ++++++++++++++++
.../src/main/resources/avro/NasaMission.avsc | 12 +-
.../FlinkPulsarBatchAvroSinkScalaExample.scala | 2 +-
pulsar-flink/pom.xml | 4 +
.../connectors/pulsar/PulsarAvroTableSink.java | 169 +++++++++++++++++++++
.../serialization/AvroSerializationSchemaTest.java | 2 +-
.../connectors/pulsar/PulsarAvroTableSinkTest.java | 106 +++++++++++++
.../connectors/pulsar/PulsarJsonTableSinkTest.java | 100 ++++++++++++
.../src/test/resources/avro/NasaMission.avsc | 2 +-
13 files changed, 661 insertions(+), 7 deletions(-)
diff --git a/.gitignore b/.gitignore
index bd3c93e..4aef5a0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -80,5 +80,5 @@ docker.debug-info
**/website/translated_docs*
# Avro
-examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/avro/generated
-pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/avro/generated
+examples/flink-consumer-source/src/main/java/org/apache/flink/avro/generated
+pulsar-flink/src/test/java/org/apache/flink/avro/generated
diff --git a/examples/flink-consumer-source/pom.xml b/examples/flink-consumer-source/pom.xml
index 3c08697..35f5924 100644
--- a/examples/flink-consumer-source/pom.xml
+++ b/examples/flink-consumer-source/pom.xml
@@ -61,6 +61,18 @@
</dependency>
<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-flink</artifactId>
<version>${project.version}</version>
diff --git a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
index 553bf6c..ef0048c 100644
--- a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
+++ b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
@@ -22,8 +22,8 @@ package org.apache.flink.batch.connectors.pulsar.example;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.avro.generated.NasaMission;
import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat;
-import org.apache.flink.batch.connectors.pulsar.avro.generated.NasaMission;
import java.util.Arrays;
import java.util.List;
diff --git a/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java b/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
new file mode 100644
index 0000000..7b78da5
--- /dev/null
+++ b/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar.example;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.avro.generated.WordWithCount;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.connectors.pulsar.PulsarAvroTableSink;
+import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.sinks.CsvTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+
+/**
+ * Implements a streaming wordcount program on pulsar topics.
+ *
+ * <p>Example usage:
+ * --service-url pulsar://localhost:6650 --input-topic test_topic --subscription test_sub
+ */
+public class PulsarConsumerSourceWordCountToAvroTableSink {
+ private static final String SERVICE_URL = "pulsar://localhost:6650";
+ private static final String ROUTING_KEY = "word";
+
+ public static void main(String[] args) throws Exception {
+ // parse input arguments
+ final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+ if (parameterTool.getNumberOfParameters() < 2) {
+ System.out.println("Missing parameters!");
+ System.out.println("Usage: pulsar --service-url <pulsar-service-url> --input-topic <topic> --subscription <sub> --output-topic <topic>");
+ return;
+ }
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().disableSysoutLogging();
+ env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
+ env.enableCheckpointing(5000);
+ env.getConfig().setGlobalJobParameters(parameterTool);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
+
+ String serviceUrl = parameterTool.getRequired("service-url");
+ String inputTopic = parameterTool.getRequired("input-topic");
+ String subscription = parameterTool.get("subscription", "flink-examples");
+ String outputTopic = parameterTool.get("output-topic", null);
+ int parallelism = parameterTool.getInt("parallelism", 1);
+
+ System.out.println("Parameters:");
+ System.out.println("\tServiceUrl:\t" + serviceUrl);
+ System.out.println("\tInputTopic:\t" + inputTopic);
+ System.out.println("\tSubscription:\t" + subscription);
+ System.out.println("\tOutputTopic:\t" + outputTopic);
+ System.out.println("\tParallelism:\t" + parallelism);
+
+ PulsarSourceBuilder<String> builder = PulsarSourceBuilder.builder(new SimpleStringSchema())
+ .serviceUrl(serviceUrl)
+ .topic(inputTopic)
+ .subscriptionName(subscription);
+ SourceFunction<String> src = builder.build();
+ DataStream<String> input = env.addSource(src);
+
+
+ DataStream<WordWithCount> wc = input
+ .flatMap((FlatMapFunction<String, WordWithCount>) (line, collector) -> {
+ for (String word : line.split("\\s")) {
+ collector.collect(
+ WordWithCount.newBuilder().setWord(word).setCount(1).build()
+ );
+ }
+ })
+ .returns(WordWithCount.class)
+ .keyBy("word")
+ .timeWindow(Time.seconds(5))
+ .reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
+ WordWithCount.newBuilder().setWord(c1.getWord()).setCount(c1.getCount() + c2.getCount()).build()
+ );
+
+ tableEnvironment.registerDataStream("wc",wc);
+
+ Table table = tableEnvironment.sqlQuery("select * from wc");
+ if (null != outputTopic) {
+ PulsarAvroTableSink sink = new PulsarAvroTableSink(SERVICE_URL, outputTopic, new ProducerConfiguration(), ROUTING_KEY,WordWithCount.class);
+ table.writeToSink(sink);
+ } else {
+ TableSink sink = new CsvTableSink("./examples/file", "|");
+ // print the results with a csv file
+ table.writeToSink(sink);
+ }
+
+ env.execute("Pulsar Stream WordCount");
+ }
+
+}
diff --git a/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java b/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
new file mode 100644
index 0000000..95b2536
--- /dev/null
+++ b/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar.example;
+
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.connectors.pulsar.PulsarJsonTableSink;
+import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.sinks.CsvTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+
+/**
+ * Implements a streaming wordcount program on pulsar topics.
+ *
+ * <p>Example usage:
+ * --service-url pulsar://localhost:6650 --input-topic test_topic --subscription test_sub
+ */
+public class PulsarConsumerSourceWordCountToJsonTableSink {
+ private static final String SERVICE_URL = "pulsar://localhost:6650";
+ private static final String ROUTING_KEY = "word";
+
+ public static void main(String[] args) throws Exception {
+ // parse input arguments
+ final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+ if (parameterTool.getNumberOfParameters() < 2) {
+ System.out.println("Missing parameters!");
+ System.out.println("Usage: pulsar --service-url <pulsar-service-url> --input-topic <topic> --subscription <sub> --output-topic <topic>");
+ return;
+ }
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().disableSysoutLogging();
+ env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
+ env.enableCheckpointing(5000);
+ env.getConfig().setGlobalJobParameters(parameterTool);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
+
+ String serviceUrl = parameterTool.getRequired("service-url");
+ String inputTopic = parameterTool.getRequired("input-topic");
+ String subscription = parameterTool.get("subscription", "flink-examples");
+ String outputTopic = parameterTool.get("output-topic", null);
+ int parallelism = parameterTool.getInt("parallelism", 1);
+
+ System.out.println("Parameters:");
+ System.out.println("\tServiceUrl:\t" + serviceUrl);
+ System.out.println("\tInputTopic:\t" + inputTopic);
+ System.out.println("\tSubscription:\t" + subscription);
+ System.out.println("\tOutputTopic:\t" + outputTopic);
+ System.out.println("\tParallelism:\t" + parallelism);
+
+ PulsarSourceBuilder<String> builder = PulsarSourceBuilder.builder(new SimpleStringSchema())
+ .serviceUrl(serviceUrl)
+ .topic(inputTopic)
+ .subscriptionName(subscription);
+ SourceFunction<String> src = builder.build();
+ DataStream<String> input = env.addSource(src);
+
+
+ DataStream<WordWithCount> wc = input
+ .flatMap((FlatMapFunction<String, WordWithCount>) (line, collector) -> {
+ for (String word : line.split("\\s")) {
+ collector.collect(
+ new WordWithCount(word, 1)
+ );
+ }
+ })
+ .returns(WordWithCount.class)
+ .keyBy("word")
+ .timeWindow(Time.seconds(5))
+ .reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
+ new WordWithCount(c1.word, c1.count + c2.count));
+
+ tableEnvironment.registerDataStream("wc",wc);
+
+ Table table = tableEnvironment.sqlQuery("select * from wc");
+ if (null != outputTopic) {
+ PulsarJsonTableSink sink = new PulsarJsonTableSink(SERVICE_URL, outputTopic, new ProducerConfiguration(), ROUTING_KEY);
+ table.writeToSink(sink);
+ } else {
+ TableSink sink = new CsvTableSink("./examples/file", "|");
+ // print the results with a csv file
+ table.writeToSink(sink);
+ }
+
+ env.execute("Pulsar Stream WordCount");
+ }
+
+ /**
+ * Data type for words with count.
+ */
+ @AllArgsConstructor
+ @NoArgsConstructor
+ @ToString
+ public static class WordWithCount {
+
+ public String word;
+ public long count;
+
+ }
+}
diff --git a/examples/flink-consumer-source/src/main/resources/avro/NasaMission.avsc b/examples/flink-consumer-source/src/main/resources/avro/NasaMission.avsc
index 4a669e0..45adc98 100644
--- a/examples/flink-consumer-source/src/main/resources/avro/NasaMission.avsc
+++ b/examples/flink-consumer-source/src/main/resources/avro/NasaMission.avsc
@@ -1,4 +1,5 @@
-{"namespace": "org.apache.flink.batch.connectors.pulsar.avro.generated",
+[
+{"namespace": "org.apache.flink.avro.generated",
"type": "record",
"name": "NasaMission",
"fields": [
@@ -7,4 +8,13 @@
{"name": "start_year", "type": ["int", "null"]},
{"name": "end_year", "type": ["int", "null"]}
]
+},
+{"namespace": "org.apache.flink.avro.generated",
+ "type": "record",
+ "name": "WordWithCount",
+ "fields": [
+ {"name": "word", "type": "string"},
+ {"name": "count", "type": "long"}
+ ]
}
+]
diff --git a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
index 5f95611..0d255f2 100644
--- a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
+++ b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
@@ -19,8 +19,8 @@
package org.apache.flink.batch.connectors.pulsar.example
import org.apache.flink.api.scala._
+import org.apache.flink.avro.generated.NasaMission
import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat
-import org.apache.flink.batch.connectors.pulsar.avro.generated.NasaMission
/**
* Implements a batch Scala program on Pulsar topic by writing Flink DataSet as Avro.
diff --git a/pulsar-flink/pom.xml b/pulsar-flink/pom.xml
index 92eb045..74d3e8c 100644
--- a/pulsar-flink/pom.xml
+++ b/pulsar-flink/pom.xml
@@ -111,6 +111,10 @@
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
+ <resource>
+ <directory>src/test/resources</directory>
+ <filtering>true</filtering>
+ </resource>
</resources>
<plugins>
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
new file mode 100644
index 0000000..9187a0d
--- /dev/null
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.AvroRowSerializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An append-only table sink to emit a streaming table as a Pulsar stream that serializes data in Avro format.
+ */
+public class PulsarAvroTableSink implements AppendStreamTableSink<Row> {
+
+    protected final String serviceUrl;
+    protected final String topic;
+    protected final ProducerConfiguration producerConf;
+    protected final String routingKeyFieldName;
+    protected SerializationSchema<Row> serializationSchema;
+    protected String[] fieldNames;
+    protected TypeInformation[] fieldTypes;
+    protected PulsarKeyExtractor<Row> keyExtractor;
+    private final Class<? extends SpecificRecord> recordClazz;
+
+    /**
+     * Create PulsarAvroTableSink.
+     *
+     * @param serviceUrl          pulsar service url
+     * @param topic               topic in pulsar to which table is written
+     * @param producerConf        producer configuration
+     * @param routingKeyFieldName routing key field name
+     * @param recordClazz         Avro specific record class used to build the row serialization schema
+     */
+    public PulsarAvroTableSink(
+            String serviceUrl,
+            String topic,
+            ProducerConfiguration producerConf,
+            String routingKeyFieldName,
+            Class<? extends SpecificRecord> recordClazz) {
+        this.serviceUrl = checkNotNull(serviceUrl, "Service url not set");
+        this.topic = checkNotNull(topic, "Topic is null");
+        this.producerConf = checkNotNull(producerConf, "Producer configuration not set");
+        this.routingKeyFieldName = routingKeyFieldName;
+        this.recordClazz = checkNotNull(recordClazz, "Avro record class not set");
+    }
+
+    /**
+     * Returns the low-level producer.
+     *
+     * <p>Note: this also replaces {@link #serializationSchema} with a new schema built from the record class.
+     */
+    protected FlinkPulsarProducer<Row> createFlinkPulsarProducer() {
+        serializationSchema = new AvroRowSerializationSchema(recordClazz);
+        return new FlinkPulsarProducer<Row>(serviceUrl, topic, serializationSchema, producerConf, keyExtractor);
+    }
+
+    /**
+     * Adds a Pulsar producer as sink of the stream; fails with IllegalStateException if the sink is not configured.
+     */
+    @Override
+    public void emitDataStream(DataStream<Row> dataStream) {
+        checkState(fieldNames != null, "Table sink is not configured");
+        checkState(fieldTypes != null, "Table sink is not configured");
+        checkState(serializationSchema != null, "Table sink is not configured");
+        checkState(keyExtractor != null, "Table sink is not configured");
+        FlinkPulsarProducer<Row> producer = createFlinkPulsarProducer();
+        dataStream.addSink(producer);
+    }
+
+    @Override
+    public TypeInformation<Row> getOutputType() {
+        return new RowTypeInfo(fieldTypes, fieldNames);
+    }
+
+    @Override
+    public String[] getFieldNames() {
+        return fieldNames;
+    }
+
+    @Override
+    public TypeInformation<?>[] getFieldTypes() {
+        return fieldTypes;
+    }
+
+    /**
+     * Returns a new sink configured with the given field names and types; this instance is left unchanged.
+     */
+    @Override
+    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+        PulsarAvroTableSink sink = new PulsarAvroTableSink(serviceUrl, topic, producerConf, routingKeyFieldName, recordClazz);
+
+        sink.fieldNames = checkNotNull(fieldNames, "Field names are null");
+        sink.fieldTypes = checkNotNull(fieldTypes, "Field types are null");
+        checkArgument(fieldNames.length == fieldTypes.length, "Number of provided field names and types do not match");
+
+        sink.serializationSchema = new AvroRowSerializationSchema(recordClazz);
+        sink.keyExtractor = new AvroKeyExtractor(routingKeyFieldName, fieldNames, fieldTypes, recordClazz);
+
+        return sink;
+    }
+
+    /**
+     * A key extractor that extracts the routing key from a {@link Row} by field name.
+     */
+    private static class AvroKeyExtractor implements PulsarKeyExtractor<Row> {
+        private final int keyIndex;
+
+        public AvroKeyExtractor(
+                String keyFieldName,
+                String[] fieldNames,
+                TypeInformation<?>[] fieldTypes,
+                Class<? extends SpecificRecord> recordClazz) {
+
+            checkArgument(fieldNames.length == fieldTypes.length,
+                    "Number of provided field names and types does not match.");
+
+            Schema schema = SpecificData.get().getSchema(recordClazz);
+            Schema.Field keyField = schema.getField(keyFieldName);
+            // getField() yields null for an unknown name; reject it here instead of hitting an NPE below
+            checkArgument(keyField != null, "Key field '" + keyFieldName + "' not found in Avro schema");
+            Schema.Type keyType = keyField.schema().getType();
+
+            int keyIndex = Arrays.asList(fieldNames).indexOf(keyFieldName);
+            checkArgument(keyIndex >= 0,
+                    "Key field '" + keyFieldName + "' not found");
+
+            checkArgument(Schema.Type.STRING.equals(keyType),
+                    "Key field must be of type 'STRING'");
+            this.keyIndex = keyIndex;
+        }
+
+        @Override
+        public String getKey(Row event) {
+            return (String) event.getField(keyIndex);
+        }
+    }
+
+}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchemaTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchemaTest.java
index 176acc4..4f8f0c6 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchemaTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchemaTest.java
@@ -19,7 +19,7 @@
package org.apache.flink.batch.connectors.pulsar.serialization;
import org.apache.avro.generic.GenericRecord;
-import org.apache.flink.batch.connectors.pulsar.avro.generated.NasaMission;
+import org.apache.flink.avro.generated.NasaMission;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.junit.Test;
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
new file mode 100644
index 0000000..ae336ff
--- /dev/null
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.avro.generated.NasaMission;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.powermock.api.mockito.PowerMockito;
+
+/**
+ * Unit test of {@link PulsarAvroTableSink}.
+ */
+public class PulsarAvroTableSinkTest {
+    private static final String SERVICE_URL = "pulsar://localhost:6650";
+    private static final String TOPIC_NAME = "test_topic";
+    private static final String ROUTING_KEY = "name";
+
+    private final String[] fieldNames = {"id", "name","start_year","end_year"};
+    private final TypeInformation[] typeInformations = {
+            TypeInformation.of(Integer.class),
+            TypeInformation.of(String.class),
+            TypeInformation.of(Integer.class),
+            TypeInformation.of(Integer.class)
+    };
+
+    /**
+     * Test configure PulsarAvroTableSink.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testConfigure() throws Exception {
+        PulsarAvroTableSink sink = spySink();
+
+        TableSink<Row> configuredSink = sink.configure(fieldNames, typeInformations);
+
+        Assert.assertArrayEquals(fieldNames, configuredSink.getFieldNames());
+        Assert.assertArrayEquals(typeInformations, configuredSink.getFieldTypes());
+        Assert.assertNotNull(((PulsarAvroTableSink) configuredSink).keyExtractor);
+        Assert.assertNotNull(((PulsarAvroTableSink) configuredSink).serializationSchema);
+    }
+
+    /**
+     * Test emit data stream.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testEmitDataStream() throws Exception {
+        DataStream mockedDataStream = Mockito.mock(DataStream.class);
+
+        PulsarAvroTableSink sink = spySink();
+
+        sink.emitDataStream(mockedDataStream);
+
+        Mockito.verify(mockedDataStream).addSink(Mockito.any(FlinkPulsarProducer.class));
+    }
+
+    /**
+     * Builds a sink whose configuration fields are injected reflectively, so it passes the "configured" checks.
+     */
+    private PulsarAvroTableSink spySink() throws Exception {
+
+        PulsarAvroTableSink sink = new PulsarAvroTableSink(SERVICE_URL, TOPIC_NAME, new ProducerConfiguration(), ROUTING_KEY,NasaMission.class);
+        FlinkPulsarProducer producer = Mockito.mock(FlinkPulsarProducer.class);
+        // NOTE(review): no PowerMockRunner/@PrepareForTest is visible here, so this may not intercept 'new' - confirm
+        PowerMockito.whenNew(FlinkPulsarProducer.class).withArguments(
+                Mockito.anyString(),
+                Mockito.anyString(),
+                Mockito.any(SerializationSchema.class),
+                Mockito.any(ProducerConfiguration.class),
+                Mockito.any(PulsarKeyExtractor.class)
+        ).thenReturn(producer);
+        Whitebox.setInternalState(sink, "fieldNames", fieldNames);
+        Whitebox.setInternalState(sink, "fieldTypes", typeInformations);
+        Whitebox.setInternalState(sink, "serializationSchema", Mockito.mock(SerializationSchema.class));
+        Whitebox.setInternalState(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class));
+        return sink;
+    }
+
+}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
new file mode 100644
index 0000000..746e87e
--- /dev/null
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.powermock.api.mockito.PowerMockito;
+
+/**
+ * Unit test of {@link PulsarJsonTableSink}.
+ */
+public class PulsarJsonTableSinkTest {
+
+    private static final String SERVICE_URL = "pulsar://localhost:6650";
+    private static final String TOPIC_NAME = "test_topic";
+    private static final String ROUTING_KEY = "key";
+    private final String[] fieldNames = {"key", "value"};
+    private final TypeInformation[] typeInformations = {
+            TypeInformation.of(String.class),
+            TypeInformation.of(String.class)
+    };
+
+    /**
+     * Test configure PulsarJsonTableSink.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testConfigure() throws Exception {
+        PulsarJsonTableSink sink = spySink();
+
+        TableSink<Row> configuredSink = sink.configure(fieldNames, typeInformations);
+
+        Assert.assertArrayEquals(fieldNames, configuredSink.getFieldNames());
+        Assert.assertArrayEquals(typeInformations, configuredSink.getFieldTypes());
+        Assert.assertNotNull(((PulsarJsonTableSink) configuredSink).keyExtractor);
+        Assert.assertNotNull(((PulsarJsonTableSink) configuredSink).serializationSchema);
+    }
+
+    /**
+     * Test emit data stream.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testEmitDataStream() throws Exception {
+        DataStream mockedDataStream = Mockito.mock(DataStream.class);
+
+        PulsarJsonTableSink sink = spySink();
+
+        sink.emitDataStream(mockedDataStream);
+
+        Mockito.verify(mockedDataStream).addSink(Mockito.any(FlinkPulsarProducer.class));
+    }
+
+    /** Builds a sink whose configuration fields are injected reflectively, so it passes the "configured" checks. */
+    private PulsarJsonTableSink spySink() throws Exception {
+        PulsarJsonTableSink sink = new PulsarJsonTableSink(SERVICE_URL, TOPIC_NAME, new ProducerConfiguration(), ROUTING_KEY);
+        FlinkPulsarProducer producer = Mockito.mock(FlinkPulsarProducer.class);
+        // NOTE(review): no PowerMockRunner/@PrepareForTest is visible here, so this may not intercept 'new' - confirm
+        PowerMockito.whenNew(FlinkPulsarProducer.class).withArguments(
+                Mockito.anyString(),
+                Mockito.anyString(),
+                Mockito.any(SerializationSchema.class),
+                Mockito.any(ProducerConfiguration.class),
+                Mockito.any(PulsarKeyExtractor.class)
+        ).thenReturn(producer);
+        Whitebox.setInternalState(sink, "fieldNames", fieldNames);
+        Whitebox.setInternalState(sink, "fieldTypes", typeInformations);
+        Whitebox.setInternalState(sink, "serializationSchema", Mockito.mock(SerializationSchema.class));
+        Whitebox.setInternalState(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class));
+        return sink;
+    }
+}
diff --git a/pulsar-flink/src/test/resources/avro/NasaMission.avsc b/pulsar-flink/src/test/resources/avro/NasaMission.avsc
index 4a669e0..521f475 100644
--- a/pulsar-flink/src/test/resources/avro/NasaMission.avsc
+++ b/pulsar-flink/src/test/resources/avro/NasaMission.avsc
@@ -1,4 +1,4 @@
-{"namespace": "org.apache.flink.batch.connectors.pulsar.avro.generated",
+{"namespace": "org.apache.flink.avro.generated",
"type": "record",
"name": "NasaMission",
"fields": [