You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/04/24 08:27:41 UTC
[1/2] flink git commit: [FLINK-8537] [table] Add a Kafka table source
factory with Avro format support
Repository: flink
Updated Branches:
refs/heads/master cdf4744ba -> 614b1e29e
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
index 9fc7c88..8dbe0e1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
@@ -37,4 +37,13 @@ final class AscendingTimestamps extends PeriodicWatermarkAssigner {
}
override def getWatermark: Watermark = new Watermark(maxTimestamp - 1)
+
+ override def equals(obj: Any): Boolean = obj match {
+ case _: AscendingTimestamps => true
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ classOf[AscendingTimestamps].hashCode()
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
index dd71bd3..8695c61 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
@@ -62,7 +62,19 @@ abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy {
}
/** A strategy which indicates the watermarks should be preserved from the underlying datastream.*/
-final class PreserveWatermarks extends WatermarkStrategy
+final class PreserveWatermarks extends WatermarkStrategy {
+
+ override def equals(obj: scala.Any): Boolean = {
+ obj match {
+ case _: PreserveWatermarks => true
+ case _ => false
+ }
+ }
+
+ override def hashCode(): Int = {
+ classOf[PreserveWatermarks].hashCode()
+ }
+}
object PreserveWatermarks {
val INSTANCE: PreserveWatermarks = new PreserveWatermarks
}
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/AvroTypesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/AvroTypesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/AvroTypesITCase.scala
deleted file mode 100644
index 1983554..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/AvroTypesITCase.scala
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.batch
-
-import java.lang.{Boolean => JBoolean, Long => JLong}
-import java.util
-import java.util.Collections
-
-import org.apache.avro.util.Utf8
-import org.apache.flink.api.scala._
-import org.apache.flink.formats.avro.generated.{Address, Colors, Fixed16, User}
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase
-import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.apache.flink.types.Row
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-/**
- * Tests for interoperability with Avro types.
- */
-@RunWith(classOf[Parameterized])
-class AvroTypesITCase(
- mode: TestExecutionMode,
- configMode: TableConfigMode)
- extends TableProgramsClusterTestBase(mode, configMode) {
-
- @Test
- def testAvroToRow(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val t = testData(env).toTable(tEnv)
-
- val result = t.select('*)
-
- val results = result.toDataSet[Row].collect()
- val expected = "black,null,Whatever,[true],[hello],true,0.0,GREEN," +
- "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],42,{},null,null,null,null\n" +
- "blue,null,Charlie,[],[],false,1.337,RED," +
- "null,1337,{},{\"num\": 42, \"street\": \"Bakerstreet\", \"city\": \"Berlin\", " +
- "\"state\": \"Berlin\", \"zip\": \"12049\"},null,null,null\n" +
- "yellow,null,Terminator,[false],[world],false,0.0,GREEN," +
- "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],1,{},null,null,null,null"
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testAvroStringAccess(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val t = testData(env).toTable(tEnv)
-
- val result = t.select('name)
- val results = result.toDataSet[Utf8].collect()
- val expected = "Charlie\n" +
- "Terminator\n" +
- "Whatever"
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testAvroObjectAccess(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val t = testData(env).toTable(tEnv)
-
- val result = t
- .filter('type_nested.isNotNull)
- .select('type_nested.flatten()).as('city, 'num, 'state, 'street, 'zip)
-
- val results = result.toDataSet[Address].collect()
- val expected = AvroTypesITCase.USER_1.getTypeNested.toString
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testAvroToAvro(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val t = testData(env).toTable(tEnv)
-
- val result = t.select('*).toDataSet[User].collect()
- val expected = AvroTypesITCase.USER_1 + "\n" +
- AvroTypesITCase.USER_2 + "\n" +
- AvroTypesITCase.USER_3
- TestBaseUtils.compareResultAsText(result.asJava, expected)
- }
-
- private def testData(env: ExecutionEnvironment): DataSet[User] = {
-
- val data = new mutable.MutableList[User]
-
- data.+=(AvroTypesITCase.USER_1)
- data.+=(AvroTypesITCase.USER_2)
- data.+=(AvroTypesITCase.USER_3)
-
- env.fromCollection(data)
- }
-}
-
-object AvroTypesITCase {
-
- val USER_1: User = User.newBuilder()
- .setName("Charlie")
- .setFavoriteColor("blue")
- .setFavoriteNumber(null)
- .setTypeBoolTest(false)
- .setTypeDoubleTest(1.337d)
- .setTypeNullTest(null)
- .setTypeLongTest(1337L)
- .setTypeArrayString(new util.ArrayList[CharSequence])
- .setTypeArrayBoolean(new util.ArrayList[JBoolean]())
- .setTypeNullableArray(null)
- .setTypeEnum(Colors.RED)
- .setTypeMap(new util.HashMap[CharSequence, JLong])
- .setTypeFixed(null)
- .setTypeUnion(null)
- .setTypeNested(
- Address.newBuilder
- .setNum(42)
- .setStreet("Bakerstreet")
- .setCity("Berlin")
- .setState("Berlin")
- .setZip("12049")
- .build)
- .build
-
- val USER_2: User = User.newBuilder()
- .setName("Whatever")
- .setFavoriteNumber(null)
- .setFavoriteColor("black")
- .setTypeLongTest(42L)
- .setTypeDoubleTest(0.0)
- .setTypeNullTest(null)
- .setTypeBoolTest(true)
- .setTypeArrayString(Collections.singletonList("hello"))
- .setTypeArrayBoolean(Collections.singletonList(true))
- .setTypeEnum(Colors.GREEN)
- .setTypeMap(new util.HashMap[CharSequence, JLong])
- .setTypeFixed(new Fixed16())
- .setTypeUnion(null)
- .setTypeNested(null)
- .build()
-
- val USER_3: User = User.newBuilder()
- .setName("Terminator")
- .setFavoriteNumber(null)
- .setFavoriteColor("yellow")
- .setTypeLongTest(1L)
- .setTypeDoubleTest(0.0)
- .setTypeNullTest(null)
- .setTypeBoolTest(false)
- .setTypeArrayString(Collections.singletonList("world"))
- .setTypeArrayBoolean(Collections.singletonList(false))
- .setTypeEnum(Colors.GREEN)
- .setTypeMap(new util.HashMap[CharSequence, JLong])
- .setTypeFixed(new Fixed16())
- .setTypeUnion(null)
- .setTypeNested(null)
- .build()
-}
[2/2] flink git commit: [FLINK-8537] [table] Add a Kafka table source
factory with Avro format support
Posted by tw...@apache.org.
[FLINK-8537] [table] Add a Kafka table source factory with Avro format support
This closes #5610.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/614b1e29
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/614b1e29
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/614b1e29
Branch: refs/heads/master
Commit: 614b1e29eefd37fa22e7c882bc243751c6897b90
Parents: cdf4744
Author: Xingcan Cui <xi...@gmail.com>
Authored: Thu Mar 1 19:21:05 2018 +0800
Committer: Timo Walther <tw...@apache.org>
Committed: Tue Apr 24 10:23:48 2018 +0200
----------------------------------------------------------------------
.../kafka/Kafka010AvroTableSourceFactory.java | 37 +++
.../kafka/Kafka010JsonTableSourceFactory.java | 2 +-
...pache.flink.table.sources.TableSourceFactory | 1 +
.../Kafka010AvroTableSourceFactoryTest.java | 37 +++
.../kafka/Kafka011AvroTableSourceFactory.java | 37 +++
.../kafka/Kafka011JsonTableSourceFactory.java | 2 +-
...pache.flink.table.sources.TableSourceFactory | 1 +
.../Kafka011AvroTableSourceFactoryTest.java | 37 +++
.../kafka/Kafka08AvroTableSourceFactory.java | 37 +++
.../kafka/Kafka08JsonTableSourceFactory.java | 2 +-
...pache.flink.table.sources.TableSourceFactory | 1 +
.../Kafka08AvroTableSourceFactoryTest.java | 37 +++
.../kafka/Kafka09AvroTableSourceFactory.java | 36 +++
.../kafka/Kafka09JsonTableSourceFactory.java | 2 +-
...pache.flink.table.sources.TableSourceFactory | 1 +
.../Kafka09AvroTableSourceFactoryTest.java | 37 +++
.../connectors/kafka/KafkaAvroTableSource.java | 73 ++----
.../kafka/KafkaAvroTableSourceFactory.java | 81 +++++++
.../kafka/KafkaJsonTableSourceFactory.java | 204 +++-------------
.../kafka/KafkaTableSourceFactory.java | 233 +++++++++++++++++++
.../KafkaAvroTableSourceFactoryTestBase.java | 123 ++++++++++
flink-formats/flink-avro/pom.xml | 22 +-
.../typeutils/AvroRecordClassConverter.java | 81 +++++++
.../apache/flink/table/descriptors/Avro.java | 59 +++++
.../flink/table/descriptors/AvroValidator.java | 34 +++
.../flink/table/descriptors/AvroTest.java | 64 +++++
.../table/runtime/batch/AvroTypesITCase.java | 203 ++++++++++++++++
flink-libraries/flink-table/pom.xml | 15 --
.../sources/tsextractors/ExistingField.scala | 2 +-
.../tsextractors/StreamRecordTimestamp.scala | 9 +
.../wmstrategies/AscendingTimestamps.scala | 9 +
.../wmstrategies/watermarkStrategies.scala | 14 +-
.../table/runtime/batch/AvroTypesITCase.scala | 188 ---------------
33 files changed, 1288 insertions(+), 433 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactory.java
new file mode 100644
index 0000000..3972c3f
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_010;
+
+/**
+ * Factory for creating configured instances of {@link Kafka010AvroTableSource}.
+ */
+public class Kafka010AvroTableSourceFactory extends KafkaAvroTableSourceFactory {
+
+ @Override
+ protected KafkaAvroTableSource.Builder createKafkaAvroBuilder() {
+ return new Kafka010AvroTableSource.Builder();
+ }
+
+ @Override
+ protected String kafkaVersion() {
+ return CONNECTOR_VERSION_VALUE_010;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java
index c639a44..6c128b0 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java
@@ -26,7 +26,7 @@ import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSIO
public class Kafka010JsonTableSourceFactory extends KafkaJsonTableSourceFactory {
@Override
- protected KafkaJsonTableSource.Builder createBuilder() {
+ protected KafkaJsonTableSource.Builder createKafkaJsonBuilder() {
return new Kafka010JsonTableSource.Builder();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
index 9ef54fc..cf10939 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
@@ -14,3 +14,4 @@
# limitations under the License.
org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSourceFactory
+org.apache.flink.streaming.connectors.kafka.Kafka010AvroTableSourceFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactoryTest.java
new file mode 100644
index 0000000..d78bfbd
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactoryTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_010;
+
+/**
+ * Tests for {@link Kafka010AvroTableSourceFactory}.
+ */
+public class Kafka010AvroTableSourceFactoryTest extends KafkaAvroTableSourceFactoryTestBase {
+
+ @Override
+ protected String version() {
+ return CONNECTOR_VERSION_VALUE_010;
+ }
+
+ @Override
+ protected KafkaAvroTableSource.Builder builder() {
+ return Kafka010AvroTableSource.builder();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactory.java
new file mode 100644
index 0000000..a959983
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_011;
+
+/**
+ * Factory for creating configured instances of {@link Kafka011AvroTableSource}.
+ */
+public class Kafka011AvroTableSourceFactory extends KafkaAvroTableSourceFactory {
+
+ @Override
+ protected KafkaAvroTableSource.Builder createKafkaAvroBuilder() {
+ return new Kafka011AvroTableSource.Builder();
+ }
+
+ @Override
+ protected String kafkaVersion() {
+ return CONNECTOR_VERSION_VALUE_011;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java
index 6745bb2..53bf7be 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java
@@ -26,7 +26,7 @@ import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSIO
public class Kafka011JsonTableSourceFactory extends KafkaJsonTableSourceFactory {
@Override
- protected KafkaJsonTableSource.Builder createBuilder() {
+ protected KafkaJsonTableSource.Builder createKafkaJsonBuilder() {
return new Kafka011JsonTableSource.Builder();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
index 75135e5..f6825ad 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
@@ -14,3 +14,4 @@
# limitations under the License.
org.apache.flink.streaming.connectors.kafka.Kafka011JsonTableSourceFactory
+org.apache.flink.streaming.connectors.kafka.Kafka011AvroTableSourceFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactoryTest.java
new file mode 100644
index 0000000..787a525
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactoryTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_011;
+
+/**
+ * Tests for {@link Kafka011AvroTableSourceFactory}.
+ */
+public class Kafka011AvroTableSourceFactoryTest extends KafkaAvroTableSourceFactoryTestBase {
+
+ @Override
+ protected String version() {
+ return CONNECTOR_VERSION_VALUE_011;
+ }
+
+ @Override
+ protected KafkaAvroTableSource.Builder builder() {
+ return Kafka011AvroTableSource.builder();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceFactory.java
new file mode 100644
index 0000000..aefc4db
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08;
+
+/**
+ * Factory for creating configured instances of {@link Kafka08AvroTableSource}.
+ */
+public class Kafka08AvroTableSourceFactory extends KafkaAvroTableSourceFactory {
+
+ @Override
+ protected KafkaAvroTableSource.Builder createKafkaAvroBuilder() {
+ return new Kafka08AvroTableSource.Builder();
+ }
+
+ @Override
+ protected String kafkaVersion() {
+ return CONNECTOR_VERSION_VALUE_08;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java
index 2da805a..13e0d57 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java
@@ -26,7 +26,7 @@ import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSIO
public class Kafka08JsonTableSourceFactory extends KafkaJsonTableSourceFactory {
@Override
- protected KafkaJsonTableSource.Builder createBuilder() {
+ protected KafkaJsonTableSource.Builder createKafkaJsonBuilder() {
return new Kafka08JsonTableSource.Builder();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
index 9092955..7c45718 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
@@ -14,3 +14,4 @@
# limitations under the License.
org.apache.flink.streaming.connectors.kafka.Kafka08JsonTableSourceFactory
+org.apache.flink.streaming.connectors.kafka.Kafka08AvroTableSourceFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceFactoryTest.java
new file mode 100644
index 0000000..f367636
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceFactoryTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08;
+
+/**
+ * Tests for {@link Kafka08AvroTableSourceFactory}.
+ */
+public class Kafka08AvroTableSourceFactoryTest extends KafkaAvroTableSourceFactoryTestBase {
+
+ @Override
+ protected String version() {
+ return CONNECTOR_VERSION_VALUE_08;
+ }
+
+ @Override
+ protected KafkaAvroTableSource.Builder builder() {
+ return Kafka08AvroTableSource.builder();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceFactory.java
new file mode 100644
index 0000000..0cbe2bf
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_09;
+
+/**
+ * Factory for creating configured instances of {@link Kafka09AvroTableSource}.
+ */
+public class Kafka09AvroTableSourceFactory extends KafkaAvroTableSourceFactory {
+ @Override
+ protected KafkaAvroTableSource.Builder createKafkaAvroBuilder() {
+ return new Kafka09AvroTableSource.Builder();
+ }
+
+ @Override
+ protected String kafkaVersion() {
+ return CONNECTOR_VERSION_VALUE_09;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java
index 9207426..7363282 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java
@@ -26,7 +26,7 @@ import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSIO
public class Kafka09JsonTableSourceFactory extends KafkaJsonTableSourceFactory {
@Override
- protected KafkaJsonTableSource.Builder createBuilder() {
+ protected KafkaJsonTableSource.Builder createKafkaJsonBuilder() {
return new Kafka09JsonTableSource.Builder();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
index 2f38bd0..4717da9 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
@@ -14,3 +14,4 @@
# limitations under the License.
org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSourceFactory
+org.apache.flink.streaming.connectors.kafka.Kafka09AvroTableSourceFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceFactoryTest.java
new file mode 100644
index 0000000..3078373
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceFactoryTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_09;
+
+/**
+ * Tests for {@link Kafka09AvroTableSourceFactory}.
+ */
+public class Kafka09AvroTableSourceFactoryTest extends KafkaAvroTableSourceFactoryTestBase {
+
+ @Override
+ protected String version() {
+ return CONNECTOR_VERSION_VALUE_09;
+ }
+
+ @Override
+ protected KafkaAvroTableSource.Builder builder() {
+ return Kafka09AvroTableSource.builder();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
index 1587798..7828a1c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
@@ -20,25 +20,17 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
-import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.DefinedFieldMapping;
import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.types.Row;
-import org.apache.avro.Schema;
-import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
-import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
/**
@@ -72,7 +64,7 @@ public abstract class KafkaAvroTableSource extends KafkaTableSource implements D
topic,
properties,
schema,
- convertToRowTypeInformation(avroRecordClass));
+ AvroRecordClassConverter.convert(avroRecordClass));
this.avroRecordClass = avroRecordClass;
}
@@ -92,6 +84,27 @@ public abstract class KafkaAvroTableSource extends KafkaTableSource implements D
return new AvroRowDeserializationSchema(avroRecordClass);
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof KafkaAvroTableSource)) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ final KafkaAvroTableSource that = (KafkaAvroTableSource) o;
+ return Objects.equals(avroRecordClass, that.avroRecordClass) &&
+ Objects.equals(fieldMapping, that.fieldMapping);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), avroRecordClass, fieldMapping);
+ }
+
//////// SETTERS FOR OPTIONAL PARAMETERS
/**
@@ -106,44 +119,6 @@ public abstract class KafkaAvroTableSource extends KafkaTableSource implements D
//////// HELPER METHODS
/**
- * Converts the extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order.
- * Replaces generic Utf8 with basic String type information.
- */
- @SuppressWarnings("unchecked")
- private static <T extends SpecificRecordBase> TypeInformation<Row> convertToRowTypeInformation(Class<T> avroClass) {
- final AvroTypeInfo<T> avroTypeInfo = new AvroTypeInfo<>(avroClass);
- // determine schema to retrieve deterministic field order
- final Schema schema = SpecificData.get().getSchema(avroClass);
- return (TypeInformation<Row>) convertToTypeInformation(avroTypeInfo, schema);
- }
-
- /**
- * Recursively converts extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order.
- * Replaces generic Utf8 with basic String type information.
- */
- private static TypeInformation<?> convertToTypeInformation(TypeInformation<?> extracted, Schema schema) {
- if (schema.getType() == Schema.Type.RECORD) {
- final List<Schema.Field> fields = schema.getFields();
- final AvroTypeInfo<?> avroTypeInfo = (AvroTypeInfo<?>) extracted;
-
- final TypeInformation<?>[] types = new TypeInformation<?>[fields.size()];
- final String[] names = new String[fields.size()];
- for (int i = 0; i < fields.size(); i++) {
- final Schema.Field field = fields.get(i);
- types[i] = convertToTypeInformation(avroTypeInfo.getTypeAt(field.name()), field.schema());
- names[i] = field.name();
- }
- return new RowTypeInfo(types, names);
- } else if (extracted instanceof GenericTypeInfo<?>) {
- final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
- if (genericTypeInfo.getTypeClass() == Utf8.class) {
- return BasicTypeInfo.STRING_TYPE_INFO;
- }
- }
- return extracted;
- }
-
- /**
* Abstract builder for a {@link KafkaAvroTableSource} to be extended by builders of subclasses of
* KafkaAvroTableSource.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactory.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactory.java
new file mode 100644
index 0000000..1401914
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactory.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.AvroValidator;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FormatDescriptorValidator;
+import org.apache.flink.table.descriptors.SchemaValidator;
+
+import org.apache.avro.specific.SpecificRecordBase;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Factory for creating configured instances of {@link KafkaAvroTableSource}.
+ */
+public abstract class KafkaAvroTableSourceFactory extends KafkaTableSourceFactory {
+
+ @Override
+ protected String formatType() {
+ return AvroValidator.FORMAT_TYPE_VALUE;
+ }
+
+ @Override
+ protected int formatPropertyVersion() {
+ return 1;
+ }
+
+ @Override
+ protected List<String> formatProperties() {
+ return Collections.singletonList(AvroValidator.FORMAT_RECORD_CLASS);
+ }
+
+ @Override
+ protected FormatDescriptorValidator formatValidator() {
+ return new AvroValidator();
+ }
+
+ @Override
+ protected KafkaTableSource.Builder createBuilderWithFormat(DescriptorProperties params) {
+ KafkaAvroTableSource.Builder builder = createKafkaAvroBuilder();
+
+ // Avro format schema
+ final Class<? extends SpecificRecordBase> avroRecordClass =
+ params.getClass(AvroValidator.FORMAT_RECORD_CLASS, SpecificRecordBase.class);
+ builder.forAvroRecordClass(avroRecordClass);
+ final TableSchema formatSchema = TableSchema.fromTypeInfo(AvroRecordClassConverter.convert(avroRecordClass));
+
+ // field mapping
+ final Map<String, String> mapping = SchemaValidator.deriveFieldMapping(params, Optional.of(formatSchema));
+ builder.withTableToAvroMapping(mapping);
+
+ return builder;
+ }
+
+ /**
+ * Creates a version specific {@link KafkaAvroTableSource.Builder}.
+ */
+ protected abstract KafkaAvroTableSource.Builder createKafkaAvroBuilder();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java
index 2897314..9898569 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java
@@ -20,219 +20,77 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.JsonSchemaConverter;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FormatDescriptorValidator;
import org.apache.flink.table.descriptors.JsonValidator;
-import org.apache.flink.table.descriptors.KafkaValidator;
import org.apache.flink.table.descriptors.SchemaValidator;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.table.sources.TableSource;
-import org.apache.flink.table.sources.TableSourceFactory;
-import org.apache.flink.types.Row;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Properties;
-
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
-import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA;
-import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION;
-import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE;
-import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD;
-import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_JSON_SCHEMA;
-import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_SCHEMA;
-import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_TYPE_VALUE;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_STARTUP_MODE;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TOPIC;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE_KAFKA;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_CLASS;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_FROM;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_SERIALIZED;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_TYPE;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_CLASS;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_DELAY;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_SERIALIZED;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_TYPE;
-import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA;
-import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_FROM;
-import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_NAME;
-import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_PROCTIME;
-import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE;
/**
* Factory for creating configured instances of {@link KafkaJsonTableSource}.
*/
-public abstract class KafkaJsonTableSourceFactory implements TableSourceFactory<Row> {
+public abstract class KafkaJsonTableSourceFactory extends KafkaTableSourceFactory {
@Override
- public Map<String, String> requiredContext() {
- Map<String, String> context = new HashMap<>();
- context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA); // kafka
- context.put(CONNECTOR_VERSION(), kafkaVersion());
-
- context.put(FORMAT_TYPE(), FORMAT_TYPE_VALUE); // json format
-
- context.put(CONNECTOR_PROPERTY_VERSION(), "1"); // backwards compatibility
- context.put(FORMAT_PROPERTY_VERSION(), "1");
-
- return context;
+ protected String formatType() {
+ return JsonValidator.FORMAT_TYPE_VALUE;
}
@Override
- public List<String> supportedProperties() {
- List<String> properties = new ArrayList<>();
-
- // kafka
- properties.add(CONNECTOR_TOPIC);
- properties.add(CONNECTOR_PROPERTIES);
- properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_KEY);
- properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_VALUE);
- properties.add(CONNECTOR_STARTUP_MODE);
- properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_PARTITION);
- properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_OFFSET);
-
- // json format
- properties.add(FORMAT_JSON_SCHEMA);
- properties.add(FORMAT_SCHEMA);
- properties.add(FORMAT_FAIL_ON_MISSING_FIELD);
- properties.add(FORMAT_DERIVE_SCHEMA());
-
- // schema
- properties.add(SCHEMA() + ".#." + SCHEMA_TYPE());
- properties.add(SCHEMA() + ".#." + SCHEMA_NAME());
- properties.add(SCHEMA() + ".#." + SCHEMA_FROM());
-
- // time attributes
- properties.add(SCHEMA() + ".#." + SCHEMA_PROCTIME());
- properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_TYPE());
- properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_FROM());
- properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_CLASS());
- properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED());
- properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_TYPE());
- properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_CLASS());
- properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_SERIALIZED());
- properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_DELAY());
-
- return properties;
+ protected int formatPropertyVersion() {
+ return 1;
}
@Override
- public TableSource<Row> create(Map<String, String> properties) {
- final DescriptorProperties params = new DescriptorProperties(true);
- params.putProperties(properties);
-
- // validate
- new SchemaValidator(true).validate(params);
- new KafkaValidator().validate(params);
- new JsonValidator().validate(params);
-
- // build
- final KafkaJsonTableSource.Builder builder = createBuilder();
-
- // topic
- final String topic = params.getString(CONNECTOR_TOPIC);
- builder.forTopic(topic);
-
- // properties
- final Properties props = new Properties();
- final List<Map<String, String>> propsList = params.getFixedIndexedProperties(
- CONNECTOR_PROPERTIES,
- Arrays.asList(CONNECTOR_PROPERTIES_KEY, CONNECTOR_PROPERTIES_VALUE));
- propsList.forEach(kv -> props.put(
- params.getString(kv.get(CONNECTOR_PROPERTIES_KEY)),
- params.getString(kv.get(CONNECTOR_PROPERTIES_VALUE))
- ));
- builder.withKafkaProperties(props);
-
- // startup mode
- params
- .getOptionalString(CONNECTOR_STARTUP_MODE)
- .ifPresent(startupMode -> {
- switch (startupMode) {
-
- case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_EARLIEST:
- builder.fromEarliest();
- break;
-
- case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_LATEST:
- builder.fromLatest();
- break;
-
- case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS:
- builder.fromGroupOffsets();
- break;
+ protected List<String> formatProperties() {
+ return Arrays.asList(
+ JsonValidator.FORMAT_JSON_SCHEMA,
+ JsonValidator.FORMAT_SCHEMA,
+ JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD,
+ FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA());
+ }
- case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
- final Map<KafkaTopicPartition, Long> offsetMap = new HashMap<>();
+ @Override
+ protected FormatDescriptorValidator formatValidator() {
+ return new JsonValidator();
+ }
- final List<Map<String, String>> offsetList = params.getFixedIndexedProperties(
- CONNECTOR_SPECIFIC_OFFSETS,
- Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, CONNECTOR_SPECIFIC_OFFSETS_OFFSET));
- offsetList.forEach(kv -> {
- final int partition = params.getInt(kv.get(CONNECTOR_SPECIFIC_OFFSETS_PARTITION));
- final long offset = params.getLong(kv.get(CONNECTOR_SPECIFIC_OFFSETS_OFFSET));
- final KafkaTopicPartition topicPartition = new KafkaTopicPartition(topic, partition);
- offsetMap.put(topicPartition, offset);
- });
- builder.fromSpecificOffsets(offsetMap);
- break;
- }
- });
+ @Override
+ protected KafkaTableSource.Builder createBuilderWithFormat(DescriptorProperties params) {
+ final KafkaJsonTableSource.Builder builder = createKafkaJsonBuilder();
// missing field
- params.getOptionalBoolean(FORMAT_FAIL_ON_MISSING_FIELD).ifPresent(builder::failOnMissingField);
+ params.getOptionalBoolean(JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD)
+ .ifPresent(builder::failOnMissingField);
// json schema
final TableSchema formatSchema;
- if (params.containsKey(FORMAT_SCHEMA)) {
- final TypeInformation<?> info = params.getType(FORMAT_SCHEMA);
+ if (params.containsKey(JsonValidator.FORMAT_SCHEMA)) {
+ final TypeInformation<?> info = params.getType(JsonValidator.FORMAT_SCHEMA);
formatSchema = TableSchema.fromTypeInfo(info);
- } else if (params.containsKey(FORMAT_JSON_SCHEMA)) {
- final TypeInformation<?> info = JsonSchemaConverter.convert(params.getString(FORMAT_JSON_SCHEMA));
+ } else if (params.containsKey(JsonValidator.FORMAT_JSON_SCHEMA)) {
+ final TypeInformation<?> info = JsonSchemaConverter.convert(params.getString(JsonValidator.FORMAT_JSON_SCHEMA));
formatSchema = TableSchema.fromTypeInfo(info);
} else {
formatSchema = SchemaValidator.deriveFormatFields(params);
}
builder.forJsonSchema(formatSchema);
- // schema
- final TableSchema schema = params.getTableSchema(SCHEMA());
- builder.withSchema(schema);
-
- // proctime
- SchemaValidator.deriveProctimeAttribute(params).ifPresent(builder::withProctimeAttribute);
-
- // rowtime
- final List<RowtimeAttributeDescriptor> descriptors = SchemaValidator.deriveRowtimeAttributes(params);
- if (descriptors.size() > 1) {
- throw new TableException("More than one rowtime attribute is not supported yet.");
- } else if (descriptors.size() == 1) {
- final RowtimeAttributeDescriptor desc = descriptors.get(0);
- builder.withRowtimeAttribute(desc.getAttributeName(), desc.getTimestampExtractor(), desc.getWatermarkStrategy());
- }
-
// field mapping
final Map<String, String> mapping = SchemaValidator.deriveFieldMapping(params, Optional.of(formatSchema));
builder.withTableToJsonMapping(mapping);
- return builder.build();
+ return builder;
}
- protected abstract KafkaJsonTableSource.Builder createBuilder();
+ /**
+ * Creates a version specific {@link KafkaJsonTableSource.Builder}.
+ */
+ protected abstract KafkaJsonTableSource.Builder createKafkaJsonBuilder();
- protected abstract String kafkaVersion();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
new file mode 100644
index 0000000..6aaef9b
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FormatDescriptorValidator;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceFactory;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
+import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION;
+import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_STARTUP_MODE;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TOPIC;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE_KAFKA;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_CLASS;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_FROM;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_SERIALIZED;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_TYPE;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_CLASS;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_DELAY;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_SERIALIZED;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_TYPE;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_PROCTIME;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE;
+
+/**
+ * Factory for creating configured instances of {@link KafkaJsonTableSource}.
+ */
+abstract class KafkaTableSourceFactory implements TableSourceFactory<Row> {
+
+ @Override
+ public Map<String, String> requiredContext() {
+ Map<String, String> context = new HashMap<>();
+ context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA); // kafka
+ context.put(CONNECTOR_VERSION(), kafkaVersion()); // version
+
+ context.put(FORMAT_TYPE(), formatType()); // format
+
+ context.put(CONNECTOR_PROPERTY_VERSION(), "1"); // backwards compatibility
+ context.put(FORMAT_PROPERTY_VERSION(), String.valueOf(formatPropertyVersion()));
+
+ return context;
+ }
+
+ @Override
+ public List<String> supportedProperties() {
+ List<String> properties = new ArrayList<>();
+
+ // kafka
+ properties.add(CONNECTOR_TOPIC);
+ properties.add(CONNECTOR_PROPERTIES);
+ properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_KEY);
+ properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_VALUE);
+ properties.add(CONNECTOR_STARTUP_MODE);
+ properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_PARTITION);
+ properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_OFFSET);
+
+ // schema
+ properties.add(SCHEMA() + ".#." + SCHEMA_TYPE());
+ properties.add(SCHEMA() + ".#." + SCHEMA_NAME());
+ properties.add(SCHEMA() + ".#." + SCHEMA_FROM());
+
+ // time attributes
+ properties.add(SCHEMA() + ".#." + SCHEMA_PROCTIME());
+ properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_TYPE());
+ properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_FROM());
+ properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_CLASS());
+ properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED());
+ properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_TYPE());
+ properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_CLASS());
+ properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_SERIALIZED());
+ properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_DELAY());
+
+ properties.addAll(formatProperties());
+
+ return properties;
+ }
+
+ @Override
+ public TableSource<Row> create(Map<String, String> properties) {
+ final DescriptorProperties params = new DescriptorProperties(true);
+ params.putProperties(properties);
+
+ // validate
+ new SchemaValidator(true).validate(params);
+ new KafkaValidator().validate(params);
+ formatValidator().validate(params);
+
+ // build
+ final KafkaTableSource.Builder builder = createBuilderWithFormat(params);
+
+ // topic
+ final String topic = params.getString(CONNECTOR_TOPIC);
+ builder.forTopic(topic);
+
+ // properties
+ final Properties props = new Properties();
+ final List<Map<String, String>> propsList = params.getFixedIndexedProperties(
+ CONNECTOR_PROPERTIES,
+ Arrays.asList(CONNECTOR_PROPERTIES_KEY, CONNECTOR_PROPERTIES_VALUE));
+ propsList.forEach(kv -> props.put(
+ params.getString(kv.get(CONNECTOR_PROPERTIES_KEY)),
+ params.getString(kv.get(CONNECTOR_PROPERTIES_VALUE))
+ ));
+ builder.withKafkaProperties(props);
+
+ // startup mode
+ params
+ .getOptionalString(CONNECTOR_STARTUP_MODE)
+ .ifPresent(startupMode -> {
+ switch (startupMode) {
+
+ case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_EARLIEST:
+ builder.fromEarliest();
+ break;
+
+ case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_LATEST:
+ builder.fromLatest();
+ break;
+
+ case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS:
+ builder.fromGroupOffsets();
+ break;
+
+ case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
+ final Map<KafkaTopicPartition, Long> offsetMap = new HashMap<>();
+
+ final List<Map<String, String>> offsetList = params.getFixedIndexedProperties(
+ CONNECTOR_SPECIFIC_OFFSETS,
+ Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, CONNECTOR_SPECIFIC_OFFSETS_OFFSET));
+ offsetList.forEach(kv -> {
+ final int partition = params.getInt(kv.get(CONNECTOR_SPECIFIC_OFFSETS_PARTITION));
+ final long offset = params.getLong(kv.get(CONNECTOR_SPECIFIC_OFFSETS_OFFSET));
+ final KafkaTopicPartition topicPartition = new KafkaTopicPartition(topic, partition);
+ offsetMap.put(topicPartition, offset);
+ });
+ builder.fromSpecificOffsets(offsetMap);
+ break;
+ }
+ });
+
+ // schema
+ final TableSchema schema = params.getTableSchema(SCHEMA());
+ builder.withSchema(schema);
+
+ // proctime
+ SchemaValidator.deriveProctimeAttribute(params).ifPresent(builder::withProctimeAttribute);
+
+ // rowtime
+ final List<RowtimeAttributeDescriptor> descriptors = SchemaValidator.deriveRowtimeAttributes(params);
+ if (descriptors.size() > 1) {
+ throw new TableException("More than one rowtime attribute is not supported yet.");
+ } else if (descriptors.size() == 1) {
+ final RowtimeAttributeDescriptor desc = descriptors.get(0);
+ builder.withRowtimeAttribute(desc.getAttributeName(), desc.getTimestampExtractor(), desc.getWatermarkStrategy());
+ }
+
+ return builder.build();
+ }
+
+ /**
+ * Returns the format type string (e.g., "json").
+ */
+ protected abstract String formatType();
+
+ /**
+ * Returns the format property version.
+ */
+ protected abstract int formatPropertyVersion();
+
+ /**
+ * Returns the properties of the format.
+ */
+ protected abstract List<String> formatProperties();
+
+ /**
+ * Returns the validator for the format.
+ */
+ protected abstract FormatDescriptorValidator formatValidator();
+
+ /**
+ * Returns the Kafka version.
+ */
+ protected abstract String kafkaVersion();
+
+ /**
+ * Creates a builder with all the format-related configurations have been set.
+ */
+ protected abstract KafkaTableSource.Builder createBuilderWithFormat(DescriptorProperties params);
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactoryTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactoryTestBase.java
new file mode 100644
index 0000000..9552559
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactoryTestBase.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.formats.avro.generated.Address;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.Avro;
+import org.apache.flink.table.descriptors.FormatDescriptor;
+import org.apache.flink.table.descriptors.Kafka;
+import org.apache.flink.table.descriptors.Schema;
+import org.apache.flink.table.descriptors.TestTableSourceDescriptor;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceFactoryService;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Tests for {@link KafkaAvroTableSourceFactory}.
+ */
+public abstract class KafkaAvroTableSourceFactoryTestBase {
+
+ private static final String TOPIC = "test-topic";
+
+ protected abstract String version();
+
+ protected abstract KafkaAvroTableSource.Builder builder();
+
+ @Test
+ public void testTableSourceFromAvroSchema() {
+ testTableSource(new Avro().recordClass(Address.class));
+ }
+
+ private void testTableSource(FormatDescriptor format) {
+ // construct table source using a builder
+
+ final Map<String, String> tableAvroMapping = new HashMap<>();
+ tableAvroMapping.put("a_street", "street");
+ tableAvroMapping.put("street", "street");
+ tableAvroMapping.put("b_city", "city");
+ tableAvroMapping.put("city", "city");
+ tableAvroMapping.put("c_state", "state");
+ tableAvroMapping.put("state", "state");
+ tableAvroMapping.put("zip", "zip");
+ tableAvroMapping.put("num", "num");
+
+ final Properties props = new Properties();
+ props.put("group.id", "test-group");
+ props.put("bootstrap.servers", "localhost:1234");
+
+ final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+ specificOffsets.put(new KafkaTopicPartition(TOPIC, 0), 100L);
+ specificOffsets.put(new KafkaTopicPartition(TOPIC, 1), 123L);
+
+ KafkaAvroTableSource.Builder builder = builder();
+
+ builder.forAvroRecordClass(Address.class)
+ .withTableToAvroMapping(tableAvroMapping);
+
+ final KafkaTableSource builderSource = builder
+ .withKafkaProperties(props)
+ .forTopic(TOPIC)
+ .fromSpecificOffsets(specificOffsets)
+ .withSchema(
+ TableSchema.builder()
+ .field("a_street", Types.STRING)
+ .field("b_city", Types.STRING)
+ .field("c_state", Types.STRING)
+ .field("zip", Types.STRING)
+ .field("proctime", Types.SQL_TIMESTAMP)
+ .build())
+ .withProctimeAttribute("proctime")
+ .build();
+
+ // construct table source using descriptors and table source factory
+
+ final Map<Integer, Long> offsets = new HashMap<>();
+ offsets.put(0, 100L);
+ offsets.put(1, 123L);
+
+ final TestTableSourceDescriptor testDesc = new TestTableSourceDescriptor(
+ new Kafka()
+ .version(version())
+ .topic(TOPIC)
+ .properties(props)
+ .startFromSpecificOffsets(offsets))
+ .addFormat(format)
+ .addSchema(
+ new Schema()
+ .field("a_street", Types.STRING).from("street")
+ .field("b_city", Types.STRING).from("city")
+ .field("c_state", Types.STRING).from("state")
+ .field("zip", Types.STRING)
+ .field("proctime", Types.SQL_TIMESTAMP).proctime());
+
+ final TableSource<?> factorySource = TableSourceFactoryService.findAndCreateTableSource(testDesc);
+
+ assertEquals(builderSource, factorySource);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-formats/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/pom.xml b/flink-formats/flink-avro/pom.xml
index 3458760..2a437f6 100644
--- a/flink-formats/flink-avro/pom.xml
+++ b/flink-formats/flink-avro/pom.xml
@@ -51,17 +51,35 @@ under the License.
<!-- managed version -->
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <!-- use a dedicated Scala version to not depend on it -->
+ <artifactId>flink-table_2.11</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <!-- Projects depending on this project, won't depend on flink-table. -->
+ <optional>true</optional>
+ </dependency>
+
<!-- test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils-junit</artifactId>
+ <artifactId>flink-table_2.11</artifactId>
<version>${project.version}</version>
<scope>test</scope>
+ <type>test-jar</type>
</dependency>
<!-- To avoid having to have the 'flink-avro' project dependent on a particular
- Scala version, we hard-refer the flink-test-utils_2.11 here -->
+ Scala version, we hard-refer the flink-streaming-scala_2.11 here -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-scala_2.11</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java
new file mode 100644
index 0000000..b7b4871
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.util.Utf8;
+
+import java.util.List;
+
+/**
+ * Utilities for Avro record class conversion.
+ */
+public class AvroRecordClassConverter {
+
+ private AvroRecordClassConverter() {
+ // private
+ }
+
+ /**
+ * Converts the extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order.
+ * Replaces generic Utf8 with basic String type information.
+ */
+ @SuppressWarnings("unchecked")
+ public static <T extends SpecificRecordBase> TypeInformation<Row> convert(Class<T> avroClass) {
+ final AvroTypeInfo<T> avroTypeInfo = new AvroTypeInfo<>(avroClass);
+ // determine schema to retrieve deterministic field order
+ final Schema schema = SpecificData.get().getSchema(avroClass);
+ return (TypeInformation<Row>) convertType(avroTypeInfo, schema);
+ }
+
+ /**
+ * Recursively converts extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order.
+ * Replaces generic Utf8 with basic String type information.
+ */
+ private static TypeInformation<?> convertType(TypeInformation<?> extracted, Schema schema) {
+ if (schema.getType() == Schema.Type.RECORD) {
+ final List<Schema.Field> fields = schema.getFields();
+ final AvroTypeInfo<?> avroTypeInfo = (AvroTypeInfo<?>) extracted;
+
+ final TypeInformation<?>[] types = new TypeInformation<?>[fields.size()];
+ final String[] names = new String[fields.size()];
+ for (int i = 0; i < fields.size(); i++) {
+ final Schema.Field field = fields.get(i);
+ types[i] = convertType(avroTypeInfo.getTypeAt(field.name()), field.schema());
+ names[i] = field.name();
+ }
+ return new RowTypeInfo(types, names);
+ } else if (extracted instanceof GenericTypeInfo<?>) {
+ final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
+ if (genericTypeInfo.getTypeClass() == Utf8.class) {
+ return BasicTypeInfo.STRING_TYPE_INFO;
+ }
+ }
+ return extracted;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/Avro.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/Avro.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/Avro.java
new file mode 100644
index 0000000..f07a22f
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/Avro.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.specific.SpecificRecordBase;
+
+/**
+ * Format descriptor for Apache Avro records.
+ */
+public class Avro extends FormatDescriptor {
+
+ private Class<? extends SpecificRecordBase> recordClass;
+
+ /**
+ * Format descriptor for Apache Avro records.
+ */
+ public Avro() {
+ super(AvroValidator.FORMAT_TYPE_VALUE, 1);
+ }
+
+ /**
+ * Sets the class of the Avro specific record. Required.
+ *
+ * @param recordClass class of the Avro record.
+ */
+ public Avro recordClass(Class<? extends SpecificRecordBase> recordClass) {
+ Preconditions.checkNotNull(recordClass);
+ this.recordClass = recordClass;
+ return this;
+ }
+
+ /**
+ * Internal method for format properties conversion.
+ */
+ @Override
+ public void addFormatProperties(DescriptorProperties properties) {
+ if (null != recordClass) {
+ properties.putClass(AvroValidator.FORMAT_RECORD_CLASS, recordClass);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/AvroValidator.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/AvroValidator.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/AvroValidator.java
new file mode 100644
index 0000000..8a72abf
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/AvroValidator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+/**
+ * Validator for {@link Avro}.
+ */
+public class AvroValidator extends FormatDescriptorValidator {
+
+ public static final String FORMAT_TYPE_VALUE = "avro";
+ public static final String FORMAT_RECORD_CLASS = "format.record-class";
+
+ @Override
+ public void validate(DescriptorProperties properties) {
+ super.validate(properties);
+ properties.validateString(FORMAT_RECORD_CLASS, false, 1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-formats/flink-avro/src/test/java/org/apache/flink/table/descriptors/AvroTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/descriptors/AvroTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/descriptors/AvroTest.java
new file mode 100644
index 0000000..2345553
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/descriptors/AvroTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.table.api.ValidationException;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for the {@link Avro} descriptor.
+ */
+public class AvroTest extends DescriptorTestBase {
+
+ @Test(expected = ValidationException.class)
+ public void testMissingRecordClass() {
+ removePropertyAndVerify(descriptors().get(0), "format.record-class");
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public List<Descriptor> descriptors() {
+ final Descriptor desc1 = new Avro().recordClass(User.class);
+ return Collections.singletonList(desc1);
+ }
+
+ @Override
+ public List<Map<String, String>> properties() {
+ final Map<String, String> props1 = new HashMap<>();
+ props1.put("format.type", "avro");
+ props1.put("format.property-version", "1");
+ props1.put("format.record-class", "org.apache.flink.formats.avro.generated.User");
+
+ return Collections.singletonList(props1);
+ }
+
+ @Override
+ public DescriptorValidator validator() {
+ return new AvroValidator();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
new file mode 100644
index 0000000..88c70c6
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.formats.avro.generated.Address;
+import org.apache.flink.formats.avro.generated.Colors;
+import org.apache.flink.formats.avro.generated.Fixed16;
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.Row;
+
+import org.apache.avro.util.Utf8;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for interoperability with Avro types.
+ */
+@RunWith(Parameterized.class)
+public class AvroTypesITCase extends TableProgramsClusterTestBase {
+
+ private static final User USER_1 = User.newBuilder()
+ .setName("Charlie")
+ .setFavoriteColor("blue")
+ .setFavoriteNumber(null)
+ .setTypeBoolTest(false)
+ .setTypeDoubleTest(1.337d)
+ .setTypeNullTest(null)
+ .setTypeLongTest(1337L)
+ .setTypeArrayString(new ArrayList<>())
+ .setTypeArrayBoolean(new ArrayList<>())
+ .setTypeNullableArray(null)
+ .setTypeEnum(Colors.RED)
+ .setTypeMap(new HashMap<>())
+ .setTypeFixed(null)
+ .setTypeUnion(null)
+ .setTypeNested(
+ Address.newBuilder()
+ .setNum(42)
+ .setStreet("Bakerstreet")
+ .setCity("Berlin")
+ .setState("Berlin")
+ .setZip("12049").build())
+ .build();
+
+ private static final User USER_2 = User.newBuilder()
+ .setName("Whatever")
+ .setFavoriteNumber(null)
+ .setFavoriteColor("black")
+ .setTypeLongTest(42L)
+ .setTypeDoubleTest(0.0)
+ .setTypeNullTest(null)
+ .setTypeBoolTest(true)
+ .setTypeArrayString(Collections.singletonList("hello"))
+ .setTypeArrayBoolean(Collections.singletonList(true))
+ .setTypeEnum(Colors.GREEN)
+ .setTypeMap(new HashMap<>())
+ .setTypeFixed(new Fixed16())
+ .setTypeUnion(null)
+ .setTypeNested(null)
+ .build();
+
+ private static final User USER_3 = User.newBuilder()
+ .setName("Terminator")
+ .setFavoriteNumber(null)
+ .setFavoriteColor("yellow")
+ .setTypeLongTest(1L)
+ .setTypeDoubleTest(0.0)
+ .setTypeNullTest(null)
+ .setTypeBoolTest(false)
+ .setTypeArrayString(Collections.singletonList("world"))
+ .setTypeArrayBoolean(Collections.singletonList(false))
+ .setTypeEnum(Colors.GREEN)
+ .setTypeMap(new HashMap<>())
+ .setTypeFixed(new Fixed16())
+ .setTypeUnion(null)
+ .setTypeNested(null)
+ .build();
+
+ private static TypeInformation<Row> rowType = Types.ROW(
+ Types.GENERIC(Utf8.class),
+ Types.INT,
+ Types.GENERIC(Utf8.class),
+ Types.GENERIC(List.class),
+ Types.GENERIC(List.class),
+ Types.GENERIC(Object.class),
+ Types.DOUBLE,
+ Types.ENUM(Colors.class),
+ Types.GENERIC(Fixed16.class),
+ Types.LONG,
+ Types.GENERIC(Map.class),
+ Types.POJO(Address.class),
+ Types.GENERIC(Object.class),
+ Types.GENERIC(List.class),
+ Types.GENERIC(Object.class)
+ );
+
+ public AvroTypesITCase(
+ TestExecutionMode executionMode,
+ TableConfigMode tableConfigMode) {
+ super(executionMode, tableConfigMode);
+ }
+
+ @Test
+ public void testAvroToRow() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ Table t = tEnv.fromDataSet(testData(env));
+ Table result = t.select("*");
+
+ List<Row> results = tEnv.toDataSet(result, rowType).collect();
+ String expected = "black,null,Whatever,[true],[hello],true,0.0,GREEN," +
+ "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],42,{},null,null,null,null\n" +
+ "blue,null,Charlie,[],[],false,1.337,RED," +
+ "null,1337,{},{\"num\": 42, \"street\": \"Bakerstreet\", \"city\": \"Berlin\", " +
+ "\"state\": \"Berlin\", \"zip\": \"12049\"},null,null,null\n" +
+ "yellow,null,Terminator,[false],[world],false,0.0,GREEN," +
+ "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],1,{},null,null,null,null";
+ TestBaseUtils.compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testAvroStringAccess() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ Table t = tEnv.fromDataSet(testData(env));
+ Table result = t.select("name");
+ List<Utf8> results = tEnv.toDataSet(result, Types.GENERIC(Utf8.class)).collect();
+ String expected = "Charlie\n" +
+ "Terminator\n" +
+ "Whatever";
+ TestBaseUtils.compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testAvroObjectAccess() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ Table t = tEnv.fromDataSet(testData(env));
+ Table result = t
+ .filter("type_nested.isNotNull")
+ .select("type_nested.flatten()").as("city, num, state, street, zip");
+
+ List<Address> results = tEnv.toDataSet(result, Types.POJO(Address.class)).collect();
+ String expected = USER_1.getTypeNested().toString();
+ TestBaseUtils.compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testAvroToAvro() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ Table t = tEnv.fromDataSet(testData(env));
+ Table result = t.select("*");
+
+ List<User> results = tEnv.toDataSet(result, Types.POJO(User.class)).collect();
+ String expected = USER_1 + "\n" + USER_2 + "\n" + USER_3;
+ TestBaseUtils.compareResultAsText(results, expected);
+ }
+
+ private DataSet<User> testData(ExecutionEnvironment env) {
+ List<User> data = new ArrayList<>(3);
+ data.add(USER_1);
+ data.add(USER_2);
+ data.add(USER_3);
+ return env.fromCollection(data);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index 395e5cb..d993f23 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -222,21 +222,6 @@ under the License.
<scope>test</scope>
<type>test-jar</type>
</dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-avro</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-avro</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
index 866029b..6a2a418 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
@@ -27,7 +27,7 @@ import org.apache.flink.table.expressions.{Cast, Expression, ResolvedFieldRefere
*
* @param field The field to convert into a rowtime attribute.
*/
-class ExistingField(val field: String) extends TimestampExtractor {
+final class ExistingField(val field: String) extends TimestampExtractor {
override def getArgumentFields: Array[String] = Array(field)
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
index fcbd63f..659b13b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
@@ -42,6 +42,15 @@ final class StreamRecordTimestamp extends TimestampExtractor {
override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
org.apache.flink.table.expressions.StreamRecordTimestamp()
}
+
+ override def equals(obj: Any): Boolean = obj match {
+ case _: StreamRecordTimestamp => true
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ classOf[StreamRecordTimestamp].hashCode()
+ }
}
object StreamRecordTimestamp {