Posted to commits@flink.apache.org by tw...@apache.org on 2022/01/25 08:27:35 UTC

[flink] branch master updated (2160735 -> fa161d3)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 2160735  [FLINK-25739][dist] Include Changelog to flink-dist jar
     new 5e28f66  [FLINK-25391][connector-elasticsearch] Forward catalog table options
     new c61162b  [FLINK-25391][connector-jdbc] Forward catalog table options
     new c926031  [FLINK-25391][connector-files] Forward catalog table options
     new 0c34c99  [FLINK-25391][connector-kafka] Forward catalog table options
     new 2cb86ff  [FLINK-25391][connector-kinesis] Forward catalog table options
     new 5175ed0  [FLINK-25391][connector-hbase] Forward catalog table options
     new 8f77862  [FLINK-25391][format-avro] Forward catalog table options
     new 0370c36e [FLINK-25391][format-csv] Forward catalog table options
     new fa161d3  [FLINK-25391][format-json] Forward catalog table options

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
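
All nine commits follow the same pattern: each table factory declares which catalog table options may be forwarded to the runtime implementation by overriding DynamicTableFactory#forwardOptions(), and the corresponding documentation tables gain a "Forwarded" column. Below is a minimal, hypothetical sketch of that pattern; the factory class, its identifier, and the option names are illustrative and not part of these commits.

    // Hypothetical factory sketching the forwardOptions() pattern applied by
    // the commits above; option names and defaults are made up.
    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;
    import org.apache.flink.table.connector.sink.DynamicTableSink;
    import org.apache.flink.table.factories.DynamicTableSinkFactory;
    import org.apache.flink.table.factories.FactoryUtil;

    import java.util.HashSet;
    import java.util.Set;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class ExampleDynamicTableSinkFactory implements DynamicTableSinkFactory {

        static final ConfigOption<String> HOSTNAME =
                ConfigOptions.key("hostname").stringType().noDefaultValue();
        static final ConfigOption<Integer> PORT =
                ConfigOptions.key("port").intType().defaultValue(5432);

        @Override
        public String factoryIdentifier() {
            return "example";
        }

        @Override
        public Set<ConfigOption<?>> requiredOptions() {
            final Set<ConfigOption<?>> options = new HashSet<>();
            options.add(HOSTNAME);
            return options;
        }

        @Override
        public Set<ConfigOption<?>> optionalOptions() {
            final Set<ConfigOption<?>> options = new HashSet<>();
            options.add(PORT);
            return options;
        }

        @Override
        public Set<ConfigOption<?>> forwardOptions() {
            // Declare only options that can be forwarded to the runtime
            // implementation; options such as 'connector' and 'format' are
            // marked as not forwarded in the documentation tables below.
            return Stream.of(HOSTNAME, PORT).collect(Collectors.toSet());
        }

        @Override
        public DynamicTableSink createDynamicTableSink(Context context) {
            final FactoryUtil.TableFactoryHelper helper =
                    FactoryUtil.createTableFactoryHelper(this, context);
            helper.validate();
            // The actual sink would be built from helper.getOptions(); omitted here.
            throw new UnsupportedOperationException("sketch only");
        }
    }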


Summary of changes:
 .../content/docs/connectors/table/elasticsearch.md | 23 ++++++-
 docs/content/docs/connectors/table/filesystem.md   | 80 +++++++++++++++++-----
 .../connectors/table/formats/avro-confluent.md     | 15 +++-
 docs/content/docs/connectors/table/formats/avro.md |  5 +-
 docs/content/docs/connectors/table/formats/csv.md  | 12 +++-
 docs/content/docs/connectors/table/formats/json.md | 10 ++-
 docs/content/docs/connectors/table/hbase.md        | 17 ++++-
 docs/content/docs/connectors/table/jdbc.md         | 24 ++++++-
 docs/content/docs/connectors/table/kafka.md        | 24 ++++++-
 docs/content/docs/connectors/table/kinesis.md      | 79 +++++++++++++++++++--
 .../table/ElasticsearchDynamicSinkFactoryBase.java | 50 +++++++++-----
 .../table/Elasticsearch6DynamicSinkFactory.java    | 21 +++---
 .../file/table/AbstractFileSystemTable.java        | 24 +++----
 .../file/table/FileSystemTableFactory.java         | 29 +++++++-
 .../connector/file/table/FileSystemTableSink.java  | 21 ++++--
 .../file/table/FileSystemTableSource.java          | 25 ++++---
 .../hbase1/HBase1DynamicTableFactory.java          | 29 ++++++--
 .../hbase2/HBase2DynamicTableFactory.java          | 27 ++++++--
 .../hbase/table/HBaseConnectorOptionsUtil.java     | 18 ++---
 .../jdbc/table/JdbcDynamicTableFactory.java        | 23 +++++++
 .../kafka/table/KafkaDynamicTableFactory.java      | 23 ++++---
 .../kinesis/table/KinesisDynamicTableFactory.java  | 16 ++++-
 .../confluent/RegistryAvroFormatFactory.java       | 19 +++++
 .../flink/formats/avro/AvroFileFormatFactory.java  |  5 ++
 .../apache/flink/formats/csv/CsvFormatFactory.java | 13 ++++
 .../flink/formats/json/JsonFormatFactory.java      |  1 -
 26 files changed, 509 insertions(+), 124 deletions(-)

[flink] 05/09: [FLINK-25391][connector-kinesis] Forward catalog table options

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2cb86ff03747499bfceda74cb8cc1ea48c385452
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Thu Jan 6 16:21:52 2022 +0100

    [FLINK-25391][connector-kinesis] Forward catalog table options
---
 docs/content/docs/connectors/table/kinesis.md      | 79 ++++++++++++++++++++--
 .../kinesis/table/KinesisDynamicTableFactory.java  | 16 ++++-
 2 files changed, 88 insertions(+), 7 deletions(-)

diff --git a/docs/content/docs/connectors/table/kinesis.md b/docs/content/docs/connectors/table/kinesis.md
index f26b1e9..1840862 100644
--- a/docs/content/docs/connectors/table/kinesis.md
+++ b/docs/content/docs/connectors/table/kinesis.md
@@ -122,11 +122,12 @@ Connector Options
 <table class="table table-bordered">
     <thead>
     <tr>
-      <th class="text-left" style="width: 25%">Option</th>
-      <th class="text-center" style="width: 8%">Required</th>
-      <th class="text-center" style="width: 7%">Default</th>
-      <th class="text-center" style="width: 10%">Type</th>
-      <th class="text-center" style="width: 50%">Description</th>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 8%">Forwarded</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-center" style="width: 42%">Description</th>
     </tr>
     <tr>
       <th colspan="5" class="text-left" style="width: 100%">Common Options</th>
@@ -136,6 +137,7 @@ Connector Options
     <tr>
       <td><h5>connector</h5></td>
       <td>required</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Specify what connector to use. For Kinesis use <code>'kinesis'</code>.</td>
@@ -143,6 +145,7 @@ Connector Options
     <tr>
       <td><h5>stream</h5></td>
       <td>required</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Name of the Kinesis data stream backing this table.</td>
@@ -150,6 +153,7 @@ Connector Options
     <tr>
       <td><h5>format</h5></td>
       <td>required</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The format used to deserialize and serialize Kinesis data stream records. See <a href="#data-type-mapping">Data Type Mapping</a> for details.</td>
@@ -157,6 +161,7 @@ Connector Options
     <tr>
       <td><h5>aws.region</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The AWS region where the stream is defined. Either this or <code>aws.endpoint</code> are required.</td>
@@ -164,6 +169,7 @@ Connector Options
     <tr>
       <td><h5>aws.endpoint</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The AWS endpoint for Kinesis (derived from the AWS region setting if not set). Either this or <code>aws.region</code> are required.</td>
@@ -185,6 +191,7 @@ Connector Options
     <tr>
       <td><h5>aws.credentials.provider</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">AUTO</td>
       <td>String</td>
       <td>A credentials provider to use when authenticating against the Kinesis endpoint. See <a href="#authentication">Authentication</a> for details.</td>
@@ -192,6 +199,7 @@ Connector Options
     <tr>
 	  <td><h5>aws.credentials.basic.accesskeyid</h5></td>
 	  <td>optional</td>
+      <td>no</td>
 	  <td style="word-wrap: break-word;">(none)</td>
 	  <td>String</td>
 	  <td>The AWS access key ID to use when setting credentials provider type to BASIC.</td>
@@ -199,6 +207,7 @@ Connector Options
     <tr>
 	  <td><h5>aws.credentials.basic.secretkey</h5></td>
 	  <td>optional</td>
+      <td>no</td>
 	  <td style="word-wrap: break-word;">(none)</td>
 	  <td>String</td>
 	  <td>The AWS secret key to use when setting credentials provider type to BASIC.</td>
@@ -206,6 +215,7 @@ Connector Options
     <tr>
 	  <td><h5>aws.credentials.profile.path</h5></td>
 	  <td>optional</td>
+      <td>no</td>
 	  <td style="word-wrap: break-word;">(none)</td>
 	  <td>String</td>
 	  <td>Optional configuration for profile path if credential provider type is set to be PROFILE.</td>
@@ -213,6 +223,7 @@ Connector Options
     <tr>
 	  <td><h5>aws.credentials.profile.name</h5></td>
 	  <td>optional</td>
+      <td>no</td>
 	  <td style="word-wrap: break-word;">(none)</td>
 	  <td>String</td>
 	  <td>Optional configuration for profile name if credential provider type is set to be PROFILE.</td>
@@ -220,6 +231,7 @@ Connector Options
     <tr>
 	  <td><h5>aws.credentials.role.arn</h5></td>
 	  <td>optional</td>
+      <td>no</td>
 	  <td style="word-wrap: break-word;">(none)</td>
 	  <td>String</td>
 	  <td>The role ARN to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
@@ -227,6 +239,7 @@ Connector Options
     <tr>
 	  <td><h5>aws.credentials.role.sessionName</h5></td>
 	  <td>optional</td>
+      <td>no</td>
 	  <td style="word-wrap: break-word;">(none)</td>
 	  <td>String</td>
 	  <td>The role session name to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
@@ -234,6 +247,7 @@ Connector Options
     <tr>
 	  <td><h5>aws.credentials.role.externalId</h5></td>
 	  <td>optional</td>
+      <td>no</td>
 	  <td style="word-wrap: break-word;">(none)</td>
 	  <td>String</td>
 	  <td>The external ID to use when credential provider type is set to ASSUME_ROLE.</td>
@@ -241,6 +255,7 @@ Connector Options
     <tr>
 	  <td><h5>aws.credentials.role.provider</h5></td>
 	  <td>optional</td>
+      <td>no</td>
 	  <td style="word-wrap: break-word;">(none)</td>
 	  <td>String</td>
 	  <td>The credentials provider that provides credentials for assuming the role when credential provider type is set to ASSUME_ROLE. Roles can be nested, so this value can again be set to ASSUME_ROLE</td>
@@ -248,6 +263,7 @@ Connector Options
     <tr>
 	  <td><h5>aws.credentials.webIdentityToken.file</h5></td>
 	  <td>optional</td>
+      <td>no</td>
 	  <td style="word-wrap: break-word;">(none)</td>
 	  <td>String</td>
 	  <td>The absolute path to the web identity token file that should be used if provider type is set to WEB_IDENTITY_TOKEN.</td>
@@ -262,6 +278,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.initpos</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">LATEST</td>
       <td>String</td>
       <td>Initial position to be used when reading from the table. See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
@@ -269,6 +286,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.initpos-timestamp</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The initial timestamp to start reading Kinesis stream from (when <code>scan.stream.initpos</code> is AT_TIMESTAMP). See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
@@ -276,6 +294,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.initpos-timestamp-format</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">yyyy-MM-dd'T'HH:mm:ss.SSSXXX</td>
       <td>String</td>
       <td>The date format of initial timestamp to start reading Kinesis stream from (when <code>scan.stream.initpos</code> is AT_TIMESTAMP). See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
@@ -283,6 +302,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.recordpublisher</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">POLLING</td>
       <td>String</td>
       <td>The <code>RecordPublisher</code> type to use for sources. See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
@@ -290,6 +310,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.efo.consumername</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The name of the EFO consumer to register with KDS. See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
@@ -297,6 +318,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.efo.registration</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">LAZY</td>
       <td>String</td>
       <td>Determine how and when consumer de-/registration is performed (LAZY|EAGER|NONE). See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
@@ -304,6 +326,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.efo.consumerarn</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The prefix of consumer ARN for a given stream. See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
@@ -311,6 +334,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.efo.http-client.max-concurrency</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">10000</td>
       <td>Integer</td>
       <td>Maximum number of allowed concurrent requests for the EFO client. See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
@@ -318,6 +342,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.describe.maxretries</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">50</td>
       <td>Integer</td>
       <td>The maximum number of <code>describeStream</code> attempts if we get a recoverable exception.</td>
@@ -325,6 +350,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.describe.backoff.base</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">2000</td>
       <td>Long</td>
       <td>The base backoff time (in milliseconds) between each <code>describeStream</code> attempt (for consuming from DynamoDB streams).</td>
@@ -332,6 +358,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.describe.backoff.max</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">5000</td>
       <td>Long</td>
       <td>The maximum backoff time (in milliseconds)  between each <code>describeStream</code> attempt (for consuming from DynamoDB streams).</td>
@@ -339,6 +366,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.describe.backoff.expconst</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">1.5</td>
       <td>Double</td>
       <td>The power constant for exponential backoff between each <code>describeStream</code> attempt (for consuming from DynamoDB streams).</td>
@@ -346,6 +374,7 @@ Connector Options
     <tr>
       <td><h5>scan.list.shards.maxretries</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">10</td>
       <td>Integer</td>
       <td>The maximum number of <code>listShards</code> attempts if we get a recoverable exception.</td>
@@ -353,6 +382,7 @@ Connector Options
     <tr>
       <td><h5>scan.list.shards.backoff.base</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">1000</td>
       <td>Long</td>
       <td>The base backoff time (in milliseconds) between each <code>listShards</code> attempt.</td>
@@ -360,6 +390,7 @@ Connector Options
     <tr>
       <td><h5>scan.list.shards.backoff.max</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">5000</td>
       <td>Long</td>
       <td>The maximum backoff time (in milliseconds) between each <code>listShards</code> attempt.</td>
@@ -367,6 +398,7 @@ Connector Options
     <tr>
       <td><h5>scan.list.shards.backoff.expconst</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">1.5</td>
       <td>Double</td>
       <td>The power constant for exponential backoff between each <code>listShards</code> attempt.</td>
@@ -374,6 +406,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.describestreamconsumer.maxretries</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">50</td>
       <td>Integer</td>
       <td>The maximum number of <code>describeStreamConsumer</code> attempts if we get a recoverable exception.</td>
@@ -381,6 +414,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.describestreamconsumer.backoff.base</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">2000</td>
       <td>Long</td>
       <td>The base backoff time (in milliseconds) between each <code>describeStreamConsumer</code> attempt.</td>
@@ -388,6 +422,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.describestreamconsumer.backoff.max</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">5000</td>
       <td>Long</td>
       <td>The maximum backoff time (in milliseconds) between each <code>describeStreamConsumer</code> attempt.</td>
@@ -395,6 +430,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.describestreamconsumer.backoff.expconst</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">1.5</td>
       <td>Double</td>
       <td>The power constant for exponential backoff between each <code>describeStreamConsumer</code> attempt.</td>
@@ -402,6 +438,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.registerstreamconsumer.maxretries</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">10</td>
       <td>Integer</td>
       <td>The maximum number of <code>registerStream</code> attempts if we get a recoverable exception.</td>
@@ -409,6 +446,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.registerstreamconsumer.timeout</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">60</td>
       <td>Integer</td>
       <td>The maximum time in seconds to wait for a stream consumer to become active before giving up.</td>
@@ -416,6 +454,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.registerstreamconsumer.backoff.base</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">500</td>
       <td>Long</td>
       <td>The base backoff time (in milliseconds) between each <code>registerStream</code> attempt.</td>
@@ -423,6 +462,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.registerstreamconsumer.backoff.max</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">2000</td>
       <td>Long</td>
       <td>The maximum backoff time (in milliseconds) between each <code>registerStream</code> attempt.</td>
@@ -430,6 +470,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.registerstreamconsumer.backoff.expconst</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">1.5</td>
       <td>Double</td>
       <td>The power constant for exponential backoff between each <code>registerStream</code> attempt.</td>
@@ -437,6 +478,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.deregisterstreamconsumer.maxretries</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">10</td>
       <td>Integer</td>
       <td>The maximum number of <code>deregisterStream</code> attempts if we get a recoverable exception.</td>
@@ -444,6 +486,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.deregisterstreamconsumer.timeout</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">60</td>
       <td>Integer</td>
       <td>The maximum time in seconds to wait for a stream consumer to deregister before giving up.</td>
@@ -451,6 +494,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.deregisterstreamconsumer.backoff.base</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">500</td>
       <td>Long</td>
       <td>The base backoff time (in milliseconds) between each <code>deregisterStream</code> attempt.</td>
@@ -458,6 +502,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.deregisterstreamconsumer.backoff.max</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">2000</td>
       <td>Long</td>
       <td>The maximum backoff time (in milliseconds) between each <code>deregisterStream</code> attempt.</td>
@@ -465,6 +510,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.deregisterstreamconsumer.backoff.expconst</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">1.5</td>
       <td>Double</td>
       <td>The power constant for exponential backoff between each <code>deregisterStream</code> attempt.</td>
@@ -472,6 +518,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.subscribetoshard.maxretries</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">10</td>
       <td>Integer</td>
       <td>The maximum number of <code>subscribeToShard</code> attempts if we get a recoverable exception.</td>
@@ -479,6 +526,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.subscribetoshard.backoff.base</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">1000</td>
       <td>Long</td>
       <td>The base backoff time (in milliseconds) between each <code>subscribeToShard</code> attempt.</td>
@@ -486,6 +534,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.subscribetoshard.backoff.max</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">2000</td>
       <td>Long</td>
       <td>The maximum backoff time (in milliseconds) between each <code>subscribeToShard</code> attempt.</td>
@@ -493,6 +542,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.subscribetoshard.backoff.expconst</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">1.5</td>
       <td>Double</td>
       <td>The power constant for exponential backoff between each <code>subscribeToShard</code> attempt.</td>
@@ -500,6 +550,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.getrecords.maxrecordcount</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">10000</td>
       <td>Integer</td>
       <td>The maximum number of records to try to get each time we fetch records from a AWS Kinesis shard.</td>
@@ -507,6 +558,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.getrecords.maxretries</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">3</td>
       <td>Integer</td>
       <td>The maximum number of <code>getRecords</code> attempts if we get a recoverable exception.</td>
@@ -514,6 +566,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.getrecords.backoff.base</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">300</td>
       <td>Long</td>
       <td>The base backoff time (in milliseconds) between <code>getRecords</code> attempts if we get a ProvisionedThroughputExceededException.</td>
@@ -521,6 +574,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.getrecords.backoff.max</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">1000</td>
       <td>Long</td>
       <td>The maximum backoff time (in milliseconds) between <code>getRecords</code> attempts if we get a ProvisionedThroughputExceededException.</td>
@@ -528,6 +582,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.getrecords.backoff.expconst</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">1.5</td>
       <td>Double</td>
       <td>The power constant for exponential backoff between each <code>getRecords</code> attempt.</td>
@@ -535,6 +590,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.getrecords.intervalmillis</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">200</td>
       <td>Long</td>
       <td>The interval (in milliseconds) between each <code>getRecords</code> request to a AWS Kinesis shard in milliseconds.</td>
@@ -542,6 +598,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.getiterator.maxretries</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">3</td>
       <td>Integer</td>
       <td>The maximum number of <code>getShardIterator</code> attempts if we get ProvisionedThroughputExceededException.</td>
@@ -549,6 +606,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.getiterator.backoff.base</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">300</td>
       <td>Long</td>
       <td>The base backoff time (in milliseconds) between <code>getShardIterator</code> attempts if we get a ProvisionedThroughputExceededException.</td>
@@ -556,6 +614,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.getiterator.backoff.max</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">1000</td>
       <td>Long</td>
       <td>The maximum backoff time (in milliseconds) between <code>getShardIterator</code> attempts if we get a ProvisionedThroughputExceededException.</td>
@@ -563,6 +622,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.getiterator.backoff.expconst</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">1.5</td>
       <td>Double</td>
       <td>The power constant for exponential backoff between each <code>getShardIterator</code> attempt.</td>
@@ -570,6 +630,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.discovery.intervalmillis</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">10000</td>
       <td>Integer</td>
       <td>The interval between each attempt to discover new shards.</td>
@@ -577,6 +638,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.adaptivereads</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>The config to turn on adaptive reads from a shard. See the <code>AdaptivePollingRecordPublisher</code> documentation for details.</td>
@@ -584,6 +646,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.idle.interval</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">-1</td>
       <td>Long</td>
       <td>The interval (in milliseconds) after which to consider a shard idle for purposes of watermark generation. A positive value will allow the watermark to progress even when some shards don't receive new records.</td>
@@ -591,6 +654,7 @@ Connector Options
     <tr>
       <td><h5>scan.watermark.sync.interval</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">30000</td>
       <td>Long</td>
       <td>The interval (in milliseconds) for periodically synchronizing the shared watermark state.</td>
@@ -598,6 +662,7 @@ Connector Options
     <tr>
       <td><h5>scan.watermark.lookahead.millis</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">0</td>
       <td>Long</td>
       <td>The maximum delta (in milliseconds) allowed for the reader to advance ahead of the shared global watermark.</td>
@@ -605,6 +670,7 @@ Connector Options
     <tr>
       <td><h5>scan.watermark.sync.queue.capacity</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">100</td>
       <td>Integer</td>
       <td>The maximum number of records that will be buffered before suspending consumption of a shard.</td>
@@ -619,6 +685,7 @@ Connector Options
     <tr>
       <td><h5>sink.partitioner</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">random or row-based</td>
       <td>String</td>
       <td>Optional output partitioning from Flink's partitions into Kinesis shards. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td>
@@ -626,6 +693,7 @@ Connector Options
     <tr>
       <td><h5>sink.partitioner-field-delimiter</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">|</td>
       <td>String</td>
       <td>Optional field delimiter for a fields-based partitioner derived from a PARTITION BY clause. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td>
@@ -633,6 +701,7 @@ Connector Options
     <tr>
       <td><h5>sink.producer.*</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td></td>
       <td>
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicTableFactory.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicTableFactory.java
index 2f7e569..7a7dc96 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicTableFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicTableFactory.java
@@ -31,11 +31,14 @@ import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.types.DataType;
 
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
+import static org.apache.flink.connector.kinesis.table.KinesisConnectorOptions.SINK_PARTITIONER;
+import static org.apache.flink.connector.kinesis.table.KinesisConnectorOptions.SINK_PARTITIONER_FIELD_DELIMITER;
 import static org.apache.flink.connector.kinesis.table.KinesisConnectorOptions.STREAM;
 import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
 
@@ -82,6 +85,15 @@ public class KinesisDynamicTableFactory implements DynamicTableSourceFactory {
 
     @Override
     public Set<ConfigOption<?>> optionalOptions() {
-        return Collections.emptySet();
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(SINK_PARTITIONER);
+        options.add(SINK_PARTITIONER_FIELD_DELIMITER);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> forwardOptions() {
+        return Stream.of(STREAM, SINK_PARTITIONER, SINK_PARTITIONER_FIELD_DELIMITER)
+                .collect(Collectors.toSet());
     }
 }
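
As a usage illustration only (not part of this commit), a Kinesis table definition that sets the forwarded options 'stream' and 'sink.partitioner-field-delimiter' from the table above could look as follows; the schema, stream name, and region are hypothetical.

    // Hypothetical DDL for a Kinesis sink table; the forwarded options
    // ('stream', 'sink.partitioner-field-delimiter') are taken straight from
    // the catalog table definition.
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class KinesisTableSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(
                            EnvironmentSettings.newInstance().inStreamingMode().build());
            tEnv.executeSql(
                    "CREATE TABLE kinesis_orders (\n"
                            + "  order_id STRING,\n"
                            + "  amount DOUBLE\n"
                            + ") PARTITIONED BY (order_id) WITH (\n"
                            + "  'connector' = 'kinesis',\n"
                            + "  'stream' = 'orders-stream',\n"
                            + "  'aws.region' = 'eu-west-1',\n"
                            + "  'format' = 'json',\n"
                            + "  'sink.partitioner-field-delimiter' = ';'\n"
                            + ")");
        }
    }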

[flink] 01/09: [FLINK-25391][connector-elasticsearch] Forward catalog table options

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5e28f66f6ef2ed03f9ee69148fe5079ae5e358c4
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Thu Jan 6 14:52:38 2022 +0100

    [FLINK-25391][connector-elasticsearch] Forward catalog table options
---
 .../content/docs/connectors/table/elasticsearch.md | 23 +++++++++-
 .../table/ElasticsearchDynamicSinkFactoryBase.java | 50 ++++++++++++++--------
 .../table/Elasticsearch6DynamicSinkFactory.java    | 21 +++++----
 3 files changed, 66 insertions(+), 28 deletions(-)

diff --git a/docs/content/docs/connectors/table/elasticsearch.md b/docs/content/docs/connectors/table/elasticsearch.md
index 22f0b60..b5ae31d 100644
--- a/docs/content/docs/connectors/table/elasticsearch.md
+++ b/docs/content/docs/connectors/table/elasticsearch.md
@@ -67,15 +67,17 @@ Connector Options
       <tr>
         <th class="text-left" style="width: 25%">Option</th>
         <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 8%">Forwarded</th>
         <th class="text-center" style="width: 7%">Default</th>
         <th class="text-center" style="width: 10%">Type</th>
-        <th class="text-center" style="width: 50%">Description</th>
+        <th class="text-center" style="width: 42%">Description</th>
       </tr>
     </thead>
     <tbody>
     <tr>
       <td><h5>connector</h5></td>
       <td>required</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Specify what connector to use, valid values are:
@@ -87,6 +89,7 @@ Connector Options
     <tr>
       <td><h5>hosts</h5></td>
       <td>required</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>One or more Elasticsearch hosts to connect to, e.g. <code>'http://host_name:9092;http://host_name:9093'</code>.</td>
@@ -94,6 +97,7 @@ Connector Options
     <tr>
       <td><h5>index</h5></td>
       <td>required</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Elasticsearch index for every record. Can be a static index (e.g. <code>'myIndex'</code>) or
@@ -103,6 +107,7 @@ Connector Options
     <tr>
       <td><h5>document-type</h5></td>
       <td>required in 6.x</td>
+      <td>yes in 6.x</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Elasticsearch document type. Not necessary anymore in <code>elasticsearch-7</code>.</td>
@@ -110,6 +115,7 @@ Connector Options
     <tr>
       <td><h5>document-id.key-delimiter</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">_</td>
       <td>String</td>
       <td>Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3".</td>
@@ -117,6 +123,7 @@ Connector Options
     <tr>
       <td><h5>username</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Username used to connect to Elasticsearch instance. Please notice that Elasticsearch doesn't pre-bundled security feature, but you can enable it by following the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/configuring-security.html">guideline</a> to secure an Elasticsearch cluster.</td>
@@ -124,6 +131,7 @@ Connector Options
     <tr>
       <td><h5>password</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Password used to connect to Elasticsearch instance. If <code>username</code> is configured, this option must be configured with non-empty string as well.</td>
@@ -131,6 +139,7 @@ Connector Options
     <tr>
       <td><h5>sink.delivery-guarantee</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">NONE</td>
       <td>String</td>
       <td>Optional delivery guarantee when committing. Valid values are <code>NONE</code> or <code>AT_LEAST_ONCE</code>.</td>
@@ -138,6 +147,7 @@ Connector Options
     <tr>
       <td><h5>sink.bulk-flush.max-actions</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">1000</td>
       <td>Integer</td>
       <td>Maximum number of buffered actions per bulk request.
@@ -147,6 +157,7 @@ Connector Options
     <tr>
       <td><h5>sink.bulk-flush.max-size</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">2mb</td>
       <td>MemorySize</td>
       <td>Maximum size in memory of buffered actions per bulk request. Must be in MB granularity.
@@ -156,6 +167,7 @@ Connector Options
     <tr>
       <td><h5>sink.bulk-flush.interval</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">1s</td>
       <td>Duration</td>
       <td>The interval to flush buffered actions.
@@ -166,6 +178,7 @@ Connector Options
     <tr>
       <td><h5>sink.bulk-flush.backoff.strategy</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">NONE</td>
       <td>String</td>
       <td>Specify how to perform retries if any flush actions failed due to a temporary request error. Valid strategies are:
@@ -179,6 +192,7 @@ Connector Options
     <tr>
       <td><h5>sink.bulk-flush.backoff.max-retries</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Integer</td>
       <td>Maximum number of backoff retries.</td>
@@ -186,6 +200,7 @@ Connector Options
     <tr>
       <td><h5>sink.bulk-flush.backoff.delay</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Duration</td>
       <td>Delay between each backoff attempt. For <code>CONSTANT</code> backoff, this is simply the delay between each retry. For <code>EXPONENTIAL</code> backoff, this is the initial base delay.</td>
@@ -193,6 +208,7 @@ Connector Options
     <tr>
       <td><h5>sink.parallelism</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Integer</td>
       <td>Defines the parallelism of the Elasticsearch sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.</td>
@@ -200,6 +216,7 @@ Connector Options
     <tr>
       <td><h5>connection.path-prefix</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Prefix string to be added to every REST communication, e.g., <code>'/v1'</code>.</td>
@@ -207,6 +224,7 @@ Connector Options
     <tr>
       <td><h5>connection.request-timeout</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Duration</td>
       <td>The timeout in milliseconds for requesting a connection from the connection manager.
@@ -217,6 +235,7 @@ Connector Options
     <tr>
       <td><h5>connection.timeout</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Duration</td>
       <td>The timeout in milliseconds for establishing a connection.
@@ -227,6 +246,7 @@ Connector Options
     <tr>
       <td><h5>socket.timeout</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Duration</td>
       <td>The socket timeout (SO_TIMEOUT) for waiting for data or, put differently,
@@ -238,6 +258,7 @@ Connector Options
     <tr>
       <td><h5>format</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">json</td>
       <td>String</td>
       <td>Elasticsearch connector supports to specify a format. The format must produce a valid json document.
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java
index 677a8e3..af35888 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java
@@ -21,7 +21,6 @@ package org.apache.flink.connector.elasticsearch.table;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
@@ -29,7 +28,6 @@ import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.factories.SerializationFormatFactory;
@@ -83,7 +81,7 @@ abstract class ElasticsearchDynamicSinkFactoryBase implements DynamicTableSinkFa
     }
 
     @Nullable
-    String getDocumentType(Context context) {
+    String getDocumentType(ElasticsearchConfiguration configuration) {
         return null; // document type is only set in Elasticsearch versions < 7
     }
 
@@ -91,10 +89,14 @@ abstract class ElasticsearchDynamicSinkFactoryBase implements DynamicTableSinkFa
     public DynamicTableSink createDynamicTableSink(Context context) {
         List<LogicalTypeWithIndex> primaryKeyLogicalTypesWithIndex =
                 getPrimaryKeyLogicalTypesWithIndex(context);
+
+        final FactoryUtil.TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(this, context);
         EncodingFormat<SerializationSchema<RowData>> format =
-                getValidatedEncodingFormat(this, context);
+                helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT_OPTION);
 
-        ElasticsearchConfiguration config = getConfiguration(context);
+        ElasticsearchConfiguration config = getConfiguration(helper);
+        helper.validate();
         validateConfiguration(config);
 
         return new ElasticsearchDynamicSink(
@@ -104,12 +106,11 @@ abstract class ElasticsearchDynamicSinkFactoryBase implements DynamicTableSinkFa
                 context.getPhysicalRowDataType(),
                 capitalize(factoryIdentifier),
                 sinkBuilderSupplier,
-                getDocumentType(context));
+                getDocumentType(config));
     }
 
-    ElasticsearchConfiguration getConfiguration(Context context) {
-        return new ElasticsearchConfiguration(
-                Configuration.fromMap(context.getCatalogTable().getOptions()));
+    ElasticsearchConfiguration getConfiguration(FactoryUtil.TableFactoryHelper helper) {
+        return new ElasticsearchConfiguration(helper.getOptions());
     }
 
     void validateConfiguration(ElasticsearchConfiguration config) {
@@ -161,16 +162,6 @@ abstract class ElasticsearchDynamicSinkFactoryBase implements DynamicTableSinkFa
         }
     }
 
-    EncodingFormat<SerializationSchema<RowData>> getValidatedEncodingFormat(
-            DynamicTableFactory factory, DynamicTableFactory.Context context) {
-        final FactoryUtil.TableFactoryHelper helper =
-                FactoryUtil.createTableFactoryHelper(factory, context);
-        final EncodingFormat<SerializationSchema<RowData>> format =
-                helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT_OPTION);
-        helper.validate();
-        return format;
-    }
-
     List<LogicalTypeWithIndex> getPrimaryKeyLogicalTypesWithIndex(Context context) {
         DataType physicalRowDataType = context.getPhysicalRowDataType();
         int[] primaryKeyIndexes = context.getPrimaryKeyIndexes();
@@ -225,6 +216,27 @@ abstract class ElasticsearchDynamicSinkFactoryBase implements DynamicTableSinkFa
     }
 
     @Override
+    public Set<ConfigOption<?>> forwardOptions() {
+        return Stream.of(
+                        HOSTS_OPTION,
+                        INDEX_OPTION,
+                        PASSWORD_OPTION,
+                        USERNAME_OPTION,
+                        KEY_DELIMITER_OPTION,
+                        BULK_FLUSH_MAX_ACTIONS_OPTION,
+                        BULK_FLUSH_MAX_SIZE_OPTION,
+                        BULK_FLUSH_INTERVAL_OPTION,
+                        BULK_FLUSH_BACKOFF_TYPE_OPTION,
+                        BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION,
+                        BULK_FLUSH_BACKOFF_DELAY_OPTION,
+                        CONNECTION_PATH_PREFIX_OPTION,
+                        CONNECTION_REQUEST_TIMEOUT,
+                        CONNECTION_TIMEOUT,
+                        SOCKET_TIMEOUT)
+                .collect(Collectors.toSet());
+    }
+
+    @Override
     public String factoryIdentifier() {
         return factoryIdentifier;
     }
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
index 2bb2c8a..6957697 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
@@ -20,14 +20,16 @@ package org.apache.flink.connector.elasticsearch.table;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.util.StringUtils;
 
 import javax.annotation.Nullable;
 
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.flink.connector.elasticsearch.table.Elasticsearch6ConnectorOptions.DOCUMENT_TYPE_OPTION;
 
@@ -41,17 +43,14 @@ public class Elasticsearch6DynamicSinkFactory extends ElasticsearchDynamicSinkFa
     }
 
     @Override
-    ElasticsearchConfiguration getConfiguration(Context context) {
-        return new Elasticsearch6Configuration(
-                Configuration.fromMap(context.getCatalogTable().getOptions()));
+    ElasticsearchConfiguration getConfiguration(FactoryUtil.TableFactoryHelper helper) {
+        return new Elasticsearch6Configuration(helper.getOptions());
     }
 
     @Nullable
     @Override
-    String getDocumentType(Context context) {
-        Elasticsearch6Configuration config =
-                (Elasticsearch6Configuration) getConfiguration(context);
-        return config.getDocumentType();
+    String getDocumentType(ElasticsearchConfiguration configuration) {
+        return ((Elasticsearch6Configuration) configuration).getDocumentType();
     }
 
     @Override
@@ -69,4 +68,10 @@ public class Elasticsearch6DynamicSinkFactory extends ElasticsearchDynamicSinkFa
         requiredOptions.add(DOCUMENT_TYPE_OPTION);
         return requiredOptions;
     }
+
+    @Override
+    public Set<ConfigOption<?>> forwardOptions() {
+        return Stream.concat(super.forwardOptions().stream(), Stream.of(DOCUMENT_TYPE_OPTION))
+                .collect(Collectors.toSet());
+    }
 }

[flink] 04/09: [FLINK-25391][connector-kafka] Forward catalog table options

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0c34c994f9906e58963f85739fc951221b11d26a
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Thu Jan 6 16:20:12 2022 +0100

    [FLINK-25391][connector-kafka] Forward catalog table options
---
 docs/content/docs/connectors/table/kafka.md        | 24 +++++++++++++++++++++-
 .../kafka/table/KafkaDynamicTableFactory.java      | 23 +++++++++++++--------
 2 files changed, 38 insertions(+), 9 deletions(-)

diff --git a/docs/content/docs/connectors/table/kafka.md b/docs/content/docs/connectors/table/kafka.md
index 6b728f6..34d73c4 100644
--- a/docs/content/docs/connectors/table/kafka.md
+++ b/docs/content/docs/connectors/table/kafka.md
@@ -179,15 +179,17 @@ Connector Options
     <tr>
       <th class="text-left" style="width: 25%">Option</th>
       <th class="text-center" style="width: 8%">Required</th>
+      <th class="text-center" style="width: 8%">Forwarded</th>
       <th class="text-center" style="width: 7%">Default</th>
       <th class="text-center" style="width: 10%">Type</th>
-      <th class="text-center" style="width: 50%">Description</th>
+      <th class="text-center" style="width: 42%">Description</th>
     </tr>
     </thead>
     <tbody>
     <tr>
       <td><h5>connector</h5></td>
       <td>required</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Specify what connector to use, for Kafka use <code>'kafka'</code>.</td>
@@ -195,6 +197,7 @@ Connector Options
     <tr>
       <td><h5>topic</h5></td>
       <td>required for sink</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by semicolon like <code>'topic-1;topic-2'</code>. Note, only one of "topic-pattern" and "topic" can be specified for sources. When the table is used as sink, the topic name is the topic to write data to. Note topic list is not supported for sinks.</td>
@@ -202,6 +205,7 @@ Connector Options
     <tr>
       <td><h5>topic-pattern</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The regular expression for a pattern of topic names to read from. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running. Note, only one of "topic-pattern" and "topic" can be specified for sources.</td>
@@ -209,6 +213,7 @@ Connector Options
     <tr>
       <td><h5>properties.bootstrap.servers</h5></td>
       <td>required</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Comma separated list of Kafka brokers.</td>
@@ -216,6 +221,7 @@ Connector Options
     <tr>
       <td><h5>properties.group.id</h5></td>
       <td>optional for source, not applicable for sink</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The id of the consumer group for Kafka source. If group ID is not specified, an automatically generated id "KafkaSource-{tableIdentifier}" will be used.</td>
@@ -223,6 +229,7 @@ Connector Options
     <tr>
       <td><h5>properties.*</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>
@@ -232,6 +239,7 @@ Connector Options
     <tr>
       <td><h5>format</h5></td>
       <td>required</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The format used to deserialize and serialize the value part of Kafka messages.
@@ -243,6 +251,7 @@ Connector Options
     <tr>
       <td><h5>key.format</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The format used to deserialize and serialize the key part of Kafka messages.
@@ -254,6 +263,7 @@ Connector Options
     <tr>
       <td><h5>key.fields</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">[]</td>
       <td>List&lt;String&gt;</td>
       <td>Defines an explicit list of physical columns from the table schema that configure the data
@@ -264,6 +274,7 @@ Connector Options
     <tr>
       <td><h5>key.fields-prefix</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Defines a custom prefix for all fields of the key format to avoid name clashes with fields
@@ -277,6 +288,7 @@ Connector Options
     <tr>
       <td><h5>value.format</h5></td>
       <td>required</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The format used to deserialize and serialize the value part of Kafka messages.
@@ -288,6 +300,7 @@ Connector Options
     <tr>
       <td><h5>value.fields-include</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">ALL</td>
       <td><p>Enum</p>Possible values: [ALL, EXCEPT_KEY]</td>
       <td>Defines a strategy how to deal with key columns in the data type of the value format. By
@@ -298,6 +311,7 @@ Connector Options
     <tr>
       <td><h5>scan.startup.mode</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">group-offsets</td>
       <td>String</td>
       <td>Startup mode for Kafka consumer, valid values are <code>'earliest-offset'</code>, <code>'latest-offset'</code>, <code>'group-offsets'</code>, <code>'timestamp'</code> and <code>'specific-offsets'</code>.
@@ -306,6 +320,7 @@ Connector Options
     <tr>
       <td><h5>scan.startup.specific-offsets</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Specify offsets for each partition in case of <code>'specific-offsets'</code> startup mode, e.g. <code>'partition:0,offset:42;partition:1,offset:300'</code>.
@@ -314,6 +329,7 @@ Connector Options
     <tr>
       <td><h5>scan.startup.timestamp-millis</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Long</td>
       <td>Start from the specified epoch timestamp (milliseconds) used in case of <code>'timestamp'</code> startup mode.</td>
@@ -321,6 +337,7 @@ Connector Options
     <tr>
       <td><h5>scan.topic-partition-discovery.interval</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Duration</td>
       <td>Interval for consumer to discover dynamically created Kafka topics and partitions periodically.</td>
@@ -328,6 +345,7 @@ Connector Options
     <tr>
       <td><h5>sink.partitioner</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">'default'</td>
       <td>String</td>
       <td>Output partitioning from Flink's partitions into Kafka's partitions. Valid values are
@@ -343,6 +361,7 @@ Connector Options
     <tr>
       <td><h5>sink.semantic</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">at-least-once</td>
       <td>String</td>
       <td>Deprecated: Please use <code>sink.delivery-guarantee</code>.</td>
@@ -350,6 +369,7 @@ Connector Options
     <tr>
       <td><h5>sink.delivery-guarantee</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">at-least-once</td>
       <td>String</td>
       <td>Defines the delivery semantic for the Kafka sink. Valid enumerationns are <code>'at-least-once'</code>, <code>'exactly-once'</code> and <code>'none'</code>. See <a href='#consistency-guarantees'>Consistency guarantees</a> for more details. </td>
@@ -357,6 +377,7 @@ Connector Options
     <tr>
       <td><h5>sink.transactional-id-prefix</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>If the delivery guarantee is configured as <code>'exactly-once'</code> this value must be set and is used a prefix for the identifier of all opened Kafka transactions.</td>
@@ -364,6 +385,7 @@ Connector Options
     <tr>
       <td><h5>sink.parallelism</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Integer</td>
       <td>Defines the parallelism of the Kafka sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.</td>
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
index 290f396..5ac146d 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
@@ -60,6 +60,8 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.DELIVERY_GUARANTEE;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS;
@@ -146,14 +148,19 @@ public class KafkaDynamicTableFactory
 
     @Override
     public Set<ConfigOption<?>> forwardOptions() {
-        final Set<ConfigOption<?>> options = new HashSet<>();
-        options.add(TOPIC);
-        options.add(TOPIC_PATTERN);
-        options.add(SCAN_STARTUP_MODE);
-        options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
-        options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
-        options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
-        return options;
+        return Stream.of(
+                        PROPS_BOOTSTRAP_SERVERS,
+                        PROPS_GROUP_ID,
+                        TOPIC,
+                        TOPIC_PATTERN,
+                        SCAN_STARTUP_MODE,
+                        SCAN_STARTUP_SPECIFIC_OFFSETS,
+                        SCAN_TOPIC_PARTITION_DISCOVERY,
+                        SCAN_STARTUP_TIMESTAMP_MILLIS,
+                        SINK_PARTITIONER,
+                        SINK_PARALLELISM,
+                        TRANSACTIONAL_ID_PREFIX)
+                .collect(Collectors.toSet());
     }
 
     @Override

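For illustration, a minimal sketch of how a custom connector factory could declare forwarded options in the same Stream-based style as KafkaDynamicTableFactory above. It is not part of this commit; the 'my-connector' identifier and both options are hypothetical, and sink creation is omitted.

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;

import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MyTableSinkFactory implements DynamicTableSinkFactory {

    // Hypothetical connector options, used only for illustration.
    private static final ConfigOption<String> HOSTNAME =
            ConfigOptions.key("my.hostname").stringType().noDefaultValue();
    private static final ConfigOption<Duration> TIMEOUT =
            ConfigOptions.key("my.timeout").durationType().defaultValue(Duration.ofSeconds(30));

    @Override
    public String factoryIdentifier() {
        return "my-connector";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HOSTNAME);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(TIMEOUT);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> forwardOptions() {
        // Both options are assumed to be safe to forward in this sketch.
        return Stream.of(HOSTNAME, TIMEOUT).collect(Collectors.toSet());
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        // Sink creation is omitted in this sketch.
        throw new UnsupportedOperationException("Sketch only");
    }
}
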
[flink] 08/09: [FLINK-25391][format-csv] Forward catalog table options

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0370c36eff86a0af9485405ca3a51663c33cbadf
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Thu Jan 6 16:42:25 2022 +0100

    [FLINK-25391][format-csv] Forward catalog table options
---
 docs/content/docs/connectors/table/formats/csv.md           | 12 +++++++++++-
 .../java/org/apache/flink/formats/csv/CsvFormatFactory.java | 13 +++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/docs/content/docs/connectors/table/formats/csv.md b/docs/content/docs/connectors/table/formats/csv.md
index 1e9918c..e7cc549 100644
--- a/docs/content/docs/connectors/table/formats/csv.md
+++ b/docs/content/docs/connectors/table/formats/csv.md
@@ -67,15 +67,17 @@ Format Options
       <tr>
         <th class="text-left" style="width: 25%">Option</th>
         <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 8%">Forwarded</th>
         <th class="text-center" style="width: 7%">Default</th>
         <th class="text-center" style="width: 10%">Type</th>
-        <th class="text-center" style="width: 50%">Description</th>
+        <th class="text-center" style="width: 42%">Description</th>
       </tr>
     </thead>
     <tbody>
     <tr>
       <td><h5>format</h5></td>
       <td>required</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Specify what format to use, here should be <code>'csv'</code>.</td>
@@ -83,6 +85,7 @@ Format Options
     <tr>
       <td><h5>csv.field-delimiter</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;"><code>,</code></td>
       <td>String</td>
       <td>Field delimiter character (<code>','</code> by default), must be single character. You can use backslash to specify special characters, e.g. <code>'\t'</code> represents the tab character.
@@ -92,6 +95,7 @@ Format Options
     <tr>
       <td><h5>csv.disable-quote-character</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>Disabled quote character for enclosing field values (false by default).
@@ -100,6 +104,7 @@ Format Options
     <tr>
       <td><h5>csv.quote-character</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;"><code>"</code></td>
       <td>String</td>
       <td>Quote character for enclosing field values (<code>"</code> by default).</td>
@@ -107,6 +112,7 @@ Format Options
     <tr>
       <td><h5>csv.allow-comments</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>Ignore comment lines that start with <code>'#'</code> (disabled by default).
@@ -115,6 +121,7 @@ Format Options
     <tr>
       <td><h5>csv.ignore-parse-errors</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>Skip fields and rows with parse errors instead of failing.
@@ -123,6 +130,7 @@ Format Options
     <tr>
       <td><h5>csv.array-element-delimiter</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;"><code>;</code></td>
       <td>String</td>
       <td>Array element delimiter string for separating
@@ -131,6 +139,7 @@ Format Options
     <tr>
       <td><h5>csv.escape-character</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Escape character for escaping values (disabled by default).</td>
@@ -138,6 +147,7 @@ Format Options
     <tr>
       <td><h5>csv.null-literal</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Null literal string that is interpreted as a null value (disabled by default).</td>
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java
index 124f4a2..ddfd685 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java
@@ -137,6 +137,19 @@ public final class CsvFormatFactory
         return options;
     }
 
+    @Override
+    public Set<ConfigOption<?>> forwardOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(FIELD_DELIMITER);
+        options.add(DISABLE_QUOTE_CHARACTER);
+        options.add(QUOTE_CHARACTER);
+        options.add(ALLOW_COMMENTS);
+        options.add(ARRAY_ELEMENT_DELIMITER);
+        options.add(ESCAPE_CHARACTER);
+        options.add(NULL_LITERAL);
+        return options;
+    }
+
     // ------------------------------------------------------------------------
     //  Validation
     // ------------------------------------------------------------------------

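For illustration, the same override applied to a format factory, using the HashSet-based style of CsvFormatFactory above. This is a sketch only and not part of this commit; the 'my-format' identifier and its two options are hypothetical, and the decoding format itself is not implemented.

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class MyFormatFactory implements DeserializationFormatFactory {

    // Hypothetical format options, used only for illustration.
    private static final ConfigOption<String> DELIMITER =
            ConfigOptions.key("delimiter").stringType().defaultValue(",");
    private static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS =
            ConfigOptions.key("ignore-parse-errors").booleanType().defaultValue(false);

    @Override
    public String factoryIdentifier() {
        return "my-format";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(DELIMITER);
        options.add(IGNORE_PARSE_ERRORS);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> forwardOptions() {
        // Only the delimiter is forwarded; ignore-parse-errors is left out here,
        // mirroring the choice made for csv.ignore-parse-errors above.
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(DELIMITER);
        return options;
    }

    @Override
    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
        // Format creation is omitted in this sketch.
        throw new UnsupportedOperationException("Sketch only");
    }
}
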
[flink] 02/09: [FLINK-25391][connector-jdbc] Forward catalog table options

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c61162b30f4b5567ecc2ee29481fcc87e5016428
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Thu Jan 6 15:01:39 2022 +0100

    [FLINK-25391][connector-jdbc] Forward catalog table options
---
 docs/content/docs/connectors/table/jdbc.md         | 24 +++++++++++++++++++++-
 .../jdbc/table/JdbcDynamicTableFactory.java        | 23 +++++++++++++++++++++
 2 files changed, 46 insertions(+), 1 deletion(-)

diff --git a/docs/content/docs/connectors/table/jdbc.md b/docs/content/docs/connectors/table/jdbc.md
index b289831..81a179a 100644
--- a/docs/content/docs/connectors/table/jdbc.md
+++ b/docs/content/docs/connectors/table/jdbc.md
@@ -93,15 +93,17 @@ Connector Options
       <tr>
         <th class="text-left" style="width: 25%">Option</th>
         <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
         <th class="text-left" style="width: 7%">Default</th>
         <th class="text-left" style="width: 10%">Type</th>
-        <th class="text-left" style="width: 50%">Description</th>
+        <th class="text-left" style="width: 42%">Description</th>
       </tr>
     </thead>
     <tbody>
     <tr>
       <td><h5>connector</h5></td>
       <td>required</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Specify what connector to use, here should be <code>'jdbc'</code>.</td>
@@ -109,6 +111,7 @@ Connector Options
     <tr>
       <td><h5>url</h5></td>
       <td>required</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The JDBC database url.</td>
@@ -116,6 +119,7 @@ Connector Options
     <tr>
       <td><h5>table-name</h5></td>
       <td>required</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The name of JDBC table to connect.</td>
@@ -123,6 +127,7 @@ Connector Options
     <tr>
       <td><h5>driver</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The class name of the JDBC driver to use to connect to this URL, if not set, it will automatically be derived from the URL.</td>
@@ -130,6 +135,7 @@ Connector Options
     <tr>
       <td><h5>username</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The JDBC user name. <code>'username'</code> and <code>'password'</code> must both be specified if any of them is specified.</td>
@@ -137,6 +143,7 @@ Connector Options
     <tr>
       <td><h5>password</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The JDBC password.</td>
@@ -144,6 +151,7 @@ Connector Options
     <tr>
       <td><h5>connection.max-retry-timeout</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">60s</td>
       <td>Duration</td>
       <td>Maximum timeout between retries. The timeout should be in second granularity and shouldn't be smaller than 1 second.</td>
@@ -151,6 +159,7 @@ Connector Options
     <tr>
       <td><h5>scan.partition.column</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The column name used for partitioning the input. See the following <a href="#partitioned-scan">Partitioned Scan</a> section for more details.</td>
@@ -158,6 +167,7 @@ Connector Options
     <tr>
       <td><h5>scan.partition.num</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Integer</td>
       <td>The number of partitions.</td>
@@ -165,6 +175,7 @@ Connector Options
     <tr>
       <td><h5>scan.partition.lower-bound</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Integer</td>
       <td>The smallest value of the first partition.</td>
@@ -172,6 +183,7 @@ Connector Options
     <tr>
       <td><h5>scan.partition.upper-bound</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Integer</td>
       <td>The largest value of the last partition.</td>
@@ -179,6 +191,7 @@ Connector Options
     <tr>
       <td><h5>scan.fetch-size</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">0</td>
       <td>Integer</td>
       <td>The number of rows that should be fetched from the database when reading per round trip. If the value specified is zero, then the hint is ignored.</td>
@@ -186,6 +199,7 @@ Connector Options
     <tr>
       <td><h5>scan.auto-commit</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">true</td>
       <td>Boolean</td>
       <td>Sets the <a href="https://docs.oracle.com/javase/tutorial/jdbc/basics/transactions.html#commit_transactions">auto-commit</a> flag on the JDBC driver,
@@ -195,6 +209,7 @@ Connector Options
     <tr>
       <td><h5>lookup.cache.max-rows</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Integer</td>
       <td>The max number of rows of lookup cache, over this value, the oldest rows will be expired.
@@ -203,6 +218,7 @@ Connector Options
     <tr>
       <td><h5>lookup.cache.ttl</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Duration</td>
       <td>The max time to live for each row in the lookup cache; once this time has passed, the oldest rows will be expired.
@@ -211,6 +227,7 @@ Connector Options
     <tr>
       <td><h5>lookup.cache.caching-missing-key</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">true</td>
       <td>Boolean</td>
       <td>Flag to cache missing key, true by default</td>
@@ -218,6 +235,7 @@ Connector Options
     <tr>
       <td><h5>lookup.max-retries</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">3</td>
       <td>Integer</td>
       <td>The max retry times if lookup database failed.</td>
@@ -225,6 +243,7 @@ Connector Options
     <tr>
       <td><h5>sink.buffer-flush.max-rows</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">100</td>
       <td>Integer</td>
       <td>The max size of buffered records before flush. Can be set to zero to disable it.</td>
@@ -232,6 +251,7 @@ Connector Options
     <tr>
       <td><h5>sink.buffer-flush.interval</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">1s</td>
       <td>Duration</td>
       <td>The flush interval; once it elapses, asynchronous threads will flush buffered data. Can be set to <code>'0'</code> to disable it. Note, <code>'sink.buffer-flush.max-rows'</code> can be set to <code>'0'</code> with the flush interval set, allowing for complete async processing of buffered actions.</td>
@@ -239,6 +259,7 @@ Connector Options
     <tr>
       <td><h5>sink.max-retries</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">3</td>
       <td>Integer</td>
       <td>The max retry times if writing records to database failed.</td>
@@ -246,6 +267,7 @@ Connector Options
     <tr>
       <td><h5>sink.parallelism</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Integer</td>
       <td>Defines the parallelism of the JDBC sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.</td>
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
index dc5051d..89af619 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
@@ -42,6 +42,8 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.DRIVER;
 import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.LOOKUP_CACHE_MAX_ROWS;
@@ -216,6 +218,27 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
         return optionalOptions;
     }
 
+    @Override
+    public Set<ConfigOption<?>> forwardOptions() {
+        return Stream.of(
+                        URL,
+                        TABLE_NAME,
+                        USERNAME,
+                        PASSWORD,
+                        DRIVER,
+                        SINK_BUFFER_FLUSH_MAX_ROWS,
+                        SINK_BUFFER_FLUSH_INTERVAL,
+                        SINK_MAX_RETRIES,
+                        MAX_RETRY_TIMEOUT,
+                        SCAN_FETCH_SIZE,
+                        SCAN_AUTO_COMMIT,
+                        LOOKUP_CACHE_MAX_ROWS,
+                        LOOKUP_CACHE_TTL,
+                        LOOKUP_MAX_RETRIES,
+                        LOOKUP_CACHE_MISSING_KEY)
+                .collect(Collectors.toSet());
+    }
+
     private void validateConfigOptions(ReadableConfig config) {
         String jdbcUrl = config.get(URL);
         JdbcDialectLoader.load(jdbcUrl);

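For illustration, a small standalone check (hypothetical, not shipped with Flink) that prints the forwarded options of JdbcDynamicTableFactory and flags any that are not also declared as required or optional options; it only relies on the factory's public option sets, including the forwardOptions() method added above.

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory;

import java.util.HashSet;
import java.util.Set;

public class ForwardOptionsCheck {

    public static void main(String[] args) {
        final JdbcDynamicTableFactory factory = new JdbcDynamicTableFactory();

        // Collect everything the factory declares as required or optional.
        final Set<ConfigOption<?>> declared = new HashSet<>();
        declared.addAll(factory.requiredOptions());
        declared.addAll(factory.optionalOptions());

        // Print each forwarded option and flag any that is not declared elsewhere.
        for (ConfigOption<?> option : factory.forwardOptions()) {
            if (declared.contains(option)) {
                System.out.println("Forwarded: " + option.key());
            } else {
                System.out.println("Forwarded but not declared: " + option.key());
            }
        }
    }
}
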
[flink] 07/09: [FLINK-25391][format-avro] Forward catalog table options

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8f77862fa5ecbec5ee26b7b2b68478ad50943a3e
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Thu Jan 6 16:37:17 2022 +0100

    [FLINK-25391][format-avro] Forward catalog table options
---
 .../docs/connectors/table/formats/avro-confluent.md   | 15 ++++++++++++++-
 docs/content/docs/connectors/table/formats/avro.md    |  5 ++++-
 .../registry/confluent/RegistryAvroFormatFactory.java | 19 +++++++++++++++++++
 .../flink/formats/avro/AvroFileFormatFactory.java     |  5 +++++
 4 files changed, 42 insertions(+), 2 deletions(-)

diff --git a/docs/content/docs/connectors/table/formats/avro-confluent.md b/docs/content/docs/connectors/table/formats/avro-confluent.md
index cf2fe70..28b33da 100644
--- a/docs/content/docs/connectors/table/formats/avro-confluent.md
+++ b/docs/content/docs/connectors/table/formats/avro-confluent.md
@@ -176,15 +176,17 @@ Format Options
       <tr>
         <th class="text-left" style="width: 25%">Option</th>
         <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 8%">Forwarded</th>
         <th class="text-center" style="width: 7%">Default</th>
         <th class="text-center" style="width: 10%">Type</th>
-        <th class="text-center" style="width: 50%">Description</th>
+        <th class="text-center" style="width: 42%">Description</th>
       </tr>
     </thead>
     <tbody>
         <tr>
             <td><h5>format</h5></td>
             <td>required</td>
+            <td>no</td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
             <td>Specify what format to use, here should be <code>'avro-confluent'</code>.</td>
@@ -192,6 +194,7 @@ Format Options
         <tr>
             <td><h5>avro-confluent.basic-auth.credentials-source</h5></td>
             <td>optional</td>
+            <td>yes</td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
             <td>Basic auth credentials source for Schema Registry</td>
@@ -199,6 +202,7 @@ Format Options
         <tr>
             <td><h5>avro-confluent.basic-auth.user-info</h5></td>
             <td>optional</td>
+            <td>yes</td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
             <td>Basic auth user info for schema registry</td>
@@ -206,6 +210,7 @@ Format Options
         <tr>
             <td><h5>avro-confluent.bearer-auth.credentials-source</h5></td>
             <td>optional</td>
+            <td>yes</td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
             <td>Bearer auth credentials source for Schema Registry</td>
@@ -213,6 +218,7 @@ Format Options
         <tr>
             <td><h5>avro-confluent.bearer-auth.token</h5></td>
             <td>optional</td>
+            <td>yes</td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
             <td>Bearer auth token for Schema Registry</td>
@@ -220,6 +226,7 @@ Format Options
         <tr>
             <td><h5>avro-confluent.properties</h5></td>
             <td>optional</td>
+            <td>yes</td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Map</td>
             <td>Properties map that is forwarded to the underlying Schema Registry. This is useful for options that are not officially exposed via Flink config options. However, note that Flink options have higher precedence.</td>
@@ -227,6 +234,7 @@ Format Options
         <tr>
             <td><h5>avro-confluent.ssl.keystore.location</h5></td>
             <td>optional</td>
+            <td>yes</td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
             <td>Location / File of SSL keystore</td>
@@ -234,6 +242,7 @@ Format Options
         <tr>
             <td><h5>avro-confluent.ssl.keystore.password</h5></td>
             <td>optional</td>
+            <td>yes</td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
             <td>Password for SSL keystore</td>
@@ -241,6 +250,7 @@ Format Options
         <tr>
             <td><h5>avro-confluent.ssl.truststore.location</h5></td>
             <td>optional</td>
+            <td>yes</td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
             <td>Location / File of SSL truststore</td>
@@ -248,6 +258,7 @@ Format Options
         <tr>
             <td><h5>avro-confluent.ssl.truststore.password</h5></td>
             <td>optional</td>
+            <td>yes</td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
             <td>Password for SSL truststore</td>
@@ -255,6 +266,7 @@ Format Options
         <tr>
             <td><h5>avro-confluent.subject</h5></td>
             <td>optional</td>
+            <td>yes</td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
             <td>The Confluent Schema Registry subject under which to register the schema used by this format during serialization. By default, 'kafka' and 'upsert-kafka' connectors use '&lt;topic_name&gt;-value' or '&lt;topic_name&gt;-key' as the default subject name if this format is used as the value or key format. But for other connectors (e.g. 'filesystem'), the subject option is required when used as sink.</td>
@@ -262,6 +274,7 @@ Format Options
         <tr>
             <td><h5>avro-confluent.url</h5></td>
             <td>required</td>
+            <td>yes</td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
             <td>The URL of the Confluent Schema Registry to fetch/register schemas.</td>
diff --git a/docs/content/docs/connectors/table/formats/avro.md b/docs/content/docs/connectors/table/formats/avro.md
index 341ca0a..601a9dc 100644
--- a/docs/content/docs/connectors/table/formats/avro.md
+++ b/docs/content/docs/connectors/table/formats/avro.md
@@ -65,15 +65,17 @@ Format Options
       <tr>
         <th class="text-left" style="width: 25%">Option</th>
         <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 8%">Forwarded</th>
         <th class="text-center" style="width: 7%">Default</th>
         <th class="text-center" style="width: 10%">Type</th>
-        <th class="text-center" style="width: 50%">Description</th>
+        <th class="text-center" style="width: 42%">Description</th>
       </tr>
     </thead>
     <tbody>
     <tr>
       <td><h5>format</h5></td>
       <td>required</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Specify what format to use, here should be <code>'avro'</code>.</td>
@@ -81,6 +83,7 @@ Format Options
     <tr>
       <td><h5>avro.codec</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>For <a href="{{< ref "docs/connectors/table/filesystem" >}}">Filesystem</a> only, the compression codec for avro. Snappy compression as default. The valid enumerations are: null, deflate, snappy, bzip2, xz.</td>
diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java
index 659160b..4030168 100644
--- a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java
+++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java
@@ -52,6 +52,8 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BASIC_AUTH_CREDENTIALS_SOURCE;
 import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BASIC_AUTH_USER_INFO;
@@ -175,6 +177,23 @@ public class RegistryAvroFormatFactory
         return options;
     }
 
+    @Override
+    public Set<ConfigOption<?>> forwardOptions() {
+        return Stream.of(
+                        URL,
+                        SUBJECT,
+                        PROPERTIES,
+                        SSL_KEYSTORE_LOCATION,
+                        SSL_KEYSTORE_PASSWORD,
+                        SSL_TRUSTSTORE_LOCATION,
+                        SSL_TRUSTSTORE_PASSWORD,
+                        BASIC_AUTH_CREDENTIALS_SOURCE,
+                        BASIC_AUTH_USER_INFO,
+                        BEARER_AUTH_CREDENTIALS_SOURCE,
+                        BEARER_AUTH_TOKEN)
+                .collect(Collectors.toSet());
+    }
+
     public static @Nullable Map<String, String> buildOptionalPropertiesMap(
             ReadableConfig formatOptions) {
         final Map<String, String> properties = new HashMap<>();
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileFormatFactory.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileFormatFactory.java
index 630d990..e188405 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileFormatFactory.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileFormatFactory.java
@@ -116,6 +116,11 @@ public class AvroFileFormatFactory implements BulkReaderFormatFactory, BulkWrite
         return options;
     }
 
+    @Override
+    public Set<ConfigOption<?>> forwardOptions() {
+        return optionalOptions();
+    }
+
     private static class AvroGenericRecordBulkFormat
             extends AbstractAvroBulkFormat<GenericRecord, RowData, FileSourceSplit> {
 

[flink] 03/09: [FLINK-25391][connector-files] Forward catalog table options

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c9260311637ad47a6e67f154c629ddd49d9f262a
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Thu Jan 6 14:52:50 2022 +0100

    [FLINK-25391][connector-files] Forward catalog table options
---
 docs/content/docs/connectors/table/filesystem.md   | 80 +++++++++++++++++-----
 .../file/table/AbstractFileSystemTable.java        | 24 +++----
 .../file/table/FileSystemTableFactory.java         | 29 +++++++-
 .../connector/file/table/FileSystemTableSink.java  | 21 ++++--
 .../file/table/FileSystemTableSource.java          | 25 ++++---
 5 files changed, 131 insertions(+), 48 deletions(-)

diff --git a/docs/content/docs/connectors/table/filesystem.md b/docs/content/docs/connectors/table/filesystem.md
index 24dfa11..e2c0aeb 100644
--- a/docs/content/docs/connectors/table/filesystem.md
+++ b/docs/content/docs/connectors/table/filesystem.md
@@ -208,21 +208,27 @@ a timeout that specifies the maximum duration for which a file can be open.
 <table class="table table-bordered">
   <thead>
     <tr>
-        <th class="text-left" style="width: 20%">Key</th>
-        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
+        <th class="text-left" style="width: 7%">Default</th>
         <th class="text-left" style="width: 10%">Type</th>
-        <th class="text-left" style="width: 55%">Description</th>
+        <th class="text-left" style="width: 42%">Description</th>
     </tr>
   </thead>
   <tbody>
     <tr>
         <td><h5>sink.rolling-policy.file-size</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">128MB</td>
         <td>MemorySize</td>
         <td>The maximum part file size before rolling.</td>
     </tr>
     <tr>
         <td><h5>sink.rolling-policy.rollover-interval</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">30 min</td>
         <td>Duration</td>
         <td>The maximum time duration a part file can stay open before rolling (by default 30 min to avoid too many small files).
@@ -230,6 +236,8 @@ a timeout that specifies the maximum duration for which a file can be open.
     </tr>
     <tr>
         <td><h5>sink.rolling-policy.check-interval</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">1 min</td>
         <td>Duration</td>
         <td>The interval for checking time based rolling policies. This controls the frequency to check whether a part file should rollover based on 'sink.rolling-policy.rollover-interval'.</td>
@@ -250,21 +258,27 @@ The file sink supports file compactions, which allows applications to have small
 <table class="table table-bordered">
   <thead>
     <tr>
-        <th class="text-left" style="width: 20%">Key</th>
-        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
+        <th class="text-left" style="width: 7%">Default</th>
         <th class="text-left" style="width: 10%">Type</th>
-        <th class="text-left" style="width: 55%">Description</th>
+        <th class="text-left" style="width: 42%">Description</th>
     </tr>
   </thead>
   <tbody>
     <tr>
         <td><h5>auto-compaction</h5></td>
+        <td>optional</td>
+        <td>no</td>
         <td style="word-wrap: break-word;">false</td>
         <td>Boolean</td>
         <td>Whether to enable automatic compaction in streaming sink or not. The data will be written to temporary files. After the checkpoint is completed, the temporary files generated by a checkpoint will be compacted. The temporary files are invisible before compaction.</td>
     </tr>
     <tr>
         <td><h5>compaction.file-size</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>MemorySize</td>
         <td>The compaction target file size, the default value is the rolling file size.</td>
@@ -294,27 +308,35 @@ To define when to commit a partition, providing partition commit trigger:
 <table class="table table-bordered">
   <thead>
     <tr>
-        <th class="text-left" style="width: 20%">Key</th>
-        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
+        <th class="text-left" style="width: 7%">Default</th>
         <th class="text-left" style="width: 10%">Type</th>
-        <th class="text-left" style="width: 55%">Description</th>
+        <th class="text-left" style="width: 42%">Description</th>
     </tr>
   </thead>
   <tbody>
     <tr>
         <td><h5>sink.partition-commit.trigger</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">process-time</td>
         <td>String</td>
         <td>Trigger type for partition commit: 'process-time': based on the time of the machine, it neither requires partition time extraction nor watermark generation. Commit partition once the 'current system time' passes 'partition creation system time' plus 'delay'. 'partition-time': based on the time that extracted from partition values, it requires watermark generation. Commit partition once the 'watermark' passes 'time extracted from partition values' plus 'delay'.</td>
     </tr>
     <tr>
         <td><h5>sink.partition-commit.delay</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">0 s</td>
         <td>Duration</td>
         <td>The partition will not commit until the delay time. If it is a daily partition, it should be '1 d'; if it is an hourly partition, it should be '1 h'.</td>
     </tr>
     <tr>
         <td><h5>sink.partition-commit.watermark-time-zone</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">UTC</td>
         <td>String</td>
         <td>The time zone to parse the long watermark value to TIMESTAMP value, the parsed watermark timestamp is used to compare with partition time to decide the partition should commit or not. This option is only take effect when `sink.partition-commit.trigger` is set to 'partition-time'. If this option is not configured correctly, e.g. source rowtime is defined on TIMESTAMP_LTZ column, but this config is not configured, then users may see the partition committed after a few hours. Th [...]
@@ -356,33 +378,43 @@ Time extractors define extracting time from partition values.
 <table class="table table-bordered">
   <thead>
     <tr>
-        <th class="text-left" style="width: 20%">Key</th>
-        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
+        <th class="text-left" style="width: 7%">Default</th>
         <th class="text-left" style="width: 10%">Type</th>
-        <th class="text-left" style="width: 55%">Description</th>
+        <th class="text-left" style="width: 42%">Description</th>
     </tr>
   </thead>
   <tbody>
     <tr>
         <td><h5>partition.time-extractor.kind</h5></td>
+        <td>optional</td>
+        <td>no</td>
         <td style="word-wrap: break-word;">default</td>
         <td>String</td>
         <td>Time extractor to extract time from partition values. Support default and custom. For default, can configure timestamp pattern\formatter. For custom, should configure extractor class.</td>
     </tr>
     <tr>
         <td><h5>partition.time-extractor.class</h5></td>
+        <td>optional</td>
+        <td>no</td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>String</td>
         <td>The extractor class for implement PartitionTimeExtractor interface.</td>
     </tr>
     <tr>
         <td><h5>partition.time-extractor.timestamp-pattern</h5></td>
+        <td>optional</td>
+        <td>no</td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>String</td>
         <td>The 'default' construction way allows users to use partition fields to get a legal timestamp pattern. Default support 'yyyy-MM-dd hh:mm:ss' from first field. If timestamp should be extracted from a single partition field 'dt', can configure: '$dt'. If timestamp should be extracted from multiple partition fields, say 'year', 'month', 'day' and 'hour', can configure: '$year-$month-$day $hour:00:00'. If timestamp should be extracted from two partition fields 'dt' and 'hour', can [...]
     </tr>
     <tr>
         <td><h5>partition.time-extractor.timestamp-formatter</h5></td>
+        <td>optional</td>
+        <td>no</td>
         <td style="word-wrap: break-word;">yyyy-MM-dd&nbsp;HH:mm:ss</td>
         <td>String</td>
         <td>The formatter that formats the partition timestamp string value to timestamp, the partition timestamp string value is expressed by 'partition.time-extractor.timestamp-pattern'. For example, the partition timestamp is extracted from multiple partition fields, say 'year', 'month' and 'day', you can configure 'partition.time-extractor.timestamp-pattern' to '$year$month$day', and configure `partition.time-extractor.timestamp-formatter` to 'yyyyMMdd'. By default the formatter is ' [...]
@@ -417,27 +449,35 @@ The partition commit policy defines what action is taken when partitions are com
 <table class="table table-bordered">
   <thead>
     <tr>
-        <th class="text-left" style="width: 20%">Key</th>
-        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
+        <th class="text-left" style="width: 7%">Default</th>
         <th class="text-left" style="width: 10%">Type</th>
-        <th class="text-left" style="width: 55%">Description</th>
+        <th class="text-left" style="width: 42%">Description</th>
     </tr>
   </thead>
   <tbody>
     <tr>
         <td><h5>sink.partition-commit.policy.kind</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>String</td>
         <td>Policy to commit a partition is to notify the downstream application that the partition has finished writing, the partition is ready to be read. metastore: add partition to metastore. Only hive table supports metastore policy, file system manages partitions through directory structure. success-file: add '_success' file to directory. Both can be configured at the same time: 'metastore,success-file'. custom: use policy class to create a commit policy. Support to configure multi [...]
     </tr>
     <tr>
         <td><h5>sink.partition-commit.policy.class</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>String</td>
         <td>The partition commit policy class for implement PartitionCommitPolicy interface. Only work in custom commit policy.</td>
     </tr>
     <tr>
         <td><h5>sink.partition-commit.success-file.name</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">_SUCCESS</td>
         <td>String</td>
         <td>The file name for success-file partition commit policy, default is '_SUCCESS'.</td>
@@ -482,15 +522,19 @@ The parallelism of writing files into external file system (including Hive) can
 <table class="table table-bordered">
   <thead>
     <tr>
-        <th class="text-left" style="width: 20%">Key</th>
-        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
+        <th class="text-left" style="width: 7%">Default</th>
         <th class="text-left" style="width: 10%">Type</th>
-        <th class="text-left" style="width: 55%">Description</th>
+        <th class="text-left" style="width: 42%">Description</th>
     </tr>
   </thead>
   <tbody>
     <tr>
         <td><h5>sink.parallelism</h5></td>
+        <td>optional</td>
+        <td>no</td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>Integer</td>
         <td>Parallelism of writing files into external file system. The value should be greater than zero, otherwise an exception will be thrown.</td>
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/AbstractFileSystemTable.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/AbstractFileSystemTable.java
index f78bee6..265c04a 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/AbstractFileSystemTable.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/AbstractFileSystemTable.java
@@ -25,7 +25,6 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.types.DataType;
 
 import java.util.List;
@@ -34,7 +33,6 @@ import java.util.stream.Collectors;
 /** Abstract File system table for providing some common methods. */
 abstract class AbstractFileSystemTable {
 
-    final DynamicTableFactory.Context context;
     final ObjectIdentifier tableIdentifier;
     final Configuration tableOptions;
     final ResolvedSchema schema;
@@ -43,16 +41,18 @@ abstract class AbstractFileSystemTable {
 
     List<String> partitionKeys;
 
-    AbstractFileSystemTable(DynamicTableFactory.Context context) {
-        this.context = context;
-        this.tableIdentifier = context.getObjectIdentifier();
-        this.tableOptions = new Configuration();
-        context.getCatalogTable().getOptions().forEach(tableOptions::setString);
-        this.schema = context.getCatalogTable().getResolvedSchema();
-        this.path = new Path(tableOptions.get(FileSystemConnectorOptions.PATH));
-        this.defaultPartName = tableOptions.get(FileSystemConnectorOptions.PARTITION_DEFAULT_NAME);
-
-        this.partitionKeys = context.getCatalogTable().getPartitionKeys();
+    AbstractFileSystemTable(
+            ObjectIdentifier tableIdentifier,
+            ResolvedSchema schema,
+            List<String> partitionKeys,
+            ReadableConfig tableOptions) {
+        this.tableIdentifier = tableIdentifier;
+        this.tableOptions = (Configuration) tableOptions;
+        this.schema = schema;
+        this.path = new Path(this.tableOptions.get(FileSystemConnectorOptions.PATH));
+        this.defaultPartName =
+                this.tableOptions.get(FileSystemConnectorOptions.PARTITION_DEFAULT_NAME);
+        this.partitionKeys = partitionKeys;
     }
 
     ReadableConfig formatOptions(String identifier) {
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java
index 952c522..86a4ed3 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java
@@ -45,6 +45,7 @@ import java.util.List;
 import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static java.time.ZoneId.SHORT_IDS;
 
@@ -70,7 +71,10 @@ public class FileSystemTableFactory implements DynamicTableSourceFactory, Dynami
         FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
         validate(helper);
         return new FileSystemTableSource(
-                context,
+                context.getObjectIdentifier(),
+                context.getCatalogTable().getResolvedSchema(),
+                context.getCatalogTable().getPartitionKeys(),
+                helper.getOptions(),
                 discoverDecodingFormat(context, BulkReaderFormatFactory.class),
                 discoverDecodingFormat(context, DeserializationFormatFactory.class),
                 discoverFormatFactory(context));
@@ -81,7 +85,10 @@ public class FileSystemTableFactory implements DynamicTableSourceFactory, Dynami
         FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
         validate(helper);
         return new FileSystemTableSink(
-                context,
+                context.getObjectIdentifier(),
+                context.getCatalogTable().getResolvedSchema(),
+                context.getCatalogTable().getPartitionKeys(),
+                helper.getOptions(),
                 discoverDecodingFormat(context, BulkReaderFormatFactory.class),
                 discoverDecodingFormat(context, DeserializationFormatFactory.class),
                 discoverFormatFactory(context),
@@ -122,6 +129,24 @@ public class FileSystemTableFactory implements DynamicTableSourceFactory, Dynami
         return options;
     }
 
+    @Override
+    public Set<ConfigOption<?>> forwardOptions() {
+        return Stream.of(
+                        FileSystemConnectorOptions.PATH,
+                        FileSystemConnectorOptions.PARTITION_DEFAULT_NAME,
+                        FileSystemConnectorOptions.SINK_ROLLING_POLICY_FILE_SIZE,
+                        FileSystemConnectorOptions.SINK_ROLLING_POLICY_ROLLOVER_INTERVAL,
+                        FileSystemConnectorOptions.SINK_ROLLING_POLICY_CHECK_INTERVAL,
+                        FileSystemConnectorOptions.SINK_PARTITION_COMMIT_TRIGGER,
+                        FileSystemConnectorOptions.SINK_PARTITION_COMMIT_DELAY,
+                        FileSystemConnectorOptions.SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE,
+                        FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_KIND,
+                        FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS,
+                        FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME,
+                        FileSystemConnectorOptions.COMPACTION_FILE_SIZE)
+                .collect(Collectors.toSet());
+    }
+
     private void validate(FactoryUtil.TableFactoryHelper helper) {
         // Except format options, some formats like parquet and orc can not list all supported
         // options.
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
index 09705cd..6708b60 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
@@ -55,6 +55,8 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
@@ -65,7 +67,6 @@ import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -117,20 +118,22 @@ public class FileSystemTableSink extends AbstractFileSystemTable
     @Nullable private Integer configuredParallelism;
 
     FileSystemTableSink(
-            DynamicTableFactory.Context context,
+            ObjectIdentifier tableIdentifier,
+            ResolvedSchema schema,
+            List<String> partitionKeys,
+            ReadableConfig tableOptions,
             @Nullable DecodingFormat<BulkFormat<RowData, FileSourceSplit>> bulkReaderFormat,
             @Nullable DecodingFormat<DeserializationSchema<RowData>> deserializationFormat,
             @Nullable FileSystemFormatFactory formatFactory,
             @Nullable EncodingFormat<BulkWriter.Factory<RowData>> bulkWriterFormat,
             @Nullable EncodingFormat<SerializationSchema<RowData>> serializationFormat) {
-        super(context);
+        super(tableIdentifier, schema, partitionKeys, tableOptions);
         this.bulkReaderFormat = bulkReaderFormat;
         this.deserializationFormat = deserializationFormat;
         this.formatFactory = formatFactory;
         if (Stream.of(bulkWriterFormat, serializationFormat, formatFactory)
                 .allMatch(Objects::isNull)) {
-            Configuration options = Configuration.fromMap(context.getCatalogTable().getOptions());
-            String identifier = options.get(FactoryUtil.FORMAT);
+            String identifier = tableOptions.get(FactoryUtil.FORMAT);
             throw new ValidationException(
                     String.format(
                             "Could not find any format factory for identifier '%s' in the classpath.",
@@ -138,7 +141,8 @@ public class FileSystemTableSink extends AbstractFileSystemTable
         }
         this.bulkWriterFormat = bulkWriterFormat;
         this.serializationFormat = serializationFormat;
-        this.configuredParallelism = tableOptions.get(FileSystemConnectorOptions.SINK_PARALLELISM);
+        this.configuredParallelism =
+                this.tableOptions.get(FileSystemConnectorOptions.SINK_PARALLELISM);
     }
 
     @Override
@@ -545,7 +549,10 @@ public class FileSystemTableSink extends AbstractFileSystemTable
     public DynamicTableSink copy() {
         FileSystemTableSink sink =
                 new FileSystemTableSink(
-                        context,
+                        tableIdentifier,
+                        schema,
+                        partitionKeys,
+                        tableOptions,
                         bulkReaderFormat,
                         deserializationFormat,
                         formatFactory,
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
index 5287f6f..f8cb703 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.java.io.CollectionInputFormat;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.file.src.FileSource;
 import org.apache.flink.connector.file.src.FileSourceSplit;
@@ -35,6 +34,8 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.connector.format.DecodingFormat;
@@ -52,7 +53,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.PartitionPathUtils;
@@ -97,15 +97,17 @@ public class FileSystemTableSource extends AbstractFileSystemTable
     private DataType producedDataType;
 
     public FileSystemTableSource(
-            DynamicTableFactory.Context context,
+            ObjectIdentifier tableIdentifier,
+            ResolvedSchema schema,
+            List<String> partitionKeys,
+            ReadableConfig tableOptions,
             @Nullable DecodingFormat<BulkFormat<RowData, FileSourceSplit>> bulkReaderFormat,
             @Nullable DecodingFormat<DeserializationSchema<RowData>> deserializationFormat,
             @Nullable FileSystemFormatFactory formatFactory) {
-        super(context);
+        super(tableIdentifier, schema, partitionKeys, tableOptions);
         if (Stream.of(bulkReaderFormat, deserializationFormat, formatFactory)
                 .allMatch(Objects::isNull)) {
-            Configuration options = Configuration.fromMap(context.getCatalogTable().getOptions());
-            String identifier = options.get(FactoryUtil.FORMAT);
+            String identifier = tableOptions.get(FactoryUtil.FORMAT);
             throw new ValidationException(
                     String.format(
                             "Could not find any format factory for identifier '%s' in the classpath.",
@@ -114,8 +116,7 @@ public class FileSystemTableSource extends AbstractFileSystemTable
         this.bulkReaderFormat = bulkReaderFormat;
         this.deserializationFormat = deserializationFormat;
         this.formatFactory = formatFactory;
-
-        this.producedDataType = context.getPhysicalRowDataType();
+        this.producedDataType = schema.toPhysicalRowDataType();
     }
 
     @Override
@@ -410,7 +411,13 @@ public class FileSystemTableSource extends AbstractFileSystemTable
     public FileSystemTableSource copy() {
         FileSystemTableSource source =
                 new FileSystemTableSource(
-                        context, bulkReaderFormat, deserializationFormat, formatFactory);
+                        tableIdentifier,
+                        schema,
+                        partitionKeys,
+                        tableOptions,
+                        bulkReaderFormat,
+                        deserializationFormat,
+                        formatFactory);
         source.partitionKeys = partitionKeys;
         source.remainingPartitions = remainingPartitions;
         source.filters = filters;

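For illustration, the extraction step performed in FileSystemTableFactory above, shown in isolation: a factory can narrow DynamicTableFactory.Context down to the identifier, schema, partition keys, and options that the runtime classes actually need. The NarrowedContext holder and the narrow() method are hypothetical names used only for this sketch; the accessor calls are the same ones used in the diff above.

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;

import java.util.List;

final class ContextNarrowing {

    private ContextNarrowing() {}

    /** Simple value holder for the narrowed arguments; names are illustrative only. */
    static final class NarrowedContext {
        final ObjectIdentifier identifier;
        final ResolvedSchema schema;
        final List<String> partitionKeys;
        final ReadableConfig options;

        NarrowedContext(
                ObjectIdentifier identifier,
                ResolvedSchema schema,
                List<String> partitionKeys,
                ReadableConfig options) {
            this.identifier = identifier;
            this.schema = schema;
            this.partitionKeys = partitionKeys;
            this.options = options;
        }
    }

    /** Extracts only the pieces of the context that runtime table classes need. */
    static NarrowedContext narrow(
            DynamicTableFactory.Context context, FactoryUtil.TableFactoryHelper helper) {
        return new NarrowedContext(
                context.getObjectIdentifier(),
                context.getCatalogTable().getResolvedSchema(),
                context.getCatalogTable().getPartitionKeys(),
                helper.getOptions());
    }
}
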
[flink] 06/09: [FLINK-25391][connector-hbase] Forward catalog table options

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5175ed0d48835344c1cd4282372d6b01571d914b
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Thu Jan 6 16:35:09 2022 +0100

    [FLINK-25391][connector-hbase] Forward catalog table options
---
 docs/content/docs/connectors/table/hbase.md        | 17 ++++++++++++-
 .../hbase1/HBase1DynamicTableFactory.java          | 29 ++++++++++++++++------
 .../hbase2/HBase2DynamicTableFactory.java          | 27 ++++++++++++++------
 .../hbase/table/HBaseConnectorOptionsUtil.java     | 18 ++++++--------
 4 files changed, 65 insertions(+), 26 deletions(-)

diff --git a/docs/content/docs/connectors/table/hbase.md b/docs/content/docs/connectors/table/hbase.md
index 86d45e5..21436cd 100644
--- a/docs/content/docs/connectors/table/hbase.md
+++ b/docs/content/docs/connectors/table/hbase.md
@@ -82,15 +82,17 @@ Connector Options
       <tr>
         <th class="text-left" style="width: 25%">Option</th>
         <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 8%">Forwarded</th>
         <th class="text-center" style="width: 7%">Default</th>
         <th class="text-center" style="width: 10%">Type</th>
-        <th class="text-center" style="width: 50%">Description</th>
+        <th class="text-center" style="width: 42%">Description</th>
       </tr>
     </thead>
     <tbody>
     <tr>
       <td><h5>connector</h5></td>
       <td>required</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Specify what connector to use, valid values are:
@@ -103,6 +105,7 @@ Connector Options
     <tr>
       <td><h5>table-name</h5></td>
       <td>required</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The name of the HBase table to connect to. By default, the table is in the 'default' namespace. To assign the table a specific namespace, use 'namespace:table'.</td>
@@ -110,6 +113,7 @@ Connector Options
     <tr>
       <td><h5>zookeeper.quorum</h5></td>
       <td>required</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The HBase Zookeeper quorum.</td>
@@ -117,6 +121,7 @@ Connector Options
     <tr>
       <td><h5>zookeeper.znode.parent</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">/hbase</td>
       <td>String</td>
       <td>The root dir in Zookeeper for HBase cluster.</td>
@@ -124,6 +129,7 @@ Connector Options
     <tr>
       <td><h5>null-string-literal</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">null</td>
       <td>String</td>
       <td>Representation of null values for string fields. The HBase source and sink encode/decode empty bytes as null values for all types except the string type.</td>
@@ -131,6 +137,7 @@ Connector Options
     <tr>
       <td><h5>sink.buffer-flush.max-size</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">2mb</td>
       <td>MemorySize</td>
       <td>Writing option, maximum size in memory of buffered rows for each writing request.
@@ -141,6 +148,7 @@ Connector Options
     <tr>
       <td><h5>sink.buffer-flush.max-rows</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">1000</td>
       <td>Integer</td>
       <td>Writing option, maximum number of rows to buffer for each writing request.
@@ -151,6 +159,7 @@ Connector Options
     <tr>
       <td><h5>sink.buffer-flush.interval</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">1s</td>
       <td>Duration</td>
       <td>Writing option, the interval to flush any buffered rows.
@@ -162,6 +171,7 @@ Connector Options
     <tr>
       <td><h5>sink.parallelism</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Integer</td>
       <td>Defines the parallelism of the HBase sink operator. By default, the parallelism is determined by the framework using the same parallelism as the upstream chained operator.</td>
@@ -169,6 +179,7 @@ Connector Options
     <tr>
       <td><h5>lookup.async</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>Whether async lookup is enabled. If true, the lookup will be async. Note that async lookup is only supported by the hbase-2.2 connector.</td>
@@ -176,6 +187,7 @@ Connector Options
     <tr>
       <td><h5>lookup.cache.max-rows</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">-1</td>
       <td>Long</td>
       <td>The max number of rows in the lookup cache; beyond this value, the oldest rows expire. Note that the "lookup.cache.max-rows" and "lookup.cache.ttl" options must both be specified if either of them is specified. Lookup cache is disabled by default.</td>
@@ -183,6 +195,7 @@ Connector Options
     <tr>
       <td><h5>lookup.cache.ttl</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">0 s</td>
       <td>Duration</td>
       <td>The max time to live for each row in the lookup cache; rows older than this expire. Note that the "lookup.cache.max-rows" and "lookup.cache.ttl" options must both be specified if either of them is specified. Lookup cache is disabled by default.</td>
@@ -190,6 +203,7 @@ Connector Options
     <tr>
       <td><h5>lookup.max-retries</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">3</td>
       <td>Integer</td>
       <td>The max number of retries if a lookup against the database fails.</td>
@@ -197,6 +211,7 @@ Connector Options
     <tr>
       <td><h5>properties.*</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>
diff --git a/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java b/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
index 3454064..6a3e6ba 100644
--- a/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
+++ b/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
@@ -34,9 +34,11 @@ import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
 import org.apache.hadoop.conf.Configuration;
 
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
+import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_ASYNC;
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_CACHE_MAX_ROWS;
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_CACHE_TTL;
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_MAX_RETRIES;
@@ -69,12 +71,10 @@ public class HBase1DynamicTableFactory
 
         final ReadableConfig tableOptions = helper.getOptions();
 
-        Map<String, String> options = context.getCatalogTable().getOptions();
-
         validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes());
 
         String tableName = tableOptions.get(TABLE_NAME);
-        Configuration hbaseClientConf = getHBaseConfiguration(options);
+        Configuration hbaseClientConf = getHBaseConfiguration(tableOptions);
         String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
         HBaseTableSchema hbaseSchema =
                 HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());
@@ -94,12 +94,10 @@ public class HBase1DynamicTableFactory
 
         final ReadableConfig tableOptions = helper.getOptions();
 
-        Map<String, String> options = context.getCatalogTable().getOptions();
-
         validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes());
 
         String tableName = tableOptions.get(TABLE_NAME);
-        Configuration hbaseConf = getHBaseConfiguration(options);
+        Configuration hbaseConf = getHBaseConfiguration(tableOptions);
         HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions);
         String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
         HBaseTableSchema hbaseSchema =
@@ -131,9 +129,26 @@ public class HBase1DynamicTableFactory
         set.add(SINK_BUFFER_FLUSH_MAX_ROWS);
         set.add(SINK_BUFFER_FLUSH_INTERVAL);
         set.add(SINK_PARALLELISM);
+        set.add(LOOKUP_ASYNC);
         set.add(LOOKUP_CACHE_MAX_ROWS);
         set.add(LOOKUP_CACHE_TTL);
         set.add(LOOKUP_MAX_RETRIES);
         return set;
     }
+
+    @Override
+    public Set<ConfigOption<?>> forwardOptions() {
+        return Stream.of(
+                        TABLE_NAME,
+                        ZOOKEEPER_ZNODE_PARENT,
+                        ZOOKEEPER_QUORUM,
+                        NULL_STRING_LITERAL,
+                        SINK_BUFFER_FLUSH_MAX_SIZE,
+                        SINK_BUFFER_FLUSH_MAX_ROWS,
+                        SINK_BUFFER_FLUSH_INTERVAL,
+                        LOOKUP_CACHE_MAX_ROWS,
+                        LOOKUP_CACHE_TTL,
+                        LOOKUP_MAX_RETRIES)
+                .collect(Collectors.toSet());
+    }
 }
diff --git a/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java b/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
index c9246b6..9b9a525 100644
--- a/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
+++ b/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
@@ -35,8 +35,9 @@ import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
 import org.apache.hadoop.conf.Configuration;
 
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_ASYNC;
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_CACHE_MAX_ROWS;
@@ -71,12 +72,10 @@ public class HBase2DynamicTableFactory
 
         final ReadableConfig tableOptions = helper.getOptions();
 
-        Map<String, String> options = context.getCatalogTable().getOptions();
-
         validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes());
 
         String tableName = tableOptions.get(TABLE_NAME);
-        Configuration hbaseConf = getHBaseConfiguration(options);
+        Configuration hbaseConf = getHBaseConfiguration(tableOptions);
         HBaseLookupOptions lookupOptions = getHBaseLookupOptions(tableOptions);
         String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
         HBaseTableSchema hbaseSchema =
@@ -93,12 +92,10 @@ public class HBase2DynamicTableFactory
 
         final ReadableConfig tableOptions = helper.getOptions();
 
-        Map<String, String> options = context.getCatalogTable().getOptions();
-
         validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes());
 
         String tableName = tableOptions.get(TABLE_NAME);
-        Configuration hbaseConf = getHBaseConfiguration(options);
+        Configuration hbaseConf = getHBaseConfiguration(tableOptions);
         HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions);
         String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
         HBaseTableSchema hbaseSchema =
@@ -136,4 +133,20 @@ public class HBase2DynamicTableFactory
         set.add(LOOKUP_MAX_RETRIES);
         return set;
     }
+
+    @Override
+    public Set<ConfigOption<?>> forwardOptions() {
+        return Stream.of(
+                        TABLE_NAME,
+                        ZOOKEEPER_ZNODE_PARENT,
+                        ZOOKEEPER_QUORUM,
+                        NULL_STRING_LITERAL,
+                        LOOKUP_CACHE_MAX_ROWS,
+                        LOOKUP_CACHE_TTL,
+                        LOOKUP_MAX_RETRIES,
+                        SINK_BUFFER_FLUSH_MAX_SIZE,
+                        SINK_BUFFER_FLUSH_MAX_ROWS,
+                        SINK_BUFFER_FLUSH_INTERVAL)
+                .collect(Collectors.toSet());
+    }
 }
diff --git a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
index 6b9804f..4585610 100644
--- a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
+++ b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
@@ -105,22 +105,18 @@ public class HBaseConnectorOptionsUtil {
         return builder.build();
     }
 
-    /**
-     * config HBase Configuration.
-     *
-     * @param options properties option
-     */
-    public static Configuration getHBaseConfiguration(Map<String, String> options) {
-        org.apache.flink.configuration.Configuration tableOptions =
-                org.apache.flink.configuration.Configuration.fromMap(options);
+    /** config HBase Configuration. */
+    public static Configuration getHBaseConfiguration(ReadableConfig tableOptions) {
         // create default configuration from current runtime env (`hbase-site.xml` in classpath)
         // first,
         Configuration hbaseClientConf = HBaseConfigurationUtil.getHBaseConfiguration();
-        hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, tableOptions.getString(ZOOKEEPER_QUORUM));
+        hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, tableOptions.get(ZOOKEEPER_QUORUM));
         hbaseClientConf.set(
-                HConstants.ZOOKEEPER_ZNODE_PARENT, tableOptions.getString(ZOOKEEPER_ZNODE_PARENT));
+                HConstants.ZOOKEEPER_ZNODE_PARENT, tableOptions.get(ZOOKEEPER_ZNODE_PARENT));
         // add HBase properties
-        final Properties properties = getHBaseClientProperties(options);
+        final Properties properties =
+                getHBaseClientProperties(
+                        ((org.apache.flink.configuration.Configuration) tableOptions).toMap());
         properties.forEach((k, v) -> hbaseClientConf.set(k.toString(), v.toString()));
         return hbaseClientConf;
     }
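
As a usage sketch of the refactored utility above (hypothetical ZooKeeper addresses and client properties, and assuming flink-connector-hbase-base plus the HBase client are on the classpath): a Flink Configuration built from the raw table options satisfies the new ReadableConfig parameter, and the cast back to Configuration inside getHBaseConfiguration relies on the options object handed in by the factories being backed by a Flink Configuration, as is the case for TableFactoryHelper#getOptions().

    import org.apache.flink.configuration.Configuration;

    import java.util.HashMap;
    import java.util.Map;

    import static org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.getHBaseConfiguration;

    public class HBaseConfigurationSketch {
        public static void main(String[] args) {
            // Hypothetical table options, as they would arrive from a CREATE TABLE statement.
            Map<String, String> rawOptions = new HashMap<>();
            rawOptions.put("zookeeper.quorum", "zk-1:2181,zk-2:2181");
            rawOptions.put("zookeeper.znode.parent", "/hbase");
            rawOptions.put("properties.hbase.client.retries.number", "5");

            // Configuration implements ReadableConfig and exposes toMap(), which is what
            // the refactored getHBaseConfiguration(ReadableConfig) relies on for the
            // 'properties.*' pass-through.
            Configuration tableOptions = Configuration.fromMap(rawOptions);
            org.apache.hadoop.conf.Configuration hbaseConf = getHBaseConfiguration(tableOptions);
            System.out.println(hbaseConf.get("hbase.zookeeper.quorum"));
        }
    }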

[flink] 09/09: [FLINK-25391][format-json] Forward catalog table options

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fa161d3c5636370f5129320a9ca464e38f88fc6f
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Wed Jan 12 18:57:48 2022 +0100

    [FLINK-25391][format-json] Forward catalog table options
    
    This closes #18290.
---
 docs/content/docs/connectors/table/formats/json.md             | 10 +++++++++-
 .../java/org/apache/flink/formats/json/JsonFormatFactory.java  |  1 -
 2 files changed, 9 insertions(+), 2 deletions(-)
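
Note that the Java change below removes json.ignore-parse-errors from the forwarded set, which matches the documentation table below where only the timestamp, map-null-key, and decimal encoding options are marked as forwarded. A quick, illustrative way to inspect the resulting set (class name invented here, assuming flink-json is on the classpath):

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.formats.json.JsonFormatFactory;

    import java.util.Set;

    public class JsonForwardOptionsSketch {
        public static void main(String[] args) {
            // Prints the keys of the options the JSON format declares as safe to forward.
            Set<ConfigOption<?>> forwarded = new JsonFormatFactory().forwardOptions();
            forwarded.forEach(option -> System.out.println(option.key()));
        }
    }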

diff --git a/docs/content/docs/connectors/table/formats/json.md b/docs/content/docs/connectors/table/formats/json.md
index 56b8c84..d9c5e5c 100644
--- a/docs/content/docs/connectors/table/formats/json.md
+++ b/docs/content/docs/connectors/table/formats/json.md
@@ -69,15 +69,17 @@ Format Options
       <tr>
         <th class="text-left" style="width: 25%">Option</th>
         <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 8%">Forwarded</th>
         <th class="text-center" style="width: 7%">Default</th>
         <th class="text-center" style="width: 10%">Type</th>
-        <th class="text-center" style="width: 50%">Description</th>
+        <th class="text-center" style="width: 42%">Description</th>
       </tr>
     </thead>
     <tbody>
     <tr>
       <td><h5>format</h5></td>
       <td>required</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Specify what format to use, here should be <code>'json'</code>.</td>
@@ -85,6 +87,7 @@ Format Options
     <tr>
       <td><h5>json.fail-on-missing-field</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>Whether to fail if a field is missing or not.</td>
@@ -92,6 +95,7 @@ Format Options
     <tr>
       <td><h5>json.ignore-parse-errors</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>Skip fields and rows with parse errors instead of failing.
@@ -100,6 +104,7 @@ Format Options
     <tr>
       <td><h5>json.timestamp-format.standard</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;"><code>'SQL'</code></td>
       <td>String</td>
       <td>Specify the input and output timestamp format for <code>TIMESTAMP</code> and <code>TIMESTAMP_LTZ</code> type. Currently supported values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
@@ -114,6 +119,7 @@ Format Options
     <tr>
       <td><h5>json.map-null-key.mode</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;"><code>'FAIL'</code></td>
       <td>String</td>
       <td>Specify the handling mode when serializing null keys for map data. Currently supported values are <code>'FAIL'</code>, <code>'DROP'</code> and <code>'LITERAL'</code>:
@@ -127,6 +133,7 @@ Format Options
     <tr>
       <td><h5>json.map-null-key.literal</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">'null'</td>
       <td>String</td>
       <td>Specify string literal to replace null key when <code>'json.map-null-key.mode'</code> is LITERAL.</td>
@@ -134,6 +141,7 @@ Format Options
     <tr>
       <td><h5>json.encode.decimal-as-plain-number</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>Encode all decimals as plain numbers instead of possible scientific notations. By default, decimals may be written using scientific notation. For example, <code>0.000000027</code> is encoded as <code>2.7E-8</code> by default, and will be written as <code>0.000000027</code> if this option is set to true.</td>
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
index bf2e287..74d8c53 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
@@ -155,7 +155,6 @@ public class JsonFormatFactory implements DeserializationFormatFactory, Serializ
     @Override
     public Set<ConfigOption<?>> forwardOptions() {
         Set<ConfigOption<?>> options = new HashSet<>();
-        options.add(IGNORE_PARSE_ERRORS);
         options.add(TIMESTAMP_FORMAT);
         options.add(MAP_NULL_KEY_MODE);
         options.add(MAP_NULL_KEY_LITERAL);