You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/12/28 21:25:45 UTC
[pulsar] branch master updated: [Pulsar-Flink] Refactor Flink Batch
Sink Examples (#3262)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 140d375 [Pulsar-Flink] Refactor Flink Batch Sink Examples (#3262)
140d375 is described below
commit 140d375e795f26b6176d606030f7bcaa611858f2
Author: Eren Avsarogullari <er...@gmail.com>
AuthorDate: Fri Dec 28 21:25:40 2018 +0000
[Pulsar-Flink] Refactor Flink Batch Sink Examples (#3262)
### Motivation
This PR aims to align Flink Batch Sinks Java and Scala Examples with Streaming ones.
Following PR with #3190
### Modifications
1- Provides a single executable jar by covering all examples. Flink `-c <className>` parameter has been suggested to run each example (e.g: `./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchSinkScalaExample ...`).
2- Adds user-defined arguments support to examples (e.g: `--service-url pulsar://127.0.0.1:6650 --topic test-topic`) instead of hardcoded `service-url` and `topic-name`.
3- Aligns existing Batch documentation with Streaming documentation.
4- Combines both Java and Scala Examples documentations via single `README.md`
---
examples/flink-consumer-source/pom.xml | 32 +-
.../example/FlinkPulsarBatchAvroSinkExample.java | 24 +-
.../example/FlinkPulsarBatchCsvSinkExample.java | 23 +-
.../example/FlinkPulsarBatchJsonSinkExample.java | 23 +-
.../example/FlinkPulsarBatchSinkExample.java | 23 +-
.../batch/connectors/pulsar/example/README.md | 342 ++++++--------------
.../streaming/connectors/pulsar/example/README.md | 6 +-
.../FlinkPulsarBatchAvroSinkScalaExample.scala | 25 +-
.../FlinkPulsarBatchCsvSinkScalaExample.scala | 23 +-
.../FlinkPulsarBatchJsonSinkScalaExample.scala | 24 +-
.../example/FlinkPulsarBatchSinkScalaExample.scala | 22 +-
.../batch/connectors/pulsar/example/README.md | 345 ---------------------
.../connectors/pulsar/BasePulsarOutputFormat.java | 18 +-
.../connectors/pulsar/PulsarOutputFormat.java | 4 +-
14 files changed, 293 insertions(+), 641 deletions(-)
diff --git a/examples/flink-consumer-source/pom.xml b/examples/flink-consumer-source/pom.xml
index 35f5924..088444b 100644
--- a/examples/flink-consumer-source/pom.xml
+++ b/examples/flink-consumer-source/pom.xml
@@ -56,12 +56,6 @@
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-scala_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
@@ -77,6 +71,7 @@
<artifactId>pulsar-flink</artifactId>
<version>${project.version}</version>
</dependency>
+
</dependencies>
<build>
@@ -107,7 +102,7 @@
<mainClass>org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount</mainClass>
</transformer>
</transformers>
- <finalName>pulsar-flink-streaming-wordcount</finalName>
+ <finalName>pulsar-flink-examples</finalName>
<filters>
<filter>
<artifact>*</artifact>
@@ -115,6 +110,15 @@
<include>org/apache/flink/streaming/examples/kafka/**</include>
<include>org/apache/flink/streaming/**</include>
<include>org/apache/pulsar/**</include>
+ <include>org/apache/flink/batch/**</include>
+ <include>net/jpountz/**</include>
+ <include>com/scurrilous/circe/**</include>
+ <include>org/apache/commons/csv/**</include>
+ <include>org/apache/flink/avro/generated/**</include>
+ <include>org/apache/avro/**</include>
+ <include>org/codehaus/jackson/**</include>
+ <include>avro/shaded/com/google/common/**</include>
+ <include>org/apache/flink/formats/avro/**</include>
</includes>
</filter>
</filters>
@@ -122,6 +126,20 @@
</execution>
</executions>
</plugin>
+ <!-- Scala Plugin to compile Scala Files -->
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>3.4.4</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>add-source</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
<!-- Generate Test class from avro schema -->
<plugin>
<groupId>org.apache.avro</groupId>
diff --git a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
index ef0048c..584d59f 100644
--- a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
+++ b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
@@ -18,10 +18,10 @@
*/
package org.apache.flink.batch.connectors.pulsar.example;
-
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.avro.generated.NasaMission;
import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat;
@@ -40,16 +40,30 @@ public class FlinkPulsarBatchAvroSinkExample {
NasaMission.newBuilder().setId(4).setName("Skylab").setStartYear(1973).setEndYear(1974).build(),
NasaMission.newBuilder().setId(5).setName("Apollo–Soyuz Test Project").setStartYear(1975).setEndYear(1975).build());
- private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
- private static final String TOPIC_NAME = "my-flink-topic";
-
public static void main(String[] args) throws Exception {
+ // parse input arguments
+ final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+ if (parameterTool.getNumberOfParameters() < 2) {
+ System.out.println("Missing parameters!");
+ System.out.println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>");
+ return;
+ }
+
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().setGlobalJobParameters(parameterTool);
+
+ String serviceUrl = parameterTool.getRequired("service-url");
+ String topic = parameterTool.getRequired("topic");
+
+ System.out.println("Parameters:");
+ System.out.println("\tServiceUrl:\t" + serviceUrl);
+ System.out.println("\tTopic:\t" + topic);
// create PulsarAvroOutputFormat instance
- final OutputFormat<NasaMission> pulsarAvroOutputFormat = new PulsarAvroOutputFormat<>(SERVICE_URL, TOPIC_NAME);
+ final OutputFormat<NasaMission> pulsarAvroOutputFormat = new PulsarAvroOutputFormat<>(serviceUrl, topic);
// create DataSet
DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);
diff --git a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
index 6b0f0ca..3e658dc 100644
--- a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
+++ b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.batch.connectors.pulsar.PulsarCsvOutputFormat;
import java.util.Arrays;
@@ -40,17 +41,31 @@ public class FlinkPulsarBatchCsvSinkExample {
new Tuple4(4, "Skylab", 1973, 1974),
new Tuple4(5, "Apollo–Soyuz Test Project", 1975, 1975));
- private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
- private static final String TOPIC_NAME = "my-flink-topic";
-
public static void main(String[] args) throws Exception {
+ // parse input arguments
+ final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+ if (parameterTool.getNumberOfParameters() < 2) {
+ System.out.println("Missing parameters!");
+ System.out.println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>");
+ return;
+ }
+
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().setGlobalJobParameters(parameterTool);
+
+ String serviceUrl = parameterTool.getRequired("service-url");
+ String topic = parameterTool.getRequired("topic");
+
+ System.out.println("Parameters:");
+ System.out.println("\tServiceUrl:\t" + serviceUrl);
+ System.out.println("\tTopic:\t" + topic);
// create PulsarCsvOutputFormat instance
final OutputFormat<Tuple4<Integer, String, Integer, Integer>> pulsarCsvOutputFormat =
- new PulsarCsvOutputFormat<>(SERVICE_URL, TOPIC_NAME);
+ new PulsarCsvOutputFormat<>(serviceUrl, topic);
// create DataSet
DataSet<Tuple4<Integer, String, Integer, Integer>> nasaMissionDS = env.fromCollection(nasaMissions);
diff --git a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
index e037616..3937ae9 100644
--- a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
+++ b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
@@ -21,6 +21,7 @@ package org.apache.flink.batch.connectors.pulsar.example;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.batch.connectors.pulsar.PulsarJsonOutputFormat;
import java.util.Arrays;
@@ -38,16 +39,30 @@ public class FlinkPulsarBatchJsonSinkExample {
new NasaMission(4, "Skylab", 1973, 1974),
new NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975));
- private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
- private static final String TOPIC_NAME = "my-flink-topic";
-
public static void main(String[] args) throws Exception {
+ // parse input arguments
+ final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+ if (parameterTool.getNumberOfParameters() < 2) {
+ System.out.println("Missing parameters!");
+ System.out.println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>");
+ return;
+ }
+
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().setGlobalJobParameters(parameterTool);
+
+ String serviceUrl = parameterTool.getRequired("service-url");
+ String topic = parameterTool.getRequired("topic");
+
+ System.out.println("Parameters:");
+ System.out.println("\tServiceUrl:\t" + serviceUrl);
+ System.out.println("\tTopic:\t" + topic);
// create PulsarJsonOutputFormat instance
- final OutputFormat<NasaMission> pulsarJsonOutputFormat = new PulsarJsonOutputFormat<>(SERVICE_URL, TOPIC_NAME);
+ final OutputFormat<NasaMission> pulsarJsonOutputFormat = new PulsarJsonOutputFormat<>(serviceUrl, topic);
// create DataSet
DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);
diff --git a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
index 6724c62..c90d016 100644
--- a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
+++ b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat;
import org.apache.flink.util.Collector;
@@ -35,17 +36,31 @@ public class FlinkPulsarBatchSinkExample {
private static final String EINSTEIN_QUOTE = "Imagination is more important than knowledge. " +
"Knowledge is limited. Imagination encircles the world.";
- private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
- private static final String TOPIC_NAME = "my-flink-topic";
-
public static void main(String[] args) throws Exception {
+ // parse input arguments
+ final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+ if (parameterTool.getNumberOfParameters() < 2) {
+ System.out.println("Missing parameters!");
+ System.out.println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>");
+ return;
+ }
+
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().setGlobalJobParameters(parameterTool);
+
+ String serviceUrl = parameterTool.getRequired("service-url");
+ String topic = parameterTool.getRequired("topic");
+
+ System.out.println("Parameters:");
+ System.out.println("\tServiceUrl:\t" + serviceUrl);
+ System.out.println("\tTopic:\t" + topic);
// create PulsarOutputFormat instance
final OutputFormat pulsarOutputFormat =
- new PulsarOutputFormat(SERVICE_URL, TOPIC_NAME, wordWithCount -> wordWithCount.toString().getBytes());
+ new PulsarOutputFormat(serviceUrl, topic, wordWithCount -> wordWithCount.toString().getBytes());
// create DataSet
DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE);
diff --git a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
index 3116b3b..b93f5a3 100644
--- a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
+++ b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
@@ -53,71 +53,56 @@ dependencies {
}
```
-# PulsarOutputFormat
-### Usage
+# Example
-Please find a sample usage as follows:
+### PulsarOutputFormat
-```java
- private static final String EINSTEIN_QUOTE = "Imagination is more important than knowledge. " +
- "Knowledge is limited. Imagination encircles the world.";
+In this example, Flink DataSet is processed as word-count and being written to Pulsar. Please find a complete example for PulsarOutputFormat as follows:
+[java](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java)
+[scala](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala)
- private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
- private static final String TOPIC_NAME = "my-flink-topic";
+The steps to run the example:
- public static void main(String[] args) throws Exception {
+1. Start Pulsar Standalone.
- // set up the execution environment
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ You can follow the [instructions](https://pulsar.apache.org/docs/en/standalone/) to start a Pulsar standalone locally.
- // create PulsarOutputFormat instance
- final OutputFormat<String> pulsarOutputFormat =
- new PulsarOutputFormat(SERVICE_URL, TOPIC_NAME, wordWithCount -> wordWithCount.toString().getBytes());
+ ```shell
+ $ bin/pulsar standalone
+ ```
- // create DataSet
- DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE);
+2. Start Flink locally.
- textDS.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public void flatMap(String value, Collector<String> out) throws Exception {
- String[] words = value.toLowerCase().split(" ");
- for(String word: words) {
- out.collect(word.replace(".", ""));
- }
- }
- })
- // filter words which length is bigger than 4
- .filter(word -> word.length() > 4)
+ You can follow the [instructions](https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/setup_quickstart.html) to download and start Flink.
- // write batch data to Pulsar
- .output(pulsarOutputFormat);
+ ```shell
+ $ ./bin/start-cluster.sh
+ ```
- // execute program
- env.execute("Flink - Pulsar Batch WordCount");
- }
-```
+3. Build the examples.
-### Sample Output
+ ```shell
+ $ cd ${PULSAR_HOME}
+ $ mvn clean install -DskipTests
+ ```
-Please find sample output for above application as follows:
-```
-imagination
-important
-knowledge
-knowledge
-limited
-imagination
-encircles
-world
-```
+4. Run the word count example to print results to stdout.
+
+ ```shell
+ # java
+ $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchSinkExample ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
-### Complete Example
+ # scala
+ $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchSinkScalaExample ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
+ ```
-You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java).
-In this example, Flink DataSet is processed as word-count and being written to Pulsar.
+5. Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_flink_topic`.
-### Complete Example Output
-Please find sample output for above linked application as follows:
+```shell
+$ bin/pulsar-client consume -n 0 -s test test_flink_topic
+```
+
+6. Please find sample output for above linked application as follows:
```
WordWithCount { word = important, count = 1 }
WordWithCount { word = encircles, count = 1 }
@@ -127,230 +112,105 @@ WordWithCount { word = limited, count = 1 }
WordWithCount { word = world, count = 1 }
```
-# PulsarCsvOutputFormat
-### Usage
-
-Please find a sample usage as follows:
-
-```java
- private static final List<Tuple4<Integer, String, Integer, Integer>> nasaMissions = Arrays.asList(
- new Tuple4(1, "Mercury program", 1959, 1963),
- new Tuple4(2, "Apollo program", 1961, 1972),
- new Tuple4(3, "Gemini program", 1963, 1966),
- new Tuple4(4, "Skylab", 1973, 1974),
- new Tuple4(5, "Apollo–Soyuz Test Project", 1975, 1975));
-
- private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
- private static final String TOPIC_NAME = "my-flink-topic";
-
- public static void main(String[] args) throws Exception {
-
- // set up the execution environment
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- // create PulsarCsvOutputFormat instance
- final OutputFormat<Tuple4<Integer, String, Integer, Integer>> pulsarCsvOutputFormat =
- new PulsarCsvOutputFormat<>(SERVICE_URL, TOPIC_NAME);
-
- // create DataSet
- DataSet<Tuple4<Integer, String, Integer, Integer>> nasaMissionDS = env.fromCollection(nasaMissions);
- // map nasa mission names to upper-case
- nasaMissionDS.map(
- new MapFunction<Tuple4<Integer, String, Integer, Integer>, Tuple4<Integer, String, Integer, Integer>>() {
- @Override
- public Tuple4<Integer, String, Integer, Integer> map(
- Tuple4<Integer, String, Integer, Integer> nasaMission) throws Exception {
- return new Tuple4(
- nasaMission.f0,
- nasaMission.f1.toUpperCase(),
- nasaMission.f2,
- nasaMission.f3);
- }
- }
- )
- // filter missions which started after 1970
- .filter(nasaMission -> nasaMission.f2 > 1970)
- // write batch data to Pulsar
- .output(pulsarCsvOutputFormat);
-
- // set parallelism to write Pulsar in parallel (optional)
- env.setParallelism(2);
-
- // execute program
- env.execute("Flink - Pulsar Batch Csv");
-
- }
-```
-
-### Sample Output
-Please find sample output for above application as follows:
-```
-4,SKYLAB,1973,1974
-5,APOLLO–SOYUZ TEST PROJECT,1975,1975
-```
+### PulsarCsvOutputFormat
-### Complete Example
+In this example, Flink DataSet is processed and written to Pulsar in Csv format. Please find a complete example for PulsarCsvOutputFormat as follows:
+[java](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java)
+[scala](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala)
-You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java).
-In this example, Flink DataSet is processed and written to Pulsar in Csv format.
+The steps to run the example:
+Steps 1, 2 and 3 are the same as above.
-# PulsarJsonOutputFormat
-### Usage
+4. Run the word count example to print results to stdout.
-Please find a sample usage as follows:
+ ```shell
+ # java
+ $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchCsvSinkExample ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
-```java
- private static final List<NasaMission> nasaMissions = Arrays.asList(
- new NasaMission(1, "Mercury program", 1959, 1963),
- new NasaMission(2, "Apollo program", 1961, 1972),
- new NasaMission(3, "Gemini program", 1963, 1966),
- new NasaMission(4, "Skylab", 1973, 1974),
- new NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975));
+ # scala
+ $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchCsvSinkScalaExample ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
+ ```
- private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
- private static final String TOPIC_NAME = "my-flink-topic";
+5. Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_flink_topic`.
- public static void main(String[] args) throws Exception {
-
- // set up the execution environment
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+```shell
+$ bin/pulsar-client consume -n 0 -s test test_flink_topic
+```
- // create PulsarJsonOutputFormat instance
- final OutputFormat<NasaMission> pulsarJsonOutputFormat = new PulsarJsonOutputFormat<>(SERVICE_URL, TOPIC_NAME);
+6. Please find sample output for above linked application as follows:
+```
+4,SKYLAB,1973,1974
+5,APOLLO–SOYUZ TEST PROJECT,1975,1975
+```
- // create DataSet
- DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);
- // map nasa mission names to upper-case
- nasaMissionDS.map(nasaMission -> new NasaMission(
- nasaMission.id,
- nasaMission.missionName.toUpperCase(),
- nasaMission.startYear,
- nasaMission.endYear))
- // filter missions which started after 1970
- .filter(nasaMission -> nasaMission.startYear > 1970)
- // write batch data to Pulsar
- .output(pulsarJsonOutputFormat);
- // set parallelism to write Pulsar in parallel (optional)
- env.setParallelism(2);
+### PulsarJsonOutputFormat
- // execute program
- env.execute("Flink - Pulsar Batch Json");
- }
+In this example, Flink DataSet is processed and written to Pulsar in Json format. Please find a complete example for PulsarJsonOutputFormat as follows:
+[java](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java)
+[scala](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala)
- /**
- * NasaMission data model
- *
- * Note: Property definitions of the model should be public or have getter functions to be visible
- */
- private static class NasaMission {
+**Note:** Property definitions of the model should be public or have getter functions to be visible.
- private int id;
- private String missionName;
- private int startYear;
- private int endYear;
+The steps to run the example:
- public NasaMission(int id, String missionName, int startYear, int endYear) {
- this.id = id;
- this.missionName = missionName;
- this.startYear = startYear;
- this.endYear = endYear;
- }
+Steps 1, 2 and 3 are the same as above.
- public int getId() {
- return id;
- }
+4. Run the word count example to print results to stdout.
- public String getMissionName() {
- return missionName;
- }
+ ```shell
+ # java
+ $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchJsonSinkExample ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
- public int getStartYear() {
- return startYear;
- }
+ # scala
+ $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchJsonSinkScalaExample ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
+ ```
- public int getEndYear() {
- return endYear;
- }
- }
+5. Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_flink_topic`.
+```shell
+$ bin/pulsar-client consume -n 0 -s test test_flink_topic
```
-**Note:** Property definitions of the model should be public or have getter functions to be visible
-
-### Sample Output
-
-Please find sample output for above application as follows:
+6. Please find sample output for above linked application as follows:
```
{"id":4,"missionName":"SKYLAB","startYear":1973,"endYear":1974}
{"id":5,"missionName":"APOLLO–SOYUZ TEST PROJECT","startYear":1975,"endYear":1975}
```
-### Complete Example
-
-You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java).
-In this example, Flink DataSet is processed and written to Pulsar in Json format.
-
-
-# PulsarAvroOutputFormat
-### Usage
-
-Please find a sample usage as follows:
-
-```java
- private static final List<NasaMission> nasaMissions = Arrays.asList(
- NasaMission.newBuilder().setId(1).setName("Mercury program").setStartYear(1959).setEndYear(1963).build(),
- NasaMission.newBuilder().setId(2).setName("Apollo program").setStartYear(1961).setEndYear(1972).build(),
- NasaMission.newBuilder().setId(3).setName("Gemini program").setStartYear(1963).setEndYear(1966).build(),
- NasaMission.newBuilder().setId(4).setName("Skylab").setStartYear(1973).setEndYear(1974).build(),
- NasaMission.newBuilder().setId(5).setName("Apollo–Soyuz Test Project").setStartYear(1975).setEndYear(1975).build());
-
- private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
- private static final String TOPIC_NAME = "my-flink-topic";
-
- public static void main(String[] args) throws Exception {
-
- // set up the execution environment
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- // create PulsarAvroOutputFormat instance
- final OutputFormat<NasaMission> pulsarAvroOutputFormat = new PulsarAvroOutputFormat<>(SERVICE_URL, TOPIC_NAME);
-
- // create DataSet
- DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);
- // map nasa mission names to upper-case
- nasaMissionDS.map(nasaMission -> new NasaMission(
- nasaMission.getId(),
- nasaMission.getName(),
- nasaMission.getStartYear(),
- nasaMission.getEndYear()))
- // filter missions which started after 1970
- .filter(nasaMission -> nasaMission.getStartYear() > 1970)
- // write batch data to Pulsar
- .output(pulsarAvroOutputFormat);
-
- // set parallelism to write Pulsar in parallel (optional)
- env.setParallelism(2);
-
- // execute program
- env.execute("Flink - Pulsar Batch Avro");
- }
-```
+### PulsarAvroOutputFormat
+
+In this example, Flink DataSet is processed and written to Pulsar in Avro format. Please find a complete example for PulsarAvroOutputFormat as follows:
+[java](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java)
+[scala](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala)
+
+**Note:** NasaMission class is automatically generated by Avro.
+
+The steps to run the example:
-**Note:** NasaMission class are automatically generated by Avro
+Steps 1, 2 and 3 are the same as above.
-### Sample Output
+4. Run the word count example to print results to stdout.
-Please find sample output for above application as follows:
+ ```shell
+ # java
+ $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchAvroSinkExample ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
+
+ # scala
+ $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchAvroSinkScalaExample ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
+ ```
+
+5. Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_flink_topic`.
+
+```shell
+$ bin/pulsar-client consume -n 0 -s test test_flink_topic
+```
+
+6. Please find sample output for above linked application as follows:
```
"4,SKYLAB,1973,1974"
"5,APOLLO–SOYUZ TEST PROJECT,1975,1975"
```
-
-### Complete Example
-
-You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java).
-In this example, Flink DataSet is processed and written to Pulsar in Avro format.
diff --git a/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md b/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
index 16c88f1..463e805 100644
--- a/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
+++ b/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
@@ -30,7 +30,7 @@ See the [Pulsar Concepts](https://pulsar.apache.org/docs/en/concepts-overview/)
### PulsarConsumerSourceWordCount
-This Flink streaming job is consuming from a Pulsar topic and couting the wordcount in a streaming fashion. The job can write the word count results
+This Flink streaming job is consuming from a Pulsar topic and counting the wordcount in a streaming fashion. The job can write the word count results
to stdout or another Pulsar topic.
The steps to run the example:
@@ -61,7 +61,7 @@ The steps to run the example:
4. Run the word count example to print results to stdout.
```shell
- $ ./bin/flink run ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-streaming-wordcount.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
+ $ ./bin/flink run ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
```
5. Produce messages to topic `test_src`.
@@ -85,7 +85,7 @@ The steps to run the example:
Alternatively, when you run the flink word count example at step 4, you can choose dump the result to another pulsar topic.
```shell
-$ ./bin/flink run ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-streaming-wordcount.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
+$ ./bin/flink run ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
```
Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_dest`.
diff --git a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
index 0d255f2..f10d6c1 100644
--- a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
+++ b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
@@ -18,6 +18,7 @@
*/
package org.apache.flink.batch.connectors.pulsar.example
+import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.avro.generated.NasaMission
import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat
@@ -27,10 +28,7 @@ import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat
*/
object FlinkPulsarBatchAvroSinkScalaExample {
- private val SERVICE_URL = "pulsar://127.0.0.1:6650"
- private val TOPIC_NAME = "my-flink-topic"
-
- val nasaMissions = List(
+ private val nasaMissions = List(
NasaMission.newBuilder.setId(1).setName("Mercury program").setStartYear(1959).setEndYear(1963).build,
NasaMission.newBuilder.setId(2).setName("Apollo program").setStartYear(1961).setEndYear(1972).build,
NasaMission.newBuilder.setId(3).setName("Gemini program").setStartYear(1963).setEndYear(1966).build,
@@ -39,12 +37,29 @@ object FlinkPulsarBatchAvroSinkScalaExample {
def main(args: Array[String]): Unit = {
+ // parse input arguments
+ val parameterTool = ParameterTool.fromArgs(args)
+
+ if (parameterTool.getNumberOfParameters < 2) {
+ println("Missing parameters!")
+ println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>")
+ return
+ }
+
// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.setGlobalJobParameters(parameterTool)
+
+ val serviceUrl = parameterTool.getRequired("service-url")
+ val topic = parameterTool.getRequired("topic")
+
+ println("Parameters:")
+ println("\tServiceUrl:\t" + serviceUrl)
+ println("\tTopic:\t" + topic)
// create PulsarAvroOutputFormat instance
val pulsarAvroOutputFormat =
- new PulsarAvroOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
+ new PulsarAvroOutputFormat[NasaMission](serviceUrl, topic)
// create DataSet
val textDS = env.fromCollection(nasaMissions)
diff --git a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
index 7db844b..3233616 100644
--- a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
+++ b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
@@ -19,6 +19,7 @@
package org.apache.flink.batch.connectors.pulsar.example
import org.apache.flink.api.java.tuple.Tuple4
+import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.batch.connectors.pulsar.PulsarCsvOutputFormat
@@ -33,9 +34,6 @@ object FlinkPulsarBatchCsvSinkScalaExample {
private case class NasaMission(id: Int, missionName: String, startYear: Int, endYear: Int)
extends Tuple4(id, missionName, startYear, endYear)
- private val SERVICE_URL = "pulsar://127.0.0.1:6650"
- private val TOPIC_NAME = "my-flink-topic"
-
private val nasaMissions = List(
NasaMission(1, "Mercury program", 1959, 1963),
NasaMission(2, "Apollo program", 1961, 1972),
@@ -45,12 +43,29 @@ object FlinkPulsarBatchCsvSinkScalaExample {
def main(args: Array[String]): Unit = {
+ // parse input arguments
+ val parameterTool = ParameterTool.fromArgs(args)
+
+ if (parameterTool.getNumberOfParameters < 2) {
+ println("Missing parameters!")
+ println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>")
+ return
+ }
+
// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.setGlobalJobParameters(parameterTool)
+
+ val serviceUrl = parameterTool.getRequired("service-url")
+ val topic = parameterTool.getRequired("topic")
+
+ println("Parameters:")
+ println("\tServiceUrl:\t" + serviceUrl)
+ println("\tTopic:\t" + topic)
// create PulsarCsvOutputFormat instance
val pulsarCsvOutputFormat =
- new PulsarCsvOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
+ new PulsarCsvOutputFormat[NasaMission](serviceUrl, topic)
// create DataSet
val textDS = env.fromCollection(nasaMissions)
diff --git a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
index 1f7fc19..60d02e5 100644
--- a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
+++ b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
@@ -18,9 +18,9 @@
*/
package org.apache.flink.batch.connectors.pulsar.example
+import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.batch.connectors.pulsar.PulsarJsonOutputFormat
-
import scala.beans.BeanProperty
/**
@@ -43,16 +43,30 @@ object FlinkPulsarBatchJsonSinkScalaExample {
NasaMission(4, "Skylab", 1973, 1974),
NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975))
- private val SERVICE_URL = "pulsar://127.0.0.1:6650"
- private val TOPIC_NAME = "my-flink-topic"
-
def main(args: Array[String]): Unit = {
+ // parse input arguments
+ val parameterTool = ParameterTool.fromArgs(args)
+
+ if (parameterTool.getNumberOfParameters < 2) {
+ println("Missing parameters!")
+ println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>")
+ return
+ }
+
// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.setGlobalJobParameters(parameterTool)
+
+ val serviceUrl = parameterTool.getRequired("service-url")
+ val topic = parameterTool.getRequired("topic")
+
+ println("Parameters:")
+ println("\tServiceUrl:\t" + serviceUrl)
+ println("\tTopic:\t" + topic)
// create PulsarJsonOutputFormat instance
- val pulsarJsonOutputFormat = new PulsarJsonOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
+ val pulsarJsonOutputFormat = new PulsarJsonOutputFormat[NasaMission](serviceUrl, topic)
// create DataSet
val nasaMissionDS = env.fromCollection(nasaMissions)
diff --git a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
index 5e536cf..4de0dcb 100644
--- a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
+++ b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
@@ -19,6 +19,7 @@
package org.apache.flink.batch.connectors.pulsar.example
import org.apache.flink.api.common.serialization.SerializationSchema
+import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat
import org.apache.flink.util.Collector
@@ -37,17 +38,32 @@ object FlinkPulsarBatchSinkScalaExample {
private val EINSTEIN_QUOTE = "Imagination is more important than knowledge. " +
"Knowledge is limited. Imagination encircles the world."
- private val SERVICE_URL = "pulsar://127.0.0.1:6650"
- private val TOPIC_NAME = "my-flink-topic"
def main(args: Array[String]): Unit = {
+ // parse input arguments
+ val parameterTool = ParameterTool.fromArgs(args)
+
+ if (parameterTool.getNumberOfParameters < 2) {
+ println("Missing parameters!")
+ println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>")
+ return
+ }
+
// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.setGlobalJobParameters(parameterTool)
+
+ val serviceUrl = parameterTool.getRequired("service-url")
+ val topic = parameterTool.getRequired("topic")
+
+ println("Parameters:")
+ println("\tServiceUrl:\t" + serviceUrl)
+ println("\tTopic:\t" + topic)
// create PulsarOutputFormat instance
val pulsarOutputFormat =
- new PulsarOutputFormat[WordWithCount](SERVICE_URL, TOPIC_NAME, new SerializationSchema[WordWithCount] {
+ new PulsarOutputFormat[WordWithCount](serviceUrl, topic, new SerializationSchema[WordWithCount] {
override def serialize(wordWithCount: WordWithCount): Array[Byte] = wordWithCount.toString.getBytes
})
diff --git a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/README.md b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/README.md
deleted file mode 100644
index e206392..0000000
--- a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/README.md
+++ /dev/null
@@ -1,345 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-
-The Flink Batch Sink for Pulsar is a custom sink that enables Apache [Flink](https://flink.apache.org/) to write [DataSet](https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/index.html) to Pulsar.
-This document explains how to develop Scala Applications by using Flink Batch Sink.
-# Prerequisites
-
-To use this sink, include a dependency for the `pulsar-flink` library in your Java configuration.
-
-# Maven
-
-If you're using Maven, add this to your `pom.xml`:
-
-```xml
-<!-- in your <properties> block -->
-<pulsar.version>{{pulsar:version}}</pulsar.version>
-
-<!-- in your <dependencies> block -->
-<dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-flink</artifactId>
- <version>${pulsar.version}</version>
-</dependency>
-```
-
-# Gradle
-
-If you're using Gradle, add this to your `build.gradle` file:
-
-```groovy
-def pulsarVersion = "{{pulsar:version}}"
-
-dependencies {
- compile group: 'org.apache.pulsar', name: 'pulsar-flink', version: pulsarVersion
-}
-```
-
-# PulsarOutputFormat
-### Usage
-
-Please find Scala sample usage of `PulsarOutputFormat` as follows:
-
-```scala
- /**
- * Data type for words with count.
- */
- case class WordWithCount(word: String, count: Long) {
- override def toString: String = "WordWithCount { word = " + word + ", count = " + count + " }"
- }
-
- /**
- * Implementation
- */
- private val EINSTEIN_QUOTE = "Imagination is more important than knowledge. " +
- "Knowledge is limited. Imagination encircles the world."
- private val SERVICE_URL = "pulsar://127.0.0.1:6650"
- private val TOPIC_NAME = "my-flink-topic"
-
- def main(args: Array[String]): Unit = {
-
- // set up the execution environment
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- // create PulsarOutputFormat instance
- val pulsarOutputFormat =
- new PulsarOutputFormat[WordWithCount](SERVICE_URL, TOPIC_NAME, new SerializationSchema[WordWithCount] {
- override def serialize(wordWithCount: WordWithCount): Array[Byte] = wordWithCount.toString.getBytes
- })
-
- // create DataSet
- val textDS = env.fromElements[String](EINSTEIN_QUOTE)
-
- // convert sentence to words
- textDS.flatMap((value: String, out: Collector[WordWithCount]) => {
- val words = value.toLowerCase.split(" ")
- for (word <- words) {
- out.collect(new WordWithCount(word.replace(".", ""), 1))
- }
- })
-
- // filter words which length is bigger than 4
- .filter((wordWithCount: WordWithCount) => wordWithCount.word.length > 4)
-
- // group the words
- .groupBy((wordWithCount: WordWithCount) => wordWithCount.word)
-
- // sum the word counts
- .reduce((wordWithCount1: WordWithCount, wordWithCount2: WordWithCount) =>
- new WordWithCount(wordWithCount1.word, wordWithCount1.count + wordWithCount2.count))
-
- // write batch data to Pulsar
- .output(pulsarOutputFormat)
-
- // set parallelism to write Pulsar in parallel (optional)
- env.setParallelism(2)
-
- // execute program
- env.execute("Flink - Pulsar Batch WordCount")
- }
-```
-
-### Sample Output
-
-Please find sample output for above application as follows:
-```
-WordWithCount { word = encircles, count = 1 }
-WordWithCount { word = important, count = 1 }
-WordWithCount { word = imagination, count = 2 }
-WordWithCount { word = limited, count = 1 }
-WordWithCount { word = knowledge, count = 2 }
-WordWithCount { word = world, count = 1 }
-```
-
-### Complete Example
-
-You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala).
-In this example, Flink DataSet is processed as word-count and being written to Pulsar.
-
-
-# PulsarCsvOutputFormat
-### Usage
-
-Please find Scala sample usage of `PulsarCsvOutputFormat` as follows:
-
-```scala
- /**
- * NasaMission Model
- */
- private case class NasaMission(id: Int, missionName: String, startYear: Int, endYear: Int)
- extends Tuple4(id, missionName, startYear, endYear)
-
- /**
- * Implementation
- */
- private val SERVICE_URL = "pulsar://127.0.0.1:6650"
- private val TOPIC_NAME = "my-flink-topic"
-
- private val nasaMissions = List(
- NasaMission(1, "Mercury program", 1959, 1963),
- NasaMission(2, "Apollo program", 1961, 1972),
- NasaMission(3, "Gemini program", 1963, 1966),
- NasaMission(4, "Skylab", 1973, 1974),
- NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975))
-
- def main(args: Array[String]): Unit = {
-
- // set up the execution environment
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- // create PulsarCsvOutputFormat instance
- val pulsarCsvOutputFormat =
- new PulsarCsvOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
-
- // create DataSet
- val textDS = env.fromCollection(nasaMissions)
-
- // map nasa mission names to upper-case
- textDS.map(nasaMission => NasaMission(
- nasaMission.id,
- nasaMission.missionName.toUpperCase,
- nasaMission.startYear,
- nasaMission.endYear))
-
- // filter missions which started after 1970
- .filter(_.startYear > 1970)
-
- // write batch data to Pulsar as Csv
- .output(pulsarCsvOutputFormat)
-
- // set parallelism to write Pulsar in parallel (optional)
- env.setParallelism(2)
-
- // execute program
- env.execute("Flink - Pulsar Batch Csv")
- }
-```
-
-### Sample Output
-
-Please find sample output for above application as follows:
-```
-4,SKYLAB,1973,1974
-5,APOLLO–SOYUZ TEST PROJECT,1975,1975
-```
-
-### Complete Example
-
-You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala).
-In this example, Flink DataSet is processed and written to Pulsar in Csv format.
-
-
-# PulsarJsonOutputFormat
-### Usage
-
-Please find Scala sample usage of `PulsarJsonOutputFormat` as follows:
-
-```scala
- /**
- * NasaMission Model
- */
- private case class NasaMission(@BeanProperty id: Int,
- @BeanProperty missionName: String,
- @BeanProperty startYear: Int,
- @BeanProperty endYear: Int)
-
- /**
- * Implementation
- */
- private val nasaMissions = List(
- NasaMission(1, "Mercury program", 1959, 1963),
- NasaMission(2, "Apollo program", 1961, 1972),
- NasaMission(3, "Gemini program", 1963, 1966),
- NasaMission(4, "Skylab", 1973, 1974),
- NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975))
-
- private val SERVICE_URL = "pulsar://127.0.0.1:6650"
- private val TOPIC_NAME = "my-flink-topic"
-
- def main(args: Array[String]): Unit = {
-
- // set up the execution environment
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- // create PulsarJsonOutputFormat instance
- val pulsarJsonOutputFormat = new PulsarJsonOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
-
- // create DataSet
- val nasaMissionDS = env.fromCollection(nasaMissions)
-
- // map nasa mission names to upper-case
- nasaMissionDS.map(nasaMission =>
- NasaMission(
- nasaMission.id,
- nasaMission.missionName.toUpperCase,
- nasaMission.startYear,
- nasaMission.endYear))
-
- // filter missions which started after 1970
- .filter(_.startYear > 1970)
-
- // write batch data to Pulsar
- .output(pulsarJsonOutputFormat)
-
- // set parallelism to write Pulsar in parallel (optional)
- env.setParallelism(2)
-
- // execute program
- env.execute("Flink - Pulsar Batch Json")
- }
-```
-
-**Note:** Property definitions of the model should cover `@BeanProperty` to be visible.
-
-### Sample Output
-
-Please find sample output for above application as follows:
-```
-{"id":4,"missionName":"SKYLAB","startYear":1973,"endYear":1974}
-{"id":5,"missionName":"APOLLO–SOYUZ TEST PROJECT","startYear":1975,"endYear":1975}
-```
-
-### Complete Example
-
-You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala).
-In this example, Flink DataSet is processed and written to Pulsar in Json format.
-
-
-# PulsarAvroOutputFormat
-### Usage
-
-Please find Scala sample usage of `PulsarAvroOutputFormat` as follows:
-
-```scala
- val nasaMissions = List(
- NasaMission.newBuilder.setId(1).setName("Mercury program").setStartYear(1959).setEndYear(1963).build,
- NasaMission.newBuilder.setId(2).setName("Apollo program").setStartYear(1961).setEndYear(1972).build,
- NasaMission.newBuilder.setId(3).setName("Gemini program").setStartYear(1963).setEndYear(1966).build,
- NasaMission.newBuilder.setId(4).setName("Skylab").setStartYear(1973).setEndYear(1974).build,
- NasaMission.newBuilder.setId(5).setName("Apollo–Soyuz Test Project").setStartYear(1975).setEndYear(1975).build)
-
- def main(args: Array[String]): Unit = {
-
- // set up the execution environment
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- // create PulsarCsvOutputFormat instance
- val pulsarAvroOutputFormat =
- new PulsarAvroOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
-
- // create DataSet
- val textDS = env.fromCollection(nasaMissions)
-
- // map nasa mission names to upper-case
- textDS.map(nasaMission => new NasaMission(
- nasaMission.getId,
- nasaMission.getName,
- nasaMission.getStartYear,
- nasaMission.getEndYear))
-
- // filter missions which started after 1970
- .filter(_.getStartYear > 1970)
-
- // write batch data to Pulsar as Avro
- .output(pulsarAvroOutputFormat)
-
- // set parallelism to write Pulsar in parallel (optional)
- env.setParallelism(2)
-
- // execute program
- env.execute("Flink - Pulsar Batch Avro")
- }
-```
-
-**Note:** NasaMission class are automatically generated by Avro
-
-### Sample Output
-
-Please find sample output for above application as follows:
-```
- "4,SKYLAB,1973,1974"
- "5,APOLLO–SOYUZ TEST PROJECT,1975,1975"
-```
-
-### Complete Example
-
-You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala).
-In this example, Flink DataSet is processed and written to Pulsar in Avro format.
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
index d5f4af5..ca34327 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
@@ -44,18 +44,18 @@ public abstract class BasePulsarOutputFormat<T> extends RichOutputFormat<T> {
private transient Function<Throwable, MessageId> failureCallback;
private static volatile Producer<byte[]> producer;
- protected static String serviceUrl;
- protected static String topicName;
+ protected final String serviceUrl;
+ protected final String topicName;
protected SerializationSchema<T> serializationSchema;
- protected BasePulsarOutputFormat(String serviceUrl, String topicName) {
+ protected BasePulsarOutputFormat(final String serviceUrl, final String topicName) {
Preconditions.checkArgument(StringUtils.isNotBlank(serviceUrl), "serviceUrl cannot be blank.");
Preconditions.checkArgument(StringUtils.isNotBlank(topicName), "topicName cannot be blank.");
this.serviceUrl = serviceUrl;
this.topicName = topicName;
- LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic {}", this.topicName);
+ LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}", this.topicName);
}
@Override
@@ -65,10 +65,10 @@ public abstract class BasePulsarOutputFormat<T> extends RichOutputFormat<T> {
@Override
public void open(int taskNumber, int numTasks) throws IOException {
- this.producer = getProducerInstance();
+ this.producer = getProducerInstance(serviceUrl, topicName);
this.failureCallback = cause -> {
- LOG.error("Error while sending record to Pulsar : " + cause.getMessage(), cause);
+ LOG.error("Error while sending record to Pulsar: " + cause.getMessage(), cause);
return null;
};
}
@@ -85,11 +85,11 @@ public abstract class BasePulsarOutputFormat<T> extends RichOutputFormat<T> {
}
- private static Producer<byte[]> getProducerInstance() throws PulsarClientException {
+ private static Producer<byte[]> getProducerInstance(String serviceUrl, String topicName) throws PulsarClientException {
if(producer == null){
synchronized (PulsarOutputFormat.class) {
if(producer == null){
- producer = Preconditions.checkNotNull(createPulsarProducer(),
+ producer = Preconditions.checkNotNull(createPulsarProducer(serviceUrl, topicName),
"Pulsar producer cannot be null.");
}
}
@@ -97,7 +97,7 @@ public abstract class BasePulsarOutputFormat<T> extends RichOutputFormat<T> {
return producer;
}
- private static Producer<byte[]> createPulsarProducer() throws PulsarClientException {
+ private static Producer<byte[]> createPulsarProducer(String serviceUrl, String topicName) throws PulsarClientException {
try {
PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
return client.newProducer().topic(topicName).create();
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
index e532bfd..889970f 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
@@ -28,9 +28,9 @@ public class PulsarOutputFormat<T> extends BasePulsarOutputFormat<T> {
private static final long serialVersionUID = 2997027580167793000L;
- public PulsarOutputFormat(String serviceUrl, String topicName, SerializationSchema<T> serializationSchema) {
+ public PulsarOutputFormat(String serviceUrl, String topicName, final SerializationSchema<T> serializationSchema) {
super(serviceUrl, topicName);
- Preconditions.checkNotNull(serializationSchema, "serializationSchema cannot be null.");
+ Preconditions.checkNotNull(serializationSchema, "serializationSchema cannot be null.");
this.serializationSchema = serializationSchema;
}