You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/12/04 09:09:48 UTC

[GitHub] asfgit closed pull request #7208: FLINK-11044 connect docs fix for registerTableSink

asfgit closed pull request #7208: FLINK-11044 connect docs fix for registerTableSink
URL: https://github.com/apache/flink/pull/7208
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index effd913707e..d8677714fa8 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -312,7 +312,7 @@ The following timestamp extractors are supported:
     .timestampsFromField("ts_field")    // required: original field name in the input
 )
 
-// Converts the assigned timestamps from a DataStream API record into the rowtime attribute 
+// Converts the assigned timestamps from a DataStream API record into the rowtime attribute
 // and thus preserves the assigned timestamps from the source.
 // This requires a source that assigns timestamps (e.g., Kafka 0.10+).
 .rowtime(
@@ -337,7 +337,7 @@ rowtime:
     type: from-field
     from: "ts_field"                 # required: original field name in the input
 
-# Converts the assigned timestamps from a DataStream API record into the rowtime attribute 
+# Converts the assigned timestamps from a DataStream API record into the rowtime attribute
 # and thus preserves the assigned timestamps from the source.
 rowtime:
   timestamps:
@@ -351,7 +351,7 @@ The following watermark strategies are supported:
 <div class="codetabs" markdown="1">
 <div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
-// Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum 
+// Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
 // observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
 // are not late.
 .rowtime(
@@ -377,7 +377,7 @@ The following watermark strategies are supported:
 
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
-# Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum 
+# Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
 # observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
 # are not late.
 rowtime:
@@ -695,7 +695,7 @@ connector:
 
 **Key extraction:** Flink automatically extracts valid keys from a query. For example, a query `SELECT a, b, c FROM t GROUP BY a, b` defines a composite key of the fields `a` and `b`. The Elasticsearch connector generates a document ID string for every row by concatenating all key fields in the order defined in the query using a key delimiter. A custom representation of null literals for key fields can be defined.
 
-<span class="label label-danger">Attention</span> A JSON format defines how to encode documents for the external system, therefore, it must be added as a [dependency](connect.html#formats). 
+<span class="label label-danger">Attention</span> A JSON format defines how to encode documents for the external system, therefore, it must be added as a [dependency](connect.html#formats).
 
 {% top %}
 
@@ -717,8 +717,8 @@ The CSV format allows to read and write comma-separated rows.
   new Csv()
     .field("field1", Types.STRING)    // required: ordered format fields
     .field("field2", Types.TIMESTAMP)
-    .fieldDelimiter(",")              // optional: string delimiter "," by default 
-    .lineDelimiter("\n")              // optional: string delimiter "\n" by default 
+    .fieldDelimiter(",")              // optional: string delimiter "," by default
+    .lineDelimiter("\n")              // optional: string delimiter "\n" by default
     .quoteCharacter('"')              // optional: single character for string values, empty by default
     .commentPrefix('#')               // optional: string to indicate comments, empty by default
     .ignoreFirstLine()                // optional: ignore the first line, by default it is not skipped
@@ -736,8 +736,8 @@ format:
       type: VARCHAR
     - name: field2
       type: TIMESTAMP
-  field-delimiter: ","       # optional: string delimiter "," by default 
-  line-delimiter: "\n"       # optional: string delimiter "\n" by default 
+  field-delimiter: ","       # optional: string delimiter "," by default
+  line-delimiter: "\n"       # optional: string delimiter "\n" by default
   quote-character: '"'       # optional: single character for string values, empty by default
   comment-prefix: '#'        # optional: string to indicate comments, empty by default
   ignore-first-line: false   # optional: boolean flag to ignore the first line, by default it is not skipped
@@ -992,7 +992,7 @@ These are the additional `TableSink`s which are provided with Flink:
 | **Class name** | **Maven dependency** | **Batch?** | **Streaming?** | **Description**
 | `CsvTableSink` | `flink-table` | Y | Append | A simple sink for CSV files.
 | `JDBCAppendTableSink` | `flink-jdbc` | Y | Append | Writes a Table to a JDBC table.
-| `CassandraAppendTableSink` | `flink-connector-cassandra` | N | Append | Writes a Table to a Cassandra table. 
+| `CassandraAppendTableSink` | `flink-connector-cassandra` | N | Append | Writes a Table to a Cassandra table.
 
 ### OrcTableSource
 
@@ -1044,7 +1044,7 @@ val orcTableSource = OrcTableSource.builder()
 
 ### CsvTableSink
 
-The `CsvTableSink` emits a `Table` to one or more CSV files. 
+The `CsvTableSink` emits a `Table` to one or more CSV files.
 
 The sink only supports append-only streaming tables. It cannot be used to emit a `Table` that is continuously updated. See the [documentation on Table to Stream conversions](./streaming/dynamic_tables.html#table-to-stream-conversion) for details. When emitting a streaming table, rows are written at least once (if checkpointing is enabled) and the `CsvTableSink` does not split output files into bucket files but continuously writes to the same files.
 
@@ -1053,17 +1053,17 @@ The sink only supports append-only streaming tables. It cannot be used to emit a
 {% highlight java %}
 
 CsvTableSink sink = new CsvTableSink(
-    path,                  // output path 
+    path,                  // output path
     "|",                   // optional: delimit files by '|'
     1,                     // optional: write to a single file
     WriteMode.OVERWRITE);  // optional: override existing files
 
 tableEnv.registerTableSink(
   "csvOutputTable",
-  sink,
   // specify table schema
   new String[]{"f0", "f1"},
-  new TypeInformation[]{Types.STRING, Types.INT});
+  new TypeInformation[]{Types.STRING, Types.INT},
+  sink);
 
 Table table = ...
 table.insertInto("csvOutputTable");
@@ -1074,17 +1074,17 @@ table.insertInto("csvOutputTable");
 {% highlight scala %}
 
 val sink: CsvTableSink = new CsvTableSink(
-    path,                             // output path 
+    path,                             // output path
     fieldDelim = "|",                 // optional: delimit files by '|'
     numFiles = 1,                     // optional: write to a single file
     writeMode = WriteMode.OVERWRITE)  // optional: override existing files
 
 tableEnv.registerTableSink(
   "csvOutputTable",
-  sink,
   // specify table schema
   Array[String]("f0", "f1"),
-  Array[TypeInformation[_]](Types.STRING, Types.INT))
+  Array[TypeInformation[_]](Types.STRING, Types.INT),
+  sink)
 
 val table: Table = ???
 table.insertInto("csvOutputTable")
@@ -1113,10 +1113,10 @@ JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
 
 tableEnv.registerTableSink(
   "jdbcOutputTable",
-  sink,
   // specify table schema
   new String[]{"id"},
-  new TypeInformation[]{Types.INT});
+  new TypeInformation[]{Types.INT},
+  sink);
 
 Table table = ...
 table.insertInto("jdbcOutputTable");
@@ -1134,10 +1134,10 @@ val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
 
 tableEnv.registerTableSink(
   "jdbcOutputTable",
-  sink,
   // specify table schema
   Array[String]("id"),
-  Array[TypeInformation[_]](Types.INT))
+  Array[TypeInformation[_]](Types.INT),
+  sink)
 
 val table: Table = ???
 table.insertInto("jdbcOutputTable")
@@ -1145,7 +1145,7 @@ table.insertInto("jdbcOutputTable")
 </div>
 </div>
 
-Similar to using <code>JDBCOutputFormat</code>, you have to explicitly specify the name of the JDBC driver, the JDBC URL, the query to be executed, and the field types of the JDBC table. 
+Similar to using <code>JDBCOutputFormat</code>, you have to explicitly specify the name of the JDBC driver, the JDBC URL, the query to be executed, and the field types of the JDBC table.
 
 {% top %}
 
@@ -1164,16 +1164,16 @@ To use the `CassandraAppendTableSink`, you have to add the Cassandra connector d
 ClusterBuilder builder = ... // configure Cassandra cluster connection
 
 CassandraAppendTableSink sink = new CassandraAppendTableSink(
-  builder, 
+  builder,
   // the query must match the schema of the table
   INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?));
 
 tableEnv.registerTableSink(
   "cassandraOutputTable",
-  sink,
   // specify table schema
   new String[]{"id", "name", "value"},
-  new TypeInformation[]{Types.INT, Types.STRING, Types.DOUBLE});
+  new TypeInformation[]{Types.INT, Types.STRING, Types.DOUBLE},
+  sink);
 
 Table table = ...
 table.insertInto(cassandraOutputTable);
@@ -1185,16 +1185,16 @@ table.insertInto(cassandraOutputTable);
 val builder: ClusterBuilder = ... // configure Cassandra cluster connection
 
 val sink: CassandraAppendTableSink = new CassandraAppendTableSink(
-  builder, 
+  builder,
   // the query must match the schema of the table
   INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?))
 
 tableEnv.registerTableSink(
   "cassandraOutputTable",
-  sink,
   // specify table schema
   Array[String]("id", "name", "value"),
-  Array[TypeInformation[_]](Types.INT, Types.STRING, Types.DOUBLE))
+  Array[TypeInformation[_]](Types.INT, Types.STRING, Types.DOUBLE),
+  sink)
 
 val table: Table = ???
 table.insertInto(cassandraOutputTable)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services