Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/05 13:42:57 UTC

[GitHub] [flink-web] wuchong commented on a change in pull request #335: Add Blog: "Flink SQL Demo: Building an End to End Streaming Application"

wuchong commented on a change in pull request #335:
URL: https://github.com/apache/flink-web/pull/335#discussion_r420110973



##########
File path: _posts/2020-05-03-flink-sql-demo-building-e2e-streaming-application.md
##########
@@ -0,0 +1,338 @@
+---
+layout: post
+title: "Flink SQL Demo: Building an End-to-End Streaming Application"
+date: 2020-05-03T12:00:00.000Z
+categories: news
+authors:
+- jark:
+  name: "Jark Wu"
+  twitter: "JarkWu"
+excerpt: Apache Flink 1.10 comes with many exciting new features, including significant developments in Flink SQL, which is evolving at a fast pace. This article takes a closer look at how to quickly build streaming applications with Flink SQL from a practical point of view.
+---
+
+Apache Flink 1.10.0 comes with many exciting new features, including significant developments in Flink SQL, which is evolving at a fast pace. This article takes a closer look at how to quickly build streaming applications with Flink SQL from a practical point of view.
+
+In the following sections, we describe how to integrate Kafka, MySQL, Elasticsearch, and Kibana with Flink SQL to analyze e-commerce user behavior in real time. All exercises in this article are performed in the Flink SQL CLI, and the entire process uses plain SQL text, without a single line of Java or Scala code and without installing an IDE. The final result of this demo is shown in the following figure:
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/2020-05-03-flink-sql-demo/image1.png" width="800px" alt="Demo Overview"/>
+</center>
+<br>
+
+# Preparation
+
+Prepare a Linux or macOS computer with Docker and Java 8 installed. A Java environment is required because we will install and run the Flink cluster in the host environment, not in a Docker container.
+
+## Use Docker Compose to Start Demo Environment
+
+The components required in this demo (except for Flink) are all managed in containers, so we will use `docker-compose` to start them. First, download the `docker-compose.yml` file that defines the demo environment, for example by running the following commands:
+
+```
+mkdir flink-demo; cd flink-demo;
+wget https://raw.githubusercontent.com/wuchong/flink-sql-demo/master/docker-compose.yml
+```
+
+The Docker Compose environment consists of the following containers:
+
+- **MySQL:** MySQL 5.7 and a `category` table in the database. The `category` table will be joined with data in Kafka to enrich the real-time data.
+- **Kafka:** It is mainly used as a data source. The DataGen component automatically writes data into a Kafka topic.
+- **Zookeeper:** This component is required by Kafka.
+- **Elasticsearch:** It is mainly used as a data sink.
+- **Kibana:** It's used to visualize the data in Elasticsearch.
+- **DataGen:** It is the data generator. After the container is started, user behavior data is automatically generated and sent to the Kafka topic. By default, 2000 data entries are generated each second for about 1.5 hours. You can modify DataGen's `speedup` parameter in `docker-compose.yml` to adjust the generation rate (the change takes effect after Docker Compose is restarted).
+
+**Important:** Before starting the containers, we recommend configuring Docker so that sufficient resources are available and the environment does not become unresponsive. We suggest running Docker with 3-4 GB of memory and 3-4 CPU cores.
+
+To start all containers, run the following command in the directory that contains the `docker-compose.yml` file.
+
+```
+docker-compose up -d
+```
+
+This command automatically starts all the containers defined in the Docker Compose configuration in detached mode. Run `docker ps` to check whether the five containers are running properly. You can also visit [http://localhost:5601/](http://localhost:5601/) to see if Kibana is running normally.
+
+Don’t forget to run the following command to stop all containers after you have finished the tutorial:
+
+```
+docker-compose down
+```
+
+## Download and Install Flink Cluster
+
+We recommend manually downloading and installing Flink on your host system, instead of starting Flink through Docker, because you’ll get a more intuitive understanding of the components, dependencies, and scripts of Flink.
+
+1. Download and decompress [Apache Flink 1.10.0](https://www.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz) into the `flink-1.10.0` directory.
+2. Go to the `flink-1.10.0` directory by running `cd flink-1.10.0`.
+3. Run the following commands to download the required dependency JARs into the `lib/` directory.
+
+    ```
+    wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-json/1.10.0/flink-json-1.10.0.jar
+    wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar
+    wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch6_2.11/1.10.0/flink-sql-connector-elasticsearch6_2.11-1.10.0.jar
+    wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.10.0/flink-jdbc_2.11-1.10.0.jar
+    wget -P ./lib/ https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.48/mysql-connector-java-5.1.48.jar
+    ```
+
+4. In `conf/flink-conf.yaml`, set `taskmanager.numberOfTaskSlots` to `10`, since we will be launching multiple jobs during this demo (see the snippet after this list for the exact entry).
+5. Run `./bin/start-cluster.sh` to start the cluster. Check if Flink is up by accessing the Flink Web UI at [http://localhost:8081](http://localhost:8081). The number of available slots should be 10.
+<center>
+<img src="{{ site.baseurl }}/img/blog/2020-05-03-flink-sql-demo/image2.png" width="800px" alt="Flink Web UI"/>
+</center>
+<br>
+6. Run `bin/sql-client.sh embedded` to start the SQL CLI. You will see the following squirrel welcome page.
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/2020-05-03-flink-sql-demo/image3.png" width="800px" alt="Flink SQL CLI welcome page"/>
+</center>
+<br>
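+
+For reference, the change from step 4 in `conf/flink-conf.yaml` is a single entry that should look like this (a minimal excerpt; all other settings stay unchanged):
+
+```
+taskmanager.numberOfTaskSlots: 10
+```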
+
+# Create a Kafka table using DDL
+
+The DataGen container continuously writes events into the Kafka `user_behavior` topic. The data contains user behavior for the day of November 27, 2017 (behaviors include “click”, “like”, “purchase” and “add to shopping cart” events). Each row represents a user behavior event, with the user ID, product ID, product category ID, event type, and timestamp in JSON format. Note that the dataset is from the [Alibaba Cloud Tianchi public dataset](https://tianchi.aliyun.com/dataset/dataDetail?dataId=649).
+
+In the directory that contains `docker-compose.yml`, run the following command to view the first 10 data entries generated in the Kafka topic:
+
+```
+docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic user_behavior --bootstrap-server kafka:9094 --from-beginning --max-messages 10'
+
+{"user_id": "952483", "item_id":"310884", "category_id": "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
+{"user_id": "794777", "item_id":"5119439", "category_id": "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
+...
+```
+
+In order to make the events in the Kafka topic accessible to Flink SQL, we run the following DDL statement to create a table that connects to the topic in the Kafka cluster:
+
+```sql
+CREATE TABLE user_behavior (
+    user_id BIGINT,
+    item_id BIGINT,
+    category_id BIGINT,
+    behavior STRING,
+    ts TIMESTAMP(3),
+    proctime AS PROCTIME(),   -- generates processing-time attribute using computed column
+    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- defines watermark on ts column, marks ts as event-time attribute
+) WITH (
+    'connector.type' = 'kafka',  -- using kafka connector
+    'connector.version' = 'universal',  -- kafka version, universal supports Kafka 0.11+
+    'connector.topic' = 'user_behavior',  -- kafka topic
+    'connector.startup-mode' = 'earliest-offset',  -- reading from the beginning
+    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper address
+    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
+    'format.type' = 'json'  -- the data format is json
+);
+```
+
+The above snippet declares five fields based on the data format. In addition, it uses the computed column syntax and the built-in `PROCTIME()` function to declare a virtual column that generates the processing-time attribute. It also uses the `WATERMARK` syntax to declare a watermark strategy on the `ts` field (tolerating events that are up to 5 seconds out-of-order), which marks `ts` as an event-time attribute. For more information about time attributes and DDL syntax, see the following official documentation:
+
+- [Time attributes in Flink SQL](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/time_attributes.html)
+- [CREATE TABLE DDL syntax](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table)
+
+After creating the `user_behavior` table in the SQL CLI, run `show tables;` and `describe user_behavior;` to see registered tables and table details. Also, run the command `SELECT * FROM user_behavior;` directly in the SQL CLI to preview the data (press `q` to exit).
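+
+For convenience, the same inspection commands can be entered at the SQL CLI prompt exactly as named above:
+
+```sql
+SHOW TABLES;
+DESCRIBE user_behavior;
+SELECT * FROM user_behavior;
+```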
+
+Next, we discover more about Flink SQL through three real-world scenarios.
+
+# Hourly Trading Volume
+
+## Create Elasticsearch table using DDL
+
+Let’s create an Elasticsearch result table in the SQL CLI. We need two columns in this case: `hour_of_day` and `buy_cnt` (trading volume).
+
+```sql
+CREATE TABLE buy_cnt_per_hour (
+    hour_of_day BIGINT,
+    buy_cnt BIGINT
+) WITH (
+    'connector.type' = 'elasticsearch', -- using elasticsearch connector
+    'connector.version' = '6',  -- elasticsearch version, 6 supports both 6+ and 7+
+    'connector.hosts' = 'http://localhost:9200',  -- elasticsearch address
+    'connector.index' = 'buy_cnt_per_hour',  -- elasticsearch index name, similar to database table name
+    'connector.document-type' = 'user_behavior', -- elasticsearch type name
+    'connector.bulk-flush.max-actions' = '1',  -- refresh for every row
+    'format.type' = 'json',  -- output data in json format
+    'update-mode' = 'append'
+);
+```
+
+There is no need to create the `buy_cnt_per_hour` index in Elasticsearch in advance since Elasticsearch will automatically create the index if it doesn’t exist.

Review comment:
       Do we need to add `,` before `in advance`? In my understanding, it belongs to the "create index".

##########
File path: _posts/2020-05-03-flink-sql-demo-building-e2e-streaming-application.md
##########
@@ -0,0 +1,338 @@
+
+## Submit a Query
+
+The hourly trading volume is the number of "buy" behaviors completed each hour. Therefore, we can use a TUMBLE window function to assign data into hourly windows. Then, we count the number of “buy” records in each window. To implement this, we can filter out the "buy" data first and then apply `COUNT(*)`.
+
+```sql
+INSERT INTO buy_cnt_per_hour
+SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
+FROM user_behavior
+WHERE behavior = 'buy'
+GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);
+```
+
+Here, we use the built-in `HOUR` function to extract the hour of day from a `TIMESTAMP` column. `INSERT INTO` is used to start a Flink SQL job that continuously writes results into the Elasticsearch `buy_cnt_per_hour` index. The Elasticsearch result table can be seen as a materialized view of the query. You can find more information about Flink’s window aggregations in the [Apache Flink documentation](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#group-windows).
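+
+As a small, standalone illustration of the `HOUR` function (a hypothetical helper query, not part of the demo pipeline), the following projection shows each event's timestamp next to its hour of day:
+
+```sql
+-- Illustrative only: extract the hour of day from every event timestamp.
+SELECT ts, HOUR(ts) AS hour_of_day
+FROM user_behavior;
+```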
+
+After running the previous query in the Flink SQL CLI, we can observe the submitted task on the Flink Web UI. This task is a streaming task and therefore runs continuously.
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/2020-05-03-flink-sql-demo/image4.png" width="800px" alt="Flink Dashboard"/>
+</center>
+<br>
+
+## Using Kibana to Visualize Results
+
+Access Kibana at [http://localhost:5601](http://localhost:5601). First, configure an index pattern by clicking "Management" in the left-side toolbar and finding "Index Patterns". Next, click "Create Index Pattern" and enter the full index name `buy_cnt_per_hour` to create the index pattern. After creating the index pattern, we can explore the data in Kibana.
+
+Note: since we are using a TUMBLE window of one hour here, it might take about 4 minutes after the containers start until the first row is emitted. Until then, the index does not exist and Kibana cannot find it.
+
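+While you wait, you can run a quick sanity check in the SQL CLI to confirm that "buy" events are arriving (a hypothetical helper query, not part of the original demo):
+
+```sql
+-- Continuously updating count of "buy" events seen so far.
+SELECT COUNT(*) AS buy_events
+FROM user_behavior
+WHERE behavior = 'buy';
+```
+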
+Click "Discover" in the left-side toolbar. Kibana lists the content of the created index.
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/2020-05-03-flink-sql-demo/image5.png" width="800px" alt="Kibana Discover"/>
+</center>
+<br>
+
+Next, create a dashboard to display various views. Click "Dashboard" on the left side of the page to create a dashboard named "User Behavior Analysis". Then, click "Create New" to create a new view. Select "Area" (area graph), then select the `buy_cnt_per_hour` index, and draw the trading volume area chart as illustrated in the configuration on the left side of the following diagram. Apply the changes by clicking the “▶” play button. Then, save it as "Hourly Trading Volume".
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/2020-05-03-flink-sql-demo/image6.png" width="800px" alt="Hourly Trading Volume"/>
+</center>
+<br>
+
+You can see that during the early morning hours the number of transactions has its lowest value for the entire day.
+
+As real-time data is added to the indices, you can enable auto-refresh in Kibana to see the visualization change in real time. To do so, click the time picker and enter a refresh interval (e.g. 3 seconds) in the “Refresh every” field.
+Cumulative number of Unique Visitors every 10-min

Review comment:
       I think I missed to mark this as a header. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org