You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/15 09:24:54 UTC
[09/13] flink git commit: [FLINK-8558] [table] Add unified format
interfaces and separate formats from connectors
[FLINK-8558] [table] Add unified format interfaces and separate formats from connectors
This PR introduces a format discovery mechanism based on Java Service Providers. The general `TableFormatFactory` is similar to the existing table source discovery mechanism. However, it allows for arbitrary format interfaces that might be introduced in the future. At the moment, a connector can request configured instances of `DeserializationSchema` and `SerializationSchema`. In the future we can add interfaces such as a `Writer` or `KeyedSerializationSchema` without breaking backwards compatibility.
This PR deprecates the existing strong coupling of connector and format for the Kafka table sources and table source factories. It introduces descriptor-based alternatives.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ee40335f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ee40335f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ee40335f
Branch: refs/heads/master
Commit: ee40335ffa40fb32a692fa6be70946d9a70301b2
Parents: 1632681
Author: Timo Walther <tw...@apache.org>
Authored: Wed Jun 27 13:16:49 2018 +0200
Committer: Timo Walther <tw...@apache.org>
Committed: Sun Jul 15 09:51:28 2018 +0200
----------------------------------------------------------------------
docs/dev/table/sqlClient.md | 2 +
.../flink-connector-kafka-0.10/pom.xml | 53 +++
.../kafka/Kafka010AvroTableSource.java | 31 +-
.../kafka/Kafka010AvroTableSourceFactory.java | 37 --
.../kafka/Kafka010JsonTableSource.java | 32 +-
.../kafka/Kafka010JsonTableSourceFactory.java | 37 --
.../connectors/kafka/Kafka010TableSource.java | 71 +++-
.../kafka/Kafka010TableSourceFactory.java | 72 ++++
...pache.flink.table.sources.TableSourceFactory | 3 +-
.../Kafka010AvroTableSourceFactoryTest.java | 37 --
.../kafka/Kafka010AvroTableSourceTest.java | 4 +
.../Kafka010JsonTableSourceFactoryTest.java | 6 +-
.../kafka/Kafka010JsonTableSourceTest.java | 4 +
.../kafka/Kafka010TableSourceFactoryTest.java | 74 ++++
.../kafka/Kafka011AvroTableSource.java | 30 +-
.../kafka/Kafka011AvroTableSourceFactory.java | 37 --
.../kafka/Kafka011JsonTableSource.java | 33 +-
.../kafka/Kafka011JsonTableSourceFactory.java | 37 --
.../connectors/kafka/Kafka011TableSource.java | 70 +++-
.../kafka/Kafka011TableSourceFactory.java | 72 ++++
...pache.flink.table.sources.TableSourceFactory | 3 +-
.../Kafka011AvroTableSourceFactoryTest.java | 37 --
.../kafka/Kafka011AvroTableSourceTest.java | 4 +
.../Kafka011JsonTableSourceFactoryTest.java | 6 +-
.../kafka/Kafka011JsonTableSourceTest.java | 4 +
.../kafka/Kafka011TableSourceFactoryTest.java | 74 ++++
.../kafka/Kafka08AvroTableSource.java | 31 +-
.../kafka/Kafka08AvroTableSourceFactory.java | 37 --
.../kafka/Kafka08JsonTableSource.java | 32 +-
.../kafka/Kafka08JsonTableSourceFactory.java | 37 --
.../connectors/kafka/Kafka08TableSource.java | 69 +++-
.../kafka/Kafka08TableSourceFactory.java | 72 ++++
...pache.flink.table.sources.TableSourceFactory | 3 +-
.../Kafka08AvroTableSourceFactoryTest.java | 37 --
.../kafka/Kafka08AvroTableSourceTest.java | 4 +
.../kafka/Kafka08JsonTableSourceTest.java | 4 +
.../kafka/Kafka08TableSourceFactoryTest.java | 74 ++++
.../flink-connector-kafka-0.9/pom.xml | 52 +++
.../kafka/Kafka09AvroTableSource.java | 29 +-
.../kafka/Kafka09AvroTableSourceFactory.java | 36 --
.../kafka/Kafka09JsonTableSource.java | 32 +-
.../kafka/Kafka09JsonTableSourceFactory.java | 37 --
.../connectors/kafka/Kafka09TableSource.java | 69 +++-
.../kafka/Kafka09TableSourceFactory.java | 72 ++++
...pache.flink.table.sources.TableSourceFactory | 3 +-
.../Kafka09AvroTableSourceFactoryTest.java | 37 --
.../kafka/Kafka09AvroTableSourceTest.java | 4 +
.../Kafka09JsonTableSourceFactoryTest.java | 4 +
.../kafka/Kafka09JsonTableSourceTest.java | 4 +
.../kafka/Kafka09TableSourceFactoryTest.java | 74 ++++
.../connectors/kafka/KafkaAvroTableSource.java | 83 ++---
.../kafka/KafkaAvroTableSourceFactory.java | 81 -----
.../connectors/kafka/KafkaJsonTableSource.java | 99 ++---
.../kafka/KafkaJsonTableSourceFactory.java | 96 -----
.../connectors/kafka/KafkaTableSource.java | 266 +++++++++++---
.../kafka/KafkaTableSourceFactory.java | 195 +++++-----
.../KafkaAvroTableSourceFactoryTestBase.java | 123 -------
.../kafka/KafkaAvroTableSourceTestBase.java | 6 +-
.../KafkaJsonTableSourceFactoryTestBase.java | 18 +-
.../kafka/KafkaJsonTableSourceTestBase.java | 6 +-
.../kafka/KafkaTableSourceBuilderTestBase.java | 264 ++++++++++++++
.../kafka/KafkaTableSourceFactoryTestBase.java | 189 ++++++++++
.../kafka/KafkaTableSourceTestBase.java | 260 -------------
.../avro/AvroRowDeserializationSchema.java | 19 +
.../formats/avro/AvroRowFormatFactory.java | 99 +++++
.../avro/AvroRowSerializationSchema.java | 18 +
...pache.flink.table.formats.TableFormatFactory | 16 +
.../formats/avro/AvroRowFormatFactoryTest.java | 104 ++++++
.../json/JsonRowDeserializationSchema.java | 20 +-
.../formats/json/JsonRowFormatFactory.java | 108 ++++++
.../formats/json/JsonRowSchemaConverter.java | 362 +++++++++++++++++++
.../json/JsonRowSerializationSchema.java | 20 +-
.../flink/formats/json/JsonSchemaConverter.java | 362 -------------------
.../apache/flink/table/descriptors/Json.java | 3 +-
.../flink/table/descriptors/JsonValidator.java | 2 +-
...pache.flink.table.formats.TableFormatFactory | 16 +
.../formats/json/JsonRowFormatFactoryTest.java | 150 ++++++++
.../json/JsonRowSchemaConverterTest.java | 114 ++++++
.../json/JsonRowSerializationSchemaTest.java | 2 +-
.../formats/json/JsonSchemaConverterTest.java | 114 ------
.../flink/table/api/BatchTableEnvironment.scala | 49 ++-
.../table/api/StreamTableEnvironment.scala | 49 ++-
.../flink/table/api/TableEnvironment.scala | 38 ++
.../apache/flink/table/api/TableSchema.scala | 10 +
.../org/apache/flink/table/api/exceptions.scala | 80 ++++
.../descriptors/DescriptorProperties.scala | 17 +-
.../descriptors/FormatDescriptorValidator.scala | 5 +
.../table/descriptors/RowtimeValidator.scala | 43 ++-
.../table/descriptors/SchemaValidator.scala | 46 ++-
.../formats/DeserializationSchemaFactory.scala | 41 +++
.../formats/SerializationSchemaFactory.scala | 41 +++
.../table/formats/TableFormatFactory.scala | 85 +++++
.../formats/TableFormatFactoryService.scala | 271 ++++++++++++++
.../table/sources/CsvTableSourceFactory.scala | 5 +-
.../table/sources/DefinedFieldMapping.scala | 5 +-
.../table/sources/TableSourceFactory.scala | 9 +
.../sources/TableSourceFactoryService.scala | 35 +-
.../table/sources/definedTimeAttributes.scala | 2 +
...pache.flink.table.formats.TableFormatFactory | 17 +
...pache.flink.table.sources.TableSourceFactory | 3 +-
.../flink/table/descriptors/RowtimeTest.scala | 7 +-
.../flink/table/descriptors/SchemaTest.scala | 5 +-
.../StreamTableSourceDescriptorTest.scala | 79 ----
.../descriptors/TableSourceDescriptorTest.scala | 111 ++++++
.../formats/TableFormatFactoryServiceTest.scala | 144 ++++++++
.../utils/TestAmbiguousTableFormatFactory.scala | 52 +++
.../utils/TestDeserializationSchema.scala | 50 +++
.../formats/utils/TestSerializationSchema.scala | 30 ++
.../table/formats/utils/TestTableFormat.scala | 33 ++
.../formats/utils/TestTableFormatFactory.scala | 65 ++++
.../sources/TableSourceFactoryServiceTest.scala | 22 +-
.../sources/TestFixedFormatTableFactory.scala | 62 ++++
.../table/sources/TestTableSourceFactory.scala | 63 ----
.../TestWildcardFormatTableSourceFactory.scala | 56 +++
.../table/utils/MockTableEnvironment.scala | 3 +
115 files changed, 4515 insertions(+), 2056 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/docs/dev/table/sqlClient.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index 2ba8167..24af655 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -229,6 +229,8 @@ The SQL Client does not require to setup a Java project using Maven or SBT. Inst
| Name | Version | Download |
| :---------------- | :------------ | :--------------------- |
| Filesystem | | Built-in |
+| Apache Kafka | 0.9 | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.9{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.9{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
+| Apache Kafka | 0.10 | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
| Apache Kafka | 0.11 | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.11{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.11{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
#### Formats
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml
index 22efc34..2fb7a32 100644
--- a/flink-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -202,6 +202,59 @@ under the License.
</dependencies>
+ <profiles>
+ <profile>
+ <!-- Create SQL Client uber jars for releases -->
+ <id>release</id>
+ <activation>
+ <property>
+ <name>release</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <shadedArtifactAttached>true</shadedArtifactAttached>
+ <shadedClassifierName>sql-jar</shadedClassifierName>
+ <artifactSet>
+ <includes combine.children="append">
+ <include>org.apache.kafka:*</include>
+ <include>org.apache.flink:flink-connector-kafka-base_${scala.binary.version}</include>
+ <include>org.apache.flink:flink-connector-kafka-0.9_${scala.binary.version}</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>kafka/kafka-version.properties</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.kafka</pattern>
+ <shadedPattern>org.apache.flink.kafka010.shaded.org.apache.kafka</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
<build>
<plugins>
<plugin>
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
index f759f61..ebbadcf 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
@@ -18,9 +18,9 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
@@ -35,8 +35,13 @@ import java.util.Properties;
/**
* Kafka {@link StreamTableSource} for Kafka 0.10.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
-@PublicEvolving
+@Deprecated
public class Kafka010AvroTableSource extends KafkaAvroTableSource {
/**
@@ -46,7 +51,9 @@ public class Kafka010AvroTableSource extends KafkaAvroTableSource {
* @param properties Properties for the Kafka consumer.
* @param schema Schema of the produced table.
* @param record Avro specific record.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
public Kafka010AvroTableSource(
String topic,
Properties properties,
@@ -69,7 +76,9 @@ public class Kafka010AvroTableSource extends KafkaAvroTableSource {
* the value to the field of the Avro record.</p>
*
* @param fieldMapping A mapping from schema fields to Avro fields.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
@Override
public void setFieldMapping(Map<String, String> fieldMapping) {
super.setFieldMapping(fieldMapping);
@@ -79,7 +88,9 @@ public class Kafka010AvroTableSource extends KafkaAvroTableSource {
* Declares a field of the schema to be a processing time attribute.
*
* @param proctimeAttribute The name of the field that becomes the processing time field.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
@Override
public void setProctimeAttribute(String proctimeAttribute) {
super.setProctimeAttribute(proctimeAttribute);
@@ -89,7 +100,9 @@ public class Kafka010AvroTableSource extends KafkaAvroTableSource {
* Declares a field of the schema to be a rowtime attribute.
*
* @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
@@ -102,15 +115,27 @@ public class Kafka010AvroTableSource extends KafkaAvroTableSource {
/**
* Returns a builder to configure and create a {@link Kafka010AvroTableSource}.
+ *
* @return A builder to configure and create a {@link Kafka010AvroTableSource}.
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
+ @Deprecated
public static Builder builder() {
return new Builder();
}
/**
* A builder to configure and create a {@link Kafka010AvroTableSource}.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
+ @Deprecated
public static class Builder extends KafkaAvroTableSource.Builder<Kafka010AvroTableSource, Kafka010AvroTableSource.Builder> {
@Override
@@ -127,7 +152,9 @@ public class Kafka010AvroTableSource extends KafkaAvroTableSource {
* Builds and configures a {@link Kafka010AvroTableSource}.
*
* @return A configured {@link Kafka010AvroTableSource}.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
@Override
public Kafka010AvroTableSource build() {
Kafka010AvroTableSource tableSource = new Kafka010AvroTableSource(
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactory.java
deleted file mode 100644
index 3972c3f..0000000
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_010;
-
-/**
- * Factory for creating configured instances of {@link Kafka010AvroTableSource}.
- */
-public class Kafka010AvroTableSourceFactory extends KafkaAvroTableSourceFactory {
-
- @Override
- protected KafkaAvroTableSource.Builder createKafkaAvroBuilder() {
- return new Kafka010AvroTableSource.Builder();
- }
-
- @Override
- protected String kafkaVersion() {
- return CONNECTOR_VERSION_VALUE_010;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
index bda236f..a5e33a1 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
@@ -18,9 +18,9 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
@@ -32,8 +32,13 @@ import java.util.Properties;
/**
* Kafka {@link StreamTableSource} for Kafka 0.10.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
-@PublicEvolving
+@Deprecated
public class Kafka010JsonTableSource extends KafkaJsonTableSource {
/**
@@ -43,7 +48,9 @@ public class Kafka010JsonTableSource extends KafkaJsonTableSource {
* @param properties Properties for the Kafka consumer.
* @param tableSchema The schema of the table.
* @param jsonSchema The schema of the JSON messages to decode from Kafka.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
public Kafka010JsonTableSource(
String topic,
Properties properties,
@@ -58,7 +65,9 @@ public class Kafka010JsonTableSource extends KafkaJsonTableSource {
* TableSource will fail for missing fields if set to true. If set to false, the missing field is set to null.
*
* @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
@Override
public void setFailOnMissingField(boolean failOnMissingField) {
super.setFailOnMissingField(failOnMissingField);
@@ -68,7 +77,9 @@ public class Kafka010JsonTableSource extends KafkaJsonTableSource {
* Sets the mapping from table schema fields to JSON schema fields.
*
* @param fieldMapping The mapping from table schema fields to JSON schema fields.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
@Override
public void setFieldMapping(Map<String, String> fieldMapping) {
super.setFieldMapping(fieldMapping);
@@ -78,7 +89,9 @@ public class Kafka010JsonTableSource extends KafkaJsonTableSource {
* Declares a field of the schema to be a processing time attribute.
*
* @param proctimeAttribute The name of the field that becomes the processing time field.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
@Override
public void setProctimeAttribute(String proctimeAttribute) {
super.setProctimeAttribute(proctimeAttribute);
@@ -88,7 +101,9 @@ public class Kafka010JsonTableSource extends KafkaJsonTableSource {
* Declares a field of the schema to be a rowtime attribute.
*
* @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
@@ -101,15 +116,27 @@ public class Kafka010JsonTableSource extends KafkaJsonTableSource {
/**
* Returns a builder to configure and create a {@link Kafka010JsonTableSource}.
+ *
* @return A builder to configure and create a {@link Kafka010JsonTableSource}.
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
+ @Deprecated
public static Kafka010JsonTableSource.Builder builder() {
return new Kafka010JsonTableSource.Builder();
}
/**
* A builder to configure and create a {@link Kafka010JsonTableSource}.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
+ @Deprecated
public static class Builder extends KafkaJsonTableSource.Builder<Kafka010JsonTableSource, Kafka010JsonTableSource.Builder> {
@Override
@@ -126,6 +153,7 @@ public class Kafka010JsonTableSource extends KafkaJsonTableSource {
* Builds and configures a {@link Kafka010JsonTableSource}.
*
* @return A configured {@link Kafka010JsonTableSource}.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Override
public Kafka010JsonTableSource build() {
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java
deleted file mode 100644
index 6c128b0..0000000
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_010;
-
-/**
- * Factory for creating configured instances of {@link Kafka010JsonTableSource}.
- */
-public class Kafka010JsonTableSourceFactory extends KafkaJsonTableSourceFactory {
-
- @Override
- protected KafkaJsonTableSource.Builder createKafkaJsonBuilder() {
- return new Kafka010JsonTableSource.Builder();
- }
-
- @Override
- protected String kafkaVersion() {
- return CONNECTOR_VERSION_VALUE_010;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
index 5a02227..f657462 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
@@ -18,48 +18,79 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
/**
* Kafka {@link StreamTableSource} for Kafka 0.10.
*/
-@PublicEvolving
-public abstract class Kafka010TableSource extends KafkaTableSource {
-
- // The deserialization schema for the Kafka records
- private final DeserializationSchema<Row> deserializationSchema;
+@Internal
+public class Kafka010TableSource extends KafkaTableSource {
/**
* Creates a Kafka 0.10 {@link StreamTableSource}.
*
- * @param topic Kafka topic to consume.
- * @param properties Properties for the Kafka consumer.
- * @param deserializationSchema Deserialization schema to use for Kafka records.
- * @param typeInfo Type information describing the result type. The field names are used
- * to parse the JSON file and so are the types.
+ * @param schema Schema of the produced table.
+ * @param proctimeAttribute Field name of the processing time attribute.
+ * @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute
+ * @param fieldMapping Mapping for the fields of the table schema to
+ * fields of the physical returned type.
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param deserializationSchema Deserialization schema for decoding records from Kafka.
+ * @param startupMode Startup mode for the contained consumer.
+ * @param specificStartupOffsets Specific startup offsets; only relevant when startup
+ * mode is {@link StartupMode#SPECIFIC_OFFSETS}.
*/
public Kafka010TableSource(
+ TableSchema schema,
+ Optional<String> proctimeAttribute,
+ List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+ Optional<Map<String, String>> fieldMapping,
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema,
- TableSchema schema,
- TypeInformation<Row> typeInfo) {
+ StartupMode startupMode,
+ Map<KafkaTopicPartition, Long> specificStartupOffsets) {
- super(topic, properties, schema, typeInfo);
-
- this.deserializationSchema = deserializationSchema;
+ super(
+ schema,
+ proctimeAttribute,
+ rowtimeAttributeDescriptors,
+ fieldMapping,
+ topic,
+ properties,
+ deserializationSchema,
+ startupMode,
+ specificStartupOffsets);
}
- @Override
- public DeserializationSchema<Row> getDeserializationSchema() {
- return this.deserializationSchema;
+ /**
+ * Creates a Kafka 0.10 {@link StreamTableSource}.
+ *
+ * @param schema Schema of the produced table.
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param deserializationSchema Deserialization schema for decoding records from Kafka.
+ */
+ public Kafka010TableSource(
+ TableSchema schema,
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema) {
+
+ super(schema, topic, properties, deserializationSchema);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactory.java
new file mode 100644
index 0000000..4a86016
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Factory for creating configured instances of {@link Kafka010TableSource}.
+ */
+public class Kafka010TableSourceFactory extends KafkaTableSourceFactory {
+
+ @Override
+ protected String kafkaVersion() {
+ return KafkaValidator.CONNECTOR_VERSION_VALUE_010;
+ }
+
+ @Override
+ protected boolean supportsKafkaTimestamps() {
+ return true;
+ }
+
+ @Override
+ protected KafkaTableSource createKafkaTableSource(
+ TableSchema schema,
+ Optional<String> proctimeAttribute,
+ List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+ Map<String, String> fieldMapping,
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema,
+ StartupMode startupMode,
+ Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+ return new Kafka010TableSource(
+ schema,
+ proctimeAttribute,
+ rowtimeAttributeDescriptors,
+ Optional.of(fieldMapping),
+ topic,
+ properties,
+ deserializationSchema,
+ startupMode,
+ specificStartupOffsets);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
index cf10939..21f5707 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
@@ -13,5 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSourceFactory
-org.apache.flink.streaming.connectors.kafka.Kafka010AvroTableSourceFactory
+org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactoryTest.java
deleted file mode 100644
index d78bfbd..0000000
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactoryTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_010;
-
-/**
- * Tests for {@link Kafka010AvroTableSourceFactory}.
- */
-public class Kafka010AvroTableSourceFactoryTest extends KafkaAvroTableSourceFactoryTestBase {
-
- @Override
- protected String version() {
- return CONNECTOR_VERSION_VALUE_010;
- }
-
- @Override
- protected KafkaAvroTableSource.Builder builder() {
- return Kafka010AvroTableSource.builder();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
index f5f8af1..bf253c4 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
@@ -24,7 +24,11 @@ import org.apache.flink.types.Row;
/**
* Tests for the {@link Kafka010AvroTableSource}.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
+ * drop support for format-specific table sources.
*/
+@Deprecated
public class Kafka010AvroTableSourceTest extends KafkaAvroTableSourceTestBase {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactoryTest.java
index 22cf659..ab83a01 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactoryTest.java
@@ -21,8 +21,12 @@ package org.apache.flink.streaming.connectors.kafka;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_010;
/**
- * Tests for {@link Kafka010JsonTableSourceFactory}.
+ * Tests for the legacy {@code Kafka010JsonTableSourceFactory}.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
+ * drop support for format-specific table sources.
*/
+@Deprecated
public class Kafka010JsonTableSourceFactoryTest extends KafkaJsonTableSourceFactoryTestBase {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
index f9d4d84..087f3ed 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
@@ -24,7 +24,11 @@ import org.apache.flink.types.Row;
/**
* Tests for the {@link Kafka010JsonTableSource}.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
+ * drop support for format-specific table sources.
*/
+@Deprecated
public class Kafka010JsonTableSourceTest extends KafkaJsonTableSourceTestBase {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java
new file mode 100644
index 0000000..ff3b0b0
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Test for {@link Kafka010TableSource} created by {@link Kafka010TableSourceFactory}.
+ */
+public class Kafka010TableSourceFactoryTest extends KafkaTableSourceFactoryTestBase {
+
+ @Override
+ protected String getKafkaVersion() {
+ return KafkaValidator.CONNECTOR_VERSION_VALUE_010;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() {
+ return (Class) FlinkKafkaConsumer010.class;
+ }
+
+ @Override
+ protected KafkaTableSource getExpectedKafkaTableSource(
+ TableSchema schema,
+ Optional<String> proctimeAttribute,
+ List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+ Map<String, String> fieldMapping,
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema,
+ StartupMode startupMode,
+ Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+ return new Kafka010TableSource(
+ schema,
+ proctimeAttribute,
+ rowtimeAttributeDescriptors,
+ Optional.of(fieldMapping),
+ topic,
+ properties,
+ deserializationSchema,
+ startupMode,
+ specificStartupOffsets
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
index f4484f6..b3f4e0a 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
@@ -18,9 +18,9 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
@@ -35,8 +35,13 @@ import java.util.Properties;
/**
* Kafka {@link StreamTableSource} for Kafka 0.11.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
-@PublicEvolving
+@Deprecated
public class Kafka011AvroTableSource extends KafkaAvroTableSource {
/**
@@ -46,7 +51,9 @@ public class Kafka011AvroTableSource extends KafkaAvroTableSource {
* @param properties Properties for the Kafka consumer.
* @param schema Schema of the produced table.
* @param record Avro specific record.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
public Kafka011AvroTableSource(
String topic,
Properties properties,
@@ -69,7 +76,9 @@ public class Kafka011AvroTableSource extends KafkaAvroTableSource {
* the value to the field of the Avro record.</p>
*
* @param fieldMapping A mapping from schema fields to Avro fields.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
@Override
public void setFieldMapping(Map<String, String> fieldMapping) {
super.setFieldMapping(fieldMapping);
@@ -79,7 +88,9 @@ public class Kafka011AvroTableSource extends KafkaAvroTableSource {
* Declares a field of the schema to be a processing time attribute.
*
* @param proctimeAttribute The name of the field that becomes the processing time field.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
@Override
public void setProctimeAttribute(String proctimeAttribute) {
super.setProctimeAttribute(proctimeAttribute);
@@ -89,7 +100,9 @@ public class Kafka011AvroTableSource extends KafkaAvroTableSource {
* Declares a field of the schema to be a rowtime attribute.
*
* @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
@@ -103,14 +116,25 @@ public class Kafka011AvroTableSource extends KafkaAvroTableSource {
/**
* Returns a builder to configure and create a {@link Kafka011AvroTableSource}.
* @return A builder to configure and create a {@link Kafka011AvroTableSource}.
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
+ @Deprecated
public static Builder builder() {
return new Builder();
}
/**
* A builder to configure and create a {@link Kafka011AvroTableSource}.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
+ @Deprecated
public static class Builder extends KafkaAvroTableSource.Builder<Kafka011AvroTableSource, Kafka011AvroTableSource.Builder> {
@Override
@@ -127,7 +151,9 @@ public class Kafka011AvroTableSource extends KafkaAvroTableSource {
* Builds and configures a {@link Kafka011AvroTableSource}.
*
* @return A configured {@link Kafka011AvroTableSource}.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
@Override
public Kafka011AvroTableSource build() {
Kafka011AvroTableSource tableSource = new Kafka011AvroTableSource(
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactory.java
deleted file mode 100644
index a959983..0000000
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_011;
-
-/**
- * Factory for creating configured instances of {@link Kafka011AvroTableSource}.
- */
-public class Kafka011AvroTableSourceFactory extends KafkaAvroTableSourceFactory {
-
- @Override
- protected KafkaAvroTableSource.Builder createKafkaAvroBuilder() {
- return new Kafka011AvroTableSource.Builder();
- }
-
- @Override
- protected String kafkaVersion() {
- return CONNECTOR_VERSION_VALUE_011;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
index a012f5d..74c5007 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
@@ -18,9 +18,9 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
@@ -32,8 +32,13 @@ import java.util.Properties;
/**
* Kafka {@link StreamTableSource} for Kafka 0.11.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
-@PublicEvolving
+@Deprecated
public class Kafka011JsonTableSource extends KafkaJsonTableSource {
/**
@@ -43,7 +48,9 @@ public class Kafka011JsonTableSource extends KafkaJsonTableSource {
* @param properties Properties for the Kafka consumer.
* @param tableSchema The schema of the table.
* @param jsonSchema The schema of the JSON messages to decode from Kafka.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
public Kafka011JsonTableSource(
String topic,
Properties properties,
@@ -58,7 +65,9 @@ public class Kafka011JsonTableSource extends KafkaJsonTableSource {
* TableSource will fail for missing fields if set to true. If set to false, the missing field is set to null.
*
* @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
@Override
public void setFailOnMissingField(boolean failOnMissingField) {
super.setFailOnMissingField(failOnMissingField);
@@ -68,7 +77,9 @@ public class Kafka011JsonTableSource extends KafkaJsonTableSource {
* Sets the mapping from table schema fields to JSON schema fields.
*
* @param fieldMapping The mapping from table schema fields to JSON schema fields.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
@Override
public void setFieldMapping(Map<String, String> fieldMapping) {
super.setFieldMapping(fieldMapping);
@@ -78,7 +89,9 @@ public class Kafka011JsonTableSource extends KafkaJsonTableSource {
* Declares a field of the schema to be a processing time attribute.
*
* @param proctimeAttribute The name of the field that becomes the processing time field.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
@Override
public void setProctimeAttribute(String proctimeAttribute) {
super.setProctimeAttribute(proctimeAttribute);
@@ -88,7 +101,9 @@ public class Kafka011JsonTableSource extends KafkaJsonTableSource {
* Declares a field of the schema to be a rowtime attribute.
*
* @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
@@ -101,15 +116,27 @@ public class Kafka011JsonTableSource extends KafkaJsonTableSource {
/**
* Returns a builder to configure and create a {@link Kafka011JsonTableSource}.
+ *
* @return A builder to configure and create a {@link Kafka011JsonTableSource}.
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
+ @Deprecated
public static Kafka011JsonTableSource.Builder builder() {
return new Kafka011JsonTableSource.Builder();
}
/**
* A builder to configure and create a {@link Kafka011JsonTableSource}.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
+ @Deprecated
public static class Builder extends KafkaJsonTableSource.Builder<Kafka011JsonTableSource, Kafka011JsonTableSource.Builder> {
@Override
@@ -126,7 +153,9 @@ public class Kafka011JsonTableSource extends KafkaJsonTableSource {
* Builds and configures a {@link Kafka011JsonTableSource}.
*
* @return A configured {@link Kafka011JsonTableSource}.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
@Override
public Kafka011JsonTableSource build() {
Kafka011JsonTableSource tableSource = new Kafka011JsonTableSource(
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java
deleted file mode 100644
index 53bf7be..0000000
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_011;
-
-/**
- * Factory for creating configured instances of {@link Kafka011JsonTableSource}.
- */
-public class Kafka011JsonTableSourceFactory extends KafkaJsonTableSourceFactory {
-
- @Override
- protected KafkaJsonTableSource.Builder createKafkaJsonBuilder() {
- return new Kafka011JsonTableSource.Builder();
- }
-
- @Override
- protected String kafkaVersion() {
- return CONNECTOR_VERSION_VALUE_011;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
index 6c9c37d..c1d8820 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
@@ -18,53 +18,83 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
/**
* Kafka {@link StreamTableSource} for Kafka 0.11.
*/
-@PublicEvolving
-public abstract class Kafka011TableSource extends KafkaTableSource {
+@Internal
+public class Kafka011TableSource extends KafkaTableSource {
- // The deserialization schema for the Kafka records
- private final DeserializationSchema<Row> deserializationSchema;
+ /**
+ * Creates a Kafka 0.11 {@link StreamTableSource}.
+ *
+ * @param schema Schema of the produced table.
+ * @param proctimeAttribute Field name of the processing time attribute.
+ * @param rowtimeAttributeDescriptors Descriptors for the rowtime attributes.
+ * @param fieldMapping Mapping for the fields of the table schema to
+ * fields of the physical returned type.
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param deserializationSchema Deserialization schema for decoding records from Kafka.
+ * @param startupMode Startup mode for the contained consumer.
+ * @param specificStartupOffsets Specific startup offsets; only relevant when startup
+ * mode is {@link StartupMode#SPECIFIC_OFFSETS}.
+ */
+ public Kafka011TableSource(
+ TableSchema schema,
+ Optional<String> proctimeAttribute,
+ List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+ Optional<Map<String, String>> fieldMapping,
+ String topic, Properties properties,
+ DeserializationSchema<Row> deserializationSchema,
+ StartupMode startupMode,
+ Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+ super(
+ schema,
+ proctimeAttribute,
+ rowtimeAttributeDescriptors,
+ fieldMapping,
+ topic,
+ properties,
+ deserializationSchema,
+ startupMode,
+ specificStartupOffsets);
+ }
/**
* Creates a Kafka 0.11 {@link StreamTableSource}.
*
+ * @param schema Schema of the produced table.
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
- * @param deserializationSchema Deserialization schema to use for Kafka records.
- * @param typeInfo Type information describing the result type. The field names are used
- * to parse the JSON file and so are the types.
+ * @param deserializationSchema Deserialization schema for decoding records from Kafka.
*/
public Kafka011TableSource(
+ TableSchema schema,
String topic,
Properties properties,
- DeserializationSchema<Row> deserializationSchema,
- TableSchema schema,
- TypeInformation<Row> typeInfo) {
-
- super(topic, properties, schema, typeInfo);
+ DeserializationSchema<Row> deserializationSchema) {
- this.deserializationSchema = deserializationSchema;
- }
-
- @Override
- public DeserializationSchema<Row> getDeserializationSchema() {
- return this.deserializationSchema;
+ super(schema, topic, properties, deserializationSchema);
}
@Override
protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties);
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactory.java
new file mode 100644
index 0000000..b1e3929
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Factory for creating configured instances of {@link Kafka011TableSource}.
+ */
+public class Kafka011TableSourceFactory extends KafkaTableSourceFactory {
+
+ @Override
+ protected String kafkaVersion() {
+ return KafkaValidator.CONNECTOR_VERSION_VALUE_011;
+ }
+
+ @Override
+ protected boolean supportsKafkaTimestamps() {
+ return true;
+ }
+
+ @Override
+ protected KafkaTableSource createKafkaTableSource(
+ TableSchema schema,
+ Optional<String> proctimeAttribute,
+ List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+ Map<String, String> fieldMapping,
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema,
+ StartupMode startupMode,
+ Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+ return new Kafka011TableSource(
+ schema,
+ proctimeAttribute,
+ rowtimeAttributeDescriptors,
+ Optional.of(fieldMapping),
+ topic,
+ properties,
+ deserializationSchema,
+ startupMode,
+ specificStartupOffsets);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
index f6825ad..c056097 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
@@ -13,5 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.streaming.connectors.kafka.Kafka011JsonTableSourceFactory
-org.apache.flink.streaming.connectors.kafka.Kafka011AvroTableSourceFactory
+org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactoryTest.java
deleted file mode 100644
index 787a525..0000000
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactoryTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_011;
-
-/**
- * Tests for {@link Kafka011AvroTableSourceFactory}.
- */
-public class Kafka011AvroTableSourceFactoryTest extends KafkaAvroTableSourceFactoryTestBase {
-
- @Override
- protected String version() {
- return CONNECTOR_VERSION_VALUE_011;
- }
-
- @Override
- protected KafkaAvroTableSource.Builder builder() {
- return Kafka011AvroTableSource.builder();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
index e348aa6..aa083a2 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
@@ -24,7 +24,11 @@ import org.apache.flink.types.Row;
/**
* Tests for the {@link Kafka011AvroTableSource}.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
+ * drop support for format-specific table sources.
*/
+@Deprecated
public class Kafka011AvroTableSourceTest extends KafkaAvroTableSourceTestBase {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactoryTest.java
index ed92863..ae46dac 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactoryTest.java
@@ -21,8 +21,12 @@ package org.apache.flink.streaming.connectors.kafka;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_011;
/**
- * Tests for {@link Kafka011JsonTableSourceFactory}.
+ * Tests for legacy Kafka011JsonTableSourceFactory.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
+ * drop support for format-specific table sources.
*/
+@Deprecated
public class Kafka011JsonTableSourceFactoryTest extends KafkaJsonTableSourceFactoryTestBase {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
index b35d1c5..451795a 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
@@ -24,7 +24,11 @@ import org.apache.flink.types.Row;
/**
* Tests for the {@link Kafka011JsonTableSource}.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
+ * drop support for format-specific table sources.
*/
+@Deprecated
public class Kafka011JsonTableSourceTest extends KafkaJsonTableSourceTestBase {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java
new file mode 100644
index 0000000..abaa490
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Test for {@link Kafka011TableSource} created by {@link Kafka011TableSourceFactory}.
+ */
+public class Kafka011TableSourceFactoryTest extends KafkaTableSourceFactoryTestBase {
+
+ @Override
+ protected String getKafkaVersion() {
+ return KafkaValidator.CONNECTOR_VERSION_VALUE_011;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() {
+ return (Class) FlinkKafkaConsumer011.class;
+ }
+
+ @Override
+ protected KafkaTableSource getExpectedKafkaTableSource(
+ TableSchema schema,
+ Optional<String> proctimeAttribute,
+ List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+ Map<String, String> fieldMapping,
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema,
+ StartupMode startupMode,
+ Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+ return new Kafka011TableSource(
+ schema,
+ proctimeAttribute,
+ rowtimeAttributeDescriptors,
+ Optional.of(fieldMapping),
+ topic,
+ properties,
+ deserializationSchema,
+ startupMode,
+ specificStartupOffsets
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
index 3c16722..8206287 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
@@ -18,9 +18,9 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
@@ -35,8 +35,13 @@ import java.util.Properties;
/**
* Kafka {@link StreamTableSource} for Kafka 0.8.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
-@PublicEvolving
+@Deprecated
public class Kafka08AvroTableSource extends KafkaAvroTableSource {
/**
@@ -46,7 +51,9 @@ public class Kafka08AvroTableSource extends KafkaAvroTableSource {
* @param properties Properties for the Kafka consumer.
* @param schema Schema of the produced table.
* @param record Avro specific record.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
public Kafka08AvroTableSource(
String topic,
Properties properties,
@@ -69,7 +76,9 @@ public class Kafka08AvroTableSource extends KafkaAvroTableSource {
* the value to the field of the Avro record.</p>
*
* @param fieldMapping A mapping from schema fields to Avro fields.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
@Override
public void setFieldMapping(Map<String, String> fieldMapping) {
super.setFieldMapping(fieldMapping);
@@ -79,7 +88,9 @@ public class Kafka08AvroTableSource extends KafkaAvroTableSource {
* Declares a field of the schema to be a processing time attribute.
*
* @param proctimeAttribute The name of the field that becomes the processing time field.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
@Override
public void setProctimeAttribute(String proctimeAttribute) {
super.setProctimeAttribute(proctimeAttribute);
@@ -89,7 +100,9 @@ public class Kafka08AvroTableSource extends KafkaAvroTableSource {
* Declares a field of the schema to be a rowtime attribute.
*
* @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
@@ -102,15 +115,27 @@ public class Kafka08AvroTableSource extends KafkaAvroTableSource {
/**
* Returns a builder to configure and create a {@link Kafka08AvroTableSource}.
+ *
* @return A builder to configure and create a {@link Kafka08AvroTableSource}.
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
+ @Deprecated
public static Builder builder() {
return new Builder();
}
/**
* A builder to configure and create a {@link Kafka08AvroTableSource}.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
+ @Deprecated
public static class Builder extends KafkaAvroTableSource.Builder<Kafka08AvroTableSource, Kafka08AvroTableSource.Builder> {
@Override
@@ -127,7 +152,9 @@ public class Kafka08AvroTableSource extends KafkaAvroTableSource {
* Builds and configures a {@link Kafka08AvroTableSource}.
*
* @return A configured {@link Kafka08AvroTableSource}.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
@Override
public Kafka08AvroTableSource build() {
Kafka08AvroTableSource tableSource = new Kafka08AvroTableSource(
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceFactory.java
deleted file mode 100644
index aefc4db..0000000
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08;
-
-/**
- * Factory for creating configured instances of {@link Kafka08AvroTableSource}.
- */
-public class Kafka08AvroTableSourceFactory extends KafkaAvroTableSourceFactory {
-
- @Override
- protected KafkaAvroTableSource.Builder createKafkaAvroBuilder() {
- return new Kafka08AvroTableSource.Builder();
- }
-
- @Override
- protected String kafkaVersion() {
- return CONNECTOR_VERSION_VALUE_08;
- }
-}