You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sj...@apache.org on 2020/07/28 13:32:52 UTC

[flink-web] 02/06: Update contents to 1.11.0

This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit 4d92336fcc08be53bf53787d63ed71f413742439
Author: Jark Wu <ja...@apache.org>
AuthorDate: Sun Jul 5 11:19:17 2020 +0800

    Update contents to 1.11.0
---
 ...-sql-demo-building-e2e-streaming-application.md | 182 ++++++++-------------
 img/blog/2020-05-03-flink-sql-demo/image1.gif      | Bin 0 -> 964219 bytes
 img/blog/2020-05-03-flink-sql-demo/image1.png      | Bin 169097 -> 0 bytes
 img/blog/2020-05-03-flink-sql-demo/image2.png      | Bin 82835 -> 0 bytes
 img/blog/2020-05-03-flink-sql-demo/image3.png      | Bin 509096 -> 99027 bytes
 img/blog/2020-05-03-flink-sql-demo/image4.jpg      | Bin 0 -> 418490 bytes
 img/blog/2020-05-03-flink-sql-demo/image4.png      | Bin 189699 -> 0 bytes
 img/blog/2020-05-03-flink-sql-demo/image5.jpg      | Bin 0 -> 262516 bytes
 img/blog/2020-05-03-flink-sql-demo/image5.png      | Bin 125472 -> 0 bytes
 img/blog/2020-05-03-flink-sql-demo/image6.jpg      | Bin 0 -> 296556 bytes
 img/blog/2020-05-03-flink-sql-demo/image6.png      | Bin 161101 -> 0 bytes
 img/blog/2020-05-03-flink-sql-demo/image7.jpg      | Bin 0 -> 336316 bytes
 img/blog/2020-05-03-flink-sql-demo/image7.png      | Bin 161440 -> 0 bytes
 img/blog/2020-05-03-flink-sql-demo/image8.jpg      | Bin 0 -> 322865 bytes
 img/blog/2020-05-03-flink-sql-demo/image8.png      | Bin 151899 -> 0 bytes
 15 files changed, 70 insertions(+), 112 deletions(-)

diff --git a/_posts/2020-05-03-flink-sql-demo-building-e2e-streaming-application.md b/_posts/2020-05-03-flink-sql-demo-building-e2e-streaming-application.md
index 0af26a8..14450a5 100644
--- a/_posts/2020-05-03-flink-sql-demo-building-e2e-streaming-application.md
+++ b/_posts/2020-05-03-flink-sql-demo-building-e2e-streaming-application.md
@@ -7,33 +7,35 @@ authors:
 - jark:
   name: "Jark Wu"
   twitter: "JarkWu"
-excerpt: Apache Flink 1.10 has released many exciting new features, including many developments in Flink SQL which is evolving at a fast pace. This article takes a closer look at how to quickly build streaming applications with Flink SQL from a practical point of view.
+excerpt: Apache Flink 1.11 has released many exciting new features, including many developments in Flink SQL which is evolving at a fast pace. This article takes a closer look at how to quickly build streaming applications with Flink SQL from a practical point of view.
 ---
 
-Apache Flink 1.10.0 has released many exciting new features, including many developments in Flink SQL which is evolving at a fast pace. This article takes a closer look at how to quickly build streaming applications with Flink SQL from a practical point of view.
+Apache Flink 1.11.0 has released many exciting new features, including many developments in Flink SQL which is evolving at a fast pace. This article takes a closer look at how to quickly build streaming applications with Flink SQL from a practical point of view.
 
 In the following sections, we describe how to integrate Kafka, MySQL, Elasticsearch, and Kibana with Flink SQL to analyze e-commerce user behavior in real-time. All exercises in this article are performed in the Flink SQL CLI, while the entire process uses standard SQL syntax, without a single line of Java or Scala code or IDE installation. The final result of this demo is shown in the following figure:
 
 <center>
-<img src="{{ site.baseurl }}/img/blog/2020-05-03-flink-sql-demo/image1.png" width="800px" alt="Demo Overview"/>
+<img src="{{ site.baseurl }}/img/blog/2020-05-03-flink-sql-demo/image1.gif" width="800px" alt="Demo Overview"/>
 </center>
 <br>
 
 # Preparation
 
-Prepare a Linux or MacOS computer with Docker and Java 8 installed. A Java environment is required because we will install and run a Flink cluster in the host environment, not in a Docker container.
+Prepare a Linux or MacOS computer with Docker installed.
 
 ## Use Docker Compose to Start Demo Environment
 
-The components required in this demo (except for Flink) are all managed in containers, so we will use `docker-compose` to start them. First, download the `docker-compose.yml` file that defines the demo environment, for example by running the following commands:
+The components required in this demo are all managed in containers, so we will use `docker-compose` to start them. First, download the `docker-compose.yml` file that defines the demo environment, for example by running the following commands:
 
 ```
-mkdir flink-demo; cd flink-demo;
-wget https://raw.githubusercontent.com/wuchong/flink-sql-demo/master/docker-compose.yml
+mkdir flink-sql-demo; cd flink-sql-demo;
+wget https://raw.githubusercontent.com/wuchong/flink-sql-demo/v1.11-EN/docker-compose.yml
 ```
 
 The Docker Compose environment consists of the following containers:
 
+- **Flink SQL CLI:** It's used to submit queries and visualize their results.
+- **Flink Cluster:** A Flink master and a Flink worker container to execute queries.
 - **MySQL:** MySQL 5.7 and a `category` table in the database. The `category` table will be joined with data in Kafka to enrich the real-time data.
 - **Kafka:** It is mainly used as a data source. The DataGen component automatically writes data into a Kafka topic.
 - **Zookeeper:** This component is required by Kafka.
@@ -49,7 +51,7 @@ To start all containers, run the following command in the directory that contain
 docker-compose up -d
 ```
 
-This command automatically starts all the containers defined in the Docker Compose configuration in a detached mode. Run `docker ps` to check whether the five containers are running properly. You can also visit [http://localhost:5601/](http://localhost:5601/) to see if Kibana is running normally.
+This command automatically starts all the containers defined in the Docker Compose configuration in a detached mode. Run `docker ps` to check whether the 9 containers are running properly. You can also visit [http://localhost:5601/](http://localhost:5601/) to see if Kibana is running normally.
 
 Don’t forget to run the following command to stop all containers after you finished the tutorial:
 
@@ -57,38 +59,25 @@ Don’t forget to run the following command to stop all containers after you fin
 docker-compose down
 ```
 
-## Download and Install Flink Cluster
+## Entering the Flink SQL CLI client
 
-We recommend to manually download and install Flink on your host system, instead of starting Flink through Docker because you’ll get a more intuitive understanding of the components, dependencies, and scripts of Flink.
+To enter the SQL CLI client run:
 
-1. Download and decompress [Apache Flink 1.10.0](https://www.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz) into the `flink-1.10.0` directory:
-2. Go to the `flink-1.10.0` directory by running `cd flink-1.10.0`.
-3. Run the following command to download the JAR dependency package and copy it to the `lib/` directory.
-
-    ```
-wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-json/1.10.0/flink-json-1.10.0.jar | \
-    wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar | \
-    wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch6_2.11/1.10.0/flink-sql-connector-elasticsearch6_2.11-1.10.0.jar | \
-    wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.10.0/flink-jdbc_2.11-1.10.0.jar | \
-    wget -P ./lib/ https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.48/mysql-connector-java-5.1.48.jar
+```bash
+docker-compose exec sql-client ./sql-client.sh
 ```
 
-4. In `conf/flink-conf.yaml`, set `taskmanager.numberOfTaskSlots` to `10`, since during this demo we will be launching multiple jobs.
-5. Run `./bin/start-cluster.sh` to start the cluster. Check that Flink is up and running by accessing the Flink Web UI at [http://localhost:8081](http://localhost:8081). The number of available slots should be 10.
-<center>
-<img src="{{ site.baseurl }}/img/blog/2020-05-03-flink-sql-demo/image2.png" width="800px" alt="Demo Overview"/>
-</center>
-<br>
-6. Run `bin/sql-client.sh embedded` to start the SQL CLI. You will see the following squirrel welcome page.
+The command starts the SQL CLI client in the container.
+You should see the welcome screen of the CLI client.
 
 <center>
-<img src="{{ site.baseurl }}/img/blog/2020-05-03-flink-sql-demo/image3.png" width="800px" alt="Flink SQL CLI welcome page"/>
+<img src="{{ site.baseurl }}/img/blog/2020-05-03-flink-sql-demo/image3.png" width="700px" alt="Flink SQL CLI welcome page"/>
 </center>
 <br>
 
 # Create a Kafka table using DDL
 
-The Datagen container continuously writes events into the Kafka `user_behavior` topic. This data contains the user behavior on the day of November 27, 2017 (behaviors include “click”, “like”, “purchase” and “add to shopping cart” events). Each row represents a user behavior event, with the user ID, product ID, product category ID, event type, and timestamp in JSON format. Note that the dataset is from the [Alibaba Cloud Tianchi public dataset](https://tianchi.aliyun.com/dataset/dataDetai [...]
+The DataGen container continuously writes events into the Kafka `user_behavior` topic. This data contains the user behavior on the day of November 27, 2017 (behaviors include “click”, “like”, “purchase” and “add to shopping cart” events). Each row represents a user behavior event, with the user ID, product ID, product category ID, event type, and timestamp in JSON format. Note that the dataset is from the [Alibaba Cloud Tianchi public dataset](https://tianchi.aliyun.com/dataset/dataDetai [...]
 
 In the directory that contains `docker-compose.yml`, run the following command to view the first 10 data entries generated in the Kafka topic:
 
@@ -100,7 +89,7 @@ docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic user_behavi
 ...
 ```
 
-In order to make the events in the Kafka topic accessible to Flink SQL, we run the following DDL statement to create a table that connects to the topic in the Kafka cluster:
+In order to make the events in the Kafka topic accessible to Flink SQL, we run the following DDL statement in SQL CLI to create a table that connects to the topic in the Kafka cluster:
 
 ```sql
 CREATE TABLE user_behavior (
@@ -112,20 +101,18 @@ CREATE TABLE user_behavior (
     proctime AS PROCTIME(),   -- generates processing-time attribute using computed column
     WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- defines watermark on ts column, marks ts as event-time attribute
 ) WITH (
-    'connector.type' = 'kafka',  -- using kafka connector
-    'connector.version' = 'universal',  -- kafka version, universal supports Kafka 0.11+
-    'connector.topic' = 'user_behavior',  -- kafka topic
-    'connector.startup-mode' = 'earliest-offset',  -- reading from the beginning
-    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper address
-    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
-    'format.type' = 'json'  -- the data format is json
+    'connector' = 'kafka',  -- using kafka connector
+    'topic' = 'user_behavior',  -- kafka topic
+    'scan.startup.mode' = 'earliest-offset',  -- reading from the beginning
+    'properties.bootstrap.servers' = 'kafka:9094',  -- kafka broker address
+    'format' = 'json'  -- the data format is json
 );
 ```
 
 The above snippet declares five fields based on the data format. In addition, it uses the computed column syntax and built-in `PROCTIME()` function to declare a virtual column that generates the processing-time attribute. It also uses the WATERMARK syntax to declare the watermark strategy on the `ts` field (tolerate 5-seconds out-of-order). Therefore, the `ts` field becomes an event-time attribute. For more information about time attributes and DDL syntax, see the following official documents:
 
-- [For time attributes](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/time_attributes.html)
-- [For SQL DDL](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table)
+- [For time attributes](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/time_attributes.html)
+- [For SQL DDL](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table)
 
 After creating the `user_behavior` table in the SQL CLI, run `show tables;` and `describe user_behavior;` to see registered tables and table details. Also, run the command `SELECT * FROM user_behavior;` directly in the SQL CLI to preview the data (press `q` to exit).
 
@@ -142,14 +129,9 @@ CREATE TABLE buy_cnt_per_hour (
     hour_of_day BIGINT,
     buy_cnt BIGINT
 ) WITH (
-    'connector.type' = 'elasticsearch', -- using elasticsearch connector
-    'connector.version' = '6',  -- elasticsearch version, 6 supports both 6+ and 7+
-    'connector.hosts' = 'http://localhost:9200',  -- elasticsearch address
-    'connector.index' = 'buy_cnt_per_hour',  -- elasticsearch index name, similar to database table name
-    'connector.document-type' = 'user_behavior', -- elasticsearch type name
-    'connector.bulk-flush.max-actions' = '1',  -- refresh for every row
-    'format.type' = 'json',  -- output data in json format
-    'update-mode' = 'append'
+    'connector' = 'elasticsearch-7', -- using elasticsearch connector
+    'hosts' = 'http://elasticsearch:9200',  -- elasticsearch address
+    'index' = 'buy_cnt_per_hour'  -- elasticsearch index name, similar to database table name
 );
 ```
 
@@ -167,12 +149,12 @@ WHERE behavior = 'buy'
 GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);
 ```
 
-Here, we use the built-in `HOUR` function to extract the value for each hour in the day from a `TIMESTAMP` column. Use `INSERT INTO` to start a Flink SQL job that continuously writes results into the Elasticsearch `buy_cnt_per_hour` index. The Elasticearch result table can be seen as a materialized view of the query. You can find more information about Flink’s window aggregation in the [Apache Flink documentation](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql [...]
+Here, we use the built-in `HOUR` function to extract the value for each hour in the day from a `TIMESTAMP` column. Use `INSERT INTO` to start a Flink SQL job that continuously writes results into the Elasticsearch `buy_cnt_per_hour` index. The Elasticearch result table can be seen as a materialized view of the query. You can find more information about Flink’s window aggregation in the [Apache Flink documentation](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql [...]
 
 After running the previous query in the Flink SQL CLI, we can observe the submitted task on the Flink Web UI. This task is a streaming task and therefore runs continuously.
 
 <center>
-<img src="{{ site.baseurl }}/img/blog/2020-05-03-flink-sql-demo/image4.png" width="800px" alt="Flink Dashboard"/>
+<img src="{{ site.baseurl }}/img/blog/2020-05-03-flink-sql-demo/image4.jpg" width="800px" alt="Flink Dashboard"/>
 </center>
 <br>
 
@@ -185,14 +167,14 @@ Note: since we are using the TUMBLE window of one hour here, it might take about
 Click "Discover" in the left-side toolbar. Kibana lists the content of the created index.
 
 <center>
-<img src="{{ site.baseurl }}/img/blog/2020-05-03-flink-sql-demo/image5.png" width="800px" alt="Kibana Discover"/>
+<img src="{{ site.baseurl }}/img/blog/2020-05-03-flink-sql-demo/image5.jpg" width="800px" alt="Kibana Discover"/>
 </center>
 <br>
 
 Next, create a dashboard to display various views. Click "Dashboard" on the left side of the page to create a dashboard named "User Behavior Analysis". Then, click "Create New" to create a new view. Select "Area" (area graph), then select the `buy_cnt_per_hour` index, and draw the trading volume area chart as illustrated in the configuration on the left side of the following diagram. Apply the changes by clicking the “▶” play button. Then, save it as "Hourly Trading Volume".
 
 <center>
-<img src="{{ site.baseurl }}/img/blog/2020-05-03-flink-sql-demo/image6.png" width="800px" alt="Hourly Trading Volume"/>
+<img src="{{ site.baseurl }}/img/blog/2020-05-03-flink-sql-demo/image6.jpg" width="800px" alt="Hourly Trading Volume"/>
 </center>
 <br>
 
@@ -204,51 +186,42 @@ As real-time data is added into the indices, you can enable auto-refresh in Kiba
 
 Another interesting visualization is the cumulative number of unique visitors (UV). For example, the number of UV at 10:00 represents the total number of UV from 00:00 to 10:00. Therefore, the curve is monotonically increasing.
 
-Let’s create another Elasticsearch table in the SQL CLI to store the UV results. This table has two columns: time and cumulative UVs.
+Let’s create another Elasticsearch table in the SQL CLI to store the UV results. This table contains 3 columns: date, time and cumulative UVs.
+The `date_str` and `time_str` column are defined as primary key, Elasticsearch sink will use them to calculate the document id and work in upsert mode to update UV values under the document id.
 
 ```sql
 CREATE TABLE cumulative_uv (
+    date_str STRING,
     time_str STRING,
-    uv BIGINT
+    uv BIGINT,
+    PRIMARY KEY (date_str, time_str) NOT ENFORCED
 ) WITH (
-    'connector.type' = 'elasticsearch',
-    'connector.version' = '6',
-    'connector.hosts' = 'http://localhost:9200',
-    'connector.index' = 'cumulative_uv',
-    'connector.document-type' = 'user_behavior',
-    'format.type' = 'json',
-    'update-mode' = 'upsert'
+    'connector' = 'elasticsearch-7',
+    'hosts' = 'http://elasticsearch:9200',
+    'index' = 'cumulative_uv'
 );
 ```
 
-We can use SQL’s OVER and WINDOW clauses to calculate the cumulative UVs (the number of UVs from 00:00 to current record) and the time point of the current record. Here, the built-in `COUNT(DISTINCT user_id)` is used to count the number of UVs. Flink SQL has significant performance improvements for COUNT DISTINCT.
-
-```sql
-CREATE VIEW uv_per_10min AS
-SELECT
-  MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str,
-  COUNT(DISTINCT user_id) OVER w AS uv
-FROM user_behavior
-WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
-```
-
-We use SUBSTR, `DATE_FORMAT`, and `||` functions to convert a TIMESTAMP field into a 10-minute interval time string, such as 12:10, 12:20. You can find more information about Flink's support for [OVER WINDOW](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#aggregations) in the Apache Flink documentation.
-
-Additionally, we use the `CREATE VIEW` syntax to register the query as a logical view, allowing us to easily reference this query in subsequent queries and simplify nested queries. Please note that creating a logical view does not trigger the execution of the job and the view results are not persisted. Therefore, this statement is lightweight and does not have additional overhead.
-
-However, `uv_per_10min` generates an output row for each input row. We can perform an aggregation after `uv_per_10min` to group data by `time_str`, so that only one row is stored in Elasticsearch for every 10 minutes. This can greatly ease the workload of Elasticsearch.
+We can extract the date and time using `DATE_FORMAT` function based on the `ts` field. As the section title describes, we only need to report every 10 minute. So we can use `SUBSTR` and the string concat function `||` to convert the time value into a 10-minute interval time string, such as `12:00`, `12:10`.
+Next, we group data by `date_str` and perform a `COUNT DISTINCT` aggregation on `user_id` to get the current cumulative UV in this day. Additionally, we perform a `MAX` aggregation on `time_str` field to get the current stream time: the maximum event time observed so far.
+As the maximum time is also a part of the primary key of the sink, the final result is that we will insert a new point into the elasticsearch every 10 minute. And every latest point will be updated continuously until the next 10-minute point is generated.
 
 ```sql
 INSERT INTO cumulative_uv
-SELECT time_str, MAX(uv)
-FROM uv_per_10min
-GROUP BY time_str;
+SELECT date_str, MAX(time_str), COUNT(DISTINCT user_id) as uv
+FROM (
+  SELECT
+    DATE_FORMAT(ts, 'yyyy-MM-dd') as date_str,
+    SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0' as time_str,
+    user_id
+  FROM user_behavior)
+GROUP BY date_str;
 ```
 
 After submitting this query, we create a `cumulative_uv` index pattern in Kibana. We then create a "Line" (line graph) on the dashboard, by selecting the `cumulative_uv` index, and drawing the cumulative UV curve according to the configuration on the left side of the following figure before finally, saving the curve.
 
 <center>
-<img src="{{ site.baseurl }}/img/blog/2020-05-03-flink-sql-demo/image7.png" width="800px" alt="Cumulative Unique Visitors every 10-min"/>
+<img src="{{ site.baseurl }}/img/blog/2020-05-03-flink-sql-demo/image7.jpg" width="800px" alt="Cumulative Unique Visitors every 10-min"/>
 </center>
 <br>
 
@@ -261,55 +234,40 @@ Create a table in the SQL CLI to make the data in MySQL accessible to Flink SQL.
 ```sql
 CREATE TABLE category_dim (
     sub_category_id BIGINT,
-    parent_category_id BIGINT
+    parent_category_name STRING
 ) WITH (
-    'connector.type' = 'jdbc',
-    'connector.url' = 'jdbc:mysql://localhost:3306/flink',
-    'connector.table' = 'category',
-    'connector.driver' = 'com.mysql.jdbc.Driver',
-    'connector.username' = 'root',
-    'connector.password' = '123456',
-    'connector.lookup.cache.max-rows' = '5000',
-    'connector.lookup.cache.ttl' = '10min'
+    'connector' = 'jdbc',
+    'url' = 'jdbc:mysql://mysql:3306/flink',
+    'table-name' = 'category',
+    'username' = 'root',
+    'password' = '123456',
+    'lookup.cache.max-rows' = '5000',
+    'lookup.cache.ttl' = '10min'
 );
 ```
 
-The underlying of JDBC connectors implements `LookupableTableSource` interface, so the created JDBC table `category_dim` can be used as a temporal table (aka. lookup table) out-of-the-box in the data enrichment.
+The underlying of JDBC connectors implements `LookupTableSource` interface, so the created JDBC table `category_dim` can be used as a temporal table (aka. lookup table) out-of-the-box in the data enrichment.
 
 In addition, create an Elasticsearch table to store the category statistics.
 
 ```sql
 CREATE TABLE top_category (
-    category_name STRING,
+    category_name STRING PRIMARY KEY NOT ENFORCED,
     buy_cnt BIGINT
 ) WITH (
-    'connector.type' = 'elasticsearch',
-    'connector.version' = '6',
-    'connector.hosts' = 'http://localhost:9200',
-    'connector.index' = 'top_category',
-    'connector.document-type' = 'user_behavior',
-    'format.type' = 'json',
-    'update-mode' = 'upsert'
+    'connector' = 'elasticsearch-7',
+    'hosts' = 'http://elasticsearch:9200',
+    'index' = 'top_category'
 );
 ```
 
-In order to enrich the category names, we use Flink SQL’s temporal table joins to join a dimension table. You can access more information about [temporal joins](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table) in the Flink documentation:
+In order to enrich the category names, we use Flink SQL’s temporal table joins to join a dimension table. You can access more information about [temporal joins](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html#join-with-a-temporal-table) in the Flink documentation:
 
+Additionally, we use the `CREATE VIEW` syntax to register the query as a logical view, allowing us to easily reference this query in subsequent queries and simplify nested queries. Please note that creating a logical view does not trigger the execution of the job and the view results are not persisted. Therefore, this statement is lightweight and does not have additional overhead.
 
 ```sql
 CREATE VIEW rich_user_behavior AS
-SELECT U.user_id, U.item_id, U.behavior,
-  CASE C.parent_category_id
-    WHEN 1 THEN 'Clothing & Shoes'
-    WHEN 2 THEN 'Home & Kitchens'
-    WHEN 3 THEN 'Books'
-    WHEN 4 THEN 'Electronics'
-    WHEN 5 THEN 'Tools'
-    WHEN 6 THEN 'Cell Phones'
-    WHEN 7 THEN 'Sports & Outdoors'
-    WHEN 8 THEN 'Foods'
-    ELSE 'Others'
-  END AS category_name
+SELECT U.user_id, U.item_id, U.behavior, C.parent_category_name as category_name
 FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
 ON U.category_id = C.sub_category_id;
 ```
@@ -327,7 +285,7 @@ GROUP BY category_name;
 After submitting the query, we create a `top_category` index pattern in Kibana. We then  create a "Horizontal Bar" (bar graph) on the dashboard, by selecting the `top_category` index and drawing the category ranking according to the configuration on the left side of the following diagram before finally saving the list.
 
 <center>
-<img src="{{ site.baseurl }}/img/blog/2020-05-03-flink-sql-demo/image8.png" width="800px" alt="Top Categories"/>
+<img src="{{ site.baseurl }}/img/blog/2020-05-03-flink-sql-demo/image8.jpg" width="800px" alt="Top Categories"/>
 </center>
 <br>
 
diff --git a/img/blog/2020-05-03-flink-sql-demo/image1.gif b/img/blog/2020-05-03-flink-sql-demo/image1.gif
new file mode 100644
index 0000000..7689e60
Binary files /dev/null and b/img/blog/2020-05-03-flink-sql-demo/image1.gif differ
diff --git a/img/blog/2020-05-03-flink-sql-demo/image1.png b/img/blog/2020-05-03-flink-sql-demo/image1.png
deleted file mode 100644
index 3bd1188..0000000
Binary files a/img/blog/2020-05-03-flink-sql-demo/image1.png and /dev/null differ
diff --git a/img/blog/2020-05-03-flink-sql-demo/image2.png b/img/blog/2020-05-03-flink-sql-demo/image2.png
deleted file mode 100644
index c38bc0f..0000000
Binary files a/img/blog/2020-05-03-flink-sql-demo/image2.png and /dev/null differ
diff --git a/img/blog/2020-05-03-flink-sql-demo/image3.png b/img/blog/2020-05-03-flink-sql-demo/image3.png
index 73bae84..7b5b162 100644
Binary files a/img/blog/2020-05-03-flink-sql-demo/image3.png and b/img/blog/2020-05-03-flink-sql-demo/image3.png differ
diff --git a/img/blog/2020-05-03-flink-sql-demo/image4.jpg b/img/blog/2020-05-03-flink-sql-demo/image4.jpg
new file mode 100644
index 0000000..bd5e281
Binary files /dev/null and b/img/blog/2020-05-03-flink-sql-demo/image4.jpg differ
diff --git a/img/blog/2020-05-03-flink-sql-demo/image4.png b/img/blog/2020-05-03-flink-sql-demo/image4.png
deleted file mode 100644
index b455a89..0000000
Binary files a/img/blog/2020-05-03-flink-sql-demo/image4.png and /dev/null differ
diff --git a/img/blog/2020-05-03-flink-sql-demo/image5.jpg b/img/blog/2020-05-03-flink-sql-demo/image5.jpg
new file mode 100644
index 0000000..722d3d9
Binary files /dev/null and b/img/blog/2020-05-03-flink-sql-demo/image5.jpg differ
diff --git a/img/blog/2020-05-03-flink-sql-demo/image5.png b/img/blog/2020-05-03-flink-sql-demo/image5.png
deleted file mode 100644
index dc7dd2e..0000000
Binary files a/img/blog/2020-05-03-flink-sql-demo/image5.png and /dev/null differ
diff --git a/img/blog/2020-05-03-flink-sql-demo/image6.jpg b/img/blog/2020-05-03-flink-sql-demo/image6.jpg
new file mode 100644
index 0000000..4482e4e
Binary files /dev/null and b/img/blog/2020-05-03-flink-sql-demo/image6.jpg differ
diff --git a/img/blog/2020-05-03-flink-sql-demo/image6.png b/img/blog/2020-05-03-flink-sql-demo/image6.png
deleted file mode 100644
index 0a23bfe..0000000
Binary files a/img/blog/2020-05-03-flink-sql-demo/image6.png and /dev/null differ
diff --git a/img/blog/2020-05-03-flink-sql-demo/image7.jpg b/img/blog/2020-05-03-flink-sql-demo/image7.jpg
new file mode 100644
index 0000000..42b9d68
Binary files /dev/null and b/img/blog/2020-05-03-flink-sql-demo/image7.jpg differ
diff --git a/img/blog/2020-05-03-flink-sql-demo/image7.png b/img/blog/2020-05-03-flink-sql-demo/image7.png
deleted file mode 100644
index 624b987..0000000
Binary files a/img/blog/2020-05-03-flink-sql-demo/image7.png and /dev/null differ
diff --git a/img/blog/2020-05-03-flink-sql-demo/image8.jpg b/img/blog/2020-05-03-flink-sql-demo/image8.jpg
new file mode 100644
index 0000000..b92de91
Binary files /dev/null and b/img/blog/2020-05-03-flink-sql-demo/image8.jpg differ
diff --git a/img/blog/2020-05-03-flink-sql-demo/image8.png b/img/blog/2020-05-03-flink-sql-demo/image8.png
deleted file mode 100644
index 5f440e1..0000000
Binary files a/img/blog/2020-05-03-flink-sql-demo/image8.png and /dev/null differ