You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/12/31 18:31:43 UTC

[pulsar] branch master updated: [flink-consumer-source] fix flink streaming connector examples to be consistent with batch examples (#3265)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ab39aeb   [flink-consumer-source] fix flink streaming connector examples to be consistent with batch examples (#3265)
ab39aeb is described below

commit ab39aeb05365a1ee128e66c4c9e95922fd316680
Author: wpl <12...@qq.com>
AuthorDate: Tue Jan 1 02:31:38 2019 +0800

     [flink-consumer-source] fix flink streaming connector examples to be consistent with batch examples (#3265)
    
    fix flink streaming connector examples to be consistent with batch examples
---
 .../example/FlinkPulsarBatchAvroSinkExample.java   |   2 +-
 .../batch/connectors/pulsar/example/README.md      |  10 +-
 ...lsarConsumerSourceWordCountToAvroTableSink.java |  19 ++--
 ...lsarConsumerSourceWordCountToJsonTableSink.java |  19 ++--
 .../streaming/connectors/pulsar/example/README.md  | 124 ++++++++++++++++++++-
 .../connectors/pulsar/PulsarAvroTableSink.java     |   5 +-
 6 files changed, 149 insertions(+), 30 deletions(-)

diff --git a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
index 584d59f..1349dba 100644
--- a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
+++ b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
@@ -84,5 +84,5 @@ public class FlinkPulsarBatchAvroSinkExample {
         // execute program
         env.execute("Flink - Pulsar Batch Avro");
     }
-
+  
 }
diff --git a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
index 84c1bd8..29adc34 100644
--- a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
+++ b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
@@ -211,6 +211,10 @@ $ bin/pulsar-client consume -n 0 -s test test_flink_topic
 
 6. Please find sample output for above linked application as follows:
 ```
- "4,SKYLAB,1973,1974"
- "5,APOLLO–SOYUZ TEST PROJECT,1975,1975"
-```
+ ----- got message -----
+ 
+ Skylab��
+ ----- got message -----
+ 
+ 6Apollo–Soyuz Test Project��
+```
\ No newline at end of file
diff --git a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
index 7b78da5..4563b56 100644
--- a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
+++ b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
@@ -42,16 +42,17 @@ import org.apache.pulsar.client.api.ProducerConfiguration;
  *
  * <p>Example usage:
  *   --service-url pulsar://localhost:6650 --input-topic test_topic --subscription test_sub
+ * or
+ *   --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
  */
 public class PulsarConsumerSourceWordCountToAvroTableSink {
-    private static final String SERVICE_URL = "pulsar://localhost:6650";
     private static final String ROUTING_KEY = "word";
 
     public static void main(String[] args) throws Exception {
         // parse input arguments
         final ParameterTool parameterTool = ParameterTool.fromArgs(args);
 
-        if (parameterTool.getNumberOfParameters() < 2) {
+        if (parameterTool.getNumberOfParameters() < 3) {
             System.out.println("Missing parameters!");
             System.out.println("Usage: pulsar --service-url <pulsar-service-url> --input-topic <topic> --subscription <sub> --output-topic <topic>");
             return;
@@ -96,23 +97,23 @@ public class PulsarConsumerSourceWordCountToAvroTableSink {
                     }
                 })
                 .returns(WordWithCount.class)
-                .keyBy("word")
+                .keyBy(ROUTING_KEY)
                 .timeWindow(Time.seconds(5))
                 .reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
                         WordWithCount.newBuilder().setWord(c1.getWord()).setCount(c1.getCount() + c2.getCount()).build()
                 );
 
         tableEnvironment.registerDataStream("wc",wc);
-
-        Table table = tableEnvironment.sqlQuery("select * from wc");
+        Table table = tableEnvironment.sqlQuery("select word, `count` from wc");
+        table.printSchema();
+        TableSink sink = null;
         if (null != outputTopic) {
-            PulsarAvroTableSink sink = new PulsarAvroTableSink(SERVICE_URL, outputTopic, new ProducerConfiguration(), ROUTING_KEY,WordWithCount.class);
-            table.writeToSink(sink);
+            sink = new PulsarAvroTableSink(serviceUrl, outputTopic, new ProducerConfiguration(), ROUTING_KEY, WordWithCount.class);
         } else {
-            TableSink sink = new CsvTableSink("./examples/file",  "|");
             // print the results with a csv file
-            table.writeToSink(sink);
+            sink = new CsvTableSink("./examples/file",  "|");
         }
+        table.writeToSink(sink);
 
         env.execute("Pulsar Stream WordCount");
     }
diff --git a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
index 95b2536..de09146 100644
--- a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
+++ b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
@@ -44,16 +44,17 @@ import org.apache.pulsar.client.api.ProducerConfiguration;
  *
  * <p>Example usage:
  *   --service-url pulsar://localhost:6650 --input-topic test_topic --subscription test_sub
+ * or
+ *   --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
  */
 public class PulsarConsumerSourceWordCountToJsonTableSink {
-    private static final String SERVICE_URL = "pulsar://localhost:6650";
     private static final String ROUTING_KEY = "word";
 
     public static void main(String[] args) throws Exception {
         // parse input arguments
         final ParameterTool parameterTool = ParameterTool.fromArgs(args);
 
-        if (parameterTool.getNumberOfParameters() < 2) {
+        if (parameterTool.getNumberOfParameters() < 3) {
             System.out.println("Missing parameters!");
             System.out.println("Usage: pulsar --service-url <pulsar-service-url> --input-topic <topic> --subscription <sub> --output-topic <topic>");
             return;
@@ -98,22 +99,22 @@ public class PulsarConsumerSourceWordCountToJsonTableSink {
                     }
                 })
                 .returns(WordWithCount.class)
-                .keyBy("word")
+                .keyBy(ROUTING_KEY)
                 .timeWindow(Time.seconds(5))
                 .reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
                         new WordWithCount(c1.word, c1.count + c2.count));
 
         tableEnvironment.registerDataStream("wc",wc);
-
-        Table table = tableEnvironment.sqlQuery("select * from wc");
+        Table table = tableEnvironment.sqlQuery("select word, `count` from wc");
+        table.printSchema();
+        TableSink sink = null;
         if (null != outputTopic) {
-            PulsarJsonTableSink sink = new PulsarJsonTableSink(SERVICE_URL, outputTopic, new ProducerConfiguration(), ROUTING_KEY);
-            table.writeToSink(sink);
+            sink = new PulsarJsonTableSink(serviceUrl, outputTopic, new ProducerConfiguration(), ROUTING_KEY);
         } else {
-            TableSink sink = new CsvTableSink("./examples/file",  "|");
             // print the results with a csv file
-            table.writeToSink(sink);
+            sink = new CsvTableSink("./examples/file",  "|");
         }
+        table.writeToSink(sink);
 
         env.execute("Pulsar Stream WordCount");
     }
diff --git a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
index 7c8c176..ac36eb5 100644
--- a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
+++ b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
@@ -61,7 +61,7 @@ The steps to run the example:
 4. Run the word count example to print results to stdout.
 
     ```shell
-    $ ./bin/flink run  ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
+    $ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
     ```
 
 5. Produce messages to topic `test_src`.
@@ -73,19 +73,16 @@ The steps to run the example:
 6. You can check the flink taskexecutor `.out` file. The `.out` file will print the counts at the end of each time window as long as words are floating in, e.g.:
 
     ```shell
-    PulsarConsumerSourceWordCount.WordWithCount(word=hello, count=200)
-    PulsarConsumerSourceWordCount.WordWithCount(word=again, count=200)
-    PulsarConsumerSourceWordCount.WordWithCount(word=test, count=200)
-    PulsarConsumerSourceWordCount.WordWithCount(word=world, count=200)
     PulsarConsumerSourceWordCount.WordWithCount(word=hello, count=100)
     PulsarConsumerSourceWordCount.WordWithCount(word=again, count=100)
     PulsarConsumerSourceWordCount.WordWithCount(word=test, count=100)
+    PulsarConsumerSourceWordCount.WordWithCount(word=world, count=100)  
     ```
 
 Alternatively, when you run the flink word count example at step 4, you can choose to dump the result to another pulsar topic.
 
 ```shell
-$ ./bin/flink run  ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
+$ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
 ```
 
 Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_dest`.
@@ -95,3 +92,118 @@ $ bin/pulsar-client consume -n 0 -s test test_dest
 ```
 
 You will see similar results as what you see at step 6 when running the word count example to print results to stdout.
+
+
+### PulsarConsumerSourceWordCountToAvroTableSink
+
+This Flink streaming job consumes from a Pulsar topic and counts words in a streaming fashion. The job can write the word count results
+to a csv file or to another Pulsar topic in avro format.
+
+The steps to run the example:
+
+Step 1, 2 and 3 are same as above.
+
+4. Run the word count example to write results to a csv file.
+
+    ```shell
+    $ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToAvroTableSink ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
+    ```
+
+5. Produce messages to topic `test_src`.
+
+    ```shell
+    $ bin/pulsar-client produce -m "hello world again" -n 100 test_src
+    ```
+
+6. You can check the ${FLINK_HOME}/examples/file. The file contains the counts at the end of each time window as long as words are floating in, e.g.:
+
+    ```file
+    hello|100
+    again|100
+    test|100
+    world|100
+    ```
+
+Alternatively, when you run the flink word count example at step 4, you can choose to dump the result to another pulsar topic.
+
+```shell
+$ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToAvroTableSink ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
+```
+
+Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_dest`.
+
+```shell
+$ bin/pulsar-client consume -n 0 -s test test_dest
+```
+
+You will see sample output for above linked application as follows:
+```
+----- got message -----
+
+hello�
+----- got message -----
+
+again�
+----- got message -----
+test�
+----- got message -----
+
+world�
+
+```
+
+### PulsarConsumerSourceWordCountToJsonTableSink
+
+This Flink streaming job consumes from a Pulsar topic and counts words in a streaming fashion. The job can write the word count results
+to a csv file or to another Pulsar topic in json format.
+
+The steps to run the example:
+
+Step 1, 2 and 3 are same as above.
+
+4. Run the word count example to write results to a csv file.
+
+    ```shell
+    $ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToJsonTableSink ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
+    ```
+
+If you encounter java.lang.ClassNotFoundException: org.apache.flink.table.sinks.TableSink or java.lang.NoClassDefFoundError: org/apache/flink/formats/json/JsonRowSerializationSchema, you need to build Apache Flink from source, then copy flink-table_{version}.jar and flink-json_{version}.jar to ${FLINK_HOME}/lib and restart the flink cluster.
+
+5. Produce messages to topic `test_src`.
+
+    ```shell
+    $ bin/pulsar-client produce -m "hello world again" -n 100 test_src
+    ```
+
+6. You can check the ${FLINK_HOME}/examples/file. The file contains the counts at the end of each time window as long as words are floating in, e.g.:
+
+    ```file
+    hello|100
+    again|100
+    test|100
+    world|100
+    ```
+
+Alternatively, when you run the flink word count example at step 4, you can choose to dump the result to another pulsar topic.
+
+```shell
+$ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToJsonTableSink ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
+```
+
+Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_dest`.
+
+```shell
+$ bin/pulsar-client consume -n 0 -s test test_dest
+```
+
+You will see sample output for above linked application as follows:
+```
+----- got message -----
+{"word":"hello","count":100}
+----- got message -----
+{"word":"again","count":100}
+----- got message -----
+{"word":"test","count":100}
+----- got message -----
+{"word":"world","count":100}
+```
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
index 9187a0d..a7a4412 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
@@ -99,7 +99,8 @@ public class PulsarAvroTableSink implements AppendStreamTableSink<Row> {
 
     @Override
     public TypeInformation<Row> getOutputType() {
-        return new RowTypeInfo(fieldTypes, fieldNames);
+        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames);
+        return rowTypeInfo;
     }
 
     @Override
@@ -162,7 +163,7 @@ public class PulsarAvroTableSink implements AppendStreamTableSink<Row> {
 
         @Override
         public String getKey(Row event) {
-            return (String) event.getField(keyIndex);
+            return event.getField(keyIndex).toString();
         }
     }