You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/01 01:52:13 UTC
[4/4] flink git commit: [FLINK-6563] [table] Add builders with time
attribute support for KafkaTableSources.
[FLINK-6563] [table] Add builders with time attribute support for KafkaTableSources.
This closes #4638.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0e92b663
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0e92b663
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0e92b663
Branch: refs/heads/master
Commit: 0e92b6632f35b69c62d7747f1cbaa3ee207fb235
Parents: 505d478
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Aug 31 14:35:41 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 1 00:20:38 2017 +0100
----------------------------------------------------------------------
docs/dev/table/sourceSinks.md | 400 ++++++++++++++++---
.../kafka/Kafka010AvroTableSource.java | 83 ++++
.../kafka/Kafka010JsonTableSource.java | 104 ++++-
.../connectors/kafka/Kafka010TableSource.java | 16 +-
.../kafka/Kafka010AvroTableSourceTest.java | 13 +-
.../kafka/Kafka010JsonTableSourceTest.java | 9 +-
.../kafka/Kafka011AvroTableSource.java | 83 ++++
.../kafka/Kafka011JsonTableSource.java | 104 ++++-
.../connectors/kafka/Kafka011TableSource.java | 17 +-
.../kafka/Kafka011AvroTableSourceTest.java | 13 +-
.../kafka/Kafka011JsonTableSourceTest.java | 9 +-
.../kafka/Kafka08AvroTableSource.java | 83 ++++
.../kafka/Kafka08JsonTableSource.java | 104 ++++-
.../connectors/kafka/Kafka08TableSource.java | 16 +-
.../kafka/Kafka08AvroTableSourceTest.java | 12 +-
.../kafka/Kafka08JsonTableSourceTest.java | 9 +-
.../kafka/Kafka09AvroTableSource.java | 83 ++++
.../kafka/Kafka09JsonTableSource.java | 104 ++++-
.../connectors/kafka/Kafka09TableSource.java | 16 +-
.../kafka/Kafka09AvroTableSourceTest.java | 13 +-
.../kafka/Kafka09JsonTableSourceTest.java | 9 +-
.../connectors/kafka/KafkaAvroTableSource.java | 112 +++++-
.../connectors/kafka/KafkaJsonTableSource.java | 163 +++++++-
.../connectors/kafka/KafkaTableSource.java | 299 +++++++++++++-
.../kafka/KafkaAvroTableSourceTestBase.java | 186 +++++++++
.../kafka/KafkaJsonTableSourceTestBase.java | 121 ++++++
.../kafka/KafkaTableSourceTestBase.java | 196 ++++++---
.../apache/flink/table/api/TableSchema.scala | 22 +
.../flink/table/codegen/CodeGenerator.scala | 33 +-
.../table/expressions/fieldExpression.scala | 19 +-
.../sql/StreamRecordTimestampSqlFunction.scala | 38 ++
.../plan/nodes/datastream/DataStreamScan.scala | 20 +-
.../datastream/StreamTableSourceScan.scala | 1 +
.../table/sources/DefinedFieldMapping.scala | 1 +
.../table/sources/definedTimeAttributes.scala | 2 +
.../table/sources/timestampExtractors.scala | 77 ----
.../sources/tsextractors/ExistingField.scala | 68 ++++
.../tsextractors/StreamRecordTimestamp.scala | 46 +++
.../tsextractors/TimestampExtractor.scala | 32 ++
.../table/sources/watermarkStrategies.scala | 101 -----
.../wmstrategies/AscendingTimestamps.scala | 40 ++
.../BoundedOutOfOrderTimestamps.scala | 41 ++
.../wmstrategies/watermarkStrategies.scala | 62 +++
.../validation/TableSourceValidationTest.scala | 4 +-
.../flink/table/utils/testTableSources.scala | 4 +-
45 files changed, 2520 insertions(+), 468 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/docs/dev/table/sourceSinks.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index d5bb874..43542f3 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -38,106 +38,394 @@ Currently, Flink provides the `CsvTableSource` to read CSV files and a few table
A custom `TableSource` can be defined by implementing the `BatchTableSource` or `StreamTableSource` interface. See section on [defining a custom TableSource](#define-a-tablesource) for details.
| **Class name** | **Maven dependency** | **Batch?** | **Streaming?** | **Description**
-| `CsvTableSource` | `flink-table` | Y | Y | A simple source for CSV files.
-| `Kafka08JsonTableSource` | `flink-connector-kafka-0.8` | N | Y | A Kafka 0.8 source for JSON data.
-| `Kafka08AvroTableSource` | `flink-connector-kafka-0.8` | N | Y | A Kafka 0.8 source for Avro data.
-| `Kafka09JsonTableSource` | `flink-connector-kafka-0.9` | N | Y | A Kafka 0.9 source for JSON data.
-| `Kafka09AvroTableSource` | `flink-connector-kafka-0.9` | N | Y | A Kafka 0.9 source for Avro data.
-| `Kafka010JsonTableSource` | `flink-connector-kafka-0.10` | N | Y | A Kafka 0.10 source for JSON data.
-| `Kafka010AvroTableSource` | `flink-connector-kafka-0.10` | N | Y | A Kafka 0.10 source for Avro data.
-
-All sources that come with the `flink-table` dependency can be directly used by your Table programs. For all other table sources, you have to add the respective dependency in addition to the `flink-table` dependency.
+| `Kafka011AvroTableSource` | `flink-connector-kafka-0.11` | N | Y | A `TableSource` for Avro-encoded Kafka 0.11 topics.
+| `Kafka011JsonTableSource` | `flink-connector-kafka-0.11` | N | Y | A `TableSource` for flat Json-encoded Kafka 0.11 topics.
+| `Kafka010AvroTableSource` | `flink-connector-kafka-0.10` | N | Y | A `TableSource` for Avro-encoded Kafka 0.10 topics.
+| `Kafka010JsonTableSource` | `flink-connector-kafka-0.10` | N | Y | A `TableSource` for flat Json-encoded Kafka 0.10 topics.
+| `Kafka09AvroTableSource` | `flink-connector-kafka-0.9` | N | Y | A `TableSource` for Avro-encoded Kafka 0.9 topics.
+| `Kafka09JsonTableSource` | `flink-connector-kafka-0.9` | N | Y | A `TableSource` for flat Json-encoded Kafka 0.9 topics.
+| `Kafka08AvroTableSource` | `flink-connector-kafka-0.8` | N | Y | A `TableSource` for Avro-encoded Kafka 0.8 topics.
+| `Kafka08JsonTableSource` | `flink-connector-kafka-0.8` | N | Y | A `TableSource` for flat Json-encoded Kafka 0.8 topics.
+| `CsvTableSource` | `flink-table` | Y | Y | A simple `TableSource` for CSV files.
+
+All sources that come with the `flink-table` dependency are directly available for Table API or SQL programs. For all other table sources, you have to add the respective dependency in addition to the `flink-table` dependency.
{% top %}
### KafkaJsonTableSource
-To use the Kafka JSON source, you have to add the Kafka connector dependency to your project:
+A `KafkaJsonTableSource` ingests JSON-encoded messages from a Kafka topic. Currently, only JSON records with flat (non-nested) schema are supported.
- - `flink-connector-kafka-0.8` for Kafka 0.8,
- - `flink-connector-kafka-0.9` for Kafka 0.9, or
- - `flink-connector-kafka-0.10` for Kafka 0.10, respectively.
+A `KafkaJsonTableSource` is created and configured using a builder. The following example shows how to create a `KafkaJsonTableSource` with basic properties:
-You can then create the source as follows (example for Kafka 0.8):
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-// specify JSON field names and types
-TypeInformation<Row> typeInfo = Types.ROW(
- new String[] { "id", "name", "score" },
- new TypeInformation<?>[] { Types.INT(), Types.STRING(), Types.DOUBLE() }
-);
-
-KafkaJsonTableSource kafkaTableSource = new Kafka08JsonTableSource(
- kafkaTopic,
- kafkaProperties,
- typeInfo);
+// create builder
+TableSource source = Kafka010JsonTableSource.builder()
+ // set Kafka topic
+ .forTopic("sensors")
+ // set Kafka consumer properties
+ .withKafkaProperties(kafkaProps)
+ // set Table schema
+ .withSchema(TableSchema.builder()
+ .field("sensorId", Types.LONG())
+ .field("temp", Types.DOUBLE())
+ .field("time", Types.SQL_TIMESTAMP()).build())
+ .build();
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
-// specify JSON field names and types
-val typeInfo = Types.ROW(
- Array("id", "name", "score"),
- Array(Types.INT, Types.STRING, Types.DOUBLE)
-)
+// create builder
+val source: TableSource[_] = Kafka010JsonTableSource.builder()
+ // set Kafka topic
+ .forTopic("sensors")
+ // set Kafka consumer properties
+ .withKafkaProperties(kafkaProps)
+ // set Table schema
+ .withSchema(TableSchema.builder()
+ .field("sensorId", Types.LONG)
+ .field("temp", Types.DOUBLE)
+ .field("time", Types.SQL_TIMESTAMP).build())
+ .build()
+{% endhighlight %}
+</div>
+</div>
+
+#### Optional Configuration
-val kafkaTableSource = new Kafka08JsonTableSource(
- kafkaTopic,
- kafkaProperties,
- typeInfo)
+* **Time Attributes:** Please see the sections on [configuring a rowtime attribute](#configure-a-rowtime-attribute) and [configuring a processing time attribute](#configure-a-processing-time-attribute).
+
+* **Explicit JSON parse schema:** By default, the JSON records are parsed with the table schema. You can configure an explicit JSON schema and provide a mapping from table schema fields to JSON fields as shown in the following example.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Map<String, String> mapping = new HashMap<>();
+mapping.put("sensorId", "id");
+mapping.put("temperature", "temp");
+
+TableSource source = Kafka010JsonTableSource.builder()
+ // ...
+ // set Table schema
+ .withSchema(TableSchema.builder()
+ .field("sensorId", Types.LONG())
+ .field("temperature", Types.DOUBLE()).build())
+ // set JSON parsing schema
+ .forJsonSchema(TableSchema.builder()
+ .field("id", Types.LONG())
+ .field("temp", Types.DOUBLE()).build())
+ // set mapping from table fields to JSON fields
+ .withTableToJsonMapping(mapping)
+ .build();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val source: TableSource[_] = Kafka010JsonTableSource.builder()
+ // ...
+ // set Table schema
+ .withSchema(TableSchema.builder()
+ .field("sensorId", Types.LONG)
+ .field("temperature", Types.DOUBLE).build())
+ // set JSON parsing schema
+ .forJsonSchema(TableSchema.builder()
+ .field("id", Types.LONG)
+ .field("temp", Types.DOUBLE).build())
+ // set mapping from table fields to JSON fields
+ .withTableToJsonMapping(Map(
+ "sensorId" -> "id",
+ "temperature" -> "temp").asJava)
+ .build()
{% endhighlight %}
</div>
</div>
-By default, a missing JSON field does not fail the source. You can configure this via:
+* **Missing Field Handling:** By default, a missing JSON field is set to `null`. You can enable strict JSON parsing that will cancel the source (and query) if a field is missing.
-```java
-// Fail on missing JSON field
-tableSource.setFailOnMissingField(true);
-```
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+TableSource source = Kafka010JsonTableSource.builder()
+ // ...
+ // configure missing field behavior
+ .failOnMissingField(true)
+ .build();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val source: TableSource[_] = Kafka010JsonTableSource.builder()
+ // ...
+ // configure missing field behavior
+ .failOnMissingField(true)
+ .build()
+{% endhighlight %}
+</div>
+</div>
{% top %}
### KafkaAvroTableSource
-The `KafkaAvroTableSource` allows you to read Avro's `SpecificRecord` objects from Kafka.
+A `KafkaAvroTableSource` ingests Avro-encoded records from a Kafka topic.
+
+A `KafkaAvroTableSource` is created and configured using a builder. The following example shows how to create a `KafkaAvroTableSource` with basic properties:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// create builder
+TableSource source = Kafka010AvroTableSource.builder()
+ // set Kafka topic
+ .forTopic("sensors")
+ // set Kafka consumer properties
+ .withKafkaProperties(kafkaProps)
+ // set Table schema
+ .withSchema(TableSchema.builder()
+ .field("sensorId", Types.LONG())
+ .field("temp", Types.DOUBLE())
+ .field("time", Types.SQL_TIMESTAMP()).build())
+ // set class of Avro record
+ .forAvroRecordClass(SensorReading.class)
+ .build();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// create builder
+val source: TableSource[_] = Kafka010AvroTableSource.builder()
+ // set Kafka topic
+ .forTopic("sensors")
+ // set Kafka consumer properties
+ .withKafkaProperties(kafkaProps)
+ // set Table schema
+ .withSchema(TableSchema.builder()
+ .field("sensorId", Types.LONG)
+ .field("temp", Types.DOUBLE)
+ .field("time", Types.SQL_TIMESTAMP).build())
+ // set class of Avro record
+ .forAvroRecordClass(classOf[SensorReading])
+ .build()
+{% endhighlight %}
+</div>
+</div>
+
+**NOTE:** The specified Avro record class must provide all fields of the table schema with corresponding type.
+
+#### Optional Configuration
+
+* **Time Attributes:** Please see the sections on [configuring a rowtime attribute](#configure-a-rowtime-attribute) and [configuring a processing time attribute](#configure-a-processing-time-attribute).
+
+* **Explicit Schema Field to Avro Mapping:** By default, all fields of the table schema are mapped by name to fields of the Avro records. If the fields in the Avro records have different names, a mapping from table schema fields to Avro fields can be specified.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Map<String, String> mapping = new HashMap<>();
+mapping.put("sensorId", "id");
+mapping.put("temperature", "temp");
+
+TableSource source = Kafka010AvroTableSource.builder()
+ // ...
+ // set Table schema
+ .withSchema(TableSchema.builder()
+ .field("sensorId", Types.LONG())
+ .field("temperature", Types.DOUBLE()).build())
+ // set class of Avro record with fields [id, temp]
+ .forAvroRecordClass(SensorReading.class)
+  // set mapping from table fields to Avro fields
+  .withTableToAvroMapping(mapping)
+ .build();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val source: TableSource[_] = Kafka010AvroTableSource.builder()
+ // ...
+ // set Table schema
+ .withSchema(TableSchema.builder()
+ .field("sensorId", Types.LONG)
+ .field("temperature", Types.DOUBLE).build())
+ // set class of Avro record with fields [id, temp]
+ .forAvroRecordClass(classOf[SensorReading])
+  // set mapping from table fields to Avro fields
+  .withTableToAvroMapping(Map(
+    "sensorId" -> "id",
+    "temperature" -> "temp").asJava)
+ .build()
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
+### Configuring a Processing Time Attribute
-To use the Kafka Avro source, you have to add the Kafka connector dependency to your project:
+[Processing time attributes](streaming.html#processing-time) are commonly used in streaming queries. A processing time attribute returns the current wall-clock time of the operator that accesses it.
- - `flink-connector-kafka-0.8` for Kafka 0.8,
- - `flink-connector-kafka-0.9` for Kafka 0.9, or
- - `flink-connector-kafka-0.10` for Kafka 0.10, respectively.
+Batch queries support processing time attributes as well. However, processing time attributes are initialized with the wall-clock time of the table scan operator and keep this value throughout the query evaluation.
-You can then create the source as follows (example for Kafka 0.8):
+A table schema field of type `SQL_TIMESTAMP` can be declared as a processing time attribute as shown in the following example.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-// pass the generated Avro class to the TableSource
-Class<? extends SpecificRecord> clazz = MyAvroType.class;
+TableSource source = Kafka010JsonTableSource.builder()
+ // ...
+ .withSchema(TableSchema.builder()
+ .field("sensorId", Types.LONG())
+ .field("temp", Types.DOUBLE())
+ // field "ptime" is of type SQL_TIMESTAMP
+ .field("ptime", Types.SQL_TIMESTAMP()).build())
+ // declare "ptime" as processing time attribute
+ .withProctimeAttribute("ptime")
+ .build();
+{% endhighlight %}
+</div>
-KafkaAvroTableSource kafkaTableSource = new Kafka08AvroTableSource(
- kafkaTopic,
- kafkaProperties,
- clazz);
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val source: TableSource[_] = Kafka010JsonTableSource.builder()
+ // ...
+ .withSchema(TableSchema.builder()
+ .field("sensorId", Types.LONG)
+ .field("temp", Types.DOUBLE)
+ // field "ptime" is of type SQL_TIMESTAMP
+ .field("ptime", Types.SQL_TIMESTAMP).build())
+ // declare "ptime" as processing time attribute
+ .withProctimeAttribute("ptime")
+ .build()
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
+### Configuring a Rowtime Attribute
+
+[Rowtime attributes](streaming.html#event-time) are attributes of type `TIMESTAMP` and are handled in a unified way in stream and batch queries.
+
+A table schema field of type `SQL_TIMESTAMP` can be declared as rowtime attribute by specifying
+
+* the name of the field,
+* a `TimestampExtractor` that computes the actual value for the attribute (usually from one or more other attributes), and
+* a `WatermarkStrategy` that specifies how watermarks are generated for the rowtime attribute.
+
+The following example shows how to configure a rowtime attribute.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+TableSource source = Kafka010JsonTableSource.builder()
+ // ...
+ .withSchema(TableSchema.builder()
+ .field("sensorId", Types.LONG())
+ .field("temp", Types.DOUBLE())
+ // field "rtime" is of type SQL_TIMESTAMP
+ .field("rtime", Types.SQL_TIMESTAMP()).build())
+ .withRowtimeAttribute(
+ // "rtime" is rowtime attribute
+ "rtime",
+ // value of "rtime" is extracted from existing field with same name
+ new ExistingField("rtime"),
+ // values of "rtime" are at most out-of-order by 30 seconds
+      new BoundedOutOfOrderTimestamps(30000L))
+ .build();
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
-// pass the generated Avro class to the TableSource
-val clazz = classOf[MyAvroType]
+val source: TableSource[_] = Kafka010JsonTableSource.builder()
+ // ...
+ .withSchema(TableSchema.builder()
+ .field("sensorId", Types.LONG)
+ .field("temp", Types.DOUBLE)
+ // field "rtime" is of type SQL_TIMESTAMP
+ .field("rtime", Types.SQL_TIMESTAMP).build())
+ .withRowtimeAttribute(
+ // "rtime" is rowtime attribute
+ "rtime",
+ // value of "rtime" is extracted from existing field with same name
+ new ExistingField("rtime"),
+ // values of "rtime" are at most out-of-order by 30 seconds
+ new BoundedOutOfOrderTimestamps(30000L))
+ .build()
+{% endhighlight %}
+</div>
+</div>
+
+#### Extracting Kafka 0.10+ Timestamps into Rowtime Attribute
+
+Since Kafka 0.10, Kafka messages have a timestamp as metadata that specifies when the record was written into the Kafka topic. `KafkaTableSources` can assign Kafka's message timestamp as rowtime attribute as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+TableSource source = Kafka010JsonTableSource.builder()
+ // ...
+ .withSchema(TableSchema.builder()
+ .field("sensorId", Types.LONG())
+ .field("temp", Types.DOUBLE())
+ // field "rtime" is of type SQL_TIMESTAMP
+ .field("rtime", Types.SQL_TIMESTAMP()).build())
+ // use Kafka timestamp as rowtime attribute
+  .withKafkaTimestampAsRowtimeAttribute(
+ // "rtime" is rowtime attribute
+ "rtime",
+ // values of "rtime" are ascending
+ new AscendingTimestamps())
+ .build();
+{% endhighlight %}
+</div>
-val kafkaTableSource = new Kafka08AvroTableSource(
- kafkaTopic,
- kafkaProperties,
- clazz)
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val source: TableSource[_] = Kafka010JsonTableSource.builder()
+ // ...
+ .withSchema(TableSchema.builder()
+ .field("sensorId", Types.LONG)
+ .field("temp", Types.DOUBLE)
+ // field "rtime" is of type SQL_TIMESTAMP
+ .field("rtime", Types.SQL_TIMESTAMP).build())
+ // use Kafka timestamp as rowtime attribute
+  .withKafkaTimestampAsRowtimeAttribute(
+ // "rtime" is rowtime attribute
+ "rtime",
+ // values of "rtime" are ascending
+ new AscendingTimestamps())
+ .build()
{% endhighlight %}
</div>
</div>
+#### Provided TimestampExtractors
+
+Flink provides `TimestampExtractor` implementations for common use cases.
+The following `TimestampExtractor` implementations are currently available:
+
+* `ExistingField(fieldName)`: Extracts the value of a rowtime attribute from an existing `LONG` or `SQL_TIMESTAMP` field.
+* `StreamRecordTimestamp()`: Extracts the value of a rowtime attribute from the timestamp of the `DataStream` `StreamRecord`. Note, this `TimestampExtractor` is not available for batch table sources.
+
+A custom `TimestampExtractor` can be defined by implementing the corresponding interface.
+
+#### Provided WatermarkStrategies
+
+Flink provides `WatermarkStrategy` implementations for common use cases.
+The following `WatermarkStrategy` implementations are currently available:
+
+* `AscendingTimestamps`: A watermark strategy for ascending timestamps. Records with timestamps that are out-of-order will be considered late.
+* `BoundedOutOfOrderTimestamps(delay)`: A watermark strategy for timestamps that are at most out-of-order by the specified delay.
+
+A custom `WatermarkStrategy` can be defined by implementing the corresponding interface.
+
{% top %}
### CsvTableSource
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
index 9921428..01e6329 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
@@ -19,12 +19,17 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
+import java.util.Collections;
+import java.util.Map;
import java.util.Properties;
/**
@@ -37,22 +42,100 @@ public class Kafka010AvroTableSource extends KafkaAvroTableSource {
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
+ * @param schema Schema of the produced table.
* @param record Avro specific record.
*/
public Kafka010AvroTableSource(
String topic,
Properties properties,
+ TableSchema schema,
Class<? extends SpecificRecordBase> record) {
super(
topic,
properties,
+ schema,
record);
}
+ /**
+ * Sets a mapping from schema fields to fields of the produced Avro record.
+ *
+ * <p>A field mapping is required if the fields of produced tables should be named differently than
+ * the fields of the Avro record.
+ * The key of the provided Map refers to the field of the table schema,
+ * the value to the field of the Avro record.</p>
+ *
+ * @param fieldMapping A mapping from schema fields to Avro fields.
+ */
+ @Override
+ public void setFieldMapping(Map<String, String> fieldMapping) {
+ super.setFieldMapping(fieldMapping);
+ }
+
+ /**
+ * Declares a field of the schema to be a processing time attribute.
+ *
+ * @param proctimeAttribute The name of the field that becomes the processing time field.
+ */
+ @Override
+ public void setProctimeAttribute(String proctimeAttribute) {
+ super.setProctimeAttribute(proctimeAttribute);
+ }
+
+ /**
+ * Declares a field of the schema to be a rowtime attribute.
+ *
+ * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
+ */
+ public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
+ Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
+ super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
+ }
+
@Override
FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
}
+
+ /**
+ * Returns a builder to configure and create a {@link Kafka010AvroTableSource}.
+ * @return A builder to configure and create a {@link Kafka010AvroTableSource}.
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * A builder to configure and create a {@link Kafka010AvroTableSource}.
+ */
+ public static class Builder extends KafkaAvroTableSource.Builder<Kafka010AvroTableSource, Kafka010AvroTableSource.Builder> {
+
+ @Override
+ protected boolean supportsKafkaTimestamps() {
+ return true;
+ }
+
+ @Override
+ protected Kafka010AvroTableSource.Builder builder() {
+ return this;
+ }
+
+ /**
+ * Builds and configures a {@link Kafka010AvroTableSource}.
+ *
+ * @return A configured {@link Kafka010AvroTableSource}.
+ */
+ @Override
+ public Kafka010AvroTableSource build() {
+ Kafka010AvroTableSource tableSource = new Kafka010AvroTableSource(
+ getTopic(),
+ getKafkaProps(),
+ getTableSchema(),
+ getAvroRecordClass());
+ super.configureTableSource(tableSource);
+ return tableSource;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
index f400f6b..e263cf2 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
@@ -18,11 +18,15 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import java.util.Collections;
+import java.util.Map;
import java.util.Properties;
/**
@@ -33,21 +37,103 @@ public class Kafka010JsonTableSource extends KafkaJsonTableSource {
/**
* Creates a Kafka 0.10 JSON {@link StreamTableSource}.
*
- * @param topic Kafka topic to consume.
- * @param properties Properties for the Kafka consumer.
- * @param typeInfo Type information describing the result type. The field names are used
- * to parse the JSON file and so are the types.
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param tableSchema The schema of the table.
+ * @param jsonSchema The schema of the JSON messages to decode from Kafka.
*/
public Kafka010JsonTableSource(
- String topic,
- Properties properties,
- TypeInformation<Row> typeInfo) {
+ String topic,
+ Properties properties,
+ TableSchema tableSchema,
+ TableSchema jsonSchema) {
- super(topic, properties, typeInfo);
+ super(topic, properties, tableSchema, jsonSchema);
+ }
+
+ /**
+ * Sets the flag that specifies the behavior in case of missing fields.
+ * TableSource will fail for missing fields if set to true. If set to false, the missing field is set to null.
+ *
+ * @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
+ */
+ @Override
+ public void setFailOnMissingField(boolean failOnMissingField) {
+ super.setFailOnMissingField(failOnMissingField);
+ }
+
+ /**
+ * Sets the mapping from table schema fields to JSON schema fields.
+ *
+ * @param fieldMapping The mapping from table schema fields to JSON schema fields.
+ */
+ @Override
+ public void setFieldMapping(Map<String, String> fieldMapping) {
+ super.setFieldMapping(fieldMapping);
+ }
+
+ /**
+ * Declares a field of the schema to be a processing time attribute.
+ *
+ * @param proctimeAttribute The name of the field that becomes the processing time field.
+ */
+ @Override
+ public void setProctimeAttribute(String proctimeAttribute) {
+ super.setProctimeAttribute(proctimeAttribute);
+ }
+
+ /**
+ * Declares a field of the schema to be a rowtime attribute.
+ *
+ * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
+ */
+ public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
+ Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
+ super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
}
@Override
FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
}
+
+ /**
+ * Returns a builder to configure and create a {@link Kafka010JsonTableSource}.
+ * @return A builder to configure and create a {@link Kafka010JsonTableSource}.
+ */
+ public static Kafka010JsonTableSource.Builder builder() {
+ return new Kafka010JsonTableSource.Builder();
+ }
+
+ /**
+ * A builder to configure and create a {@link Kafka010JsonTableSource}.
+ */
+ public static class Builder extends KafkaJsonTableSource.Builder<Kafka010JsonTableSource, Kafka010JsonTableSource.Builder> {
+
+ @Override
+ protected boolean supportsKafkaTimestamps() {
+ return true;
+ }
+
+ @Override
+ protected Kafka010JsonTableSource.Builder builder() {
+ return this;
+ }
+
+ /**
+ * Builds and configures a {@link Kafka010JsonTableSource}.
+ *
+ * @return A configured {@link Kafka010JsonTableSource}.
+ */
+ @Override
+ public Kafka010JsonTableSource build() {
+ Kafka010JsonTableSource tableSource = new Kafka010JsonTableSource(
+ getTopic(),
+ getKafkaProps(),
+ getTableSchema(),
+ getJsonSchema());
+ super.configureTableSource(tableSource);
+ return tableSource;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
index a6de13a..5475c9f 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
@@ -28,7 +29,10 @@ import java.util.Properties;
/**
* Kafka {@link StreamTableSource} for Kafka 0.10.
*/
-public class Kafka010TableSource extends Kafka09TableSource {
+public abstract class Kafka010TableSource extends KafkaTableSource {
+
+ // The deserialization schema for the Kafka records
+ private final DeserializationSchema<Row> deserializationSchema;
/**
* Creates a Kafka 0.10 {@link StreamTableSource}.
@@ -43,9 +47,17 @@ public class Kafka010TableSource extends Kafka09TableSource {
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema,
+ TableSchema schema,
TypeInformation<Row> typeInfo) {
- super(topic, properties, deserializationSchema, typeInfo);
+ super(topic, properties, schema, typeInfo);
+
+ this.deserializationSchema = deserializationSchema;
+ }
+
+ @Override
+ public DeserializationSchema<Row> getDeserializationSchema() {
+ return this.deserializationSchema;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
index 025fefc..e977d49 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
@@ -18,25 +18,18 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.types.Row;
-import java.util.Properties;
-
/**
* Tests for the {@link Kafka010AvroTableSource}.
*/
-public class Kafka010AvroTableSourceTest extends KafkaTableSourceTestBase {
+public class Kafka010AvroTableSourceTest extends KafkaAvroTableSourceTestBase {
@Override
- protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) {
-
- return new Kafka010AvroTableSource(
- topic,
- properties,
- AvroSpecificRecord.class);
+ protected KafkaTableSource.Builder getBuilder() {
+ return Kafka010AvroTableSource.builder();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
index 092f5ea..9a0ab04 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
@@ -18,21 +18,18 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
import org.apache.flink.types.Row;
-import java.util.Properties;
-
/**
* Tests for the {@link Kafka010JsonTableSource}.
*/
-public class Kafka010JsonTableSourceTest extends KafkaTableSourceTestBase {
+public class Kafka010JsonTableSourceTest extends KafkaJsonTableSourceTestBase {
@Override
- protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) {
- return new Kafka010JsonTableSource(topic, properties, typeInfo);
+ protected KafkaTableSource.Builder getBuilder() {
+ return Kafka010JsonTableSource.builder();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
index edc37cb..81d3496 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
@@ -19,12 +19,17 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
+import java.util.Collections;
+import java.util.Map;
import java.util.Properties;
/**
@@ -37,22 +42,100 @@ public class Kafka011AvroTableSource extends KafkaAvroTableSource {
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
+ * @param schema Schema of the produced table.
* @param record Avro specific record.
*/
public Kafka011AvroTableSource(
String topic,
Properties properties,
+ TableSchema schema,
Class<? extends SpecificRecordBase> record) {
super(
topic,
properties,
+ schema,
record);
}
+ /**
+ * Sets a mapping from schema fields to fields of the produced Avro record.
+ *
+ * <p>A field mapping is required if the fields of the produced table should be named differently than
+ * the fields of the Avro record.
+ * The key of the provided Map refers to the field of the table schema,
+ * the value to the field of the Avro record.</p>
+ *
+ * @param fieldMapping A mapping from schema fields to Avro fields.
+ */
+ @Override
+ public void setFieldMapping(Map<String, String> fieldMapping) {
+ super.setFieldMapping(fieldMapping);
+ }
+
+ /**
+ * Declares a field of the schema to be a processing time attribute.
+ *
+ * @param proctimeAttribute The name of the field that becomes the processing time field.
+ */
+ @Override
+ public void setProctimeAttribute(String proctimeAttribute) {
+ super.setProctimeAttribute(proctimeAttribute);
+ }
+
+ /**
+ * Declares a field of the schema to be a rowtime attribute.
+ *
+ * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
+ */
+ public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
+ Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
+ super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
+ }
+
@Override
FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties);
}
+
+ /**
+ * Returns a builder to configure and create a {@link Kafka011AvroTableSource}.
+ * @return A builder to configure and create a {@link Kafka011AvroTableSource}.
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * A builder to configure and create a {@link Kafka011AvroTableSource}.
+ */
+ public static class Builder extends KafkaAvroTableSource.Builder<Kafka011AvroTableSource, Kafka011AvroTableSource.Builder> {
+
+ @Override
+ protected boolean supportsKafkaTimestamps() {
+ return true;
+ }
+
+ @Override
+ protected Kafka011AvroTableSource.Builder builder() {
+ return this;
+ }
+
+ /**
+ * Builds and configures a {@link Kafka011AvroTableSource}.
+ *
+ * @return A configured {@link Kafka011AvroTableSource}.
+ */
+ @Override
+ public Kafka011AvroTableSource build() {
+ Kafka011AvroTableSource tableSource = new Kafka011AvroTableSource(
+ getTopic(),
+ getKafkaProps(),
+ getTableSchema(),
+ getAvroRecordClass());
+ super.configureTableSource(tableSource);
+ return tableSource;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
index 471c2d2..1aff670 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
@@ -18,11 +18,15 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import java.util.Collections;
+import java.util.Map;
import java.util.Properties;
/**
@@ -33,21 +37,103 @@ public class Kafka011JsonTableSource extends KafkaJsonTableSource {
/**
* Creates a Kafka 0.11 JSON {@link StreamTableSource}.
*
- * @param topic Kafka topic to consume.
- * @param properties Properties for the Kafka consumer.
- * @param typeInfo Type information describing the result type. The field names are used
- * to parse the JSON file and so are the types.
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param tableSchema The schema of the table.
+ * @param jsonSchema The schema of the JSON messages to decode from Kafka.
*/
public Kafka011JsonTableSource(
- String topic,
- Properties properties,
- TypeInformation<Row> typeInfo) {
+ String topic,
+ Properties properties,
+ TableSchema tableSchema,
+ TableSchema jsonSchema) {
- super(topic, properties, typeInfo);
+ super(topic, properties, tableSchema, jsonSchema);
+ }
+
+ /**
+ * Sets the flag that specifies the behavior in case of missing fields.
+ * TableSource will fail for missing fields if set to true. If set to false, the missing field is set to null.
+ *
+ * @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
+ */
+ @Override
+ public void setFailOnMissingField(boolean failOnMissingField) {
+ super.setFailOnMissingField(failOnMissingField);
+ }
+
+ /**
+ * Sets the mapping from table schema fields to JSON schema fields.
+ *
+ * @param fieldMapping The mapping from table schema fields to JSON schema fields.
+ */
+ @Override
+ public void setFieldMapping(Map<String, String> fieldMapping) {
+ super.setFieldMapping(fieldMapping);
+ }
+
+ /**
+ * Declares a field of the schema to be a processing time attribute.
+ *
+ * @param proctimeAttribute The name of the field that becomes the processing time field.
+ */
+ @Override
+ public void setProctimeAttribute(String proctimeAttribute) {
+ super.setProctimeAttribute(proctimeAttribute);
+ }
+
+ /**
+ * Declares a field of the schema to be a rowtime attribute.
+ *
+ * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
+ */
+ public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
+ Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
+ super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
}
@Override
FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties);
}
+
+ /**
+ * Returns a builder to configure and create a {@link Kafka011JsonTableSource}.
+ * @return A builder to configure and create a {@link Kafka011JsonTableSource}.
+ */
+ public static Kafka011JsonTableSource.Builder builder() {
+ return new Kafka011JsonTableSource.Builder();
+ }
+
+ /**
+ * A builder to configure and create a {@link Kafka011JsonTableSource}.
+ */
+ public static class Builder extends KafkaJsonTableSource.Builder<Kafka011JsonTableSource, Kafka011JsonTableSource.Builder> {
+
+ @Override
+ protected boolean supportsKafkaTimestamps() {
+ return true;
+ }
+
+ @Override
+ protected Kafka011JsonTableSource.Builder builder() {
+ return this;
+ }
+
+ /**
+ * Builds and configures a {@link Kafka011JsonTableSource}.
+ *
+ * @return A configured {@link Kafka011JsonTableSource}.
+ */
+ @Override
+ public Kafka011JsonTableSource build() {
+ Kafka011JsonTableSource tableSource = new Kafka011JsonTableSource(
+ getTopic(),
+ getKafkaProps(),
+ getTableSchema(),
+ getJsonSchema());
+ super.configureTableSource(tableSource);
+ return tableSource;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
index 5eaea97..576a421 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
@@ -28,7 +29,10 @@ import java.util.Properties;
/**
* Kafka {@link StreamTableSource} for Kafka 0.11.
*/
-public class Kafka011TableSource extends Kafka09TableSource {
+public abstract class Kafka011TableSource extends KafkaTableSource {
+
+ // The deserialization schema for the Kafka records
+ private final DeserializationSchema<Row> deserializationSchema;
/**
* Creates a Kafka 0.11 {@link StreamTableSource}.
@@ -43,13 +47,22 @@ public class Kafka011TableSource extends Kafka09TableSource {
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema,
+ TableSchema schema,
TypeInformation<Row> typeInfo) {
- super(topic, properties, deserializationSchema, typeInfo);
+ super(topic, properties, schema, typeInfo);
+
+ this.deserializationSchema = deserializationSchema;
+ }
+
+ @Override
+ public DeserializationSchema<Row> getDeserializationSchema() {
+ return this.deserializationSchema;
}
@Override
FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties);
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
index e60bf17..bde0761 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
@@ -18,25 +18,18 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.types.Row;
-import java.util.Properties;
-
/**
* Tests for the {@link Kafka011AvroTableSource}.
*/
-public class Kafka011AvroTableSourceTest extends KafkaTableSourceTestBase {
+public class Kafka011AvroTableSourceTest extends KafkaAvroTableSourceTestBase {
@Override
- protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) {
-
- return new Kafka011AvroTableSource(
- topic,
- properties,
- AvroSpecificRecord.class);
+ protected KafkaTableSource.Builder getBuilder() {
+ return Kafka011AvroTableSource.builder();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
index c2e256c..6870ecc 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
@@ -18,21 +18,18 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
import org.apache.flink.types.Row;
-import java.util.Properties;
-
/**
* Tests for the {@link Kafka011JsonTableSource}.
*/
-public class Kafka011JsonTableSourceTest extends KafkaTableSourceTestBase {
+public class Kafka011JsonTableSourceTest extends KafkaJsonTableSourceTestBase {
@Override
- protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) {
- return new Kafka011JsonTableSource(topic, properties, typeInfo);
+ protected KafkaTableSource.Builder getBuilder() {
+ return Kafka011JsonTableSource.builder();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
index a1bea78..998f4d4 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
@@ -19,12 +19,17 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
+import java.util.Collections;
+import java.util.Map;
import java.util.Properties;
/**
@@ -37,22 +42,100 @@ public class Kafka08AvroTableSource extends KafkaAvroTableSource {
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
+ * @param schema Schema of the produced table.
* @param record Avro specific record.
*/
public Kafka08AvroTableSource(
String topic,
Properties properties,
+ TableSchema schema,
Class<? extends SpecificRecordBase> record) {
super(
topic,
properties,
+ schema,
record);
}
+ /**
+ * Sets a mapping from schema fields to fields of the produced Avro record.
+ *
+ * <p>A field mapping is required if the fields of the produced table should be named differently than
+ * the fields of the Avro record.
+ * The key of the provided Map refers to the field of the table schema,
+ * the value to the field of the Avro record.</p>
+ *
+ * @param fieldMapping A mapping from schema fields to Avro fields.
+ */
+ @Override
+ public void setFieldMapping(Map<String, String> fieldMapping) {
+ super.setFieldMapping(fieldMapping);
+ }
+
+ /**
+ * Declares a field of the schema to be a processing time attribute.
+ *
+ * @param proctimeAttribute The name of the field that becomes the processing time field.
+ */
+ @Override
+ public void setProctimeAttribute(String proctimeAttribute) {
+ super.setProctimeAttribute(proctimeAttribute);
+ }
+
+ /**
+ * Declares a field of the schema to be a rowtime attribute.
+ *
+ * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
+ */
+ public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
+ Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
+ super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
+ }
+
@Override
FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties);
}
+
+ /**
+ * Returns a builder to configure and create a {@link Kafka08AvroTableSource}.
+ * @return A builder to configure and create a {@link Kafka08AvroTableSource}.
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * A builder to configure and create a {@link Kafka08AvroTableSource}.
+ */
+ public static class Builder extends KafkaAvroTableSource.Builder<Kafka08AvroTableSource, Kafka08AvroTableSource.Builder> {
+
+ @Override
+ protected boolean supportsKafkaTimestamps() {
+ return false;
+ }
+
+ @Override
+ protected Kafka08AvroTableSource.Builder builder() {
+ return this;
+ }
+
+ /**
+ * Builds and configures a {@link Kafka08AvroTableSource}.
+ *
+ * @return A configured {@link Kafka08AvroTableSource}.
+ */
+ @Override
+ public Kafka08AvroTableSource build() {
+ Kafka08AvroTableSource tableSource = new Kafka08AvroTableSource(
+ getTopic(),
+ getKafkaProps(),
+ getTableSchema(),
+ getAvroRecordClass());
+ super.configureTableSource(tableSource);
+ return tableSource;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
index 05a2c71..aab9ea8 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
@@ -18,11 +18,15 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import java.util.Collections;
+import java.util.Map;
import java.util.Properties;
/**
@@ -33,21 +37,103 @@ public class Kafka08JsonTableSource extends KafkaJsonTableSource {
/**
* Creates a Kafka 0.8 JSON {@link StreamTableSource}.
*
- * @param topic Kafka topic to consume.
- * @param properties Properties for the Kafka consumer.
- * @param typeInfo Type information describing the result type. The field names are used
- * to parse the JSON file and so are the types.
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param tableSchema The schema of the table.
+ * @param jsonSchema The schema of the JSON messages to decode from Kafka.
*/
public Kafka08JsonTableSource(
- String topic,
- Properties properties,
- TypeInformation<Row> typeInfo) {
+ String topic,
+ Properties properties,
+ TableSchema tableSchema,
+ TableSchema jsonSchema) {
- super(topic, properties, typeInfo);
+ super(topic, properties, tableSchema, jsonSchema);
+ }
+
+ /**
+ * Sets the flag that specifies the behavior in case of missing fields.
+ * TableSource will fail for missing fields if set to true. If set to false, the missing field is set to null.
+ *
+ * @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
+ */
+ @Override
+ public void setFailOnMissingField(boolean failOnMissingField) {
+ super.setFailOnMissingField(failOnMissingField);
+ }
+
+ /**
+ * Sets the mapping from table schema fields to JSON schema fields.
+ *
+ * @param fieldMapping The mapping from table schema fields to JSON schema fields.
+ */
+ @Override
+ public void setFieldMapping(Map<String, String> fieldMapping) {
+ super.setFieldMapping(fieldMapping);
+ }
+
+ /**
+ * Declares a field of the schema to be a processing time attribute.
+ *
+ * @param proctimeAttribute The name of the field that becomes the processing time field.
+ */
+ @Override
+ public void setProctimeAttribute(String proctimeAttribute) {
+ super.setProctimeAttribute(proctimeAttribute);
+ }
+
+ /**
+ * Declares a field of the schema to be a rowtime attribute.
+ *
+ * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
+ */
+ public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
+ Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
+ super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
}
@Override
FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties);
}
+
+ /**
+ * Returns a builder to configure and create a {@link Kafka08JsonTableSource}.
+ * @return A builder to configure and create a {@link Kafka08JsonTableSource}.
+ */
+ public static Kafka08JsonTableSource.Builder builder() {
+ return new Kafka08JsonTableSource.Builder();
+ }
+
+ /**
+ * A builder to configure and create a {@link Kafka08JsonTableSource}.
+ */
+ public static class Builder extends KafkaJsonTableSource.Builder<Kafka08JsonTableSource, Kafka08JsonTableSource.Builder> {
+
+ @Override
+ protected boolean supportsKafkaTimestamps() {
+ return false;
+ }
+
+ @Override
+ protected Kafka08JsonTableSource.Builder builder() {
+ return this;
+ }
+
+ /**
+ * Builds and configures a {@link Kafka08JsonTableSource}.
+ *
+ * @return A configured {@link Kafka08JsonTableSource}.
+ */
+ @Override
+ public Kafka08JsonTableSource build() {
+ Kafka08JsonTableSource tableSource = new Kafka08JsonTableSource(
+ getTopic(),
+ getKafkaProps(),
+ getTableSchema(),
+ getJsonSchema());
+ super.configureTableSource(tableSource);
+ return tableSource;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
index 9536306..b2b949b 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
@@ -28,7 +29,10 @@ import java.util.Properties;
/**
* Kafka {@link StreamTableSource} for Kafka 0.8.
*/
-public class Kafka08TableSource extends KafkaTableSource {
+public abstract class Kafka08TableSource extends KafkaTableSource {
+
+ // The deserialization schema for the Kafka records
+ private final DeserializationSchema<Row> deserializationSchema;
/**
* Creates a Kafka 0.8 {@link StreamTableSource}.
@@ -43,9 +47,17 @@ public class Kafka08TableSource extends KafkaTableSource {
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema,
+ TableSchema schema,
TypeInformation<Row> typeInfo) {
- super(topic, properties, deserializationSchema, typeInfo);
+ super(topic, properties, schema, typeInfo);
+
+ this.deserializationSchema = deserializationSchema;
+ }
+
+ @Override
+ public DeserializationSchema<Row> getDeserializationSchema() {
+ return this.deserializationSchema;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java
index a704c2f..348f0f2 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java
@@ -18,24 +18,18 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.types.Row;
-import java.util.Properties;
-
/**
* Tests for the {@link Kafka08AvroTableSource}.
*/
-public class Kafka08AvroTableSourceTest extends KafkaTableSourceTestBase {
+public class Kafka08AvroTableSourceTest extends KafkaAvroTableSourceTestBase {
@Override
- protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) {
- return new Kafka08AvroTableSource(
- topic,
- properties,
- AvroSpecificRecord.class);
+ protected KafkaTableSource.Builder getBuilder() {
+ return Kafka08AvroTableSource.builder();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
index adcd3a2..01a0123 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
@@ -18,21 +18,18 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
import org.apache.flink.types.Row;
-import java.util.Properties;
-
/**
* Tests for the {@link Kafka08JsonTableSource}.
*/
-public class Kafka08JsonTableSourceTest extends KafkaTableSourceTestBase {
+public class Kafka08JsonTableSourceTest extends KafkaJsonTableSourceTestBase {
@Override
- protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) {
- return new Kafka08JsonTableSource(topic, properties, typeInfo);
+ protected KafkaTableSource.Builder getBuilder() {
+ return Kafka08JsonTableSource.builder();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
index d69187e..a90a8d8 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
@@ -19,12 +19,17 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
+import java.util.Collections;
+import java.util.Map;
import java.util.Properties;
/**
@@ -37,22 +42,100 @@ public class Kafka09AvroTableSource extends KafkaAvroTableSource {
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
+ * @param schema Schema of the produced table.
* @param record Avro specific record.
*/
public Kafka09AvroTableSource(
String topic,
Properties properties,
+ TableSchema schema,
Class<? extends SpecificRecordBase> record) {
super(
topic,
properties,
+ schema,
record);
}
+ /**
+ * Sets a mapping from schema fields to fields of the produced Avro record.
+ *
+ * <p>A field mapping is required if the fields of produced tables should be named differently than
+ * the fields of the Avro record.
+ * The key of the provided Map refers to the field of the table schema,
+ * the value to the field of the Avro record.</p>
+ *
+ * @param fieldMapping A mapping from schema fields to Avro fields.
+ */
+ @Override
+ public void setFieldMapping(Map<String, String> fieldMapping) {
+ super.setFieldMapping(fieldMapping);
+ }
+
+ /**
+ * Declares a field of the schema to be a processing time attribute.
+ *
+ * @param proctimeAttribute The name of the field that becomes the processing time field.
+ */
+ @Override
+ public void setProctimeAttribute(String proctimeAttribute) {
+ super.setProctimeAttribute(proctimeAttribute);
+ }
+
+ /**
+ * Declares a field of the schema to be a rowtime attribute.
+ *
+ * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
+ */
+ public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
+ Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
+ super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
+ }
+
@Override
FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
}
+
+ /**
+ * Returns a builder to configure and create a {@link Kafka09AvroTableSource}.
+ * @return A builder to configure and create a {@link Kafka09AvroTableSource}.
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * A builder to configure and create a {@link Kafka09AvroTableSource}.
+ */
+ public static class Builder extends KafkaAvroTableSource.Builder<Kafka09AvroTableSource, Kafka09AvroTableSource.Builder> {
+
+ @Override
+ protected boolean supportsKafkaTimestamps() {
+ return false;
+ }
+
+ @Override
+ protected Kafka09AvroTableSource.Builder builder() {
+ return this;
+ }
+
+ /**
+ * Builds and configures a {@link Kafka09AvroTableSource}.
+ *
+ * @return A configured {@link Kafka09AvroTableSource}.
+ */
+ @Override
+ public Kafka09AvroTableSource build() {
+ Kafka09AvroTableSource tableSource = new Kafka09AvroTableSource(
+ getTopic(),
+ getKafkaProps(),
+ getTableSchema(),
+ getAvroRecordClass());
+ super.configureTableSource(tableSource);
+ return tableSource;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
index 80811b2..2f057d7 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
@@ -18,11 +18,15 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import java.util.Collections;
+import java.util.Map;
import java.util.Properties;
/**
@@ -33,21 +37,103 @@ public class Kafka09JsonTableSource extends KafkaJsonTableSource {
/**
* Creates a Kafka 0.9 JSON {@link StreamTableSource}.
*
- * @param topic Kafka topic to consume.
- * @param properties Properties for the Kafka consumer.
- * @param typeInfo Type information describing the result type. The field names are used
- * to parse the JSON file and so are the types.
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param tableSchema The schema of the table.
+ * @param jsonSchema The schema of the JSON messages to decode from Kafka.
*/
public Kafka09JsonTableSource(
- String topic,
- Properties properties,
- TypeInformation<Row> typeInfo) {
+ String topic,
+ Properties properties,
+ TableSchema tableSchema,
+ TableSchema jsonSchema) {
- super(topic, properties, typeInfo);
+ super(topic, properties, tableSchema, jsonSchema);
+ }
+
+ /**
+ * Sets the flag that specifies the behavior in case of missing fields.
+ * TableSource will fail for missing fields if set to true. If set to false, the missing field is set to null.
+ *
+ * @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
+ */
+ @Override
+ public void setFailOnMissingField(boolean failOnMissingField) {
+ super.setFailOnMissingField(failOnMissingField);
+ }
+
+ /**
+ * Sets the mapping from table schema fields to JSON schema fields.
+ *
+ * @param fieldMapping The mapping from table schema fields to JSON schema fields.
+ */
+ @Override
+ public void setFieldMapping(Map<String, String> fieldMapping) {
+ super.setFieldMapping(fieldMapping);
+ }
+
+ /**
+ * Declares a field of the schema to be a processing time attribute.
+ *
+ * @param proctimeAttribute The name of the field that becomes the processing time field.
+ */
+ @Override
+ public void setProctimeAttribute(String proctimeAttribute) {
+ super.setProctimeAttribute(proctimeAttribute);
+ }
+
+ /**
+ * Declares a field of the schema to be a rowtime attribute.
+ *
+ * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
+ */
+ public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
+ Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
+ super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
}
@Override
FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
}
+
+ /**
+ * Returns a builder to configure and create a {@link Kafka09JsonTableSource}.
+ * @return A builder to configure and create a {@link Kafka09JsonTableSource}.
+ */
+ public static Kafka09JsonTableSource.Builder builder() {
+ return new Kafka09JsonTableSource.Builder();
+ }
+
+ /**
+ * A builder to configure and create a {@link Kafka09JsonTableSource}.
+ */
+ public static class Builder extends KafkaJsonTableSource.Builder<Kafka09JsonTableSource, Kafka09JsonTableSource.Builder> {
+
+ @Override
+ protected boolean supportsKafkaTimestamps() {
+ return false;
+ }
+
+ @Override
+ protected Kafka09JsonTableSource.Builder builder() {
+ return this;
+ }
+
+ /**
+ * Builds and configures a {@link Kafka09JsonTableSource}.
+ *
+ * @return A configured {@link Kafka09JsonTableSource}.
+ */
+ @Override
+ public Kafka09JsonTableSource build() {
+ Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource(
+ getTopic(),
+ getKafkaProps(),
+ getTableSchema(),
+ getJsonSchema());
+ super.configureTableSource(tableSource);
+ return tableSource;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
index bc50a4c..4d1166c 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
@@ -28,7 +29,10 @@ import java.util.Properties;
/**
* Kafka {@link StreamTableSource} for Kafka 0.9.
*/
-public class Kafka09TableSource extends KafkaTableSource {
+public abstract class Kafka09TableSource extends KafkaTableSource {
+
+ // The deserialization schema for the Kafka records
+ private final DeserializationSchema<Row> deserializationSchema;
/**
* Creates a Kafka 0.9 {@link StreamTableSource}.
@@ -43,9 +47,17 @@ public class Kafka09TableSource extends KafkaTableSource {
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema,
+ TableSchema schema,
TypeInformation<Row> typeInfo) {
- super(topic, properties, deserializationSchema, typeInfo);
+ super(topic, properties, schema, typeInfo);
+
+ this.deserializationSchema = deserializationSchema;
+ }
+
+ @Override
+ public DeserializationSchema<Row> getDeserializationSchema() {
+ return this.deserializationSchema;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java
index 5e3c42c..27445e1 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java
@@ -18,25 +18,18 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.types.Row;
-import java.util.Properties;
-
/**
* Tests for the {@link Kafka09AvroTableSource}.
*/
-public class Kafka09AvroTableSourceTest extends KafkaTableSourceTestBase {
+public class Kafka09AvroTableSourceTest extends KafkaAvroTableSourceTestBase {
@Override
- protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) {
-
- return new Kafka09AvroTableSource(
- topic,
- properties,
- AvroSpecificRecord.class);
+ protected KafkaTableSource.Builder getBuilder() {
+ return Kafka09AvroTableSource.builder();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
index ec70386..ed3fafb 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
@@ -18,21 +18,18 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
import org.apache.flink.types.Row;
-import java.util.Properties;
-
/**
* Tests for the {@link Kafka09JsonTableSource}.
*/
-public class Kafka09JsonTableSourceTest extends KafkaTableSourceTestBase {
+public class Kafka09JsonTableSourceTest extends KafkaJsonTableSourceTestBase {
@Override
- protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) {
- return new Kafka09JsonTableSource(topic, properties, typeInfo);
+ protected KafkaTableSource.Builder getBuilder() {
+ return Kafka09JsonTableSource.builder();
}
@Override