You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ji...@apache.org on 2019/07/01 04:06:02 UTC
[flink] branch master updated: [FLINK-12897][python][docs] Improve
the Python Table API docs by adding more examples.
This is an automated email from the ASF dual-hosted git repository.
jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 534d53b [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples.
534d53b is described below
commit 534d53bf828404ef3dcd7aad9e7a8b6528606254
Author: Wei Zhong <we...@gmail.com>
AuthorDate: Thu Jun 27 20:30:36 2019 +0800
[FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples.
This closes #8916
---
docs/dev/event_time.md | 11 +
docs/dev/event_time.zh.md | 11 +
docs/dev/event_timestamps_watermarks.md | 6 +
docs/dev/event_timestamps_watermarks.zh.md | 6 +
docs/dev/stream/state/checkpointing.md | 28 ++
docs/dev/stream/state/checkpointing.zh.md | 28 ++
docs/dev/stream/state/state_backends.md | 6 +
docs/dev/stream/state/state_backends.zh.md | 6 +
docs/dev/table/common.md | 210 ++++++++++++-
docs/dev/table/common.zh.md | 206 ++++++++++++-
docs/dev/table/connect.md | 335 ++++++++++++++++++++
docs/dev/table/connect.zh.md | 336 +++++++++++++++++++++
docs/dev/table/functions.md | 40 ++-
docs/dev/table/functions.zh.md | 40 ++-
docs/dev/table/sql.md | 24 ++
docs/dev/table/sql.zh.md | 25 ++
docs/dev/table/streaming/query_configuration.md | 35 +++
docs/dev/table/streaming/query_configuration.zh.md | 35 +++
docs/dev/table/streaming/time_attributes.md | 11 +
docs/dev/table/streaming/time_attributes.zh.md | 11 +
docs/dev/table/tableApi.md | 12 +-
docs/dev/table/tableApi.zh.md | 12 +-
.../pyflink/dataset/execution_environment.py | 8 +-
.../datastream/stream_execution_environment.py | 4 +-
flink-python/pyflink/table/__init__.py | 3 +-
flink-python/pyflink/table/descriptors.py | 10 +-
flink-python/pyflink/table/query_config.py | 9 +
flink-python/pyflink/table/sinks.py | 12 +-
flink-python/pyflink/table/table.py | 17 +-
flink-python/pyflink/table/table_environment.py | 167 ++++++++--
30 files changed, 1564 insertions(+), 100 deletions(-)
diff --git a/docs/dev/event_time.md b/docs/dev/event_time.md
index 1d747aa..5790746 100644
--- a/docs/dev/event_time.md
+++ b/docs/dev/event_time.md
@@ -131,6 +131,17 @@ stream
.addSink(...)
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+
+env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)
+
+# alternatively:
+# env.set_stream_time_characteristic(TimeCharacteristic.IngestionTime)
+# env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+</div>
</div>
diff --git a/docs/dev/event_time.zh.md b/docs/dev/event_time.zh.md
index 1d747aa..5790746 100644
--- a/docs/dev/event_time.zh.md
+++ b/docs/dev/event_time.zh.md
@@ -131,6 +131,17 @@ stream
.addSink(...)
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+
+env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)
+
+# alternatively:
+# env.set_stream_time_characteristic(TimeCharacteristic.IngestionTime)
+# env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+</div>
</div>
diff --git a/docs/dev/event_timestamps_watermarks.md b/docs/dev/event_timestamps_watermarks.md
index cb1c5d4..fdb716b 100644
--- a/docs/dev/event_timestamps_watermarks.md
+++ b/docs/dev/event_timestamps_watermarks.md
@@ -44,6 +44,12 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+</div>
</div>
## Assigning Timestamps
diff --git a/docs/dev/event_timestamps_watermarks.zh.md b/docs/dev/event_timestamps_watermarks.zh.md
index cb1c5d4..fdb716b 100644
--- a/docs/dev/event_timestamps_watermarks.zh.md
+++ b/docs/dev/event_timestamps_watermarks.zh.md
@@ -44,6 +44,12 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+</div>
</div>
## Assigning Timestamps
diff --git a/docs/dev/stream/state/checkpointing.md b/docs/dev/stream/state/checkpointing.md
index fab1a49..c193fc3 100644
--- a/docs/dev/stream/state/checkpointing.md
+++ b/docs/dev/stream/state/checkpointing.md
@@ -132,6 +132,34 @@ env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# start a checkpoint every 1000 ms
+env.enable_checkpointing(1000)
+
+# advanced options:
+
+# set mode to exactly-once (this is the default)
+env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
+
+# make sure 500 ms of progress happen between checkpoints
+env.get_checkpoint_config().set_min_pause_between_checkpoints(500)
+
+# checkpoints have to complete within one minute, or are discarded
+env.get_checkpoint_config().set_checkpoint_timeout(60000)
+
+# allow only one checkpoint to be in progress at the same time
+env.get_checkpoint_config().set_max_concurrent_checkpoints(1)
+
+# enable externalized checkpoints which are retained after job cancellation
+env.get_checkpoint_config().enable_externalized_checkpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
+
+# allow job recovery to fall back to a checkpoint even when there is a more recent savepoint
+env.get_checkpoint_config().set_prefer_checkpoint_for_recovery(True)
+{% endhighlight %}
+</div>
</div>
### Related Config Options
diff --git a/docs/dev/stream/state/checkpointing.zh.md b/docs/dev/stream/state/checkpointing.zh.md
index fab1a49..c193fc3 100644
--- a/docs/dev/stream/state/checkpointing.zh.md
+++ b/docs/dev/stream/state/checkpointing.zh.md
@@ -132,6 +132,34 @@ env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# start a checkpoint every 1000 ms
+env.enable_checkpointing(1000)
+
+# advanced options:
+
+# set mode to exactly-once (this is the default)
+env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
+
+# make sure 500 ms of progress happen between checkpoints
+env.get_checkpoint_config().set_min_pause_between_checkpoints(500)
+
+# checkpoints have to complete within one minute, or are discarded
+env.get_checkpoint_config().set_checkpoint_timeout(60000)
+
+# allow only one checkpoint to be in progress at the same time
+env.get_checkpoint_config().set_max_concurrent_checkpoints(1)
+
+# enable externalized checkpoints which are retained after job cancellation
+env.get_checkpoint_config().enable_externalized_checkpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
+
+# allow job recovery to fall back to a checkpoint even when there is a more recent savepoint
+env.get_checkpoint_config().set_prefer_checkpoint_for_recovery(True)
+{% endhighlight %}
+</div>
</div>
### Related Config Options
diff --git a/docs/dev/stream/state/state_backends.md b/docs/dev/stream/state/state_backends.md
index 8e32f8e..4cd6d06 100644
--- a/docs/dev/stream/state/state_backends.md
+++ b/docs/dev/stream/state/state_backends.md
@@ -43,6 +43,12 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(...)
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_state_backend(...)
+{% endhighlight %}
+</div>
</div>
{% top %}
diff --git a/docs/dev/stream/state/state_backends.zh.md b/docs/dev/stream/state/state_backends.zh.md
index 8e32f8e..4cd6d06 100644
--- a/docs/dev/stream/state/state_backends.zh.md
+++ b/docs/dev/stream/state/state_backends.zh.md
@@ -43,6 +43,12 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(...)
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_state_backend(...)
+{% endhighlight %}
+</div>
</div>
{% top %}
diff --git a/docs/dev/table/common.md b/docs/dev/table/common.md
index d6655f7..705a978 100644
--- a/docs/dev/table/common.md
+++ b/docs/dev/table/common.md
@@ -80,7 +80,7 @@ tableEnv.registerTableSink("outputTable", ...);
// create a Table from a Table API query
val tapiResult = tableEnv.scan("table1").select(...)
-// Create a Table from a SQL query
+// create a Table from a SQL query
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...")
// emit a Table API result Table to a TableSink, same for SQL result
@@ -91,6 +91,35 @@ env.execute()
{% endhighlight %}
</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a TableEnvironment
+table_env = StreamTableEnvironment.create(env)
+
+# register a Table
+table_env.register_table("table1", ...) # or
+table_env.register_table_source("table2", ...)
+
+# register an output Table
+table_env.register_table_sink("outputTable", ...)
+
+# create a Table from a Table API query
+tapi_result = table_env.scan("table1").select(...)
+# create a Table from a SQL query
+sql_result = table_env.sql_query("SELECT ... FROM table2 ...")
+
+# emit a Table API result Table to a TableSink, same for SQL result
+tapi_result.insert_into("outputTable")
+
+# execute
+env.execute()
+
+{% endhighlight %}
+</div>
</div>
**Note:** Table API and SQL queries can be easily integrated with and embedded into DataStream or DataSet programs. Have a look at the [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api) section to learn how DataStreams and DataSets can be converted into Tables and vice versa.
@@ -121,6 +150,7 @@ Make sure to choose the `BatchTableEnvironment`/`StreamTableEnvironment` that ma
// ***************
// STREAMING QUERY
// ***************
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -130,6 +160,7 @@ StreamTableEnvironment sTableEnv = StreamTableEnvironment.create(sEnv);
// ***********
// BATCH QUERY
// ***********
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
@@ -143,6 +174,7 @@ BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(bEnv);
// ***************
// STREAMING QUERY
// ***************
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
@@ -152,6 +184,7 @@ val sTableEnv = StreamTableEnvironment.create(sEnv)
// ***********
// BATCH QUERY
// ***********
+import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.BatchTableEnvironment
val bEnv = ExecutionEnvironment.getExecutionEnvironment
@@ -159,6 +192,30 @@ val bEnv = ExecutionEnvironment.getExecutionEnvironment
val bTableEnv = BatchTableEnvironment.create(bEnv)
{% endhighlight %}
</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# ***************
+# STREAMING QUERY
+# ***************
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamTableEnvironment
+
+s_env = StreamExecutionEnvironment.get_execution_environment()
+# create a TableEnvironment for streaming queries
+s_table_env = StreamTableEnvironment.create(s_env)
+
+# ***********
+# BATCH QUERY
+# ***********
+from pyflink.dataset import ExecutionEnvironment
+from pyflink.table import BatchTableEnvironment
+
+b_env = ExecutionEnvironment.get_execution_environment()
+# create a TableEnvironment for batch queries
+b_table_env = BatchTableEnvironment.create(b_env)
+{% endhighlight %}
+</div>
</div>
{% top %}
@@ -186,10 +243,10 @@ A `Table` is registered in a `TableEnvironment` as follows:
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
-// Table is the result of a simple projection query
+// table is the result of a simple projection query
Table projTable = tableEnv.scan("X").select(...);
-// register the Table projTable as table "projectedX"
+// register the Table projTable as table "projectedTable"
tableEnv.registerTable("projectedTable", projTable);
{% endhighlight %}
</div>
@@ -199,13 +256,26 @@ tableEnv.registerTable("projectedTable", projTable);
// get a TableEnvironment
val tableEnv = StreamTableEnvironment.create(env)
-// Table is the result of a simple projection query
+// table is the result of a simple projection query
val projTable: Table = tableEnv.scan("X").select(...)
-// register the Table projTable as table "projectedX"
+// register the Table projTable as table "projectedTable"
tableEnv.registerTable("projectedTable", projTable)
{% endhighlight %}
</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a TableEnvironment
+table_env = StreamTableEnvironment.create(env)
+
+# table is the result of a simple projection query
+proj_table = table_env.scan("X").select(...)
+
+# register the Table proj_table as table "projectedTable"
+table_env.register_table("projectedTable", proj_table)
+{% endhighlight %}
+</div>
</div>
**Note:** A registered `Table` is treated similarly to a `VIEW` as known from relational database systems, i.e., the query that defines the `Table` is not optimized but will be inlined when another query references the registered `Table`. If multiple queries reference the same registered `Table`, it will be inlined for each referencing query and executed multiple times, i.e., the result of the registered `Table` will *not* be shared.
@@ -246,6 +316,19 @@ val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)
tableEnv.registerTableSource("CsvTable", csvSource)
{% endhighlight %}
</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a TableEnvironment
+table_env = StreamTableEnvironment.create(env)
+
+# create a TableSource
+csv_source = CsvTableSource("/path/to/file", ...)
+
+# register the TableSource as table "csvTable"
+table_env.register_table_source("csvTable", csv_source)
+{% endhighlight %}
+</div>
</div>
{% top %}
@@ -292,6 +375,23 @@ val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
{% endhighlight %}
</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a TableEnvironment
+table_env = StreamTableEnvironment.create(env)
+
+# define the field names and types
+field_names = ["a", "b", "c"]
+field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.BIGINT()]
+
+# create a TableSink
+csv_sink = CsvTableSink(field_names, field_types, "/path/to/file", ...)
+
+# register the TableSink as table "CsvSinkTable"
+table_env.register_table_sink("CsvSinkTable", csv_sink)
+{% endhighlight %}
+</div>
</div>
{% top %}
@@ -392,6 +492,26 @@ val revenue = orders
**Note:** The Scala Table API uses Scala Symbols, which start with a single tick (`'`) to reference the attributes of a `Table`. The Table API uses Scala implicits. Make sure to import `org.apache.flink.api.scala._` and `org.apache.flink.table.api.scala._` in order to use Scala implicit conversions.
</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+table_env = StreamTableEnvironment.create(env)
+
+# register Orders table
+
+# scan registered Orders table
+orders = table_env.scan("Orders")
+# compute revenue for all customers from France
+revenue = orders \
+ .filter("cCountry === 'FRANCE'") \
+ .group_by("cID, cName") \
+ .select("cID, cName, revenue.sum AS revSum")
+
+# emit or convert Table
+# execute query
+{% endhighlight %}
+</div>
</div>
{% top %}
@@ -445,6 +565,26 @@ val revenue = tableEnv.sqlQuery("""
{% endhighlight %}
</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+table_env = StreamTableEnvironment.create(env)
+
+# register Orders table
+
+# compute revenue for all customers from France
+revenue = table_env.sql_query(
+ "SELECT cID, cName, SUM(revenue) AS revSum "
+ "FROM Orders "
+ "WHERE cCountry = 'FRANCE' "
+ "GROUP BY cID, cName"
+)
+
+# emit or convert Table
+# execute query
+{% endhighlight %}
+</div>
</div>
The following example shows how to specify an update query that inserts its result into a registered table.
@@ -492,6 +632,27 @@ tableEnv.sqlUpdate("""
{% endhighlight %}
</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+table_env = StreamTableEnvironment.create(env)
+
+# register "Orders" table
+# register "RevenueFrance" output table
+
+# compute revenue for all customers from France and emit to "RevenueFrance"
+table_env.sql_update(
+ "INSERT INTO RevenueFrance "
+ "SELECT cID, cName, SUM(revenue) AS revSum "
+ "FROM Orders "
+ "WHERE cCountry = 'FRANCE' "
+ "GROUP BY cID, cName"
+)
+
+# execute query
+{% endhighlight %}
+</div>
</div>
{% top %}
@@ -563,6 +724,29 @@ result.insertInto("CsvSinkTable")
// execute the program
{% endhighlight %}
</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a TableEnvironment
+table_env = StreamTableEnvironment.create(env)
+
+field_names = ["a", "b", "c"]
+field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.BIGINT()]
+
+# create a TableSink
+sink = CsvTableSink(field_names, field_types, "/path/to/file", "|")
+
+table_env.register_table_sink("CsvSinkTable", sink)
+
+# compute a result Table using Table API operators and/or SQL queries
+result = ...
+
+# emit the result Table to the registered TableSink
+result.insert_into("CsvSinkTable")
+
+# execute the program
+{% endhighlight %}
+</div>
</div>
{% top %}
@@ -1170,6 +1354,22 @@ val explanation: String = tEnv.explain(table)
println(explanation)
{% endhighlight %}
</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+t_env = StreamTableEnvironment.create(env)
+
+table1 = t_env.from_elements([(1, "hello")], ["count", "word"])
+table2 = t_env.from_elements([(1, "hello")], ["count", "word"])
+table = table1 \
+ .where("LIKE(word, 'F%')") \
+ .union_all(table2)
+
+explanation = t_env.explain(table)
+print(explanation)
+{% endhighlight %}
+</div>
</div>
{% highlight text %}
diff --git a/docs/dev/table/common.zh.md b/docs/dev/table/common.zh.md
index 5ab8664..2de8f74 100644
--- a/docs/dev/table/common.zh.md
+++ b/docs/dev/table/common.zh.md
@@ -80,7 +80,7 @@ tableEnv.registerTableSink("outputTable", ...);
// create a Table from a Table API query
val tapiResult = tableEnv.scan("table1").select(...)
-// Create a Table from a SQL query
+// create a Table from a SQL query
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...")
// emit a Table API result Table to a TableSink, same for SQL result
@@ -91,6 +91,35 @@ env.execute()
{% endhighlight %}
</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a TableEnvironment
+table_env = StreamTableEnvironment.create(env)
+
+# register a Table
+table_env.register_table("table1", ...) # or
+table_env.register_table_source("table2", ...)
+
+# register an output Table
+table_env.register_table_sink("outputTable", ...)
+
+# create a Table from a Table API query
+tapi_result = table_env.scan("table1").select(...)
+# create a Table from a SQL query
+sql_result = table_env.sql_query("SELECT ... FROM table2 ...")
+
+# emit a Table API result Table to a TableSink, same for SQL result
+tapi_result.insert_into("outputTable")
+
+# execute
+env.execute()
+
+{% endhighlight %}
+</div>
</div>
**Note:** Table API and SQL queries can be easily integrated with and embedded into DataStream or DataSet programs. Have a look at the [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api) section to learn how DataStreams and DataSets can be converted into Tables and vice versa.
@@ -121,6 +150,7 @@ Make sure to choose the `BatchTableEnvironment`/`StreamTableEnvironment` that ma
// ***************
// STREAMING QUERY
// ***************
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -130,6 +160,7 @@ StreamTableEnvironment sTableEnv = StreamTableEnvironment.create(sEnv);
// ***********
// BATCH QUERY
// ***********
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
@@ -143,6 +174,7 @@ BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(bEnv);
// ***************
// STREAMING QUERY
// ***************
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
@@ -152,6 +184,7 @@ val sTableEnv = StreamTableEnvironment.create(sEnv)
// ***********
// BATCH QUERY
// ***********
+import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.BatchTableEnvironment
val bEnv = ExecutionEnvironment.getExecutionEnvironment
@@ -159,6 +192,30 @@ val bEnv = ExecutionEnvironment.getExecutionEnvironment
val bTableEnv = BatchTableEnvironment.create(bEnv)
{% endhighlight %}
</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# ***************
+# STREAMING QUERY
+# ***************
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamTableEnvironment
+
+s_env = StreamExecutionEnvironment.get_execution_environment()
+# create a TableEnvironment for streaming queries
+s_table_env = StreamTableEnvironment.create(s_env)
+
+# ***********
+# BATCH QUERY
+# ***********
+from pyflink.dataset import ExecutionEnvironment
+from pyflink.table import BatchTableEnvironment
+
+b_env = ExecutionEnvironment.get_execution_environment()
+# create a TableEnvironment for batch queries
+b_table_env = BatchTableEnvironment.create(b_env)
+{% endhighlight %}
+</div>
</div>
{% top %}
@@ -189,7 +246,7 @@ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Table is the result of a simple projection query
Table projTable = tableEnv.scan("X").select(...);
-// register the Table projTable as table "projectedX"
+// register the Table projTable as table "projectedTable"
tableEnv.registerTable("projectedTable", projTable);
{% endhighlight %}
</div>
@@ -202,10 +259,23 @@ val tableEnv = StreamTableEnvironment.create(env)
// Table is the result of a simple projection query
val projTable: Table = tableEnv.scan("X").select(...)
-// register the Table projTable as table "projectedX"
+// register the Table projTable as table "projectedTable"
tableEnv.registerTable("projectedTable", projTable)
{% endhighlight %}
</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a TableEnvironment
+table_env = StreamTableEnvironment.create(env)
+
+# Table is the result of a simple projection query
+proj_table = table_env.scan("X").select(...)
+
+# register the Table proj_table as table "projectedTable"
+table_env.register_table("projectedTable", proj_table)
+{% endhighlight %}
+</div>
</div>
**Note:** A registered `Table` is treated similarly to a `VIEW` as known from relational database systems, i.e., the query that defines the `Table` is not optimized but will be inlined when another query references the registered `Table`. If multiple queries reference the same registered `Table`, it will be inlined for each referencing query and executed multiple times, i.e., the result of the registered `Table` will *not* be shared.
@@ -246,6 +316,19 @@ val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)
tableEnv.registerTableSource("CsvTable", csvSource)
{% endhighlight %}
</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a TableEnvironment
+table_env = StreamTableEnvironment.create(env)
+
+# create a TableSource
+csv_source = CsvTableSource("/path/to/file", ...)
+
+# register the TableSource as table "csvTable"
+table_env.register_table_source("csvTable", csv_source)
+{% endhighlight %}
+</div>
</div>
{% top %}
@@ -292,6 +375,23 @@ val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
{% endhighlight %}
</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a TableEnvironment
+table_env = StreamTableEnvironment.create(env)
+
+# define the field names and types
+field_names = ["a", "b", "c"]
+field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.BIGINT()]
+
+# create a TableSink
+csv_sink = CsvTableSink(field_names, field_types, "/path/to/file", ...)
+
+# register the TableSink as table "CsvSinkTable"
+table_env.register_table_sink("CsvSinkTable", csv_sink)
+{% endhighlight %}
+</div>
</div>
{% top %}
@@ -392,6 +492,26 @@ val revenue = orders
**Note:** The Scala Table API uses Scala Symbols, which start with a single tick (`'`) to reference the attributes of a `Table`. The Table API uses Scala implicits. Make sure to import `org.apache.flink.api.scala._` and `org.apache.flink.table.api.scala._` in order to use Scala implicit conversions.
</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+table_env = StreamTableEnvironment.create(env)
+
+# register Orders table
+
+# scan registered Orders table
+orders = table_env.scan("Orders")
+# compute revenue for all customers from France
+revenue = orders \
+ .filter("cCountry === 'FRANCE'") \
+ .group_by("cID, cName") \
+ .select("cID, cName, revenue.sum AS revSum")
+
+# emit or convert Table
+# execute query
+{% endhighlight %}
+</div>
</div>
{% top %}
@@ -445,6 +565,26 @@ val revenue = tableEnv.sqlQuery("""
{% endhighlight %}
</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+table_env = StreamTableEnvironment.create(env)
+
+# register Orders table
+
+# compute revenue for all customers from France
+revenue = table_env.sql_query(
+ "SELECT cID, cName, SUM(revenue) AS revSum "
+ "FROM Orders "
+ "WHERE cCountry = 'FRANCE' "
+ "GROUP BY cID, cName"
+)
+
+# emit or convert Table
+# execute query
+{% endhighlight %}
+</div>
</div>
The following example shows how to specify an update query that inserts its result into a registered table.
@@ -492,6 +632,27 @@ tableEnv.sqlUpdate("""
{% endhighlight %}
</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+table_env = StreamTableEnvironment.create(env)
+
+# register "Orders" table
+# register "RevenueFrance" output table
+
+# compute revenue for all customers from France and emit to "RevenueFrance"
+table_env.sql_update(
+ "INSERT INTO RevenueFrance "
+ "SELECT cID, cName, SUM(revenue) AS revSum "
+ "FROM Orders "
+ "WHERE cCountry = 'FRANCE' "
+ "GROUP BY cID, cName"
+)
+
+# execute query
+{% endhighlight %}
+</div>
</div>
{% top %}
@@ -563,6 +724,29 @@ result.insertInto("CsvSinkTable")
// execute the program
{% endhighlight %}
</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a TableEnvironment
+table_env = StreamTableEnvironment.create(env)
+
+field_names = ["a", "b", "c"]
+field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.BIGINT()]
+
+# create a TableSink
+sink = CsvTableSink(field_names, field_types, "/path/to/file", "|")
+
+table_env.register_table_sink("CsvSinkTable", sink)
+
+# compute a result Table using Table API operators and/or SQL queries
+result = ...
+
+# emit the result Table to the registered TableSink
+result.insert_into("CsvSinkTable")
+
+# execute the program
+{% endhighlight %}
+</div>
</div>
{% top %}
@@ -1170,6 +1354,22 @@ val explanation: String = tEnv.explain(table)
println(explanation)
{% endhighlight %}
</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+t_env = StreamTableEnvironment.create(env)
+
+table1 = t_env.from_elements([(1, "hello")], ["count", "word"])
+table2 = t_env.from_elements([(1, "hello")], ["count", "word"])
+table = table1 \
+ .where("LIKE(word, 'F%')") \
+ .union_all(table2)
+
+explanation = t_env.explain(table)
+print(explanation)
+{% endhighlight %}
+</div>
</div>
{% highlight text %}
diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index 98e0891..e365ee6 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -101,6 +101,17 @@ tableEnvironment
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+table_environment \
+ .connect(...) \
+ .with_format(...) \
+ .with_schema(...) \
+ .in_append_mode() \
+ .register_table_source("MyTable")
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
name: MyTable
@@ -170,6 +181,50 @@ tableEnvironment
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+table_environment \
+ .connect( # declare the external system to connect to
+ Kafka()
+ .version("0.10")
+ .topic("test-input")
+ .start_from_earliest()
+ .property("zookeeper.connect", "localhost:2181")
+ .property("bootstrap.servers", "localhost:9092")
+ ) \
+ .with_format( # declare a format for this system
+ Avro()
+ .avro_schema(
+ "{"
+ " \"namespace\": \"org.myorganization\","
+ " \"type\": \"record\","
+ " \"name\": \"UserMessage\","
+ " \"fields\": ["
+ " {\"name\": \"timestamp\", \"type\": \"string\"},"
+ " {\"name\": \"user\", \"type\": \"long\"},"
+ " {\"name\": \"message\", \"type\": [\"string\", \"null\"]}"
+ " ]"
+ "}"
+ )
+ ) \
+ .with_schema( # declare the schema of the table
+ Schema()
+ .field("rowtime", DataTypes.TIMESTAMP())
+ .rowtime(
+ Rowtime()
+ .timestamps_from_field("timestamp")
+ .watermarks_periodic_bounded(60000)
+ )
+ .field("user", DataTypes.BIGINT())
+ .field("message", DataTypes.STRING())
+ ) \
+ .in_append_mode() \
+ .register_table_source("MyUserTable")
+# in_append_mode() specifies the update-mode for streaming tables;
+# the table can be registered as a source, a sink, or both under a name
+# (here it is registered as a source named "MyUserTable")
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
tables:
@@ -248,6 +303,17 @@ The following example shows a simple schema without time attributes and one-to-o
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.with_schema(
+ Schema()
+ .field("MyField1", DataTypes.INT()) # required: specify the fields of the table (in this order)
+ .field("MyField2", DataTypes.STRING())
+ .field("MyField3", DataTypes.BOOLEAN())
+)
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
schema:
@@ -278,6 +344,20 @@ For *each field*, the following properties can be declared in addition to the co
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.with_schema(
+ Schema()
+ .field("MyField1", DataTypes.TIMESTAMP())
+ .proctime() # optional: declares this field as a processing-time attribute
+ .field("MyField2", DataTypes.TIMESTAMP())
+ .rowtime(...) # optional: declares this field as an event-time attribute
+ .field("MyField3", DataTypes.BOOLEAN())
+ .from_origin_field("mf3") # optional: original field in the input that is referenced/aliased by this field
+)
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
schema:
@@ -330,6 +410,32 @@ The following timestamp extractors are supported:
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# Converts an existing BIGINT or TIMESTAMP field in the input into the rowtime attribute.
+.rowtime(
+ Rowtime()
+ .timestamps_from_field("ts_field") # required: original field name in the input
+)
+
+# Converts the assigned timestamps into the rowtime attribute
+# and thus preserves the assigned timestamps from the source.
+# This requires a source that assigns timestamps (e.g., Kafka 0.10+).
+.rowtime(
+ Rowtime()
+ .timestamps_from_source()
+)
+
+# Sets a custom timestamp extractor to be used for the rowtime attribute.
+# The extractor must extend `org.apache.flink.table.sources.tsextractors.TimestampExtractor`.
+# Because Python cannot pass a Java object, the extractor is specified by its fully-qualified class name.
+.rowtime(
+ Rowtime()
+ .timestamps_from_extractor(...)
+)
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
# Converts an existing BIGINT or TIMESTAMP field in the input into the rowtime attribute.
@@ -376,6 +482,32 @@ The following watermark strategies are supported:
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
+# observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
+# are not late.
+.rowtime(
+ Rowtime()
+ .watermarks_periodic_ascending()
+)
+
+# Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
+# Emits watermarks which are the maximum observed timestamp minus the specified delay.
+.rowtime(
+ Rowtime()
+ .watermarks_periodic_bounded(2000) # delay in milliseconds
+)
+
+# Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
+# underlying DataStream API and thus preserves the assigned watermarks from the source.
+.rowtime(
+ Rowtime()
+ .watermarks_from_source()
+)
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
# Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
@@ -457,6 +589,13 @@ For streaming queries, it is required to declare how to perform the [conversion
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.connect(...) \
+ .in_append_mode() # otherwise: in_upsert_mode() or in_retract_mode()
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
tables:
@@ -497,6 +636,15 @@ The file system connector allows for reading and writing from a local or distrib
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.connect(
+ FileSystem()
+ .path("file:///path/to/whatever") # required: path to a file or directory
+)
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
connector:
@@ -548,6 +696,32 @@ The Kafka connector allows for reading and writing from and to an Apache Kafka t
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.connect(
+ Kafka()
+ .version("0.11") # required: valid connector versions are
+ # "0.8", "0.9", "0.10", "0.11", and "universal"
+ .topic("...") # required: topic name from which the table is read
+
+ # optional: connector specific properties
+ .property("zookeeper.connect", "localhost:2181")
+ .property("bootstrap.servers", "localhost:9092")
+ .property("group.id", "testGroup")
+
+ # optional: select a startup mode for Kafka offsets
+ .start_from_earliest()
+ .start_from_latest()
+ .start_from_specific_offsets(...)
+
+ # optional: output partitioning from Flink's partitions into Kafka's partitions
+ .sink_partitioner_fixed() # each Flink partition ends up in at-most one Kafka partition (default)
+ .sink_partitioner_round_robin() # a Flink partition is distributed to Kafka partitions round-robin
+ .sink_partitioner_custom("full.qualified.custom.class.name") # use a custom FlinkKafkaPartitioner subclass
+)
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
connector:
@@ -648,6 +822,44 @@ The connector can be defined as follows:
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.connect(
+ Elasticsearch()
+ .version("6") # required: valid connector versions are "6"
+ .host("localhost", 9200, "http") # required: one or more Elasticsearch hosts to connect to
+ .index("MyUsers") # required: Elasticsearch index
+ .document_type("user") # required: Elasticsearch document type
+
+ .key_delimiter("$") # optional: delimiter for composite keys ("_" by default)
+ # e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
+ .key_null_literal("n/a") # optional: representation for null fields in keys ("null" by default)
+
+ # optional: failure handling strategy in case a request to Elasticsearch fails (fail by default)
+ .failure_handler_fail() # optional: throws an exception if a request fails and causes a job failure
+ .failure_handler_ignore() # or ignores failures and drops the request
+ .failure_handler_retry_rejected() # or re-adds requests that have failed due to queue capacity saturation
+ .failure_handler_custom(...) # or custom failure handling with an ActionRequestFailureHandler subclass
+
+ # optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
+ .disable_flush_on_checkpoint() # optional: disables flushing on checkpoint (see notes below!)
+ .bulk_flush_max_actions(42) # optional: maximum number of actions to buffer for each bulk request
+ .bulk_flush_max_size("42 mb") # optional: maximum size of buffered actions in bytes per bulk request
+ # (only MB granularity is supported)
+ .bulk_flush_interval(60000) # optional: bulk flush interval (in milliseconds)
+
+ .bulk_flush_backoff_constant() # optional: use a constant backoff type
+ .bulk_flush_backoff_exponential() # or use an exponential backoff type
+ .bulk_flush_backoff_max_retries(3) # optional: maximum number of retries
+ .bulk_flush_backoff_delay(30000) # optional: delay between each backoff attempt (in milliseconds)
+
+ # optional: connection properties to be used during REST communication to Elasticsearch
+ .connection_max_retry_timeout(3) # optional: maximum timeout (in milliseconds) between retries
+ .connection_path_prefix("/v1") # optional: prefix string to be added to every REST communication
+)
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
connector:
@@ -754,6 +966,34 @@ The CSV format can be used as follows:
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.with_format(
+ Csv()
+
+ # required: define the schema either by using type information
+ .schema(DataTypes.ROW(...))
+
+ # or use the table's schema
+ .derive_schema()
+
+ .field_delimiter(';') # optional: field delimiter character (',' by default)
+ .line_delimiter("\r\n") # optional: line delimiter ("\n" by default;
+ # otherwise "\r" or "\r\n" are allowed)
+ .quote_character('\'') # optional: quote character for enclosing field values ('"' by default)
+ .allow_comments() # optional: ignores comment lines that start with '#' (disabled by default);
+ # if enabled, make sure to also ignore parse errors to allow empty rows
+ .ignore_parse_errors() # optional: skip fields and rows with parse errors instead of failing;
+ # fields are set to null in case of errors
+ .array_element_delimiter("|") # optional: the array element delimiter string for separating
+ # array and row element values (";" by default)
+ .escape_character('\\') # optional: escape character for escaping values (disabled by default)
+ .null_literal("n/a") # optional: null literal string that is interpreted as a
+ # null value (disabled by default)
+)
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
format:
@@ -872,6 +1112,37 @@ The JSON format can be used as follows:
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.with_format(
+ Json()
+ .fail_on_missing_field(True) # optional: flag whether to fail if a field is missing or not, False by default
+
+ # required: define the schema either by using type information which parses numbers to corresponding types
+ .schema(DataTypes.ROW(...))
+
+ # or by using a JSON schema which parses to DECIMAL and TIMESTAMP
+ .json_schema(
+ "{"
+ " type: 'object',"
+ " properties: {"
+ " lon: {"
+ " type: 'number'"
+ " },"
+ " rideTime: {"
+ " type: 'string',"
+ " format: 'date-time'"
+ " }"
+ " }"
+ "}"
+ )
+
+ # or use the table's schema
+ .derive_schema()
+)
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
format:
@@ -1004,6 +1275,29 @@ The Avro format can be used as follows:
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.with_format(
+ Avro()
+
+ # required: define the schema either by using an Avro specific record class
+ .record_class("full.qualified.user.class.name")
+
+ # or by using an Avro schema
+ .avro_schema(
+ "{"
+ " \"type\": \"record\","
+ " \"name\": \"test\","
+ " \"fields\" : ["
+ " {\"name\": \"a\", \"type\": \"long\"},"
+ " {\"name\": \"b\", \"type\": \"string\"}"
+ " ]"
+ "}"
+ )
+)
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
format:
@@ -1083,6 +1377,22 @@ Use the old one for stream/batch filesystem operations for now.
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.with_format(
+ OldCsv()
+ .field("field1", DataTypes.STRING()) # required: ordered format fields
+ .field("field2", DataTypes.TIMESTAMP())
+ .field_delimiter(",") # optional: string delimiter "," by default
+ .line_delimiter("\n") # optional: string delimiter "\n" by default
+ .quote_character('"') # optional: single character for string values, empty by default
+ .comment_prefix('#') # optional: string to indicate comments, empty by default
+ .ignore_first_line() # optional: ignore the first line, by default it is not skipped
+ .ignore_parse_errors() # optional: skip records with parse error instead of failing by default
+)
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
format:
@@ -1221,6 +1531,31 @@ val table: Table = ???
table.insertInto("csvOutputTable")
{% endhighlight %}
</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+
+field_names = ["f0", "f1"]
+field_types = [DataTypes.STRING(), DataTypes.INT()]
+
+sink = CsvTableSink(
+ field_names,
+ field_types,
+ path, # output path
+ "|", # optional: delimit fields by '|'
+ 1, # optional: write to a single file
+ WriteMode.OVERWRITE # optional: overwrite existing files
+)
+
+table_env.register_table_sink(
+ "csvOutputTable",
+ sink
+)
+
+table = ...
+table.insert_into("csvOutputTable")
+{% endhighlight %}
+</div>
</div>
### JDBCAppendTableSink
diff --git a/docs/dev/table/connect.zh.md b/docs/dev/table/connect.zh.md
index 30dd05e..a685f72 100644
--- a/docs/dev/table/connect.zh.md
+++ b/docs/dev/table/connect.zh.md
@@ -101,6 +101,17 @@ tableEnvironment
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+table_environment \
+ .connect(...) \
+ .with_format(...) \
+ .with_schema(...) \
+ .in_append_mode() \
+ .register_table_source("MyTable")
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
name: MyTable
@@ -170,6 +181,50 @@ tableEnvironment
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+table_environment \
+ .connect( # declare the external system to connect to
+ Kafka()
+ .version("0.10")
+ .topic("test-input")
+ .start_from_earliest()
+ .property("zookeeper.connect", "localhost:2181")
+ .property("bootstrap.servers", "localhost:9092")
+ ) \
+ .with_format( # declare a format for this system
+ Avro()
+ .avro_schema(
+ "{"
+ " \"namespace\": \"org.myorganization\","
+ " \"type\": \"record\","
+ " \"name\": \"UserMessage\","
+ " \"fields\": ["
+ " {\"name\": \"timestamp\", \"type\": \"string\"},"
+ " {\"name\": \"user\", \"type\": \"long\"},"
+ " {\"name\": \"message\", \"type\": [\"string\", \"null\"]}"
+ " ]"
+ "}"
+ )
+ ) \
+ .with_schema( # declare the schema of the table
+ Schema()
+ .field("rowtime", DataTypes.TIMESTAMP())
+ .rowtime(
+ Rowtime()
+ .timestamps_from_field("timestamp")
+ .watermarks_periodic_bounded(60000)
+ )
+ .field("user", DataTypes.BIGINT())
+ .field("message", DataTypes.STRING())
+ ) \
+ .in_append_mode() \
+ .register_table_source("MyUserTable")
+ # specify the update-mode for streaming tables and
+ # register as source, sink, or both and under a name
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
tables:
@@ -248,6 +303,17 @@ The following example shows a simple schema without time attributes and one-to-o
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.with_schema(
+ Schema()
+ .field("MyField1", DataTypes.INT()) # required: specify the fields of the table (in this order)
+ .field("MyField2", DataTypes.STRING())
+ .field("MyField3", DataTypes.BOOLEAN())
+)
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
schema:
@@ -278,6 +344,20 @@ For *each field*, the following properties can be declared in addition to the co
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.with_schema(
+ Schema()
+ .field("MyField1", DataTypes.TIMESTAMP())
+ .proctime() # optional: declares this field as a processing-time attribute
+ .field("MyField2", DataTypes.TIMESTAMP())
+ .rowtime(...) # optional: declares this field as an event-time attribute
+ .field("MyField3", DataTypes.BOOLEAN())
+ .from_origin_field("mf3") # optional: original field in the input that is referenced/aliased by this field
+)
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
schema:
@@ -330,6 +410,32 @@ The following timestamp extractors are supported:
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# Converts an existing BIGINT or TIMESTAMP field in the input into the rowtime attribute.
+.rowtime(
+ Rowtime()
+ .timestamps_from_field("ts_field") # required: original field name in the input
+)
+
+# Converts the assigned timestamps into the rowtime attribute
+# and thus preserves the assigned timestamps from the source.
+# This requires a source that assigns timestamps (e.g., Kafka 0.10+).
+.rowtime(
+ Rowtime()
+ .timestamps_from_source()
+)
+
+# Sets a custom timestamp extractor to be used for the rowtime attribute.
+# The extractor must extend `org.apache.flink.table.sources.tsextractors.TimestampExtractor`.
+# Because Python cannot pass a Java object, the extractor is specified by its fully-qualified class name.
+.rowtime(
+ Rowtime()
+ .timestamps_from_extractor(...)
+)
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
# Converts an existing BIGINT or TIMESTAMP field in the input into the rowtime attribute.
@@ -376,6 +482,32 @@ The following watermark strategies are supported:
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
+# observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
+# are not late.
+.rowtime(
+ Rowtime()
+ .watermarks_periodic_ascending()
+)
+
+# Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
+# Emits watermarks which are the maximum observed timestamp minus the specified delay.
+.rowtime(
+ Rowtime()
+ .watermarks_periodic_bounded(2000) # delay in milliseconds
+)
+
+# Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
+# underlying DataStream API and thus preserves the assigned watermarks from the source.
+.rowtime(
+ Rowtime()
+ .watermarks_from_source()
+)
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
# Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
@@ -457,6 +589,13 @@ For streaming queries, it is required to declare how to perform the [conversion
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.connect(...) \
+ .in_append_mode() # otherwise: in_upsert_mode() or in_retract_mode()
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
tables:
@@ -497,6 +636,15 @@ The file system connector allows for reading and writing from a local or distrib
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.connect(
+ FileSystem()
+ .path("file:///path/to/whatever") # required: path to a file or directory
+)
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
connector:
@@ -548,6 +696,32 @@ The Kafka connector allows for reading and writing from and to an Apache Kafka t
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.connect(
+ Kafka()
+ .version("0.11") # required: valid connector versions are
+ # "0.8", "0.9", "0.10", "0.11", and "universal"
+ .topic("...") # required: topic name from which the table is read
+
+ # optional: connector specific properties
+ .property("zookeeper.connect", "localhost:2181")
+ .property("bootstrap.servers", "localhost:9092")
+ .property("group.id", "testGroup")
+
+ # optional: select a startup mode for Kafka offsets
+ .start_from_earliest()
+ .start_from_latest()
+ .start_from_specific_offsets(...)
+
+ # optional: output partitioning from Flink's partitions into Kafka's partitions
+ .sink_partitioner_fixed() # each Flink partition ends up in at-most one Kafka partition (default)
+ .sink_partitioner_round_robin() # a Flink partition is distributed to Kafka partitions round-robin
+ .sink_partitioner_custom("full.qualified.custom.class.name") # use a custom FlinkKafkaPartitioner subclass
+)
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
connector:
@@ -648,6 +822,44 @@ The connector can be defined as follows:
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.connect(
+ Elasticsearch()
+ .version("6") # required: valid connector versions are "6"
+ .host("localhost", 9200, "http") # required: one or more Elasticsearch hosts to connect to
+ .index("MyUsers") # required: Elasticsearch index
+ .document_type("user") # required: Elasticsearch document type
+
+ .key_delimiter("$") # optional: delimiter for composite keys ("_" by default)
+ # e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
+ .key_null_literal("n/a") # optional: representation for null fields in keys ("null" by default)
+
+ # optional: failure handling strategy in case a request to Elasticsearch fails (fail by default)
+ .failure_handler_fail() # optional: throws an exception if a request fails and causes a job failure
+ .failure_handler_ignore() # or ignores failures and drops the request
+ .failure_handler_retry_rejected() # or re-adds requests that have failed due to queue capacity saturation
+ .failure_handler_custom(...) # or custom failure handling with an ActionRequestFailureHandler subclass
+
+ # optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
+ .disable_flush_on_checkpoint() # optional: disables flushing on checkpoint (see notes below!)
+ .bulk_flush_max_actions(42) # optional: maximum number of actions to buffer for each bulk request
+ .bulk_flush_max_size("42 mb") # optional: maximum size of buffered actions in bytes per bulk request
+ # (only MB granularity is supported)
+ .bulk_flush_interval(60000) # optional: bulk flush interval (in milliseconds)
+
+ .bulk_flush_backoff_constant() # optional: use a constant backoff type
+ .bulk_flush_backoff_exponential() # or use an exponential backoff type
+ .bulk_flush_backoff_max_retries(3) # optional: maximum number of retries
+ .bulk_flush_backoff_delay(30000) # optional: delay between each backoff attempt (in milliseconds)
+
+ # optional: connection properties to be used during REST communication to Elasticsearch
+ .connection_max_retry_timeout(3) # optional: maximum timeout (in milliseconds) between retries
+ .connection_path_prefix("/v1") # optional: prefix string to be added to every REST communication
+)
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
connector:
@@ -754,6 +966,34 @@ The CSV format can be used as follows:
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.with_format(
+ Csv()
+
+ # required: define the schema either by using type information
+ .schema(DataTypes.ROW(...))
+
+ # or use the table's schema
+ .derive_schema()
+
+ .field_delimiter(";") # optional: field delimiter character ("," by default)
+ .line_delimiter("\r\n") # optional: line delimiter ("\n" by default;
+ # otherwise "\r" or "\r\n" are allowed)
+ .quote_character("'") # optional: quote character for enclosing field values ('"' by default)
+ .allow_comments() # optional: ignores comment lines that start with "#" (disabled by default);
+ # if enabled, make sure to also ignore parse errors to allow empty rows
+ .ignore_parse_errors() # optional: skip fields and rows with parse errors instead of failing;
+ # fields are set to null in case of errors
+ .array_element_delimiter("|") # optional: the array element delimiter string for separating
+ # array and row element values (";" by default)
+ .escape_character("\\") # optional: escape character for escaping values (disabled by default)
+ .null_literal("n/a") # optional: null literal string that is interpreted as a
+ # null value (disabled by default)
+)
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
format:
@@ -872,6 +1112,37 @@ The JSON format can be used as follows:
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.with_format(
+ Json()
+ .fail_on_missing_field(True) # optional: flag whether to fail if a field is missing or not, False by default
+
+ # required: define the schema either by using type information which parses numbers to corresponding types
+ .schema(DataTypes.ROW(...))
+
+ # or by using a JSON schema which parses to DECIMAL and TIMESTAMP
+ .json_schema(
+ "{"
+ " type: 'object',"
+ " properties: {"
+ " lon: {"
+ " type: 'number'"
+ " },"
+ " rideTime: {"
+ " type: 'string',"
+ " format: 'date-time'"
+ " }"
+ " }"
+ "}"
+ )
+
+ # or use the table's schema
+ .derive_schema()
+)
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
format:
@@ -1004,6 +1275,29 @@ The Avro format can be used as follows:
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.with_format(
+ Avro()
+
+ # required: define the schema either by using an Avro specific record class
+ .record_class("full.qualified.user.class.name")
+
+ # or by using an Avro schema
+ .avro_schema(
+ "{"
+ " \"type\": \"record\","
+ " \"name\": \"test\","
+ " \"fields\" : ["
+ " {\"name\": \"a\", \"type\": \"long\"},"
+ " {\"name\": \"b\", \"type\": \"string\"}"
+ " ]"
+ "}"
+ )
+)
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
format:
@@ -1083,6 +1377,22 @@ Use the old one for stream/batch filesystem operations for now.
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.with_format(
+ OldCsv()
+ .field("field1", DataTypes.STRING()) # required: ordered format fields
+ .field("field2", DataTypes.TIMESTAMP())
+ .field_delimiter(",") # optional: string delimiter "," by default
+ .line_delimiter("\n") # optional: string delimiter "\n" by default
+ .quote_character('"') # optional: single character for string values, empty by default
+ .comment_prefix('#') # optional: string to indicate comments, empty by default
+ .ignore_first_line() # optional: ignore the first line, by default it is not skipped
+ .ignore_parse_errors() # optional: skip records with parse error instead of failing by default
+)
+{% endhighlight %}
+</div>
+
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
format:
@@ -1221,6 +1531,32 @@ val table: Table = ???
table.insertInto("csvOutputTable")
{% endhighlight %}
</div>
+
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+
+field_names = ["f0", "f1"]
+field_types = [DataTypes.STRING(), DataTypes.INT()]
+
+sink = CsvTableSink(
+ field_names,
+ field_types,
+ path, # output path
+ "|", # optional: delimit fields by '|'
+ 1, # optional: write to a single file
+ WriteMode.OVERWRITE # optional: overwrite existing files
+)
+
+table_env.register_table_sink(
+ "csvOutputTable",
+ sink
+)
+
+table = ...
+table.insert_into("csvOutputTable")
+{% endhighlight %}
+</div>
</div>
### JDBCAppendTableSink
diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index 3179838..0939a9a 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -316,7 +316,7 @@ value NOT IN (sub-query)
</table>
</div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -429,7 +429,7 @@ STRING1.like(STRING2)
<tr>
<td>
{% highlight java %}
-STRING.similar(STRING)
+STRING1.similar(STRING2)
{% endhighlight %}
</td>
<td>
@@ -796,7 +796,7 @@ boolean IS NOT UNKNOWN
</table>
</div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -1471,7 +1471,7 @@ TRUNCATE(numeric1, integer2)
</table>
</div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -2693,7 +2693,7 @@ TO_BASE64(string)
</table>
</div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
@@ -3545,7 +3545,7 @@ TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2)
</table>
</div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -4185,7 +4185,7 @@ COALESCE(value1, value2 [, value3 ]* )
</table>
</div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -4267,7 +4267,7 @@ CAST(value AS type)
</table>
</div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -4393,7 +4393,7 @@ map ‘[’ value ‘]’
</table>
</div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -4581,7 +4581,7 @@ MAP ‘[’ value1, value2 [, value3, value4 ]* ‘]’
</table>
</div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -4738,7 +4738,7 @@ tableName.compositeType.*
</table>
</div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -4856,7 +4856,7 @@ GROUPING_ID(expression1 [, expression2]* )
</table>
</div>
-<div data-lang="Java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -4977,7 +4977,7 @@ SHA2(string, hashLength)
</table>
</div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -5180,7 +5180,7 @@ STRING.sha2(INT)
</table>
</div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -5372,7 +5372,7 @@ COLLECT([ ALL | DISTINCT ] expression)
</div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -5832,7 +5832,7 @@ The usage of the column function is illustrated in the following table. (Suppose
<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
@@ -6077,6 +6077,14 @@ table
.select(withColumns('a to 'b), myUDAgg(myUDF(withColumns(5 to 20))))
{% endhighlight %}
</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+table \
+ .group_by("withColumns(1 to 3)") \
+ .select("withColumns(a to b), myUDAgg(myUDF(withColumns(5 to 20)))")
+{% endhighlight %}
+</div>
</div>
<span class="label label-info">Note</span> Column functions are only used in Table API.
diff --git a/docs/dev/table/functions.zh.md b/docs/dev/table/functions.zh.md
index aaba9ca..76a03e0 100644
--- a/docs/dev/table/functions.zh.md
+++ b/docs/dev/table/functions.zh.md
@@ -316,7 +316,7 @@ value NOT IN (sub-query)
</table>
</div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -429,7 +429,7 @@ STRING1.like(STRING2)
<tr>
<td>
{% highlight java %}
-STRING.similar(STRING)
+STRING1.similar(STRING2)
{% endhighlight %}
</td>
<td>
@@ -796,7 +796,7 @@ boolean IS NOT UNKNOWN
</table>
</div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -1471,7 +1471,7 @@ TRUNCATE(numeric1, integer2)
</table>
</div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -2693,7 +2693,7 @@ TO_BASE64(string)
</table>
</div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
@@ -3545,7 +3545,7 @@ TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2)
</table>
</div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -4185,7 +4185,7 @@ COALESCE(value1, value2 [, value3 ]* )
</table>
</div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -4267,7 +4267,7 @@ CAST(value AS type)
</table>
</div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -4393,7 +4393,7 @@ map ‘[’ value ‘]’
</table>
</div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -4581,7 +4581,7 @@ MAP ‘[’ value1, value2 [, value3, value4 ]* ‘]’
</table>
</div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -4738,7 +4738,7 @@ tableName.compositeType.*
</table>
</div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -4856,7 +4856,7 @@ GROUPING_ID(expression1 [, expression2]* )
</table>
</div>
-<div data-lang="Java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -4977,7 +4977,7 @@ SHA2(string, hashLength)
</table>
</div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -5180,7 +5180,7 @@ STRING.sha2(INT)
</table>
</div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -5372,7 +5372,7 @@ COLLECT([ ALL | DISTINCT ] expression)
</div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -5832,7 +5832,7 @@ The usage of the column function is illustrated in the following table. (Suppose
<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
<table class="table table-bordered">
<thead>
@@ -6077,6 +6077,14 @@ table
.select(withColumns('a to 'b), myUDAgg(myUDF(withColumns(5 to 20))))
{% endhighlight %}
</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+table \
+ .group_by("withColumns(1 to 3)") \
+ .select("withColumns(a to b), myUDAgg(myUDF(withColumns(5 to 20)))")
+{% endhighlight %}
+</div>
</div>
<span class="label label-info">Note</span> Column functions can only be used in the Table API.
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 18ade21..f297409 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -102,6 +102,30 @@ tableEnv.sqlUpdate(
"INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
{% endhighlight %}
</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+
+# SQL query with an inlined (unregistered) table
+# elements data type: BIGINT, STRING, BIGINT
+table = table_env.from_elements(..., ['user', 'product', 'amount'])
+result = table_env \
+ .sql_query("SELECT SUM(amount) FROM %s WHERE product LIKE '%%Rubber%%'" % table)
+
+# SQL update with a registered table
+# create and register a TableSink
+table_env.register_table("Orders", table)
+field_names = ["product", "amount"]
+field_types = [DataTypes.STRING(), DataTypes.BIGINT()]
+csv_sink = CsvTableSink(field_names, field_types, "/path/to/file", ...)
+table_env.register_table_sink("RubberOrders", csv_sink)
+# run a SQL update query on the Table and emit the result to the TableSink
+table_env \
+ .sql_update("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+{% endhighlight %}
+</div>
</div>
{% top %}
diff --git a/docs/dev/table/sql.zh.md b/docs/dev/table/sql.zh.md
index 6f16c9c..60c4571 100644
--- a/docs/dev/table/sql.zh.md
+++ b/docs/dev/table/sql.zh.md
@@ -102,6 +102,31 @@ tableEnv.sqlUpdate(
"INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
{% endhighlight %}
</div>
+
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+
+# SQL query with an inlined (unregistered) table
+# elements data type: BIGINT, STRING, BIGINT
+table = table_env.from_elements(..., ['user', 'product', 'amount'])
+result = table_env \
+ .sql_query("SELECT SUM(amount) FROM %s WHERE product LIKE '%%Rubber%%'" % table)
+
+# SQL update with a registered table
+# create and register a TableSink
+table_env.register_table("Orders", table)
+field_names = ["product", "amount"]
+field_types = [DataTypes.STRING(), DataTypes.BIGINT()]
+csv_sink = CsvTableSink(field_names, field_types, "/path/to/file", ...)
+table_env.register_table_sink("RubberOrders", csv_sink)
+# run a SQL update query on the Table and emit the result to the TableSink
+table_env \
+ .sql_update("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+{% endhighlight %}
+</div>
</div>
{% top %}
diff --git a/docs/dev/table/streaming/query_configuration.md b/docs/dev/table/streaming/query_configuration.md
index 58ee506..1e0527d 100644
--- a/docs/dev/table/streaming/query_configuration.md
+++ b/docs/dev/table/streaming/query_configuration.md
@@ -89,6 +89,31 @@ val stream: DataStream[Row] = result.toAppendStream[Row](qConfig)
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+
+# create a query configuration
+q_config = StreamQueryConfig()
+# set query parameters
+q_config.with_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))
+
+# define query
+result = ...
+
+# create TableSink
+sink = ...
+
+# register TableSink
+table_env.register_table_sink("outputTable", # table name
+ sink) # table sink
+
+# emit result Table via a TableSink
+result.insert_into("outputTable", q_config)
+
+{% endhighlight %}
+</div>
</div>
In the following we describe the parameters of the `QueryConfig` and how they affect the accuracy and resource consumption of a query.
@@ -137,6 +162,16 @@ qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+
+q_config = ... # type: StreamQueryConfig
+
+# set idle state retention time: min = 12 hours, max = 24 hours
+q_config.with_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))
+
+{% endhighlight %}
+</div>
</div>
Cleaning up state requires additional bookkeeping which becomes less expensive for larger differences of `minTime` and `maxTime`. The difference between `minTime` and `maxTime` must be at least 5 minutes.
diff --git a/docs/dev/table/streaming/query_configuration.zh.md b/docs/dev/table/streaming/query_configuration.zh.md
index 58ee506..1e0527d 100644
--- a/docs/dev/table/streaming/query_configuration.zh.md
+++ b/docs/dev/table/streaming/query_configuration.zh.md
@@ -89,6 +89,31 @@ val stream: DataStream[Row] = result.toAppendStream[Row](qConfig)
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+
+# create a query configuration
+q_config = StreamQueryConfig()
+# set query parameters
+q_config.with_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))
+
+# define query
+result = ...
+
+# create TableSink
+sink = ...
+
+# register TableSink
+table_env.register_table_sink("outputTable", # table name
+ sink) # table sink
+
+# emit result Table via a TableSink
+result.insert_into("outputTable", q_config)
+
+{% endhighlight %}
+</div>
</div>
In the following we describe the parameters of the `QueryConfig` and how they affect the accuracy and resource consumption of a query.
@@ -137,6 +162,16 @@ qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+
+q_config = ... # type: StreamQueryConfig
+
+# set idle state retention time: min = 12 hours, max = 24 hours
+q_config.with_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))
+
+{% endhighlight %}
+</div>
</div>
Cleaning up state requires additional bookkeeping which becomes less expensive for larger differences of `minTime` and `maxTime`. The difference between `minTime` and `maxTime` must be at least 5 minutes.
diff --git a/docs/dev/table/streaming/time_attributes.md b/docs/dev/table/streaming/time_attributes.md
index 9014d2a..20e4301 100644
--- a/docs/dev/table/streaming/time_attributes.md
+++ b/docs/dev/table/streaming/time_attributes.md
@@ -69,6 +69,17 @@ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+
+env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime) # default
+
+# alternatively:
+# env.set_stream_time_characteristic(TimeCharacteristic.IngestionTime)
+# env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+</div>
</div>
Processing time
diff --git a/docs/dev/table/streaming/time_attributes.zh.md b/docs/dev/table/streaming/time_attributes.zh.md
index 9014d2a..20e4301 100644
--- a/docs/dev/table/streaming/time_attributes.zh.md
+++ b/docs/dev/table/streaming/time_attributes.zh.md
@@ -69,6 +69,17 @@ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+
+env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime) # default
+
+# alternatively:
+# env.set_stream_time_characteristic(TimeCharacteristic.IngestionTime)
+# env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+</div>
</div>
Processing time
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 3995b97..4fb9f75 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -101,9 +101,11 @@ The following example shows how a Python Table API program is constructed and ho
{% highlight python %}
from pyflink.table import *
+from pyflink.dataset import *
# environment configuration
-t_env = TableEnvironment.create(TableConfig.Builder().as_batch_execution().build())
+env = ExecutionEnvironment.get_execution_environment()
+t_env = BatchTableEnvironment.create(env, TableConfig())
# register Orders table and Result table sink in table environment
# ...
@@ -113,7 +115,7 @@ orders = t_env.scan("Orders") # schema (a, b, c, rowtime)
orders.group_by("a").select("a, b.count as cnt").insert_into("result")
-t_env.execute()
+env.execute()
{% endhighlight %}
@@ -2287,10 +2289,10 @@ A session window is defined by using the `Session` class as follows:
<div data-lang="python" markdown="1">
{% highlight python %}
# Session Event-time Window
-.window(Session.withGap("10.minutes").on("rowtime").alias("w"))
+.window(Session.with_gap("10.minutes").on("rowtime").alias("w"))
# Session Processing-time Window (assuming a processing-time attribute "proctime")
-.window(Session.withGap("10.minutes").on("proctime").alias("w"))
+.window(Session.with_gap("10.minutes").on("proctime").alias("w"))
{% endhighlight %}
</div>
</div>
@@ -2324,7 +2326,7 @@ val table = input
{% highlight python %}
# define over window with alias w and aggregate over the over window w
table = input.over_window([OverWindow w].alias("w")) \
- .select("a, b.sum over w, c.min over w")
+ .select("a, b.sum over w, c.min over w")
{% endhighlight %}
</div>
</div>
diff --git a/docs/dev/table/tableApi.zh.md b/docs/dev/table/tableApi.zh.md
index ddcdd5d..33ad6f9 100644
--- a/docs/dev/table/tableApi.zh.md
+++ b/docs/dev/table/tableApi.zh.md
@@ -101,9 +101,11 @@ val result = orders
{% highlight python %}
from pyflink.table import *
+from pyflink.dataset import *
# environment configuration
-t_env = TableEnvironment.create(TableConfig.Builder().as_batch_execution().build())
+env = ExecutionEnvironment.get_execution_environment()
+t_env = BatchTableEnvironment.create(env, TableConfig())
# register Orders table and Result table sink in table environment
# ...
@@ -113,7 +115,7 @@ orders = t_env.scan("Orders") # schema (a, b, c, rowtime)
orders.group_by("a").select("a, b.count as cnt").insert_into("result")
-t_env.execute()
+env.execute()
{% endhighlight %}
@@ -2286,10 +2288,10 @@ A session window is defined by using the `Session` class as follows:
<div data-lang="python" markdown="1">
{% highlight python %}
# Session Event-time Window
-.window(Session.withGap("10.minutes").on("rowtime").alias("w"))
+.window(Session.with_gap("10.minutes").on("rowtime").alias("w"))
# Session Processing-time Window (assuming a processing-time attribute "proctime")
-.window(Session.withGap("10.minutes").on("proctime").alias("w"))
+.window(Session.with_gap("10.minutes").on("proctime").alias("w"))
{% endhighlight %}
</div>
</div>
@@ -2323,7 +2325,7 @@ val table = input
{% highlight python %}
# define over window with alias w and aggregate over the over window w
table = input.over_window([OverWindow w].alias("w")) \
- .select("a, b.sum over w, c.min over w")
+ .select("a, b.sum over w, c.min over w")
{% endhighlight %}
</div>
</div>
diff --git a/flink-python/pyflink/dataset/execution_environment.py b/flink-python/pyflink/dataset/execution_environment.py
index 75fe47d..89115a3 100644
--- a/flink-python/pyflink/dataset/execution_environment.py
+++ b/flink-python/pyflink/dataset/execution_environment.py
@@ -70,7 +70,7 @@ class ExecutionEnvironment(object):
"""
Gets the config object that defines execution parameters.
- :return: The environment's execution configuration.
+ :return: An :class:`ExecutionConfig` object, the environment's execution configuration.
"""
return ExecutionConfig(self._j_execution_environment.getConfig())
@@ -179,15 +179,15 @@ class ExecutionEnvironment(object):
"""
return self._j_execution_environment.getExecutionPlan()
- @classmethod
- def get_execution_environment(cls):
+ @staticmethod
+ def get_execution_environment():
"""
Creates an execution environment that represents the context in which the program is
currently executed. If the program is invoked standalone, this method returns a local
execution environment. If the program is invoked from within the command line client to be
submitted to a cluster, this method returns the execution environment of this cluster.
- :return: The execution environment of the context in which the program is executed.
+ :return: The :class:`ExecutionEnvironment` of the context in which the program is executed.
"""
gateway = get_gateway()
j_execution_environment = gateway.jvm.org.apache.flink.api.java.ExecutionEnvironment\
diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py
index f8a4897..f5451d7 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -422,8 +422,8 @@ class StreamExecutionEnvironment(object):
"""
return self._j_stream_execution_environment.getExecutionPlan()
- @classmethod
- def get_execution_environment(cls):
+ @staticmethod
+ def get_execution_environment():
"""
Creates an execution environment that represents the context in which the
program is currently executed. If the program is invoked standalone, this
diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py
index 66528bf..ac5991d 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -59,7 +59,7 @@ from pyflink.table.table import Table, GroupedTable, GroupWindowedTable, OverWin
from pyflink.table.table_config import TableConfig
from pyflink.table.table_environment import (TableEnvironment, StreamTableEnvironment,
BatchTableEnvironment)
-from pyflink.table.sinks import TableSink, CsvTableSink
+from pyflink.table.sinks import TableSink, CsvTableSink, WriteMode
from pyflink.table.sources import TableSource, CsvTableSource
from pyflink.table.types import DataTypes, UserDefinedType, Row
from pyflink.table.table_schema import TableSchema
@@ -78,6 +78,7 @@ __all__ = [
'BatchQueryConfig',
'TableSink',
'TableSource',
+ 'WriteMode',
'CsvTableSink',
'CsvTableSource',
'DataTypes',
diff --git a/flink-python/pyflink/table/descriptors.py b/flink-python/pyflink/table/descriptors.py
index d31561c..cb8fc7b 100644
--- a/flink-python/pyflink/table/descriptors.py
+++ b/flink-python/pyflink/table/descriptors.py
@@ -814,9 +814,9 @@ class Kafka(ConnectorDescriptor):
| Flink Sinks --------- Kafka Partitions
| 1 ----------------> 1
| 2 ----------------> 2
- | 3
- | 4
- | 5
+ | ................ 3
+ | ................ 4
+ | ................ 5
:return: This object.
"""
@@ -1199,6 +1199,8 @@ class ConnectTableDescriptor(Descriptor):
class StreamTableDescriptor(ConnectTableDescriptor):
"""
Descriptor for specifying a table source and/or sink in a streaming environment.
+
+ .. seealso:: parent class: :class:`ConnectTableDescriptor`
"""
def __init__(self, j_stream_table_descriptor):
@@ -1259,6 +1261,8 @@ class StreamTableDescriptor(ConnectTableDescriptor):
class BatchTableDescriptor(ConnectTableDescriptor):
"""
Descriptor for specifying a table source and/or sink in a batch environment.
+
+ .. seealso:: parent class: :class:`ConnectTableDescriptor`
"""
def __init__(self, j_batch_table_descriptor):
diff --git a/flink-python/pyflink/table/query_config.py b/flink-python/pyflink/table/query_config.py
index d39109a..69b6488 100644
--- a/flink-python/pyflink/table/query_config.py
+++ b/flink-python/pyflink/table/query_config.py
@@ -36,6 +36,15 @@ class QueryConfig(object):
class StreamQueryConfig(QueryConfig):
"""
The :class:`StreamQueryConfig` holds parameters to configure the behavior of streaming queries.
+
+ Example:
+ ::
+
+ >>> query_config = StreamQueryConfig() \\
+ ... .with_idle_state_retention_time(datetime.timedelta(days=1),
+ ... datetime.timedelta(days=3))
+ >>> table_env.sql_update("...", query_config)
+
"""
def __init__(self, j_stream_query_config=None):
diff --git a/flink-python/pyflink/table/sinks.py b/flink-python/pyflink/table/sinks.py
index 9722602..4aa968f 100644
--- a/flink-python/pyflink/table/sinks.py
+++ b/flink-python/pyflink/table/sinks.py
@@ -20,7 +20,7 @@ from pyflink.java_gateway import get_gateway
from pyflink.table.types import _to_java_type, DataType
from pyflink.util import utils
-__all__ = ['TableSink', 'CsvTableSink']
+__all__ = ['TableSink', 'CsvTableSink', 'WriteMode']
class TableSink(object):
@@ -41,12 +41,20 @@ class CsvTableSink(TableSink):
"""
A simple :class:`TableSink` to emit data as CSV files.
+ Example:
+ ::
+
+ >>> CsvTableSink(["a", "b"], [DataTypes.INT(), DataTypes.STRING()],
+ ... "/csv/file/path", "|", 1, WriteMode.OVERWRITE)
+
:param field_names: The list of field names.
:param field_types: The list of field data types.
:param path: The output path to write the Table to.
:param field_delimiter: The field delimiter.
:param num_files: The number of files to write to.
- :param write_mode: The write mode to specify whether existing files are overwritten or not.
+ :param write_mode: The write mode that specifies whether existing files are overwritten,
+ either :data:`WriteMode.NO_OVERWRITE`
+ or :data:`WriteMode.OVERWRITE`.
"""
def __init__(self, field_names, field_types, path, field_delimiter=',', num_files=1,
diff --git a/flink-python/pyflink/table/table.py b/flink-python/pyflink/table/table.py
index 508ea17..fda8ebf 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -49,7 +49,7 @@ class Table(object):
>>> t_env.register_table_source("source", ...)
>>> t = t_env.scan("source")
>>> t.select(...)
- ...
+ >>> ...
>>> t_env.register_table_sink("result", ...)
>>> t.insert_into("result")
>>> env.execute()
@@ -452,6 +452,13 @@ class Table(object):
If the :func:`~pyflink.table.GroupWindowedTable.group_by` only references a GroupWindow
alias, the streamed table will be processed by a single task, i.e., with parallelism 1.
+ Example:
+ ::
+
+ >>> tab.window(Tumble.over("10.minutes").on("rowtime").alias("w")) \\
+ ... .group_by("w") \\
+ ... .select("a.sum as a, w.start as b, w.end as c, w.rowtime as d")
+
:param window: A :class:`pyflink.table.window.GroupWindow` created from
:class:`pyflink.table.window.Tumble`, :class:`pyflink.table.window.Session`
or :class:`pyflink.table.window.Slide`.
@@ -470,8 +477,8 @@ class Table(object):
Example:
::
- >>> table.window(Over.partition_by("c").order_by("rowTime")\\
- ... .preceding("10.seconds").alias("ow"))\\
+ >>> table.window(Over.partition_by("c").order_by("rowTime") \\
+ ... .preceding("10.seconds").alias("ow")) \\
... .select("c, b.count over ow, e.sum over ow")
.. note::
@@ -563,7 +570,7 @@ class Table(object):
Example:
::
- >>> tab.insert_into("print")
+ >>> tab.insert_into("sink")
:param table_path: The first part of the path of the registered :class:`TableSink` to which
the :class:`Table` is written. This is to ensure at least the name of the
@@ -641,7 +648,7 @@ class GroupWindowedTable(object):
Example:
::
- >>> tab.window(groupWindow.alias("w")).group_by("w, key").select("key, value.avg")
+ >>> tab.window(group_window.alias("w")).group_by("w, key").select("key, value.avg")
:param fields: Group keys.
:return: A :class:`WindowGroupedTable`.
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 9c4eabe..19b5199 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -59,7 +59,7 @@ class TableEnvironment(object):
>>> csv_table_source = CsvTableSource(
... csv_file_path, ['a', 'b'], [DataTypes.STRING(), DataTypes.BIGINT()])
- ... table_env.from_table_source(csv_table_source)
+ >>> table_env.from_table_source(csv_table_source)
:param table_source: The table source used as table.
:return: The result :class:`Table`.
@@ -95,6 +95,12 @@ class TableEnvironment(object):
Registers a :class:`Table` under a unique name in the TableEnvironment's catalog.
Registered tables can be referenced in SQL queries.
+ Example:
+ ::
+
+ >>> tab = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['a', 'b'])
+ >>> table_env.register_table("source", tab)
+
:param name: The name under which the table will be registered.
:param table: The table to register.
"""
@@ -105,6 +111,15 @@ class TableEnvironment(object):
Registers an external :class:`TableSource` in this :class:`TableEnvironment`'s catalog.
Registered tables can be referenced in SQL queries.
+ Example:
+ ::
+
+ >>> table_env.register_table_source("source",
+ ... CsvTableSource("./1.csv",
+ ... ["a", "b"],
+ ... [DataTypes.INT(),
+ ... DataTypes.STRING()]))
+
:param name: The name under which the :class:`TableSource` is registered.
:param table_source: The :class:`TableSource` to register.
"""
@@ -116,6 +131,15 @@ class TableEnvironment(object):
:class:`TableEnvironment`'s catalog.
Registered sink tables can be referenced in SQL DML statements.
+ Example:
+ ::
+
+ >>> table_env.register_table_sink("sink",
+ ... CsvTableSink(["a", "b"],
+ ... [DataTypes.INT(),
+ ... DataTypes.STRING()],
+ ... "./2.csv"))
+
:param name: The name under which the :class:`TableSink` is registered.
:param table_sink: The :class:`TableSink` to register.
"""
@@ -135,12 +159,12 @@ class TableEnvironment(object):
Scanning a directly registered table
::
- >>> tab = t_env.scan("tableName")
+ >>> tab = table_env.scan("tableName")
Scanning a table from a registered catalog
::
- >>> tab = t_env.scan("catalogName", "dbName", "tableName")
+ >>> tab = table_env.scan("catalogName", "dbName", "tableName")
:param table_path: The path of the table to scan.
:throws: Exception if no table is found using the given table path.
@@ -160,8 +184,8 @@ class TableEnvironment(object):
Example:
::
- >>> tab = t_env.scan("tableName")
- >>> t_env.insert_into(tab, "print")
+ >>> tab = table_env.scan("tableName")
+ >>> table_env.insert_into(tab, "sink")
:param table: :class:`Table` to write to the sink.
:param table_path: The first part of the path of the registered :class:`TableSink` to which
@@ -225,7 +249,7 @@ class TableEnvironment(object):
>>> table = ...
# the table is not registered to the table environment
- >>> t_env.sql_query("SELECT * FROM %s" % table)
+ >>> table_env.sql_query("SELECT * FROM %s" % table)
:param query: The sql query string.
:return: The result :class:`Table`.
@@ -248,10 +272,10 @@ class TableEnvironment(object):
::
# register the table sink into which the result is inserted.
- >>> t_env.register_table_sink("sink_table", table_sink)
+ >>> table_env.register_table_sink("sink_table", table_sink)
>>> source_table = ...
# source_table is not registered to the table environment
- >>> tEnv.sql_update(s"INSERT INTO sink_table SELECT * FROM %s" % source_table)
+ >>> table_env.sql_update("INSERT INTO sink_table SELECT * FROM %s" % source_table)
:param stmt: The SQL statement to evaluate.
:param query_config: The :class:`QueryConfig` to use.
@@ -404,16 +428,16 @@ class TableEnvironment(object):
Example:
::
- >>> table_env\\
+ >>> table_env \\
... .connect(ExternalSystemXYZ()
- ... .version("0.11"))\\
+ ... .version("0.11")) \\
... .with_format(Json()
- ... .json_schema("{...}")
- ... .fail_on_missing_field(False))\\
+ ... .json_schema("{...}")
+ ... .fail_on_missing_field(False)) \\
... .with_schema(Schema()
- ... .field("user-name", "VARCHAR")
- ... .from_origin_field("u_name")
- ... .field("count", "DECIMAL"))\\
+ ... .field("user-name", "VARCHAR")
+ ... .from_origin_field("u_name")
+ ... .field("count", "DECIMAL")) \\
... .register_table_source("MyTable")
:param connector_descriptor: Connector descriptor describing the external system.
@@ -425,11 +449,50 @@ class TableEnvironment(object):
def from_elements(self, elements, schema=None, verify_schema=True):
"""
Creates a table from a collection of elements.
+ The element types must be acceptable atomic types or acceptable composite types.
+ All elements must be of the same type.
+ If the element types are composite types, the composite types must be strictly equal,
+ and their subtypes must also be acceptable types.
+ e.g. if the elements are tuples, the tuples must all have the same length, and the element
+ types at each position must be equal.
+
+ The built-in acceptable atomic element types are:
+
+ **int**, **long**, **str**, **unicode**, **bool**,
+ **float**, **bytearray**, **datetime.date**, **datetime.time**, **datetime.datetime**,
+ **datetime.timedelta**, **decimal.Decimal**
+
+ The built-in acceptable composite element types are:
+
+ **list**, **tuple**, **dict**, **array**, :class:`pyflink.table.Row`
+
+ If the element type is a composite type, it will be unboxed.
+ e.g. table_env.from_elements([(1, 'Hi'), (2, 'Hello')]) will return a table like:
+
+ +----+-------+
+ | _1 | _2    |
+ +====+=======+
+ | 1  | Hi    |
+ +----+-------+
+ | 2  | Hello |
+ +----+-------+
+
+ "_1" and "_2" are generated field names.
Example:
::
+ # use the second parameter to specify custom field names
>>> table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['a', 'b'])
+ # use the second parameter to specify custom table schema
+ >>> table_env.from_elements([(1, 'Hi'), (2, 'Hello')],
+ ... DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
+ ... DataTypes.FIELD("b", DataTypes.STRING())]))
+ # use the third parameter to switch whether to verify the elements against the schema
+ >>> table_env.from_elements([(1, 'Hi'), (2, 'Hello')],
+ ... DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
+ ... DataTypes.FIELD("b", DataTypes.STRING())]),
+ ... False)
:param elements: The elements to create a table from.
:param schema: The schema of the table.
@@ -541,16 +604,16 @@ class StreamTableEnvironment(TableEnvironment):
registering a table source as "MyTable":
::
- >>> table_env\\
+ >>> table_env \\
... .connect(ExternalSystemXYZ()
- ... .version("0.11"))\\
+ ... .version("0.11")) \\
... .with_format(Json()
- ... .json_schema("{...}")
- ... .fail_on_missing_field(False))\\
+ ... .json_schema("{...}")
+ ... .fail_on_missing_field(False)) \\
... .with_schema(Schema()
- ... .field("user-name", "VARCHAR")
- ... .from_origin_field("u_name")
- ... .field("count", "DECIMAL"))\\
+ ... .field("user-name", "VARCHAR")
+ ... .from_origin_field("u_name")
+ ... .field("count", "DECIMAL")) \\
... .register_table_source("MyTable")
:param connector_descriptor: Connector descriptor describing the external system.
@@ -560,8 +623,28 @@ class StreamTableEnvironment(TableEnvironment):
return StreamTableDescriptor(
self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
- @classmethod
- def create(cls, stream_execution_environment, table_config=None):
+ @staticmethod
+ def create(stream_execution_environment, table_config=None):
+ """
+ Creates a :class:`StreamTableEnvironment` for a :class:`StreamExecutionEnvironment`.
+
+ Example:
+ ::
+
+ >>> env = StreamExecutionEnvironment.get_execution_environment()
+ # create without TableConfig
+ >>> table_env = StreamTableEnvironment.create(env)
+ # create with TableConfig
+ >>> table_config = TableConfig()
+ >>> table_config.set_null_check(False)
+ >>> table_env = StreamTableEnvironment.create(env, table_config)
+
+ :param stream_execution_environment: The :class:`StreamExecutionEnvironment` of the
+ TableEnvironment.
+ :param table_config: The configuration of the TableEnvironment, optional.
+ :return: The :class:`StreamTableEnvironment` created from given StreamExecutionEnvironment
+ and configuration.
+ """
gateway = get_gateway()
if table_config is not None:
j_tenv = gateway.jvm.StreamTableEnvironment.create(
@@ -608,16 +691,16 @@ class BatchTableEnvironment(TableEnvironment):
registering a table source as "MyTable":
::
- >>> table_env\\
+ >>> table_env \\
... .connect(ExternalSystemXYZ()
- ... .version("0.11"))\\
+ ... .version("0.11")) \\
... .with_format(Json()
- ... .json_schema("{...}")
- ... .fail_on_missing_field(False))\\
+ ... .json_schema("{...}")
+ ... .fail_on_missing_field(False)) \\
... .with_schema(Schema()
- ... .field("user-name", "VARCHAR")
- ... .from_origin_field("u_name")
- ... .field("count", "DECIMAL"))\\
+ ... .field("user-name", "VARCHAR")
+ ... .from_origin_field("u_name")
+ ... .field("count", "DECIMAL")) \\
... .register_table_source("MyTable")
:param connector_descriptor: Connector descriptor describing the external system.
@@ -627,8 +710,26 @@ class BatchTableEnvironment(TableEnvironment):
return BatchTableDescriptor(
self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
- @classmethod
- def create(cls, execution_environment, table_config=None):
+ @staticmethod
+ def create(execution_environment, table_config=None):
+ """
+ Creates a :class:`TableEnvironment` for a batch :class:`ExecutionEnvironment`.
+
+ Example:
+ ::
+
+ >>> env = ExecutionEnvironment.get_execution_environment()
+ >>> table_env = BatchTableEnvironment.create(env)
+ >>> table_config = TableConfig()
+ >>> table_config.set_null_check(False)
+ >>> table_env = BatchTableEnvironment.create(env, table_config)
+
+ :param execution_environment: The batch :class:`ExecutionEnvironment` of the
+ TableEnvironment.
+ :param table_config: The configuration of the TableEnvironment, optional.
+ :return: The :class:`BatchTableEnvironment` created from given ExecutionEnvironment and
+ configuration.
+ """
gateway = get_gateway()
if table_config is not None:
j_tenv = gateway.jvm.BatchTableEnvironment.create(