Posted to commits@flink.apache.org by le...@apache.org on 2022/03/22 02:18:47 UTC

[flink] branch release-1.15 updated (559ed4c -> 46de6e9)

This is an automated email from the ASF dual-hosted git repository.

leonard pushed a change to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 559ed4c  [FLINK-25904][metrics] Lazily initialize Percentile
     new 50bfbdb  [FLINK-26422][docs-zh][table]update chinese doc with the new TablePipeline docs
     new 46de6e9  [hotfix][docs]add the missing ending symbol ‘;’

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../docs/connectors/datastream/filesystem.md       |  4 +-
 .../docs/connectors/datastream/formats/avro.md     |  2 +-
 .../docs/connectors/datastream/formats/hadoop.md   |  2 +-
 .../content.zh/docs/connectors/datastream/kafka.md | 16 ++++----
 .../docs/connectors/datastream/pulsar.md           | 42 ++++++++++-----------
 .../docs/deployment/filesystems/azure.md           |  2 +-
 docs/content.zh/docs/deployment/filesystems/oss.md |  2 +-
 docs/content.zh/docs/dev/dataset/examples.md       |  2 +-
 .../docs/dev/dataset/hadoop_compatibility.md       |  4 +-
 docs/content.zh/docs/dev/dataset/iterations.md     |  2 +-
 docs/content.zh/docs/dev/dataset/overview.md       | 22 +++++------
 .../content.zh/docs/dev/dataset/transformations.md |  2 +-
 .../docs/dev/datastream/application_parameters.md  |  2 +-
 .../docs/dev/datastream/execution/parallel.md      |  6 +--
 .../content.zh/docs/dev/datastream/experimental.md |  2 +-
 .../datastream/fault-tolerance/checkpointing.md    |  2 +-
 .../datastream/fault-tolerance/queryable_state.md  |  2 +-
 .../serialization/types_serialization.md           |  2 +-
 .../docs/dev/datastream/operators/joining.md       | 18 ++++-----
 .../docs/dev/datastream/operators/overview.md      |  2 +-
 .../dev/datastream/operators/process_function.md   |  6 +--
 docs/content.zh/docs/dev/datastream/overview.md    | 10 ++---
 docs/content.zh/docs/dev/datastream/sources.md     |  2 +-
 .../docs/dev/datastream/user_defined_functions.md  |  2 +-
 docs/content.zh/docs/dev/table/catalogs.md         |  2 +-
 docs/content.zh/docs/dev/table/common.md           | 42 ++++++++++++++-------
 docs/content.zh/docs/dev/table/config.md           |  2 +-
 docs/content.zh/docs/dev/table/data_stream_api.md  | 44 +++++++++++-----------
 .../docs/dev/table/functions/systemFunctions.md    |  2 +-
 docs/content.zh/docs/dev/table/tableApi.md         |  8 ++--
 docs/content.zh/docs/dev/table/tuning.md           |  6 +--
 .../docs/learn-flink/streaming_analytics.md        | 12 +++---
 docs/content.zh/docs/libs/cep.md                   | 22 +++++------
 docs/content.zh/docs/libs/gelly/bipartite_graph.md | 18 ++++-----
 docs/content.zh/docs/libs/gelly/graph_api.md       | 26 ++++++-------
 .../content.zh/docs/libs/gelly/graph_generators.md |  4 +-
 .../docs/libs/gelly/iterative_graph_processing.md  | 18 ++++-----
 docs/content.zh/docs/libs/gelly/library_methods.md |  2 +-
 docs/content.zh/docs/libs/state_processor_api.md   |  2 +-
 .../docs/connectors/dataset/formats/avro.md        |  2 +-
 .../docs/connectors/dataset/formats/hadoop.md      |  2 +-
 .../docs/connectors/datastream/filesystem.md       |  4 +-
 .../docs/connectors/datastream/formats/avro.md     |  2 +-
 docs/content/docs/connectors/datastream/jdbc.md    |  4 +-
 docs/content/docs/connectors/datastream/kafka.md   | 16 ++++----
 docs/content/docs/connectors/datastream/pulsar.md  | 42 ++++++++++-----------
 docs/content/docs/deployment/filesystems/azure.md  |  2 +-
 docs/content/docs/deployment/filesystems/oss.md    |  2 +-
 docs/content/docs/dev/dataset/examples.md          |  2 +-
 docs/content/docs/dev/dataset/hadoop_map_reduce.md |  2 +-
 docs/content/docs/dev/dataset/iterations.md        |  2 +-
 docs/content/docs/dev/dataset/overview.md          | 24 ++++++------
 docs/content/docs/dev/dataset/transformations.md   |  2 +-
 .../docs/dev/datastream/application_parameters.md  |  2 +-
 .../docs/dev/datastream/execution/parallel.md      |  6 +--
 docs/content/docs/dev/datastream/experimental.md   |  2 +-
 .../datastream/fault-tolerance/checkpointing.md    |  2 +-
 .../datastream/fault-tolerance/queryable_state.md  |  2 +-
 .../serialization/types_serialization.md           |  2 +-
 .../docs/dev/datastream/operators/joining.md       | 18 ++++-----
 .../docs/dev/datastream/operators/overview.md      |  2 +-
 .../dev/datastream/operators/process_function.md   |  6 +--
 docs/content/docs/dev/datastream/overview.md       | 18 ++++-----
 docs/content/docs/dev/datastream/sources.md        |  2 +-
 .../docs/dev/datastream/user_defined_functions.md  |  2 +-
 docs/content/docs/dev/table/catalogs.md            |  2 +-
 docs/content/docs/dev/table/common.md              | 10 ++---
 docs/content/docs/dev/table/config.md              |  2 +-
 docs/content/docs/dev/table/data_stream_api.md     | 32 ++++++++--------
 .../docs/dev/table/functions/systemFunctions.md    |  2 +-
 docs/content/docs/dev/table/tuning.md              |  6 +--
 .../docs/learn-flink/streaming_analytics.md        |  6 +--
 docs/content/docs/libs/cep.md                      | 30 +++++++--------
 docs/content/docs/libs/gelly/bipartite_graph.md    | 18 ++++-----
 docs/content/docs/libs/gelly/graph_api.md          | 30 +++++++--------
 docs/content/docs/libs/gelly/graph_generators.md   |  4 +-
 .../docs/libs/gelly/iterative_graph_processing.md  | 18 ++++-----
 docs/content/docs/libs/gelly/library_methods.md    |  2 +-
 docs/content/docs/libs/state_processor_api.md      |  2 +-
 79 files changed, 360 insertions(+), 344 deletions(-)
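
For context, the first commit below updates the Chinese docs to the `TablePipeline` API that Flink 1.15 introduces for the Table API. A minimal sketch of the new usage in Java, assuming a `TableEnvironment` named `tableEnv` with tables "SourceTable" and "SinkTable" already registered (both names are taken from the common.md snippets further down), looks like:

```java
import org.apache.flink.table.api.TablePipeline;
import org.apache.flink.table.api.TableResult;

// assumption: tableEnv is a TableEnvironment with "SourceTable" and
// "SinkTable" already registered, as in the common.md snippets below

// 1.15 style: build an explicit pipeline, inspect it, then submit it
TablePipeline pipeline = tableEnv.from("SourceTable").insertInto("SinkTable");
pipeline.printExplain();                       // print the plan of the pipeline
TableResult tableResult = pipeline.execute();  // submit the Flink job

// equivalent shortcut shown on the removed ('-') lines of the diffs:
// TableResult tableResult = tableEnv.from("SourceTable").executeInsert("SinkTable");
```

The `Table.executeInsert(String)` shortcut shown on the removed lines is still part of the API; the docs now prefer the explicit pipeline form because it separates building the insert (`insertInto`), inspecting it (`explain`/`printExplain`), and submitting it (`execute`).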

[flink] 01/02: [FLINK-26422][docs-zh][table]update chinese doc with the new TablePipeline docs

Posted by le...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 50bfbdbbf1fe817d33aca3a675904ce682203a12
Author: zoucao <zh...@hotmail.com>
AuthorDate: Fri Mar 4 14:24:03 2022 +0800

    [FLINK-26422][docs-zh][table]update chinese doc with the new TablePipeline docs
---
 docs/content.zh/docs/dev/table/common.md          | 36 ++++++++++++++++-------
 docs/content.zh/docs/dev/table/data_stream_api.md | 14 ++++-----
 docs/content.zh/docs/dev/table/tableApi.md        |  8 +++--
 docs/content/docs/dev/table/common.md             |  4 +--
 4 files changed, 39 insertions(+), 23 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/common.md b/docs/content.zh/docs/dev/table/common.md
index a1d5539..dddac51 100644
--- a/docs/content.zh/docs/dev/table/common.md
+++ b/docs/content.zh/docs/dev/table/common.md
@@ -65,7 +65,7 @@ Table table2 = tableEnv.from("SourceTable");
 Table table3 = tableEnv.sqlQuery("SELECT * FROM SourceTable");
 
 // Emit a Table API result Table to a TableSink, same for SQL result
-TableResult tableResult = table2.executeInsert("SinkTable");
+TableResult tableResult = table2.insertInto("SinkTable").execute();
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -95,7 +95,7 @@ val table1 = tableEnv.from("SourceTable")
 val table2 = tableEnv.sqlQuery("SELECT * FROM SourceTable")
 
 // Emit a Table API result Table to a TableSink, same for SQL result
-val tableResult = table1.executeInsert("SinkTable")
+val tableResult = table1.insertInto("SinkTable").execute()
 ```
 {{< /tab >}}
 {{< tab "Python" >}}
@@ -645,7 +645,9 @@ Table API 和 SQL 查询的混用非常简单因为它们都返回 `Table` 对
 
 请参考文档 [Table Sources & Sinks]({{< ref "docs/dev/table/sourcesSinks" >}}) 以获取更多关于可用 Sink 的信息以及如何自定义 `DynamicTableSink`。
 
-方法 `Table.executeInsert(String tableName)` 将 `Table` 发送至已注册的 `TableSink`。该方法通过名称在 catalog 中查找 `TableSink` 并确认`Table` schema 和 `TableSink` schema 一致。
+方法 `Table.insertInto(String tableName)` 定义了一个完整的端到端管道将源表中的数据传输到一个被注册的输出表中。 
+该方法通过名称在 catalog 中查找输出表并确认 `Table` schema 和输出表 schema 一致。
+可以通过方法 `TablePipeline.explain()` 和 `TablePipeline.execute()` 分别来解释和执行一个数据流管道。
 
 下面的示例演示如何输出 `Table`:
 
@@ -673,8 +675,14 @@ tableEnv.createTemporaryTable("CsvSinkTable", TableDescriptor.forConnector("file
 // compute a result Table using Table API operators and/or SQL queries
 Table result = ...
 
+// Prepare the insert into pipeline
+TablePipeline pipeline = result.insertInto("CsvSinkTable");
+
+// Print explain details
+pipeline.printExplain();
+
 // emit the result Table to the registered TableSink
-result.executeInsert("CsvSinkTable");
+pipeline.execute();
 
 ```
 {{< /tab >}}
@@ -701,8 +709,14 @@ tableEnv.createTemporaryTable("CsvSinkTable", TableDescriptor.forConnector("file
 // compute a result Table using Table API operators and/or SQL queries
 val result: Table = ...
 
+// Prepare the insert into pipeline
+val pipeline = result.insertInto("CsvSinkTable")
+
+// Print explain details
+pipeline.printExplain()
+
 // emit the result Table to the registered TableSink
-result.executeInsert("CsvSinkTable")
+pipeline.execute()
 
 ```
 {{< /tab >}}
@@ -752,9 +766,9 @@ result.execute_insert("CsvSinkTable")
 Table API 或者 SQL 查询在下列情况下会被翻译:
 
 * 当 `TableEnvironment.executeSql()` 被调用时。该方法是用来执行一个 SQL 语句,一旦该方法被调用, SQL 语句立即被翻译。
-* 当 `Table.executeInsert()` 被调用时。该方法是用来将一个表的内容插入到目标表中,一旦该方法被调用, TABLE API 程序立即被翻译。
+* 当 `TablePipeline.execute()` 被调用时。该方法是用来执行一个源表到输出表的数据流,一旦该方法被调用, TABLE API 程序立即被翻译。
 * 当 `Table.execute()` 被调用时。该方法是用来将一个表的内容收集到本地,一旦该方法被调用, TABLE API 程序立即被翻译。
-* 当 `StatementSet.execute()` 被调用时。`Table` (通过 `StatementSet.addInsert()` 输出给某个 `Sink`)和 INSERT 语句 (通过调用 `StatementSet.addInsertSql()`)会先被缓存到 `StatementSet` 中,`StatementSet.execute()` 方法被调用时,所有的 sink 会被优化成一张有向无环图。
+* 当 `StatementSet.execute()` 被调用时。`TablePipeline` (通过 `StatementSet.add()` 输出给某个 `Sink`)和 INSERT 语句 (通过调用 `StatementSet.addInsertSql()`)会先被缓存到 `StatementSet` 中,`StatementSet.execute()` 方法被调用时,所有的 sink 会被优化成一张有向无环图。
 * 当 `Table` 被转换成 `DataStream` 时(参阅[与 DataStream 集成](#integration-with-datastream))。转换完成后,它就成为一个普通的 DataStream 程序,并会在调用 `StreamExecutionEnvironment.execute()` 时被执行。
 
 {{< top >}}
@@ -910,10 +924,10 @@ tEnv.createTemporaryTable("MySink2", TableDescriptor.forConnector("filesystem")
 StatementSet stmtSet = tEnv.createStatementSet();
 
 Table table1 = tEnv.from("MySource1").where($("word").like("F%"));
-stmtSet.addInsert("MySink1", table1);
+stmtSet.add(table1.insertInto("MySink1"));
 
 Table table2 = table1.unionAll(tEnv.from("MySource2"));
-stmtSet.addInsert("MySink2", table2);
+stmtSet.add(table2.insertInto("MySink2"));
 
 String explanation = stmtSet.explain();
 System.out.println(explanation);
@@ -954,10 +968,10 @@ tEnv.createTemporaryTable("MySink2", TableDescriptor.forConnector("filesystem")
 val stmtSet = tEnv.createStatementSet()
 
 val table1 = tEnv.from("MySource1").where($"word".like("F%"))
-stmtSet.addInsert("MySink1", table1)
+stmtSet.add(table1.insertInto("MySink1"))
 
 val table2 = table1.unionAll(tEnv.from("MySource2"))
-stmtSet.addInsert("MySink2", table2)
+stmtSet.add(table2.insertInto("MySink2"))
 
 val explanation = stmtSet.explain()
 println(explanation)
diff --git a/docs/content.zh/docs/dev/table/data_stream_api.md b/docs/content.zh/docs/dev/table/data_stream_api.md
index 3c91213..f04c561 100644
--- a/docs/content.zh/docs/dev/table/data_stream_api.md
+++ b/docs/content.zh/docs/dev/table/data_stream_api.md
@@ -598,13 +598,13 @@ pipeline or a statement set:
 
 ```java
 // execute with explicit sink
-tableEnv.from("InputTable").executeInsert("OutputTable")
+tableEnv.from("InputTable").insertInto("OutputTable").execute()
 
 tableEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable")
 
 tableEnv.createStatementSet()
-    .addInsert("OutputTable", tableEnv.from("InputTable"))
-    .addInsert("OutputTable2", tableEnv.from("InputTable"))
+    .add(tableEnv.from("InputTable").insertInto("OutputTable"))
+    .add(tableEnv.from("InputTable").insertInto("OutputTable2"))
     .execute()
 
 tableEnv.createStatementSet()
@@ -2562,12 +2562,12 @@ TableDescriptor sinkDescriptor = TableDescriptor.forConnector("print").build();
 
 // add a pure Table API pipeline
 Table tableFromSource = tableEnv.from(sourceDescriptor);
-statementSet.addInsert(sinkDescriptor, tableFromSource);
+statementSet.add(tableFromSource.insertInto(sinkDescriptor));
 
 // use table sinks for the DataStream API pipeline
 DataStream<Integer> dataStream = env.fromElements(1, 2, 3);
 Table tableFromStream = tableEnv.fromDataStream(dataStream);
-statementSet.addInsert(sinkDescriptor, tableFromStream);
+statementSet.add(tableFromStream.insertInto(sinkDescriptor));
 
 // attach both pipelines to StreamExecutionEnvironment
 // (the statement set will be cleared after calling this method)
@@ -2613,12 +2613,12 @@ val sinkDescriptor = TableDescriptor.forConnector("print").build
 
 // add a pure Table API pipeline
 val tableFromSource = tableEnv.from(sourceDescriptor)
-statementSet.addInsert(sinkDescriptor, tableFromSource)
+statementSet.add(tableFromSource.insertInto(sinkDescriptor))
 
 // use table sinks for the DataStream API pipeline
 val dataStream = env.fromElements(1, 2, 3)
 val tableFromStream = tableEnv.fromDataStream(dataStream)
-statementSet.addInsert(sinkDescriptor, tableFromStream)
+statementSet.add(tableFromStream.insertInto(sinkDescriptor))
 
 // attach both pipelines to StreamExecutionEnvironment
 // (the statement set will be cleared calling this method)
diff --git a/docs/content.zh/docs/dev/table/tableApi.md b/docs/content.zh/docs/dev/table/tableApi.md
index 860464a..65a9f9d 100644
--- a/docs/content.zh/docs/dev/table/tableApi.md
+++ b/docs/content.zh/docs/dev/table/tableApi.md
@@ -1455,7 +1455,9 @@ result3 = table.order_by(table.a.asc).offset(10).fetch(5)
 
 {{< label Batch >}} {{< label Streaming >}}
 
-和 SQL 查询中的 `INSERT INTO` 子句类似,该方法执行对已注册的输出表的插入操作。`executeInsert()` 方法将立即提交执行插入操作的 Flink job。
+和 SQL 查询中的 `INSERT INTO` 子句类似,该方法执行对已注册的输出表的插入操作。
+`insertInto()` 方法会将 `INSERT INTO` 转换为一个 `TablePipeline`。
+该数据流可以用 `TablePipeline.explain()` 来解释,用 `TablePipeline.execute()` 来执行。
 
 输出表必须已注册在 TableEnvironment(详见表连接器)中。此外,已注册表的 schema 必须与查询中的 schema 相匹配。
 
@@ -1463,13 +1465,13 @@ result3 = table.order_by(table.a.asc).offset(10).fetch(5)
 {{< tab "Java" >}}
 ```java
 Table orders = tableEnv.from("Orders");
-orders.executeInsert("OutOrders");
+orders.insertInto("OutOrders").execute();
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
 ```scala
 val orders = tableEnv.from("Orders")
-orders.executeInsert("OutOrders")
+orders.insertInto("OutOrders").execute()
 ```
 {{< /tab >}}
 {{< tab "Python" >}}
diff --git a/docs/content/docs/dev/table/common.md b/docs/content/docs/dev/table/common.md
index 5a783ad..0f8b309 100644
--- a/docs/content/docs/dev/table/common.md
+++ b/docs/content/docs/dev/table/common.md
@@ -921,10 +921,10 @@ tEnv.createTemporaryTable("MySink2", TableDescriptor.forConnector("filesystem")
 StatementSet stmtSet = tEnv.createStatementSet();
 
 Table table1 = tEnv.from("MySource1").where($("word").like("F%"));
-stmtSet.add(table1.insertInto("MySink1");
+stmtSet.add(table1.insertInto("MySink1"));
 
 Table table2 = table1.unionAll(tEnv.from("MySource2"));
-stmtSet.add(table2.insertInto("MySink2");
+stmtSet.add(table2.insertInto("MySink2"));
 
 String explanation = stmtSet.explain();
 System.out.println(explanation);

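The last hunk above also fixes the missing closing parentheses in the English `StatementSet` example. For reference, a self-contained sketch of that snippet with the 1.15 API, assuming `MySource1`, `MySource2`, `MySink1` and `MySink2` are registered on `tEnv` as in the surrounding documentation page:

```java
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import static org.apache.flink.table.api.Expressions.$;

// assumption: tEnv is a TableEnvironment with MySource1, MySource2,
// MySink1 and MySink2 already registered (see the hunks above)
StatementSet stmtSet = tEnv.createStatementSet();

Table table1 = tEnv.from("MySource1").where($("word").like("F%"));
stmtSet.add(table1.insertInto("MySink1"));

Table table2 = table1.unionAll(tEnv.from("MySource2"));
stmtSet.add(table2.insertInto("MySink2"));

// explain the combined plan, then submit both insert pipelines as one job
System.out.println(stmtSet.explain());
stmtSet.execute();
```

As the updated common.md text notes, the added `TablePipeline`s are buffered in the `StatementSet` and `StatementSet.execute()` optimizes all sinks into a single DAG before submission.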
[flink] 02/02: [hotfix][docs]add the missing ending symbol ‘;’

Posted by le...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 46de6e9b560e56c31b6499aa4fc3a6d2db2602da
Author: zoucao <zh...@hotmail.com>
AuthorDate: Mon Mar 14 10:25:12 2022 +0800

    [hotfix][docs]add the missing ending symbol ‘;’
---
 .../docs/connectors/datastream/filesystem.md       |  4 +--
 .../docs/connectors/datastream/formats/avro.md     |  2 +-
 .../docs/connectors/datastream/formats/hadoop.md   |  2 +-
 .../content.zh/docs/connectors/datastream/kafka.md | 16 ++++-----
 .../docs/connectors/datastream/pulsar.md           | 42 +++++++++++-----------
 .../docs/deployment/filesystems/azure.md           |  2 +-
 docs/content.zh/docs/deployment/filesystems/oss.md |  2 +-
 docs/content.zh/docs/dev/dataset/examples.md       |  2 +-
 .../docs/dev/dataset/hadoop_compatibility.md       |  4 +--
 docs/content.zh/docs/dev/dataset/iterations.md     |  2 +-
 docs/content.zh/docs/dev/dataset/overview.md       | 22 ++++++------
 .../content.zh/docs/dev/dataset/transformations.md |  2 +-
 .../docs/dev/datastream/application_parameters.md  |  2 +-
 .../docs/dev/datastream/execution/parallel.md      |  6 ++--
 .../content.zh/docs/dev/datastream/experimental.md |  2 +-
 .../datastream/fault-tolerance/checkpointing.md    |  2 +-
 .../datastream/fault-tolerance/queryable_state.md  |  2 +-
 .../serialization/types_serialization.md           |  2 +-
 .../docs/dev/datastream/operators/joining.md       | 18 +++++-----
 .../docs/dev/datastream/operators/overview.md      |  2 +-
 .../dev/datastream/operators/process_function.md   |  6 ++--
 docs/content.zh/docs/dev/datastream/overview.md    | 10 +++---
 docs/content.zh/docs/dev/datastream/sources.md     |  2 +-
 .../docs/dev/datastream/user_defined_functions.md  |  2 +-
 docs/content.zh/docs/dev/table/catalogs.md         |  2 +-
 docs/content.zh/docs/dev/table/common.md           |  6 ++--
 docs/content.zh/docs/dev/table/config.md           |  2 +-
 docs/content.zh/docs/dev/table/data_stream_api.md  | 32 ++++++++---------
 .../docs/dev/table/functions/systemFunctions.md    |  2 +-
 docs/content.zh/docs/dev/table/tuning.md           |  6 ++--
 .../docs/learn-flink/streaming_analytics.md        | 12 +++----
 docs/content.zh/docs/libs/cep.md                   | 22 ++++++------
 docs/content.zh/docs/libs/gelly/bipartite_graph.md | 18 +++++-----
 docs/content.zh/docs/libs/gelly/graph_api.md       | 26 +++++++-------
 .../content.zh/docs/libs/gelly/graph_generators.md |  4 +--
 .../docs/libs/gelly/iterative_graph_processing.md  | 18 +++++-----
 docs/content.zh/docs/libs/gelly/library_methods.md |  2 +-
 docs/content.zh/docs/libs/state_processor_api.md   |  2 +-
 .../docs/connectors/dataset/formats/avro.md        |  2 +-
 .../docs/connectors/dataset/formats/hadoop.md      |  2 +-
 .../docs/connectors/datastream/filesystem.md       |  4 +--
 .../docs/connectors/datastream/formats/avro.md     |  2 +-
 docs/content/docs/connectors/datastream/jdbc.md    |  4 +--
 docs/content/docs/connectors/datastream/kafka.md   | 16 ++++-----
 docs/content/docs/connectors/datastream/pulsar.md  | 42 +++++++++++-----------
 docs/content/docs/deployment/filesystems/azure.md  |  2 +-
 docs/content/docs/deployment/filesystems/oss.md    |  2 +-
 docs/content/docs/dev/dataset/examples.md          |  2 +-
 docs/content/docs/dev/dataset/hadoop_map_reduce.md |  2 +-
 docs/content/docs/dev/dataset/iterations.md        |  2 +-
 docs/content/docs/dev/dataset/overview.md          | 24 ++++++-------
 docs/content/docs/dev/dataset/transformations.md   |  2 +-
 .../docs/dev/datastream/application_parameters.md  |  2 +-
 .../docs/dev/datastream/execution/parallel.md      |  6 ++--
 docs/content/docs/dev/datastream/experimental.md   |  2 +-
 .../datastream/fault-tolerance/checkpointing.md    |  2 +-
 .../datastream/fault-tolerance/queryable_state.md  |  2 +-
 .../serialization/types_serialization.md           |  2 +-
 .../docs/dev/datastream/operators/joining.md       | 18 +++++-----
 .../docs/dev/datastream/operators/overview.md      |  2 +-
 .../dev/datastream/operators/process_function.md   |  6 ++--
 docs/content/docs/dev/datastream/overview.md       | 18 +++++-----
 docs/content/docs/dev/datastream/sources.md        |  2 +-
 .../docs/dev/datastream/user_defined_functions.md  |  2 +-
 docs/content/docs/dev/table/catalogs.md            |  2 +-
 docs/content/docs/dev/table/common.md              |  6 ++--
 docs/content/docs/dev/table/config.md              |  2 +-
 docs/content/docs/dev/table/data_stream_api.md     | 32 ++++++++---------
 .../docs/dev/table/functions/systemFunctions.md    |  2 +-
 docs/content/docs/dev/table/tuning.md              |  6 ++--
 .../docs/learn-flink/streaming_analytics.md        |  6 ++--
 docs/content/docs/libs/cep.md                      | 30 ++++++++--------
 docs/content/docs/libs/gelly/bipartite_graph.md    | 18 +++++-----
 docs/content/docs/libs/gelly/graph_api.md          | 30 ++++++++--------
 docs/content/docs/libs/gelly/graph_generators.md   |  4 +--
 .../docs/libs/gelly/iterative_graph_processing.md  | 18 +++++-----
 docs/content/docs/libs/gelly/library_methods.md    |  2 +-
 docs/content/docs/libs/state_processor_api.md      |  2 +-
 78 files changed, 322 insertions(+), 322 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/filesystem.md b/docs/content.zh/docs/connectors/datastream/filesystem.md
index efd6d4c..b7514d1 100644
--- a/docs/content.zh/docs/connectors/datastream/filesystem.md
+++ b/docs/content.zh/docs/connectors/datastream/filesystem.md
@@ -68,10 +68,10 @@ under the License.
 {{< tab "Java" >}}
 ```java
 // 从文件流中读取文件内容
-FileSource.forRecordStreamFormat(StreamFormat,Path...)
+FileSource.forRecordStreamFormat(StreamFormat,Path...);
         
 // 从文件中一次读取一批记录
-FileSource.forBulkFileFormat(BulkFormat,Path...)
+FileSource.forBulkFileFormat(BulkFormat,Path...);
 ```
 {{< /tab >}}
 {{< /tabs >}}
diff --git a/docs/content.zh/docs/connectors/datastream/formats/avro.md b/docs/content.zh/docs/connectors/datastream/formats/avro.md
index 9e2190a..b8b4cea 100644
--- a/docs/content.zh/docs/connectors/datastream/formats/avro.md
+++ b/docs/content.zh/docs/connectors/datastream/formats/avro.md
@@ -52,7 +52,7 @@ DataStream<User> usersDS = env.createInput(users);
 注意,`User` 是一个通过 Avro schema生成的 POJO 类。Flink 还允许选择 POJO 中字符串类型的键。例如:
 
 ```java
-usersDS.keyBy("name")
+usersDS.keyBy("name");
 ```
 
 
diff --git a/docs/content.zh/docs/connectors/datastream/formats/hadoop.md b/docs/content.zh/docs/connectors/datastream/formats/hadoop.md
index b3077c1..bd19d3f 100644
--- a/docs/content.zh/docs/connectors/datastream/formats/hadoop.md
+++ b/docs/content.zh/docs/connectors/datastream/formats/hadoop.md
@@ -106,7 +106,7 @@ Flink 为 Hadoop `OutputFormats` 提供了一个兼容性包装器。支持任
 
 ```java
 // 获取我们希望发送的结果
-DataStream<Tuple2<Text, IntWritable>> hadoopResult = [...]
+DataStream<Tuple2<Text, IntWritable>> hadoopResult = [...];
 
 // 设置 the Hadoop TextOutputFormat。
 HadoopOutputFormat<Text, IntWritable> hadoopOF =
diff --git a/docs/content.zh/docs/connectors/datastream/kafka.md b/docs/content.zh/docs/connectors/datastream/kafka.md
index d064d60..caecbfe 100644
--- a/docs/content.zh/docs/connectors/datastream/kafka.md
+++ b/docs/content.zh/docs/connectors/datastream/kafka.md
@@ -74,18 +74,18 @@ env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
 Kafka Source 提供了 3 种 Topic / Partition 的订阅方式:
 - Topic 列表,订阅 Topic 列表中所有 Partition 的消息:
   ```java
-  KafkaSource.builder().setTopics("topic-a", "topic-b")
+  KafkaSource.builder().setTopics("topic-a", "topic-b");
   ```
 - 正则表达式匹配,订阅与正则表达式所匹配的 Topic 下的所有 Partition:
   ```java
-  KafkaSource.builder().setTopicPattern("topic.*")
+  KafkaSource.builder().setTopicPattern("topic.*");
   ```
 - Partition 列表,订阅指定的 Partition:
   ```java
   final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
           new TopicPartition("topic-a", 0),    // Partition 0 of topic "topic-a"
           new TopicPartition("topic-b", 5)));  // Partition 5 of topic "topic-b"
-  KafkaSource.builder().setPartitions(partitionSet)
+  KafkaSource.builder().setPartitions(partitionSet);
   ```
 ### 消息解析
 代码中需要提供一个反序列化器(Deserializer)来对 Kafka 的消息进行解析。
@@ -119,7 +119,7 @@ KafkaSource.builder()
     // 从最早位点开始消费
     .setStartingOffsets(OffsetsInitializer.earliest())
     // 从最末尾位点开始消费
-    .setStartingOffsets(OffsetsInitializer.latest())
+    .setStartingOffsets(OffsetsInitializer.latest());
 ```
 如果内置的初始化器不能满足需求,也可以实现自定义的位点初始化器(```OffsetsInitializer```)。
 
@@ -153,7 +153,7 @@ Kafka consumer 的配置可以参考 [Apache Kafka 文档](http://kafka.apache.o
 ```java
 KafkaSource.builder()
     .setProperty("sasl.mechanism", "PLAIN")
-    .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")
+    .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");
 ```
 
 ### 动态分区检查
@@ -162,7 +162,7 @@ KafkaSource.builder()
 
 ```java
 KafkaSource.builder()
-    .setProperty("partition.discovery.interval.ms", "10000") // 每 10 秒检查一次新分区
+    .setProperty("partition.discovery.interval.ms", "10000"); // 每 10 秒检查一次新分区
 ```
 {{< hint warning >}}
 分区检查功能默认**不开启**。需要显式地设置分区检查间隔才能启用此功能。
@@ -172,7 +172,7 @@ KafkaSource.builder()
 默认情况下,Kafka Source 使用 Kafka 消息中的时间戳作为事件时间。您可以定义自己的水印策略(Watermark Strategy)
 以从消息中提取事件时间,并向下游发送水印:
 ```java
-env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")
+env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy");
 ```
 [这篇文档]({{< ref "docs/dev/datastream/event-time/generating_watermarks.md" >}})描述了如何自定义水印策略(```WatermarkStrategy```)。
 
@@ -314,7 +314,7 @@ Kafka sink 提供了构建类来创建 ```KafkaSink``` 的实例。以下代码
 topic:
 
 ```java
-DataStream<String> stream = ...
+DataStream<String> stream = ...;
         
 KafkaSink<String> sink = KafkaSink.<String>builder()
         .setBootstrapServers(brokers)
diff --git a/docs/content.zh/docs/connectors/datastream/pulsar.md b/docs/content.zh/docs/connectors/datastream/pulsar.md
index 301f242..c14958a 100644
--- a/docs/content.zh/docs/connectors/datastream/pulsar.md
+++ b/docs/content.zh/docs/connectors/datastream/pulsar.md
@@ -78,15 +78,15 @@ Pulsar 数据源提供了两种订阅 topic 或 topic 分区的方式。
 
 - Topic 列表,从这个 Topic 的所有分区上消费消息,例如:
   ```java
-  PulsarSource.builder().setTopics("some-topic1", "some-topic2")
+  PulsarSource.builder().setTopics("some-topic1", "some-topic2");
 
   // 从 topic "topic-a" 的 0 和 1 分区上消费
-  PulsarSource.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+  PulsarSource.builder().setTopics("topic-a-partition-0", "topic-a-partition-2");
   ```
 
 - Topic 正则,连接器使用给定的正则表达式匹配出所有合规的 topic,例如:
   ```java
-  PulsarSource.builder().setTopicPattern("topic-*")
+  PulsarSource.builder().setTopicPattern("topic-*");
   ```
 
 #### Topic 名称简写
@@ -146,21 +146,21 @@ Topic 名称 | 是否分区
 - 使用 Pulsar 的 [Schema](https://pulsar.apache.org/docs/en/schema-understand/) 解析消息。
   ```java
   // 基础数据类型
-  PulsarDeserializationSchema.pulsarSchema(Schema)
+  PulsarDeserializationSchema.pulsarSchema(Schema);
 
   // 结构类型 (JSON, Protobuf, Avro, etc.)
-  PulsarDeserializationSchema.pulsarSchema(Schema, Class)
+  PulsarDeserializationSchema.pulsarSchema(Schema, Class);
 
   // 键值对类型
-  PulsarDeserializationSchema.pulsarSchema(Schema, Class, Class)
+  PulsarDeserializationSchema.pulsarSchema(Schema, Class, Class);
   ```
 - 使用 Flink 的 `DeserializationSchema` 解析消息。
   ```java
-  PulsarDeserializationSchema.flinkSchema(DeserializationSchema)
+  PulsarDeserializationSchema.flinkSchema(DeserializationSchema);
   ```
 - 使用 Flink 的 `TypeInformation` 解析消息。
   ```java
-  PulsarDeserializationSchema.flinkTypeInfo(TypeInformation, ExecutionConfig)
+  PulsarDeserializationSchema.flinkTypeInfo(TypeInformation, ExecutionConfig);
   ```
 
 Pulsar 的 `Message<byte[]>` 包含了很多 [额外的属性](https://pulsar.apache.org/docs/zh-CN/concepts-messaging/#%E6%B6%88%E6%81%AF)。
@@ -185,10 +185,10 @@ Flink 使用 `TypeInformation` 将解析出来的结果序列化传递到下游
 
 ```java
 // 名为 "my-shared" 的共享订阅
-PulsarSource.builder().setSubscriptionName("my-shared")
+PulsarSource.builder().setSubscriptionName("my-shared");
 
 // 名为 "my-exclusive" 的独占订阅
-PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive)
+PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive);
 ```
 
 如果想在 Pulsar 连接器里面使用 `key 共享` 订阅,需要提供 `RangeGenerator` 实例。`RangeGenerator` 会生成一组消息 key 的 hash 范围,连接器会基于给定的范围来消费数据。
@@ -201,23 +201,23 @@ Pulsar 连接器也提供了一个名为 `UniformRangeGenerator` 的默认实现
 
 - 从 topic 里面最早的一条消息开始消费。
   ```java
-  StartCursor.earliest()
+  StartCursor.earliest();
   ```
 - 从 topic 里面最新的一条消息开始消费。
   ```java
-  StartCursor.latest()
+  StartCursor.latest();
   ```
 - 从给定的消息开始消费。
   ```java
-  StartCursor.fromMessageId(MessageId)
+  StartCursor.fromMessageId(MessageId);
   ```
 - 与前者不同的是,给定的消息可以跳过,再进行消费。
   ```java
-  StartCursor.fromMessageId(MessageId, boolean)
+  StartCursor.fromMessageId(MessageId, boolean);
   ```
 - 从给定的消息时间开始消费。
   ```java
-  StartCursor.fromMessageTime(long)
+  StartCursor.fromMessageTime(long);
   ```
 
 {{< hint info >}}
@@ -236,23 +236,23 @@ Pulsar 连接器同时支持流式和批的消费方式,默认情况下,连
 
 - 永不停止。
   ```java
-  StopCursor.never()
+  StopCursor.never();
   ```
 - 停止于 Pulsar 启动时 topic 里面最新的那条数据。
   ```java
-  StopCursor.latest()
+  StopCursor.latest();
   ```
 - 停止于某条消息,结果里不包含此消息。
   ```java
-  StopCursor.atMessageId(MessageId)
+  StopCursor.atMessageId(MessageId);
   ```
 - 停止于某条消息之后,结果里包含此消息。
   ```java
-  StopCursor.afterMessageId(MessageId)
+  StopCursor.afterMessageId(MessageId);
   ```
 - 停止于某个给定的消息时间戳。
   ```java
-  StopCursor.atEventTime(long)
+  StopCursor.atEventTime(long);
   ```
 
 ### 其他配置项
@@ -305,7 +305,7 @@ PulsarSource.builder()
 默认情况下,连接器使用 Pulsar 的 `Message<byte[]>` 里面的时间作为解析结果的时间戳。用户可以使用 `WatermarkStrategy` 来自行解析出想要的消息时间,并向下游传递对应的水位线。
 
 ```java
-env.fromSource(pulsarSource, new CustomWatermarkStrategy(), "Pulsar Source With Custom Watermark Strategy")
+env.fromSource(pulsarSource, new CustomWatermarkStrategy(), "Pulsar Source With Custom Watermark Strategy");
 ```
 
 [这篇文档]({{< ref "docs/dev/datastream/event-time/generating_watermarks.md" >}}) 详细讲解了如何定义 `WatermarkStrategy`。
diff --git a/docs/content.zh/docs/deployment/filesystems/azure.md b/docs/content.zh/docs/deployment/filesystems/azure.md
index 73bcf5c..42a0d09 100644
--- a/docs/content.zh/docs/deployment/filesystems/azure.md
+++ b/docs/content.zh/docs/deployment/filesystems/azure.md
@@ -61,7 +61,7 @@ abfss://<your-container>@$<your-azure-account>.dfs.core.windows.net/<object-path
 env.readTextFile("wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>");
 
 // 写入 Azure Blob 存储
-stream.writeAsText("wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>")
+stream.writeAsText("wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>");
 
 // 将 Azure Blob 存储用作 FsStatebackend
 env.setStateBackend(new FsStateBackend("wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>"));
diff --git a/docs/content.zh/docs/deployment/filesystems/oss.md b/docs/content.zh/docs/deployment/filesystems/oss.md
index 07dfb38..e1ca862 100644
--- a/docs/content.zh/docs/deployment/filesystems/oss.md
+++ b/docs/content.zh/docs/deployment/filesystems/oss.md
@@ -46,7 +46,7 @@ oss://<your-bucket>/<object-name>
 env.readTextFile("oss://<your-bucket>/<object-name>");
 
 // 写入 OSS bucket
-stream.writeAsText("oss://<your-bucket>/<object-name>")
+stream.writeAsText("oss://<your-bucket>/<object-name>");
 
 // 将 OSS 用作 FsStatebackend
 env.setStateBackend(new FsStateBackend("oss://<your-bucket>/<object-name>"));
diff --git a/docs/content.zh/docs/dev/dataset/examples.md b/docs/content.zh/docs/dev/dataset/examples.md
index e543ca1..08b23b1 100644
--- a/docs/content.zh/docs/dev/dataset/examples.md
+++ b/docs/content.zh/docs/dev/dataset/examples.md
@@ -135,7 +135,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 // 通过解析一个CSV文件来获取每个页面原始的rank值
 DataSet<Tuple2<Long, Double>> pagesWithRanks = env.readCsvFile(pagesInputPath)
-						   .types(Long.class, Double.class)
+						   .types(Long.class, Double.class);
 
 // 链接被编码为邻接表: (page-id, Array(neighbor-ids))
 DataSet<Tuple2<Long, Long[]>> pageLinkLists = getLinksDataSet(env);
diff --git a/docs/content.zh/docs/dev/dataset/hadoop_compatibility.md b/docs/content.zh/docs/dev/dataset/hadoop_compatibility.md
index 18851fa..737a78a 100644
--- a/docs/content.zh/docs/dev/dataset/hadoop_compatibility.md
+++ b/docs/content.zh/docs/dev/dataset/hadoop_compatibility.md
@@ -142,7 +142,7 @@ The following example shows how to use Hadoop's `TextOutputFormat`.
 
 ```java
 // Obtain the result we want to emit
-DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]
+DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...];
 
 // Set up the Hadoop TextOutputFormat.
 HadoopOutputFormat<Text, IntWritable> hadoopOF =
@@ -198,7 +198,7 @@ The following example shows how to use Hadoop `Mapper` and `Reducer` functions.
 
 ```java
 // Obtain data to process somehow.
-DataSet<Tuple2<LongWritable, Text>> text = [...]
+DataSet<Tuple2<LongWritable, Text>> text = [...];
 
 DataSet<Tuple2<Text, LongWritable>> result = text
   // use Hadoop Mapper (Tokenizer) as MapFunction
diff --git a/docs/content.zh/docs/dev/dataset/iterations.md b/docs/content.zh/docs/dev/dataset/iterations.md
index c563e1d..194d341 100644
--- a/docs/content.zh/docs/dev/dataset/iterations.md
+++ b/docs/content.zh/docs/dev/dataset/iterations.md
@@ -171,7 +171,7 @@ IterationState solution = getInitialSolution();
 while (!terminationCriterion()) {
 	(delta, workset) = step(workset, solution);
 
-	solution.update(delta)
+	solution.update(delta);
 }
 
 setFinalState(solution);
diff --git a/docs/content.zh/docs/dev/dataset/overview.md b/docs/content.zh/docs/dev/dataset/overview.md
index 6b51e8e..b23771e 100644
--- a/docs/content.zh/docs/dev/dataset/overview.md
+++ b/docs/content.zh/docs/dev/dataset/overview.md
@@ -281,7 +281,7 @@ It removes the duplicate entries from the input DataSet, with respect to all fie
 {{< tabs "distinct" >}}
 {{< tab "Java" >}}
 ```java
-data.distinct()
+data.distinct();
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -431,7 +431,7 @@ Produces the union of two data sets.
 {{< tabs "union" >}}
 {{< tab "Java" >}}
 ```java
-data.union(data2)
+data.union(data2);
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -450,7 +450,7 @@ Only Map-like transformations may follow a rebalance transformation.
 {{< tab "Java" >}}
 ```java
 DataSet<Int> data1 = // [...]
-DataSet<Tuple2<Int, String>> result = data1.rebalance().map(...)
+DataSet<Tuple2<Int, String>> result = data1.rebalance().map(...);
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -651,7 +651,7 @@ The simplest case is grouping Tuples on one or more fields of the Tuple:
 {{< tab "Java" >}}
 ```java
 DataSet<Tuple3<Integer,String,Long>> input = // [...]
-UnsortedGrouping<Tuple3<Integer,String,Long>,Tuple> keyed = input.groupBy(0)
+UnsortedGrouping<Tuple3<Integer,String,Long>,Tuple> keyed = input.groupBy(0);
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -668,7 +668,7 @@ Tuples are grouped on the first field (the one of Integer type).
 {{< tab "Java" >}}
 ```java
 DataSet<Tuple3<Integer,String,Long>> input = // [...]
-UnsortedGrouping<Tuple3<Integer,String,Long>,Tuple> keyed = input.groupBy(0,1)
+UnsortedGrouping<Tuple3<Integer,String,Long>,Tuple> keyed = input.groupBy(0,1);
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -707,7 +707,7 @@ public class WC {
   public int count;
 }
 DataSet<WC> words = // [...]
-DataSet<WC> wordCounts = words.groupBy("word")
+DataSet<WC> wordCounts = words.groupBy("word");
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -1394,11 +1394,11 @@ final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
 DataSet<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);
 
 // Create a DataSet from any Java collection
-List<Tuple2<String, Integer>> data = ...
+List<Tuple2<String, Integer>> data = ...;
 DataSet<Tuple2<String, Integer>> myTuples = env.fromCollection(data);
 
 // Create a DataSet from an Iterator
-Iterator<Long> longIt = ...
+Iterator<Long> longIt = ...;
 DataSet<Long> myLongs = env.fromCollection(longIt, Long.class);
 ```
 {{< /tab >}}
@@ -1496,14 +1496,14 @@ The distributed cache is used as follows:
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 // register a file from HDFS
-env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
+env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile");
 
 // register a local executable file (script, executable, ...)
-env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
+env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true);
 
 // define your program and execute
 ...
-DataSet<String> input = ...
+DataSet<String> input = ...;
 DataSet<Integer> result = input.map(new MyMapper());
 ...
 env.execute();
diff --git a/docs/content.zh/docs/dev/dataset/transformations.md b/docs/content.zh/docs/dev/dataset/transformations.md
index 0edbb8f..d58a382 100644
--- a/docs/content.zh/docs/dev/dataset/transformations.md
+++ b/docs/content.zh/docs/dev/dataset/transformations.md
@@ -204,7 +204,7 @@ DataSet<Tuple2<String, Integer>> out = in.project(2,0);
 Note that the Java compiler cannot infer the return type of `project` operator. This can cause a problem if you call another operator on a result of `project` operator such as:
 
 ```java
-DataSet<Tuple5<String,String,String,String,String>> ds = ....
+DataSet<Tuple5<String,String,String,String,String>> ds = ....;
 DataSet<Tuple1<String>> ds2 = ds.project(0).distinct(0);
 ```
 
diff --git a/docs/content.zh/docs/dev/datastream/application_parameters.md b/docs/content.zh/docs/dev/datastream/application_parameters.md
index 8d00b6e..a9ad2c1 100644
--- a/docs/content.zh/docs/dev/datastream/application_parameters.md
+++ b/docs/content.zh/docs/dev/datastream/application_parameters.md
@@ -84,7 +84,7 @@ ParameterTool parameters = // ...
 parameter.getRequired("input");
 parameter.get("output", "myDefaultValue");
 parameter.getLong("expectedCount", -1L);
-parameter.getNumberOfParameters()
+parameter.getNumberOfParameters();
 // .. there are more methods available.
 ```
 
diff --git a/docs/content.zh/docs/dev/datastream/execution/parallel.md b/docs/content.zh/docs/dev/datastream/execution/parallel.md
index bd5f336..d30495c 100644
--- a/docs/content.zh/docs/dev/datastream/execution/parallel.md
+++ b/docs/content.zh/docs/dev/datastream/execution/parallel.md
@@ -46,7 +46,7 @@ under the License.
 ```java
 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-DataStream<String> text = [...]
+DataStream<String> text = [...];
 DataStream<Tuple2<String, Integer>> wordCounts = text
     .flatMap(new LineSplitter())
     .keyBy(value -> value.f0)
@@ -87,8 +87,8 @@ env.execute("Word Count Example")
 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(3);
 
-DataStream<String> text = [...]
-DataStream<Tuple2<String, Integer>> wordCounts = [...]
+DataStream<String> text = [...];
+DataStream<Tuple2<String, Integer>> wordCounts = [...];
 wordCounts.print();
 
 env.execute("Word Count Example");
diff --git a/docs/content.zh/docs/dev/datastream/experimental.md b/docs/content.zh/docs/dev/datastream/experimental.md
index 90212a5..aaeae8b 100644
--- a/docs/content.zh/docs/dev/datastream/experimental.md
+++ b/docs/content.zh/docs/dev/datastream/experimental.md
@@ -62,7 +62,7 @@ Code example:
 {{< tab "Java" >}}
 ```java
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-DataStreamSource<Integer> source = ...
+DataStreamSource<Integer> source = ...;
 DataStreamUtils.reinterpretAsKeyedStream(source, (in) -> in, TypeInformation.of(Integer.class))
     .window(TumblingEventTimeWindows.of(Time.seconds(1)))
     .reduce((a, b) -> a + b)
diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md
index faecd2c..650d735 100644
--- a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md
+++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md
@@ -90,7 +90,7 @@ env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
 env.getCheckpointConfig().setCheckpointTimeout(60000);
 
 // 允许两个连续的 checkpoint 错误
-env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2)
+env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
         
 // 同一时间只允许一个 checkpoint 进行
 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/queryable_state.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/queryable_state.md
index 3c2c7e5..4a9a908 100644
--- a/docs/content.zh/docs/dev/datastream/fault-tolerance/queryable_state.md
+++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/queryable_state.md
@@ -98,7 +98,7 @@ QueryableStateStream asQueryableState(
 返回的 `QueryableStateStream` 可以被视作一个sink,而且**不能再**被进一步转换。在内部实现上,一个 `QueryableStateStream` 被转换成一个 operator,使用输入的数据来更新 queryable state。state 如何更新是由 `asQueryableState` 提供的 `StateDescriptor` 来决定的。在下面的代码中, keyed stream 的所有数据将会通过 `ValueState.update(value)` 来更新状态:
 
 ```java
-stream.keyBy(value -> value.f0).asQueryableState("query-name")
+stream.keyBy(value -> value.f0).asQueryableState("query-name");
 ```
 
 这个行为类似于 Scala API 中的 `flatMapWithState`。
diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
index ee1055e..c80f7b4 100644
--- a/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
+++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
@@ -499,7 +499,7 @@ env.getConfig().enableForceKryo();
 
 If Kryo is not able to serialize your POJO, you can add a custom serializer to Kryo, using
 ```java
-env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
+env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass);
 ```
 
 There are different variants of these methods available.
diff --git a/docs/content.zh/docs/dev/datastream/operators/joining.md b/docs/content.zh/docs/dev/datastream/operators/joining.md
index 32695b7..4da2aa1 100644
--- a/docs/content.zh/docs/dev/datastream/operators/joining.md
+++ b/docs/content.zh/docs/dev/datastream/operators/joining.md
@@ -38,7 +38,7 @@ stream.join(otherStream)
     .where(<KeySelector>)
     .equalTo(<KeySelector>)
     .window(<WindowAssigner>)
-    .apply(<JoinFunction>)
+    .apply(<JoinFunction>);
 ```
 
 语义上有一些值得注意的地方:
@@ -63,8 +63,8 @@ import org.apache.flink.streaming.api.windowing.time.Time;
  
 ...
 
-DataStream<Integer> orangeStream = ...
-DataStream<Integer> greenStream = ...
+DataStream<Integer> orangeStream = ...;
+DataStream<Integer> greenStream = ...;
 
 orangeStream.join(greenStream)
     .where(<KeySelector>)
@@ -118,8 +118,8 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 
 ...
 
-DataStream<Integer> orangeStream = ...
-DataStream<Integer> greenStream = ...
+DataStream<Integer> orangeStream = ...;
+DataStream<Integer> greenStream = ...;
 
 orangeStream.join(greenStream)
     .where(<KeySelector>)
@@ -171,8 +171,8 @@ import org.apache.flink.streaming.api.windowing.time.Time;
  
 ...
 
-DataStream<Integer> orangeStream = ...
-DataStream<Integer> greenStream = ...
+DataStream<Integer> orangeStream = ...;
+DataStream<Integer> greenStream = ...;
 
 orangeStream.join(greenStream)
     .where(<KeySelector>)
@@ -244,8 +244,8 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 
 ...
 
-DataStream<Integer> orangeStream = ...
-DataStream<Integer> greenStream = ...
+DataStream<Integer> orangeStream = ...;
+DataStream<Integer> greenStream = ...;
 
 orangeStream
     .keyBy(<KeySelector>)
diff --git a/docs/content.zh/docs/dev/datastream/operators/overview.md b/docs/content.zh/docs/dev/datastream/operators/overview.md
index 3c62266..e0da3f5 100644
--- a/docs/content.zh/docs/dev/datastream/operators/overview.md
+++ b/docs/content.zh/docs/dev/datastream/operators/overview.md
@@ -771,7 +771,7 @@ Flink里的算子和作业节点会有一个名字和一个描述。名字和描
 {{< tabs namedescription>}}
 {{< tab "Java" >}}
 ```java
-someStream.filter(...).setName("filter").setDescription("x in (1, 2, 3, 4) and y > 1")
+someStream.filter(...).setName("filter").setDescription("x in (1, 2, 3, 4) and y > 1");
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
diff --git a/docs/content.zh/docs/dev/datastream/operators/process_function.md b/docs/content.zh/docs/dev/datastream/operators/process_function.md
index 314d1f3..0268308 100644
--- a/docs/content.zh/docs/dev/datastream/operators/process_function.md
+++ b/docs/content.zh/docs/dev/datastream/operators/process_function.md
@@ -53,7 +53,7 @@ to apply the `ProcessFunction` on a keyed stream:
 {{< /hint >}}
 
 ```java
-stream.keyBy(...).process(new MyProcessFunction())
+stream.keyBy(...).process(new MyProcessFunction());
 ```
 
 ## Low-level Joins
@@ -461,7 +461,7 @@ Stopping a processing-time timer:
 {{< tabs "5d0d1344-6f51-44f8-b500-ebe863cedba4" >}}
 {{< tab "Java" >}}
 ```java
-long timestampOfTimerToStop = ...
+long timestampOfTimerToStop = ...;
 ctx.timerService().deleteProcessingTimeTimer(timestampOfTimerToStop);
 ```
 {{< /tab >}}
@@ -484,7 +484,7 @@ Stopping an event-time timer:
 {{< tabs "581e5996-503c-452e-8b2a-a4daeaf4ac88" >}}
 {{< tab "Java" >}}
 ```java
-long timestampOfTimerToStop = ...
+long timestampOfTimerToStop = ...;
 ctx.timerService().deleteEventTimeTimer(timestampOfTimerToStop);
 ```
 {{< /tab >}}
diff --git a/docs/content.zh/docs/dev/datastream/overview.md b/docs/content.zh/docs/dev/datastream/overview.md
index b4e2059..164e622 100644
--- a/docs/content.zh/docs/dev/datastream/overview.md
+++ b/docs/content.zh/docs/dev/datastream/overview.md
@@ -69,11 +69,11 @@ Flink 程序看起来像一个转换 `DataStream` 的常规程序。每个程序
 `StreamExecutionEnvironment` 是所有 Flink 程序的基础。你可以使用 `StreamExecutionEnvironment` 的如下静态方法获取 `StreamExecutionEnvironment`:
 
 ```java
-getExecutionEnvironment()
+getExecutionEnvironment();
 
-createLocalEnvironment()
+createLocalEnvironment();
 
-createRemoteEnvironment(String host, int port, String... jarFiles)
+createRemoteEnvironment(String host, int port, String... jarFiles);
 ```
 
 通常,你只需要使用 `getExecutionEnvironment()` 即可,因为该方法会根据上下文做正确的处理:如果你在 IDE 中执行你的程序或将其作为一般的 Java 程序执行,那么它将创建一个本地环境,该环境将在你的本地机器上执行你的程序。如果你基于程序创建了一个 JAR 文件,并通过[命令行]({{< ref "docs/deployment/cli" >}})运行它,Flink 集群管理器将执行程序的 main 方法,同时 `getExecutionEnvironment()` 方法会返回一个执行环境以在集群上执行你的程序。
@@ -106,9 +106,9 @@ DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
 一旦你有了包含最终结果的 DataStream,你就可以通过创建 sink 把它写到外部系统。下面是一些用于创建 sink 的示例方法:
 
 ```java
-writeAsText(String path)
+writeAsText(String path);
 
-print()
+print();
 ```
 
 {{< /tab >}}
diff --git a/docs/content.zh/docs/dev/datastream/sources.md b/docs/content.zh/docs/dev/datastream/sources.md
index 0c3b7fe..b01c631 100644
--- a/docs/content.zh/docs/dev/datastream/sources.md
+++ b/docs/content.zh/docs/dev/datastream/sources.md
@@ -363,7 +363,7 @@ Source 的实现需要完成一部分*事件时间*分配和*水印生成*的工
 environment.fromSource(
     Source<OUT, ?, ?> source,
     WatermarkStrategy<OUT> timestampsAndWatermarks,
-    String sourceName)
+    String sourceName);
 ```
 
 `TimestampAssigner` 和 `WatermarkGenerator` 作为 `ReaderOutput`(或 `SourceOutput`)的一部分透明地运行,因此 Source 实现者不必实现任何时间戳提取和水印生成的代码。 
diff --git a/docs/content.zh/docs/dev/datastream/user_defined_functions.md b/docs/content.zh/docs/dev/datastream/user_defined_functions.md
index c8c0f3b..652fb5a 100644
--- a/docs/content.zh/docs/dev/datastream/user_defined_functions.md
+++ b/docs/content.zh/docs/dev/datastream/user_defined_functions.md
@@ -209,7 +209,7 @@ this.numLines.add(1);
 最终整体结果会存储在由执行环境的 `execute()` 方法返回的 ```JobExecutionResult``` 对象中(当前只有等待作业完成后执行才起作用)。
 
 ```java
-myJobExecutionResult.getAccumulatorResult("num-lines")
+myJobExecutionResult.getAccumulatorResult("num-lines");
 ```
 
 单个作业的所有累加器共享一个命名空间。因此你可以在不同的操作 function 里面使用同一个累加器。Flink 会在内部将所有具有相同名称的累加器合并起来。
diff --git a/docs/content.zh/docs/dev/table/catalogs.md b/docs/content.zh/docs/dev/table/catalogs.md
index f12f805..0b86fbd 100644
--- a/docs/content.zh/docs/dev/table/catalogs.md
+++ b/docs/content.zh/docs/dev/table/catalogs.md
@@ -70,7 +70,7 @@ Catalog 是可扩展的,用户可以通过实现 `Catalog` 接口来开发自
 {{< tabs "88ed733a-cf54-4676-9685-7d77d3cc9771" >}}
 {{< tab "Java" >}}
 ```java
-TableEnvironment tableEnv = ...
+TableEnvironment tableEnv = ...;
 
 // Create a HiveCatalog 
 Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>");
diff --git a/docs/content.zh/docs/dev/table/common.md b/docs/content.zh/docs/dev/table/common.md
index dddac51..02f0605 100644
--- a/docs/content.zh/docs/dev/table/common.md
+++ b/docs/content.zh/docs/dev/table/common.md
@@ -53,7 +53,7 @@ tableEnv.createTemporaryTable("SourceTable", TableDescriptor.forConnector("datag
       .column("f0", DataTypes.STRING())
       .build())
     .option(DataGenOptions.ROWS_PER_SECOND, 100)
-    .build())
+    .build());
 
 // Create a sink table (using SQL DDL)
 tableEnv.executeSql("CREATE TEMPORARY TABLE SinkTable WITH ('connector' = 'blackhole') LIKE SourceTable");
@@ -335,7 +335,7 @@ tableEnv.createTable("SourceTableA", sourceDescriptor);
 tableEnv.createTemporaryTable("SourceTableB", sourceDescriptor);
 
 // Using SQL DDL
-tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)")
+tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)");
 ```
 
 <a name="expanding-table-identifiers"></a>
@@ -673,7 +673,7 @@ tableEnv.createTemporaryTable("CsvSinkTable", TableDescriptor.forConnector("file
     .build());
 
 // compute a result Table using Table API operators and/or SQL queries
-Table result = ...
+Table result = ...;
 
 // Prepare the insert into pipeline
 TablePipeline pipeline = result.insertInto("CsvSinkTable");
diff --git a/docs/content.zh/docs/dev/table/config.md b/docs/content.zh/docs/dev/table/config.md
index 70939a9..07f69c7 100644
--- a/docs/content.zh/docs/dev/table/config.md
+++ b/docs/content.zh/docs/dev/table/config.md
@@ -46,7 +46,7 @@ Table 和 SQL API 的默认配置能够确保结果准确,同时也提供可
 {{< tab "Java" >}}
 ```java
 // instantiate table environment
-TableEnvironment tEnv = ...
+TableEnvironment tEnv = ...;
 
 // access flink configuration
 TableConfig configuration = tEnv.getConfig();
diff --git a/docs/content.zh/docs/dev/table/data_stream_api.md b/docs/content.zh/docs/dev/table/data_stream_api.md
index f04c561..9e30785 100644
--- a/docs/content.zh/docs/dev/table/data_stream_api.md
+++ b/docs/content.zh/docs/dev/table/data_stream_api.md
@@ -598,25 +598,25 @@ pipeline or a statement set:
 
 ```java
 // execute with explicit sink
-tableEnv.from("InputTable").insertInto("OutputTable").execute()
+tableEnv.from("InputTable").insertInto("OutputTable").execute();
 
-tableEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable")
+tableEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable");
 
 tableEnv.createStatementSet()
     .add(tableEnv.from("InputTable").insertInto("OutputTable"))
     .add(tableEnv.from("InputTable").insertInto("OutputTable2"))
-    .execute()
+    .execute();
 
 tableEnv.createStatementSet()
     .addInsertSql("INSERT INTO OutputTable SELECT * FROM InputTable")
     .addInsertSql("INSERT INTO OutputTable2 SELECT * FROM InputTable")
-    .execute()
+    .execute();
 
 // execute with implicit local sink
 
-tableEnv.from("InputTable").execute().print()
+tableEnv.from("InputTable").execute().print();
 
-tableEnv.executeSql("SELECT * FROM InputTable").print()
+tableEnv.executeSql("SELECT * FROM InputTable").print();
 ```
 
 To combine both execution behaviors, every call to `StreamTableEnvironment.toDataStream`
@@ -629,17 +629,17 @@ these "external parts".
 // (1)
 
 // adds a branch with a printing sink to the StreamExecutionEnvironment
-tableEnv.toDataStream(table).print()
+tableEnv.toDataStream(table).print();
 
 // (2)
 
 // executes a Table API end-to-end pipeline as a Flink job and prints locally,
 // thus (1) has still not been executed
-table.execute().print()
+table.execute().print();
 
 // executes the DataStream API pipeline with the sink defined in (1) as a
 // Flink job, (2) was already running before
-env.execute()
+env.execute();
 ```
 
 {{< top >}}
@@ -2774,7 +2774,7 @@ In particular, these parts might not be well integrated into many recent new fea
 {{< tab "Java" >}}
 ```java
 StreamTableEnvironment tableEnv = ...; 
-DataStream<Tuple2<Long, String>> stream = ...
+DataStream<Tuple2<Long, String>> stream = ...;
 
 Table table2 = tableEnv.fromDataStream(stream, $("myLong"), $("myString"));
 ```
@@ -2910,7 +2910,7 @@ Flink 的 DataStream API 支持多样的数据类型。
 ```java
 StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section;
 
-DataStream<Tuple2<Long, Integer>> stream = ...
+DataStream<Tuple2<Long, Integer>> stream = ...;
 
 // convert DataStream into Table with field "myLong" only
 Table table = tableEnv.fromDataStream(stream, $("myLong"));
@@ -2962,7 +2962,7 @@ table = t_env.from_data_stream(stream, col('my_long'), col('my_int'))
 ```java
 StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
-DataStream<Tuple2<Long, Integer>> stream = ...
+DataStream<Tuple2<Long, Integer>> stream = ...;
 
 // convert DataStream into Table with field "f1" only
 Table table = tableEnv.fromDataStream(stream, $("f1"));
@@ -3025,7 +3025,7 @@ Flink 将基础数据类型(`Integer`、`Double`、`String`)或者通用数
 ```java
 StreamTableEnvironment tableEnv = ...;
 
-DataStream<Long> stream = ...
+DataStream<Long> stream = ...;
 
 // Convert DataStream into Table with field name "myLong"
 Table table = tableEnv.fromDataStream(stream, $("myLong"));
@@ -3083,7 +3083,7 @@ tuple 的 DataStream 都能被转换成表。
 ```java
 StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
-DataStream<Tuple2<Long, String>> stream = ...
+DataStream<Tuple2<Long, String>> stream = ...;
 
 // convert DataStream into Table with renamed field names "myLong", "myString" (position-based)
 Table table = tableEnv.fromDataStream(stream, $("myLong"), $("myString"));
@@ -3178,7 +3178,7 @@ Flink 支持 POJO 类型作为复合类型。确定 POJO 类型的规则记录
 StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 // Person is a POJO with fields "name" and "age"
-DataStream<Person> stream = ...
+DataStream<Person> stream = ...;
 
 // convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
 Table table = tableEnv.fromDataStream(stream, $("age").as("myAge"), $("name").as("myName"));
@@ -3227,7 +3227,7 @@ Row 类型的字段映射支持基于名称和基于位置两种方式。
 StreamTableEnvironment tableEnv = ...; 
 
 // DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
-DataStream<Row> stream = ...
+DataStream<Row> stream = ...;
 
 // Convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
 Table table = tableEnv.fromDataStream(stream, $("myName"), $("myAge"));
diff --git a/docs/content.zh/docs/dev/table/functions/systemFunctions.md b/docs/content.zh/docs/dev/table/functions/systemFunctions.md
index 3a66018..69ab489 100644
--- a/docs/content.zh/docs/dev/table/functions/systemFunctions.md
+++ b/docs/content.zh/docs/dev/table/functions/systemFunctions.md
@@ -206,7 +206,7 @@ Known Limitations:
 ```java
 table
    .groupBy("withColumns(1 to 3)")
-   .select("withColumns(a to b), myUDAgg(myUDF(withColumns(5 to 20)))")
+   .select("withColumns(a to b), myUDAgg(myUDF(withColumns(5 to 20)))");
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
diff --git a/docs/content.zh/docs/dev/table/tuning.md b/docs/content.zh/docs/dev/table/tuning.md
index 5d3111a..266bfa3 100644
--- a/docs/content.zh/docs/dev/table/tuning.md
+++ b/docs/content.zh/docs/dev/table/tuning.md
@@ -59,7 +59,7 @@ Window TVF aggregation buffer records in [managed memory]({{< ref "docs/deployme
 {{< tab "Java" >}}
 ```java
 // instantiate table environment
-TableEnvironment tEnv = ...
+TableEnvironment tEnv = ...;
 
 // access flink configuration
 TableConfig configuration = tEnv.getConfig();
@@ -121,7 +121,7 @@ GROUP BY color
 {{< tab "Java" >}}
 ```java
 // instantiate table environment
-TableEnvironment tEnv = ...
+TableEnvironment tEnv = ...;
 
 // access flink configuration
 Configuration configuration = tEnv.getConfig().getConfiguration();
@@ -206,7 +206,7 @@ GROUP BY day
 {{< tab "Java" >}}
 ```java
 // instantiate table environment
-TableEnvironment tEnv = ...
+TableEnvironment tEnv = ...;
 
 tEnv.getConfig()
   .set("table.optimizer.distinct-agg.split.enabled", "true");  // enable distinct agg split
diff --git a/docs/content.zh/docs/learn-flink/streaming_analytics.md b/docs/content.zh/docs/learn-flink/streaming_analytics.md
index 2c0dc75..2273444 100644
--- a/docs/content.zh/docs/learn-flink/streaming_analytics.md
+++ b/docs/content.zh/docs/learn-flink/streaming_analytics.md
@@ -104,7 +104,7 @@ watermarks 给了开发者流处理的一种选择,它们使开发人员在开
 动手练习中使用的出租车数据源已经为我们处理了这些详细信息。但是,在您自己的应用程序中,您将必须自己进行处理,这通常是通过实现一个类来实现的,该类从事件中提取时间戳,并根据需要生成 watermarks。最简单的方法是使用 `WatermarkStrategy`:
 
 ```java
-DataStream<Event> stream = ...
+DataStream<Event> stream = ...;
 
 WatermarkStrategy<Event> strategy = WatermarkStrategy
         .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20))
@@ -146,7 +146,7 @@ Flink 的窗口 API 还具有 _Triggers_ 和 _Evictors_ 的概念,_Triggers_ 
 stream
     .keyBy(<key selector>)
     .window(<window assigner>)
-    .reduce|aggregate|process(<window function>)
+    .reduce|aggregate|process(<window function>);
 ```
 
 您不是必须使用键控事件流(keyed stream),但是值得注意的是,如果不使用键控事件流,我们的程序就不能 _并行_ 处理。
@@ -154,7 +154,7 @@ stream.
 ```java
 stream
     .windowAll(<window assigner>)
-    .reduce|aggregate|process(<window function>)
+    .reduce|aggregate|process(<window function>);
 ```
 
 <a name="window-assigners"></a>
@@ -210,7 +210,7 @@ Flink 有一些内置的窗口分配器,如下所示:
 #### ProcessWindowFunction 示例
 
 ```java
-DataStream<SensorReading> input = ...
+DataStream<SensorReading> input = ...;
 
 input
     .keyBy(x -> x.key)
@@ -264,7 +264,7 @@ public abstract class Context implements java.io.Serializable {
 #### 增量聚合示例
 
 ```java
-DataStream<SensorReading> input = ...
+DataStream<SensorReading> input = ...;
 
 input
     .keyBy(x -> x.key)
@@ -361,7 +361,7 @@ stream
     .window(<window assigner>)
     .reduce(<reduce function>)
     .windowAll(<same window assigner>)
-    .reduce(<same reduce function>)
+    .reduce(<same reduce function>);
 ```
 
 可能我们会猜测以 Flink 的能力,想要做到这样看起来是可行的(前提是你使用的是 ReduceFunction 或 AggregateFunction ),但不是。
diff --git a/docs/content.zh/docs/libs/cep.md b/docs/content.zh/docs/libs/cep.md
index f2b6775..3bb7db0 100644
--- a/docs/content.zh/docs/libs/cep.md
+++ b/docs/content.zh/docs/libs/cep.md
@@ -64,7 +64,7 @@ FlinkCEP 不是二进制发布包的一部分。在集群上执行如何链接
 {{< tabs "4fef83d9-e4c5-4073-9607-4c8cde1ebf1e" >}}
 {{< tab "Java" >}}
 ```java
-DataStream<Event> input = ...
+DataStream<Event> input = ...;
 
 Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
         new SimpleCondition<Event>() {
@@ -337,7 +337,7 @@ start.where(event => event.getName.startsWith("foo"))
 start.subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
     @Override
     public boolean filter(SubEvent value) {
-        return ... // 一些判断条件
+        return ...; // 一些判断条件
     }
 });
 ```
@@ -358,12 +358,12 @@ start.subtype(classOf[SubEvent]).where(subEvent => ... /* 一些判断条件 */)
 pattern.where(new SimpleCondition<Event>() {
     @Override
     public boolean filter(Event value) {
-        return ... // 一些判断条件
+        return ...; // 一些判断条件
     }
 }).or(new SimpleCondition<Event>() {
     @Override
     public boolean filter(Event value) {
-        return ... // 一些判断条件
+        return ...; // 一些判断条件
     }
 });
 ```
@@ -1378,7 +1378,7 @@ pattern.within(Time.seconds(10))
 {{< tabs "e7240356-0fda-4a20-8b5a-7e4136753eca" >}}
 {{< tab "Java" >}}
 ```java
-AfterMatchSkipStrategy skipStrategy = ...
+AfterMatchSkipStrategy skipStrategy = ...;
 Pattern.begin("patternName", skipStrategy);
 ```
 {{< /tab >}}
@@ -1399,7 +1399,7 @@ Pattern.begin("patternName", skipStrategy)
 {{< tabs "48a6f23b-1861-4350-894d-0404d070cfb2" >}}
 {{< tab "Java" >}}
 ```java
-AfterMatchSkipStrategy.skipToFirst(patternName).throwExceptionOnMiss()
+AfterMatchSkipStrategy.skipToFirst(patternName).throwExceptionOnMiss();
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -1418,9 +1418,9 @@ AfterMatchSkipStrategy.skipToFirst(patternName).throwExceptionOnMiss()
 {{< tabs "c412e6ab-033c-496c-b72f-b351c056e365" >}}
 {{< tab "Java" >}}
 ```java
-DataStream<Event> input = ...
-Pattern<Event, ?> pattern = ...
-EventComparator<Event> comparator = ... // 可选的
+DataStream<Event> input = ...;
+Pattern<Event, ?> pattern = ...;
+EventComparator<Event> comparator = ...; // 可选的
 
 PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);
 ```
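
A minimal sketch of the optional comparator used above, assuming `Event` exposes a `getPrice()` accessor (an illustrative name borrowed from the surrounding CEP examples); FlinkCEP consults the comparator to order events that arrive with the same timestamp:

```java
// decides the order of events that carry identical timestamps
EventComparator<Event> comparator = new EventComparator<Event>() {
    @Override
    public int compare(Event first, Event second) {
        return Double.compare(first.getPrice(), second.getPrice());
    }
};
```
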
@@ -1651,9 +1651,9 @@ public interface TimeContext {
 {{< tabs "01929551-b785-41f4-ab0d-b6369ce3cc41" >}}
 {{< tab "Java" >}}
 ```java
-StreamExecutionEnvironment env = ...
+StreamExecutionEnvironment env = ...;
 
-DataStream<Event> input = ...
+DataStream<Event> input = ...;
 
 DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>() {
 	@Override
diff --git a/docs/content.zh/docs/libs/gelly/bipartite_graph.md b/docs/content.zh/docs/libs/gelly/bipartite_graph.md
index 2aba2ef..053f697 100644
--- a/docs/content.zh/docs/libs/gelly/bipartite_graph.md
+++ b/docs/content.zh/docs/libs/gelly/bipartite_graph.md
@@ -84,11 +84,11 @@ You can create a `BipartiteGraph` in the following ways:
 ```java
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-DataSet<Vertex<String, Long>> topVertices = ...
+DataSet<Vertex<String, Long>> topVertices = ...;
 
-DataSet<Vertex<String, Long>> bottomVertices = ...
+DataSet<Vertex<String, Long>> bottomVertices = ...;
 
-DataSet<Edge<String, String, Double>> edges = ...
+DataSet<Edge<String, String, Double>> edges = ...;
 
 Graph<String, String, Long, Long, Double> graph = BipartiteGraph.fromDataSet(topVertices, bottomVertices, edges, env);
 ```
@@ -120,14 +120,14 @@ In the case of a simple projection each node in the result graph contains a pair
 ```java
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 // Vertices (1, "top1")
-DataSet<Vertex<Long, String>> topVertices = ...
+DataSet<Vertex<Long, String>> topVertices = ...;
 
 // Vertices (2, "bottom2"); (4, "bottom4")
-DataSet<Vertex<Long, String>> bottomVertices = ...
+DataSet<Vertex<Long, String>> bottomVertices = ...;
 
 // Edges that connect vertex 2 to vertex 1 and vertex 4 to vertex 1:
 // (1, 2, "1-2-edge"); (1, 4, "1-4-edge")
-DataSet<Edge<Long, Long, String>> edges = ...
+DataSet<Edge<Long, Long, String>> edges = ...;
 
 BipartiteGraph<Long, Long, String, String, String> graph = BipartiteGraph.fromDataSet(topVertices, bottomVertices, edges, env);
 
@@ -155,14 +155,14 @@ Full projection preserves all the information about the connection between two v
 ```java
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 // Vertices (1, "top1")
-DataSet<Vertex<Long, String>> topVertices = ...
+DataSet<Vertex<Long, String>> topVertices = ...;
 
 // Vertices (2, "bottom2"); (4, "bottom4")
-DataSet<Vertex<Long, String>> bottomVertices = ...
+DataSet<Vertex<Long, String>> bottomVertices = ...;
 
 // Edges that connect vertex 2 to vertex 1 and vertex 4 to vertex 1:
 // (1, 2, "1-2-edge"); (1, 4, "1-4-edge")
-DataSet<Edge<Long, Long, String>> edges = ...
+DataSet<Edge<Long, Long, String>> edges = ...;
 
 BipartiteGraph<Long, Long, String, String, String> graph = BipartiteGraph.fromDataSet(topVertices, bottomVertices, edges, env);
 
diff --git a/docs/content.zh/docs/libs/gelly/graph_api.md b/docs/content.zh/docs/libs/gelly/graph_api.md
index 998e6b6..c1e853d 100644
--- a/docs/content.zh/docs/libs/gelly/graph_api.md
+++ b/docs/content.zh/docs/libs/gelly/graph_api.md
@@ -96,9 +96,9 @@ You can create a `Graph` in the following ways:
 ```java
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-DataSet<Vertex<String, Long>> vertices = ...
+DataSet<Vertex<String, Long>> vertices = ...;
 
-DataSet<Edge<String, Double>> edges = ...
+DataSet<Edge<String, Double>> edges = ...;
 
 Graph<String, Long, Double> graph = Graph.fromDataSet(vertices, edges, env);
 ```
@@ -231,9 +231,9 @@ val simpleGraph = Graph.fromCsvReader[Long, Double, NullValue](
 ```java
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-List<Vertex<Long, Long>> vertexList = new ArrayList...
+List<Vertex<Long, Long>> vertexList = new ArrayList...;
 
-List<Edge<Long, String>> edgeList = new ArrayList...
+List<Edge<Long, String>> edgeList = new ArrayList...;
 
 Graph<Long, Long, String> graph = Graph.fromCollection(vertexList, edgeList, env);
 ```
@@ -428,7 +428,7 @@ val updatedGraph = graph.translateGraphIds(id => id.toString)
 {{< tabs "b33fe8f8-8a53-4710-9379-8d2f912a3105" >}}
 {{< tab "Java" >}}
 ```java
-Graph<Long, Long, Long> graph = ...
+Graph<Long, Long, Long> graph = ...;
 
 graph.subgraph(
 		new FilterFunction<Vertex<Long, Long>>() {
@@ -467,7 +467,7 @@ Note that if the input dataset contains a key multiple times, all Gelly join met
 {{< tabs "219b4d15-4be2-4bbf-a3ea-4155d3f6ba27" >}}
 {{< tab "Java" >}}
 ```java
-Graph<Long, Double, Double> network = ...
+Graph<Long, Double, Double> network = ...;
 
 DataSet<Tuple2<Long, LongValue>> vertexOutDegrees = network.outDegrees();
 
@@ -519,11 +519,11 @@ val networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees, (v1: Do
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 // create first graph from edges {(1, 3, 12) (1, 3, 13), (1, 3, 13)}
-List<Edge<Long, Long>> edges1 = ...
+List<Edge<Long, Long>> edges1 = ...;
 Graph<Long, NullValue, Long> graph1 = Graph.fromCollection(edges1, env);
 
 // create second graph from edges {(1, 3, 13)}
-List<Edge<Long, Long>> edges2 = ...
+List<Edge<Long, Long>> edges2 = ...;
 Graph<Long, NullValue, Long> graph2 = Graph.fromCollection(edges2, env);
 
 // Using distinct = true results in {(1,3,13)}
@@ -638,7 +638,7 @@ The following code will collect the out-edges for each vertex and apply the `Sel
 {{< tabs "8ab0141f-ed3d-4372-bfab-7f78ed6d7d5f" >}}
 {{< tab "Java" >}}
 ```java
-Graph<Long, Long, Double> graph = ...
+Graph<Long, Long, Double> graph = ...;
 
 DataSet<Tuple2<Long, Double>> minWeights = graph.reduceOnEdges(new SelectMinWeight(), EdgeDirection.OUT);
 
@@ -677,7 +677,7 @@ Similarly, assume that you would like to compute the sum of the values of all in
 {{< tabs "67e6fe66-aef8-46b8-8e80-2762dd5c3f02" >}}
 {{< tab "Java" >}}
 ```java
-Graph<Long, Long, Double> graph = ...
+Graph<Long, Long, Double> graph = ...;
 
 DataSet<Tuple2<Long, Long>> verticesWithSum = graph.reduceOnNeighbors(new SumValues(), EdgeDirection.IN);
 
@@ -720,7 +720,7 @@ For example, the following code will output all the vertex pairs which are conne
 {{< tabs "2cf7a021-b67a-42dc-912f-ef79f36314b2" >}}
 {{< tab "Java" >}}
 ```java
-Graph<Long, Long, Double> graph = ...
+Graph<Long, Long, Double> graph = ...;
 
 DataSet<Tuple2<Vertex<Long, Long>, Vertex<Long, Long>>> vertexPairs = graph.groupReduceOnNeighbors(new SelectLargeWeightNeighbors(), EdgeDirection.OUT);
 
@@ -783,10 +783,10 @@ also exist in the vertex IDs set.
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 // create a list of vertices with IDs = {1, 2, 3, 4, 5}
-List<Vertex<Long, Long>> vertices = ...
+List<Vertex<Long, Long>> vertices = ...;
 
 // create a list of edges with IDs = {(1, 2) (1, 3), (2, 4), (5, 6)}
-List<Edge<Long, Long>> edges = ...
+List<Edge<Long, Long>> edges = ...;
 
 Graph<Long, Long, Long> graph = Graph.fromCollection(vertices, edges, env);
 
diff --git a/docs/content.zh/docs/libs/gelly/graph_generators.md b/docs/content.zh/docs/libs/gelly/graph_generators.md
index 08a6dd7..d823120 100644
--- a/docs/content.zh/docs/libs/gelly/graph_generators.md
+++ b/docs/content.zh/docs/libs/gelly/graph_generators.md
@@ -304,7 +304,7 @@ two `endpoint` vertices with degree `1` and all midpoint vertices with degree
 ```java
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-long vertexCount = 5
+long vertexCount = 5;
 
 Graph<LongValue, NullValue, NullValue> graph = new PathGraph(env, vertexCount)
     .generate();
@@ -417,7 +417,7 @@ An undirected graph containing isolated two-paths where every vertex has degree
 ```java
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-long vertexPairCount = 4
+long vertexPairCount = 4;
 
 // note: configured with the number of vertex pairs
 Graph<LongValue, NullValue, NullValue> graph = new SingletonEdgeGraph(env, vertexPairCount)
diff --git a/docs/content.zh/docs/libs/gelly/iterative_graph_processing.md b/docs/content.zh/docs/libs/gelly/iterative_graph_processing.md
index 99be5f9..e53cbd0 100644
--- a/docs/content.zh/docs/libs/gelly/iterative_graph_processing.md
+++ b/docs/content.zh/docs/libs/gelly/iterative_graph_processing.md
@@ -50,7 +50,7 @@ Let us consider computing Single-Source-Shortest-Paths with vertex-centric itera
 {{< tab "Java" >}}
 ```java
 // read the input graph
-Graph<Long, Double, Double> graph = ...
+Graph<Long, Double, Double> graph = ...;
 
 // define the maximum number of iterations
 int maxIterations = 10;
@@ -178,7 +178,7 @@ all aggregates globally once per superstep and makes them available in the next
 {{< tab "Java" >}}
 ```java
 
-Graph<Long, Double, Double> graph = ...
+Graph<Long, Double, Double> graph = ...;
 
 // configure the iteration
 VertexCentricConfiguration parameters = new VertexCentricConfiguration();
@@ -296,7 +296,7 @@ Let us consider computing Single-Source-Shortest-Paths with scatter-gather itera
 {{< tab "Java" >}}
 ```java
 // read the input graph
-Graph<Long, Double, Double> graph = ...
+Graph<Long, Double, Double> graph = ...;
 
 // define the maximum number of iterations
 int maxIterations = 10;
@@ -421,7 +421,7 @@ If the degrees option is not set in the configuration, these methods will return
 {{< tab "Java" >}}
 ```java
 
-Graph<Long, Double, Double> graph = ...
+Graph<Long, Double, Double> graph = ...;
 
 // configure the iteration
 ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
@@ -525,7 +525,7 @@ The following example illustrates the usage of the degree as well as the number
 {{< tab "Java" >}}
 ```java
 
-Graph<Long, Double, Double> graph = ...
+Graph<Long, Double, Double> graph = ...;
 
 // configure the iteration
 ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
@@ -599,7 +599,7 @@ The following example illustrates the usage of the edge direction option. Vertic
 {{< tabs "5efb0e23-7fd4-4e08-9952-981d6e4f3b9e" >}}
 {{< tab "Java" >}}
 ```java
-Graph<Long, HashSet<Long>, Double> graph = ...
+Graph<Long, HashSet<Long>, Double> graph = ...;
 
 // configure the iteration
 ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
@@ -664,7 +664,7 @@ To implement this example in Gelly GSA, the user only needs to call the `runGath
 {{< tab "Java" >}}
 ```java
 // read the input graph
-Graph<Long, Double, Double> graph = ...
+Graph<Long, Double, Double> graph = ...;
 
 // define the maximum number of iterations
 int maxIterations = 10;
@@ -786,7 +786,7 @@ The following example illustrates the usage of the number of vertices option.
 {{< tab "Java" >}}
 ```java
 
-Graph<Long, Double, Double> graph = ...
+Graph<Long, Double, Double> graph = ...;
 
 // configure the iteration
 GSAConfiguration parameters = new GSAConfiguration();
@@ -868,7 +868,7 @@ The following example illustrates the usage of the edge direction option.
 {{< tab "Java" >}}
 ```java
 
-Graph<Long, HashSet<Long>, Double> graph = ...
+Graph<Long, HashSet<Long>, Double> graph = ...;
 
 // configure the iteration
 GSAConfiguration parameters = new GSAConfiguration();
diff --git a/docs/content.zh/docs/libs/gelly/library_methods.md b/docs/content.zh/docs/libs/gelly/library_methods.md
index 2cd0266..1df9887 100644
--- a/docs/content.zh/docs/libs/gelly/library_methods.md
+++ b/docs/content.zh/docs/libs/gelly/library_methods.md
@@ -37,7 +37,7 @@ Gelly's library methods can be used by simply calling the `run()` method on the
 ```java
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-Graph<Long, Long, NullValue> graph = ...
+Graph<Long, Long, NullValue> graph = ...;
 
 // run Label Propagation for 30 iterations to detect communities on the input graph
 DataSet<Vertex<Long, Long>> verticesWithCommunity = graph.run(new LabelPropagation<Long>(30));
diff --git a/docs/content.zh/docs/libs/state_processor_api.md b/docs/content.zh/docs/libs/state_processor_api.md
index 42dd32d..b220fe7 100644
--- a/docs/content.zh/docs/libs/state_processor_api.md
+++ b/docs/content.zh/docs/libs/state_processor_api.md
@@ -262,7 +262,7 @@ class ClickCounter implements AggregateFunction<Click, Integer, Integer> {
 	}
 }
 
-DataStream<Click> clicks = . . . 
+DataStream<Click> clicks = ...;
 
 clicks
     .keyBy(click -> click.userId)
diff --git a/docs/content/docs/connectors/dataset/formats/avro.md b/docs/content/docs/connectors/dataset/formats/avro.md
index 7320587..5adb669 100644
--- a/docs/content/docs/connectors/dataset/formats/avro.md
+++ b/docs/content/docs/connectors/dataset/formats/avro.md
@@ -51,7 +51,7 @@ DataSet<User> usersDS = env.createInput(users);
 Note that `User` is a POJO generated by Avro. Flink also allows you to perform string-based key selection on these POJOs. For example:
 
 ```java
-usersDS.keyBy("name")
+usersDS.keyBy("name");
 ```
 
 
diff --git a/docs/content/docs/connectors/dataset/formats/hadoop.md b/docs/content/docs/connectors/dataset/formats/hadoop.md
index b517c98..be52054 100644
--- a/docs/content/docs/connectors/dataset/formats/hadoop.md
+++ b/docs/content/docs/connectors/dataset/formats/hadoop.md
@@ -117,7 +117,7 @@ The following example shows how to use Hadoop's `TextOutputFormat`.
 
 ```java
 // Obtain the result we want to emit
-DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]
+DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...];
 
 // Set up the Hadoop TextOutputFormat.
 HadoopOutputFormat<Text, IntWritable> hadoopOF =
diff --git a/docs/content/docs/connectors/datastream/filesystem.md b/docs/content/docs/connectors/datastream/filesystem.md
index 4c6a8c2..1a116e0 100644
--- a/docs/content/docs/connectors/datastream/filesystem.md
+++ b/docs/content/docs/connectors/datastream/filesystem.md
@@ -65,10 +65,10 @@ You can start building a File Source via one of the following API calls:
 {{< tab "Java" >}}
 ```java
 // reads the contents of a file from a file stream. 
-FileSource.forRecordStreamFormat(StreamFormat,Path...)
+FileSource.forRecordStreamFormat(StreamFormat,Path...);
         
 // reads batches of records from a file at a time
-FileSource.forBulkFileFormat(BulkFormat,Path...)
+FileSource.forBulkFileFormat(BulkFormat,Path...);
 ```
 {{< /tab >}}
 {{< /tabs >}}
diff --git a/docs/content/docs/connectors/datastream/formats/avro.md b/docs/content/docs/connectors/datastream/formats/avro.md
index 1b2ffef..f6956e2 100644
--- a/docs/content/docs/connectors/datastream/formats/avro.md
+++ b/docs/content/docs/connectors/datastream/formats/avro.md
@@ -51,7 +51,7 @@ DataStream<User> usersDS = env.createInput(users);
 Note that `User` is a POJO generated by Avro. Flink also allows you to perform string-based key selection on these POJOs. For example:
 
 ```java
-usersDS.keyBy("name")
+usersDS.keyBy("name");
 ```
 
 
diff --git a/docs/content/docs/connectors/datastream/jdbc.md b/docs/content/docs/connectors/datastream/jdbc.md
index bc96d23..85c6723 100644
--- a/docs/content/docs/connectors/datastream/jdbc.md
+++ b/docs/content/docs/connectors/datastream/jdbc.md
@@ -73,7 +73,7 @@ JdbcExecutionOptions.builder()
         .withBatchIntervalMs(200)             // optional: default = 0, meaning no time-based execution is done
         .withBatchSize(1000)                  // optional: default = 5000 values
         .withMaxRetries(5)                    // optional: default = 3 
-.build()
+.build();
 ```
 
 A JDBC batch is executed as soon as one of the following conditions is true:
@@ -184,7 +184,7 @@ In such cases, please use the following API to construct `JdbcExactlyOnceOptions
 ```java
 JdbcExactlyOnceOptions.builder()
 .withTransactionPerConnection(true)
-.build()
+.build();
 ```
 This will make Flink use a separate connection for every XA transaction. This may require adjusting connection limits.
 For PostgreSQL and MySQL, this can be done by increasing `max_connections`.
diff --git a/docs/content/docs/connectors/datastream/kafka.md b/docs/content/docs/connectors/datastream/kafka.md
index ff48b0a..8a623fc 100644
--- a/docs/content/docs/connectors/datastream/kafka.md
+++ b/docs/content/docs/connectors/datastream/kafka.md
@@ -73,19 +73,19 @@ The following properties are **required** for building a KafkaSource:
 Kafka source provides 3 ways of topic-partition subscription:
 - Topic list, subscribing messages from all partitions in a list of topics. For example:
   ```java
-  KafkaSource.builder().setTopics("topic-a", "topic-b")
+  KafkaSource.builder().setTopics("topic-a", "topic-b");
   ```
 - Topic pattern, subscribing messages from all topics whose name matches the provided regular
   expression. For example:
   ```java
-  KafkaSource.builder().setTopicPattern("topic.*")
+  KafkaSource.builder().setTopicPattern("topic.*");
   ```
 - Partition set, subscribing partitions in the provided partition set. For example:
   ```java
   final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
           new TopicPartition("topic-a", 0),    // Partition 0 of topic "topic-a"
           new TopicPartition("topic-b", 5)));  // Partition 5 of topic "topic-b"
-  KafkaSource.builder().setPartitions(partitionSet)
+  KafkaSource.builder().setPartitions(partitionSet);
   ```
 ### Deserializer
 A deserializer is required for parsing Kafka messages. Deserializer (Deserialization schema) can be
@@ -121,7 +121,7 @@ KafkaSource.builder()
     // Start from earliest offset
     .setStartingOffsets(OffsetsInitializer.earliest())
     // Start from latest offset
-    .setStartingOffsets(OffsetsInitializer.latest())
+    .setStartingOffsets(OffsetsInitializer.latest());
 ```
 
 You can also implement a custom offsets initializer if built-in initializers above cannot fulfill
@@ -170,7 +170,7 @@ JAAS configuration:
 ```java
 KafkaSource.builder()
     .setProperty("sasl.mechanism", "PLAIN")
-    .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")
+    .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");
 ```
 
 ### Dynamic Partition Discovery
@@ -180,7 +180,7 @@ topic-partition subscribing pattern. To enable partition discovery, set a non-ne
 property ```partition.discovery.interval.ms```:
 ```java
 KafkaSource.builder()
-    .setProperty("partition.discovery.interval.ms", "10000") // discover new partitions per 10 seconds
+    .setProperty("partition.discovery.interval.ms", "10000"); // discover new partitions per 10 seconds
 ```
 {{< hint warning >}}
 Partition discovery is **disabled** by default. You need to explicitly set the partition discovery
@@ -192,7 +192,7 @@ By default, the record will use the timestamp embedded in Kafka ```ConsumerRecor
 time. You can define your own ```WatermarkStrategy``` to extract the event time from the record itself
 and emit watermarks downstream:
 ```java
-env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")
+env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy");
 ```
 [This documentation]({{< ref "docs/dev/datastream/event-time/generating_watermarks.md" >}}) describes
 details about how to define a ```WatermarkStrategy```.
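
As a minimal sketch of such a strategy, assembled from the built-in factories rather than a hand-written `CustomWatermarkStrategy` class (the `OrderEvent` type, its `getEventTimestamp()` accessor, and the `kafkaSource` variable are assumptions for illustration):

```java
WatermarkStrategy<OrderEvent> strategy = WatermarkStrategy
        .<OrderEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))                  // tolerate 5 seconds of out-of-orderness
        .withTimestampAssigner((event, kafkaTimestamp) -> event.getEventTimestamp()); // take event time from the payload

env.fromSource(kafkaSource, strategy, "Kafka Source With Custom Watermark Strategy");
```
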
@@ -361,7 +361,7 @@ Kafka sink provides a builder class to construct an instance of a KafkaSink. The
 shows how to write String records to a Kafka topic with a delivery guarantee of at least once.
 
 ```java
-DataStream<String> stream = ...
+DataStream<String> stream = ...;
         
 KafkaSink<String> sink = KafkaSink.<String>builder()
         .setBootstrapServers(brokers)
diff --git a/docs/content/docs/connectors/datastream/pulsar.md b/docs/content/docs/connectors/datastream/pulsar.md
index 2432227..d544241 100644
--- a/docs/content/docs/connectors/datastream/pulsar.md
+++ b/docs/content/docs/connectors/datastream/pulsar.md
@@ -83,15 +83,15 @@ Pulsar source provide two ways of topic-partition subscription:
 
 - Topic list, subscribing messages from all partitions in a list of topics. For example:
   ```java
-  PulsarSource.builder().setTopics("some-topic1", "some-topic2")
+  PulsarSource.builder().setTopics("some-topic1", "some-topic2");
 
   // Partition 0 and 2 of topic "topic-a"
-  PulsarSource.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+  PulsarSource.builder().setTopics("topic-a-partition-0", "topic-a-partition-2");
   ```
 
 - Topic pattern, subscribing messages from all topics whose name matches the provided regular expression. For example:
   ```java
-  PulsarSource.builder().setTopicPattern("topic-*")
+  PulsarSource.builder().setTopicPattern("topic-*");
   ```
 
 #### Flexible Topic Naming
@@ -158,21 +158,21 @@ you can use the predefined `PulsarDeserializationSchema`. Pulsar connector provi
 - Decode the message by using Pulsar's [Schema](https://pulsar.apache.org/docs/en/schema-understand/).
   ```java
   // Primitive types
-  PulsarDeserializationSchema.pulsarSchema(Schema)
+  PulsarDeserializationSchema.pulsarSchema(Schema);
 
   // Struct types (JSON, Protobuf, Avro, etc.)
-  PulsarDeserializationSchema.pulsarSchema(Schema, Class)
+  PulsarDeserializationSchema.pulsarSchema(Schema, Class);
 
   // KeyValue type
-  PulsarDeserializationSchema.pulsarSchema(Schema, Class, Class)
+  PulsarDeserializationSchema.pulsarSchema(Schema, Class, Class);
   ```
 - Decode the message by using Flink's `DeserializationSchema`
   ```java
-  PulsarDeserializationSchema.flinkSchema(DeserializationSchema)
+  PulsarDeserializationSchema.flinkSchema(DeserializationSchema);
   ```
 - Decode the message by using Flink's `TypeInformation`
   ```java
-  PulsarDeserializationSchema.flinkTypeInfo(TypeInformation, ExecutionConfig)
+  PulsarDeserializationSchema.flinkTypeInfo(TypeInformation, ExecutionConfig);
   ```
 
 Pulsar `Message<byte[]>` contains some [extra properties](https://pulsar.apache.org/docs/en/concepts-messaging/#messages),
@@ -200,10 +200,10 @@ By default, if no subscription type is defined, Pulsar source uses `Shared` subs
 
 ```java
 // Shared subscription with name "my-shared"
-PulsarSource.builder().setSubscriptionName("my-shared")
+PulsarSource.builder().setSubscriptionName("my-shared");
 
 // Exclusive subscription with name "my-exclusive"
-PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive)
+PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive);
 ```
 
 If you want to use the `Key_Shared` subscription type on the Pulsar connector, ensure that you provide a `RangeGenerator` implementation.
@@ -220,29 +220,29 @@ Built-in start cursors include:
 
 - Start from the earliest available message in the topic.
   ```java
-  StartCursor.earliest()
+  StartCursor.earliest();
   ```
 - Start from the latest available message in the topic.
   ```java
-  StartCursor.latest()
+  StartCursor.latest();
   ```
 - Start from a specified message between the earliest and the latest.
   Pulsar connector would consume from the latest available message if the message id doesn't exist.
 
   The start message is included in consuming result.
   ```java
-  StartCursor.fromMessageId(MessageId)
+  StartCursor.fromMessageId(MessageId);
   ```
 - Start from a specified message between the earliest and the latest.
   Pulsar connector would consume from the latest available message if the message id doesn't exist.
 
   Include or exclude the start message by using the second boolean parameter.
   ```java
-  StartCursor.fromMessageId(MessageId, boolean)
+  StartCursor.fromMessageId(MessageId, boolean);
   ```
 - Start from the specified message time by `Message<byte[]>.getEventTime()`.
   ```java
-  StartCursor.fromMessageTime(long)
+  StartCursor.fromMessageTime(long);
   ```
 
 {{< hint info >}}
@@ -268,23 +268,23 @@ Built-in stop cursors include:
 
 - Connector will never stop consuming.
   ```java
-  StopCursor.never()
+  StopCursor.never();
   ```
 - Stop at the latest available message in Pulsar when the connector starts consuming.
   ```java
-  StopCursor.latest()
+  StopCursor.latest();
   ```
 - Stop when the connector meets a given message, or at a message produced after this given message.
   ```java
-  StopCursor.atMessageId(MessageId)
+  StopCursor.atMessageId(MessageId);
   ```
 - Stop but include the given message in consuming result.
   ```java
-  StopCursor.afterMessageId(MessageId)
+  StopCursor.afterMessageId(MessageId);
   ```
 - Stop at the specified message time by `Message<byte[]>.getEventTime()`.
   ```java
-  StopCursor.atEventTime(long)
+  StopCursor.atEventTime(long);
   ```
 
 ### Configurable Options
@@ -349,7 +349,7 @@ You can define your own `WatermarkStrategy` to extract the event time from the m
 and emit the watermark downstream:
 
 ```java
-env.fromSource(pulsarSource, new CustomWatermarkStrategy(), "Pulsar Source With Custom Watermark Strategy")
+env.fromSource(pulsarSource, new CustomWatermarkStrategy(), "Pulsar Source With Custom Watermark Strategy");
 ```
 
 [This documentation]({{< ref "docs/dev/datastream/event-time/generating_watermarks.md" >}}) describes
diff --git a/docs/content/docs/deployment/filesystems/azure.md b/docs/content/docs/deployment/filesystems/azure.md
index 1b94c6d..c9d49f5 100644
--- a/docs/content/docs/deployment/filesystems/azure.md
+++ b/docs/content/docs/deployment/filesystems/azure.md
@@ -64,7 +64,7 @@ See below for how to use Azure Blob Storage in a Flink job:
 env.readTextFile("wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>");
 
 // Write to Azure Blob storage
-stream.writeAsText("wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>")
+stream.writeAsText("wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>");
 
 // Use Azure Blob Storage as checkpoint storage
 env.getCheckpointConfig().setCheckpointStorage("wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>");
diff --git a/docs/content/docs/deployment/filesystems/oss.md b/docs/content/docs/deployment/filesystems/oss.md
index 7ca5129..0dc0eba 100644
--- a/docs/content/docs/deployment/filesystems/oss.md
+++ b/docs/content/docs/deployment/filesystems/oss.md
@@ -47,7 +47,7 @@ Below shows how to use OSS in a Flink job:
 env.readTextFile("oss://<your-bucket>/<object-name>");
 
 // Write to OSS bucket
-stream.writeAsText("oss://<your-bucket>/<object-name>")
+stream.writeAsText("oss://<your-bucket>/<object-name>");
 
 // Use OSS as checkpoint storage
 env.getCheckpointConfig().setCheckpointStorage("oss://<your-bucket>/<object-name>");
diff --git a/docs/content/docs/dev/dataset/examples.md b/docs/content/docs/dev/dataset/examples.md
index cd3b42d..6aaf729 100644
--- a/docs/content/docs/dev/dataset/examples.md
+++ b/docs/content/docs/dev/dataset/examples.md
@@ -134,7 +134,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 // read the pages and initial ranks by parsing a CSV file
 DataSet<Tuple2<Long, Double>> pagesWithRanks = env.readCsvFile(pagesInputPath)
-						   .types(Long.class, Double.class)
+						   .types(Long.class, Double.class);
 
 // the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
 DataSet<Tuple2<Long, Long[]>> pageLinkLists = getLinksDataSet(env);
diff --git a/docs/content/docs/dev/dataset/hadoop_map_reduce.md b/docs/content/docs/dev/dataset/hadoop_map_reduce.md
index f72da27..04e9b38 100644
--- a/docs/content/docs/dev/dataset/hadoop_map_reduce.md
+++ b/docs/content/docs/dev/dataset/hadoop_map_reduce.md
@@ -85,7 +85,7 @@ The following example shows how to use Hadoop `Mapper` and `Reducer` functions.
 
 ```java
 // Obtain data to process somehow.
-DataSet<Tuple2<LongWritable, Text>> text = [...]
+DataSet<Tuple2<LongWritable, Text>> text = [...];
 
 DataSet<Tuple2<Text, LongWritable>> result = text
   // use Hadoop Mapper (Tokenizer) as MapFunction
diff --git a/docs/content/docs/dev/dataset/iterations.md b/docs/content/docs/dev/dataset/iterations.md
index 5ff217e..6366e4f 100644
--- a/docs/content/docs/dev/dataset/iterations.md
+++ b/docs/content/docs/dev/dataset/iterations.md
@@ -171,7 +171,7 @@ IterationState solution = getInitialSolution();
 while (!terminationCriterion()) {
 	(delta, workset) = step(workset, solution);
 
-	solution.update(delta)
+	solution.update(delta);
 }
 
 setFinalState(solution);
diff --git a/docs/content/docs/dev/dataset/overview.md b/docs/content/docs/dev/dataset/overview.md
index bbd33b6..c86c4f0 100644
--- a/docs/content/docs/dev/dataset/overview.md
+++ b/docs/content/docs/dev/dataset/overview.md
@@ -281,7 +281,7 @@ It removes the duplicate entries from the input DataSet, with respect to all fie
 {{< tabs "distinct" >}}
 {{< tab "Java" >}}
 ```java
-data.distinct()
+data.distinct();
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -431,7 +431,7 @@ Produces the union of two data sets.
 {{< tabs "union" >}}
 {{< tab "Java" >}}
 ```java
-data.union(data2)
+data.union(data2);
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -450,7 +450,7 @@ Only Map-like transformations may follow a rebalance transformation.
 {{< tab "Java" >}}
 ```java
 DataSet<Int> data1 = // [...]
-DataSet<Tuple2<Int, String>> result = data1.rebalance().map(...)
+DataSet<Tuple2<Int, String>> result = data1.rebalance().map(...);
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -651,7 +651,7 @@ The simplest case is grouping Tuples on one or more fields of the Tuple:
 {{< tab "Java" >}}
 ```java
 DataSet<Tuple3<Integer,String,Long>> input = // [...]
-UnsortedGrouping<Tuple3<Integer,String,Long>,Tuple> keyed = input.groupBy(0)
+UnsortedGrouping<Tuple3<Integer,String,Long>,Tuple> keyed = input.groupBy(0);
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -668,7 +668,7 @@ Tuples are grouped on the first field (the one of Integer type).
 {{< tab "Java" >}}
 ```java
 DataSet<Tuple3<Integer,String,Long>> input = // [...]
-UnsortedGrouping<Tuple3<Integer,String,Long>,Tuple> keyed = input.groupBy(0,1)
+UnsortedGrouping<Tuple3<Integer,String,Long>,Tuple> keyed = input.groupBy(0,1);
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -707,7 +707,7 @@ public class WC {
   public int count;
 }
 DataSet<WC> words = // [...]
-DataSet<WC> wordCounts = words.groupBy("word")
+DataSet<WC> wordCounts = words.groupBy("word");
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -1393,11 +1393,11 @@ final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
 DataSet<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);
 
 // Create a DataSet from any Java collection
-List<Tuple2<String, Integer>> data = ...
+List<Tuple2<String, Integer>> data = ...;
 DataSet<Tuple2<String, Integer>> myTuples = env.fromCollection(data);
 
 // Create a DataSet from an Iterator
-Iterator<Long> longIt = ...
+Iterator<Long> longIt = ...;
 DataSet<Long> myLongs = env.fromCollection(longIt, Long.class);
 ```
 {{< /tab >}}
@@ -1495,14 +1495,14 @@ The distributed cache is used as follows:
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 // register a file from HDFS
-env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
+env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile");
 
 // register a local executable file (script, executable, ...)
-env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
+env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true);
 
 // define your program and execute
 ...
-DataSet<String> input = ...
+DataSet<String> input = ...;
 DataSet<Integer> result = input.map(new MyMapper());
 ...
 env.execute();
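
A minimal sketch of how `MyMapper` typically reads the registered file back; only the `getDistributedCache().getFile(...)` lookup is the documented API, while the parsing step and the placeholder `map()` logic are assumptions:

```java
public final class MyMapper extends RichMapFunction<String, Integer> {

    @Override
    public void open(Configuration parameters) throws Exception {
        // local copy of the file registered under the name "hdfsFile"
        File cachedFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
        // ... parse cachedFile here, e.g. into a lookup table (assumed application logic)
    }

    @Override
    public Integer map(String value) {
        return value.length(); // placeholder transformation for the sketch
    }
}
```
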
@@ -1709,4 +1709,4 @@ public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<S
       Configuration globConf = (Configuration) globalParams;
       mykey = globConf.getString("mykey", null);
   }
-```
\ No newline at end of file
+```
diff --git a/docs/content/docs/dev/dataset/transformations.md b/docs/content/docs/dev/dataset/transformations.md
index 3438f5c..d3ca6f7d 100644
--- a/docs/content/docs/dev/dataset/transformations.md
+++ b/docs/content/docs/dev/dataset/transformations.md
@@ -202,7 +202,7 @@ DataSet<Tuple2<String, Integer>> out = in.project(2,0);
 Note that the Java compiler cannot infer the return type of `project` operator. This can cause a problem if you call another operator on a result of `project` operator such as:
 
 ```java
-DataSet<Tuple5<String,String,String,String,String>> ds = ....
+DataSet<Tuple5<String,String,String,String,String>> ds = ....;
 DataSet<Tuple1<String>> ds2 = ds.project(0).distinct(0);
 ```
 
diff --git a/docs/content/docs/dev/datastream/application_parameters.md b/docs/content/docs/dev/datastream/application_parameters.md
index 2069a34..3cdf2f3 100644
--- a/docs/content/docs/dev/datastream/application_parameters.md
+++ b/docs/content/docs/dev/datastream/application_parameters.md
@@ -89,7 +89,7 @@ ParameterTool parameters = // ...
 parameter.getRequired("input");
 parameter.get("output", "myDefaultValue");
 parameter.getLong("expectedCount", -1L);
-parameter.getNumberOfParameters()
+parameter.getNumberOfParameters();
 // .. there are more methods available.
 ```
 
diff --git a/docs/content/docs/dev/datastream/execution/parallel.md b/docs/content/docs/dev/datastream/execution/parallel.md
index caa53ea..2fbe5c0 100644
--- a/docs/content/docs/dev/datastream/execution/parallel.md
+++ b/docs/content/docs/dev/datastream/execution/parallel.md
@@ -52,7 +52,7 @@ The parallelism of an individual operator, data source, or data sink can be defi
 ```java
 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-DataStream<String> text = [...]
+DataStream<String> text = [...];
 DataStream<Tuple2<String, Integer>> wordCounts = text
     .flatMap(new LineSplitter())
     .keyBy(value -> value.f0)
@@ -99,8 +99,8 @@ of `3`, set the default parallelism of the execution environment as follows:
 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(3);
 
-DataStream<String> text = [...]
-DataStream<Tuple2<String, Integer>> wordCounts = [...]
+DataStream<String> text = [...];
+DataStream<Tuple2<String, Integer>> wordCounts = [...];
 wordCounts.print();
 
 env.execute("Word Count Example");
diff --git a/docs/content/docs/dev/datastream/experimental.md b/docs/content/docs/dev/datastream/experimental.md
index 654b768..75c1c47 100644
--- a/docs/content/docs/dev/datastream/experimental.md
+++ b/docs/content/docs/dev/datastream/experimental.md
@@ -62,7 +62,7 @@ Code example:
 {{< tab "Java" >}}
 ```java
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-DataStreamSource<Integer> source = ...
+DataStreamSource<Integer> source = ...;
 DataStreamUtils.reinterpretAsKeyedStream(source, (in) -> in, TypeInformation.of(Integer.class))
     .window(TumblingEventTimeWindows.of(Time.seconds(1)))
     .reduce((a, b) -> a + b)
diff --git a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
index 3ead629..dc039a0 100644
--- a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
+++ b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
@@ -119,7 +119,7 @@ env.getCheckpointConfig().setExternalizedCheckpointCleanup(
 env.getCheckpointConfig().enableUnalignedCheckpoints();
 
 // sets the checkpoint storage where checkpoint snapshots will be written
-env.getCheckpointConfig().setCheckpointStorage("hdfs:///my/checkpoint/dir")
+env.getCheckpointConfig().setCheckpointStorage("hdfs:///my/checkpoint/dir");
 
 // enable checkpointing with finished tasks
 Configuration config = new Configuration();
diff --git a/docs/content/docs/dev/datastream/fault-tolerance/queryable_state.md b/docs/content/docs/dev/datastream/fault-tolerance/queryable_state.md
index 6f3b30f..2a9edc3 100644
--- a/docs/content/docs/dev/datastream/fault-tolerance/queryable_state.md
+++ b/docs/content/docs/dev/datastream/fault-tolerance/queryable_state.md
@@ -125,7 +125,7 @@ In a program like the following, all records of the keyed stream will be used to
 `ValueState.update(value)`:
 
 ```java
-stream.keyBy(value -> value.f0).asQueryableState("query-name")
+stream.keyBy(value -> value.f0).asQueryableState("query-name");
 ```
 
 This acts like the Scala API's `flatMapWithState`.
diff --git a/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md b/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
index 6111537..4705586 100644
--- a/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
+++ b/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
@@ -500,7 +500,7 @@ env.getConfig().enableForceKryo();
 
 If Kryo is not able to serialize your POJO, you can add a custom serializer to Kryo, using
 ```java
-env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
+env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass);
 ```
 
 There are different variants of these methods available.
diff --git a/docs/content/docs/dev/datastream/operators/joining.md b/docs/content/docs/dev/datastream/operators/joining.md
index 50387af..6e7e469 100644
--- a/docs/content/docs/dev/datastream/operators/joining.md
+++ b/docs/content/docs/dev/datastream/operators/joining.md
@@ -39,7 +39,7 @@ stream.join(otherStream)
     .where(<KeySelector>)
     .equalTo(<KeySelector>)
     .window(<WindowAssigner>)
-    .apply(<JoinFunction>)
+    .apply(<JoinFunction>);
 ```
 
 Some notes on semantics:
@@ -65,8 +65,8 @@ import org.apache.flink.streaming.api.windowing.time.Time;
  
 ...
 
-DataStream<Integer> orangeStream = ...
-DataStream<Integer> greenStream = ...
+DataStream<Integer> orangeStream = ...;
+DataStream<Integer> greenStream = ...;
 
 orangeStream.join(greenStream)
     .where(<KeySelector>)
@@ -119,8 +119,8 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 
 ...
 
-DataStream<Integer> orangeStream = ...
-DataStream<Integer> greenStream = ...
+DataStream<Integer> orangeStream = ...;
+DataStream<Integer> greenStream = ...;
 
 orangeStream.join(greenStream)
     .where(<KeySelector>)
@@ -172,8 +172,8 @@ import org.apache.flink.streaming.api.windowing.time.Time;
  
 ...
 
-DataStream<Integer> orangeStream = ...
-DataStream<Integer> greenStream = ...
+DataStream<Integer> orangeStream = ...;
+DataStream<Integer> greenStream = ...;
 
 orangeStream.join(greenStream)
     .where(<KeySelector>)
@@ -244,8 +244,8 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 
 ...
 
-DataStream<Integer> orangeStream = ...
-DataStream<Integer> greenStream = ...
+DataStream<Integer> orangeStream = ...;
+DataStream<Integer> greenStream = ...;
 
 orangeStream
     .keyBy(<KeySelector>)
diff --git a/docs/content/docs/dev/datastream/operators/overview.md b/docs/content/docs/dev/datastream/operators/overview.md
index c6503ec..81e183a 100644
--- a/docs/content/docs/dev/datastream/operators/overview.md
+++ b/docs/content/docs/dev/datastream/operators/overview.md
@@ -773,7 +773,7 @@ The description can contain detail information about operators to facilitate deb
 {{< tabs namedescription >}}
 {{< tab "Java" >}}
 ```java
-someStream.filter(...).setName("filter").setDescription("x in (1, 2, 3, 4) and y > 1")
+someStream.filter(...).setName("filter").setDescription("x in (1, 2, 3, 4) and y > 1");
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
diff --git a/docs/content/docs/dev/datastream/operators/process_function.md b/docs/content/docs/dev/datastream/operators/process_function.md
index 41d1012..52fd53a 100644
--- a/docs/content/docs/dev/datastream/operators/process_function.md
+++ b/docs/content/docs/dev/datastream/operators/process_function.md
@@ -53,7 +53,7 @@ to apply the `ProcessFunction` on a keyed stream:
 {{< /hint >}}
 
 ```java
-stream.keyBy(...).process(new MyProcessFunction())
+stream.keyBy(...).process(new MyProcessFunction());
 ```
 
 ## Low-level Joins
@@ -461,7 +461,7 @@ Stopping a processing-time timer:
 {{< tabs "5d0d1344-6f51-44f8-b500-ebe863cedba4" >}}
 {{< tab "Java" >}}
 ```java
-long timestampOfTimerToStop = ...
+long timestampOfTimerToStop = ...;
 ctx.timerService().deleteProcessingTimeTimer(timestampOfTimerToStop);
 ```
 {{< /tab >}}
@@ -484,7 +484,7 @@ Stopping an event-time timer:
 {{< tabs "581e5996-503c-452e-8b2a-a4daeaf4ac88" >}}
 {{< tab "Java" >}}
 ```java
-long timestampOfTimerToStop = ...
+long timestampOfTimerToStop = ...;
 ctx.timerService().deleteEventTimeTimer(timestampOfTimerToStop);
 ```
 {{< /tab >}}
diff --git a/docs/content/docs/dev/datastream/overview.md b/docs/content/docs/dev/datastream/overview.md
index 329d008..3b4d139 100644
--- a/docs/content/docs/dev/datastream/overview.md
+++ b/docs/content/docs/dev/datastream/overview.md
@@ -85,11 +85,11 @@ The `StreamExecutionEnvironment` is the basis for all Flink programs. You can
 obtain one using these static methods on `StreamExecutionEnvironment`:
 
 ```java
-getExecutionEnvironment()
+getExecutionEnvironment();
 
-createLocalEnvironment()
+createLocalEnvironment();
 
-createRemoteEnvironment(String host, int port, String... jarFiles)
+createRemoteEnvironment(String host, int port, String... jarFiles);
 ```
 
 Typically, you only need to use `getExecutionEnvironment()`, since this will do
@@ -136,9 +136,9 @@ an outside system by creating a sink. These are just some example methods for
 creating a sink:
 
 ```java
-writeAsText(String path)
+writeAsText(String path);
 
-print()
+print();
 ```
 
 {{< /tab >}}
@@ -744,7 +744,7 @@ List<Tuple2<String, Integer>> data = ...
 DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);
 
 // Create a DataStream from an Iterator
-Iterator<Long> longIt = ...
+Iterator<Long> longIt = ...;
 DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
 ```
 {{< /tab >}}
@@ -777,10 +777,10 @@ Flink also provides a sink to collect DataStream results for testing and debuggi
 {{< tabs "125e228e-13b5-4c77-93a7-c0f436fcdd2f" >}}
 {{< tab "Java" >}}
 ```java
-import org.apache.flink.streaming.experimental.DataStreamUtils
+import org.apache.flink.streaming.experimental.DataStreamUtils;
 
-DataStream<Tuple2<String, Integer>> myResult = ...
-Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)
+DataStream<Tuple2<String, Integer>> myResult = ...;
+Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult);
 ```
 
 {{< /tab >}}
diff --git a/docs/content/docs/dev/datastream/sources.md b/docs/content/docs/dev/datastream/sources.md
index cae739a..2a9265a 100644
--- a/docs/content/docs/dev/datastream/sources.md
+++ b/docs/content/docs/dev/datastream/sources.md
@@ -341,7 +341,7 @@ The `WatermarkStrategy` is passed to the Source during creation in the DataStrea
 environment.fromSource(
     Source<OUT, ?, ?> source,
     WatermarkStrategy<OUT> timestampsAndWatermarks,
-    String sourceName)
+    String sourceName);
 ```
 
 The `TimestampAssigner` and `WatermarkGenerator` run transparently as part of the `ReaderOutput`(or `SourceOutput`) so source implementors do not have to implement any timestamp extraction and watermark generation code.
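
A minimal usage sketch of this signature, assuming a `fileSource` instance built beforehand (for example with the `FileSource` builders shown earlier in this patch) and a bounded run that needs no event-time watermarks:

```java
DataStream<String> lines = env.fromSource(
        fileSource,                          // assumed: a Source<String, ?, ?> created elsewhere
        WatermarkStrategy.noWatermarks(),    // no event-time timestamps/watermarks in this sketch
        "file-source");                      // name used for the source operator
```
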
diff --git a/docs/content/docs/dev/datastream/user_defined_functions.md b/docs/content/docs/dev/datastream/user_defined_functions.md
index 4d6db24..55aa7ac 100644
--- a/docs/content/docs/dev/datastream/user_defined_functions.md
+++ b/docs/content/docs/dev/datastream/user_defined_functions.md
@@ -212,7 +212,7 @@ returned from the `execute()` method of the execution environment
 completion of the job).
 
 ```java
-myJobExecutionResult.getAccumulatorResult("num-lines")
+myJobExecutionResult.getAccumulatorResult("num-lines");
 ```
 
 All accumulators share a single namespace per job. Thus you can use the same accumulator in
diff --git a/docs/content/docs/dev/table/catalogs.md b/docs/content/docs/dev/table/catalogs.md
index 54b6941..d0b7ab6 100644
--- a/docs/content/docs/dev/table/catalogs.md
+++ b/docs/content/docs/dev/table/catalogs.md
@@ -76,7 +76,7 @@ Users can use SQL DDL to create tables in catalogs in both Table API and SQL.
 {{< tabs "b462513f-2da9-4bd0-a55d-ca9a5e4cf512" >}}
 {{< tab "Java" >}}
 ```java
-TableEnvironment tableEnv = ...
+TableEnvironment tableEnv = ...;
 
 // Create a HiveCatalog 
 Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>");
diff --git a/docs/content/docs/dev/table/common.md b/docs/content/docs/dev/table/common.md
index 0f8b309..5fa3ead 100644
--- a/docs/content/docs/dev/table/common.md
+++ b/docs/content/docs/dev/table/common.md
@@ -51,7 +51,7 @@ tableEnv.createTemporaryTable("SourceTable", TableDescriptor.forConnector("datag
       .column("f0", DataTypes.STRING())
       .build())
     .option(DataGenOptions.ROWS_PER_SECOND, 100)
-    .build())
+    .build());
 
 // Create a sink table (using SQL DDL)
 tableEnv.executeSql("CREATE TEMPORARY TABLE SinkTable WITH ('connector' = 'blackhole') LIKE SourceTable");
@@ -344,7 +344,7 @@ tableEnv.createTable("SourceTableA", sourceDescriptor);
 tableEnv.createTemporaryTable("SourceTableB", sourceDescriptor);
 
 // Using SQL DDL
-tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)")
+tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)");
 ```
 
 ### Expanding Table identifiers
@@ -672,7 +672,7 @@ tableEnv.createTemporaryTable("CsvSinkTable", TableDescriptor.forConnector("file
     .build());
 
 // compute a result Table using Table API operators and/or SQL queries
-Table result = ...
+Table result = ...;
 
 // Prepare the insert into pipeline
 TablePipeline pipeline = result.insertInto("CsvSinkTable");
diff --git a/docs/content/docs/dev/table/config.md b/docs/content/docs/dev/table/config.md
index 9cc928d..ccdcc87 100644
--- a/docs/content/docs/dev/table/config.md
+++ b/docs/content/docs/dev/table/config.md
@@ -51,7 +51,7 @@ table environment.
 {{< tab "Java" >}}
 ```java
 // instantiate table environment
-TableEnvironment tEnv = ...
+TableEnvironment tEnv = ...;
 
 // access flink configuration
 TableConfig configuration = tEnv.getConfig();
diff --git a/docs/content/docs/dev/table/data_stream_api.md b/docs/content/docs/dev/table/data_stream_api.md
index 7506149..28950e2 100644
--- a/docs/content/docs/dev/table/data_stream_api.md
+++ b/docs/content/docs/dev/table/data_stream_api.md
@@ -598,25 +598,25 @@ pipeline or a statement set:
 
 ```java
 // execute with explicit sink
-tableEnv.from("InputTable").insertInto("OutputTable").execute()
+tableEnv.from("InputTable").insertInto("OutputTable").execute();
 
-tableEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable")
+tableEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable");
 
 tableEnv.createStatementSet()
     .add(tableEnv.from("InputTable").insertInto("OutputTable"))
     .add(tableEnv.from("InputTable").insertInto("OutputTable2"))
-    .execute()
+    .execute();
 
 tableEnv.createStatementSet()
     .addInsertSql("INSERT INTO OutputTable SELECT * FROM InputTable")
     .addInsertSql("INSERT INTO OutputTable2 SELECT * FROM InputTable")
-    .execute()
+    .execute();
 
 // execute with implicit local sink
 
-tableEnv.from("InputTable").execute().print()
+tableEnv.from("InputTable").execute().print();
 
-tableEnv.executeSql("SELECT * FROM InputTable").print()
+tableEnv.executeSql("SELECT * FROM InputTable").print();
 ```
 
 To combine both execution behaviors, every call to `StreamTableEnvironment.toDataStream`
@@ -629,17 +629,17 @@ these "external parts".
 // (1)
 
 // adds a branch with a printing sink to the StreamExecutionEnvironment
-tableEnv.toDataStream(table).print()
+tableEnv.toDataStream(table).print();
 
 // (2)
 
 // executes a Table API end-to-end pipeline as a Flink job and prints locally,
 // thus (1) has still not been executed
-table.execute().print()
+table.execute().print();
 
 // executes the DataStream API pipeline with the sink defined in (1) as a
 // Flink job, (2) was already running before
-env.execute()
+env.execute();
 ```
 
 {{< top >}}
@@ -2771,7 +2771,7 @@ The schema of the resulting view depends on the data type of the registered coll
 {{< tab "Java" >}}
 ```java
 StreamTableEnvironment tableEnv = ...; 
-DataStream<Tuple2<Long, String>> stream = ...
+DataStream<Tuple2<Long, String>> stream = ...;
 
 Table table2 = tableEnv.fromDataStream(stream, $("myLong"), $("myString"));
 ```
@@ -2926,7 +2926,7 @@ When defining a position-based mapping, the specified names must not exist in th
 ```java
 StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
-DataStream<Tuple2<Long, Integer>> stream = ...
+DataStream<Tuple2<Long, Integer>> stream = ...;
 
 // convert DataStream into Table with field "myLong" only
 Table table = tableEnv.fromDataStream(stream, $("myLong"));
@@ -2978,7 +2978,7 @@ If no field names are specified, the default field names and field order of the
 ```java
 StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
-DataStream<Tuple2<Long, Integer>> stream = ...
+DataStream<Tuple2<Long, Integer>> stream = ...;
 
 // convert DataStream into Table with field "f1" only
 Table table = tableEnv.fromDataStream(stream, $("f1"));
@@ -3039,7 +3039,7 @@ The type of the column is inferred from the atomic type. The name of the column
 ```java
 StreamTableEnvironment tableEnv = ...;
 
-DataStream<Long> stream = ...
+DataStream<Long> stream = ...;
 
 // Convert DataStream into Table with field name "myLong"
 Table table = tableEnv.fromDataStream(stream, $("myLong"));
@@ -3089,7 +3089,7 @@ Name-based mapping allows for reordering fields and projection with alias (`as`)
 ```java
 StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
-DataStream<Tuple2<Long, String>> stream = ...
+DataStream<Tuple2<Long, String>> stream = ...;
 
 // convert DataStream into Table with renamed field names "myLong", "myString" (position-based)
 Table table = tableEnv.fromDataStream(stream, $("myLong"), $("myString"));
@@ -3183,7 +3183,7 @@ When converting a POJO `DataStream` into a `Table` without specifying field name
 StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 // Person is a POJO with fields "name" and "age"
-DataStream<Person> stream = ...
+DataStream<Person> stream = ...;
 
 // convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
 Table table = tableEnv.fromDataStream(stream, $("age").as("myAge"), $("name").as("myName"));
@@ -3230,7 +3230,7 @@ Fields can be renamed by providing names for all fields (mapping based on positi
 StreamTableEnvironment tableEnv = ...; 
 
 // DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
-DataStream<Row> stream = ...
+DataStream<Row> stream = ...;
 
 // Convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
 Table table = tableEnv.fromDataStream(stream, $("myName"), $("myAge"));
diff --git a/docs/content/docs/dev/table/functions/systemFunctions.md b/docs/content/docs/dev/table/functions/systemFunctions.md
index a07cfa2..9e88fdb 100644
--- a/docs/content/docs/dev/table/functions/systemFunctions.md
+++ b/docs/content/docs/dev/table/functions/systemFunctions.md
@@ -209,7 +209,7 @@ The column functions can be used in all places where column fields are expected,
 ```java
 table
    .groupBy("withColumns(1 to 3)")
-   .select("withColumns(a to b), myUDAgg(myUDF(withColumns(5 to 20)))")
+   .select("withColumns(a to b), myUDAgg(myUDF(withColumns(5 to 20)))");
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
diff --git a/docs/content/docs/dev/table/tuning.md b/docs/content/docs/dev/table/tuning.md
index 804e75f..e9351fb 100644
--- a/docs/content/docs/dev/table/tuning.md
+++ b/docs/content/docs/dev/table/tuning.md
@@ -59,7 +59,7 @@ The following examples show how to enable these options.
 {{< tab "Java" >}}
 ```java
 // instantiate table environment
-TableEnvironment tEnv = ...
+TableEnvironment tEnv = ...;
 
 // access flink configuration
 TableConfig configuration = tEnv.getConfig();
@@ -122,7 +122,7 @@ The following examples show how to enable the local-global aggregation.
 {{< tab "Java" >}}
 ```java
 // instantiate table environment
-TableEnvironment tEnv = ...
+TableEnvironment tEnv = ...;
 
 // access flink configuration
 TableConfig configuration = tEnv.getConfig();
@@ -210,7 +210,7 @@ The following examples show how to enable the split distinct aggregation optimiz
 {{< tab "Java" >}}
 ```java
 // instantiate table environment
-TableEnvironment tEnv = ...
+TableEnvironment tEnv = ...;
 
 tEnv.getConfig()
   .set("table.optimizer.distinct-agg.split.enabled", "true");  // enable distinct agg split
diff --git a/docs/content/docs/learn-flink/streaming_analytics.md b/docs/content/docs/learn-flink/streaming_analytics.md
index b70c675..f3ddc13 100644
--- a/docs/content/docs/learn-flink/streaming_analytics.md
+++ b/docs/content/docs/learn-flink/streaming_analytics.md
@@ -134,7 +134,7 @@ a class that extracts the timestamps from the events, and generates watermarks o
 easiest way to do this is by using a `WatermarkStrategy`:
 
 ```java
-DataStream<Event> stream = ...
+DataStream<Event> stream = ...;
 
 WatermarkStrategy<Event> strategy = WatermarkStrategy
         .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20))
@@ -244,7 +244,7 @@ end-of-window-timestamp, max_value)`.
 #### ProcessWindowFunction Example
 
 ```java
-DataStream<SensorReading> input = ...
+DataStream<SensorReading> input = ...;
 
 input
     .keyBy(x -> x.key)
@@ -296,7 +296,7 @@ per-key information for all windows of that key. This might be useful, for examp
 #### Incremental Aggregation Example
 
 ```java
-DataStream<SensorReading> input = ...
+DataStream<SensorReading> input = ...;
 
 input
     .keyBy(x -> x.key)
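
To see the patched watermark and window snippets as one runnable piece, a minimal sketch could look like the following; the `SensorReading` POJO, sample records, and window size are assumptions for illustration only.

```java
// Sketch: assign event-time timestamps/watermarks, then compute a per-key windowed max.
// NOTE: the SensorReading POJO and the sample data are illustrative assumptions.
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowedMaxExample {

    public static class SensorReading {
        public String key;
        public long timestamp;
        public double value;

        public SensorReading() {}

        public SensorReading(String key, long timestamp, double value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<SensorReading> input = env.fromElements(
                new SensorReading("a", 1_000L, 1.0),
                new SensorReading("a", 2_000L, 3.0),
                new SensorReading("b", 1_500L, 2.0));

        WatermarkStrategy<SensorReading> strategy = WatermarkStrategy
                .<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner((event, recordTimestamp) -> event.timestamp);

        input.assignTimestampsAndWatermarks(strategy)
                .keyBy(x -> x.key)
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .maxBy("value")
                .print();

        env.execute("windowed max");
    }
}
```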
diff --git a/docs/content/docs/libs/cep.md b/docs/content/docs/libs/cep.md
index b927cdb..fc541e2 100644
--- a/docs/content/docs/libs/cep.md
+++ b/docs/content/docs/libs/cep.md
@@ -65,7 +65,7 @@ because FlinkCEP uses them for comparing and matching events.
 {{< tabs "8951ef0a-cdd4-40d1-bda8-dec1299aaf41" >}}
 {{< tab "Java" >}}
 ```java
-DataStream<Event> input = ...
+DataStream<Event> input = ...;
 
 Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
         new SimpleCondition<Event>() {
@@ -341,7 +341,7 @@ via the `pattern.subtype(subClass)` method.
 start.subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
     @Override
     public boolean filter(SubEvent value) {
-        return ... // some condition
+        return ...; // some condition
     }
 });
 ```
@@ -361,12 +361,12 @@ start.subtype(classOf[SubEvent]).where(subEvent => ... /* some condition */)
 pattern.where(new SimpleCondition<Event>() {
     @Override
     public boolean filter(Event value) {
-        return ... // some condition
+        return ...; // some condition
     }
 }).or(new SimpleCondition<Event>() {
     @Override
     public boolean filter(Event value) {
-        return ... // or condition
+        return ...; // or condition
     }
 });
 ```
@@ -403,7 +403,7 @@ Multiple consecutive where() clauses lead to their conditions being `AND`ed.
 pattern.where(new IterativeCondition<Event>() {
     @Override
     public boolean filter(Event value, Context ctx) throws Exception {
-        return ... // some condition
+        return ...; // some condition
     }
 });
 ```
@@ -425,12 +425,12 @@ Adds a new condition which is `OR`ed with an existing one. An event can match th
 pattern.where(new IterativeCondition<Event>() {
     @Override
     public boolean filter(Event value, Context ctx) throws Exception {
-        return ... // some condition
+        return ...; // some condition
     }
 }).or(new IterativeCondition<Event>() {
     @Override
     public boolean filter(Event value, Context ctx) throws Exception {
-        return ... // alternative condition
+        return ...; // alternative condition
     }
 });
 ```
@@ -455,7 +455,7 @@ events will be accepted into the pattern. Applicable only in conjunction with `o
 pattern.oneOrMore().until(new IterativeCondition<Event>() {
     @Override
     public boolean filter(Event value, Context ctx) throws Exception {
-        return ... // alternative condition
+        return ...; // alternative condition
     }
 });
 ```
@@ -1248,7 +1248,7 @@ Then apply the skip strategy to a pattern by calling:
 {{< tabs "64a34dcc-47f8-443d-b31a-515f7fd17243" >}}
 {{< tab "Java" >}}
 ```java
-AfterMatchSkipStrategy skipStrategy = ...
+AfterMatchSkipStrategy skipStrategy = ...;
 Pattern.begin("patternName", skipStrategy);
 ```
 {{< /tab >}}
@@ -1269,7 +1269,7 @@ One can enable this option by:
 {{< tabs "59e07b27-61d3-4348-ab60-c8a805500c87" >}}
 {{< tab "Java" >}}
 ```java
-AfterMatchSkipStrategy.skipToFirst(patternName).throwExceptionOnMiss()
+AfterMatchSkipStrategy.skipToFirst(patternName).throwExceptionOnMiss();
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -1288,9 +1288,9 @@ Given an input stream `input`, a pattern `pattern` and an optional comparator `c
 {{< tabs "79719c8a-f503-4f3e-9717-75540e637481" >}}
 {{< tab "Java" >}}
 ```java
-DataStream<Event> input = ...
-Pattern<Event, ?> pattern = ...
-EventComparator<Event> comparator = ... // optional
+DataStream<Event> input = ...;
+Pattern<Event, ?> pattern = ...;
+EventComparator<Event> comparator = ...; // optional
 
 PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);
 ```
@@ -1528,9 +1528,9 @@ The whole processing is done with event time.
 {{< tabs "573ac3c5-e8b9-4ffa-b7b6-e2db19611ff5" >}}
 {{< tab "Java" >}}
 ```java
-StreamExecutionEnvironment env = ...
+StreamExecutionEnvironment env = ...;
 
-DataStream<Event> input = ...
+DataStream<Event> input = ...;
 
 DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>() {
 	@Override
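
A compact, runnable sketch of the FlinkCEP usage touched above (a pattern of `SimpleCondition`s applied via `CEP.pattern`) might look like this, assuming the flink-cep dependency is on the classpath; the `Event` POJO, field names, and threshold are illustrative assumptions.

```java
// Sketch: match two consecutive readings above a threshold per key and emit an alert string.
// NOTE: Event, its fields, the sample data and the threshold are assumptions for illustration.
import java.util.List;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CepSketch {

    public static class Event {
        public String id;
        public double temperature;

        public Event() {}

        public Event(String id, double temperature) {
            this.id = id;
            this.temperature = temperature;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Event> input = env.fromElements(
                new Event("a", 20.0), new Event("a", 42.0), new Event("a", 45.0));

        // two consecutive readings above 40.0
        Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event value) {
                        return value.temperature > 40.0;
                    }
                })
                .next("second")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event value) {
                        return value.temperature > 40.0;
                    }
                });

        PatternStream<Event> patternStream = CEP.pattern(input.keyBy(e -> e.id), pattern);

        patternStream.select(new PatternSelectFunction<Event, String>() {
            @Override
            public String select(Map<String, List<Event>> match) {
                return match.get("first").get(0).id + " triggered an alert";
            }
        }).print();

        env.execute("cep sketch");
    }
}
```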
diff --git a/docs/content/docs/libs/gelly/bipartite_graph.md b/docs/content/docs/libs/gelly/bipartite_graph.md
index aefc527..e2763eb 100644
--- a/docs/content/docs/libs/gelly/bipartite_graph.md
+++ b/docs/content/docs/libs/gelly/bipartite_graph.md
@@ -84,11 +84,11 @@ You can create a `BipartiteGraph` in the following ways:
 ```java
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-DataSet<Vertex<String, Long>> topVertices = ...
+DataSet<Vertex<String, Long>> topVertices = ...;
 
-DataSet<Vertex<String, Long>> bottomVertices = ...
+DataSet<Vertex<String, Long>> bottomVertices = ...;
 
-DataSet<Edge<String, String, Double>> edges = ...
+DataSet<Edge<String, String, Double>> edges = ...;
 
 Graph<String, String, Long, Long, Double> graph = BipartiteGraph.fromDataSet(topVertices, bottomVertices, edges, env);
 ```
@@ -120,14 +120,14 @@ In the case of a simple projection each node in the result graph contains a pair
 ```java
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 // Vertices (1, "top1")
-DataSet<Vertex<Long, String>> topVertices = ...
+DataSet<Vertex<Long, String>> topVertices = ...;
 
 // Vertices (2, "bottom2"); (4, "bottom4")
-DataSet<Vertex<Long, String>> bottomVertices = ...
+DataSet<Vertex<Long, String>> bottomVertices = ...;
 
 // Edges that connect vertex 2 to vertex 1 and vertex 4 to vertex 1:
 // (1, 2, "1-2-edge"); (1, 4, "1-4-edge")
-DataSet<Edge<Long, Long, String>> edges = ...
+DataSet<Edge<Long, Long, String>> edges = ...;
 
 BipartiteGraph<Long, Long, String, String, String> graph = BipartiteGraph.fromDataSet(topVertices, bottomVertices, edges, env);
 
@@ -155,14 +155,14 @@ Full projection preserves all the information about the connection between two v
 ```java
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 // Vertices (1, "top1")
-DataSet<Vertex<Long, String>> topVertices = ...
+DataSet<Vertex<Long, String>> topVertices = ...;
 
 // Vertices (2, "bottom2"); (4, "bottom4")
-DataSet<Vertex<Long, String>> bottomVertices = ...
+DataSet<Vertex<Long, String>> bottomVertices = ...;
 
 // Edges that connect vertex 2 to vertex 1 and vertex 4 to vertex 1:
 // (1, 2, "1-2-edge"); (1, 4, "1-4-edge")
-DataSet<Edge<Long, Long, String>> edges = ...
+DataSet<Edge<Long, Long, String>> edges = ...;
 
 BipartiteGraph<Long, Long, String, String, String> graph = BipartiteGraph.fromDataSet(topVertices, bottomVertices, edges, env);
 
diff --git a/docs/content/docs/libs/gelly/graph_api.md b/docs/content/docs/libs/gelly/graph_api.md
index a488a56..bd3f6b2 100644
--- a/docs/content/docs/libs/gelly/graph_api.md
+++ b/docs/content/docs/libs/gelly/graph_api.md
@@ -96,9 +96,9 @@ You can create a `Graph` in the following ways:
 ```java
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-DataSet<Vertex<String, Long>> vertices = ...
+DataSet<Vertex<String, Long>> vertices = ...;
 
-DataSet<Edge<String, Double>> edges = ...
+DataSet<Edge<String, Double>> edges = ...;
 
 Graph<String, Long, Double> graph = Graph.fromDataSet(vertices, edges, env);
 ```
@@ -123,7 +123,7 @@ val graph = Graph.fromDataSet(vertices, edges, env)
 ```java
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-DataSet<Tuple2<String, String>> edges = ...
+DataSet<Tuple2<String, String>> edges = ...;
 
 Graph<String, NullValue, NullValue> graph = Graph.fromTuple2DataSet(edges, env);
 ```
@@ -231,9 +231,9 @@ val simpleGraph = Graph.fromCsvReader[Long, Double, NullValue](
 ```java
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-List<Vertex<Long, Long>> vertexList = new ArrayList...
+List<Vertex<Long, Long>> vertexList = new ArrayList...;
 
-List<Edge<Long, String>> edgeList = new ArrayList...
+List<Edge<Long, String>> edgeList = new ArrayList...;
 
 Graph<Long, Long, String> graph = Graph.fromCollection(vertexList, edgeList, env);
 ```
@@ -408,7 +408,7 @@ Graph<String, Long, Long> updatedGraph = graph.translateGraphIds(
 Graph<LongValue, LongValue, LongValue> updatedGraph = graph
                 .translateGraphIds(new LongToLongValue())
                 .translateVertexValues(new LongToLongValue())
-                .translateEdgeValues(new LongToLongValue())
+                .translateEdgeValues(new LongToLongValue());
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -428,7 +428,7 @@ val updatedGraph = graph.translateGraphIds(id => id.toString)
 {{< tabs "b33fe8f8-8a53-4710-9379-8d2f912a3105" >}}
 {{< tab "Java" >}}
 ```java
-Graph<Long, Long, Long> graph = ...
+Graph<Long, Long, Long> graph = ...;
 
 graph.subgraph(
 		new FilterFunction<Vertex<Long, Long>>() {
@@ -467,7 +467,7 @@ Note that if the input dataset contains a key multiple times, all Gelly join met
 {{< tabs "219b4d15-4be2-4bbf-a3ea-4155d3f6ba27" >}}
 {{< tab "Java" >}}
 ```java
-Graph<Long, Double, Double> network = ...
+Graph<Long, Double, Double> network = ...;
 
 DataSet<Tuple2<Long, LongValue>> vertexOutDegrees = network.outDegrees();
 
@@ -519,11 +519,11 @@ val networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees, (v1: Do
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 // create first graph from edges {(1, 3, 12), (1, 3, 13), (1, 3, 13)}

-List<Edge<Long, Long>> edges1 = ...
+List<Edge<Long, Long>> edges1 = ...;
 Graph<Long, NullValue, Long> graph1 = Graph.fromCollection(edges1, env);
 
 // create second graph from edges {(1, 3, 13)}
-List<Edge<Long, Long>> edges2 = ...
+List<Edge<Long, Long>> edges2 = ...;
 Graph<Long, NullValue, Long> graph2 = Graph.fromCollection(edges2, env);
 
 // Using distinct = true results in {(1,3,13)}
@@ -638,7 +638,7 @@ The following code will collect the out-edges for each vertex and apply the `Sel
 {{< tabs "8ab0141f-ed3d-4372-bfab-7f78ed6d7d5f" >}}
 {{< tab "Java" >}}
 ```java
-Graph<Long, Long, Double> graph = ...
+Graph<Long, Long, Double> graph = ...;
 
 DataSet<Tuple2<Long, Double>> minWeights = graph.reduceOnEdges(new SelectMinWeight(), EdgeDirection.OUT);
 
@@ -677,7 +677,7 @@ Similarly, assume that you would like to compute the sum of the values of all in
 {{< tabs "67e6fe66-aef8-46b8-8e80-2762dd5c3f02" >}}
 {{< tab "Java" >}}
 ```java
-Graph<Long, Long, Double> graph = ...
+Graph<Long, Long, Double> graph = ...;
 
 DataSet<Tuple2<Long, Long>> verticesWithSum = graph.reduceOnNeighbors(new SumValues(), EdgeDirection.IN);
 
@@ -720,7 +720,7 @@ For example, the following code will output all the vertex pairs which are conne
 {{< tabs "2cf7a021-b67a-42dc-912f-ef79f36314b2" >}}
 {{< tab "Java" >}}
 ```java
-Graph<Long, Long, Double> graph = ...
+Graph<Long, Long, Double> graph = ...;
 
 DataSet<Tuple2<Vertex<Long, Long>, Vertex<Long, Long>>> vertexPairs = graph.groupReduceOnNeighbors(new SelectLargeWeightNeighbors(), EdgeDirection.OUT);
 
@@ -783,10 +783,10 @@ also exist in the vertex IDs set.
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 // create a list of vertices with IDs = {1, 2, 3, 4, 5}
-List<Vertex<Long, Long>> vertices = ...
+List<Vertex<Long, Long>> vertices = ...;
 
 // create a list of edges with IDs = {(1, 2), (1, 3), (2, 4), (5, 6)}
-List<Edge<Long, Long>> edges = ...
+List<Edge<Long, Long>> edges = ...;
 
 Graph<Long, Long, Long> graph = Graph.fromCollection(vertices, edges, env);
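
For context, a self-contained sketch of building a Gelly `Graph` from collections, as in the examples patched above, could look like the following (flink-gelly on the classpath); the vertex and edge values are illustrative assumptions.

```java
// Sketch: build a small graph from Java collections and print the out-degrees.
// NOTE: the sample vertices and edges are assumptions for illustration.
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;

public class GellyGraphSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        List<Vertex<Long, Long>> vertices = Arrays.asList(
                new Vertex<>(1L, 1L), new Vertex<>(2L, 2L), new Vertex<>(3L, 3L));

        List<Edge<Long, Long>> edges = Arrays.asList(
                new Edge<>(1L, 2L, 12L), new Edge<>(1L, 3L, 13L));

        Graph<Long, Long, Long> graph = Graph.fromCollection(vertices, edges, env);

        // out-degree per vertex, e.g. vertex 1 has out-degree 2
        graph.outDegrees().print();
    }
}
```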
 
diff --git a/docs/content/docs/libs/gelly/graph_generators.md b/docs/content/docs/libs/gelly/graph_generators.md
index ab5afe8..0ea4cf0 100644
--- a/docs/content/docs/libs/gelly/graph_generators.md
+++ b/docs/content/docs/libs/gelly/graph_generators.md
@@ -304,7 +304,7 @@ two `endpoint` vertices with degree `1` and all midpoint vertices with degree
 ```java
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-long vertexCount = 5
+long vertexCount = 5;
 
 Graph<LongValue, NullValue, NullValue> graph = new PathGraph(env, vertexCount)
     .generate();
@@ -417,7 +417,7 @@ An undirected graph containing isolated two-paths where every vertex has degree
 ```java
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-long vertexPairCount = 4
+long vertexPairCount = 4;
 
 // note: configured with the number of vertex pairs
 Graph<LongValue, NullValue, NullValue> graph = new SingletonEdgeGraph(env, vertexPairCount)
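
A minimal runnable sketch of the generator usage shown above (here with `PathGraph`) might look like this; printing the generated edges is just an illustrative sink.

```java
// Sketch: generate a path graph on five vertices and print its edges.
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.generator.PathGraph;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;

public class PathGraphSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        long vertexCount = 5;

        Graph<LongValue, NullValue, NullValue> graph = new PathGraph(env, vertexCount)
                .generate();

        graph.getEdges().print();
    }
}
```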
diff --git a/docs/content/docs/libs/gelly/iterative_graph_processing.md b/docs/content/docs/libs/gelly/iterative_graph_processing.md
index 442dab3..1180a58 100644
--- a/docs/content/docs/libs/gelly/iterative_graph_processing.md
+++ b/docs/content/docs/libs/gelly/iterative_graph_processing.md
@@ -50,7 +50,7 @@ Let us consider computing Single-Source-Shortest-Paths with vertex-centric itera
 {{< tab "Java" >}}
 ```java
 // read the input graph
-Graph<Long, Double, Double> graph = ...
+Graph<Long, Double, Double> graph = ...;
 
 // define the maximum number of iterations
 int maxIterations = 10;
@@ -178,7 +178,7 @@ all aggregates globally once per superstep and makes them available in the next
 {{< tab "Java" >}}
 ```java
 
-Graph<Long, Double, Double> graph = ...
+Graph<Long, Double, Double> graph = ...;
 
 // configure the iteration
 VertexCentricConfiguration parameters = new VertexCentricConfiguration();
@@ -296,7 +296,7 @@ Let us consider computing Single-Source-Shortest-Paths with scatter-gather itera
 {{< tab "Java" >}}
 ```java
 // read the input graph
-Graph<Long, Double, Double> graph = ...
+Graph<Long, Double, Double> graph = ...;
 
 // define the maximum number of iterations
 int maxIterations = 10;
@@ -421,7 +421,7 @@ If the degrees option is not set in the configuration, these methods will return
 {{< tab "Java" >}}
 ```java
 
-Graph<Long, Double, Double> graph = ...
+Graph<Long, Double, Double> graph = ...;
 
 // configure the iteration
 ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
@@ -525,7 +525,7 @@ The following example illustrates the usage of the degree as well as the number
 {{< tab "Java" >}}
 ```java
 
-Graph<Long, Double, Double> graph = ...
+Graph<Long, Double, Double> graph = ...;
 
 // configure the iteration
 ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
@@ -599,7 +599,7 @@ The following example illustrates the usage of the edge direction option. Vertic
 {{< tabs "5efb0e23-7fd4-4e08-9952-981d6e4f3b9e" >}}
 {{< tab "Java" >}}
 ```java
-Graph<Long, HashSet<Long>, Double> graph = ...
+Graph<Long, HashSet<Long>, Double> graph = ...;
 
 // configure the iteration
 ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
@@ -664,7 +664,7 @@ To implement this example in Gelly GSA, the user only needs to call the `runGath
 {{< tab "Java" >}}
 ```java
 // read the input graph
-Graph<Long, Double, Double> graph = ...
+Graph<Long, Double, Double> graph = ...;
 
 // define the maximum number of iterations
 int maxIterations = 10;
@@ -786,7 +786,7 @@ The following example illustrates the usage of the number of vertices option.
 {{< tab "Java" >}}
 ```java
 
-Graph<Long, Double, Double> graph = ...
+Graph<Long, Double, Double> graph = ...;
 
 // configure the iteration
 GSAConfiguration parameters = new GSAConfiguration();
@@ -868,7 +868,7 @@ The following example illustrates the usage of the edge direction option.
 {{< tab "Java" >}}
 ```java
 
-Graph<Long, HashSet<Long>, Double> graph = ...
+Graph<Long, HashSet<Long>, Double> graph = ...;
 
 // configure the iteration
 GSAConfiguration parameters = new GSAConfiguration();
diff --git a/docs/content/docs/libs/gelly/library_methods.md b/docs/content/docs/libs/gelly/library_methods.md
index c83650f..6976141 100644
--- a/docs/content/docs/libs/gelly/library_methods.md
+++ b/docs/content/docs/libs/gelly/library_methods.md
@@ -35,7 +35,7 @@ Gelly's library methods can be used by simply calling the `run()` method on the
 ```java
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-Graph<Long, Long, NullValue> graph = ...
+Graph<Long, Long, NullValue> graph = ...;
 
 // run Label Propagation for 30 iterations to detect communities on the input graph
 DataSet<Vertex<Long, Long>> verticesWithCommunity = graph.run(new LabelPropagation<Long>(30));
diff --git a/docs/content/docs/libs/state_processor_api.md b/docs/content/docs/libs/state_processor_api.md
index f3be14e..82c9fb8 100644
--- a/docs/content/docs/libs/state_processor_api.md
+++ b/docs/content/docs/libs/state_processor_api.md
@@ -261,7 +261,7 @@ class ClickCounter implements AggregateFunction<Click, Integer, Integer> {
 	}
 }
 
-DataStream<Click> clicks = . . . 
+DataStream<Click> clicks = ...;
 
 clicks
     .keyBy(click -> click.userId)