Posted to commits@bahir.apache.org by lr...@apache.org on 2020/12/15 01:41:57 UTC
[bahir-website] 06/07: Update documentation for Flink extensions
This is an automated email from the ASF dual-hosted git repository.
lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-website.git
commit ef547fddc9b6977cb8ba59597f0ab94cf83ec004
Author: Luciano Resende <lr...@apache.org>
AuthorDate: Mon Dec 14 17:37:17 2020 -0800
Update documentation for Flink extensions
---
site/docs/flink/current/flink-streaming-kudu.md | 315 ++++++++++++++++++-----
site/docs/flink/current/flink-streaming-redis.md | 6 +-
2 files changed, 259 insertions(+), 62 deletions(-)
diff --git a/site/docs/flink/current/flink-streaming-kudu.md b/site/docs/flink/current/flink-streaming-kudu.md
index 2eef38c..2af5e9a 100644
--- a/site/docs/flink/current/flink-streaming-kudu.md
+++ b/site/docs/flink/current/flink-streaming-kudu.md
@@ -27,8 +27,12 @@ limitations under the License.
# Flink Kudu Connector
-This connector provides a source (```KuduInputFormat```) and a sink/output (```KuduSink``` and ```KuduOutputFormat```, respectively) that can read and write to [Kudu](https://kudu.apache.org/). To use this connector, add the
-following dependency to your project:
+This connector provides a source (```KuduInputFormat```), a sink/output
+(```KuduSink``` and ```KuduOutputFormat```, respectively),
+as well as a table source (`KuduTableSource`), an upsert table sink (`KuduTableSink`), and a catalog (`KuduCatalog`),
+to allow reading from and writing to [Kudu](https://kudu.apache.org/).
+
+To use this connector, add the following dependency to your project:
<dependency>
<groupId>org.apache.bahir</groupId>
@@ -36,90 +40,283 @@ following dependency to your project:
<version>1.1-SNAPSHOT</version>
</dependency>
-*Version Compatibility*: This module is compatible with Apache Kudu *1.7.1* (last stable version).
+*Version Compatibility*: This module is compatible with Apache Kudu *1.11.1* (last stable version) and Apache Flink 1.10+.
Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution.
-See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-stable/start/dependencies.html).
+See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/projectsetup/dependencies.html).
## Installing Kudu
Follow the instructions from the [Kudu Installation Guide](https://kudu.apache.org/docs/installation.html).
-Optionally, you can use the docker images provided in dockers folder.
+Optionally, you can use the docker images provided in the dockers folder.
+
+## SQL and Table API
+
+The Kudu connector is fully integrated with the Flink Table and SQL APIs. Once we configure the Kudu catalog (see next section),
+we can start querying or inserting into existing Kudu tables using the Flink SQL or Table API.
+
+For more information about the possible queries, please check the [official documentation](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/).
-## KuduInputFormat
+### Kudu Catalog
+The connector comes with a catalog implementation to handle metadata about your Kudu setup and perform table management.
+By using the Kudu catalog, you can access all the tables already created in Kudu from Flink SQL queries. The Kudu catalog only
+allows users to create or access existing Kudu tables. Tables using other data sources must be defined in other catalogs, such as
+the in-memory catalog or the Hive catalog.
+
+When using the SQL CLI, you can easily add the Kudu catalog to your environment yaml file:
+
+```
+catalogs:
+ - name: kudu
+ type: kudu
+ kudu.masters: <host>:7051
```
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.setParallelism(PARALLELISM);
+Once the SQL CLI is started, you can switch to the Kudu catalog by calling `USE CATALOG kudu;`.
-// create a table info object
-KuduTableInfo tableInfo = KuduTableInfo.Builder
- .create("books")
- .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
- .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
- .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
- .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
- .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
- .build();
-
-// Pass the tableInfo to the KuduInputFormat and provide kuduMaster ips
-env.createInput(new KuduInputFormat<>("172.25.0.6", tableInfo))
- .count();
-
-env.execute();
+You can also create and use the KuduCatalog directly in the Table environment:
+
+```java
+String KUDU_MASTERS = "host1:port1,host2:port2";
+KuduCatalog catalog = new KuduCatalog(KUDU_MASTERS);
+tableEnv.registerCatalog("kudu", catalog);
+tableEnv.useCatalog("kudu");
```
-## KuduOutputFormat
+### DDL operations using SQL
+
+It is possible to manipulate Kudu tables using SQL DDL.
+
+When not using the Kudu catalog, the following additional properties must be specified in the `WITH` clause:
+* `'connector.type'='kudu'`
+* `'kudu.masters'='host1:port1,host2:port2,...'`: comma-delimited list of Kudu masters
+* `'kudu.table'='...'`: The table's name within the Kudu database.
+
+If you have registered and are using the Kudu catalog, these properties are handled automatically.
+
+To create a table, the additional properties `kudu.primary-key-columns` and `kudu.hash-columns` must be specified
+as comma-delimited lists. Optionally, you can set the `kudu.replicas` property (defaults to 1).
+Other properties, such as range partitioning, cannot be configured here - for more flexibility, please use
+`catalog.createTable` as described in [this](#Creating-a-KuduTable-directly-with-KuduCatalog) section or create the table directly in Kudu.
+
+The `NOT NULL` constraint can be added to any of the column definitions.
+A column that is set as a primary key is automatically created with the `NOT NULL` constraint.
+Hash columns must be a subset of primary key columns.
+
+Kudu catalog:
```
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+CREATE TABLE TestTable (
+ first STRING,
+ second STRING,
+ third INT NOT NULL
+) WITH (
+ 'kudu.hash-columns' = 'first',
+ 'kudu.primary-key-columns' = 'first,second'
+)
+```
-env.setParallelism(PARALLELISM);
+Other catalogs:
+```
+CREATE TABLE TestTable (
+ first STRING,
+ second STRING,
+ third INT NOT NULL
+) WITH (
+ 'connector.type' = 'kudu',
+ 'kudu.masters' = '...',
+ 'kudu.table' = 'TestTable',
+ 'kudu.hash-columns' = 'first',
+ 'kudu.primary-key-columns' = 'first,second'
+)
+```
-// create a table info object
-KuduTableInfo tableInfo = KuduTableInfo.Builder
- .create("books")
- .createIfNotExist(true)
- .replicas(1)
- .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
- .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
- .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
- .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
- .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
- .build();
+Renaming a table:
+```
+ALTER TABLE TestTable RENAME TO TestTableRen
+```
+
+Dropping a table:
+```sql
+DROP TABLE TestTableRen
+```
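+
+These DDL statements can also be issued from a program through the Table environment. A minimal sketch, assuming
+Flink 1.10 (where DDL goes through `TableEnvironment.sqlUpdate`) and a hypothetical `logs` table:
+
+```java
+EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
+TableEnvironment tableEnv = TableEnvironment.create(settings);
+
+// Without the Kudu catalog, the connector properties go in the WITH clause.
+tableEnv.sqlUpdate(
+    "CREATE TABLE logs (" +
+    "  id INT NOT NULL," +
+    "  message STRING" +
+    ") WITH (" +
+    "  'connector.type' = 'kudu'," +
+    "  'kudu.masters' = 'host1:7051'," +
+    "  'kudu.table' = 'logs'," +
+    "  'kudu.hash-columns' = 'id'," +
+    "  'kudu.primary-key-columns' = 'id'" +
+    ")");
+```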
-...
+#### Creating a KuduTable directly with KuduCatalog
-env.fromCollection(books)
- .output(new KuduOutputFormat<>("172.25.0.6", tableInfo));
+The KuduCatalog also exposes a simple `createTable` method that requires only a `KuduTableInfo` object, in which the table
+configuration, including schema, partitioning, replication, etc., can be specified.
-env.execute();
+Use the `createTableIfNotExists` method, which takes a `ColumnSchemasFactory` and
+a `CreateTableOptionsFactory` parameter. These implement `getColumnSchemas()`,
+returning a list of Kudu [ColumnSchema](https://kudu.apache.org/apidocs/org/apache/kudu/ColumnSchema.html) objects,
+and `getCreateTableOptions()`, returning a
+[CreateTableOptions](https://kudu.apache.org/apidocs/org/apache/kudu/client/CreateTableOptions.html) object, respectively.
+
+This example shows the creation of a table called `ExampleTable` with two columns,
+where `first` is the primary key, and the configuration of replicas and hash partitioning.
+
+```java
+KuduTableInfo tableInfo = KuduTableInfo
+ .forTable("ExampleTable")
+ .createTableIfNotExists(
+ () ->
+ Lists.newArrayList(
+ new ColumnSchema
+ .ColumnSchemaBuilder("first", Type.INT32)
+ .key(true)
+ .build(),
+ new ColumnSchema
+ .ColumnSchemaBuilder("second", Type.STRING)
+ .build()
+ ),
+ () -> new CreateTableOptions()
+ .setNumReplicas(1)
+ .addHashPartitions(Lists.newArrayList("first"), 2));
+
+catalog.createTable(tableInfo, false);
```
+The example uses lambda expressions to implement the functional interfaces.
+
+Read more about Kudu schema design in the [Kudu docs](https://kudu.apache.org/docs/schema_design.html).
+
+### Supported data types
+| Flink/SQL | Kudu |
+| ------------- |:-------------:|
+| STRING | STRING |
+| BOOLEAN | BOOL |
+| TINYINT | INT8 |
+| SMALLINT | INT16 |
+| INT | INT32 |
+| BIGINT | INT64 |
+| FLOAT | FLOAT |
+| DOUBLE | DOUBLE |
+| BYTES | BINARY |
+| TIMESTAMP(3) | UNIXTIME_MICROS |
+
+Note:
+* `TIMESTAMP`s are fixed to a precision of 3, and the corresponding Java conversion class is `java.sql.Timestamp`
+* `BINARY` and `VARBINARY` are not yet supported - use `BYTES`, which is a `VARBINARY(2147483647)`
+* `CHAR` and `VARCHAR` are not yet supported - use `STRING`, which is a `VARCHAR(2147483647)`
+* `DECIMAL` types are not yet supported
-## KuduSink
+### Known limitations
+* Data type limitations (see above).
+* SQL Create table: primary keys can only be set by the `kudu.primary-key-columns` property; using the
+`PRIMARY KEY` constraint is not yet possible.
+* SQL Create table: range partitioning is not supported.
+* When getting a table through the catalog, `NOT NULL` and `PRIMARY KEY` constraints are ignored. All columns
+are described as nullable and as not being primary keys.
+* Kudu tables cannot be altered through the catalog other than simple renaming.
+
+## DataStream API
+
+It is also possible to use the Kudu connector directly from the DataStream API; however, we
+encourage all users to explore the Table API, as it provides a lot of useful tooling when working
+with Kudu data.
+
+### Reading tables into a DataStream
+
+There are two main ways of reading a Kudu table into a DataStream:
+ 1. Using the `KuduCatalog` and the Table API
+ 2. Using the `KuduRowInputFormat` directly
+
+Using the `KuduCatalog` and Table API is the recommended way of reading tables, as it automatically
+guarantees type safety and takes care of configuring the readers.
+
+This is how it works in practice:
+```java
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, tableSettings);
+
+tableEnv.registerCatalog("kudu", new KuduCatalog("master:port"));
+tableEnv.useCatalog("kudu");
+
+Table table = tableEnv.sqlQuery("SELECT * FROM MyKuduTable");
+DataStream<Row> rows = tableEnv.toAppendStream(table, Row.class);
+```
+
+The second way of achieving the same thing is to use the `KuduRowInputFormat` directly.
+In this case, we have to manually provide all information about our table:
+
+```java
+KuduTableInfo tableInfo = ...
+KuduReaderConfig readerConfig = ...
+KuduRowInputFormat inputFormat = new KuduRowInputFormat(readerConfig, tableInfo);
+
+DataStream<Row> rowStream = env.createInput(inputFormat, rowTypeInfo);
+```
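+
+A sketch of filling in those placeholders, assuming a table with an INT32 `id` and a STRING `message` column, and
+assuming `KuduReaderConfig` exposes a builder analogous to the `KuduWriterConfig` one shown later (the table and
+master names below are made up):
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+KuduTableInfo tableInfo = KuduTableInfo.forTable("ExampleTable");
+KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters("master:7051").build();
+KuduRowInputFormat inputFormat = new KuduRowInputFormat(readerConfig, tableInfo);
+
+// Describe the produced rows so Flink can (de)serialize them.
+RowTypeInfo rowTypeInfo = new RowTypeInfo(
+    new TypeInformation<?>[]{Types.INT, Types.STRING},
+    new String[]{"id", "message"});
+
+DataStream<Row> rowStream = env.createInput(inputFormat, rowTypeInfo);
+```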
+
+At the end of the day the `KuduTableSource` is just a convenient wrapper around the `KuduRowInputFormat`.
+
+### Kudu Sink
+The connector provides a `KuduSink` class that can be used to consume DataStreams
+and write the results into a Kudu table.
+
+The constructor takes 3 or 4 arguments.
+ * `KuduWriterConfig` is used to specify the Kudu masters and the flush mode.
+ * `KuduTableInfo` identifies the table to be written.
+ * `KuduOperationMapper` maps the records coming from the DataStream to a list of Kudu operations.
+ * `KuduFailureHandler` (optional): provides custom logic for handling write failures.
+
+The example below shows the creation of a sink for Row type records with 3 fields. It upserts each record.
+It is assumed that a Kudu table called `AlreadyExistingTable` with columns `col1, col2, col3` exists. Note that if this were not the case,
+we could pass a `KuduTableInfo` as described in the [Creating a KuduTable directly with KuduCatalog](#Creating-a-KuduTable-directly-with-KuduCatalog) section,
+and the sink would create the table with the provided configuration.
+
+```java
+KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(KUDU_MASTERS).build();
+
+KuduSink<Row> sink = new KuduSink<>(
+ writerConfig,
+ KuduTableInfo.forTable("AlreadyExistingTable"),
+ new RowOperationMapper<>(
+ new String[]{"col1", "col2", "col3"},
+ AbstractSingleOperationMapper.KuduOperation.UPSERT)
+);
```
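+
+The sink is then attached like any other DataStream sink. A short usage sketch, assuming `rowStream` is a
+`DataStream<Row>` whose rows match the three columns above and `env` is the surrounding `StreamExecutionEnvironment`:
+
+```java
+rowStream.addSink(sink);
+env.execute("Kudu sink example");
+```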
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setParallelism(PARALLELISM);
+#### KuduOperationMapper
+
+This section describes the operation mapping logic in more detail.
+
+The connector supports insert, upsert, update, and delete operations.
+The operation to be performed can vary dynamically based on the record.
+To allow for more flexibility, it is also possible for one record to trigger
+0, 1, or more operations.
+For the highest level of control, implement the `KuduOperationMapper` interface.
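+
+As a sketch of this flexibility, the following mapper turns each record into either an upsert or a delete. It assumes
+that `KuduOperationMapper` declares a single `createOperations(input, table)` method, and uses a hypothetical
+`ChangeEvent` record carrying a deletion flag:
+
+```java
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+
+// Hypothetical input record: an id, a payload, and a flag marking it for deletion.
+class ChangeEvent implements Serializable {
+    public int id;
+    public String message;
+    public boolean deleted;
+}
+
+public class ChangeEventMapper implements KuduOperationMapper<ChangeEvent> {
+    @Override
+    public List<Operation> createOperations(ChangeEvent input, KuduTable table) {
+        // Choose the operation per record: delete when flagged, upsert otherwise.
+        Operation op = input.deleted ? table.newDelete() : table.newUpsert();
+        op.getRow().addInt("id", input.id);
+        if (!input.deleted) {
+            op.getRow().addString("message", input.message);
+        }
+        return Collections.singletonList(op);
+    }
+}
+```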
-// create a table info object
-KuduTableInfo tableInfo = KuduTableInfo.Builder
- .create("books")
- .createIfNotExist(true)
- .replicas(1)
- .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
- .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
- .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
- .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
- .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
- .build();
+If one record from the DataStream corresponds to one table operation,
+extend the `AbstractSingleOperationMapper` class. An array of column
+names must be provided. This must match the Kudu table's schema.
-...
+Override the `getField` method, which extracts the value for the table column whose name is
+at the `i`th position in the `columnNames` array.
+If the operation is one of (`INSERT, UPSERT, UPDATE, DELETE`)
+and doesn't depend on the input record (constant during the life of the sink), it can be set in the constructor
+of `AbstractSingleOperationMapper`.
+It is also possible to implement your own logic by overriding the
+`createBaseOperation` method that returns a Kudu [Operation](https://kudu.apache.org/apidocs/org/apache/kudu/client/Operation.html).
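+
+A sketch of the 1-to-1 case, reusing the hypothetical `ChangeEvent` record from above and assuming the column names
+and the constant operation are passed to the `AbstractSingleOperationMapper` constructor as described:
+
+```java
+public class ChangeEventUpsertMapper extends AbstractSingleOperationMapper<ChangeEvent> {
+
+    public ChangeEventUpsertMapper() {
+        // Column names must match the Kudu table's schema; every record is upserted.
+        super(new String[]{"id", "message"}, KuduOperation.UPSERT);
+    }
+
+    @Override
+    public Object getField(ChangeEvent input, int i) {
+        // Return the value for the column at position i of the columnNames array.
+        return i == 0 ? input.id : input.message;
+    }
+}
+```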
-env.fromCollection(books)
- .addSink(new KuduSink<>("172.25.0.6", tableInfo));
+There are pre-defined operation mappers for POJO, Flink Row, and Flink Tuple types for constant-operation, one-to-one sinks; a short example follows this list.
+* `PojoOperationMapper`: Each table column must correspond to a POJO field
+with the same name. The `columnNames` array should contain those fields of the POJO that
+are present as table columns (the POJO fields can be a superset of table columns).
+* `RowOperationMapper` and `TupleOperationMapper`: the mapping is based on position. The
+`i`th field of the Row/Tuple corresponds to the column of the table at the `i`th
+position in the `columnNames` array.
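+
+For instance, a sink for a hypothetical `Book` POJO could be assembled like this (a sketch; it assumes
+`PojoOperationMapper` is constructed from the POJO class, the column names, and the operation, and reuses the
+`writerConfig` from the sink example above):
+
+```java
+// Hypothetical POJO whose field names match the Kudu table's columns.
+public class Book {
+    public int id;
+    public String title;
+    public String author;
+}
+
+PojoOperationMapper<Book> mapper = new PojoOperationMapper<>(
+    Book.class,
+    new String[]{"id", "title", "author"},
+    AbstractSingleOperationMapper.KuduOperation.INSERT);
+
+KuduSink<Book> sink = new KuduSink<>(
+    writerConfig,
+    KuduTableInfo.forTable("books"),
+    mapper);
+```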
-env.execute();
+## Building the connector
+
+The connector can be easily built using Maven:
+
+```
+cd bahir-flink
+mvn clean install
```
+
+### Running the tests
+
+The integration tests rely on the Kudu test harness, which requires the current user to be able to ssh to localhost.
+
+This might not work out of the box on some operating systems (such as Mac OS X).
+To solve this problem, go to *System Preferences/Sharing* and enable Remote Login for your user.
diff --git a/site/docs/flink/current/flink-streaming-redis.md b/site/docs/flink/current/flink-streaming-redis.md
index 1a2f8c2..0c646fb 100644
--- a/site/docs/flink/current/flink-streaming-redis.md
+++ b/site/docs/flink/current/flink-streaming-redis.md
@@ -105,7 +105,7 @@ This example code does the same, but for Redis Cluster:
**Java:**
- FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
+ FlinkJedisClusterConfig conf = new FlinkJedisClusterConfig.Builder()
.setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build();
DataStream<String> stream = ...;
@@ -114,7 +114,7 @@ This example code does the same, but for Redis Cluster:
**Scala:**
- val conf = new FlinkJedisPoolConfig.Builder().setNodes(...).build()
+ val conf = new FlinkJedisClusterConfig.Builder().setNodes(...).build()
stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
@@ -155,7 +155,7 @@ This section gives a description of all the available data types and what Redis
</td>
</tr>
<tr>
- <td>SET</td><td><a href="http://redis.io/commands/rpush">SADD</a></td>
+ <td>SET</td><td><a href="http://redis.io/commands/sadd">SADD</a></td>
</tr>
<tr>
<td>PUBSUB</td><td><a href="http://redis.io/commands/publish">PUBLISH</a></td>