You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/02/03 21:59:05 UTC

[pulsar-adapters] branch master updated: Removing Flink in favor of https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-pulsar (#34)

This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git


The following commit(s) were added to refs/heads/master by this push:
     new 195d125  Removing Flink in favor of https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-pulsar (#34)
195d125 is described below

commit 195d12552f9d168072a2ed830a69f5373e9655af
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Thu Feb 3 13:59:01 2022 -0800

    Removing Flink in favor of https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-pulsar (#34)
    
    ### Motivation
    
    Removing Flink adapter in favor of https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-pulsar
    This one is based on an old version of Flink which brings in dependencies with various CVEs, since that version Flink added pulsar connector in their project.
    
    ### Modifications
    
    Removed Flink adapter, tests, examples, and dependencies.
---
 .gitignore                                         |   2 -
 README.md                                          |   2 +-
 examples/flink/pom.xml                             | 141 -----
 .../example/FlinkPulsarBatchAvroSinkExample.java   |  89 ---
 .../example/FlinkPulsarBatchCsvSinkExample.java    | 100 ----
 .../example/FlinkPulsarBatchJsonSinkExample.java   | 124 ----
 .../example/FlinkPulsarBatchSinkExample.java       | 128 ----
 .../batch/connectors/pulsar/example/README.md      | 220 -------
 .../example/PulsarConsumerSourceWordCount.java     | 128 ----
 ...lsarConsumerSourceWordCountToAvroTableSink.java | 121 ----
 ...lsarConsumerSourceWordCountToJsonTableSink.java | 134 -----
 .../streaming/connectors/pulsar/example/README.md  | 209 -------
 .../flink/src/main/resources/avro/NasaMission.avsc |  20 -
 examples/flink/src/main/resources/log4j2.xml       |  37 --
 .../FlinkPulsarBatchAvroSinkScalaExample.scala     |  88 ---
 .../FlinkPulsarBatchCsvSinkScalaExample.scala      |  94 ---
 .../FlinkPulsarBatchJsonSinkScalaExample.scala     |  96 ---
 .../example/FlinkPulsarBatchSinkScalaExample.scala | 102 ----
 examples/pom.xml                                   |   1 -
 pom.xml                                            |  45 --
 pulsar-flink/README.md                             |  27 -
 pulsar-flink/pom.xml                               | 196 ------
 .../connectors/pulsar/BasePulsarOutputFormat.java  | 133 -----
 .../connectors/pulsar/PulsarAvroOutputFormat.java  |  44 --
 .../connectors/pulsar/PulsarCsvOutputFormat.java   |  45 --
 .../connectors/pulsar/PulsarJsonOutputFormat.java  |  43 --
 .../connectors/pulsar/PulsarOutputFormat.java      |  49 --
 .../batch/connectors/pulsar/package-info.java      |  22 -
 .../serialization/AvroSerializationSchema.java     |  60 --
 .../serialization/CsvSerializationSchema.java      |  53 --
 .../serialization/JsonSerializationSchema.java     |  41 --
 .../pulsar/serialization/package-info.java         |  22 -
 .../connectors/pulsar/CachedPulsarClient.java      | 110 ----
 .../connectors/pulsar/FlinkPulsarProducer.java     | 351 -----------
 .../connectors/pulsar/PulsarAvroTableSink.java     | 197 ------
 .../connectors/pulsar/PulsarConsumerSource.java    | 200 -------
 .../connectors/pulsar/PulsarJsonTableSink.java     |  69 ---
 .../connectors/pulsar/PulsarProduceMode.java       |  37 --
 .../connectors/pulsar/PulsarSourceBase.java        |  31 -
 .../connectors/pulsar/PulsarSourceBuilder.java     | 332 -----------
 .../connectors/pulsar/PulsarTableSink.java         | 178 ------
 .../streaming/connectors/pulsar/package-info.java  |  22 -
 .../pulsar/partitioner/PulsarKeyExtractor.java     |  38 --
 .../partitioner/PulsarPropertiesExtractor.java     |  40 --
 .../pulsar/partitioner/package-info.java           |  22 -
 .../pulsar/PulsarAvroOutputFormatTest.java         | 117 ----
 .../pulsar/PulsarCsvOutputFormatTest.java          | 116 ----
 .../pulsar/PulsarJsonOutputFormatTest.java         | 116 ----
 .../connectors/pulsar/PulsarOutputFormatTest.java  | 195 ------
 .../serialization/AvroSerializationSchemaTest.java |  66 ---
 .../serialization/CsvSerializationSchemaTest.java  |  53 --
 .../serialization/JsonSerializationSchemaTest.java |  94 ---
 .../connectors/pulsar/CachedPulsarClientTest.java  | 128 ----
 .../connectors/pulsar/PulsarAvroTableSinkTest.java | 122 ----
 .../pulsar/PulsarConsumerSourceTests.java          | 659 ---------------------
 .../connectors/pulsar/PulsarJsonTableSinkTest.java | 118 ----
 .../connectors/pulsar/PulsarSourceBuilderTest.java | 237 --------
 .../src/test/resources/avro/NasaMission.avsc       |  10 -
 58 files changed, 1 insertion(+), 6273 deletions(-)

diff --git a/.gitignore b/.gitignore
index 297f31d..eecdc23 100644
--- a/.gitignore
+++ b/.gitignore
@@ -84,7 +84,5 @@ docker.debug-info
 **/website/brodocs/documents/*.md
 
 # Avro
-examples/flink/src/main/java/org/apache/flink/avro/generated
-pulsar-flink/src/test/java/org/apache/flink/avro/generated
 pulsar-client/src/test/java/org/apache/pulsar/client/avro/generated
 /build/
diff --git a/README.md b/README.md
index 00fccc8..3099f5d 100644
--- a/README.md
+++ b/README.md
@@ -23,7 +23,7 @@
 
 This repository is used for hosting all the adapters maintained and supported by Apache Pulsar PMC.
 
-
+[Apache Flink adapter](https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-pulsar) is supported and maintained by Apache Flink Community.
 
 ## Building
 
diff --git a/examples/flink/pom.xml b/examples/flink/pom.xml
deleted file mode 100644
index 0ab275d..0000000
--- a/examples/flink/pom.xml
+++ /dev/null
@@ -1,141 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.pulsar.examples</groupId>
-    <artifactId>pulsar-adapters-examples</artifactId>
-    <version>2.8.0-SNAPSHOT</version>
-  </parent>
-
-  <groupId>org.apache.pulsar.examples</groupId>
-  <artifactId>flink</artifactId>
-  <name>Pulsar Examples :: Flink</name>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table_${scala.binary.version}</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-flink</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
-  </dependencies>
-
-  <build>
-    <resources>
-      <resource>
-        <directory>src/main/resources</directory>
-        <filtering>true</filtering>
-      </resource>
-    </resources>
-
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-shade-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>pulsar-flink-examples</id>
-            <phase>package</phase>
-            <goals>
-              <goal>shade</goal>
-            </goals>
-            <configuration>
-              <shadeTestJar>false</shadeTestJar>
-              <shadedArtifactAttached>false</shadedArtifactAttached>
-              <createDependencyReducedPom>false</createDependencyReducedPom>
-              <transformers>
-                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                  <mainClass>org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount</mainClass>
-                </transformer>
-              </transformers>
-              <finalName>pulsar-flink-examples</finalName>
-              <filters>
-                <filter>
-                  <artifact>*</artifact>
-                  <includes>
-                    <include>org/apache/flink/streaming/examples/kafka/**</include>
-                    <include>org/apache/flink/streaming/**</include>
-                    <include>org/apache/pulsar/**</include>
-                    <include>org/bouncycastle/**</include>
-                    <include>org/apache/flink/batch/**</include>
-                    <include>net/jpountz/**</include>
-                    <include>com/scurrilous/circe/**</include>
-                    <include>org/apache/commons/csv/**</include>
-                    <include>org/apache/flink/avro/generated/**</include>
-                    <include>org/apache/avro/**</include>
-                    <include>org/codehaus/jackson/**</include>
-                    <include>avro/shaded/com/google/common/**</include>
-                    <include>org/apache/flink/formats/avro/**</include>
-                  </includes>
-                </filter>
-              </filters>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-      <!-- Scala Plugin to compile Scala Files -->
-      <plugin>
-        <groupId>net.alchim31.maven</groupId>
-        <artifactId>scala-maven-plugin</artifactId>
-        <version>4.3.0</version>
-        <executions>
-          <execution>
-            <goals>
-              <goal>compile</goal>
-              <goal>add-source</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <!-- Generate Test class from avro schema -->
-      <plugin>
-        <groupId>org.apache.avro</groupId>
-        <artifactId>avro-maven-plugin</artifactId>
-        <version>${avro.version}</version>
-        <executions>
-          <execution>
-            <phase>generate-sources</phase>
-            <goals>
-              <goal>schema</goal>
-            </goals>
-            <configuration>
-              <testSourceDirectory>${project.basedir}/src/main/resources/avro</testSourceDirectory>
-              <testOutputDirectory>${project.basedir}/src/main/java/</testOutputDirectory>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-
-</project>
diff --git a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
deleted file mode 100644
index 6d077f9..0000000
--- a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.example;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.avro.generated.NasaMission;
-import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Implements a batch program on Pulsar topic by writing Flink DataSet as Avro.
- */
-public class FlinkPulsarBatchAvroSinkExample {
-
-    private static final List<NasaMission> nasaMissions = Arrays.asList(
-            NasaMission.newBuilder().setId(1).setName("Mercury program").setStartYear(1959).setEndYear(1963).build(),
-            NasaMission.newBuilder().setId(2).setName("Apollo program").setStartYear(1961).setEndYear(1972).build(),
-            NasaMission.newBuilder().setId(3).setName("Gemini program").setStartYear(1963).setEndYear(1966).build(),
-            NasaMission.newBuilder().setId(4).setName("Skylab").setStartYear(1973).setEndYear(1974).build(),
-            NasaMission.newBuilder().setId(5).setName("Apollo–Soyuz Test Project").setStartYear(1975).setEndYear(1975).build());
-
-    public static void main(String[] args) throws Exception {
-
-        // parse input arguments
-        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
-
-        if (parameterTool.getNumberOfParameters() < 2) {
-            System.out.println("Missing parameters!");
-            System.out.println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>");
-            return;
-        }
-
-        // set up the execution environment
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-        env.getConfig().setGlobalJobParameters(parameterTool);
-
-        String serviceUrl = parameterTool.getRequired("service-url");
-        String topic = parameterTool.getRequired("topic");
-
-        System.out.println("Parameters:");
-        System.out.println("\tServiceUrl:\t" + serviceUrl);
-        System.out.println("\tTopic:\t" + topic);
-
-        // create PulsarAvroOutputFormat instance
-        final OutputFormat<NasaMission> pulsarAvroOutputFormat = new PulsarAvroOutputFormat<>(serviceUrl, topic, new AuthenticationDisabled());
-
-        // create DataSet
-        DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);
-        // map nasa mission names to upper-case
-        nasaMissionDS.map(nasaMission -> new NasaMission(
-                nasaMission.getId(),
-                nasaMission.getName(),
-                nasaMission.getStartYear(),
-                nasaMission.getEndYear()))
-                // filter missions which started after 1970
-                .filter(nasaMission -> nasaMission.getStartYear() > 1970)
-                // write batch data to Pulsar
-                .output(pulsarAvroOutputFormat);
-
-        // set parallelism to write Pulsar in parallel (optional)
-        env.setParallelism(2);
-
-        // execute program
-        env.execute("Flink - Pulsar Batch Avro");
-    }
-  
-}
diff --git a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
deleted file mode 100644
index 4abb0a4..0000000
--- a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.example;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.batch.connectors.pulsar.PulsarCsvOutputFormat;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Implements a batch program on Pulsar topic by writing Flink DataSet as Csv.
- */
-public class FlinkPulsarBatchCsvSinkExample {
-
-    private static final List<Tuple4<Integer, String, Integer, Integer>> nasaMissions = Arrays.asList(
-            new Tuple4(1, "Mercury program", 1959, 1963),
-            new Tuple4(2, "Apollo program", 1961, 1972),
-            new Tuple4(3, "Gemini program", 1963, 1966),
-            new Tuple4(4, "Skylab", 1973, 1974),
-            new Tuple4(5, "Apollo–Soyuz Test Project", 1975, 1975));
-
-    public static void main(String[] args) throws Exception {
-
-        // parse input arguments
-        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
-
-        if (parameterTool.getNumberOfParameters() < 2) {
-            System.out.println("Missing parameters!");
-            System.out.println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>");
-            return;
-        }
-
-        // set up the execution environment
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-        env.getConfig().setGlobalJobParameters(parameterTool);
-
-        String serviceUrl = parameterTool.getRequired("service-url");
-        String topic = parameterTool.getRequired("topic");
-
-        System.out.println("Parameters:");
-        System.out.println("\tServiceUrl:\t" + serviceUrl);
-        System.out.println("\tTopic:\t" + topic);
-
-        // create PulsarCsvOutputFormat instance
-        final OutputFormat<Tuple4<Integer, String, Integer, Integer>> pulsarCsvOutputFormat =
-                new PulsarCsvOutputFormat<>(serviceUrl, topic, new AuthenticationDisabled());
-
-        // create DataSet
-        DataSet<Tuple4<Integer, String, Integer, Integer>> nasaMissionDS = env.fromCollection(nasaMissions);
-        // map nasa mission names to upper-case
-        nasaMissionDS.map(
-            new MapFunction<Tuple4<Integer, String, Integer, Integer>, Tuple4<Integer, String, Integer, Integer>>() {
-                           @Override
-                           public Tuple4<Integer, String, Integer, Integer> map(
-                                   Tuple4<Integer, String, Integer, Integer> nasaMission) throws Exception {
-                               return new Tuple4(
-                                       nasaMission.f0,
-                                       nasaMission.f1.toUpperCase(),
-                                       nasaMission.f2,
-                                       nasaMission.f3);
-                           }
-                       }
-        )
-        // filter missions which started after 1970
-        .filter(nasaMission -> nasaMission.f2 > 1970)
-        // write batch data to Pulsar
-        .output(pulsarCsvOutputFormat);
-
-        // set parallelism to write Pulsar in parallel (optional)
-        env.setParallelism(2);
-
-        // execute program
-        env.execute("Flink - Pulsar Batch Csv");
-
-    }
-
-}
\ No newline at end of file
diff --git a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
deleted file mode 100644
index dc56364..0000000
--- a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.example;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.batch.connectors.pulsar.PulsarJsonOutputFormat;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Implements a batch program on Pulsar topic by writing Flink DataSet as Json.
- */
-public class FlinkPulsarBatchJsonSinkExample {
-
-    private static final List<NasaMission> nasaMissions = Arrays.asList(
-            new NasaMission(1, "Mercury program", 1959, 1963),
-            new NasaMission(2, "Apollo program", 1961, 1972),
-            new NasaMission(3, "Gemini program", 1963, 1966),
-            new NasaMission(4, "Skylab", 1973, 1974),
-            new NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975));
-
-    public static void main(String[] args) throws Exception {
-
-        // parse input arguments
-        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
-
-        if (parameterTool.getNumberOfParameters() < 2) {
-            System.out.println("Missing parameters!");
-            System.out.println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>");
-            return;
-        }
-
-        // set up the execution environment
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-        env.getConfig().setGlobalJobParameters(parameterTool);
-
-        String serviceUrl = parameterTool.getRequired("service-url");
-        String topic = parameterTool.getRequired("topic");
-
-        System.out.println("Parameters:");
-        System.out.println("\tServiceUrl:\t" + serviceUrl);
-        System.out.println("\tTopic:\t" + topic);
-
-        // create PulsarJsonOutputFormat instance
-        final OutputFormat<NasaMission> pulsarJsonOutputFormat = new PulsarJsonOutputFormat<>(serviceUrl, topic, new AuthenticationDisabled());
-
-        // create DataSet
-        DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);
-        // map nasa mission names to upper-case
-        nasaMissionDS.map(nasaMission -> new NasaMission(
-                nasaMission.id,
-                nasaMission.missionName.toUpperCase(),
-                nasaMission.startYear,
-                nasaMission.endYear))
-        // filter missions which started after 1970
-        .filter(nasaMission -> nasaMission.startYear > 1970)
-        // write batch data to Pulsar
-        .output(pulsarJsonOutputFormat);
-
-        // set parallelism to write Pulsar in parallel (optional)
-        env.setParallelism(2);
-
-        // execute program
-        env.execute("Flink - Pulsar Batch Json");
-    }
-
-    /**
-     * NasaMission data model
-     *
-     * Note: Properties should be public or have getter functions to be visible
-     */
-    private static class NasaMission {
-
-        private int id;
-        private String missionName;
-        private int startYear;
-        private int endYear;
-
-        public NasaMission(int id, String missionName, int startYear, int endYear) {
-            this.id = id;
-            this.missionName = missionName;
-            this.startYear = startYear;
-            this.endYear = endYear;
-        }
-
-        public int getId() {
-            return id;
-        }
-
-        public String getMissionName() {
-            return missionName;
-        }
-
-        public int getStartYear() {
-            return startYear;
-        }
-
-        public int getEndYear() {
-            return endYear;
-        }
-    }
-
-}
\ No newline at end of file
diff --git a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
deleted file mode 100644
index 2c89579..0000000
--- a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.example;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat;
-import org.apache.flink.util.Collector;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-
-/**
- * Implements a batch word-count program on Pulsar topic by writing Flink DataSet.
- */
-public class FlinkPulsarBatchSinkExample {
-
-    private static final String EINSTEIN_QUOTE = "Imagination is more important than knowledge. " +
-            "Knowledge is limited. Imagination encircles the world.";
-
-    public static void main(String[] args) throws Exception {
-
-        // parse input arguments
-        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
-
-        if (parameterTool.getNumberOfParameters() < 2) {
-            System.out.println("Missing parameters!");
-            System.out.println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>");
-            return;
-        }
-
-        // set up the execution environment
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-        env.getConfig().setGlobalJobParameters(parameterTool);
-
-        String serviceUrl = parameterTool.getRequired("service-url");
-        String topic = parameterTool.getRequired("topic");
-
-        System.out.println("Parameters:");
-        System.out.println("\tServiceUrl:\t" + serviceUrl);
-        System.out.println("\tTopic:\t" + topic);
-
-        // create PulsarOutputFormat instance
-        final OutputFormat pulsarOutputFormat =
-                new PulsarOutputFormat(serviceUrl, topic, new AuthenticationDisabled(), wordWithCount -> wordWithCount.toString().getBytes());
-
-        // create DataSet
-        DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE);
-
-        // convert sentences to words
-        textDS.flatMap(new FlatMapFunction<String, WordWithCount>() {
-            @Override
-            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
-                String[] words = value.toLowerCase().split(" ");
-                for(String word: words) {
-                    out.collect(new WordWithCount(word.replace(".", ""), 1));
-                }
-            }
-        })
-
-        // filter words which length is bigger than 4
-        .filter(wordWithCount -> wordWithCount.word.length() > 4)
-
-        // group the words
-        .groupBy(new KeySelector<WordWithCount, String>() {
-            @Override
-            public String getKey(WordWithCount wordWithCount) throws Exception {
-                return wordWithCount.word;
-            }
-        })
-
-        // sum the word counts
-        .reduce(new ReduceFunction<WordWithCount>() {
-            @Override
-            public WordWithCount reduce(WordWithCount wordWithCount1, WordWithCount wordWithCount2) throws Exception {
-                return  new WordWithCount(wordWithCount1.word, wordWithCount1.count + wordWithCount2.count);
-            }
-        })
-
-        // write batch data to Pulsar
-        .output(pulsarOutputFormat);
-
-        // set parallelism to write Pulsar in parallel (optional)
-        env.setParallelism(2);
-
-        // execute program
-        env.execute("Flink - Pulsar Batch WordCount");
-
-    }
-
-    /**
-     * Data type for words with count.
-     */
-    private static class WordWithCount {
-
-        public String word;
-        public long count;
-
-        public WordWithCount(String word, long count) {
-            this.word = word;
-            this.count = count;
-        }
-
-        @Override
-        public String toString() {
-            return "WordWithCount { word = " + word + ", count = " + count + " }";
-        }
-    }
-}
\ No newline at end of file
diff --git a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
deleted file mode 100644
index 29adc34..0000000
--- a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
+++ /dev/null
@@ -1,220 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-
-The Flink Batch Sink for Pulsar is a custom sink that enables Apache [Flink](https://flink.apache.org/) to write [DataSet](https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/index.html) to Pulsar.
-
-# Prerequisites
-
-To use this sink, include a dependency for the `pulsar-flink` library in your Java configuration.
-
-# Maven
-
-If you're using Maven, add this to your `pom.xml`:
-
-```xml
-<!-- in your <properties> block -->
-<pulsar.version>{{pulsar:version}}</pulsar.version>
-
-<!-- in your <dependencies> block -->
-<dependency>
-  <groupId>org.apache.pulsar</groupId>
-  <artifactId>pulsar-flink</artifactId>
-  <version>${pulsar.version}</version>
-</dependency>
-```
-
-# Gradle
-
-If you're using Gradle, add this to your `build.gradle` file:
-
-```groovy
-def pulsarVersion = "{{pulsar:version}}"
-
-dependencies {
-    compile group: 'org.apache.pulsar', name: 'pulsar-flink', version: pulsarVersion
-}
-```
-
-# Example
-
-### PulsarOutputFormat
-
-In this example, Flink DataSet is processed as word-count and being written to Pulsar. Please find a complete example for PulsarOutputFormat as follows:
-[java](https://github.com/apache/pulsar/tree/master/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java)
-[scala](https://github.com/apache/pulsar/tree/master/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala)
-
-The steps to run the example:
-
-1. Start Pulsar Standalone.
-
-    You can follow the [instructions](https://pulsar.apache.org/docs/en/standalone/) to start a Pulsar standalone locally.
-
-    ```shell
-    $ bin/pulsar standalone
-    ```
-
-2. Start Flink locally.
-
-    You can follow the [instructions](https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/setup_quickstart.html) to download and start Flink.
-
-    ```shell
-    $ ./bin/start-cluster.sh
-    ```
-
-3. Build the examples.
-
-    ```shell
-    $ cd ${PULSAR_HOME}
-    $ mvn clean install -DskipTests
-    ```
-
-4. Run the word count example to print results to stdout.
-
-    ```shell
-    # java
-    $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchSinkExample ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
-
-    # scala
-    $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchSinkScalaExample ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
-    ```
-
-5. Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_flink_topic`.
-
-```shell
-$ bin/pulsar-client consume -n 0 -s test test_flink_topic
-```
-
-6. Please find sample output for above linked application as follows:
-```
-WordWithCount { word = important, count = 1 }
-WordWithCount { word = encircles, count = 1 }
-WordWithCount { word = imagination, count = 2 }
-WordWithCount { word = knowledge, count = 2 }
-WordWithCount { word = limited, count = 1 }
-WordWithCount { word = world, count = 1 }
-```
-
-
-### PulsarCsvOutputFormat
-
-In this example, Flink DataSet is processed and written to Pulsar in Csv format. Please find a complete example for PulsarCsvOutputFormat as follows:
-[java](https://github.com/apache/pulsar/tree/master/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java)
-[scala](https://github.com/apache/pulsar/tree/master/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala)
-
-The steps to run the example:
-
-Step 1, 2 and 3 are same as above.
-
-4. Run the word count example to print results to stdout.
-
-    ```shell
-    # java
-    $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchCsvSinkExample ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
-
-    # scala
-    $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchCsvSinkScalaExample ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
-    ```
-
-5. Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_flink_topic`.
-
-```shell
-$ bin/pulsar-client consume -n 0 -s test test_flink_topic
-```
-
-6. Please find sample output for above linked application as follows:
-```
-4,SKYLAB,1973,1974
-5,APOLLO–SOYUZ TEST PROJECT,1975,1975
-```
-
-
-### PulsarJsonOutputFormat
-
-In this example, Flink DataSet is processed and written to Pulsar in Json format. Please find a complete example for PulsarJsonOutputFormat as follows:
-[java](https://github.com/apache/pulsar/tree/master/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java)
-[scala](https://github.com/apache/pulsar/tree/master/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala)
-
-**Note:** Property definitions of the model should be public or have getter functions to be visible.
-
-The steps to run the example:
-
-Step 1, 2 and 3 are same as above.
-
-4. Run the word count example to print results to stdout.
-
-    ```shell
-    # java
-    $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchJsonSinkExample ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
-
-    # scala
-    $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchJsonSinkScalaExample ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
-    ```
-
-5. Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_flink_topic`.
-
-```shell
-$ bin/pulsar-client consume -n 0 -s test test_flink_topic
-```
-
-6. Please find sample output for above linked application as follows:
-```
-{"id":4,"missionName":"SKYLAB","startYear":1973,"endYear":1974}
-{"id":5,"missionName":"APOLLO–SOYUZ TEST PROJECT","startYear":1975,"endYear":1975}
-```
-
-
-### PulsarAvroOutputFormat
-
-In this example, Flink DataSet is processed and written to Pulsar in Json format. Please find a complete example for PulsarAvroOutputFormat as follows:
-[java](https://github.com/apache/pulsar/tree/master/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java)
-[scala](https://github.com/apache/pulsar/tree/master/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala)
-
-**Note:** NasaMission class are automatically generated by Avro.
-
-The steps to run the example:
-
-Step 1, 2 and 3 are same as above.
-
-4. Run the word count example to print results to stdout.
-
-    ```shell
-    # java
-    $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchAvroSinkExample ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
-
-    # scala
-    $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchAvroSinkScalaExample ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
-    ```
-
-5. Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_flink_topic`.
-
-```shell
-$ bin/pulsar-client consume -n 0 -s test test_flink_topic
-```
-
-6. Please find sample output for above linked application as follows:
-```
- ----- got message -----
- 
- Skylab��
- ----- got message -----
- 
- 6Apollo–Soyuz Test Project��
-```
\ No newline at end of file
diff --git a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java
deleted file mode 100644
index 2240a4d..0000000
--- a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar.example;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import lombok.AllArgsConstructor;
-import lombok.NoArgsConstructor;
-import lombok.ToString;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer;
-import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
-import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-
-/**
- * Implements a streaming wordcount program on pulsar topics.
- *
- * <p>Example usage:
- *   --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
- */
-public class PulsarConsumerSourceWordCount {
-
-    public static void main(String[] args) throws Exception {
-        // parse input arguments
-        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
-
-        if (parameterTool.getNumberOfParameters() < 2) {
-            System.out.println("Missing parameters!");
-            System.out.println("Usage: pulsar --service-url <pulsar-service-url> --input-topic <topic> --subscription <sub> --output-topic <topic>");
-            return;
-        }
-
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.getConfig().disableSysoutLogging();
-        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
-        env.enableCheckpointing(5000);
-        env.getConfig().setGlobalJobParameters(parameterTool);
-        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
-        String serviceUrl = parameterTool.getRequired("service-url");
-        String inputTopic = parameterTool.getRequired("input-topic");
-        String subscription = parameterTool.get("subscription", "flink-examples");
-        String outputTopic = parameterTool.get("output-topic", null);
-        int parallelism = parameterTool.getInt("parallelism", 1);
-
-        System.out.println("Parameters:");
-        System.out.println("\tServiceUrl:\t" + serviceUrl);
-        System.out.println("\tInputTopic:\t" + inputTopic);
-        System.out.println("\tSubscription:\t" + subscription);
-        System.out.println("\tOutputTopic:\t" + outputTopic);
-        System.out.println("\tParallelism:\t" + parallelism);
-
-        PulsarSourceBuilder<String> builder = PulsarSourceBuilder.builder(new SimpleStringSchema())
-            .serviceUrl(serviceUrl)
-            .topic(inputTopic)
-            .subscriptionName(subscription);
-        SourceFunction<String> src = builder.build();
-        DataStream<String> input = env.addSource(src);
-
-        DataStream<WordWithCount> wc = input
-            .flatMap((FlatMapFunction<String, WordWithCount>) (line, collector) -> {
-                for (String word : line.split("\\s")) {
-                    collector.collect(new WordWithCount(word, 1));
-                }
-            })
-            .returns(WordWithCount.class)
-            .keyBy("word")
-            .timeWindow(Time.seconds(5))
-            .reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
-                new WordWithCount(c1.word, c1.count + c2.count));
-
-        if (null != outputTopic) {
-            wc.addSink(new FlinkPulsarProducer<>(
-                serviceUrl,
-                outputTopic,
-                new AuthenticationDisabled(),
-                wordWithCount -> wordWithCount.toString().getBytes(UTF_8),
-                wordWithCount -> wordWithCount.word,
-                null
-            )).setParallelism(parallelism);
-        } else {
-            // print the results with a single thread, rather than in parallel
-            wc.print().setParallelism(1);
-        }
-
-        env.execute("Pulsar Stream WordCount");
-    }
-
-    /**
-     * Data type for words with count.
-     */
-    @AllArgsConstructor
-    @NoArgsConstructor
-    @ToString
-    public static class WordWithCount {
-
-        public String word;
-        public long count;
-
-    }
-
-}
diff --git a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
deleted file mode 100644
index 84e85e8..0000000
--- a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar.example;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.avro.generated.WordWithCount;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.connectors.pulsar.PulsarAvroTableSink;
-import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.sinks.CsvTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-
-/**
- * Implements a streaming wordcount program on pulsar topics.
- *
- * <p>Example usage:
- *   --service-url pulsar://localhost:6650 --input-topic test_topic --subscription test_sub
- * or
- *   --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_sub
- */
-public class PulsarConsumerSourceWordCountToAvroTableSink {
-    private static final String ROUTING_KEY = "word";
-
-    public static void main(String[] args) throws Exception {
-        // parse input arguments
-        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
-
-        if (parameterTool.getNumberOfParameters() < 3) {
-            System.out.println("Missing parameters!");
-            System.out.println("Usage: pulsar --service-url <pulsar-service-url> --input-topic <topic> --subscription <sub> --output-topic <topic>");
-            return;
-        }
-
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.getConfig().disableSysoutLogging();
-        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
-        env.enableCheckpointing(5000);
-        env.getConfig().setGlobalJobParameters(parameterTool);
-        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
-        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
-
-        String serviceUrl = parameterTool.getRequired("service-url");
-        String inputTopic = parameterTool.getRequired("input-topic");
-        String subscription = parameterTool.get("subscription", "flink-examples");
-        String outputTopic = parameterTool.get("output-topic", null);
-        int parallelism = parameterTool.getInt("parallelism", 1);
-
-        System.out.println("Parameters:");
-        System.out.println("\tServiceUrl:\t" + serviceUrl);
-        System.out.println("\tInputTopic:\t" + inputTopic);
-        System.out.println("\tSubscription:\t" + subscription);
-        System.out.println("\tOutputTopic:\t" + outputTopic);
-        System.out.println("\tParallelism:\t" + parallelism);
-
-        PulsarSourceBuilder<String> builder = PulsarSourceBuilder.builder(new SimpleStringSchema())
-                .serviceUrl(serviceUrl)
-                .topic(inputTopic)
-                .subscriptionName(subscription);
-        SourceFunction<String> src = builder.build();
-        DataStream<String> input = env.addSource(src);
-
-
-        DataStream<WordWithCount> wc = input
-                .flatMap((FlatMapFunction<String, WordWithCount>) (line, collector) -> {
-                    for (String word : line.split("\\s")) {
-                        collector.collect(
-                                WordWithCount.newBuilder().setWord(word).setCount(1).build()
-                        );
-                    }
-                })
-                .returns(WordWithCount.class)
-                .keyBy(ROUTING_KEY)
-                .timeWindow(Time.seconds(5))
-                .reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
-                        WordWithCount.newBuilder().setWord(c1.getWord()).setCount(c1.getCount() + c2.getCount()).build()
-                );
-
-        tableEnvironment.registerDataStream("wc",wc);
-        Table table = tableEnvironment.sqlQuery("select word, `count` from wc");
-        table.printSchema();
-        TableSink sink = null;
-        if (null != outputTopic) {
-            sink = new PulsarAvroTableSink(serviceUrl, outputTopic, new AuthenticationDisabled(), ROUTING_KEY, WordWithCount.class);
-        } else {
-            // print the results with a csv file
-            sink = new CsvTableSink("./examples/file",  "|");
-        }
-        table.writeToSink(sink);
-
-        env.execute("Pulsar Stream WordCount");
-    }
-
-}
diff --git a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
deleted file mode 100644
index a4f9c3c..0000000
--- a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar.example;
-
-import lombok.AllArgsConstructor;
-import lombok.NoArgsConstructor;
-import lombok.ToString;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.connectors.pulsar.PulsarJsonTableSink;
-import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.sinks.CsvTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-
-/**
- * Implements a streaming wordcount program on pulsar topics.
- *
- * <p>Example usage:
- *   --service-url pulsar://localhost:6650 --input-topic test_topic --subscription test_sub
- * or
- *   --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_sub
- */
-public class PulsarConsumerSourceWordCountToJsonTableSink {
-    private static final String ROUTING_KEY = "word";
-
-    public static void main(String[] args) throws Exception {
-        // parse input arguments
-        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
-
-        if (parameterTool.getNumberOfParameters() < 3) {
-            System.out.println("Missing parameters!");
-            System.out.println("Usage: pulsar --service-url <pulsar-service-url> --input-topic <topic> --subscription <sub> --output-topic <topic>");
-            return;
-        }
-
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.getConfig().disableSysoutLogging();
-        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
-        env.enableCheckpointing(5000);
-        env.getConfig().setGlobalJobParameters(parameterTool);
-        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
-        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
-
-        String serviceUrl = parameterTool.getRequired("service-url");
-        String inputTopic = parameterTool.getRequired("input-topic");
-        String subscription = parameterTool.get("subscription", "flink-examples");
-        String outputTopic = parameterTool.get("output-topic", null);
-        int parallelism = parameterTool.getInt("parallelism", 1);
-
-        System.out.println("Parameters:");
-        System.out.println("\tServiceUrl:\t" + serviceUrl);
-        System.out.println("\tInputTopic:\t" + inputTopic);
-        System.out.println("\tSubscription:\t" + subscription);
-        System.out.println("\tOutputTopic:\t" + outputTopic);
-        System.out.println("\tParallelism:\t" + parallelism);
-
-        PulsarSourceBuilder<String> builder = PulsarSourceBuilder.builder(new SimpleStringSchema())
-                .serviceUrl(serviceUrl)
-                .topic(inputTopic)
-                .subscriptionName(subscription);
-        SourceFunction<String> src = builder.build();
-        DataStream<String> input = env.addSource(src);
-
-
-        DataStream<WordWithCount> wc = input
-                .flatMap((FlatMapFunction<String, WordWithCount>) (line, collector) -> {
-                    for (String word : line.split("\\s")) {
-                        collector.collect(
-                            new WordWithCount(word, 1)
-                        );
-                    }
-                })
-                .returns(WordWithCount.class)
-                .keyBy(ROUTING_KEY)
-                .timeWindow(Time.seconds(5))
-                .reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
-                        new WordWithCount(c1.word, c1.count + c2.count));
-
-        tableEnvironment.registerDataStream("wc",wc);
-        Table table = tableEnvironment.sqlQuery("select word, `count` from wc");
-        table.printSchema();
-        TableSink sink = null;
-        if (null != outputTopic) {
-            sink = new PulsarJsonTableSink(serviceUrl, outputTopic, new AuthenticationDisabled(), ROUTING_KEY);
-        } else {
-            // print the results with a csv file
-            sink = new CsvTableSink("./examples/file",  "|");
-        }
-        table.writeToSink(sink);
-
-        env.execute("Pulsar Stream WordCount");
-    }
-
-    /**
-     * Data type for words with count.
-     */
-    @AllArgsConstructor
-    @NoArgsConstructor
-    @ToString
-    public static class WordWithCount {
-
-        public String word;
-        public long count;
-
-    }
-}
diff --git a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
deleted file mode 100644
index ac36eb5..0000000
--- a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
+++ /dev/null
@@ -1,209 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-
-## Apache Flink Connectors for Pulsar
-
-This page describes how to use the connectors to read and write Pulsar topics with [Apache Flink](https://flink.apache.org/) stream processing applications.
-
-Build end-to-end stream processing pipelines that use Pulsar as the stream storage and message bus, and Apache Flink for computation over the streams.
-See the [Pulsar Concepts](https://pulsar.apache.org/docs/en/concepts-overview/) page for more information.
-
-## Example
-
-### PulsarConsumerSourceWordCount
-
-This Flink streaming job is consuming from a Pulsar topic and counting the wordcount in a streaming fashion. The job can write the word count results
-to stdout or another Pulsar topic.
-
-The steps to run the example:
-
-1. Start Pulsar Standalone.
-
-    You can follow the [instructions](https://pulsar.apache.org/docs/en/standalone/) to start a Pulsar standalone locally.
-
-    ```shell
-    $ bin/pulsar standalone
-    ```
-
-2. Start Flink locally.
-
-    You can follow the [instructions](https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/setup_quickstart.html) to download and start Flink.
-
-    ```shell
-    $ ./bin/start-cluster.sh
-    ```
-
-3. Build the examples.
-
-    ```shell
-    $ cd ${PULSAR_HOME}
-    $ mvn clean install -DskipTests
-    ```
-
-4. Run the word count example to print results to stdout.
-
-    ```shell
-    $ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
-    ```
-
-5. Produce messages to topic `test_src`.
-
-    ```shell
-    $ bin/pulsar-client produce -m "hello world test again" -n 100 test_src
-    ```
-
-6. You can check the flink taskexecutor `.out` file. The `.out` file will print the counts at the end of each time window as long as words are floating in, e.g.:
-
-    ```shell
-    PulsarConsumerSourceWordCount.WordWithCount(word=hello, count=100)
-    PulsarConsumerSourceWordCount.WordWithCount(word=again, count=100)
-    PulsarConsumerSourceWordCount.WordWithCount(word=test, count=100)
-    PulsarConsumerSourceWordCount.WordWithCount(word=world, count=100)  
-    ```
-
-Alternatively, when you run the flink word count example at step 4, you can choose dump the result to another pulsar topic.
-
-```shell
-$ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
-```
-
-Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_dest`.
-
-```shell
-$ bin/pulsar-client consume -n 0 -s test test_dest
-```
-
-You will see similar results as what you see at step 6 when running the word count example to print results to stdout.
-
-
-### PulsarConsumerSourceWordCountToAvroTableSink
-
-This Flink streaming job is consuming from a Pulsar topic and counting the wordcount in a streaming fashion. The job can write the word count results
-to csv file or another Pulsar topic for avro format.
-
-The steps to run the example:
-
-Step 1, 2 and 3 are same as above.
-
-4. Run the word count example to print results to stdout.
-
-    ```shell
-    $ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToAvroTableSink ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
-    ```
-
-5. Produce messages to topic `test_src`.
-
-    ```shell
-    $ bin/pulsar-client produce -m "hello world again" -n 100 test_src
-    ```
-
-6. You can check the ${FLINK_HOME}/examples/file. The file contains the counts at the end of each time window as long as words are floating in, e.g.:
-
-    ```file
-    hello|100
-    again|100
-    test|100
-    world|100
-    ```
-
-Alternatively, when you run the flink word count example at step 4, you can choose dump the result to another pulsar topic.
-
-```shell
-$ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToAvroTableSink ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
-```
-
-Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_dest`.
-
-```shell
-$ bin/pulsar-client consume -n 0 -s test test_dest
-```
-
-You will see sample output for above linked application as follows:.
-```
------ got message -----
-
-hello�
------ got message -----
-
-again�
------ got message -----
-test�
------ got message -----
-
-world�
-
-```
-
-### PulsarConsumerSourceWordCountToJsonTableSink
-
-This Flink streaming job is consuming from a Pulsar topic and counting the wordcount in a streaming fashion. The job can write the word count results
-to csv file or another Pulsar topic for json format.
-
-The steps to run the example:
-
-Step 1, 2 and 3 are same as above.
-
-4. Run the word count example to print results to stdout.
-
-    ```shell
-    $ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToJsonTableSink ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
-    ```
-
-If java.lang.ClassNotFoundException: org.apache.flink.table.sinks.TableSink and java.lang.NoClassDefFoundError: org/apache/flink/formats/json/JsonRowSerializationSchema, you need build Apache Flink from source, then copy flink-table_{version}.jar, flink-json_{version}.jar to ${FLINK_HOME}/lib and restart flink cluster. 
-
-5. Produce messages to topic `test_src`.
-
-    ```shell
-    $ bin/pulsar-client produce -m "hello world again" -n 100 test_src
-    ```
-
-6. You can check the ${FLINK_HOME}/examples/file. The file contains the counts at the end of each time window as long as words are floating in, e.g.:
-
-    ```file
-    hello|100
-    again|100
-    test|100
-    world|100
-    ```
-
-Alternatively, when you run the flink word count example at step 4, you can choose dump the result to another pulsar topic.
-
-```shell
-$ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToJsonTableSink ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
-```
-
-Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_dest`.
-
-```shell
-$ bin/pulsar-client consume -n 0 -s test test_dest
-```
-
-You will see sample output for above linked application as follows:.
-```
------ got message -----
-{"word":"hello","count":100}
------ got message -----
-{"word":"again","count":100}
------ got message -----
-{"word":"test","count":100}
------ got message -----
-{"word":"world","count":100}
-```
diff --git a/examples/flink/src/main/resources/avro/NasaMission.avsc b/examples/flink/src/main/resources/avro/NasaMission.avsc
deleted file mode 100644
index 45adc98..0000000
--- a/examples/flink/src/main/resources/avro/NasaMission.avsc
+++ /dev/null
@@ -1,20 +0,0 @@
-[
-{"namespace": "org.apache.flink.avro.generated",
- "type": "record",
- "name": "NasaMission",
- "fields": [
-     {"name": "id", "type": "int"},
-     {"name": "name", "type": "string"},
-     {"name": "start_year",  "type": ["int", "null"]},
-     {"name": "end_year", "type": ["int", "null"]}
- ]
-},
-{"namespace": "org.apache.flink.avro.generated",
- "type": "record",
- "name": "WordWithCount",
- "fields": [
-     {"name": "word", "type": "string"},
-     {"name": "count", "type": "long"}
- ]
-}
-]
diff --git a/examples/flink/src/main/resources/log4j2.xml b/examples/flink/src/main/resources/log4j2.xml
deleted file mode 100644
index 2fdc2d0..0000000
--- a/examples/flink/src/main/resources/log4j2.xml
+++ /dev/null
@@ -1,37 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<Configuration status="INFO">
-    <Appenders>
-        <Console name="Console" target="SYSTEM_OUT">
-            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t:%C@%L] %-5level %logger{36} - %msg%n" />
-        </Console>
-    </Appenders>
-    <Loggers>
-        <Root level="warn">
-            <AppenderRef ref="Console" />
-        </Root>
-        <Logger name="org.eclipse.jetty" level="info"/>
-        <Logger name="org.apache.pulsar" level="info"/>
-        <Logger name="org.apache.bookkeeper" level="info"/>
-        <Logger name="org.apache.kafka" level="info"/>
-    </Loggers>
-</Configuration>
diff --git a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
deleted file mode 100644
index a64e656..0000000
--- a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.example
-
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.api.scala._
-import org.apache.flink.avro.generated.NasaMission
-import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled
-
-/**
-  * Implements a batch Scala program on Pulsar topic by writing Flink DataSet as Avro.
-  */
-object FlinkPulsarBatchAvroSinkScalaExample {
-
-  private val nasaMissions = List(
-    NasaMission.newBuilder.setId(1).setName("Mercury program").setStartYear(1959).setEndYear(1963).build,
-    NasaMission.newBuilder.setId(2).setName("Apollo program").setStartYear(1961).setEndYear(1972).build,
-    NasaMission.newBuilder.setId(3).setName("Gemini program").setStartYear(1963).setEndYear(1966).build,
-    NasaMission.newBuilder.setId(4).setName("Skylab").setStartYear(1973).setEndYear(1974).build,
-    NasaMission.newBuilder.setId(5).setName("Apollo–Soyuz Test Project").setStartYear(1975).setEndYear(1975).build)
-
-  def main(args: Array[String]): Unit = {
-
-    // parse input arguments
-    val parameterTool = ParameterTool.fromArgs(args)
-
-    if (parameterTool.getNumberOfParameters < 2) {
-      println("Missing parameters!")
-      println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>")
-      return
-    }
-
-    // set up the execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    env.getConfig.setGlobalJobParameters(parameterTool)
-
-    val serviceUrl = parameterTool.getRequired("service-url")
-    val topic = parameterTool.getRequired("topic")
-
-    println("Parameters:")
-    println("\tServiceUrl:\t" + serviceUrl)
-    println("\tTopic:\t" + topic)
-
-    // create PulsarCsvOutputFormat instance
-    val pulsarAvroOutputFormat =
-      new PulsarAvroOutputFormat[NasaMission](serviceUrl, topic, new AuthenticationDisabled())
-
-    // create DataSet
-    val textDS = env.fromCollection(nasaMissions)
-
-    // map nasa mission names to upper-case
-    textDS.map(nasaMission => new NasaMission(
-      nasaMission.getId,
-      nasaMission.getName,
-      nasaMission.getStartYear,
-      nasaMission.getEndYear))
-
-      // filter missions which started after 1970
-      .filter(_.getStartYear > 1970)
-
-      // write batch data to Pulsar as Avro
-      .output(pulsarAvroOutputFormat)
-
-    // set parallelism to write Pulsar in parallel (optional)
-    env.setParallelism(2)
-
-    // execute program
-    env.execute("Flink - Pulsar Batch Avro")
-  }
-
-}
diff --git a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
deleted file mode 100644
index 302d0ab..0000000
--- a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.example
-
-import org.apache.flink.api.java.tuple.Tuple4
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.api.scala._
-import org.apache.flink.batch.connectors.pulsar.PulsarCsvOutputFormat
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled
-
-/**
-  * Implements a batch Scala program on Pulsar topic by writing Flink DataSet as Csv.
-  */
-object FlinkPulsarBatchCsvSinkScalaExample {
-
-  /**
-    * NasaMission Model
-    */
-  private case class NasaMission(id: Int, missionName: String, startYear: Int, endYear: Int)
-    extends Tuple4(id, missionName, startYear, endYear)
-
-  private val nasaMissions = List(
-    NasaMission(1, "Mercury program", 1959, 1963),
-    NasaMission(2, "Apollo program", 1961, 1972),
-    NasaMission(3, "Gemini program", 1963, 1966),
-    NasaMission(4, "Skylab", 1973, 1974),
-    NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975))
-
-  def main(args: Array[String]): Unit = {
-
-    // parse input arguments
-    val parameterTool = ParameterTool.fromArgs(args)
-
-    if (parameterTool.getNumberOfParameters < 2) {
-      println("Missing parameters!")
-      println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>")
-      return
-    }
-
-    // set up the execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    env.getConfig.setGlobalJobParameters(parameterTool)
-
-    val serviceUrl = parameterTool.getRequired("service-url")
-    val topic = parameterTool.getRequired("topic")
-
-    println("Parameters:")
-    println("\tServiceUrl:\t" + serviceUrl)
-    println("\tTopic:\t" + topic)
-
-    // create PulsarCsvOutputFormat instance
-    val pulsarCsvOutputFormat =
-      new PulsarCsvOutputFormat[NasaMission](serviceUrl, topic, new AuthenticationDisabled())
-
-    // create DataSet
-    val textDS = env.fromCollection(nasaMissions)
-
-    // map nasa mission names to upper-case
-    textDS.map(nasaMission => NasaMission(
-      nasaMission.id,
-      nasaMission.missionName.toUpperCase,
-      nasaMission.startYear,
-      nasaMission.endYear))
-
-    // filter missions which started after 1970
-    .filter(_.startYear > 1970)
-
-    // write batch data to Pulsar as Csv
-    .output(pulsarCsvOutputFormat)
-
-    // set parallelism to write Pulsar in parallel (optional)
-    env.setParallelism(2)
-
-    // execute program
-    env.execute("Flink - Pulsar Batch Csv")
-  }
-
-}
\ No newline at end of file
diff --git a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
deleted file mode 100644
index 9518751..0000000
--- a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.example
-
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.api.scala._
-import org.apache.flink.batch.connectors.pulsar.PulsarJsonOutputFormat
-import scala.beans.BeanProperty
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled
-
-/**
-  * Implements a batch Scala program on Pulsar topic by writing Flink DataSet as Json.
-  */
-object FlinkPulsarBatchJsonSinkScalaExample {
-
-  /**
-    * NasaMission Model
-    */
-  private case class NasaMission(@BeanProperty id: Int,
-                         @BeanProperty missionName: String,
-                         @BeanProperty startYear: Int,
-                         @BeanProperty endYear: Int)
-
-  private val nasaMissions = List(
-    NasaMission(1, "Mercury program", 1959, 1963),
-    NasaMission(2, "Apollo program", 1961, 1972),
-    NasaMission(3, "Gemini program", 1963, 1966),
-    NasaMission(4, "Skylab", 1973, 1974),
-    NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975))
-
-  def main(args: Array[String]): Unit = {
-
-    // parse input arguments
-    val parameterTool = ParameterTool.fromArgs(args)
-
-    if (parameterTool.getNumberOfParameters < 2) {
-      println("Missing parameters!")
-      println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>")
-      return
-    }
-
-    // set up the execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    env.getConfig.setGlobalJobParameters(parameterTool)
-
-    val serviceUrl = parameterTool.getRequired("service-url")
-    val topic = parameterTool.getRequired("topic")
-
-    println("Parameters:")
-    println("\tServiceUrl:\t" + serviceUrl)
-    println("\tTopic:\t" + topic)
-
-    // create PulsarJsonOutputFormat instance
-    val pulsarJsonOutputFormat = new PulsarJsonOutputFormat[NasaMission](serviceUrl, topic, new AuthenticationDisabled())
-
-    // create DataSet
-    val nasaMissionDS = env.fromCollection(nasaMissions)
-
-    // map nasa mission names to upper-case
-    nasaMissionDS.map(nasaMission =>
-      NasaMission(
-        nasaMission.id,
-        nasaMission.missionName.toUpperCase,
-        nasaMission.startYear,
-        nasaMission.endYear))
-
-    // filter missions which started after 1970
-    .filter(_.startYear > 1970)
-
-    // write batch data to Pulsar
-    .output(pulsarJsonOutputFormat)
-
-    // set parallelism to write Pulsar in parallel (optional)
-    env.setParallelism(2)
-
-    // execute program
-    env.execute("Flink - Pulsar Batch Json")
-  }
-
-}
diff --git a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
deleted file mode 100644
index 369e56d..0000000
--- a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.example
-
-import org.apache.flink.api.common.serialization.SerializationSchema
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.api.scala._
-import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat
-import org.apache.flink.util.Collector
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled
-
-/**
-  * Data type for words with count.
-  */
-case class WordWithCount(word: String, count: Long) {
-  override def toString: String = "WordWithCount { word = " + word + ", count = " + count + " }"
-}
-
-/**
-  * Implements a batch word-count Scala program on Pulsar topic by writing Flink DataSet.
-  */
-object FlinkPulsarBatchSinkScalaExample {
-
-  private val EINSTEIN_QUOTE = "Imagination is more important than knowledge. " +
-    "Knowledge is limited. Imagination encircles the world."
-
-  def main(args: Array[String]): Unit = {
-
-    // parse input arguments
-    val parameterTool = ParameterTool.fromArgs(args)
-
-    if (parameterTool.getNumberOfParameters < 2) {
-      println("Missing parameters!")
-      println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>")
-      return
-    }
-
-    // set up the execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    env.getConfig.setGlobalJobParameters(parameterTool)
-
-    val serviceUrl = parameterTool.getRequired("service-url")
-    val topic = parameterTool.getRequired("topic")
-
-    println("Parameters:")
-    println("\tServiceUrl:\t" + serviceUrl)
-    println("\tTopic:\t" + topic)
-
-    // create PulsarOutputFormat instance
-    val pulsarOutputFormat =
-      new PulsarOutputFormat[WordWithCount](serviceUrl, topic, new AuthenticationDisabled(), new SerializationSchema[WordWithCount] {
-        override def serialize(wordWithCount: WordWithCount): Array[Byte] = wordWithCount.toString.getBytes
-      })
-
-    // create DataSet
-    val textDS = env.fromElements[String](EINSTEIN_QUOTE)
-
-    // convert sentence to words
-    textDS.flatMap((value: String, out: Collector[WordWithCount]) => {
-      val words = value.toLowerCase.split(" ")
-      for (word <- words) {
-        out.collect(new WordWithCount(word.replace(".", ""), 1))
-      }
-    })
-
-    // filter words which length is bigger than 4
-    .filter((wordWithCount: WordWithCount) => wordWithCount.word.length > 4)
-
-    // group the words
-    .groupBy((wordWithCount: WordWithCount) => wordWithCount.word)
-
-    // sum the word counts
-    .reduce((wordWithCount1: WordWithCount, wordWithCount2: WordWithCount) =>
-      new WordWithCount(wordWithCount1.word, wordWithCount1.count + wordWithCount2.count))
-
-    // write batch data to Pulsar
-    .output(pulsarOutputFormat)
-
-    // set parallelism to write Pulsar in parallel (optional)
-    env.setParallelism(2)
-
-    // execute program
-    env.execute("Flink - Pulsar Batch WordCount")
-  }
-
-}
\ No newline at end of file
diff --git a/examples/pom.xml b/examples/pom.xml
index 55382c4..f567c6b 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -33,7 +33,6 @@
   <name>Pulsar Adapter Examples :: Parent</name>
 
   <modules>
-    <module>flink</module>
     <module>spark</module>
     <module>kafka-streams</module>
   </modules>
diff --git a/pom.xml b/pom.xml
index 6ccf5f0..6683198 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,7 +77,6 @@
 
   <properties>
     <pulsar.version>2.8.0</pulsar.version>
-    <flink.version>1.7.2</flink.version>
     <storm.version>2.0.0</storm.version>
     <kafka-client.version>2.7.0</kafka-client.version>
     <kafka_0_8.version>0.8.1.1</kafka_0_8.version>
@@ -253,7 +252,6 @@
 
   <modules>
     <module>pulsar-storm</module>
-    <module>pulsar-flink</module>
     <module>pulsar-spark</module>
     <module>pulsar-client-kafka-compat</module>
     <module>pulsar-log4j2-appender</module>
@@ -363,49 +361,6 @@
       </dependency>
 
       <dependency>
-        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-streaming-java_2.11</artifactId>
-        <version>${flink.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-table_2.11</artifactId>
-        <version>${flink.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-json</artifactId>
-        <version>${flink.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-avro</artifactId>
-        <version>${flink.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-test-utils_2.11</artifactId>
-        <version>${flink.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-streaming-scala_2.11</artifactId>
-        <version>${flink.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-runtime_2.11</artifactId>
-        <type>test-jar</type>
-        <version>${flink.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-streaming-java_2.11</artifactId>
-        <type>test-jar</type>
-        <version>${flink.version}</version>
-      </dependency>
-
-      <dependency>
         <groupId>org.apache.kafka</groupId>
         <artifactId>kafka_2.9.2</artifactId>
         <version>${kafka_0_8.version}</version>
diff --git a/pulsar-flink/README.md b/pulsar-flink/README.md
deleted file mode 100644
index f970a55..0000000
--- a/pulsar-flink/README.md
+++ /dev/null
@@ -1,27 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-
-# Pulsar Flink Connector
-
-This Pulsar Flink connector is superseded by a newer version of [pulsar-flink](https://flink-packages.org/packages/pulsar-flink-connector)
-in Flink Packages.
-
-Details: https://cwiki.apache.org/confluence/display/FLINK/FLIP-72%3A+Introduce+Pulsar+Connector
\ No newline at end of file
diff --git a/pulsar-flink/pom.xml b/pulsar-flink/pom.xml
deleted file mode 100644
index bfa989f..0000000
--- a/pulsar-flink/pom.xml
+++ /dev/null
@@ -1,196 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<project
-        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
-        xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.pulsar</groupId>
-    <artifactId>pulsar-adapters</artifactId>
-    <version>2.8.0-SNAPSHOT</version>
-    <relativePath>..</relativePath>
-  </parent>
-
-  <artifactId>pulsar-flink</artifactId>
-  <!-- This Pulsar Flink connector is superseded by a new version of `pulsar-flink` connector.
-       https://flink-packages.org/packages/pulsar-flink-connector
-       https://cwiki.apache.org/confluence/display/FLINK/FLIP-72%3A+Introduce+Pulsar+Connector -->
-  <name>Pulsar Flink Connectors (Deprecated)</name>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table_${scala.binary.version}</artifactId>
-      <scope>provided</scope>
-      <!-- Projects depending on this project, won't depend on flink-table. -->
-      <optional>true</optional>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-json</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-avro</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro-protobuf</artifactId>
-      <exclusions>
-        <exclusion>
-          <groupId>com.google.protobuf</groupId>
-          <artifactId>protobuf-java</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-client</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-csv</artifactId>
-      <version>1.6</version>
-      <scope>compile</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-core</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-runtime_${scala.binary.version}</artifactId>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.javassist</groupId>
-      <artifactId>javassist</artifactId>
-      <version>3.20.0-GA</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.testng</groupId>
-      <artifactId>testng</artifactId>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.yaml</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-core</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.powermock</groupId>
-      <artifactId>powermock-api-mockito2</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.powermock</groupId>
-      <artifactId>powermock-module-testng</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <resources>
-      <resource>
-        <directory>src/main/resources</directory>
-        <filtering>true</filtering>
-      </resource>
-      <resource>
-        <directory>src/test/resources</directory>
-        <filtering>true</filtering>
-      </resource>
-    </resources>
-
-    <plugins>
-      <!-- Generate Test class from avro schema -->
-      <plugin>
-        <groupId>org.apache.avro</groupId>
-        <artifactId>avro-maven-plugin</artifactId>
-        <version>${avro.version}</version>
-        <executions>
-          <execution>
-            <phase>generate-sources</phase>
-            <goals>
-              <goal>schema</goal>
-            </goals>
-            <configuration>
-              <testSourceDirectory>${project.basedir}/src/test/resources/avro/</testSourceDirectory>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-</project>
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
deleted file mode 100644
index d061559..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.function.Function;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.io.RichOutputFormat;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Preconditions;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Base Pulsar Output Format to write Flink DataSets into a Pulsar topic.
- */
-public abstract class BasePulsarOutputFormat<T> extends RichOutputFormat<T>  {
-
-    private static final Logger LOG = LoggerFactory.getLogger(BasePulsarOutputFormat.class);
-    private static final long serialVersionUID = 2304601727522060427L;
-
-    private transient Function<Throwable, MessageId> failureCallback;
-    private static volatile Producer<byte[]> producer;
-
-    protected SerializationSchema<T> serializationSchema;
-
-    private ClientConfigurationData clientConf;
-    private ProducerConfigurationData producerConf;
-
-
-    protected BasePulsarOutputFormat(final String serviceUrl, final String topicName,
-        final Authentication authentication) {
-        Preconditions.checkArgument(StringUtils.isNotBlank(serviceUrl), "serviceUrl cannot be blank.");
-        Preconditions.checkArgument(StringUtils.isNotBlank(topicName),  "topicName cannot be blank.");
-
-        clientConf = new ClientConfigurationData();
-        producerConf = new ProducerConfigurationData();
-
-        this.clientConf.setServiceUrl(serviceUrl);
-        this.clientConf.setAuthentication(authentication);
-        this.producerConf.setTopicName(topicName);
-
-        LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}",
-            this.producerConf.getTopicName());
-    }
-
-    protected BasePulsarOutputFormat(ClientConfigurationData clientConf, ProducerConfigurationData producerConf) {
-        this.clientConf = Preconditions.checkNotNull(clientConf, "client config data should not be null");
-        this.producerConf = Preconditions.checkNotNull(producerConf, "producer config data should not be null");
-
-        Preconditions.checkArgument(StringUtils.isNotBlank(clientConf.getServiceUrl()), "serviceUrl cannot be blank.");
-        Preconditions.checkArgument(StringUtils.isNotBlank(producerConf.getTopicName()),  "topicName cannot be blank.");
-
-        LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}",
-            this.producerConf.getTopicName());
-    }
-
-    @Override
-    public void configure(Configuration configuration) {
-
-    }
-
-    @Override
-    public void open(int taskNumber, int numTasks) throws IOException {
-        this.producer = getProducerInstance();
-
-        this.failureCallback = cause -> {
-            LOG.error("Error while sending record to Pulsar: " + cause.getMessage(), cause);
-            return null;
-        };
-    }
-
-    @Override
-    public void writeRecord(T t) throws IOException {
-        byte[] data = this.serializationSchema.serialize(t);
-        this.producer.sendAsync(data)
-                .exceptionally(this.failureCallback);
-    }
-
-    @Override
-    public void close() throws IOException {
-
-    }
-
-    private Producer<byte[]> getProducerInstance()
-            throws PulsarClientException {
-        if (producer == null){
-            synchronized (PulsarOutputFormat.class) {
-                if (producer == null){
-                    producer = Preconditions.checkNotNull(createPulsarProducer(),
-                            "Pulsar producer cannot be null.");
-                }
-            }
-        }
-        return producer;
-    }
-
-    private Producer<byte[]> createPulsarProducer()
-            throws PulsarClientException {
-        try {
-            PulsarClientImpl client = new PulsarClientImpl(clientConf);
-            return client.createProducerAsync(producerConf).get();
-        } catch (PulsarClientException | InterruptedException | ExecutionException e) {
-            LOG.error("Pulsar producer cannot be created.", e);
-            throw new PulsarClientException(e);
-        }
-    }
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java
deleted file mode 100644
index cc2acab..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar;
-
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.flink.batch.connectors.pulsar.serialization.AvroSerializationSchema;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-
-/**
- * Pulsar Avro Output Format to write Flink DataSets into a Pulsar topic in Avro format.
- */
-public class PulsarAvroOutputFormat<T extends SpecificRecord> extends BasePulsarOutputFormat<T> {
-
-    private static final long serialVersionUID = -6794070714728773530L;
-
-    public PulsarAvroOutputFormat(String serviceUrl, String topicName, Authentication authentication) {
-        super(serviceUrl, topicName, authentication);
-        this.serializationSchema = new AvroSerializationSchema();
-    }
-
-    public PulsarAvroOutputFormat(ClientConfigurationData clientConfigurationData,
-        ProducerConfigurationData producerConfigurationData) {
-        super(clientConfigurationData, producerConfigurationData);
-        this.serializationSchema = new AvroSerializationSchema();
-    }
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
deleted file mode 100644
index 74680d3..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.batch.connectors.pulsar.serialization.CsvSerializationSchema;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-
-/**
- * Pulsar Csv Output Format to write Flink DataSets into a Pulsar topic in Csv format.
- */
-public class PulsarCsvOutputFormat<T extends Tuple> extends BasePulsarOutputFormat<T> {
-
-    private static final long serialVersionUID = -4461671510903404196L;
-
-    public PulsarCsvOutputFormat(String serviceUrl, String topicName, Authentication authentication) {
-        super(serviceUrl, topicName, authentication);
-        this.serializationSchema = new CsvSerializationSchema<>();
-    }
-
-    public PulsarCsvOutputFormat(ClientConfigurationData clientConfigurationData,
-        ProducerConfigurationData producerConfigurationData) {
-        super(clientConfigurationData, producerConfigurationData);
-        this.serializationSchema = new CsvSerializationSchema<>();
-    }
-
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
deleted file mode 100644
index 837f743..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar;
-
-import org.apache.flink.batch.connectors.pulsar.serialization.JsonSerializationSchema;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-
-/**
- * Pulsar Json Output Format to write Flink DataSets into a Pulsar topic in Json format.
- */
-public class PulsarJsonOutputFormat<T> extends BasePulsarOutputFormat<T> {
-
-    private static final long serialVersionUID = 8499620770848461958L;
-
-    public PulsarJsonOutputFormat(String serviceUrl, String topicName, Authentication authentication) {
-        super(serviceUrl, topicName, authentication);
-        this.serializationSchema = new JsonSerializationSchema();
-    }
-
-    public PulsarJsonOutputFormat(ClientConfigurationData clientConfigurationData,
-        ProducerConfigurationData producerConfigurationData) {
-        super(clientConfigurationData, producerConfigurationData);
-        this.serializationSchema = new JsonSerializationSchema();
-    }
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
deleted file mode 100644
index 14e12e4..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.util.Preconditions;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-
-/**
- * Pulsar Output Format to write Flink DataSets into a Pulsar topic in user-defined format.
- */
-public class PulsarOutputFormat<T> extends BasePulsarOutputFormat<T> {
-
-    private static final long serialVersionUID = 2997027580167793000L;
-
-    public PulsarOutputFormat(String serviceUrl, String topicName, Authentication authentication,
-        final SerializationSchema<T> serializationSchema) {
-        super(serviceUrl, topicName, authentication);
-        Preconditions.checkNotNull(serializationSchema, "serializationSchema cannot be null.");
-        this.serializationSchema = serializationSchema;
-    }
-
-    public PulsarOutputFormat(final ClientConfigurationData clientConfigurationData,
-                              final ProducerConfigurationData producerConfigurationData,
-                              final SerializationSchema<T> serializationSchema) {
-        super(clientConfigurationData, producerConfigurationData);
-        Preconditions.checkNotNull(serializationSchema, "serializationSchema cannot be null.");
-        this.serializationSchema = serializationSchema;
-    }
-
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/package-info.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/package-info.java
deleted file mode 100644
index 79a8213..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Implementations of different output formats.
- */
-package org.apache.flink.batch.connectors.pulsar;
\ No newline at end of file
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchema.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchema.java
deleted file mode 100644
index ea71d4d..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchema.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.serialization;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.Encoder;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-
-/**
- * Avro Serialization Schema to serialize Dataset records to Avro.
- */
-public class AvroSerializationSchema<T extends SpecificRecord> implements SerializationSchema<T> {
-
-    private static final long serialVersionUID = -6691140169413760919L;
-
-    @Override
-    public byte[] serialize(T t) {
-        if (null == t) {
-            return null;
-        }
-
-        // Writer to serialize Avro record into a byte array.
-        DatumWriter<T> writer = new SpecificDatumWriter<>(t.getSchema());
-        // Output stream to serialize records into byte array.
-        ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
-        // Low-level class for serialization of Avro values.
-        Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
-        arrayOutputStream.reset();
-        try {
-            writer.write(t, encoder);
-            encoder.flush();
-        } catch (IOException e) {
-            throw new RuntimeException("Error while serializing the record to Avro", e);
-        }
-
-        return arrayOutputStream.toByteArray();
-    }
-
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java
deleted file mode 100644
index 4ee6f22..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.serialization;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import org.apache.commons.csv.CSVFormat;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.java.tuple.Tuple;
-
-/**
- * Csv Serialization Schema to serialize Tuples to Csv.
- */
-public class CsvSerializationSchema<T extends Tuple> implements SerializationSchema<T> {
-
-    private static final long serialVersionUID = -3379119592495232636L;
-    private static final int STRING_WRITER_INITIAL_BUFFER_SIZE = 256;
-
-    @Override
-    public byte[] serialize(T t) {
-        StringWriter stringWriter;
-        try {
-            Object[] fieldsValues = new Object[t.getArity()];
-            for (int index = 0; index < t.getArity(); index++) {
-                fieldsValues[index] = (t.getField(index));
-            }
-
-            stringWriter = new StringWriter(STRING_WRITER_INITIAL_BUFFER_SIZE);
-            CSVFormat.DEFAULT.withRecordSeparator("").printRecord(stringWriter, fieldsValues);
-        } catch (IOException e) {
-            throw new RuntimeException("Error while serializing the record to Csv", e);
-        }
-
-        return stringWriter.toString().getBytes();
-    }
-
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchema.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchema.java
deleted file mode 100644
index b7a56c5..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchema.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.serialization;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-
-/**
- * Json Serialization Schema to serialize Dataset records to Json.
- */
-public class JsonSerializationSchema<T> implements SerializationSchema<T> {
-
-    private static final long serialVersionUID = -6938065355389311385L;
-    private ObjectMapper mapper = new ObjectMapper();
-
-    @Override
-    public byte[] serialize(T t) {
-        try {
-            return mapper.writeValueAsBytes(t);
-        } catch (JsonProcessingException e) {
-            throw new RuntimeException("Error while serializing the record to Json", e);
-        }
-    }
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/package-info.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/package-info.java
deleted file mode 100644
index e63f03f..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Implementations of the serialization schemas.
- */
-package org.apache.flink.batch.connectors.pulsar.serialization;
\ No newline at end of file
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java
deleted file mode 100644
index 4de5145..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A Pulsar Client cache that enables client sharing among different flink tasks in same process.
- */
-public class CachedPulsarClient {
-    private static final Logger LOG = LoggerFactory.getLogger(CachedPulsarClient.class);
-
-    private static int cacheSize = 5;
-
-    public static void setCacheSize(int size) {
-        cacheSize = size;
-    }
-
-    private static CacheLoader<ClientConfigurationData, PulsarClientImpl> cacheLoader =
-        new CacheLoader<ClientConfigurationData, PulsarClientImpl>() {
-            @Override
-            public PulsarClientImpl load(ClientConfigurationData key) throws Exception {
-                return createPulsarClient(key);
-            }
-        };
-
-    private static RemovalListener<ClientConfigurationData, PulsarClientImpl> removalListener = notification -> {
-        ClientConfigurationData config = notification.getKey();
-        PulsarClientImpl client = notification.getValue();
-        LOG.debug("Evicting pulsar client {} with config {}, due to {}",
-            client.toString(), config.toString(), notification.getCause().toString());
-        close(config, client);
-    };
-
-    private static LoadingCache<ClientConfigurationData, PulsarClientImpl> guavaCache =
-        CacheBuilder.newBuilder().maximumSize(cacheSize).removalListener(removalListener).build(cacheLoader);
-
-    private static PulsarClientImpl createPulsarClient(
-            ClientConfigurationData clientConfig) throws PulsarClientException {
-        PulsarClientImpl client;
-        try {
-            client = new PulsarClientImpl(clientConfig);
-            LOG.debug("Created a new instance of PulsarClientImpl for clientConf = {}", clientConfig.toString());
-        } catch (PulsarClientException e) {
-            LOG.error("Failed to create PulsarClientImpl for clientConf = {}", clientConfig.toString());
-            throw e;
-        }
-        return client;
-    }
-
-    public static PulsarClientImpl getOrCreate(ClientConfigurationData config) throws ExecutionException {
-        PulsarClientImpl instance = guavaCache.get(config);
-        if (instance.getState().get() == PulsarClientImpl.State.Open) {
-            return instance;
-        } else {
-            guavaCache.invalidate(config);
-            return guavaCache.get(config);
-        }
-    }
-
-    private static void close(ClientConfigurationData clientConfig, PulsarClientImpl client) {
-        try {
-            LOG.info("Closing the Pulsar client with config {}", clientConfig.toString());
-            client.close();
-        } catch (PulsarClientException e) {
-            LOG.warn("Error while closing the Pulsar client with config {}", clientConfig.toString(), e);
-        }
-    }
-
-    static void close(ClientConfigurationData clientConfig) {
-        guavaCache.invalidate(clientConfig);
-    }
-
-    static void clear() {
-        LOG.info("Cleaning up guava cache.");
-        guavaCache.invalidateAll();
-    }
-
-    static ConcurrentMap<ClientConfigurationData, PulsarClientImpl> getAsMap() {
-        return guavaCache.asMap();
-    }
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
deleted file mode 100644
index cf3c657..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++ /dev/null
@@ -1,351 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-import java.util.function.Function;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
-import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
-import org.apache.flink.util.SerializableObject;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Flink Sink to produce data into a Pulsar topic.
- */
-public class FlinkPulsarProducer<T>
-        extends RichSinkFunction<T>
-        implements CheckpointedFunction {
-
-    private static final Logger LOG = LoggerFactory.getLogger(FlinkPulsarProducer.class);
-
-    private ClientConfigurationData clientConf;
-    private ProducerConfigurationData producerConf;
-
-    /**
-     * (Serializable) SerializationSchema for turning objects used with Flink into.
-     * byte[] for Pulsar.
-     */
-    protected final SerializationSchema<T> schema;
-
-    /**
-     * User-provided key extractor for assigning a key to a pulsar message.
-     */
-    protected final PulsarKeyExtractor<T> flinkPulsarKeyExtractor;
-
-    /**
-     * User-provided properties extractor for assigning a key to a pulsar message.
-     */
-    protected final PulsarPropertiesExtractor<T> flinkPulsarPropertiesExtractor;
-
-    /**
-     * Produce Mode.
-     */
-    protected PulsarProduceMode produceMode = PulsarProduceMode.AT_LEAST_ONCE;
-
-    /**
-     * If true, the producer will wait until all outstanding records have been send to the broker.
-     */
-    protected boolean flushOnCheckpoint;
-
-    // -------------------------------- Runtime fields ------------------------------------------
-
-    /**
-     * Pulsar Producer instance.
-     */
-    protected transient Producer<byte[]> producer;
-
-    /**
-     * The callback than handles error propagation or logging callbacks.
-     */
-    protected transient Function<MessageId, MessageId> successCallback;
-
-    protected transient Function<Throwable, MessageId> failureCallback;
-
-    /**
-     * Errors encountered in the async producer are stored here.
-     */
-    protected transient volatile Exception asyncException;
-
-    /**
-     * Lock for accessing the pending records.
-     */
-    protected final SerializableObject pendingRecordsLock = new SerializableObject();
-
-    /**
-     * Number of unacknowledged records.
-     */
-    protected long pendingRecords;
-
-    public FlinkPulsarProducer(String serviceUrl,
-                               String defaultTopicName,
-                               Authentication authentication,
-                               SerializationSchema<T> serializationSchema,
-                               PulsarKeyExtractor<T> keyExtractor,
-                               PulsarPropertiesExtractor<T> propertiesExtractor) {
-        checkArgument(StringUtils.isNotBlank(serviceUrl), "Service url cannot be blank");
-        checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName cannot be blank");
-        checkNotNull(authentication, "auth cannot be null, set disabled for no auth");
-
-        clientConf = new ClientConfigurationData();
-        producerConf = new ProducerConfigurationData();
-
-        this.clientConf.setServiceUrl(serviceUrl);
-        this.clientConf.setAuthentication(authentication);
-        this.producerConf.setTopicName(defaultTopicName);
-        this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
-        this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
-        this.flinkPulsarPropertiesExtractor = getOrNullPropertiesExtractor(propertiesExtractor);
-        ClosureCleaner.ensureSerializable(serializationSchema);
-    }
-
-    public FlinkPulsarProducer(ClientConfigurationData clientConfigurationData,
-                               ProducerConfigurationData producerConfigurationData,
-                               SerializationSchema<T> serializationSchema,
-                               PulsarKeyExtractor<T> keyExtractor,
-                               PulsarPropertiesExtractor<T> propertiesExtractor) {
-        this.clientConf = checkNotNull(clientConfigurationData, "client conf can not be null");
-        this.producerConf = checkNotNull(producerConfigurationData, "producer conf can not be null");
-        this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
-        this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
-        this.flinkPulsarPropertiesExtractor = getOrNullPropertiesExtractor(propertiesExtractor);
-        ClosureCleaner.ensureSerializable(serializationSchema);
-    }
-
-    // ---------------------------------- Properties --------------------------
-
-
-    /**
-     * @return pulsar key extractor.
-     */
-    public PulsarKeyExtractor<T> getKeyExtractor() {
-        return flinkPulsarKeyExtractor;
-    }
-
-    /**
-     * @return pulsar properties extractor.
-     */
-    public PulsarPropertiesExtractor<T> getPulsarPropertiesExtractor() {
-        return flinkPulsarPropertiesExtractor;
-    }
-
-    /**
-     * Gets this producer's operating mode.
-     */
-    public PulsarProduceMode getProduceMode() {
-        return this.produceMode;
-    }
-
-    /**
-     * Sets this producer's operating mode.
-     *
-     * @param produceMode The mode of operation.
-     */
-    public void setProduceMode(PulsarProduceMode produceMode) {
-        this.produceMode = checkNotNull(produceMode);
-    }
-
-    /**
-     * If set to true, the Flink producer will wait for all outstanding messages in the Pulsar buffers
-     * to be acknowledged by the Pulsar producer on a checkpoint.
-     * This way, the producer can guarantee that messages in the Pulsar buffers are part of the checkpoint.
-     *
-     * @param flush Flag indicating the flushing mode (true = flush on checkpoint)
-     */
-    public void setFlushOnCheckpoint(boolean flush) {
-        this.flushOnCheckpoint = flush;
-    }
-
-    // ----------------------------------- Sink Methods --------------------------
-
-    @SuppressWarnings("unchecked")
-    private static <T> PulsarKeyExtractor<T> getOrNullKeyExtractor(PulsarKeyExtractor<T> extractor) {
-        if (null == extractor) {
-            return PulsarKeyExtractor.NULL;
-        } else {
-            return extractor;
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    private static <T> PulsarPropertiesExtractor<T> getOrNullPropertiesExtractor(
-            PulsarPropertiesExtractor<T> extractor) {
-        if (null == extractor) {
-            return PulsarPropertiesExtractor.EMPTY;
-        } else {
-            return extractor;
-        }
-    }
-
-    private Producer<byte[]> createProducer() throws Exception {
-        PulsarClientImpl client = CachedPulsarClient.getOrCreate(clientConf);
-        return client.createProducerAsync(producerConf).get();
-    }
-
-    /**
-     * Initializes the connection to pulsar.
-     *
-     * @param parameters configuration used for initialization
-     * @throws Exception
-     */
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        this.producer = createProducer();
-
-        RuntimeContext ctx = getRuntimeContext();
-
-        LOG.info("Starting FlinkPulsarProducer ({}/{}) to produce into pulsar topic {}",
-                ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), producerConf.getTopicName());
-
-        if (flushOnCheckpoint && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
-            LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
-            flushOnCheckpoint = false;
-        }
-
-        this.successCallback =  msgId -> {
-            acknowledgeMessage();
-            return msgId;
-        };
-
-        if (PulsarProduceMode.AT_MOST_ONCE == produceMode) {
-            this.failureCallback = cause -> {
-                LOG.error("Error while sending record to Pulsar : " + cause.getMessage(), cause);
-                return null;
-            };
-        } else if (PulsarProduceMode.AT_LEAST_ONCE == produceMode) {
-            this.failureCallback = cause -> {
-                if (null == asyncException) {
-                    if (cause instanceof Exception) {
-                        asyncException = (Exception) cause;
-                    } else {
-                        asyncException = new Exception(cause);
-                    }
-                }
-                return null;
-            };
-        } else {
-            throw new UnsupportedOperationException("Unsupported produce mode " + produceMode);
-        }
-    }
-
-    @Override
-    public void invoke(T value, Context context) throws Exception {
-        checkErroneous();
-
-        byte[] serializedValue = schema.serialize(value);
-
-        TypedMessageBuilder<byte[]> msgBuilder = producer.newMessage();
-        if (null != context.timestamp()) {
-            msgBuilder = msgBuilder.eventTime(context.timestamp());
-        }
-        String msgKey = flinkPulsarKeyExtractor.getKey(value);
-        if (null != msgKey) {
-            msgBuilder = msgBuilder.key(msgKey);
-        }
-
-        if (flushOnCheckpoint) {
-            synchronized (pendingRecordsLock) {
-                pendingRecords++;
-            }
-        }
-        msgBuilder.value(serializedValue)
-                .properties(this.flinkPulsarPropertiesExtractor.getProperties(value))
-                .sendAsync()
-                .thenApply(successCallback)
-                .exceptionally(failureCallback);
-    }
-
-    @Override
-    public void close() throws Exception {
-        if (producer != null) {
-            producer.close();
-        }
-
-        // make sure we propagate pending errors
-        checkErroneous();
-    }
-
-    // ------------------- Logic for handling checkpoint flushing -------------------------- //
-
-    private void acknowledgeMessage() {
-        if (flushOnCheckpoint) {
-            synchronized (pendingRecordsLock) {
-                pendingRecords--;
-                if (pendingRecords == 0) {
-                    pendingRecordsLock.notifyAll();
-                }
-            }
-        }
-    }
-
-    @Override
-    public void snapshotState(FunctionSnapshotContext context) throws Exception {
-        // check for asynchronous errors and fail the checkpoint if necessary
-        checkErroneous();
-
-        if (flushOnCheckpoint) {
-            // wait until all the messages are acknowledged
-            synchronized (pendingRecordsLock) {
-                while (pendingRecords > 0) {
-                    pendingRecordsLock.wait(100);
-                }
-            }
-
-            // if the flushed requests has errors, we should propagate it also and fail the checkpoint
-            checkErroneous();
-        }
-    }
-
-    @Override
-    public void initializeState(FunctionInitializationContext context) throws Exception {
-        // nothing to do
-    }
-
-    // ----------------------------------- Utilities --------------------------
-
-    protected void checkErroneous() throws Exception {
-        Exception e = asyncException;
-        if (e != null) {
-            // prevent double throwing
-            asyncException = null;
-            throw new Exception("Failed to send data to Pulsar: " + e.getMessage(), e);
-        }
-    }
-
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
deleted file mode 100644
index 51b3572..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-import java.util.Arrays;
-
-import org.apache.avro.Schema;
-import org.apache.avro.specific.SpecificData;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.formats.avro.AvroRowSerializationSchema;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
-import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.types.Row;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-
-/**
- * An append-only table sink to emit a streaming table as a Pulsar stream that serializes data in Avro format.
- */
-public class PulsarAvroTableSink implements AppendStreamTableSink<Row> {
-
-    protected ClientConfigurationData clientConfigurationData;
-    protected ProducerConfigurationData producerConfigurationData;
-    protected final String routingKeyFieldName;
-    protected SerializationSchema<Row> serializationSchema;
-    protected String[] fieldNames;
-    protected TypeInformation[] fieldTypes;
-    protected PulsarKeyExtractor<Row> keyExtractor;
-    protected PulsarPropertiesExtractor<Row> propertiesExtractor;
-    private Class<? extends SpecificRecord> recordClazz;
-
-    /**
-     * Create PulsarAvroTableSink.
-     *
-     * @param serviceUrl          pulsar service url
-     * @param topic               topic in pulsar to which table is written
-     * @param routingKeyFieldName routing key field name
-     */
-    public PulsarAvroTableSink(
-            String serviceUrl,
-            String topic,
-            Authentication authentication,
-            String routingKeyFieldName,
-            Class<? extends SpecificRecord> recordClazz) {
-        checkArgument(StringUtils.isNotBlank(serviceUrl), "Service url not set");
-        checkArgument(StringUtils.isNotBlank(topic), "Topic is null");
-        checkNotNull(authentication, "authentication is null, set new AuthenticationDisabled() instead");
-
-        clientConfigurationData = new ClientConfigurationData();
-        producerConfigurationData = new ProducerConfigurationData();
-
-        clientConfigurationData.setServiceUrl(serviceUrl);
-        clientConfigurationData.setAuthentication(authentication);
-        producerConfigurationData.setTopicName(topic);
-        this.routingKeyFieldName = routingKeyFieldName;
-        this.recordClazz = recordClazz;
-    }
-
-    public PulsarAvroTableSink(
-            ClientConfigurationData clientConfigurationData,
-            ProducerConfigurationData producerConfigurationData,
-            String routingKeyFieldName,
-            Class<? extends SpecificRecord> recordClazz) {
-        this.clientConfigurationData = checkNotNull(clientConfigurationData, "client config can not be null");
-        this.producerConfigurationData = checkNotNull(producerConfigurationData, "producer config can not be null");
-
-        checkArgument(StringUtils.isNotBlank(clientConfigurationData.getServiceUrl()), "Service url not set");
-        checkArgument(StringUtils.isNotBlank(producerConfigurationData.getTopicName()), "Topic is null");
-
-        this.routingKeyFieldName = routingKeyFieldName;
-        this.recordClazz = recordClazz;
-    }
-
-    /**
-     * Returns the low-level producer.
-     */
-    protected FlinkPulsarProducer<Row> createFlinkPulsarProducer() {
-        serializationSchema = new AvroRowSerializationSchema(recordClazz);
-        return new FlinkPulsarProducer<Row>(
-                clientConfigurationData,
-                producerConfigurationData,
-                serializationSchema,
-                keyExtractor,
-                propertiesExtractor);
-    }
-
-    @Override
-    public void emitDataStream(DataStream<Row> dataStream) {
-        checkState(fieldNames != null, "Table sink is not configured");
-        checkState(fieldTypes != null, "Table sink is not configured");
-        checkState(serializationSchema != null, "Table sink is not configured");
-        checkState(keyExtractor != null, "Table sink is not configured");
-        FlinkPulsarProducer<Row> producer = createFlinkPulsarProducer();
-        dataStream.addSink(producer);
-    }
-
-    @Override
-    public TypeInformation<Row> getOutputType() {
-        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames);
-        return rowTypeInfo;
-    }
-
-    @Override
-    public String[] getFieldNames() {
-        return fieldNames;
-    }
-
-    @Override
-    public TypeInformation<?>[] getFieldTypes() {
-        return fieldTypes;
-    }
-
-    @Override
-    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
-        PulsarAvroTableSink sink = new PulsarAvroTableSink(
-                clientConfigurationData, producerConfigurationData, routingKeyFieldName, recordClazz);
-
-        sink.fieldNames = checkNotNull(fieldNames, "Field names are null");
-        sink.fieldTypes = checkNotNull(fieldTypes, "Field types are null");
-        checkArgument(fieldNames.length == fieldTypes.length,
-                "Number of provided field names and types do not match");
-
-        sink.serializationSchema = new AvroRowSerializationSchema(recordClazz);
-        sink.keyExtractor = new AvroKeyExtractor(
-                routingKeyFieldName,
-                fieldNames,
-                fieldTypes,
-                recordClazz);
-        sink.propertiesExtractor = PulsarPropertiesExtractor.EMPTY;
-
-        return sink;
-    }
-
-
-    /**
-     * A key extractor that extracts the routing key from a {@link Row} by field name.
-     */
-    private static class AvroKeyExtractor implements PulsarKeyExtractor<Row> {
-        private final int keyIndex;
-
-        public AvroKeyExtractor(
-                String keyFieldName,
-                String[] fieldNames,
-                TypeInformation<?>[] fieldTypes,
-                Class<? extends SpecificRecord> recordClazz) {
-
-            checkArgument(fieldNames.length == fieldTypes.length,
-                    "Number of provided field names and types does not match.");
-
-            Schema schema = SpecificData.get().getSchema(recordClazz);
-            Schema.Field keyField = schema.getField(keyFieldName);
-            Schema.Type keyType = keyField.schema().getType();
-
-            int keyIndex = Arrays.asList(fieldNames).indexOf(keyFieldName);
-            checkArgument(keyIndex >= 0,
-                    "Key field '" + keyFieldName + "' not found");
-
-            checkArgument(Schema.Type.STRING.equals(keyType),
-                    "Key field must be of type 'STRING'");
-            this.keyIndex = keyIndex;
-        }
-
-        @Override
-        public String getKey(Row event) {
-            return event.getField(keyIndex).toString();
-        }
-    }
-
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
deleted file mode 100644
index 30361a0..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.util.IOUtils;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Pulsar source (consumer) which receives messages from a topic and acknowledges messages.
- * When checkpointing is enabled, it guarantees at least once processing semantics.
- *
- * <p>When checkpointing is disabled, it auto acknowledges messages based on the number of messages it has
- * received. In this mode messages may be dropped.
- */
-class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageId> implements PulsarSourceBase<T> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(PulsarConsumerSource.class);
-
-    private int messageReceiveTimeoutMs;
-
-    private ClientConfigurationData clientConfigurationData;
-    private ConsumerConfigurationData<byte[]> consumerConfigurationData;
-
-    private final DeserializationSchema<T> deserializer;
-
-    private PulsarClient client;
-    private Consumer<byte[]> consumer;
-
-    private boolean isCheckpointingEnabled;
-
-    private final long acknowledgementBatchSize;
-    private long batchCount;
-
-    private transient volatile boolean isRunning;
-
-    PulsarConsumerSource(PulsarSourceBuilder<T> builder) {
-        super(MessageId.class);
-
-        clientConfigurationData = new ClientConfigurationData();
-        consumerConfigurationData = new ConsumerConfigurationData<>();
-
-        this.clientConfigurationData = builder.clientConfigurationData;
-        this.consumerConfigurationData = builder.consumerConfigurationData;
-        this.deserializer = builder.deserializationSchema;
-        this.acknowledgementBatchSize = builder.acknowledgementBatchSize;
-        this.messageReceiveTimeoutMs = builder.messageReceiveTimeoutMs;
-    }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
-
-        final RuntimeContext context = getRuntimeContext();
-        if (context instanceof StreamingRuntimeContext) {
-            isCheckpointingEnabled = ((StreamingRuntimeContext) context).isCheckpointingEnabled();
-        }
-
-        client = getClient();
-        consumer = createConsumer(client);
-
-        isRunning = true;
-    }
-
-    @Override
-    protected void acknowledgeIDs(long checkpointId, Set<MessageId> messageIds) {
-        if (consumer == null) {
-            LOG.error("null consumer unable to acknowledge messages");
-            throw new RuntimeException("null pulsar consumer unable to acknowledge messages");
-        }
-
-        if (messageIds.isEmpty()) {
-            LOG.info("no message ids to acknowledge");
-            return;
-        }
-
-        Map<String, CompletableFuture<Void>> futures = new HashMap<>(messageIds.size());
-        for (MessageId id : messageIds) {
-            futures.put(id.toString(), consumer.acknowledgeAsync(id));
-        }
-
-        futures.forEach((k, f) -> {
-            try {
-                f.get();
-            } catch (Exception e) {
-                LOG.error("failed to acknowledge messageId " + k, e);
-                throw new RuntimeException("Messages could not be acknowledged during checkpoint creation.", e);
-            }
-        });
-    }
-
-    @Override
-    public void run(SourceContext<T> context) throws Exception {
-        Message message;
-        while (isRunning) {
-            message = consumer.receive(messageReceiveTimeoutMs, TimeUnit.MILLISECONDS);
-            if (message == null) {
-                continue;
-            }
-
-            if (isCheckpointingEnabled) {
-                emitCheckpointing(context, message);
-            } else {
-                emitAutoAcking(context, message);
-            }
-        }
-    }
-
-    private void emitCheckpointing(SourceContext<T> context, Message message) throws IOException {
-        synchronized (context.getCheckpointLock()) {
-            if (!addId(message.getMessageId())) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("messageId=" + message.getMessageId().toString() + " already processed.");
-                }
-                return;
-            }
-            context.collect(deserialize(message));
-        }
-    }
-
-    private void emitAutoAcking(SourceContext<T> context, Message message) throws IOException {
-        context.collect(deserialize(message));
-        batchCount++;
-        if (batchCount >= acknowledgementBatchSize) {
-            LOG.info("processed {} messages acknowledging messageId {}", batchCount, message.getMessageId());
-            consumer.acknowledgeCumulative(message.getMessageId());
-            batchCount = 0;
-        }
-    }
-
-    protected T deserialize(Message message) throws IOException {
-        return deserializer.deserialize(message.getData());
-    }
-
-    @Override
-    public void cancel() {
-        isRunning = false;
-    }
-
-    @Override
-    public void close() throws Exception {
-        super.close();
-        IOUtils.cleanup(LOG, consumer);
-        IOUtils.cleanup(LOG, client);
-    }
-
-    @Override
-    public TypeInformation<T> getProducedType() {
-        return deserializer.getProducedType();
-    }
-
-    boolean isCheckpointingEnabled() {
-        return isCheckpointingEnabled;
-    }
-
-    PulsarClient getClient() throws ExecutionException {
-        return CachedPulsarClient.getOrCreate(clientConfigurationData);
-    }
-
-    Consumer<byte[]> createConsumer(PulsarClient client) throws PulsarClientException {
-        return ((PulsarClientImpl) client).subscribeAsync(consumerConfigurationData).join();
-    }
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
deleted file mode 100644
index b6f82a5..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.formats.json.JsonRowSerializationSchema;
-import org.apache.flink.types.Row;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-
-/**
- * Base class for {@link PulsarTableSink} that serializes data in JSON format.
- */
-public class PulsarJsonTableSink extends PulsarTableSink {
-
-    /**
-     * Create PulsarJsonTableSink.
-     *
-     * @param serviceUrl          pulsar service url
-     * @param topic               topic in pulsar to which table is written
-     * @param authentication      authetication info required by pulsar client
-     * @param routingKeyFieldName routing key field name
-     */
-    public PulsarJsonTableSink(
-            String serviceUrl,
-            String topic,
-            Authentication authentication,
-            String routingKeyFieldName) {
-        super(serviceUrl, topic, authentication, routingKeyFieldName);
-    }
-
-    public PulsarJsonTableSink(
-            ClientConfigurationData clientConfigurationData,
-            ProducerConfigurationData producerConfigurationData,
-            String routingKeyFieldName) {
-        super(clientConfigurationData, producerConfigurationData, routingKeyFieldName);
-    }
-
-    @Override
-    protected SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema) {
-        return new JsonRowSerializationSchema(rowSchema);
-    }
-
-    @Override
-    protected PulsarTableSink createSink() {
-        return new PulsarJsonTableSink(
-                clientConfigurationData,
-                producerConfigurationData,
-                routingKeyFieldName);
-    }
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java
deleted file mode 100644
index d42f5c3..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-/**
- * The supported producing modes of operation for flink's pulsar producer.
- */
-public enum PulsarProduceMode {
-
-    /**
-     * Any produce failures will be ignored hence there could be data loss.
-     */
-    AT_MOST_ONCE,
-
-    /**
-     * The producer will ensure that all the events are persisted in pulsar.
-     * There could be duplicate events written though.
-     */
-    AT_LEAST_ONCE,
-
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBase.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBase.java
deleted file mode 100644
index 9d44215..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBase.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
-
-/**
- * Base class for pulsar sources.
- * @param <T>
- */
-@PublicEvolving
-interface PulsarSourceBase<T> extends ParallelSourceFunction<T>, ResultTypeQueryable<T> {
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
deleted file mode 100644
index 67690be..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.regex.Pattern;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.util.Preconditions;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-
-/**
- * A class for building a pulsar source.
- */
-@PublicEvolving
-public class PulsarSourceBuilder<T> {
-
-    private static final String SERVICE_URL = "pulsar://localhost:6650";
-    private static final long ACKNOWLEDGEMENT_BATCH_SIZE = 100;
-    private static final long MAX_ACKNOWLEDGEMENT_BATCH_SIZE = 1000;
-    private static final int DEFAULT_MESSAGE_RECEIVE_TIMEOUT_MS = 100;
-    private static final String SUBSCRIPTION_NAME = "flink-sub";
-
-    final DeserializationSchema<T> deserializationSchema;
-
-    ClientConfigurationData clientConfigurationData;
-    ConsumerConfigurationData<byte[]> consumerConfigurationData;
-
-    long acknowledgementBatchSize = ACKNOWLEDGEMENT_BATCH_SIZE;
-    //
-    int messageReceiveTimeoutMs = DEFAULT_MESSAGE_RECEIVE_TIMEOUT_MS;
-
-    private PulsarSourceBuilder(DeserializationSchema<T> deserializationSchema) {
-        this.deserializationSchema = deserializationSchema;
-
-        clientConfigurationData = new ClientConfigurationData();
-        consumerConfigurationData = new ConsumerConfigurationData<>();
-        clientConfigurationData.setServiceUrl(SERVICE_URL);
-        consumerConfigurationData.setTopicNames(new TreeSet<>());
-        consumerConfigurationData.setSubscriptionName(SUBSCRIPTION_NAME);
-        consumerConfigurationData.setSubscriptionInitialPosition(SubscriptionInitialPosition.Latest);
-    }
-
-    /**
-     * Sets the pulsar service url to connect to. Defaults to pulsar://localhost:6650.
-     *
-     * @param serviceUrl service url to connect to
-     * @return this builder
-     */
-    public PulsarSourceBuilder<T> serviceUrl(String serviceUrl) {
-        Preconditions.checkArgument(StringUtils.isNotBlank(serviceUrl), "serviceUrl cannot be blank");
-        this.clientConfigurationData.setServiceUrl(serviceUrl);
-        return this;
-    }
-
-    /**
-     * Sets topics to consumer from. This is required.
-     *
-     * <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
-     * are in the following format:
-     * {persistent|non-persistent}://tenant/namespace/topic
-     *
-     * @param topics the topic to consumer from
-     * @return this builder
-     */
-    public PulsarSourceBuilder<T> topic(String... topics) {
-        Preconditions.checkArgument(topics != null && topics.length > 0,
-                "topics cannot be blank");
-        for (String topic : topics) {
-            Preconditions.checkArgument(StringUtils.isNotBlank(topic), "topicNames cannot have blank topic");
-        }
-        this.consumerConfigurationData.getTopicNames().addAll(Arrays.asList(topics));
-        return this;
-    }
-
-    /**
-     * Sets topics to consumer from. This is required.
-     *
-     * <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
-     * are in the following format:
-     * {persistent|non-persistent}://tenant/namespace/topic
-     *
-     * @param topics the topic to consumer from
-     * @return this builder
-     */
-    public PulsarSourceBuilder<T> topics(List<String> topics) {
-        Preconditions.checkArgument(topics != null && !topics.isEmpty(), "topics cannot be blank");
-        topics.forEach(topicName ->
-                Preconditions.checkArgument(StringUtils.isNotBlank(topicName), "topicNames cannot have blank topic"));
-        this.consumerConfigurationData.getTopicNames().addAll(topics);
-        return this;
-    }
-
-    /**
-     * Use topic pattern to config sets of topics to consumer.
-     *
-     * <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
-     * are in the following format:
-     * {persistent|non-persistent}://tenant/namespace/topic
-     *
-     * @param topicsPattern topic pattern to consumer from
-     * @return this builder
-     */
-    public PulsarSourceBuilder<T> topicsPattern(Pattern topicsPattern) {
-        Preconditions.checkArgument(topicsPattern != null, "Param topicsPattern cannot be null");
-        Preconditions.checkArgument(this.consumerConfigurationData.getTopicsPattern() == null,
-            "Pattern has already been set.");
-        this.consumerConfigurationData.setTopicsPattern(topicsPattern);
-        return this;
-    }
-
-    /**
-     * Use topic pattern to config sets of topics to consumer.
-     *
-     * <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
-     * are in the following format:
-     * {persistent|non-persistent}://tenant/namespace/topic
-     *
-     * @param topicsPattern topic pattern string to consumer from
-     * @return this builder
-     */
-    public PulsarSourceBuilder<T> topicsPatternString(String topicsPattern) {
-        Preconditions.checkArgument(StringUtils.isNotBlank(topicsPattern), "Topics pattern string cannot be blank");
-        Preconditions.checkArgument(this.consumerConfigurationData.getTopicsPattern() == null,
-            "Pattern has already been set.");
-        this.consumerConfigurationData.setTopicsPattern(Pattern.compile(topicsPattern));
-        return this;
-    }
-
-    /**
-     * Sets the subscription name for the topic consumer. Defaults to flink-sub.
-     *
-     * @param subscriptionName the subscription name for the topic consumer
-     * @return this builder
-     */
-    public PulsarSourceBuilder<T> subscriptionName(String subscriptionName) {
-        Preconditions.checkArgument(StringUtils.isNotBlank(subscriptionName),
-                "subscriptionName cannot be blank");
-        this.consumerConfigurationData.setSubscriptionName(subscriptionName);
-        return this;
-    }
-
-    /**
-     * Sets the subscription initial position for the topic consumer.
-     * Default is {@link SubscriptionInitialPosition#Latest}
-     *
-     * @param initialPosition the subscription initial position.
-     * @return this builder
-     */
-    public PulsarSourceBuilder<T> subscriptionInitialPosition(SubscriptionInitialPosition initialPosition) {
-        Preconditions.checkNotNull(initialPosition, "subscription initial position cannot be null");
-        this.consumerConfigurationData.setSubscriptionInitialPosition(initialPosition);
-        return this;
-    }
-
-    /**
-     * Sets the number of messages to receive before acknowledging. This defaults to 100. This
-     * value is only used when checkpointing is disabled.
-     *
-     * @param size number of messages to receive before acknowledging
-     * @return this builder
-     */
-    public PulsarSourceBuilder<T> acknowledgementBatchSize(long size) {
-        if (size > 0 && size <= MAX_ACKNOWLEDGEMENT_BATCH_SIZE) {
-            acknowledgementBatchSize = size;
-            return this;
-        }
-        throw new IllegalArgumentException(
-            "acknowledgementBatchSize can only take values > 0 and <= " + MAX_ACKNOWLEDGEMENT_BATCH_SIZE);
-    }
-
-    /**
-     * parameterize messageReceiveTimeoutMs for `PulsarConsumerSource`.
-     * @param timeout timeout in ms, should be gt 0
-     * @return this builder
-     */
-    public PulsarSourceBuilder<T> messageReceiveTimeoutMs(int timeout) {
-        if (timeout <= 0) {
-            throw new IllegalArgumentException("messageReceiveTimeoutMs can only take values > 0");
-        }
-        this.messageReceiveTimeoutMs = timeout;
-        return this;
-    }
-
-    /**
-     * Set the authentication provider to use in the Pulsar client instance.
-     *
-     * @param authentication an instance of the {@link Authentication} provider already constructed
-     * @return this builder
-     */
-    public PulsarSourceBuilder<T> authentication(Authentication authentication) {
-        Preconditions.checkArgument(authentication != null,
-                "authentication instance can not be null, use new AuthenticationDisabled() to disable authentication");
-        this.clientConfigurationData.setAuthentication(authentication);
-        return this;
-    }
-
-    /**
-     * Configure the authentication provider to use in the Pulsar client instance.
-     *
-     * @param authPluginClassName
-     *            name of the Authentication-Plugin to use
-     * @param authParamsString
-     *            string which represents parameters for the Authentication-Plugin, e.g., "key1:val1,key2:val2"
-     * @return this builder
-     * @throws PulsarClientException.UnsupportedAuthenticationException
-     *             failed to instantiate specified Authentication-Plugin
-     */
-    public PulsarSourceBuilder<T> authentication(String authPluginClassName, String authParamsString)
-            throws PulsarClientException.UnsupportedAuthenticationException {
-        Preconditions.checkArgument(StringUtils.isNotBlank(authPluginClassName),
-                "Authentication-Plugin class name can not be blank");
-        Preconditions.checkArgument(StringUtils.isNotBlank(authParamsString),
-                "Authentication-Plugin parameters can not be blank");
-        this.clientConfigurationData
-            .setAuthentication(AuthenticationFactory.create(authPluginClassName, authParamsString));
-        return this;
-    }
-
-    /**
-     * Configure the authentication provider to use in the Pulsar client instance
-     * using a config map.
-     *
-     * @param authPluginClassName
-     *            name of the Authentication-Plugin you want to use
-     * @param authParams
-     *            map which represents parameters for the Authentication-Plugin
-     * @return this builder
-     * @throws PulsarClientException.UnsupportedAuthenticationException
-     *             failed to instantiate specified Authentication-Plugin
-     */
-    public PulsarSourceBuilder<T> authentication(String authPluginClassName, Map<String, String> authParams)
-            throws PulsarClientException.UnsupportedAuthenticationException {
-        Preconditions.checkArgument(StringUtils.isNotBlank(authPluginClassName),
-                "Authentication-Plugin class name can not be blank");
-        Preconditions.checkArgument((authParams != null && !authParams.isEmpty()),
-                "parameters to authentication plugin can not be null/empty");
-        this.clientConfigurationData.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParams));
-        return this;
-    }
-
-    /**
-     *
-     * @param clientConfigurationData All client conf wrapped in a POJO
-     * @return this builder
-     */
-    public PulsarSourceBuilder<T> pulsarAllClientConf(ClientConfigurationData clientConfigurationData) {
-        Preconditions.checkNotNull(clientConfigurationData, "client conf should not be null");
-        this.clientConfigurationData = clientConfigurationData;
-        return this;
-    }
-
-    /**
-     *
-     * @param consumerConfigurationData All consumer conf wrapped in a POJO
-     * @return this builder
-     */
-    public PulsarSourceBuilder<T> pulsarAllConsumerConf(ConsumerConfigurationData consumerConfigurationData) {
-        Preconditions.checkNotNull(consumerConfigurationData, "consumer conf should not be null");
-        this.consumerConfigurationData = consumerConfigurationData;
-        return this;
-    }
-
-
-    public SourceFunction<T> build() throws PulsarClientException{
-        Preconditions.checkArgument(StringUtils.isNotBlank(this.clientConfigurationData.getServiceUrl()),
-                "a service url is required");
-        Preconditions.checkArgument((this.consumerConfigurationData.getTopicNames() != null
-                && !this.consumerConfigurationData.getTopicNames().isEmpty())
-                || this.consumerConfigurationData.getTopicsPattern() != null,
-                "At least one topic or topics pattern is required");
-        Preconditions.checkArgument(StringUtils.isNotBlank(this.consumerConfigurationData.getSubscriptionName()),
-                "a subscription name is required");
-
-        setTransientFields();
-
-        return new PulsarConsumerSource<>(this);
-    }
-
-    private void setTransientFields() throws PulsarClientException {
-        setAuth();
-    }
-
-    private void setAuth() throws PulsarClientException{
-        if (StringUtils.isBlank(this.clientConfigurationData.getAuthPluginClassName())
-                || StringUtils.isBlank(this.clientConfigurationData.getAuthParams())) {
-            return;
-        }
-
-        clientConfigurationData.setAuthentication(
-                AuthenticationFactory.create(
-                        this.clientConfigurationData.getAuthPluginClassName(),
-                        this.clientConfigurationData.getAuthParams()));
-    }
-
-    /**
-     * Creates a PulsarSourceBuilder.
-     *
-     * @param deserializationSchema the deserializer used to convert between Pulsar's byte messages and Flink's objects.
-     * @return a builder
-     */
-    public static <T> PulsarSourceBuilder<T> builder(DeserializationSchema<T> deserializationSchema) {
-        Preconditions.checkNotNull(deserializationSchema, "deserializationSchema cannot be null");
-        return new PulsarSourceBuilder<>(deserializationSchema);
-    }
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
deleted file mode 100644
index 6d7479c..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-import java.util.Arrays;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
-import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.types.Row;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-
-/**
- * An append-only table sink to emit a streaming table as a Pulsar stream.
- */
-public abstract class PulsarTableSink implements AppendStreamTableSink<Row> {
-
-    protected ClientConfigurationData clientConfigurationData = new ClientConfigurationData();
-    protected ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData();
-    protected SerializationSchema<Row> serializationSchema;
-    protected PulsarKeyExtractor<Row> keyExtractor;
-    protected PulsarPropertiesExtractor<Row> propertiesExtractor;
-    protected String[] fieldNames;
-    protected TypeInformation[] fieldTypes;
-    protected final String routingKeyFieldName;
-
-    public PulsarTableSink(
-            String serviceUrl,
-            String topic,
-            Authentication authentication,
-            String routingKeyFieldName) {
-        checkNotNull(serviceUrl, "Service url not set");
-        checkNotNull(topic, "Topic is null");
-        this.clientConfigurationData.setServiceUrl(serviceUrl);
-        this.clientConfigurationData.setAuthentication(authentication);
-        this.producerConfigurationData.setTopicName(topic);
-        this.routingKeyFieldName = routingKeyFieldName;
-    }
-
-    public PulsarTableSink(
-            ClientConfigurationData clientConfigurationData,
-            ProducerConfigurationData producerConfigurationData,
-            String routingKeyFieldName) {
-        this.clientConfigurationData = checkNotNull(clientConfigurationData, "client config is null");
-        this.producerConfigurationData = checkNotNull(producerConfigurationData, "producer config is null");
-        this.routingKeyFieldName = routingKeyFieldName;
-    }
-
-    /**
-     * Create serialization schema for converting table rows into bytes.
-     *
-     * @param rowSchema the schema of the row to serialize.
-     * @return Instance of serialization schema
-     */
-    protected abstract SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema);
-
-    /**
-     * Create a deep copy of this sink.
-     *
-     * @return Deep copy of this sink
-     */
-    protected abstract PulsarTableSink createSink();
-
-    /**
-     * Returns the low-level producer.
-     */
-    protected FlinkPulsarProducer<Row> createFlinkPulsarProducer() {
-        return new FlinkPulsarProducer<>(
-            clientConfigurationData,
-            producerConfigurationData,
-            serializationSchema,
-            keyExtractor,
-            propertiesExtractor);
-    }
-
-    @Override
-    public void emitDataStream(DataStream<Row> dataStream) {
-        checkState(fieldNames != null, "Table sink is not configured");
-        checkState(fieldTypes != null, "Table sink is not configured");
-        checkState(serializationSchema != null, "Table sink is not configured");
-        checkState(keyExtractor != null, "Table sink is not configured");
-
-        FlinkPulsarProducer<Row> producer = createFlinkPulsarProducer();
-        dataStream.addSink(producer);
-    }
-
-    @Override
-    public TypeInformation<Row> getOutputType() {
-        return new RowTypeInfo(fieldTypes, fieldNames);
-    }
-
-    @Override
-    public String[] getFieldNames() {
-        return fieldNames;
-    }
-
-    @Override
-    public TypeInformation<?>[] getFieldTypes() {
-        return fieldTypes;
-    }
-
-    @Override
-    public TableSink<Row> configure(String[] fieldNames,
-                                    TypeInformation<?>[] fieldTypes) {
-
-        PulsarTableSink sink = createSink();
-
-        sink.fieldNames = checkNotNull(fieldNames, "Field names are null");
-        sink.fieldTypes = checkNotNull(fieldTypes, "Field types are null");
-        checkArgument(fieldNames.length == fieldTypes.length,
-                "Number of provided field names and types do not match");
-
-        RowTypeInfo rowSchema = new RowTypeInfo(fieldTypes, fieldNames);
-        sink.serializationSchema = createSerializationSchema(rowSchema);
-        sink.keyExtractor = new RowKeyExtractor(
-                routingKeyFieldName,
-                fieldNames,
-                fieldTypes);
-        sink.propertiesExtractor = PulsarPropertiesExtractor.EMPTY;
-
-        return sink;
-    }
-
-    /**
-     * A key extractor that extracts the routing key from a {@link Row} by field name.
-     */
-    private static class RowKeyExtractor implements PulsarKeyExtractor<Row> {
-
-        private final int keyIndex;
-
-        public RowKeyExtractor(
-                String keyFieldName,
-                String[] fieldNames,
-                TypeInformation<?>[] fieldTypes) {
-            checkArgument(fieldNames.length == fieldTypes.length,
-                    "Number of provided field names and types does not match.");
-            int keyIndex = Arrays.asList(fieldNames).indexOf(keyFieldName);
-            checkArgument(keyIndex >= 0,
-                    "Key field '" + keyFieldName + "' not found");
-            checkArgument(Types.STRING.equals(fieldTypes[keyIndex]),
-                    "Key field must be of type 'STRING'");
-            this.keyIndex = keyIndex;
-        }
-
-        @Override
-        public String getKey(Row event) {
-            return (String) event.getField(keyIndex);
-        }
-    }
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/package-info.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/package-info.java
deleted file mode 100644
index 6ed4a01..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Classes for implementing pulsar flink connector.
- */
-package org.apache.flink.streaming.connectors.pulsar;
\ No newline at end of file
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
deleted file mode 100644
index c8a858d..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar.partitioner;
-
-import java.io.Serializable;
-
-/**
- * Extract key from a value.
- */
-public interface PulsarKeyExtractor<T> extends Serializable {
-
-    PulsarKeyExtractor NULL = in -> null;
-
-    /**
-     * Retrieve a key from the value.
-     *
-     * @param in the value to extract a key.
-     * @return key.
-     */
-    String getKey(T in);
-
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarPropertiesExtractor.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarPropertiesExtractor.java
deleted file mode 100644
index 67e86f3..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarPropertiesExtractor.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar.partitioner;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * Extract message properties from a value or others.
- */
-public interface PulsarPropertiesExtractor<T> extends Serializable {
-
-    PulsarPropertiesExtractor EMPTY = in -> Collections.emptyMap();
-
-    /**
-     * Retrieve properties from the value or others.
-     *
-     * @param in the value to extract a key.
-     * @return key.
-     */
-    Map<String, String> getProperties(T in);
-
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/package-info.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/package-info.java
deleted file mode 100644
index 7a25be1..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Classes for implementing key extractors.
- */
-package org.apache.flink.streaming.connectors.pulsar.partitioner;
\ No newline at end of file
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java
deleted file mode 100644
index 736c417..0000000
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertNotNull;
-
-
-/**
- * Tests for Pulsar Avro Output Format
- */
-public class PulsarAvroOutputFormatTest {
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarAvroOutputFormatConstructorWhenServiceUrlIsNull() {
-        new PulsarAvroOutputFormat(null, "testTopic", new AuthenticationDisabled());
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarAvroOutputFormatConstructorWhenTopicNameIsNull() {
-        new PulsarAvroOutputFormat("testServiceUrl", null, new AuthenticationDisabled());
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarAvroOutputFormatConstructorWhenTopicNameIsBlank() {
-        new PulsarAvroOutputFormat("testServiceUrl", " ", new AuthenticationDisabled());
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarAvroOutputFormatConstructorWhenServiceUrlIsBlank() {
-        new PulsarAvroOutputFormat(" ", "testTopic", new AuthenticationDisabled());
-    }
-
-    @Test
-    public void testPulsarAvroOutputFormatConstructor() {
-        PulsarAvroOutputFormat pulsarAvroOutputFormat =
-                new PulsarAvroOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled());
-        assertNotNull(pulsarAvroOutputFormat);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarAvroOutputFormatConstructorV2WhenServiceUrlIsNull() {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl(null);
-
-        ProducerConfigurationData producerConf = new ProducerConfigurationData();
-        producerConf.setTopicName("testTopic");
-
-        new PulsarAvroOutputFormat(clientConf, producerConf);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarAvroOutputFormatConstructorV2WhenTopicNameIsNull() {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl("testServiceUrl");
-
-        ProducerConfigurationData producerConf = new ProducerConfigurationData();
-        producerConf.setTopicName(null);
-
-        new PulsarAvroOutputFormat(clientConf, producerConf);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarAvroOutputFormatConstructorV2WhenTopicNameIsBlank() {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl("testServiceUrl");
-
-        ProducerConfigurationData producerConf = new ProducerConfigurationData();
-        producerConf.setTopicName(StringUtils.EMPTY);
-
-        new PulsarAvroOutputFormat(clientConf, producerConf);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarAvroOutputFormatConstructorV2WhenServiceUrlIsBlank() {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl(StringUtils.EMPTY);
-
-        ProducerConfigurationData producerConf = new ProducerConfigurationData();
-        producerConf.setTopicName(StringUtils.EMPTY);
-
-        new PulsarAvroOutputFormat(clientConf, producerConf);
-    }
-
-    @Test
-    public void testPulsarAvroOutputFormatConstructorV2() {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl("testServiceUrl");
-
-        ProducerConfigurationData producerConf = new ProducerConfigurationData();
-        producerConf.setTopicName("testTopic");
-
-        PulsarAvroOutputFormat pulsarAvroOutputFormat = new PulsarAvroOutputFormat(clientConf, producerConf);
-        assertNotNull(pulsarAvroOutputFormat);
-    }
-}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java
deleted file mode 100644
index 713f867..0000000
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertNotNull;
-
-/**
- * Tests for Pulsar Csv Output Format
- */
-public class PulsarCsvOutputFormatTest {
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarCsvOutputFormatConstructorWhenServiceUrlIsNull() {
-        new PulsarCsvOutputFormat(null, "testTopic", new AuthenticationDisabled());
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarCsvOutputFormatConstructorWhenTopicNameIsNull() {
-        new PulsarCsvOutputFormat("testServiceUrl", null, new AuthenticationDisabled());
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarCsvOutputFormatConstructorWhenTopicNameIsBlank() {
-        new PulsarCsvOutputFormat("testServiceUrl", " ", new AuthenticationDisabled());
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarCsvOutputFormatConstructorWhenServiceUrlIsBlank() {
-        new PulsarCsvOutputFormat(" ", "testTopic", new AuthenticationDisabled());
-    }
-
-    @Test
-    public void testPulsarCsvOutputFormatConstructor() {
-        PulsarCsvOutputFormat pulsarCsvOutputFormat =
-                new PulsarCsvOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled());
-        assertNotNull(pulsarCsvOutputFormat);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarCsvOutputFormatConstructorV2WhenServiceUrlIsNull() {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl(null);
-
-        ProducerConfigurationData producerConf = new ProducerConfigurationData();
-        producerConf.setTopicName("testTopic");
-
-        new PulsarAvroOutputFormat(clientConf, producerConf);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarCsvOutputFormatConstructorV2WhenTopicNameIsNull() {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl("testServiceUrl");
-
-        ProducerConfigurationData producerConf = new ProducerConfigurationData();
-        producerConf.setTopicName(null);
-
-        new PulsarAvroOutputFormat(clientConf, producerConf);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarCsvOutputFormatConstructorV2WhenTopicNameIsBlank() {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl("testServiceUrl");
-
-        ProducerConfigurationData producerConf = new ProducerConfigurationData();
-        producerConf.setTopicName(StringUtils.EMPTY);
-
-        new PulsarAvroOutputFormat(clientConf, producerConf);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarCsvOutputFormatConstructorV2WhenServiceUrlIsBlank() {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl(StringUtils.EMPTY);
-
-        ProducerConfigurationData producerConf = new ProducerConfigurationData();
-        producerConf.setTopicName("testTopic");
-
-        new PulsarAvroOutputFormat(clientConf, producerConf);
-    }
-
-    @Test
-    public void testPulsarCsvOutputFormatConstructorV2() {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl("testServiceUrl");
-
-        ProducerConfigurationData producerConf = new ProducerConfigurationData();
-        producerConf.setTopicName("testTopic");
-
-        PulsarCsvOutputFormat pulsarCsvOutputFormat = new PulsarCsvOutputFormat(clientConf, producerConf);
-        assertNotNull(pulsarCsvOutputFormat);
-    }
-}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
deleted file mode 100644
index d45d9b1..0000000
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertNotNull;
-
-/**
- * Tests for Pulsar Json Output Format
- */
-public class PulsarJsonOutputFormatTest {
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarJsonOutputFormatConstructorWhenServiceUrlIsNull() {
-        new PulsarJsonOutputFormat(null, "testTopic", new AuthenticationDisabled());
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarJsonOutputFormatConstructorWhenTopicNameIsNull() {
-        new PulsarJsonOutputFormat("testServiceUrl", null, new AuthenticationDisabled());
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarJsonOutputFormatConstructorWhenTopicNameIsBlank() {
-        new PulsarJsonOutputFormat("testServiceUrl", " ", new AuthenticationDisabled());
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarJsonOutputFormatConstructorWhenServiceUrlIsBlank() {
-        new PulsarJsonOutputFormat(" ", "testTopic", new AuthenticationDisabled());
-    }
-
-    @Test
-    public void testPulsarJsonOutputFormatConstructor() {
-        PulsarJsonOutputFormat pulsarJsonOutputFormat =
-                new PulsarJsonOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled());
-        assertNotNull(pulsarJsonOutputFormat);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarJsonOutputFormatConstructorV2WhenServiceUrlIsNull() {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl(null);
-
-        ProducerConfigurationData producerConf = new ProducerConfigurationData();
-        producerConf.setTopicName("testTopic");
-
-        new PulsarAvroOutputFormat(clientConf, producerConf);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarJsonROutputFormatConstructorV2WhenTopicNameIsNull() {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl("testServiceUrl");
-
-        ProducerConfigurationData producerConf = new ProducerConfigurationData();
-        producerConf.setTopicName(null);
-
-        new PulsarAvroOutputFormat(clientConf, producerConf);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarJsonOutputFormatConstructorV2WhenTopicNameIsBlank() {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl("testServiceUrl");
-
-        ProducerConfigurationData producerConf = new ProducerConfigurationData();
-        producerConf.setTopicName(StringUtils.EMPTY);
-
-        new PulsarAvroOutputFormat(clientConf, producerConf);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarJsonOutputFormatConstructorV2WhenServiceUrlIsBlank() {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl(StringUtils.EMPTY);
-
-        ProducerConfigurationData producerConf = new ProducerConfigurationData();
-        producerConf.setTopicName("testTopic");
-
-        new PulsarAvroOutputFormat(clientConf, producerConf);
-    }
-
-    @Test
-    public void testPulsarJsonOutputFormatConstructorV2() {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl("testServiceUrl");
-
-        ProducerConfigurationData producerConf = new ProducerConfigurationData();
-        producerConf.setTopicName("testTopic");
-
-        PulsarJsonOutputFormat pulsarJsonOutputFormat = new PulsarJsonOutputFormat(clientConf, producerConf);
-        assertNotNull(pulsarJsonOutputFormat);
-    }
-}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
deleted file mode 100644
index 97b23b4..0000000
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-
-/**
- * Tests for Pulsar Output Format
- */
-public class PulsarOutputFormatTest {
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarOutputFormatConstructorWhenServiceUrlIsNull() {
-        new PulsarOutputFormat(null, "testTopic", new AuthenticationDisabled(), text -> text.toString().getBytes());
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarOutputFormatConstructorWhenTopicNameIsNull() {
-        new PulsarOutputFormat("testServiceUrl", null, new AuthenticationDisabled(), text -> text.toString().getBytes());
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarOutputFormatConstructorWhenTopicNameIsBlank() {
-        new PulsarOutputFormat("testServiceUrl", " ", new AuthenticationDisabled(), text -> text.toString().getBytes());
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarOutputFormatConstructorWhenServiceUrlIsBlank() {
-        new PulsarOutputFormat(" ", "testTopic", new AuthenticationDisabled(), text -> text.toString().getBytes());
-    }
-
-    @Test(expectedExceptions = NullPointerException.class)
-    public void testPulsarOutputFormatConstructorWhenSerializationSchemaIsNull() {
-        new PulsarOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled(), null);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarOutputFormatConstructorV2WhenServiceUrlIsNull() {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl(null);
-
-        ProducerConfigurationData producerConf = new ProducerConfigurationData();
-        producerConf.setTopicName("testTopic");
-
-        new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarOutputFormatConstructorV2WhenTopicNameIsNull() {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl("testServiceUrl");
-
-        ProducerConfigurationData producerConf = new ProducerConfigurationData();
-        producerConf.setTopicName(null);
-
-        new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarOutputFormatConstructorV2WhenTopicNameIsBlank() {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl("testServiceUrl");
-
-        ProducerConfigurationData producerConf = new ProducerConfigurationData();
-        producerConf.setTopicName(StringUtils.EMPTY);
-
-        new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testPulsarOutputFormatConstructorV2WhenServiceUrlIsBlank() {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl(StringUtils.EMPTY);
-
-        ProducerConfigurationData producerConf = new ProducerConfigurationData();
-        producerConf.setTopicName("testTopic");
-
-        new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
-    }
-
-    @Test(expectedExceptions = NullPointerException.class)
-    public void testPulsarOutputFormatConstructorV2WhenSerializationSchemaIsNull() {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl("testServiceUrl");
-
-        ProducerConfigurationData producerConf = new ProducerConfigurationData();
-        producerConf.setTopicName("testTopic");
-        new PulsarOutputFormat(clientConf, producerConf, null);
-    }
-
-    @Test
-    public void testPulsarOutputFormatConstructorV2() {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl("testServiceUrl");
-
-        ProducerConfigurationData producerConf = new ProducerConfigurationData();
-        producerConf.setTopicName("testTopic");
-
-        PulsarCsvOutputFormat pulsarCsvOutputFormat = new PulsarCsvOutputFormat(clientConf, producerConf);
-        assertNotNull(pulsarCsvOutputFormat);
-    }
-
-    @Test
-    public void testPulsarOutputFormatWithStringSerializationSchema() throws IOException {
-        String input = "Wolfgang Amadeus Mozart";
-        PulsarOutputFormat pulsarOutputFormat =
-                new PulsarOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled(),
-                        text -> text.toString().getBytes());
-        assertNotNull(pulsarOutputFormat);
-        byte[] bytes = pulsarOutputFormat.serializationSchema.serialize(input);
-        String resultString = IOUtils.toString(bytes, StandardCharsets.UTF_8.toString());
-        assertEquals(input, resultString);
-    }
-
-    @Test
-    public void testPulsarOutputFormatWithCustomSerializationSchema() throws IOException {
-        Employee employee = new Employee(1, "Test Employee", "Test Department");
-        PulsarOutputFormat pulsarOutputFormat =
-                new PulsarOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled(),
-                        new EmployeeSerializationSchema());
-        assertNotNull(pulsarOutputFormat);
-
-        byte[] bytes = pulsarOutputFormat.serializationSchema.serialize(employee);
-        String resultString = IOUtils.toString(bytes, StandardCharsets.UTF_8.toString());
-        assertEquals(employee.toString(), resultString);
-    }
-
-    /**
-     * Employee Serialization Schema.
-     */
-    private class EmployeeSerializationSchema implements SerializationSchema<Employee> {
-
-        @Override
-        public byte[] serialize(Employee employee) {
-            StringBuilder stringBuilder = new StringBuilder();
-            stringBuilder.append(employee.id);
-            stringBuilder.append(" - ");
-            stringBuilder.append(employee.name);
-            stringBuilder.append(" - ");
-            stringBuilder.append(employee.department);
-
-            return stringBuilder.toString().getBytes();
-        }
-    }
-
-    /**
-     * Data type for Employee Model.
-     */
-    private class Employee {
-
-        public long id;
-        public String name;
-        public String department;
-
-        public Employee(long id, String name, String department) {
-            this.id = id;
-            this.name = name;
-            this.department = department;
-        }
-
-        @Override
-        public String toString() {
-            return id + " - " + name + " - " + department;
-        }
-    }
-
-}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchemaTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchemaTest.java
deleted file mode 100644
index c283d2f..0000000
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchemaTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.serialization;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.flink.avro.generated.NasaMission;
-import org.apache.flink.formats.avro.AvroDeserializationSchema;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
-
-/**
- * Tests for Avro Serialization Schema
- */
-public class AvroSerializationSchemaTest {
-
-    @Test
-    public void testAvroSerializationSchemaWithSuccessfulCase() throws IOException {
-        NasaMission nasaMission = NasaMission.newBuilder().setId(1).setName("Mercury program").setStartYear(1959).setEndYear(1963).build();
-        AvroSerializationSchema schema = new AvroSerializationSchema();
-        byte[] rowBytes = schema.serialize(nasaMission);
-
-        AvroDeserializationSchema<GenericRecord> deserializationSchema = AvroDeserializationSchema.forGeneric(nasaMission.getSchema());
-        GenericRecord genericRecord = deserializationSchema.deserialize(rowBytes);
-
-        assertEquals(nasaMission.getId(), genericRecord.get("id"));
-        assertEquals(nasaMission.getName(), genericRecord.get("name").toString());
-        assertEquals(nasaMission.getStartYear(), genericRecord.get("start_year"));
-        assertEquals(nasaMission.getEndYear(), genericRecord.get("end_year"));
-    }
-
-    @Test
-    public void testAvroSerializationSchemaWithEmptyRecord() throws IOException {
-        NasaMission nasaMission = NasaMission.newBuilder().setId(1).setName("Mercury program").setStartYear(null).setEndYear(null).build();
-        AvroSerializationSchema schema = new AvroSerializationSchema();
-        byte[] rowBytes = schema.serialize(nasaMission);
-
-        AvroDeserializationSchema<GenericRecord> deserializationSchema = AvroDeserializationSchema.forGeneric(nasaMission.getSchema());
-        GenericRecord genericRecord = deserializationSchema.deserialize(rowBytes);
-
-        assertEquals(nasaMission.getId(), genericRecord.get("id"));
-        assertEquals(nasaMission.getName(), genericRecord.get("name").toString());
-        assertNull(genericRecord.get("start_year"));
-        assertNull(genericRecord.get("end_year"));
-    }
-
-}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchemaTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchemaTest.java
deleted file mode 100644
index 3666e54..0000000
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchemaTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.serialization;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-
-import static org.testng.Assert.assertEquals;
-
-/**
- * Tests for Csv Serialization Schema
- */
-public class CsvSerializationSchemaTest {
-
-    @Test
-    public void testCsvSerializationSchemaWithSuccessfulCase() throws IOException {
-        Tuple3<Integer, String, String> employee = new Tuple3(1, "Wolfgang Amadeus", "Mozart");
-        CsvSerializationSchema schema = new CsvSerializationSchema();
-        byte[] rowBytes = schema.serialize(employee);
-        String csvContent = IOUtils.toString(rowBytes, StandardCharsets.UTF_8.toString());
-        assertEquals(csvContent, "1,Wolfgang Amadeus,Mozart");
-    }
-
-    @Test
-    public void testCsvSerializationSchemaWithEmptyRecord() throws IOException {
-        Tuple3<Integer, String, String> employee = new Tuple3();
-        CsvSerializationSchema schema = new CsvSerializationSchema();
-        byte[] employeeBytes = schema.serialize(employee);
-        String str = IOUtils.toString(employeeBytes, StandardCharsets.UTF_8.toString());
-        assertEquals(str, ",,");
-    }
-
-}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchemaTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchemaTest.java
deleted file mode 100644
index 80a1cd8..0000000
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchemaTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.serialization;
-
-import org.apache.commons.io.IOUtils;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-
-import static org.testng.Assert.assertEquals;
-
-/**
- * Tests for Json Serialization Schema
- */
-public class JsonSerializationSchemaTest {
-
-    @Test
-    public void testJsonSerializationSchemaWithSuccessfulCase() throws IOException {
-        Employee employee = new Employee(1, "Test Name");
-        JsonSerializationSchema schema = new JsonSerializationSchema();
-        byte[] rowBytes = schema.serialize(employee);
-        String jsonContent = IOUtils.toString(rowBytes, StandardCharsets.UTF_8.toString());
-        assertEquals(jsonContent, "{\"id\":1,\"name\":\"Test Name\"}");
-    }
-
-    @Test
-    public void testJsonSerializationSchemaWithEmptyRecord() throws IOException {
-        Employee employee = new Employee();
-        JsonSerializationSchema schema = new JsonSerializationSchema();
-        byte[] employeeBytes = schema.serialize(employee);
-        String jsonContent = IOUtils.toString(employeeBytes, StandardCharsets.UTF_8.toString());
-        assertEquals(jsonContent, "{\"id\":0,\"name\":null}");
-    }
-
-    @Test(expectedExceptions = RuntimeException.class)
-    public void testJsonSerializationSchemaWithNotSerializableObject() {
-        NotSerializableObject notSerializableObject = new NotSerializableObject();
-        JsonSerializationSchema schema = new JsonSerializationSchema();
-        schema.serialize(notSerializableObject);
-    }
-
-    /**
-     * Employee data model
-     */
-    private static class Employee {
-
-        private long id;
-        private String name;
-
-        public Employee() {
-        }
-
-        public Employee(long id, String name) {
-            this.id = id;
-            this.name = name;
-        }
-
-        public long getId() {
-            return id;
-        }
-
-        public String getName() {
-            return name;
-        }
-
-    }
-
-    /**
-     * Not Serializable Object due to not having any public property
-     */
-    private static class NotSerializableObject {
-
-        private long id;
-        private String name;
-
-    }
-}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java
deleted file mode 100644
index 0c81c4a..0000000
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotEquals;
-
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-/**
- * Unit test of {@link CachedPulsarClient}.
- */
-public class CachedPulsarClientTest {
-
-    private static final String SERVICE_URL = "pulsar://localhost:6650";
-
-    @BeforeMethod
-    public void clearCache() {
-        CachedPulsarClient.clear();
-    }
-
-    @Test
-    public void testShouldReturnSameInstanceWithSameParam() throws Exception {
-        PulsarClientImpl impl1 = Mockito.mock(PulsarClientImpl.class);
-        PulsarClientImpl impl2 = Mockito.mock(PulsarClientImpl.class);
-
-        ClientConfigurationData conf1 = new ClientConfigurationData();
-        conf1.setServiceUrl(SERVICE_URL);
-
-        ClientConfigurationData conf2 = new ClientConfigurationData();
-        conf2.setServiceUrl(SERVICE_URL);
-
-        PowerMockito.whenNew(PulsarClientImpl.class)
-            .withArguments(conf1).thenReturn(impl1);
-        PowerMockito.whenNew(PulsarClientImpl.class)
-            .withArguments(conf2).thenReturn(impl2);
-
-        PulsarClientImpl client1 = CachedPulsarClient.getOrCreate(conf1);
-        PulsarClientImpl client2 = CachedPulsarClient.getOrCreate(conf2);
-        PulsarClientImpl client3 = CachedPulsarClient.getOrCreate(conf1);
-
-        assertEquals(client1, client2);
-        assertEquals(client1, client3);
-
-        assertEquals(CachedPulsarClient.getAsMap().size(), 1);
-    }
-
-    @Test
-    public void testShouldCloseTheCorrectClient() throws Exception {
-        PulsarClientImpl impl1 = Mockito.mock(PulsarClientImpl.class);
-        PulsarClientImpl impl2 = Mockito.mock(PulsarClientImpl.class);
-
-        ClientConfigurationData conf1 = new ClientConfigurationData();
-        conf1.setServiceUrl(SERVICE_URL);
-
-        ClientConfigurationData conf2 = new ClientConfigurationData();
-        conf2.setServiceUrl(SERVICE_URL);
-        conf2.setNumIoThreads(5);
-
-        PowerMockito.whenNew(PulsarClientImpl.class)
-            .withArguments(conf1).thenReturn(impl1);
-        PowerMockito.whenNew(PulsarClientImpl.class)
-            .withArguments(conf2).thenReturn(impl2);
-
-        PulsarClientImpl client1 = CachedPulsarClient.getOrCreate(conf1);
-        PulsarClientImpl client2 = CachedPulsarClient.getOrCreate(conf2);
-
-        assertNotEquals(client1, client2);
-
-        ConcurrentMap<ClientConfigurationData, PulsarClientImpl> map1 = CachedPulsarClient.getAsMap();
-        assertEquals(map1.size(), 2);
-
-        CachedPulsarClient.close(conf2);
-
-        ConcurrentMap<ClientConfigurationData, PulsarClientImpl> map2 = CachedPulsarClient.getAsMap();
-        assertEquals(map2.size(), 1);
-
-        assertEquals(map2.values().iterator().next(), client1);
-    }
-
-    @Test
-    public void getClientFromCacheShouldAlwaysReturnAnOpenedInstance() throws Exception {
-        PulsarClientImpl impl1 = Mockito.mock(PulsarClientImpl.class);
-
-        ClientConfigurationData conf1 = new ClientConfigurationData();
-        conf1.setServiceUrl(SERVICE_URL);
-
-        PowerMockito.whenNew(PulsarClientImpl.class)
-                .withArguments(conf1).thenReturn(impl1);
-
-        PulsarClientImpl client1 = CachedPulsarClient.getOrCreate(conf1);
-
-        ConcurrentMap<ClientConfigurationData, PulsarClientImpl> map1 = CachedPulsarClient.getAsMap();
-        assertEquals(map1.size(), 1);
-
-        client1.getState().set(PulsarClientImpl.State.Closed);
-
-        PulsarClientImpl client2 = CachedPulsarClient.getOrCreate(conf1);
-
-        assertNotEquals(client1, client2);
-
-        ConcurrentMap<ClientConfigurationData, PulsarClientImpl> map2 = CachedPulsarClient.getAsMap();
-        assertEquals(map2.size(), 1);
-    }
-}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
deleted file mode 100644
index 49b50bb..0000000
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-import org.apache.commons.lang3.reflect.FieldUtils;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.avro.generated.NasaMission;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
-import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.types.Row;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertNotNull;
-import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
-
-/**
- * Unit test of {@link PulsarAvroTableSink}.
- */
-public class PulsarAvroTableSinkTest {
-    private static final String SERVICE_URL = "pulsar://localhost:6650";
-    private static final String TOPIC_NAME = "test_topic";
-    private static final Authentication AUTHENTICATION = new AuthenticationDisabled();
-    private static final String ROUTING_KEY = "name";
-
-    private final String[] fieldNames = {"id", "name","start_year","end_year"};
-    private final TypeInformation[] typeInformations = {
-            TypeInformation.of(Integer.class),
-            TypeInformation.of(String.class),
-            TypeInformation.of(Integer.class),
-            TypeInformation.of(Integer.class)
-    };
-
-    /**
-     * Test configure PulsarTableSink.
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testConfigure() throws Exception {
-        PulsarAvroTableSink sink = spySink();
-
-        TableSink<Row> configuredSink = sink.configure(fieldNames, typeInformations);
-
-        assertArrayEquals(fieldNames, configuredSink.getFieldNames());
-        assertArrayEquals(typeInformations, configuredSink.getFieldTypes());
-        assertNotNull(((PulsarAvroTableSink) configuredSink).keyExtractor);
-        assertNotNull(((PulsarAvroTableSink) configuredSink).serializationSchema);
-    }
-
-
-    /**
-     * Test emit data stream.
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testEmitDataStream() throws Exception {
-        DataStream mockedDataStream = Mockito.mock(DataStream.class);
-
-        PulsarAvroTableSink sink = spySink();
-
-        sink.emitDataStream(mockedDataStream);
-
-        Mockito.verify(mockedDataStream).addSink(Mockito.any(FlinkPulsarProducer.class));
-    }
-
-
-    private PulsarAvroTableSink spySink() throws Exception {
-
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl(SERVICE_URL);
-
-        ProducerConfigurationData producerConf = new ProducerConfigurationData();
-        producerConf.setTopicName(TOPIC_NAME);
-
-        PulsarAvroTableSink sink =
-                new PulsarAvroTableSink(clientConf, producerConf, ROUTING_KEY, NasaMission.class);
-        FlinkPulsarProducer producer = Mockito.mock(FlinkPulsarProducer.class);
-        PowerMockito.whenNew(
-                FlinkPulsarProducer.class
-        ).withArguments(
-                Mockito.anyString(),
-                Mockito.anyString(),
-                Mockito.any(Authentication.class),
-                Mockito.any(SerializationSchema.class),
-                Mockito.any(PulsarKeyExtractor.class),
-                Mockito.any(PulsarPropertiesExtractor.class)
-        ).thenReturn(producer);
-        FieldUtils.writeField(sink, "fieldNames", fieldNames, true);
-        FieldUtils.writeField(sink, "fieldTypes", typeInformations, true);
-        FieldUtils.writeField(sink, "serializationSchema", Mockito.mock(SerializationSchema.class), true);
-        FieldUtils.writeField(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class), true);
-        FieldUtils.writeField(sink, "propertiesExtractor", Mockito.mock(PulsarPropertiesExtractor.class), true);
-        return sink;
-    }
-
-}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
deleted file mode 100644
index 4266008..0000000
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
+++ /dev/null
@@ -1,659 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-import java.util.function.Function;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
-import org.apache.pulsar.client.api.transaction.Transaction;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerStats;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Messages;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-/**
- * Tests for the PulsarConsumerSource. The source supports two operation modes.
- * 1) At-least-once (when checkpointed) with Pulsar message acknowledgements and the deduplication mechanism in
- *    {@link org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase}..
- * 3) No strong delivery guarantees (without checkpointing) with Pulsar acknowledging messages after
- *	  after it receives x number of messages.
- *
- * <p>This tests assumes that the MessageIds are increasing monotonously. That doesn't have to be the
- * case. The MessageId is used to uniquely identify messages.
- */
-public class PulsarConsumerSourceTests {
-
-    private PulsarConsumerSource<String> source;
-
-    private TestConsumer consumer;
-
-    private TestSourceContext context;
-
-    private Thread sourceThread;
-
-    private Exception exception;
-
-    @BeforeMethod
-    public void before() {
-        context = new TestSourceContext();
-
-        sourceThread = new Thread(() -> {
-            try {
-                source.run(context);
-            } catch (Exception e) {
-                exception = e;
-            }
-        });
-    }
-
-    @AfterMethod
-    public void after() throws Exception {
-        if (source != null) {
-            source.cancel();
-        }
-        if (sourceThread != null) {
-            sourceThread.join();
-        }
-    }
-
-    @Test
-    public void testCheckpointing() throws Exception {
-        final int numMessages = 5;
-        consumer = new TestConsumer(numMessages);
-
-        source = createSource(consumer, 1, true);
-        source.open(new Configuration());
-
-        final StreamSource<String, PulsarConsumerSource<String>> src = new StreamSource<>(source);
-        final AbstractStreamOperatorTestHarness<String> testHarness =
-            new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
-
-        testHarness.open();
-
-        sourceThread.start();
-
-        final Random random = new Random(System.currentTimeMillis());
-        for (int i = 0; i < 3; ++i) {
-
-            // wait and receive messages from the test consumer
-            receiveMessages();
-
-            final long snapshotId = random.nextLong();
-            OperatorSubtaskState data;
-            synchronized (context.getCheckpointLock()) {
-                data = testHarness.snapshot(snapshotId, System.currentTimeMillis());
-            }
-
-            final TestPulsarConsumerSource sourceCopy =
-                createSource(mock(Consumer.class), 1, true);
-            final StreamSource<String, TestPulsarConsumerSource> srcCopy = new StreamSource<>(sourceCopy);
-            final AbstractStreamOperatorTestHarness<String> testHarnessCopy =
-                new AbstractStreamOperatorTestHarness<>(srcCopy, 1, 1, 0);
-
-            testHarnessCopy.setup();
-            testHarnessCopy.initializeState(data);
-            testHarnessCopy.open();
-
-            final ArrayDeque<Tuple2<Long, Set<MessageId>>> deque = sourceCopy.getRestoredState();
-            final Set<MessageId> messageIds = deque.getLast().f1;
-
-            final int start = consumer.currentMessage.get() - numMessages;
-            for (int mi = start; mi < (start + numMessages); ++mi) {
-                assertTrue(messageIds.contains(consumer.messages.get(mi).getMessageId()));
-            }
-
-            // check if the messages are being acknowledged
-            synchronized (context.getCheckpointLock()) {
-                source.notifyCheckpointComplete(snapshotId);
-
-                assertEquals(consumer.acknowledgedIds.keySet(), messageIds);
-                // clear acknowledgements for the next snapshot comparison
-                consumer.acknowledgedIds.clear();
-            }
-
-            final int lastMessageIndex = consumer.currentMessage.get();
-            consumer.addMessages(createMessages(lastMessageIndex, 5));
-        }
-    }
-
-    @Test
-    public void testCheckpointingDuplicatedIds() throws Exception {
-        consumer = new TestConsumer(5);
-
-        source = createSource(consumer, 1, true);
-        source.open(new Configuration());
-
-        sourceThread.start();
-
-        receiveMessages();
-
-        assertEquals(5, context.elements.size());
-
-        // try to reprocess the messages we should not collect any more elements
-        consumer.reset();
-
-        receiveMessages();
-
-        assertEquals(5, context.elements.size());
-    }
-
-    @Test
-    public void testCheckpointingDisabledMessagesEqualBatchSize() throws Exception {
-
-        consumer = new TestConsumer(5);
-
-        source = createSource(consumer, 5, false);
-        source.open(new Configuration());
-
-        sourceThread.start();
-
-        receiveMessages();
-
-        assertEquals(1, consumer.acknowledgedIds.size());
-    }
-
-    @Test
-    public void testCheckpointingDisabledMoreMessagesThanBatchSize() throws Exception {
-
-        consumer = new TestConsumer(6);
-
-        source = createSource(consumer, 5, false);
-        source.open(new Configuration());
-
-        sourceThread.start();
-
-        receiveMessages();
-
-        assertEquals(1, consumer.acknowledgedIds.size());
-    }
-
-    @Test
-    public void testCheckpointingDisabledLessMessagesThanBatchSize() throws Exception {
-
-        consumer = new TestConsumer(4);
-
-        source = createSource(consumer, 5, false);
-        source.open(new Configuration());
-
-        sourceThread.start();
-
-        receiveMessages();
-
-        assertEquals(0, consumer.acknowledgedIds.size());
-    }
-
-    @Test
-    public void testCheckpointingDisabledMessages2XBatchSize() throws Exception {
-
-        consumer = new TestConsumer(10);
-
-        source = createSource(consumer, 5, false);
-        source.open(new Configuration());
-
-        sourceThread.start();
-
-        receiveMessages();
-
-        assertEquals(2, consumer.acknowledgedIds.size());
-    }
-
-    private void receiveMessages() throws InterruptedException {
-        while (consumer.currentMessage.get() < consumer.messages.size()) {
-            Thread.sleep(5);
-        }
-    }
-
-    private TestPulsarConsumerSource createSource(Consumer<byte[]> testConsumer,
-                                                  long batchSize, boolean isCheckpointingEnabled) throws Exception {
-        PulsarSourceBuilder<String> builder =
-            PulsarSourceBuilder.builder(new SimpleStringSchema())
-                .acknowledgementBatchSize(batchSize);
-        TestPulsarConsumerSource source = new TestPulsarConsumerSource(builder, testConsumer, isCheckpointingEnabled);
-
-        OperatorStateStore mockStore = mock(OperatorStateStore.class);
-        FunctionInitializationContext mockContext = mock(FunctionInitializationContext.class);
-        when(mockContext.getOperatorStateStore()).thenReturn(mockStore);
-        when(mockStore.getSerializableListState(any(String.class))).thenReturn(null);
-
-        source.initializeState(mockContext);
-
-        return source;
-    }
-
-    private static class TestPulsarConsumerSource extends PulsarConsumerSource<String> {
-
-        private ArrayDeque<Tuple2<Long, Set<MessageId>>> restoredState;
-
-        private Consumer<byte[]> testConsumer;
-        private boolean isCheckpointingEnabled;
-
-        TestPulsarConsumerSource(PulsarSourceBuilder<String> builder,
-                                 Consumer<byte[]> testConsumer, boolean isCheckpointingEnabled) {
-            super(builder);
-            this.testConsumer = testConsumer;
-            this.isCheckpointingEnabled = isCheckpointingEnabled;
-        }
-
-        @Override
-        protected boolean addId(MessageId messageId) {
-            assertTrue(isCheckpointingEnabled());
-            return super.addId(messageId);
-        }
-
-        @Override
-        public RuntimeContext getRuntimeContext() {
-            StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
-            when(context.isCheckpointingEnabled()).thenReturn(isCheckpointingEnabled);
-            return context;
-        }
-
-        @Override
-        public void initializeState(FunctionInitializationContext context) throws Exception {
-            super.initializeState(context);
-            this.restoredState = this.pendingCheckpoints;
-        }
-
-        public ArrayDeque<Tuple2<Long, Set<MessageId>>> getRestoredState() {
-            return this.restoredState;
-        }
-
-        @Override
-        PulsarClient getClient() {
-            return mock(PulsarClient.class);
-        }
-
-        @Override
-        Consumer<byte[]> createConsumer(PulsarClient client) {
-            return testConsumer;
-        }
-    }
-
-    private static class TestSourceContext implements SourceFunction.SourceContext<String> {
-
-        private static final Object lock = new Object();
-
-        private final List<String> elements = new ArrayList<>();
-
-        @Override
-        public void collect(String element) {
-            elements.add(element);
-        }
-
-        @Override
-        public void collectWithTimestamp(String element, long timestamp) {
-
-        }
-
-        @Override
-        public void emitWatermark(Watermark mark) {
-
-        }
-
-        @Override
-        public void markAsTemporarilyIdle() {
-
-        }
-
-        @Override
-        public Object getCheckpointLock() {
-            return lock;
-        }
-
-        @Override
-        public void close() {
-
-        }
-    }
-
-    private static class TestConsumer implements Consumer<byte[]> {
-
-        private final List<Message> messages = new ArrayList<>();
-
-        private AtomicInteger currentMessage = new AtomicInteger();
-
-        private final Map<MessageId, MessageId> acknowledgedIds = new ConcurrentHashMap<>();
-
-        private TestConsumer(int numMessages) {
-            messages.addAll(createMessages(0, numMessages));
-        }
-
-        private void reset() {
-            currentMessage.set(0);
-        }
-
-        @Override
-        public String getTopic() {
-            return null;
-        }
-
-        @Override
-        public String getSubscription() {
-            return null;
-        }
-
-        @Override
-        public void unsubscribe() throws PulsarClientException {
-
-        }
-
-        @Override
-        public long getLastDisconnectedTimestamp() {
-            return 0L;
-        }
-
-
-        @Override
-        public CompletableFuture<Void> unsubscribeAsync() {
-            return null;
-        }
-
-        @Override
-        public Message<byte[]> receive() throws PulsarClientException {
-            return null;
-        }
-
-        public synchronized void addMessages(List<Message> messages) {
-            this.messages.addAll(messages);
-        }
-
-        @Override
-        public CompletableFuture<Message<byte[]>> receiveAsync() {
-            return null;
-        }
-
-        @Override
-        public Message<byte[]> receive(int i, TimeUnit timeUnit) throws PulsarClientException {
-            synchronized (this) {
-                if (currentMessage.get() == messages.size()) {
-                    try {
-                        Thread.sleep(10);
-                    } catch (InterruptedException e) {
-                        System.out.println("no more messages sleeping index: " + currentMessage.get());
-                    }
-                    return null;
-                }
-                return messages.get(currentMessage.getAndIncrement());
-            }
-        }
-
-        @Override
-        public Messages<byte[]> batchReceive() throws PulsarClientException {
-            return null;
-        }
-
-        @Override
-        public CompletableFuture<Messages<byte[]>> batchReceiveAsync() {
-            return null;
-        }
-
-        @Override
-        public void acknowledge(Message<?> message) throws PulsarClientException {
-
-        }
-
-        @Override
-        public void acknowledge(MessageId messageId) throws PulsarClientException {
-
-        }
-
-        @Override
-        public void acknowledge(Messages<?> messages) throws PulsarClientException {
-
-        }
-
-        @Override
-        public void acknowledge(List<MessageId> messageIdList) throws PulsarClientException {
-
-        }
-
-        @Override
-        public void negativeAcknowledge(Message<?> message) {
-        }
-
-        @Override
-        public void negativeAcknowledge(MessageId messageId) {
-        }
-
-        @Override
-        public void negativeAcknowledge(Messages<?> messages) {
-
-        }
-
-        @Override
-        public void acknowledgeCumulative(Message<?> message) throws PulsarClientException {
-
-        }
-
-        @Override
-        public void acknowledgeCumulative(MessageId messageId) throws PulsarClientException {
-            acknowledgedIds.put(messageId, messageId);
-        }
-
-        @Override
-        public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Transaction transaction) {
-            return null;
-        }
-
-        @Override
-        public CompletableFuture<Void> acknowledgeAsync(Message<?> message) {
-            return null;
-        }
-
-        @Override
-        public CompletableFuture<Void> acknowledgeAsync(MessageId messageId) {
-            acknowledgedIds.put(messageId, messageId);
-            return CompletableFuture.completedFuture(null);
-        }
-
-        @Override
-        public CompletableFuture<Void> acknowledgeAsync(MessageId messageId, Transaction transaction) {
-            return null;
-        }
-
-        @Override
-        public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages) {
-            return null;
-        }
-
-        @Override
-        public CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList) {
-            return null;
-        }
-
-        @Override
-        public CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message) {
-            return null;
-        }
-
-        @Override
-        public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId) {
-            return null;
-        }
-
-        @Override
-        public ConsumerStats getStats() {
-            return null;
-        }
-
-        @Override
-        public void close() throws PulsarClientException {
-
-        }
-
-        @Override
-        public CompletableFuture<Void> closeAsync() {
-            return null;
-        }
-
-        @Override
-        public boolean hasReachedEndOfTopic() {
-            return false;
-        }
-
-        @Override
-        public void redeliverUnacknowledgedMessages() {
-
-        }
-
-        @Override
-        public void seek(MessageId messageId) throws PulsarClientException {
-
-        }
-
-        @Override
-        public void seek(long timestamp) throws PulsarClientException {
-
-        }
-
-        @Override
-        public void seek(Function<String, Object> function) throws PulsarClientException {
-
-        }
-
-        @Override
-        public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
-            return null;
-        }
-
-        @Override
-        public CompletableFuture<Void> seekAsync(MessageId messageId) {
-            return null;
-        }
-
-        @Override
-        public CompletableFuture<Void> seekAsync(long timestamp) {
-            return null;
-        }
-
-        @Override
-        public boolean isConnected() {
-            return true;
-        }
-
-        @Override
-        public String getConsumerName() {
-            return "test-consumer-0";
-        }
-
-        @Override
-        public void pause() {
-        }
-
-        @Override
-        public void resume() {
-        }
-
-        @Override
-        public MessageId getLastMessageId() throws PulsarClientException {
-            return null;
-        }
-
-        @Override
-        public CompletableFuture<MessageId> getLastMessageIdAsync() {
-            return null;
-        }
-
-        @Override
-        public void reconsumeLater(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException {
-
-        }
-
-        @Override
-        public void reconsumeLater(Messages<?> messages, long delayTime, TimeUnit unit) throws PulsarClientException {
-
-        }
-
-        @Override
-        public void reconsumeLaterCumulative(Message<?> message, long delayTime, TimeUnit unit)
-                throws PulsarClientException {
-
-        }
-
-        @Override
-        public CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long delayTime, TimeUnit unit) {
-            return null;
-        }
-
-        @Override
-        public CompletableFuture<Void> reconsumeLaterAsync(Messages<?> messages, long delayTime, TimeUnit unit) {
-            return null;
-        }
-
-        @Override
-        public CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, long delayTime,
-                TimeUnit unit) {
-            return null;
-        }
-    }
-
-    private static List<Message> createMessages(int startIndex, int numMessages) {
-        final List<Message> messages = new ArrayList<>();
-        for (int i = startIndex; i < (startIndex + numMessages); ++i) {
-            String content = "message-" + i;
-            messages.add(createMessage(content, createMessageId(1, i + 1, 1)));
-        }
-        return messages;
-    }
-
-    private static Message<byte[]> createMessage(String content, String messageId) {
-        return new MessageImpl<byte[]>("my-topic", messageId, Collections.emptyMap(),
-                                       content.getBytes(), Schema.BYTES, new MessageMetadata());
-    }
-
-    private static String createMessageId(long ledgerId, long entryId, long partitionIndex) {
-        return String.format("%d:%d:%d", ledgerId, entryId, partitionIndex);
-    }
-}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
deleted file mode 100644
index d895a5f..0000000
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-import org.apache.commons.lang3.reflect.FieldUtils;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
-import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.types.Row;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
-
-/**
- * Unit test of {@link PulsarJsonTableSink}.
- */
-public class PulsarJsonTableSinkTest {
-
-    private static final String SERVICE_URL = "pulsar://localhost:6650";
-    private static final String TOPIC_NAME = "test_topic";
-    private static final Authentication AUTHENTICATION = new AuthenticationDisabled();
-    private static final String ROUTING_KEY = "key";
-    private final String[] fieldNames = {"key", "value"};
-    private final TypeInformation[] typeInformations = {
-            TypeInformation.of(String.class),
-            TypeInformation.of(String.class)
-    };
-
-    /**
-     * Test configure PulsarTableSink.
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testConfigure() throws Exception {
-        PulsarJsonTableSink sink = spySink();
-
-        TableSink<Row> configuredSink = sink.configure(fieldNames, typeInformations);
-
-        assertArrayEquals(fieldNames, configuredSink.getFieldNames());
-        assertArrayEquals(typeInformations, configuredSink.getFieldTypes());
-        Assert.assertNotNull(((PulsarJsonTableSink) configuredSink).keyExtractor);
-        Assert.assertNotNull(((PulsarJsonTableSink) configuredSink).serializationSchema);
-    }
-
-    /**
-     * Test emit data stream.
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testEmitDataStream() throws Exception {
-        DataStream mockedDataStream = Mockito.mock(DataStream.class);
-
-        PulsarJsonTableSink sink = spySink();
-
-        sink.emitDataStream(mockedDataStream);
-
-        Mockito.verify(mockedDataStream).addSink(Mockito.any(FlinkPulsarProducer.class));
-    }
-
-    private PulsarJsonTableSink spySink() throws Exception {
-        ClientConfigurationData clientConfigurationData = new ClientConfigurationData();
-        clientConfigurationData.setServiceUrl(SERVICE_URL);
-
-        ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData();
-        producerConfigurationData.setTopicName(TOPIC_NAME);
-
-        PulsarJsonTableSink sink = new PulsarJsonTableSink(
-                clientConfigurationData, producerConfigurationData,
-                ROUTING_KEY);
-
-        FlinkPulsarProducer producer = Mockito.mock(FlinkPulsarProducer.class);
-        PowerMockito.whenNew(
-                FlinkPulsarProducer.class
-        ).withArguments(
-                Mockito.anyString(),
-                Mockito.anyString(),
-                Mockito.any(Authentication.class),
-                Mockito.any(SerializationSchema.class),
-                Mockito.any(PulsarKeyExtractor.class),
-                Mockito.any(PulsarPropertiesExtractor.class)
-        ).thenReturn(producer);
-
-        FieldUtils.writeField(sink, "fieldNames", fieldNames, true);
-        FieldUtils.writeField(sink, "fieldTypes", typeInformations, true);
-        FieldUtils.writeField(sink, "serializationSchema", Mockito.mock(SerializationSchema.class), true);
-        FieldUtils.writeField(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class), true);
-        FieldUtils.writeField(sink, "propertiesExtractor", Mockito.mock(PulsarPropertiesExtractor.class), true);
-        return sink;
-    }
-}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
deleted file mode 100644
index c89ad7e..0000000
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.regex.Pattern;
-
-/**
- * Tests for PulsarSourceBuilder
- */
-public class PulsarSourceBuilderTest {
-
-    private PulsarSourceBuilder pulsarSourceBuilder;
-
-    @BeforeMethod
-    public void before() {
-        pulsarSourceBuilder = PulsarSourceBuilder.builder(new TestDeserializationSchema());
-    }
-
-    @Test
-    public void testBuild() throws PulsarClientException {
-        SourceFunction sourceFunction = pulsarSourceBuilder
-                .serviceUrl("testServiceUrl")
-                .topic("testTopic")
-                .subscriptionName("testSubscriptionName")
-                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
-                .build();
-        Assert.assertNotNull(sourceFunction);
-    }
-
-
-    @Test
-    public void testBuildWithConfPojo() throws PulsarClientException {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl("testServiceUrl");
-
-        ConsumerConfigurationData consumerConf = new ConsumerConfigurationData();
-        consumerConf.setTopicNames(new HashSet<>(Arrays.asList("testTopic")));
-        consumerConf.setSubscriptionName("testSubscriptionName");
-        consumerConf.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
-
-        SourceFunction sourceFunction = pulsarSourceBuilder
-                .pulsarAllClientConf(clientConf)
-                .pulsarAllConsumerConf(consumerConf)
-                .build();
-        Assert.assertNotNull(sourceFunction);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testBuildWithoutSettingRequiredProperties() throws PulsarClientException {
-        pulsarSourceBuilder.build();
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testServiceUrlWithNull() {
-        pulsarSourceBuilder.serviceUrl(null);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testServiceUrlWithBlank() {
-        pulsarSourceBuilder.serviceUrl(" ");
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testTopicWithNull() {
-        pulsarSourceBuilder.topic(null);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testTopicWithBlank() {
-        pulsarSourceBuilder.topic(" ");
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testTopicsWithNull() {
-        pulsarSourceBuilder.topics(null);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testTopicsWithBlank() {
-        pulsarSourceBuilder.topics(Arrays.asList(" ", " "));
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testTopicPatternWithNull() {
-        pulsarSourceBuilder.topicsPattern(null);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testTopicPatternAlreadySet() {
-        pulsarSourceBuilder.topicsPattern(Pattern.compile("persistent://tenants/ns/topic-*"));
-        pulsarSourceBuilder.topicsPattern(Pattern.compile("persistent://tenants/ns/topic-my-*"));
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testTopicPattenStringWithNull() {
-        pulsarSourceBuilder.topicsPatternString(null);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testSubscriptionNameWithNull() {
-        pulsarSourceBuilder.subscriptionName(null);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testSubscriptionNameWithBlank() {
-        pulsarSourceBuilder.subscriptionName(" ");
-    }
-
-    @Test(expectedExceptions = NullPointerException.class)
-    public void testSubscriptionInitialPosition() {
-        pulsarSourceBuilder.subscriptionInitialPosition(null);
-    }
-
-    private class TestDeserializationSchema<T> implements DeserializationSchema<T> {
-
-        @Override
-        public T deserialize(byte[] bytes) throws IOException {
-            return null;
-        }
-
-        @Override
-        public boolean isEndOfStream(T t) {
-            return false;
-        }
-
-        @Override
-        public TypeInformation<T> getProducedType() {
-            return null;
-        }
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testServiceUrlNullWithConfPojo() throws PulsarClientException {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl(null);
-
-        ConsumerConfigurationData consumerConf = new ConsumerConfigurationData();
-        consumerConf.setTopicNames(new HashSet<String>(Arrays.asList("testServiceUrl")));
-        consumerConf.setSubscriptionName("testSubscriptionName");
-
-        pulsarSourceBuilder
-                .pulsarAllClientConf(clientConf)
-                .pulsarAllConsumerConf(consumerConf)
-                .build();
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testServiceUrlWithBlankWithConfPojo() throws PulsarClientException {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl(StringUtils.EMPTY);
-
-        ConsumerConfigurationData consumerConf = new ConsumerConfigurationData();
-        consumerConf.setTopicNames(new HashSet<String>(Arrays.asList("testTopic")));
-        consumerConf.setSubscriptionName("testSubscriptionName");
-
-        pulsarSourceBuilder
-                .pulsarAllClientConf(clientConf)
-                .pulsarAllConsumerConf(consumerConf)
-                .build();
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testTopicPatternWithNullWithConfPojo() throws PulsarClientException {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl("testServiceUrl");
-        ConsumerConfigurationData consumerConf = new ConsumerConfigurationData();
-        consumerConf.setTopicsPattern(null);
-        consumerConf.setSubscriptionName("testSubscriptionName");
-
-        pulsarSourceBuilder
-                .pulsarAllClientConf(clientConf)
-                .pulsarAllConsumerConf(consumerConf)
-                .build();
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testSubscriptionNameWithNullWithConfPojo() throws PulsarClientException {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl("testServiceUrl");
-
-        ConsumerConfigurationData consumerConf = new ConsumerConfigurationData();
-        consumerConf.setTopicNames(new HashSet<String>(Arrays.asList("testTopic")));
-        consumerConf.setSubscriptionName(null);
-
-        pulsarSourceBuilder
-                .pulsarAllClientConf(clientConf)
-                .pulsarAllConsumerConf(consumerConf)
-                .build();
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testSubscriptionNameWithBlankWithConfPojo() throws PulsarClientException {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl("testServiceUrl");
-
-        ConsumerConfigurationData consumerConf = new ConsumerConfigurationData();
-        consumerConf.setTopicNames(new HashSet<String>(Arrays.asList("testTopic")));
-        consumerConf.setSubscriptionName(StringUtils.EMPTY);
-
-        pulsarSourceBuilder
-                .pulsarAllClientConf(clientConf)
-                .pulsarAllConsumerConf(consumerConf)
-                .build();
-    }
-
-}
diff --git a/pulsar-flink/src/test/resources/avro/NasaMission.avsc b/pulsar-flink/src/test/resources/avro/NasaMission.avsc
deleted file mode 100644
index 521f475..0000000
--- a/pulsar-flink/src/test/resources/avro/NasaMission.avsc
+++ /dev/null
@@ -1,10 +0,0 @@
-{"namespace": "org.apache.flink.avro.generated",
- "type": "record",
- "name": "NasaMission",
- "fields": [
-     {"name": "id", "type": "int"},
-     {"name": "name", "type": "string"},
-     {"name": "start_year",  "type": ["int", "null"]},
-     {"name": "end_year", "type": ["int", "null"]}
- ]
-}