Posted to commits@pulsar.apache.org by si...@apache.org on 2018/12/28 21:25:45 UTC

[pulsar] branch master updated: [Pulsar-Flink] Refactor Flink Batch Sink Examples (#3262)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 140d375  [Pulsar-Flink] Refactor Flink Batch Sink Examples (#3262)
140d375 is described below

commit 140d375e795f26b6176d606030f7bcaa611858f2
Author: Eren Avsarogullari <er...@gmail.com>
AuthorDate: Fri Dec 28 21:25:40 2018 +0000

    [Pulsar-Flink] Refactor Flink Batch Sink Examples (#3262)
    
    ### Motivation
    This PR aims to align the Flink Batch Sink Java and Scala examples with the Streaming ones.
    Follow-up to PR #3190.
    
    ### Modifications
    1- Provides a single executable jar covering all examples. The Flink `-c <className>` parameter is used to select and run each example (e.g.: `./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchSinkScalaExample ...`).
    2- Adds user-defined argument support to the examples (e.g.: `--service-url pulsar://127.0.0.1:6650 --topic test-topic`) instead of a hardcoded `service-url` and `topic-name`, as sketched below.
    3- Aligns the existing Batch documentation with the Streaming documentation.
    4- Combines the Java and Scala example documentation into a single `README.md`.
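    
    For reference, a condensed sketch of the argument handling now shared by all batch sink examples, adapted from the Java example changes in this commit; the class name is illustrative and the URL/topic values are placeholders:
    
    ```java
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.utils.ParameterTool;
    
    public class BatchSinkArgumentsSketch {
    
        public static void main(String[] args) throws Exception {
            // parse --service-url and --topic from the command line
            final ParameterTool parameterTool = ParameterTool.fromArgs(args);
            if (parameterTool.getNumberOfParameters() < 2) {
                System.out.println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>");
                return;
            }
    
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // expose the parsed parameters in the Flink web UI
            env.getConfig().setGlobalJobParameters(parameterTool);
    
            String serviceUrl = parameterTool.getRequired("service-url"); // e.g. pulsar://127.0.0.1:6650
            String topic = parameterTool.getRequired("topic");            // e.g. test-topic
            System.out.println("ServiceUrl: " + serviceUrl + ", Topic: " + topic);
        }
    }
    ```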
---
 examples/flink-consumer-source/pom.xml             |  32 +-
 .../example/FlinkPulsarBatchAvroSinkExample.java   |  24 +-
 .../example/FlinkPulsarBatchCsvSinkExample.java    |  23 +-
 .../example/FlinkPulsarBatchJsonSinkExample.java   |  23 +-
 .../example/FlinkPulsarBatchSinkExample.java       |  23 +-
 .../batch/connectors/pulsar/example/README.md      | 342 ++++++--------------
 .../streaming/connectors/pulsar/example/README.md  |   6 +-
 .../FlinkPulsarBatchAvroSinkScalaExample.scala     |  25 +-
 .../FlinkPulsarBatchCsvSinkScalaExample.scala      |  23 +-
 .../FlinkPulsarBatchJsonSinkScalaExample.scala     |  24 +-
 .../example/FlinkPulsarBatchSinkScalaExample.scala |  22 +-
 .../batch/connectors/pulsar/example/README.md      | 345 ---------------------
 .../connectors/pulsar/BasePulsarOutputFormat.java  |  18 +-
 .../connectors/pulsar/PulsarOutputFormat.java      |   4 +-
 14 files changed, 293 insertions(+), 641 deletions(-)

diff --git a/examples/flink-consumer-source/pom.xml b/examples/flink-consumer-source/pom.xml
index 35f5924..088444b 100644
--- a/examples/flink-consumer-source/pom.xml
+++ b/examples/flink-consumer-source/pom.xml
@@ -56,12 +56,6 @@
 
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-scala_${scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
       <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
       <version>${flink.version}</version>
     </dependency>
@@ -77,6 +71,7 @@
       <artifactId>pulsar-flink</artifactId>
       <version>${project.version}</version>
     </dependency>
+
   </dependencies>
 
   <build>
@@ -107,7 +102,7 @@
                   <mainClass>org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount</mainClass>
                 </transformer>
               </transformers>
-              <finalName>pulsar-flink-streaming-wordcount</finalName>
+              <finalName>pulsar-flink-examples</finalName>
               <filters>
                 <filter>
                   <artifact>*</artifact>
@@ -115,6 +110,15 @@
                     <include>org/apache/flink/streaming/examples/kafka/**</include>
                     <include>org/apache/flink/streaming/**</include>
                     <include>org/apache/pulsar/**</include>
+                    <include>org/apache/flink/batch/**</include>
+                    <include>net/jpountz/**</include>
+                    <include>com/scurrilous/circe/**</include>
+                    <include>org/apache/commons/csv/**</include>
+                    <include>org/apache/flink/avro/generated/**</include>
+                    <include>org/apache/avro/**</include>
+                    <include>org/codehaus/jackson/**</include>
+                    <include>avro/shaded/com/google/common/**</include>
+                    <include>org/apache/flink/formats/avro/**</include>
                   </includes>
                 </filter>
               </filters>
@@ -122,6 +126,20 @@
           </execution>
         </executions>
       </plugin>
+      <!-- Scala Plugin to compile Scala Files -->
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <version>3.4.4</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+              <goal>add-source</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
       <!-- Generate Test class from avro schema -->
       <plugin>
         <groupId>org.apache.avro</groupId>
diff --git a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
index ef0048c..584d59f 100644
--- a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
+++ b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
@@ -18,10 +18,10 @@
  */
 package org.apache.flink.batch.connectors.pulsar.example;
 
-
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.avro.generated.NasaMission;
 import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat;
 
@@ -40,16 +40,30 @@ public class FlinkPulsarBatchAvroSinkExample {
             NasaMission.newBuilder().setId(4).setName("Skylab").setStartYear(1973).setEndYear(1974).build(),
             NasaMission.newBuilder().setId(5).setName("Apollo–Soyuz Test Project").setStartYear(1975).setEndYear(1975).build());
 
-    private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
-    private static final String TOPIC_NAME = "my-flink-topic";
-
     public static void main(String[] args) throws Exception {
 
+        // parse input arguments
+        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+        if (parameterTool.getNumberOfParameters() < 2) {
+            System.out.println("Missing parameters!");
+            System.out.println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>");
+            return;
+        }
+
         // set up the execution environment
         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().setGlobalJobParameters(parameterTool);
+
+        String serviceUrl = parameterTool.getRequired("service-url");
+        String topic = parameterTool.getRequired("topic");
+
+        System.out.println("Parameters:");
+        System.out.println("\tServiceUrl:\t" + serviceUrl);
+        System.out.println("\tTopic:\t" + topic);
 
         // create PulsarAvroOutputFormat instance
-        final OutputFormat<NasaMission> pulsarAvroOutputFormat = new PulsarAvroOutputFormat<>(SERVICE_URL, TOPIC_NAME);
+        final OutputFormat<NasaMission> pulsarAvroOutputFormat = new PulsarAvroOutputFormat<>(serviceUrl, topic);
 
         // create DataSet
         DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);
diff --git a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
index 6b0f0ca..3e658dc 100644
--- a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
+++ b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.batch.connectors.pulsar.PulsarCsvOutputFormat;
 
 import java.util.Arrays;
@@ -40,17 +41,31 @@ public class FlinkPulsarBatchCsvSinkExample {
             new Tuple4(4, "Skylab", 1973, 1974),
             new Tuple4(5, "Apollo–Soyuz Test Project", 1975, 1975));
 
-    private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
-    private static final String TOPIC_NAME = "my-flink-topic";
-
     public static void main(String[] args) throws Exception {
 
+        // parse input arguments
+        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+        if (parameterTool.getNumberOfParameters() < 2) {
+            System.out.println("Missing parameters!");
+            System.out.println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>");
+            return;
+        }
+
         // set up the execution environment
         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().setGlobalJobParameters(parameterTool);
+
+        String serviceUrl = parameterTool.getRequired("service-url");
+        String topic = parameterTool.getRequired("topic");
+
+        System.out.println("Parameters:");
+        System.out.println("\tServiceUrl:\t" + serviceUrl);
+        System.out.println("\tTopic:\t" + topic);
 
         // create PulsarCsvOutputFormat instance
         final OutputFormat<Tuple4<Integer, String, Integer, Integer>> pulsarCsvOutputFormat =
-                new PulsarCsvOutputFormat<>(SERVICE_URL, TOPIC_NAME);
+                new PulsarCsvOutputFormat<>(serviceUrl, topic);
 
         // create DataSet
         DataSet<Tuple4<Integer, String, Integer, Integer>> nasaMissionDS = env.fromCollection(nasaMissions);
diff --git a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
index e037616..3937ae9 100644
--- a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
+++ b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
@@ -21,6 +21,7 @@ package org.apache.flink.batch.connectors.pulsar.example;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.batch.connectors.pulsar.PulsarJsonOutputFormat;
 
 import java.util.Arrays;
@@ -38,16 +39,30 @@ public class FlinkPulsarBatchJsonSinkExample {
             new NasaMission(4, "Skylab", 1973, 1974),
             new NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975));
 
-    private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
-    private static final String TOPIC_NAME = "my-flink-topic";
-
     public static void main(String[] args) throws Exception {
 
+        // parse input arguments
+        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+        if (parameterTool.getNumberOfParameters() < 2) {
+            System.out.println("Missing parameters!");
+            System.out.println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>");
+            return;
+        }
+
         // set up the execution environment
         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().setGlobalJobParameters(parameterTool);
+
+        String serviceUrl = parameterTool.getRequired("service-url");
+        String topic = parameterTool.getRequired("topic");
+
+        System.out.println("Parameters:");
+        System.out.println("\tServiceUrl:\t" + serviceUrl);
+        System.out.println("\tTopic:\t" + topic);
 
         // create PulsarJsonOutputFormat instance
-        final OutputFormat<NasaMission> pulsarJsonOutputFormat = new PulsarJsonOutputFormat<>(SERVICE_URL, TOPIC_NAME);
+        final OutputFormat<NasaMission> pulsarJsonOutputFormat = new PulsarJsonOutputFormat<>(serviceUrl, topic);
 
         // create DataSet
         DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);
diff --git a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
index 6724c62..c90d016 100644
--- a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
+++ b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat;
 import org.apache.flink.util.Collector;
 
@@ -35,17 +36,31 @@ public class FlinkPulsarBatchSinkExample {
     private static final String EINSTEIN_QUOTE = "Imagination is more important than knowledge. " +
             "Knowledge is limited. Imagination encircles the world.";
 
-    private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
-    private static final String TOPIC_NAME = "my-flink-topic";
-
     public static void main(String[] args) throws Exception {
 
+        // parse input arguments
+        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+        if (parameterTool.getNumberOfParameters() < 2) {
+            System.out.println("Missing parameters!");
+            System.out.println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>");
+            return;
+        }
+
         // set up the execution environment
         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().setGlobalJobParameters(parameterTool);
+
+        String serviceUrl = parameterTool.getRequired("service-url");
+        String topic = parameterTool.getRequired("topic");
+
+        System.out.println("Parameters:");
+        System.out.println("\tServiceUrl:\t" + serviceUrl);
+        System.out.println("\tTopic:\t" + topic);
 
         // create PulsarOutputFormat instance
         final OutputFormat pulsarOutputFormat =
-                new PulsarOutputFormat(SERVICE_URL, TOPIC_NAME, wordWithCount -> wordWithCount.toString().getBytes());
+                new PulsarOutputFormat(serviceUrl, topic, wordWithCount -> wordWithCount.toString().getBytes());
 
         // create DataSet
         DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE);
diff --git a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
index 3116b3b..b93f5a3 100644
--- a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
+++ b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
@@ -53,71 +53,56 @@ dependencies {
 }
 ```
 
-# PulsarOutputFormat
-### Usage
+# Example
 
-Please find a sample usage as follows:
+### PulsarOutputFormat
 
-```java
-        private static final String EINSTEIN_QUOTE = "Imagination is more important than knowledge. " +
-                "Knowledge is limited. Imagination encircles the world.";
+In this example, a Flink DataSet is processed as a word count and written to Pulsar. Please find a complete example for PulsarOutputFormat as follows:
+[java](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java)
+[scala](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala)
 
-        private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
-        private static final String TOPIC_NAME = "my-flink-topic";
+The steps to run the example:
 
-        public static void main(String[] args) throws Exception {
+1. Start Pulsar Standalone.
 
-            // set up the execution environment
-            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+    You can follow the [instructions](https://pulsar.apache.org/docs/en/standalone/) to start a Pulsar standalone locally.
 
-            // create PulsarOutputFormat instance
-            final OutputFormat<String> pulsarOutputFormat =
-                    new PulsarOutputFormat(SERVICE_URL, TOPIC_NAME, wordWithCount -> wordWithCount.toString().getBytes());
+    ```shell
+    $ bin/pulsar standalone
+    ```
 
-            // create DataSet
-            DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE);
+2. Start Flink locally.
 
-            textDS.flatMap(new FlatMapFunction<String, String>() {
-                @Override
-                public void flatMap(String value, Collector<String> out) throws Exception {
-                    String[] words = value.toLowerCase().split(" ");
-                    for(String word: words) {
-                        out.collect(word.replace(".", ""));
-                    }
-                }
-            })
-            // filter words which length is bigger than 4
-            .filter(word -> word.length() > 4)
+    You can follow the [instructions](https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/setup_quickstart.html) to download and start Flink.
 
-            // write batch data to Pulsar
-            .output(pulsarOutputFormat);
+    ```shell
+    $ ./bin/start-cluster.sh
+    ```
 
-            // execute program
-            env.execute("Flink - Pulsar Batch WordCount");
-        }
-```
+3. Build the examples.
 
-### Sample Output
+    ```shell
+    $ cd ${PULSAR_HOME}
+    $ mvn clean install -DskipTests
+    ```
 
-Please find sample output for above application as follows:
-```
-imagination
-important
-knowledge
-knowledge
-limited
-imagination
-encircles
-world
-```
+4. Run the word count example to write results to the Pulsar topic.
+
+    ```shell
+    # java
+    $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchSinkExample ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
 
-### Complete Example
+    # scala
+    $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchSinkScalaExample ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
+    ```
 
-You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java).
-In this example, Flink DataSet is processed as word-count and being written to Pulsar.
+5. Once the Flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_flink_topic`.
 
-### Complete Example Output
-Please find sample output for above linked application as follows:
+```shell
+$ bin/pulsar-client consume -n 0 -s test test_flink_topic
+```
+
+6. Please find sample output for the application linked above as follows:
 ```
 WordWithCount { word = important, count = 1 }
 WordWithCount { word = encircles, count = 1 }
@@ -127,230 +112,105 @@ WordWithCount { word = limited, count = 1 }
 WordWithCount { word = world, count = 1 }
 ```
 
-# PulsarCsvOutputFormat
-### Usage
-
-Please find a sample usage as follows:
-
-```java
-        private static final List<Tuple4<Integer, String, Integer, Integer>> nasaMissions = Arrays.asList(
-                new Tuple4(1, "Mercury program", 1959, 1963),
-                new Tuple4(2, "Apollo program", 1961, 1972),
-                new Tuple4(3, "Gemini program", 1963, 1966),
-                new Tuple4(4, "Skylab", 1973, 1974),
-                new Tuple4(5, "Apollo–Soyuz Test Project", 1975, 1975));
-
-        private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
-        private static final String TOPIC_NAME = "my-flink-topic";
-
-        public static void main(String[] args) throws Exception {
-
-            // set up the execution environment
-            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-            // create PulsarCsvOutputFormat instance
-            final OutputFormat<Tuple4<Integer, String, Integer, Integer>> pulsarCsvOutputFormat =
-                    new PulsarCsvOutputFormat<>(SERVICE_URL, TOPIC_NAME);
-
-            // create DataSet
-            DataSet<Tuple4<Integer, String, Integer, Integer>> nasaMissionDS = env.fromCollection(nasaMissions);
-            // map nasa mission names to upper-case
-            nasaMissionDS.map(
-                new MapFunction<Tuple4<Integer, String, Integer, Integer>, Tuple4<Integer, String, Integer, Integer>>() {
-                               @Override
-                               public Tuple4<Integer, String, Integer, Integer> map(
-                                       Tuple4<Integer, String, Integer, Integer> nasaMission) throws Exception {
-                                   return new Tuple4(
-                                           nasaMission.f0,
-                                           nasaMission.f1.toUpperCase(),
-                                           nasaMission.f2,
-                                           nasaMission.f3);
-                               }
-                           }
-            )
-            // filter missions which started after 1970
-            .filter(nasaMission -> nasaMission.f2 > 1970)
-            // write batch data to Pulsar
-            .output(pulsarCsvOutputFormat);
-
-            // set parallelism to write Pulsar in parallel (optional)
-            env.setParallelism(2);
-
-            // execute program
-            env.execute("Flink - Pulsar Batch Csv");
-
-        }
-```
-
-### Sample Output
 
-Please find sample output for above application as follows:
-```
-4,SKYLAB,1973,1974
-5,APOLLO–SOYUZ TEST PROJECT,1975,1975
-```
+### PulsarCsvOutputFormat
 
-### Complete Example
+In this example, a Flink DataSet is processed and written to Pulsar in CSV format. Please find a complete example for PulsarCsvOutputFormat as follows:
+[java](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java)
+[scala](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala)
 
-You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java).
-In this example, Flink DataSet is processed and written to Pulsar in Csv format.
+The steps to run the example:
 
+Steps 1, 2 and 3 are the same as above.
 
-# PulsarJsonOutputFormat
-### Usage
+4. Run the CSV sink example to write results to the Pulsar topic.
 
-Please find a sample usage as follows:
+    ```shell
+    # java
+    $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchCsvSinkExample ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
 
-```java
-        private static final List<NasaMission> nasaMissions = Arrays.asList(
-                new NasaMission(1, "Mercury program", 1959, 1963),
-                new NasaMission(2, "Apollo program", 1961, 1972),
-                new NasaMission(3, "Gemini program", 1963, 1966),
-                new NasaMission(4, "Skylab", 1973, 1974),
-                new NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975));
+    # scala
+    $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchCsvSinkScalaExample ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
+    ```
 
-        private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
-        private static final String TOPIC_NAME = "my-flink-topic";
+5. Once the Flink CSV sink example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_flink_topic`.
 
-        public static void main(String[] args) throws Exception {
-
-            // set up the execution environment
-            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+```shell
+$ bin/pulsar-client consume -n 0 -s test test_flink_topic
+```
 
-            // create PulsarJsonOutputFormat instance
-            final OutputFormat<NasaMission> pulsarJsonOutputFormat = new PulsarJsonOutputFormat<>(SERVICE_URL, TOPIC_NAME);
+6. Please find sample output for the application linked above as follows:
+```
+4,SKYLAB,1973,1974
+5,APOLLO–SOYUZ TEST PROJECT,1975,1975
+```
 
-            // create DataSet
-            DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);
-            // map nasa mission names to upper-case
-            nasaMissionDS.map(nasaMission -> new NasaMission(
-                    nasaMission.id,
-                    nasaMission.missionName.toUpperCase(),
-                    nasaMission.startYear,
-                    nasaMission.endYear))
-            // filter missions which started after 1970
-            .filter(nasaMission -> nasaMission.startYear > 1970)
-            // write batch data to Pulsar
-            .output(pulsarJsonOutputFormat);
 
-            // set parallelism to write Pulsar in parallel (optional)
-            env.setParallelism(2);
+### PulsarJsonOutputFormat
 
-            // execute program
-            env.execute("Flink - Pulsar Batch Json");
-        }
+In this example, a Flink DataSet is processed and written to Pulsar in JSON format. Please find a complete example for PulsarJsonOutputFormat as follows:
+[java](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java)
+[scala](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala)
 
-        /**
-         * NasaMission data model
-         *
-         * Note: Property definitions of the model should be public or have getter functions to be visible
-         */
-        private static class NasaMission {
+**Note:** Property definitions of the model should be public or have getter functions to be visible.
 
-            private int id;
-            private String missionName;
-            private int startYear;
-            private int endYear;
+The steps to run the example:
 
-            public NasaMission(int id, String missionName, int startYear, int endYear) {
-                this.id = id;
-                this.missionName = missionName;
-                this.startYear = startYear;
-                this.endYear = endYear;
-            }
+Steps 1, 2 and 3 are the same as above.
 
-            public int getId() {
-                return id;
-            }
+4. Run the JSON sink example to write results to the Pulsar topic.
 
-            public String getMissionName() {
-                return missionName;
-            }
+    ```shell
+    # java
+    $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchJsonSinkExample ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
 
-            public int getStartYear() {
-                return startYear;
-            }
+    # scala
+    $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchJsonSinkScalaExample ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
+    ```
 
-            public int getEndYear() {
-                return endYear;
-            }
-        }
+5. Once the Flink JSON sink example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_flink_topic`.
 
+```shell
+$ bin/pulsar-client consume -n 0 -s test test_flink_topic
 ```
 
-**Note:** Property definitions of the model should be public or have getter functions to be visible
-
-### Sample Output
-
-Please find sample output for above application as follows:
+6. Please find sample output for the application linked above as follows:
 ```
 {"id":4,"missionName":"SKYLAB","startYear":1973,"endYear":1974}
 {"id":5,"missionName":"APOLLO–SOYUZ TEST PROJECT","startYear":1975,"endYear":1975}
 ```
 
-### Complete Example
-
-You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java).
-In this example, Flink DataSet is processed and written to Pulsar in Json format.
-
-
-# PulsarAvroOutputFormat
-### Usage
-
-Please find a sample usage as follows:
-
-```java
-        private static final List<NasaMission> nasaMissions = Arrays.asList(
-                    NasaMission.newBuilder().setId(1).setName("Mercury program").setStartYear(1959).setEndYear(1963).build(),
-                    NasaMission.newBuilder().setId(2).setName("Apollo program").setStartYear(1961).setEndYear(1972).build(),
-                    NasaMission.newBuilder().setId(3).setName("Gemini program").setStartYear(1963).setEndYear(1966).build(),
-                    NasaMission.newBuilder().setId(4).setName("Skylab").setStartYear(1973).setEndYear(1974).build(),
-                    NasaMission.newBuilder().setId(5).setName("Apollo–Soyuz Test Project").setStartYear(1975).setEndYear(1975).build());
-        
-            private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
-            private static final String TOPIC_NAME = "my-flink-topic";
-        
-            public static void main(String[] args) throws Exception {
-        
-                // set up the execution environment
-                final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-        
-                // create PulsarAvroOutputFormat instance
-                final OutputFormat<NasaMission> pulsarAvroOutputFormat = new PulsarAvroOutputFormat<>(SERVICE_URL, TOPIC_NAME);
-        
-                // create DataSet
-                DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);
-                // map nasa mission names to upper-case
-                nasaMissionDS.map(nasaMission -> new NasaMission(
-                        nasaMission.getId(),
-                        nasaMission.getName(),
-                        nasaMission.getStartYear(),
-                        nasaMission.getEndYear()))
-                        // filter missions which started after 1970
-                        .filter(nasaMission -> nasaMission.getStartYear() > 1970)
-                        // write batch data to Pulsar
-                        .output(pulsarAvroOutputFormat);
-        
-                // set parallelism to write Pulsar in parallel (optional)
-                env.setParallelism(2);
-        
-                // execute program
-                env.execute("Flink - Pulsar Batch Avro");
-            }
 
-```
+### PulsarAvroOutputFormat
+
+In this example, a Flink DataSet is processed and written to Pulsar in Avro format. Please find a complete example for PulsarAvroOutputFormat as follows:
+[java](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java)
+[scala](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala)
+
+**Note:** The NasaMission class is automatically generated by Avro.
+
+The steps to run the example:
 
-**Note:** NasaMission class are automatically generated by Avro
+Steps 1, 2 and 3 are the same as above.
 
-### Sample Output
+4. Run the Avro sink example to write results to the Pulsar topic.
 
-Please find sample output for above application as follows:
+    ```shell
+    # java
+    $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchAvroSinkExample ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
+
+    # scala
+    $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchAvroSinkScalaExample ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
+    ```
+
+5. Once the Flink Avro sink example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_flink_topic`.
+
+```shell
+$ bin/pulsar-client consume -n 0 -s test test_flink_topic
+```
+
+6. Please find sample output for the application linked above as follows:
 ```
  "4,SKYLAB,1973,1974"
  "5,APOLLO–SOYUZ TEST PROJECT,1975,1975"
 ```
-
-### Complete Example
-
-You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java).
-In this example, Flink DataSet is processed and written to Pulsar in Avro format.
diff --git a/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md b/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
index 16c88f1..463e805 100644
--- a/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
+++ b/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
@@ -30,7 +30,7 @@ See the [Pulsar Concepts](https://pulsar.apache.org/docs/en/concepts-overview/)
 
 ### PulsarConsumerSourceWordCount
 
-This Flink streaming job is consuming from a Pulsar topic and couting the wordcount in a streaming fashion. The job can write the word count results
+This Flink streaming job is consuming from a Pulsar topic and counting the wordcount in a streaming fashion. The job can write the word count results
 to stdout or another Pulsar topic.
 
 The steps to run the example:
@@ -61,7 +61,7 @@ The steps to run the example:
 4. Run the word count example to print results to stdout.
 
     ```shell
-    $ ./bin/flink run  ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-streaming-wordcount.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
+    $ ./bin/flink run  ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
     ```
 
 5. Produce messages to topic `test_src`.
@@ -85,7 +85,7 @@ The steps to run the example:
 Alternatively, when you run the Flink word count example at step 4, you can choose to dump the result to another Pulsar topic.
 
 ```shell
-$ ./bin/flink run  ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-streaming-wordcount.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
+$ ./bin/flink run  ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
 ```
 
 Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_dest`.
diff --git a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
index 0d255f2..f10d6c1 100644
--- a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
+++ b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
@@ -18,6 +18,7 @@
  */
 package org.apache.flink.batch.connectors.pulsar.example
 
+import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 import org.apache.flink.avro.generated.NasaMission
 import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat
@@ -27,10 +28,7 @@ import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat
   */
 object FlinkPulsarBatchAvroSinkScalaExample {
 
-  private val SERVICE_URL = "pulsar://127.0.0.1:6650"
-  private val TOPIC_NAME = "my-flink-topic"
-
-  val nasaMissions = List(
+  private val nasaMissions = List(
     NasaMission.newBuilder.setId(1).setName("Mercury program").setStartYear(1959).setEndYear(1963).build,
     NasaMission.newBuilder.setId(2).setName("Apollo program").setStartYear(1961).setEndYear(1972).build,
     NasaMission.newBuilder.setId(3).setName("Gemini program").setStartYear(1963).setEndYear(1966).build,
@@ -39,12 +37,29 @@ object FlinkPulsarBatchAvroSinkScalaExample {
 
   def main(args: Array[String]): Unit = {
 
+    // parse input arguments
+    val parameterTool = ParameterTool.fromArgs(args)
+
+    if (parameterTool.getNumberOfParameters < 2) {
+      println("Missing parameters!")
+      println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>")
+      return
+    }
+
     // set up the execution environment
     val env = ExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.setGlobalJobParameters(parameterTool)
+
+    val serviceUrl = parameterTool.getRequired("service-url")
+    val topic = parameterTool.getRequired("topic")
+
+    println("Parameters:")
+    println("\tServiceUrl:\t" + serviceUrl)
+    println("\tTopic:\t" + topic)
 
     // create PulsarCsvOutputFormat instance
     val pulsarAvroOutputFormat =
-      new PulsarAvroOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
+      new PulsarAvroOutputFormat[NasaMission](serviceUrl, topic)
 
     // create DataSet
     val textDS = env.fromCollection(nasaMissions)
diff --git a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
index 7db844b..3233616 100644
--- a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
+++ b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.batch.connectors.pulsar.example
 
 import org.apache.flink.api.java.tuple.Tuple4
+import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 import org.apache.flink.batch.connectors.pulsar.PulsarCsvOutputFormat
 
@@ -33,9 +34,6 @@ object FlinkPulsarBatchCsvSinkScalaExample {
   private case class NasaMission(id: Int, missionName: String, startYear: Int, endYear: Int)
     extends Tuple4(id, missionName, startYear, endYear)
 
-  private val SERVICE_URL = "pulsar://127.0.0.1:6650"
-  private val TOPIC_NAME = "my-flink-topic"
-
   private val nasaMissions = List(
     NasaMission(1, "Mercury program", 1959, 1963),
     NasaMission(2, "Apollo program", 1961, 1972),
@@ -45,12 +43,29 @@ object FlinkPulsarBatchCsvSinkScalaExample {
 
   def main(args: Array[String]): Unit = {
 
+    // parse input arguments
+    val parameterTool = ParameterTool.fromArgs(args)
+
+    if (parameterTool.getNumberOfParameters < 2) {
+      println("Missing parameters!")
+      println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>")
+      return
+    }
+
     // set up the execution environment
     val env = ExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.setGlobalJobParameters(parameterTool)
+
+    val serviceUrl = parameterTool.getRequired("service-url")
+    val topic = parameterTool.getRequired("topic")
+
+    println("Parameters:")
+    println("\tServiceUrl:\t" + serviceUrl)
+    println("\tTopic:\t" + topic)
 
     // create PulsarCsvOutputFormat instance
     val pulsarCsvOutputFormat =
-      new PulsarCsvOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
+      new PulsarCsvOutputFormat[NasaMission](serviceUrl, topic)
 
     // create DataSet
     val textDS = env.fromCollection(nasaMissions)
diff --git a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
index 1f7fc19..60d02e5 100644
--- a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
+++ b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
@@ -18,9 +18,9 @@
  */
 package org.apache.flink.batch.connectors.pulsar.example
 
+import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 import org.apache.flink.batch.connectors.pulsar.PulsarJsonOutputFormat
-
 import scala.beans.BeanProperty
 
 /**
@@ -43,16 +43,30 @@ object FlinkPulsarBatchJsonSinkScalaExample {
     NasaMission(4, "Skylab", 1973, 1974),
     NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975))
 
-  private val SERVICE_URL = "pulsar://127.0.0.1:6650"
-  private val TOPIC_NAME = "my-flink-topic"
-
   def main(args: Array[String]): Unit = {
 
+    // parse input arguments
+    val parameterTool = ParameterTool.fromArgs(args)
+
+    if (parameterTool.getNumberOfParameters < 2) {
+      println("Missing parameters!")
+      println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>")
+      return
+    }
+
     // set up the execution environment
     val env = ExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.setGlobalJobParameters(parameterTool)
+
+    val serviceUrl = parameterTool.getRequired("service-url")
+    val topic = parameterTool.getRequired("topic")
+
+    println("Parameters:")
+    println("\tServiceUrl:\t" + serviceUrl)
+    println("\tTopic:\t" + topic)
 
     // create PulsarJsonOutputFormat instance
-    val pulsarJsonOutputFormat = new PulsarJsonOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
+    val pulsarJsonOutputFormat = new PulsarJsonOutputFormat[NasaMission](serviceUrl, topic)
 
     // create DataSet
     val nasaMissionDS = env.fromCollection(nasaMissions)
diff --git a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
index 5e536cf..4de0dcb 100644
--- a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
+++ b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.batch.connectors.pulsar.example
 
 import org.apache.flink.api.common.serialization.SerializationSchema
+import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat
 import org.apache.flink.util.Collector
@@ -37,17 +38,32 @@ object FlinkPulsarBatchSinkScalaExample {
 
   private val EINSTEIN_QUOTE = "Imagination is more important than knowledge. " +
     "Knowledge is limited. Imagination encircles the world."
-  private val SERVICE_URL = "pulsar://127.0.0.1:6650"
-  private val TOPIC_NAME = "my-flink-topic"
 
   def main(args: Array[String]): Unit = {
 
+    // parse input arguments
+    val parameterTool = ParameterTool.fromArgs(args)
+
+    if (parameterTool.getNumberOfParameters < 2) {
+      println("Missing parameters!")
+      println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>")
+      return
+    }
+
     // set up the execution environment
     val env = ExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.setGlobalJobParameters(parameterTool)
+
+    val serviceUrl = parameterTool.getRequired("service-url")
+    val topic = parameterTool.getRequired("topic")
+
+    println("Parameters:")
+    println("\tServiceUrl:\t" + serviceUrl)
+    println("\tTopic:\t" + topic)
 
     // create PulsarOutputFormat instance
     val pulsarOutputFormat =
-      new PulsarOutputFormat[WordWithCount](SERVICE_URL, TOPIC_NAME, new SerializationSchema[WordWithCount] {
+      new PulsarOutputFormat[WordWithCount](serviceUrl, topic, new SerializationSchema[WordWithCount] {
         override def serialize(wordWithCount: WordWithCount): Array[Byte] = wordWithCount.toString.getBytes
       })
 
diff --git a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/README.md b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/README.md
deleted file mode 100644
index e206392..0000000
--- a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/README.md
+++ /dev/null
@@ -1,345 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-
-The Flink Batch Sink for Pulsar is a custom sink that enables Apache [Flink](https://flink.apache.org/) to write [DataSet](https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/index.html) to Pulsar.
-This document explains how to develop Scala Applications by using Flink Batch Sink.
-# Prerequisites
-
-To use this sink, include a dependency for the `pulsar-flink` library in your Java configuration.
-
-# Maven
-
-If you're using Maven, add this to your `pom.xml`:
-
-```xml
-<!-- in your <properties> block -->
-<pulsar.version>{{pulsar:version}}</pulsar.version>
-
-<!-- in your <dependencies> block -->
-<dependency>
-  <groupId>org.apache.pulsar</groupId>
-  <artifactId>pulsar-flink</artifactId>
-  <version>${pulsar.version}</version>
-</dependency>
-```
-
-# Gradle
-
-If you're using Gradle, add this to your `build.gradle` file:
-
-```groovy
-def pulsarVersion = "{{pulsar:version}}"
-
-dependencies {
-    compile group: 'org.apache.pulsar', name: 'pulsar-flink', version: pulsarVersion
-}
-```
-
-# PulsarOutputFormat
-### Usage
-
-Please find Scala sample usage of `PulsarOutputFormat` as follows:
-
-```scala
-      /**
-        * Data type for words with count.
-        */
-      case class WordWithCount(word: String, count: Long) {
-        override def toString: String = "WordWithCount { word = " + word + ", count = " + count + " }"
-      }
-
-      /**
-        * Implementation
-        */
-      private val EINSTEIN_QUOTE = "Imagination is more important than knowledge. " +
-        "Knowledge is limited. Imagination encircles the world."
-      private val SERVICE_URL = "pulsar://127.0.0.1:6650"
-      private val TOPIC_NAME = "my-flink-topic"
-
-      def main(args: Array[String]): Unit = {
-
-        // set up the execution environment
-        val env = ExecutionEnvironment.getExecutionEnvironment
-
-        // create PulsarOutputFormat instance
-        val pulsarOutputFormat =
-          new PulsarOutputFormat[WordWithCount](SERVICE_URL, TOPIC_NAME, new SerializationSchema[WordWithCount] {
-            override def serialize(wordWithCount: WordWithCount): Array[Byte] = wordWithCount.toString.getBytes
-          })
-
-        // create DataSet
-        val textDS = env.fromElements[String](EINSTEIN_QUOTE)
-
-        // convert sentence to words
-        textDS.flatMap((value: String, out: Collector[WordWithCount]) => {
-          val words = value.toLowerCase.split(" ")
-          for (word <- words) {
-            out.collect(new WordWithCount(word.replace(".", ""), 1))
-          }
-        })
-
-        // filter words which length is bigger than 4
-        .filter((wordWithCount: WordWithCount) => wordWithCount.word.length > 4)
-
-        // group the words
-        .groupBy((wordWithCount: WordWithCount) => wordWithCount.word)
-
-        // sum the word counts
-        .reduce((wordWithCount1: WordWithCount, wordWithCount2: WordWithCount) =>
-          new WordWithCount(wordWithCount1.word, wordWithCount1.count + wordWithCount2.count))
-
-        // write batch data to Pulsar
-        .output(pulsarOutputFormat)
-
-        // set parallelism to write Pulsar in parallel (optional)
-        env.setParallelism(2)
-
-        // execute program
-        env.execute("Flink - Pulsar Batch WordCount")
-      }
-```
-
-### Sample Output
-
-Please find sample output for above application as follows:
-```
-WordWithCount { word = encircles, count = 1 }
-WordWithCount { word = important, count = 1 }
-WordWithCount { word = imagination, count = 2 }
-WordWithCount { word = limited, count = 1 }
-WordWithCount { word = knowledge, count = 2 }
-WordWithCount { word = world, count = 1 }
-```
-
-### Complete Example
-
-You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala).
-In this example, Flink DataSet is processed as word-count and being written to Pulsar.
-
-
-# PulsarCsvOutputFormat
-### Usage
-
-Please find Scala sample usage of `PulsarCsvOutputFormat` as follows:
-
-```scala
-      /**
-        * NasaMission Model
-        */
-      private case class NasaMission(id: Int, missionName: String, startYear: Int, endYear: Int)
-        extends Tuple4(id, missionName, startYear, endYear)
-
-      /**
-        * Implementation
-        */
-      private val SERVICE_URL = "pulsar://127.0.0.1:6650"
-      private val TOPIC_NAME = "my-flink-topic"
-
-      private val nasaMissions = List(
-        NasaMission(1, "Mercury program", 1959, 1963),
-        NasaMission(2, "Apollo program", 1961, 1972),
-        NasaMission(3, "Gemini program", 1963, 1966),
-        NasaMission(4, "Skylab", 1973, 1974),
-        NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975))
-
-      def main(args: Array[String]): Unit = {
-
-        // set up the execution environment
-        val env = ExecutionEnvironment.getExecutionEnvironment
-
-        // create PulsarCsvOutputFormat instance
-        val pulsarCsvOutputFormat =
-          new PulsarCsvOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
-
-        // create DataSet
-        val textDS = env.fromCollection(nasaMissions)
-
-        // map nasa mission names to upper-case
-        textDS.map(nasaMission => NasaMission(
-          nasaMission.id,
-          nasaMission.missionName.toUpperCase,
-          nasaMission.startYear,
-          nasaMission.endYear))
-
-        // filter missions which started after 1970
-        .filter(_.startYear > 1970)
-
-        // write batch data to Pulsar as Csv
-        .output(pulsarCsvOutputFormat)
-
-        // set parallelism to write Pulsar in parallel (optional)
-        env.setParallelism(2)
-
-        // execute program
-        env.execute("Flink - Pulsar Batch Csv")
-      }
-```
-
-### Sample Output
-
-Please find sample output for above application as follows:
-```
-4,SKYLAB,1973,1974
-5,APOLLO–SOYUZ TEST PROJECT,1975,1975
-```
-
-### Complete Example
-
-You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala).
-In this example, Flink DataSet is processed and written to Pulsar in Csv format.
-
-
-# PulsarJsonOutputFormat
-### Usage
-
-Please find Scala sample usage of `PulsarJsonOutputFormat` as follows:
-
-```scala
-      /**
-        * NasaMission Model
-        */
-      private case class NasaMission(@BeanProperty id: Int,
-                             @BeanProperty missionName: String,
-                             @BeanProperty startYear: Int,
-                             @BeanProperty endYear: Int)
-
-      /**
-        * Implementation
-        */
-      private val nasaMissions = List(
-        NasaMission(1, "Mercury program", 1959, 1963),
-        NasaMission(2, "Apollo program", 1961, 1972),
-        NasaMission(3, "Gemini program", 1963, 1966),
-        NasaMission(4, "Skylab", 1973, 1974),
-        NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975))
-
-      private val SERVICE_URL = "pulsar://127.0.0.1:6650"
-      private val TOPIC_NAME = "my-flink-topic"
-
-      def main(args: Array[String]): Unit = {
-
-        // set up the execution environment
-        val env = ExecutionEnvironment.getExecutionEnvironment
-
-        // create PulsarJsonOutputFormat instance
-        val pulsarJsonOutputFormat = new PulsarJsonOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
-
-        // create DataSet
-        val nasaMissionDS = env.fromCollection(nasaMissions)
-
-        // map nasa mission names to upper-case
-        nasaMissionDS.map(nasaMission =>
-          NasaMission(
-            nasaMission.id,
-            nasaMission.missionName.toUpperCase,
-            nasaMission.startYear,
-            nasaMission.endYear))
-
-        // filter missions which started after 1970
-        .filter(_.startYear > 1970)
-
-        // write batch data to Pulsar
-        .output(pulsarJsonOutputFormat)
-
-        // set parallelism to write Pulsar in parallel (optional)
-        env.setParallelism(2)
-
-        // execute program
-        env.execute("Flink - Pulsar Batch Json")
-      }
-```
-
-**Note:** Property definitions of the model should cover `@BeanProperty` to be visible.
-
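-As a minimal illustration (the class names below are hypothetical and only demonstrate the annotation requirement), a plain case class exposes no JavaBean getters, so a getter-based JSON serializer finds no properties to write, while the annotated variant is serialized as expected:
-
-```scala
-import scala.beans.BeanProperty
-
-// hypothetical models used only to illustrate the @BeanProperty requirement
-case class PlainMission(id: Int, missionName: String)       // no bean getters -> properties not visible to the serializer
-case class BeanMission(@BeanProperty id: Int,
-                       @BeanProperty missionName: String)   // serialized as {"id":...,"missionName":...}
-```
-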
-### Sample Output
-
-Please find the sample output of the above application as follows:
-```
-{"id":4,"missionName":"SKYLAB","startYear":1973,"endYear":1974}
-{"id":5,"missionName":"APOLLO–SOYUZ TEST PROJECT","startYear":1975,"endYear":1975}
-```
-
-### Complete Example
-
-You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala).
-In this example, a Flink DataSet is processed and written to Pulsar in JSON format.
-
-
-# PulsarAvroOutputFormat
-### Usage
-
-Please find a sample Scala usage of `PulsarAvroOutputFormat` as follows:
-
-```scala
-      val nasaMissions = List(
-          NasaMission.newBuilder.setId(1).setName("Mercury program").setStartYear(1959).setEndYear(1963).build,
-          NasaMission.newBuilder.setId(2).setName("Apollo program").setStartYear(1961).setEndYear(1972).build,
-          NasaMission.newBuilder.setId(3).setName("Gemini program").setStartYear(1963).setEndYear(1966).build,
-          NasaMission.newBuilder.setId(4).setName("Skylab").setStartYear(1973).setEndYear(1974).build,
-          NasaMission.newBuilder.setId(5).setName("Apollo–Soyuz Test Project").setStartYear(1975).setEndYear(1975).build)
-      
-        def main(args: Array[String]): Unit = {
-      
-          // set up the execution environment
-          val env = ExecutionEnvironment.getExecutionEnvironment
-      
-          // create PulsarAvroOutputFormat instance
-          val pulsarAvroOutputFormat =
-            new PulsarAvroOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
-      
-          // create DataSet
-          val textDS = env.fromCollection(nasaMissions)
-      
-          // map nasa mission names to upper-case
-          textDS.map(nasaMission => new NasaMission(
-            nasaMission.getId,
-            nasaMission.getName.toUpperCase,
-            nasaMission.getStartYear,
-            nasaMission.getEndYear))
-      
-            // filter missions which started after 1970
-            .filter(_.getStartYear > 1970)
-      
-            // write batch data to Pulsar as Avro
-            .output(pulsarAvroOutputFormat)
-      
-          // set parallelism to write Pulsar in parallel (optional)
-          env.setParallelism(2)
-      
-          // execute program
-          env.execute("Flink - Pulsar Batch Avro")
-        }
-```
-
-**Note:** The NasaMission class is automatically generated by Avro.
-
-### Sample Output
-
-Please find the sample output of the above application as follows:
-```
- "4,SKYLAB,1973,1974"
- "5,APOLLO–SOYUZ TEST PROJECT,1975,1975"
-```
-
-### Complete Example
-
-You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala).
-In this example, a Flink DataSet is processed and written to Pulsar in Avro format.
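-
-The sample outputs shown in the sections above can be reproduced by consuming the target topic. The following is a minimal verification sketch, not part of the examples; the service URL, topic and subscription names are placeholders to adjust to your setup:
-
-```scala
-import java.nio.charset.StandardCharsets
-
-import org.apache.pulsar.client.api.PulsarClient
-
-object VerifyTopicOutput {
-
-  def main(args: Array[String]): Unit = {
-    // placeholder connection settings - use the same service URL and topic as the example job
-    val client = PulsarClient.builder().serviceUrl("pulsar://127.0.0.1:6650").build()
-    val consumer = client.newConsumer()
-      .topic("my-flink-topic")
-      .subscriptionName("flink-batch-example-verification")
-      .subscribe()
-
-    // print each received payload as a UTF-8 string and acknowledge it
-    while (true) {
-      val msg = consumer.receive()
-      println(new String(msg.getData, StandardCharsets.UTF_8))
-      consumer.acknowledge(msg)
-    }
-  }
-}
-```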
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
index d5f4af5..ca34327 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
@@ -44,18 +44,18 @@ public abstract class BasePulsarOutputFormat<T> extends RichOutputFormat<T>  {
     private transient Function<Throwable, MessageId> failureCallback;
     private static volatile Producer<byte[]> producer;
 
-    protected static String serviceUrl;
-    protected static String topicName;
+    protected final String serviceUrl;
+    protected final String topicName;
     protected SerializationSchema<T> serializationSchema;
 
-    protected BasePulsarOutputFormat(String serviceUrl, String topicName) {
+    protected BasePulsarOutputFormat(final String serviceUrl, final String topicName) {
         Preconditions.checkArgument(StringUtils.isNotBlank(serviceUrl), "serviceUrl cannot be blank.");
         Preconditions.checkArgument(StringUtils.isNotBlank(topicName),  "topicName cannot be blank.");
 
         this.serviceUrl = serviceUrl;
         this.topicName = topicName;
 
-        LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic {}", this.topicName);
+        LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}", this.topicName);
     }
 
     @Override
@@ -65,10 +65,10 @@ public abstract class BasePulsarOutputFormat<T> extends RichOutputFormat<T>  {
 
     @Override
     public void open(int taskNumber, int numTasks) throws IOException {
-        this.producer = getProducerInstance();
+        this.producer = getProducerInstance(serviceUrl, topicName);
 
         this.failureCallback = cause -> {
-            LOG.error("Error while sending record to Pulsar : " + cause.getMessage(), cause);
+            LOG.error("Error while sending record to Pulsar: " + cause.getMessage(), cause);
             return null;
         };
     }
@@ -85,11 +85,11 @@ public abstract class BasePulsarOutputFormat<T> extends RichOutputFormat<T>  {
 
     }
 
-    private static Producer<byte[]> getProducerInstance() throws PulsarClientException {
+    private static Producer<byte[]> getProducerInstance(String serviceUrl, String topicName) throws PulsarClientException {
         if(producer == null){
             synchronized (PulsarOutputFormat.class) {
                 if(producer == null){
-                    producer = Preconditions.checkNotNull(createPulsarProducer(),
+                    producer = Preconditions.checkNotNull(createPulsarProducer(serviceUrl, topicName),
                             "Pulsar producer cannot be null.");
                 }
             }
@@ -97,7 +97,7 @@ public abstract class BasePulsarOutputFormat<T> extends RichOutputFormat<T>  {
         return producer;
     }
 
-    private static Producer<byte[]> createPulsarProducer() throws PulsarClientException {
+    private static Producer<byte[]> createPulsarProducer(String serviceUrl, String topicName) throws PulsarClientException {
         try {
             PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
             return client.newProducer().topic(topicName).create();
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
index e532bfd..889970f 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
@@ -28,9 +28,9 @@ public class PulsarOutputFormat<T> extends BasePulsarOutputFormat<T> {
 
     private static final long serialVersionUID = 2997027580167793000L;
 
-    public PulsarOutputFormat(String serviceUrl, String topicName, SerializationSchema<T> serializationSchema) {
+    public PulsarOutputFormat(String serviceUrl, String topicName, final SerializationSchema<T> serializationSchema) {
         super(serviceUrl, topicName);
-        Preconditions.checkNotNull(serializationSchema,  "serializationSchema cannot be null.");
+        Preconditions.checkNotNull(serializationSchema, "serializationSchema cannot be null.");
         this.serializationSchema = serializationSchema;
     }