You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/12/31 18:31:43 UTC
[pulsar] branch master updated: [flink-consumer-source] fix flink
streaming connector examples to be consistent with batch examples (#3265)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ab39aeb [flink-consumer-source] fix flink streaming connector examples to be consistent with batch examples (#3265)
ab39aeb is described below
commit ab39aeb05365a1ee128e66c4c9e95922fd316680
Author: wpl <12...@qq.com>
AuthorDate: Tue Jan 1 02:31:38 2019 +0800
[flink-consumer-source] fix flink streaming connector examples to be consistent with batch examples (#3265)
fix flink streaming connector examples to be consistent with batch examples
---
.../example/FlinkPulsarBatchAvroSinkExample.java | 2 +-
.../batch/connectors/pulsar/example/README.md | 10 +-
...lsarConsumerSourceWordCountToAvroTableSink.java | 19 ++--
...lsarConsumerSourceWordCountToJsonTableSink.java | 19 ++--
.../streaming/connectors/pulsar/example/README.md | 124 ++++++++++++++++++++-
.../connectors/pulsar/PulsarAvroTableSink.java | 5 +-
6 files changed, 149 insertions(+), 30 deletions(-)
diff --git a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
index 584d59f..1349dba 100644
--- a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
+++ b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
@@ -84,5 +84,5 @@ public class FlinkPulsarBatchAvroSinkExample {
// execute program
env.execute("Flink - Pulsar Batch Avro");
}
-
+
}
diff --git a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
index 84c1bd8..29adc34 100644
--- a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
+++ b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
@@ -211,6 +211,10 @@ $ bin/pulsar-client consume -n 0 -s test test_flink_topic
6. Please find sample output for above linked application as follows:
```
- "4,SKYLAB,1973,1974"
- "5,APOLLO–SOYUZ TEST PROJECT,1975,1975"
-```
+ ----- got message -----
+
+ Skylab��
+ ----- got message -----
+
+ 6Apollo–Soyuz Test Project��
+```
\ No newline at end of file
diff --git a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
index 7b78da5..4563b56 100644
--- a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
+++ b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
@@ -42,16 +42,17 @@ import org.apache.pulsar.client.api.ProducerConfiguration;
*
* <p>Example usage:
* --service-url pulsar://localhost:6650 --input-topic test_topic --subscription test_sub
+ * or
+ * --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
*/
public class PulsarConsumerSourceWordCountToAvroTableSink {
- private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final String ROUTING_KEY = "word";
public static void main(String[] args) throws Exception {
// parse input arguments
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
- if (parameterTool.getNumberOfParameters() < 2) {
+ if (parameterTool.getNumberOfParameters() < 3) {
System.out.println("Missing parameters!");
System.out.println("Usage: pulsar --service-url <pulsar-service-url> --input-topic <topic> --subscription <sub> --output-topic <topic>");
return;
@@ -96,23 +97,23 @@ public class PulsarConsumerSourceWordCountToAvroTableSink {
}
})
.returns(WordWithCount.class)
- .keyBy("word")
+ .keyBy(ROUTING_KEY)
.timeWindow(Time.seconds(5))
.reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
WordWithCount.newBuilder().setWord(c1.getWord()).setCount(c1.getCount() + c2.getCount()).build()
);
tableEnvironment.registerDataStream("wc",wc);
-
- Table table = tableEnvironment.sqlQuery("select * from wc");
+ Table table = tableEnvironment.sqlQuery("select word, `count` from wc");
+ table.printSchema();
+ TableSink sink = null;
if (null != outputTopic) {
- PulsarAvroTableSink sink = new PulsarAvroTableSink(SERVICE_URL, outputTopic, new ProducerConfiguration(), ROUTING_KEY,WordWithCount.class);
- table.writeToSink(sink);
+ sink = new PulsarAvroTableSink(serviceUrl, outputTopic, new ProducerConfiguration(), ROUTING_KEY, WordWithCount.class);
} else {
- TableSink sink = new CsvTableSink("./examples/file", "|");
// print the results with a csv file
- table.writeToSink(sink);
+ sink = new CsvTableSink("./examples/file", "|");
}
+ table.writeToSink(sink);
env.execute("Pulsar Stream WordCount");
}
diff --git a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
index 95b2536..de09146 100644
--- a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
+++ b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
@@ -44,16 +44,17 @@ import org.apache.pulsar.client.api.ProducerConfiguration;
*
* <p>Example usage:
* --service-url pulsar://localhost:6650 --input-topic test_topic --subscription test_sub
+ * or
+ * --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
*/
public class PulsarConsumerSourceWordCountToJsonTableSink {
- private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final String ROUTING_KEY = "word";
public static void main(String[] args) throws Exception {
// parse input arguments
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
- if (parameterTool.getNumberOfParameters() < 2) {
+ if (parameterTool.getNumberOfParameters() < 3) {
System.out.println("Missing parameters!");
System.out.println("Usage: pulsar --service-url <pulsar-service-url> --input-topic <topic> --subscription <sub> --output-topic <topic>");
return;
@@ -98,22 +99,22 @@ public class PulsarConsumerSourceWordCountToJsonTableSink {
}
})
.returns(WordWithCount.class)
- .keyBy("word")
+ .keyBy(ROUTING_KEY)
.timeWindow(Time.seconds(5))
.reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
new WordWithCount(c1.word, c1.count + c2.count));
tableEnvironment.registerDataStream("wc",wc);
-
- Table table = tableEnvironment.sqlQuery("select * from wc");
+ Table table = tableEnvironment.sqlQuery("select word, `count` from wc");
+ table.printSchema();
+ TableSink sink = null;
if (null != outputTopic) {
- PulsarJsonTableSink sink = new PulsarJsonTableSink(SERVICE_URL, outputTopic, new ProducerConfiguration(), ROUTING_KEY);
- table.writeToSink(sink);
+ sink = new PulsarJsonTableSink(serviceUrl, outputTopic, new ProducerConfiguration(), ROUTING_KEY);
} else {
- TableSink sink = new CsvTableSink("./examples/file", "|");
// print the results with a csv file
- table.writeToSink(sink);
+ sink = new CsvTableSink("./examples/file", "|");
}
+ table.writeToSink(sink);
env.execute("Pulsar Stream WordCount");
}
diff --git a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
index 7c8c176..ac36eb5 100644
--- a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
+++ b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
@@ -61,7 +61,7 @@ The steps to run the example:
4. Run the word count example to print results to stdout.
```shell
- $ ./bin/flink run ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
+ $ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
```
5. Produce messages to topic `test_src`.
@@ -73,19 +73,16 @@ The steps to run the example:
6. You can check the flink taskexecutor `.out` file. The `.out` file will print the counts at the end of each time window as long as words are floating in, e.g.:
```shell
- PulsarConsumerSourceWordCount.WordWithCount(word=hello, count=200)
- PulsarConsumerSourceWordCount.WordWithCount(word=again, count=200)
- PulsarConsumerSourceWordCount.WordWithCount(word=test, count=200)
- PulsarConsumerSourceWordCount.WordWithCount(word=world, count=200)
PulsarConsumerSourceWordCount.WordWithCount(word=hello, count=100)
PulsarConsumerSourceWordCount.WordWithCount(word=again, count=100)
PulsarConsumerSourceWordCount.WordWithCount(word=test, count=100)
+ PulsarConsumerSourceWordCount.WordWithCount(word=world, count=100)
```
Alternatively, when you run the flink word count example at step 4, you can choose to dump the result to another pulsar topic.
```shell
-$ ./bin/flink run ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
+$ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
```
Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_dest`.
@@ -95,3 +92,118 @@ $ bin/pulsar-client consume -n 0 -s test test_dest
```
You will see similar results as what you see at step 6 when running the word count example to print results to stdout.
+
+
+### PulsarConsumerSourceWordCountToAvroTableSink
+
+This Flink streaming job consumes from a Pulsar topic and counts words in a streaming fashion. The job can write the word count results
+to a CSV file or to another Pulsar topic in Avro format.
+
+The steps to run the example:
+
+Steps 1, 2 and 3 are the same as above.
+
+4. Run the word count example to write results to a CSV file.
+
+ ```shell
+ $ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToAvroTableSink ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
+ ```
+
+5. Produce messages to topic `test_src`.
+
+ ```shell
+ $ bin/pulsar-client produce -m "hello world again" -n 100 test_src
+ ```
+
+6. You can check the file `${FLINK_HOME}/examples/file`. The file contains the counts at the end of each time window as long as words keep flowing in, e.g.:
+
+ ```file
+ hello|100
+ again|100
+ test|100
+ world|100
+ ```
+
+Alternatively, when you run the flink word count example at step 4, you can choose to dump the result to another pulsar topic.
+
+```shell
+$ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToAvroTableSink ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
+```
+
+Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_dest`.
+
+```shell
+$ bin/pulsar-client consume -n 0 -s test test_dest
+```
+
+You will see sample output for the above application as follows:
+```
+----- got message -----
+
+hello�
+----- got message -----
+
+again�
+----- got message -----
+test�
+----- got message -----
+
+world�
+
+```
+
+### PulsarConsumerSourceWordCountToJsonTableSink
+
+This Flink streaming job consumes from a Pulsar topic and counts words in a streaming fashion. The job can write the word count results
+to a CSV file or to another Pulsar topic in JSON format.
+
+The steps to run the example:
+
+Steps 1, 2 and 3 are the same as above.
+
+4. Run the word count example to write results to a CSV file.
+
+ ```shell
+ $ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToJsonTableSink ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
+ ```
+
+If you encounter `java.lang.ClassNotFoundException: org.apache.flink.table.sinks.TableSink` or `java.lang.NoClassDefFoundError: org/apache/flink/formats/json/JsonRowSerializationSchema`, you need to build Apache Flink from source, then copy `flink-table_{version}.jar` and `flink-json_{version}.jar` to `${FLINK_HOME}/lib` and restart the Flink cluster.
+
+5. Produce messages to topic `test_src`.
+
+ ```shell
+ $ bin/pulsar-client produce -m "hello world again" -n 100 test_src
+ ```
+
+6. You can check the file `${FLINK_HOME}/examples/file`. The file contains the counts at the end of each time window as long as words keep flowing in, e.g.:
+
+ ```file
+ hello|100
+ again|100
+ test|100
+ world|100
+ ```
+
+Alternatively, when you run the flink word count example at step 4, you can choose to dump the result to another pulsar topic.
+
+```shell
+$ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToJsonTableSink ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
+```
+
+Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_dest`.
+
+```shell
+$ bin/pulsar-client consume -n 0 -s test test_dest
+```
+
+You will see sample output for the above application as follows:
+```
+----- got message -----
+{"word":"hello","count":100}
+----- got message -----
+{"word":"again","count":100}
+----- got message -----
+{"word":"test","count":100}
+----- got message -----
+{"word":"world","count":100}
+```
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
index 9187a0d..a7a4412 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
@@ -99,7 +99,8 @@ public class PulsarAvroTableSink implements AppendStreamTableSink<Row> {
@Override
public TypeInformation<Row> getOutputType() {
- return new RowTypeInfo(fieldTypes, fieldNames);
+ RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames);
+ return rowTypeInfo;
}
@Override
@@ -162,7 +163,7 @@ public class PulsarAvroTableSink implements AppendStreamTableSink<Row> {
@Override
public String getKey(Row event) {
- return (String) event.getField(keyIndex);
+ return event.getField(keyIndex).toString();
}
}