You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/12/18 02:08:40 UTC

[GitHub] XiaoZYang closed pull request #5845: [FLINK-9168][flink-connectors]Pulsar Sink connector

XiaoZYang closed pull request #5845: [FLINK-9168][flink-connectors]Pulsar Sink connector
URL: https://github.com/apache/flink/pull/5845
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-connectors/flink-connector-pulsar/pom.xml b/flink-connectors/flink-connector-pulsar/pom.xml
new file mode 100644
index 00000000000..a0ac2c1facf
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/pom.xml
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.6-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-pulsar_${scala.binary.version}</artifactId>
+	<name>flink-connector-pulsar</name>
+	<properties>
+		<pulsar.version>1.20.0-incubating</pulsar.version>
+	</properties>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+			<!-- Projects depending on this project, won't depend on flink-table. -->
+			<optional>true</optional>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.pulsar</groupId>
+			<artifactId>pulsar-client</artifactId>
+			<classifier>shaded</classifier>
+			<version>${pulsar.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-tests_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.javassist</groupId>
+			<artifactId>javassist</artifactId>
+			<version>3.20.0-GA</version>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
new file mode 100644
index 00000000000..64b1397c217
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.util.SerializableObject;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Flink Sink to produce data into a Pulsar topic.
+ */
+public class FlinkPulsarProducer<IN>
+		extends RichSinkFunction<IN>
+		implements CheckpointedFunction {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkPulsarProducer.class);
+
+	/**
+	 * The pulsar service url.
+	 */
+	protected final String serviceUrl;
+
+	/**
+	 * User defined configuration for the producer.
+	 */
+	protected final ProducerConfiguration producerConfig;
+
+	/**
+	 * The name of the default topic this producer is writing data to.
+	 */
+	protected final String defaultTopicName;
+
+	/**
+	 * (Serializable) SerializationSchema for turning objects used with Flink into.
+	 * byte[] for Pulsar.
+	 */
+	protected final SerializationSchema<IN> schema;
+
+	/**
+	 * User-provided key extractor for assigning a key to a pulsar message.
+	 */
+	protected final PulsarKeyExtractor<IN> flinkPulsarKeyExtractor;
+
+	/**
+	 * Produce Mode.
+	 */
+	protected PulsarProduceMode produceMode = PulsarProduceMode.AT_LEAST_ONE;
+
+	/**
+	 * If true, the producer will wait until all outstanding records have been send to the broker.
+	 */
+	protected boolean flushOnCheckpoint;
+
+	// -------------------------------- Runtime fields ------------------------------------------
+
+	/** Pulsar Producer instance. */
+	protected transient Producer producer;
+
+	/** The callback than handles error propagation or logging callbacks. */
+	protected transient Function<MessageId, MessageId> successCallback = msgId -> {
+		acknowledgeMessage();
+		return msgId;
+	};
+
+	protected transient Function<Throwable, MessageId> failureCallback;
+
+	/** Errors encountered in the async producer are stored here. */
+	protected transient volatile Exception asyncException;
+
+	/** Lock for accessing the pending records. */
+	protected final SerializableObject pendingRecordsLock = new SerializableObject();
+
+	/** Number of unacknowledged records. */
+	protected long pendingRecords;
+
+	public FlinkPulsarProducer(String serviceUrl,
+							String defaultTopicName,
+							SerializationSchema<IN> serializationSchema,
+							ProducerConfiguration producerConfig,
+							PulsarKeyExtractor<IN> keyExtractor) {
+		this.serviceUrl = checkNotNull(serviceUrl, "Service url not set");
+		this.defaultTopicName = checkNotNull(defaultTopicName, "TopicName not set");
+		this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
+		this.producerConfig = checkNotNull(producerConfig, "Producer Config is not set");
+		this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
+		ClosureCleaner.ensureSerializable(serializationSchema);
+	}
+
+	// ---------------------------------- Properties --------------------------
+
+
+	/**
+	 * @return pulsar key extractor.
+	 */
+	public PulsarKeyExtractor<IN> getKeyExtractor() {
+		return flinkPulsarKeyExtractor;
+	}
+
+	/**
+	 * Gets this producer's operating mode.
+	 */
+	public PulsarProduceMode getProduceMode() {
+		return this.produceMode;
+	}
+
+	/**
+	 * Sets this producer's operating mode.
+	 *
+	 * @param produceMode The mode of operation.
+	 */
+	public void setProduceMode(PulsarProduceMode produceMode) {
+		this.produceMode = checkNotNull(produceMode);
+	}
+
+	/**
+	 * If set to true, the Flink producer will wait for all outstanding messages in the Pulsar buffers
+	 * to be acknowledged by the Pulsar producer on a checkpoint.
+	 * This way, the producer can guarantee that messages in the Pulsar buffers are part of the checkpoint.
+	 *
+	 * @param flush Flag indicating the flushing mode (true = flush on checkpoint)
+	 */
+	public void setFlushOnCheckpoint(boolean flush) {
+		this.flushOnCheckpoint = flush;
+	}
+
+	// ----------------------------------- Sink Methods --------------------------
+
+	@SuppressWarnings("unchecked")
+	private static final <T> PulsarKeyExtractor<T> getOrNullKeyExtractor(PulsarKeyExtractor<T> extractor) {
+		if (null == extractor) {
+			return PulsarKeyExtractor.NULL;
+		} else {
+			return extractor;
+		}
+	}
+
+	private Producer createProducer(ProducerConfiguration configuration) throws Exception {
+		PulsarClient client = PulsarClient.create(serviceUrl);
+		return client.createProducer(defaultTopicName, configuration);
+	}
+
+	/**
+	 * Initializes the connection to pulsar.
+	 *
+	 * @param parameters configuration used for initialization
+	 * @throws Exception
+	 */
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		this.producer = createProducer(producerConfig);
+
+		RuntimeContext ctx = getRuntimeContext();
+
+		LOG.info("Starting FlinkPulsarProducer ({}/{}) to produce into pulsar topic {}",
+			ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), defaultTopicName);
+
+		if (flushOnCheckpoint && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
+			LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
+			flushOnCheckpoint = false;
+		}
+
+		if (PulsarProduceMode.AT_MOST_ONCE == produceMode) {
+			this.failureCallback = cause -> {
+				LOG.error("Error while sending record to Pulsar : " + cause.getMessage(), cause);
+				return null;
+			};
+		} else if (PulsarProduceMode.AT_LEAST_ONE == produceMode){
+			this.failureCallback = cause -> {
+				if (null == asyncException) {
+					if (cause instanceof Exception) {
+						asyncException = (Exception) cause;
+					} else {
+						asyncException = new Exception(cause);
+					}
+				}
+				return null;
+			};
+		} else {
+			throw new UnsupportedOperationException("Unsupported produce mode " + produceMode);
+		}
+	}
+
+	@Override
+	public void invoke(IN value, Context context) throws Exception {
+		checkErroneous();
+
+		byte[] serializedValue = schema.serialize(value);
+
+		MessageBuilder msgBuilder = MessageBuilder.create();
+		if (null != context.timestamp()) {
+			msgBuilder = msgBuilder.setEventTime(context.timestamp());
+		}
+		String msgKey = flinkPulsarKeyExtractor.getKey(value);
+		if (null != msgKey) {
+			msgBuilder = msgBuilder.setKey(msgKey);
+		}
+		Message message = msgBuilder
+			.setContent(serializedValue)
+			.build();
+
+		if (flushOnCheckpoint) {
+			synchronized (pendingRecordsLock) {
+				pendingRecords++;
+			}
+		}
+		producer.sendAsync(message)
+			.thenApply(successCallback)
+			.exceptionally(failureCallback);
+	}
+
+	@Override
+	public void close() throws Exception {
+		if (producer != null) {
+			producer.close();
+		}
+
+		// make sure we propagate pending errors
+		checkErroneous();
+	}
+
+	// ------------------- Logic for handling checkpoint flushing -------------------------- //
+
+	private void acknowledgeMessage() {
+		if (flushOnCheckpoint) {
+			synchronized (pendingRecordsLock) {
+				pendingRecords--;
+				if (pendingRecords == 0) {
+					pendingRecordsLock.notifyAll();
+				}
+			}
+		}
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		// check for asynchronous errors and fail the checkpoint if necessary
+		checkErroneous();
+
+		if (flushOnCheckpoint) {
+			// wait until all the messages are acknowledged
+			synchronized (pendingRecordsLock) {
+				while (pendingRecords > 0) {
+					pendingRecordsLock.wait(100);
+				}
+			}
+
+			// if the flushed requests has errors, we should propagate it also and fail the checkpoint
+			checkErroneous();
+		}
+	}
+
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		// nothing to do
+	}
+
+	// ----------------------------------- Utilities --------------------------
+
+	protected void checkErroneous() throws Exception {
+		Exception e = asyncException;
+		if (e != null) {
+			// prevent double throwing
+			asyncException = null;
+			throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
+		}
+	}
+
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
new file mode 100644
index 00000000000..2b2aef48c07
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.connectors.pulsar.serde.JsonRowSerializationSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.pulsar.client.api.ProducerConfiguration;
+
+/**
+ * Base class for {@link PulsarTableSink} that serializes data in JSON format.
+ */
+public class PulsarJsonTableSink extends PulsarTableSink {
+
+	/**
+	 * Create PulsarJsonTableSink.
+	 *
+	 * @param serviceUrl pulsar service url
+	 * @param topic topic in pulsar to which table is written
+	 * @param producerConf producer configuration
+	 * @param routingKeyFieldName routing key field name
+	 */
+	public PulsarJsonTableSink(
+			String serviceUrl,
+			String topic,
+			ProducerConfiguration producerConf,
+			String routingKeyFieldName) {
+		super(serviceUrl, topic, producerConf, routingKeyFieldName);
+	}
+
+	@Override
+	protected SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema) {
+		return new JsonRowSerializationSchema(rowSchema);
+	}
+
+	@Override
+	protected PulsarTableSink createSink() {
+		return new PulsarJsonTableSink(
+			serviceUrl,
+			topic,
+			producerConf,
+			routingKeyFieldName);
+	}
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java
new file mode 100644
index 00000000000..29f5cf2b392
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pulsar;
+
+/**
+ * The supported producing modes of operation for flink's pulsar producer.
+ */
+public enum PulsarProduceMode {
+
+	/**
+	 * Any produce failures will be ignored hence there could be data loss.
+	 */
+	AT_MOST_ONCE,
+
+	/**
+	 * The producer will ensure that all the events are persisted in pulsar.
+	 * There could be duplicate events written though.
+	 */
+	AT_LEAST_ONE,
+
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
new file mode 100644
index 00000000000..3c307b94fda
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import org.apache.pulsar.client.api.ProducerConfiguration;
+
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An append-only table sink to emit a streaming table as a Pulsar stream.
+ */
+public abstract class PulsarTableSink implements AppendStreamTableSink<Row> {
+
+	protected final String serviceUrl;
+	protected final String topic;
+	protected final ProducerConfiguration producerConf;
+	protected SerializationSchema<Row> serializationSchema;
+	protected PulsarKeyExtractor<Row> keyExtractor;
+	protected String[] fieldNames;
+	protected TypeInformation[] fieldTypes;
+	protected final String routingKeyFieldName;
+
+	public PulsarTableSink(
+			String serviceUrl,
+			String topic,
+			ProducerConfiguration producerConf,
+			String routingKeyFieldName) {
+		this.serviceUrl = checkNotNull(serviceUrl, "Service url not set");
+		this.topic = checkNotNull(topic, "Topic is null");
+		this.producerConf = checkNotNull(producerConf, "Producer configuration not set");
+		this.routingKeyFieldName = routingKeyFieldName;
+	}
+
+	/**
+	 * Create serialization schema for converting table rows into bytes.
+	 *
+	 * @param rowSchema the schema of the row to serialize.
+	 * @return Instance of serialization schema
+	 */
+	protected abstract SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema);
+
+	/**
+	 * Create a deep copy of this sink.
+	 *
+	 * @return Deep copy of this sink
+	 */
+	protected abstract PulsarTableSink createSink();
+
+	/**
+	 * Returns the low-level producer.
+	 */
+	protected FlinkPulsarProducer<Row> createFlinkPulsarProducer() {
+		return new FlinkPulsarProducer<Row>(
+			serviceUrl,
+			topic,
+			serializationSchema,
+			producerConf,
+			keyExtractor);
+	}
+
+	@Override
+	public void emitDataStream(DataStream<Row> dataStream) {
+		checkState(fieldNames != null, "Table sink is not configured");
+		checkState(fieldTypes != null, "Table sink is not configured");
+		checkState(serializationSchema != null, "Table sink is not configured");
+		checkState(keyExtractor != null, "Table sink is not configured");
+
+		FlinkPulsarProducer<Row> producer = createFlinkPulsarProducer();
+		dataStream.addSink(producer);
+	}
+
+	@Override
+	public TypeInformation<Row> getOutputType() {
+		return new RowTypeInfo(fieldTypes, fieldNames);
+	}
+
+	@Override
+	public String[] getFieldNames() {
+		return fieldNames;
+	}
+
+	@Override
+	public TypeInformation<?>[] getFieldTypes() {
+		return fieldTypes;
+	}
+
+	@Override
+	public TableSink<Row> configure(String[] fieldNames,
+									TypeInformation<?>[] fieldTypes) {
+
+		PulsarTableSink sink = createSink();
+
+		sink.fieldNames = checkNotNull(fieldNames, "Field names are null");
+		sink.fieldTypes = checkNotNull(fieldTypes, "Field types are null");
+		checkArgument(fieldNames.length == fieldTypes.length,
+			"Number of provided field names and types do not match");
+
+		RowTypeInfo rowSchema = new RowTypeInfo(fieldTypes, fieldNames);
+		sink.serializationSchema = createSerializationSchema(rowSchema);
+		sink.keyExtractor = new RowKeyExtractor(
+			routingKeyFieldName,
+			fieldNames,
+			fieldTypes);
+
+		return sink;
+	}
+
+	/**
+	 * A key extractor that extracts the routing key from a {@link Row} by field name.
+	 */
+	private static class RowKeyExtractor implements PulsarKeyExtractor<Row> {
+
+		private final int keyIndex;
+
+		public RowKeyExtractor(
+							String keyFieldName,
+							String[] fieldNames,
+							TypeInformation<?>[] fieldTypes) {
+			checkArgument(fieldNames.length == fieldTypes.length,
+				"Number of provided field names and types does not match.");
+			int keyIndex = Arrays.asList(fieldNames).indexOf(keyFieldName);
+			checkArgument(keyIndex >= 0,
+				"Key field '" + keyFieldName + "' not found");
+			checkArgument(Types.STRING.equals(fieldTypes[keyIndex]),
+				"Key field must be of type 'STRING'");
+			this.keyIndex = keyIndex;
+		}
+
+		@Override
+		public String getKey(Row event) {
+			return (String) event.getField(keyIndex);
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/package-info.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/package-info.java
new file mode 100644
index 00000000000..08fe38e9060
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Apache Pulsar Flink Connector.
+ */
+package org.apache.flink.streaming.connectors.pulsar;
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
new file mode 100644
index 00000000000..86e34cc4188
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pulsar.partitioner;
+
+/**
+ * Extract key from a value.
+ */
+public interface PulsarKeyExtractor<IN> {
+
+	PulsarKeyExtractor NULL = in -> null;
+
+	/**
+	 * Retrieve a key from the value.
+	 *
+	 * @param in the value to extract a key.
+	 * @return key.
+	 */
+	String getKey(IN in);
+
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/package-info.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/package-info.java
new file mode 100644
index 00000000000..69d602098f6
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Partitioner used by Flink Pulsar connectors.
+ */
+package org.apache.flink.streaming.connectors.pulsar.partitioner;
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowDeserializationSchema.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowDeserializationSchema.java
new file mode 100644
index 00000000000..6e9dc56d1d2
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowDeserializationSchema.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pulsar.serde;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+ * Deserialization schema from JSON to {@link Row}.
+ *
+ * <p>Deserializes the <code>byte[]</code> messages as a JSON object and reads
+ * the specified fields.
+ *
+ * <p>Failure during deserialization are forwarded as wrapped IOExceptions.
+ */
+public class JsonRowDeserializationSchema implements DeserializationSchema<Row> {
+
+	/** Type information describing the result type. */
+	private final TypeInformation<Row> typeInfo;
+
+	/** Field names to parse. Indices match fieldTypes indices. */
+	private final String[] fieldNames;
+
+	/** Types to parse fields as. Indices match fieldNames indices. */
+	private final TypeInformation<?>[] fieldTypes;
+
+	/** Object mapper for parsing the JSON. */
+	private final ObjectMapper objectMapper = new ObjectMapper();
+
+	/** Flag indicating whether to fail on a missing field. */
+	private boolean failOnMissingField;
+
+	/**
+	 * Creates a JSON deserialization schema for the given fields and types.
+	 *
+	 * @param typeInfo   Type information describing the result type. The field names are used
+	 *                   to parse the JSON file and so are the types.
+	 */
+	public JsonRowDeserializationSchema(TypeInformation<Row> typeInfo) {
+		Preconditions.checkNotNull(typeInfo, "Type information");
+		this.typeInfo = typeInfo;
+
+		this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
+		this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
+	}
+
+	@Override
+	public Row deserialize(byte[] message) throws IOException {
+		try {
+			JsonNode root = objectMapper.readTree(message);
+
+			Row row = new Row(fieldNames.length);
+			for (int i = 0; i < fieldNames.length; i++) {
+				JsonNode node = root.get(fieldNames[i]);
+
+				if (node == null) {
+					if (failOnMissingField) {
+						throw new IllegalStateException("Failed to find field with name '"
+								+ fieldNames[i] + "'.");
+					} else {
+						row.setField(i, null);
+					}
+				} else {
+					// Read the value as specified type
+					Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass());
+					row.setField(i, value);
+				}
+			}
+
+			return row;
+		} catch (Throwable t) {
+			throw new IOException("Failed to deserialize JSON object.", t);
+		}
+	}
+
+	@Override
+	public boolean isEndOfStream(Row nextElement) {
+		return false;
+	}
+
+	@Override
+	public TypeInformation<Row> getProducedType() {
+		return typeInfo;
+	}
+
+	/**
+	 * Configures the failure behaviour if a JSON field is missing.
+	 *
+	 * <p>By default, a missing field is ignored and the field is set to null.
+	 *
+	 * @param failOnMissingField Flag indicating whether to fail or not on a missing field.
+	 */
+	public void setFailOnMissingField(boolean failOnMissingField) {
+		this.failOnMissingField = failOnMissingField;
+	}
+
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowSerializationSchema.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowSerializationSchema.java
new file mode 100644
index 00000000000..b8e557357c0
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowSerializationSchema.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pulsar.serde;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * Serialization schema that serializes an object into a JSON bytes.
+ *
+ * <p>Serializes the input {@link Row} object into a JSON string and
+ * converts it into <code>byte[]</code>.
+ *
+ * <p>Result <code>byte[]</code> messages can be deserialized using
+ * {@link JsonRowDeserializationSchema}.
+ */
+public class JsonRowSerializationSchema implements SerializationSchema<Row> {
+	/** Fields names in the input Row object. */
+	private final String[] fieldNames;
+	/** Object mapper that is used to create output JSON objects. */
+	private static ObjectMapper mapper = new ObjectMapper();
+
+	/**
+	 * Creates a JSON serialization schema for the given fields and types.
+	 *
+	 * @param rowSchema The schema of the rows to encode.
+	 */
+	public JsonRowSerializationSchema(RowTypeInfo rowSchema) {
+
+		Preconditions.checkNotNull(rowSchema);
+		String[] fieldNames = rowSchema.getFieldNames();
+		TypeInformation[] fieldTypes = rowSchema.getFieldTypes();
+
+		// check that no field is composite
+		for (int i = 0; i < fieldTypes.length; i++) {
+			if (fieldTypes[i] instanceof CompositeType) {
+				throw new IllegalArgumentException("JsonRowSerializationSchema cannot encode rows with nested schema, " +
+					"but field '" + fieldNames[i] + "' is nested: " + fieldTypes[i].toString());
+			}
+		}
+
+		this.fieldNames = fieldNames;
+	}
+
+	@Override
+	public byte[] serialize(Row row) {
+		if (row.getArity() != fieldNames.length) {
+			throw new IllegalStateException(String.format(
+				"Number of elements in the row %s is different from number of field names: %d", row, fieldNames.length));
+		}
+
+		ObjectNode objectNode = mapper.createObjectNode();
+
+		for (int i = 0; i < row.getArity(); i++) {
+			JsonNode node = mapper.valueToTree(row.getField(i));
+			objectNode.set(fieldNames[i], node);
+		}
+
+		try {
+			return mapper.writeValueAsBytes(objectNode);
+		} catch (Exception e) {
+			throw new RuntimeException("Failed to serialize row", e);
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/package-info.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/package-info.java
new file mode 100644
index 00000000000..2daf9fd34c6
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Serialization related classes used by Flink Pulsar Connector.
+ */
+package org.apache.flink.streaming.connectors.pulsar.serde;
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducerTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducerTest.java
new file mode 100644
index 00000000000..bda08a4bb3c
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducerTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.streaming.connectors.pulsar.serde.IntegerSerializationSchema;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.spy;
+
+/**
+ * Unit test of {@link FlinkPulsarProducer}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(PulsarClient.class)
+public class FlinkPulsarProducerTest {
+
+	private static final String MOCK_SERVICE_URL = "http://localhost:8080";
+	private static final String MOCK_TOPIC_NAME = "mock_topic";
+	private static final String ROUTING_KEY = "mock_key";
+
+	/**
+	 * Test Constructor.
+	 */
+	@Test
+	public void testConstructor() throws Exception {
+		Producer producer = mockPulsarProducer();
+		PulsarKeyExtractor<Integer> keyExtractor = new TestKeyExtractor<>();
+		FlinkPulsarProducer<Integer> sink = spySink(producer, keyExtractor);
+		assertEquals(PulsarProduceMode.AT_LEAST_ONE, sink.getProduceMode());
+		assertSame(keyExtractor, sink.getKeyExtractor());
+	}
+
+	/**
+	 * Test open producer.
+	 * @throws Exception
+	 */
+	@Test
+	public void testOpen() throws Exception {
+		Producer producer = mock(Producer.class);
+		FlinkPulsarProducer<Integer> sink = mock(FlinkPulsarProducer.class);
+		when(sink.getRuntimeContext()).thenReturn(mock(RuntimeContext.class));
+		PowerMockito.mockStatic(PulsarClient.class);
+		PulsarClient client = mock(PulsarClient.class);
+		when(PulsarClient.create(anyString())).thenReturn(client);
+		when(client.createProducer(anyString(), any(ProducerConfiguration.class))).thenReturn(producer);
+
+		sink.open(mock(Configuration.class));
+
+		PowerMockito.verifyPrivate(sink, times(1)).invoke("createProducer", any(ProducerConfiguration.class));
+	}
+
+	/**
+	 * Test invoke producer.
+	 * @throws Exception
+	 */
+	@Test
+	public void testInvoke() throws Exception {
+		SinkFunction.Context context = mock(SinkFunction.Context.class);
+		when(context.timestamp()).thenReturn(System.currentTimeMillis());
+		Producer producer = mockPulsarProducer();
+
+		PulsarKeyExtractor<Integer> keyExtractor = new TestKeyExtractor<>();
+		FlinkPulsarProducer<Integer> sink = spySink(producer, keyExtractor);
+		PowerMockito.when(
+			sink,
+			PowerMockito.method(
+				FlinkPulsarProducer.class,
+				"createProducer",
+				ProducerConfiguration.class))
+			.withArguments(any(ProducerConfiguration.class))
+			.thenReturn(producer);
+
+		CompletableFuture mockedFuture = mock(CompletableFuture.class);
+		when(producer.sendAsync(any(Message.class))).thenReturn(mockedFuture);
+		when(mockedFuture.thenApply(any(java.util.function.Function.class))).thenReturn(mockedFuture);
+
+		Whitebox.setInternalState(sink, "producer", producer);
+		sink.invoke(1, context);
+
+		verify(producer, times(1)).sendAsync(Mockito.any(Message.class));
+		verify(mockedFuture, atMost(1)).thenApply(any(java.util.function.Function.class));
+		verify(mockedFuture, atMost(1)).exceptionally(any(java.util.function.Function.class));
+	}
+
+	/**
+	 * Test close producer.
+	 * @throws Exception
+	 */
+	@Test
+	public void testClose() throws Exception {
+		Producer producer = mock(Producer.class);
+		PulsarKeyExtractor<Integer> keyExtractor = new TestKeyExtractor<>();
+
+		FlinkPulsarProducer sink = spySink(producer, keyExtractor);
+		Whitebox.setInternalState(sink, "producer", producer);
+
+		sink.close();
+
+		verify(producer, times(1)).close();
+	}
+
+	//
+	// Utilities
+	//
+
+	private Producer mockPulsarProducer() {
+		return mock(Producer.class);
+	}
+
+	private FlinkPulsarProducer<Integer> spySink(
+		Producer producer,
+		PulsarKeyExtractor<Integer> keyExtractor) throws Exception {
+		ProducerConfiguration conf = new ProducerConfiguration();
+		FlinkPulsarProducer<Integer> flinkProducer = spy(new FlinkPulsarProducer<>(
+			MOCK_SERVICE_URL,
+			MOCK_TOPIC_NAME,
+			new IntegerSerializationSchema(),
+			conf,
+			keyExtractor));
+		PowerMockito.mockStatic(PulsarClient.class);
+		PulsarClient client = mock(PulsarClient.class);
+		CompletableFuture mockedFuture = mock(CompletableFuture.class);
+		when(mockedFuture.thenApply(any(java.util.function.Function.class))).thenReturn(mockedFuture);
+		when(mockedFuture.exceptionally(any(java.util.function.Function.class))).thenReturn(mockedFuture);
+		when(producer.sendAsync(any(Message.class))).thenReturn(mock(CompletableFuture.class));
+		when(PulsarClient.create(anyString())).thenReturn(client);
+		when(client.createProducer(anyString(), any(ProducerConfiguration.class))).thenReturn(producer);
+		return flinkProducer;
+	}
+
+	/**
+	 * A key extractor used for testing.
+	 */
+	private static class TestKeyExtractor<T> implements PulsarKeyExtractor<T> {
+
+		@Override
+		public String getKey(T t) {
+			return ROUTING_KEY;
+		}
+
+	}
+
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/streaming/connectors/pulsar/PuslarTableSinkTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/streaming/connectors/pulsar/PuslarTableSinkTest.java
new file mode 100644
index 00000000000..334aae0b9cc
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/streaming/connectors/pulsar/PuslarTableSinkTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.powermock.api.mockito.PowerMockito;
+
+/**
+ * Unit test of {@link PulsarTableSink}.
+ */
+public class PuslarTableSinkTest {
+
+	private static final String MOCK_SERVICE_URL = "http://localhost:8080";
+	private static final String MOCK_TOPIC_NAME = "mock_topic";
+	private static final String MOCK_ROUTING_KEY = "mock_key";
+	private final String[] mockedFieldNames = {"mock_key", "mock_value"};
+	private final TypeInformation[] mockedTypeInformations = {
+		TypeInformation.of(String.class),
+		TypeInformation.of(String.class)
+	};
+
+	/**
+	 * Test configure PulsarTableSink.
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	public void testConfigure() throws Exception {
+		PulsarTableSink sink = spySink();
+
+		TableSink<Row> configuredSink = sink.configure(mockedFieldNames, mockedTypeInformations);
+
+		Assert.assertArrayEquals(mockedFieldNames, configuredSink.getFieldNames());
+		Assert.assertArrayEquals(mockedTypeInformations, configuredSink.getFieldTypes());
+		Assert.assertNotNull(((PulsarTableSink) configuredSink).keyExtractor);
+		Assert.assertNotNull(((PulsarTableSink) configuredSink).serializationSchema);
+	}
+
+	/**
+	 * Test emit data stream.
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	public void testEmitDataStream() throws Exception {
+		DataStream mockedDataStream = Mockito.mock(DataStream.class);
+
+		PulsarTableSink sink = spySink();
+
+		sink.emitDataStream(mockedDataStream);
+
+		Mockito.verify(mockedDataStream).addSink(Mockito.any(FlinkPulsarProducer.class));
+	}
+
+	private PulsarTableSink spySink() throws Exception {
+		PulsarTableSink sink = new PulsarJsonTableSink(MOCK_SERVICE_URL, MOCK_TOPIC_NAME, new ProducerConfiguration(), MOCK_ROUTING_KEY);
+		FlinkPulsarProducer producer = Mockito.mock(FlinkPulsarProducer.class);
+		PowerMockito.whenNew(
+			FlinkPulsarProducer.class
+		).withArguments(
+			Mockito.anyString(),
+			Mockito.anyString(),
+			Mockito.any(SerializationSchema.class),
+			Mockito.any(PowerMockito.class),
+			Mockito.any(PulsarKeyExtractor.class)
+		).thenReturn(producer);
+		Whitebox.setInternalState(sink, "fieldNames", mockedFieldNames);
+		Whitebox.setInternalState(sink, "fieldTypes", mockedTypeInformations);
+		Whitebox.setInternalState(sink, "serializationSchema", Mockito.mock(SerializationSchema.class));
+		Whitebox.setInternalState(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class));
+		return sink;
+	}
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractorTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractorTest.java
new file mode 100644
index 00000000000..21e31d504d8
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractorTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pulsar.partitioner;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertNull;
+
+/**
+ * Unit test of {@link PulsarKeyExtractor}.
+ */
+public class PulsarKeyExtractorTest {
+
+	@Test
+	public void testNullExtractor() {
+		assertNull(PulsarKeyExtractor.NULL.getKey(new Object()));
+	}
+
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/streaming/connectors/pulsar/serde/IntegerSerializationSchema.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/streaming/connectors/pulsar/serde/IntegerSerializationSchema.java
new file mode 100644
index 00000000000..73c4363e312
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/streaming/connectors/pulsar/serde/IntegerSerializationSchema.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pulsar.serde;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A serialization schema on serializing integers.
+ */
+public class IntegerSerializationSchema implements SerializationSchema<Integer> {
+
+	@Override
+	public byte[] serialize(Integer integer) {
+		return ByteBuffer.allocate(4).putInt(0, integer).array();
+	}
+}
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index bb98403ff2d..01f082affc5 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -50,6 +50,7 @@ under the License.
 		<module>flink-connector-elasticsearch</module>
 		<module>flink-connector-elasticsearch2</module>
 		<module>flink-connector-elasticsearch5</module>
+		<module>flink-connector-pulsar</module>
 		<module>flink-connector-rabbitmq</module>
 		<module>flink-connector-twitter</module>
 		<module>flink-connector-nifi</module>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services