Posted to issues@flink.apache.org by twalthr <gi...@git.apache.org> on 2018/07/13 06:52:00 UTC
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
GitHub user twalthr opened a pull request:
https://github.com/apache/flink/pull/6323
[FLINK-8558] [FLINK-8866] [table] Finalize unified table source/sink/format interfaces
## What is the purpose of the change
This PR finalizes the efforts started in #6264 and #6201 to provide unified interfaces for table sources, table sinks, and table formats. It reduces code duplication and cleans up the code base around factories.
## Brief change log
- Introduction of `org.apache.flink.table.factories.TableFactory`, a common interface for factories (a minimal sketch follows this list)
- Introduction of `org.apache.flink.table.factories.TableFormatFactory`, a specific table factory for formats
- Specific factories for `StreamTableSource`, `StreamTableSink`, `BatchTableSource`, `BatchTableSink`, `DeserializationSchema`, and `SerializationSchema`
- Deprecation of old format-specific table sources (sinks will be deprecated in a follow-up PR)
- Possibility to register a table source and a sink under a common name (table type `both` in the SQL Client YAML)
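For illustration, a minimal sketch of a custom factory under the new interfaces. `requiredContext()` and `supportedProperties()` are taken from the `TableFactory` trait quoted in the diffs below; the class name, the property keys, and the exact return type of `supportedProperties()` are assumptions, and the service-file path assumes the renamed `factories` package:
```
import java.util

import org.apache.flink.table.factories.TableFactory

// Hypothetical connector factory; property keys are illustrative.
// It would be registered in
// META-INF/services/org.apache.flink.table.factories.TableFactory
class MyConnectorFactory extends TableFactory {

  // Properties that must match exactly for this factory to be selected.
  override def requiredContext(): util.Map[String, String] = {
    val context = new util.HashMap[String, String]()
    context.put("connector.type", "my-connector")
    context.put("connector.property-version", "1")
    context
  }

  // All keys this factory understands; requests with unknown keys do not match.
  override def supportedProperties(): util.List[String] = {
    val properties = new util.ArrayList[String]()
    properties.add("connector.path")
    properties
  }
}
```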
## Verifying this change
- Existing tests verify the implementation
- Additional ITCases and unit tests have been added
- (An end-to-end test will follow in a separate PR)
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? not documented
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/twalthr/flink UnifiedInterfacesFinal
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6323.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6323
----
commit 980499f887d72ddf9a405c4ad200d0cab15d889c
Author: Timo Walther <tw...@...>
Date: 2018-06-27T11:16:49Z
[FLINK-8558] [table] Add unified format interfaces and separate formats from connectors
This PR introduces a format discovery mechanism based on Java Service Providers. The general `TableFormatFactory` is similar to the existing table source discovery mechanism. However, it allows for arbitrary format interfaces that might be introduced in the future. At the moment, a connector can request configured instances of `DeserializationSchema` and `SerializationSchema`. In the future, we can add interfaces such as a `Writer` or `KeyedSerializationSchema` without breaking backwards compatibility.
This PR deprecates the existing strong coupling of connector and format for the Kafka table sources and table source factories. It introduces descriptor-based alternatives, sketched below.
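As a rough usage sketch of this discovery mechanism: a connector could look up a configured format factory through `TableFactoryService.find`, whose signatures appear in the diffs below. The property keys follow the typical `format.*` properties mentioned in this thread; `DeserializationSchemaFactory` is referenced in this PR, but its creation method is not quoted here, so the sketch stops at the lookup:
```
import java.util.{HashMap => JHashMap}

import org.apache.flink.table.factories.{DeserializationSchemaFactory, TableFactoryService}
import org.apache.flink.types.Row

// Illustrative property map describing a desired format.
val properties = new JHashMap[String, String]()
properties.put("format.type", "json")
properties.put("format.property-version", "1")

// Service-provider-based lookup of a matching format factory.
val formatFactory = TableFactoryService.find(
  classOf[DeserializationSchemaFactory[Row]], properties)
```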
commit 42a8a156d4e6f8f3d119c458350b6c897306fc48
Author: Shuyi Chen <sh...@...>
Date: 2018-06-19T19:00:34Z
[FLINK-8866] [table] Create unified interfaces to configure and instantiate TableSinks
This closes #6201.
commit 311dc62e59c0e4146c094b73c21b979f31b2e1d9
Author: Timo Walther <tw...@...>
Date: 2018-07-11T11:29:03Z
Rename to TableFactory and move it to factories package
commit 1c581cba61ba321bb6de6a4d298a881840d11cfe
Author: Timo Walther <tw...@...>
Date: 2018-07-11T11:46:31Z
Refactor format factories
commit 5c6df7598d1f1c3c698ae9b6b35eb37d7fff8295
Author: Timo Walther <tw...@...>
Date: 2018-07-12T06:35:00Z
Unify table factories
commit 0cd7c44c006aba21c32d8785d17bfc3dbee03916
Author: Timo Walther <tw...@...>
Date: 2018-07-12T07:05:50Z
Move table type out of descriptors
commit 6b83f2e1c0e63147f049dc5389c5633077b789a4
Author: Timo Walther <tw...@...>
Date: 2018-07-12T08:50:09Z
Make source/sink factories environment-dependent
commit 4f1255fd003080f078afe6ef67ffa58f40ffec36
Author: Timo Walther <tw...@...>
Date: 2018-07-12T18:48:45Z
Clean up and simplify changes
----
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202282478
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java ---
@@ -51,16 +51,10 @@ public void addProperties(DescriptorProperties properties) {
}
public Source toSource() {
- final Map<String, String> newProperties = new HashMap<>(properties);
- newProperties.replace(TableDescriptorValidator.TABLE_TYPE(),
- TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE());
--- End diff --
why did those lines disappear?
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202352869
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala ---
@@ -52,12 +52,9 @@ class FlinkLogicalTableSourceScan(
override def deriveRowType(): RelDataType = {
val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
val streamingTable = table.unwrap(classOf[TableSourceSinkTable[_, _]]) match {
- case t: TableSourceSinkTable[_, _] => t.tableSourceTableOpt match {
- case Some(_: StreamTableSourceTable[_]) => true
- case Some(_: BatchTableSourceTable[_]) => false
- case _ => throw TableException(s"Unknown Table type ${t.getClass}.")
- }
- case t => throw TableException(s"Unknown Table type ${t.getClass}.")
+ case t: TableSourceSinkTable[_, _] if t.isStreamSourceTable => true
+ // null
--- End diff --
drop `//null` ?
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202338408
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
@@ -18,143 +18,358 @@
package org.apache.flink.table.factories
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
import org.apache.flink.table.api._
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
import org.apache.flink.table.descriptors.FormatDescriptorValidator._
import org.apache.flink.table.descriptors.MetadataValidator._
import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
import _root_.scala.collection.JavaConverters._
import _root_.scala.collection.mutable
/**
- * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
+ * Unified interface to search for a [[TableFactory]] of provided type and properties.
*/
object TableFactoryService extends Logging {
private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
- def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
- find(clz, descriptor, null)
+ /**
+ * Finds a table factory of the given class and descriptor.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor describing the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(descriptor)
+
+ val descriptorProperties = new DescriptorProperties()
+ descriptor.addProperties(descriptorProperties)
+ findInternal(factoryClass, descriptorProperties.asMap, None)
}
- def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader)
- : TableFactory = {
+ /**
+ * Finds a table factory of the given class, descriptor, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor describing the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(descriptor)
+ Preconditions.checkNotNull(classLoader)
- val properties = new DescriptorProperties()
- descriptor.addProperties(properties)
- find(clz, properties.asMap.asScala.toMap, classLoader)
+ val descriptorProperties = new DescriptorProperties()
+ descriptor.addProperties(descriptorProperties)
+ findInternal(factoryClass, descriptorProperties.asMap, None)
}
- def find(clz: Class[_], properties: Map[String, String]): TableFactory = {
- find(clz: Class[_], properties, null)
+ /**
+ * Finds a table factory of the given class and property map.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(propertyMap)
+
+ findInternal(factoryClass, propertyMap, None)
}
- def find(clz: Class[_], properties: Map[String, String],
- classLoader: ClassLoader): TableFactory = {
+ /**
+ * Finds a table factory of the given class, property map, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](
+ factoryClass: Class[T],
+ propertyMap: JMap[String, String],
+ classLoader: ClassLoader)
+ : T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(propertyMap)
+ Preconditions.checkNotNull(classLoader)
+
+ findInternal(factoryClass, propertyMap, Some(classLoader))
+ }
+
+ /**
+ * Finds a table factory of the given class, property map, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ private def findInternal[T](
+ factoryClass: Class[T],
+ propertyMap: JMap[String, String],
+ classLoader: Option[ClassLoader])
+ : T = {
+
+ val properties = propertyMap.asScala.toMap
+
+ // discover table factories
+ val foundFactories = discoverFactories(classLoader)
- var matchingFactory: Option[(TableFactory, Seq[String])] = None
+ // filter by factory class
+ val classFactories = filterByFactoryClass(
+ factoryClass,
+ properties,
+ foundFactories)
+
+ // find matching context
+ val contextFactories = filterByContext(
+ factoryClass,
+ properties,
+ foundFactories,
+ classFactories)
+
+ // filter by supported keys
+ filterBySupportedProperties(
+ factoryClass,
+ properties,
+ foundFactories,
+ contextFactories)
+ }
+
+ /**
+ * Searches for factories using Java service providers.
+ *
+ * @return all factories in the classpath
+ */
+ private def discoverFactories[T](classLoader: Option[ClassLoader]): Seq[TableFactory] = {
+ val foundFactories = mutable.ArrayBuffer[TableFactory]()
try {
- val iter = if (classLoader == null) {
- defaultLoader.iterator()
- } else {
- val customLoader = ServiceLoader.load(classOf[TableFactory], classLoader)
- customLoader.iterator()
+ val iterator = classLoader match {
+ case Some(customClassLoader) =>
+ val customLoader = ServiceLoader.load(classOf[TableFactory], customClassLoader)
+ customLoader.iterator()
+ case None =>
+ defaultLoader.iterator()
}
- while (iter.hasNext) {
- val factory = iter.next()
-
- if (clz.isAssignableFrom(factory.getClass)) {
- val requiredContextJava = try {
- factory.requiredContext()
- } catch {
- case t: Throwable =>
- throw new TableException(
- s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
- t)
- }
-
- val requiredContext = if (requiredContextJava != null) {
- // normalize properties
- requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2))
- } else {
- Map[String, String]()
- }
-
- val plainContext = mutable.Map[String, String]()
- plainContext ++= requiredContext
- // we remove the versions for now until we have the first backwards compatibility case
- // with the version we can provide mappings in case the format changes
- plainContext.remove(CONNECTOR_PROPERTY_VERSION)
- plainContext.remove(FORMAT_PROPERTY_VERSION)
- plainContext.remove(METADATA_PROPERTY_VERSION)
- plainContext.remove(STATISTICS_PROPERTY_VERSION)
-
- if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) {
- matchingFactory match {
- case Some(_) => throw new AmbiguousTableFactoryException(properties)
- case None => matchingFactory =
- Some((factory.asInstanceOf[TableFactory], requiredContext.keys.toSeq))
- }
- }
- }
+
+ while (iterator.hasNext) {
+ val factory = iterator.next()
+ foundFactories += factory
}
+
+ foundFactories
} catch {
case e: ServiceConfigurationError =>
LOG.error("Could not load service provider for table factories.", e)
throw new TableException("Could not load service provider for table factories.", e)
}
+ }
+
+ /**
+ * Filters for factories with matching context.
+ *
+ * @return all matching factories
+ */
+ private def filterByContext[T](
+ factoryClass: Class[T],
+ properties: Map[String, String],
+ foundFactories: Seq[TableFactory],
+ classFactories: Seq[TableFactory])
+ : Seq[TableFactory] = {
+
+ val matchingFactories = mutable.ArrayBuffer[TableFactory]()
+
+ classFactories.foreach { factory =>
--- End diff --
```
val matchingFactories = classFactories.filter(...)
```
?
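Spelled out, the suggested `filter` variant could look roughly like the following, reusing the `normalizeContext` helper and the version-key constants from the surrounding code (sketch only; it assumes `normalizeContext` returns an immutable map):
```
val matchingFactories = classFactories.filter { factory =>
  // drop the property-version keys, as in the original loop
  val plainContext = normalizeContext(factory) -- Seq(
    CONNECTOR_PROPERTY_VERSION,
    FORMAT_PROPERTY_VERSION,
    METADATA_PROPERTY_VERSION,
    STATISTICS_PROPERTY_VERSION)
  // the factory matches if every remaining context entry is present
  plainContext.forall { case (key, value) =>
    properties.get(key).contains(value)
  }
}
```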
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/6323
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202506574
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala ---
@@ -44,32 +43,27 @@ object ExternalTableSourceUtil extends Logging {
val properties = new DescriptorProperties()
externalCatalogTable.addProperties(properties)
val javaMap = properties.asMap
- val source = TableFactoryService.find(classOf[TableSourceFactory[_]], javaMap)
- .asInstanceOf[TableSourceFactory[_]]
- .createTableSource(javaMap)
tableEnv match {
// check for a batch table source in this batch environment
case _: BatchTableEnvironment =>
- source match {
- case bts: BatchTableSource[_] =>
- new TableSourceSinkTable(Some(new BatchTableSourceTable(
- bts,
- new FlinkStatistic(externalCatalogTable.getTableStats))), None)
- case _ => throw new TableException(
- s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
- s"in a batch environment.")
- }
+ val source = TableFactoryService
--- End diff --
Usually it is very uncommon to define both a batch and a streaming source in the same factory. Your proposed change would require all future sources to implement an environment check first, which is unnecessary in 80% of the cases. Separating by environment is a concept that can be found throughout the entire `flink-table` module because batch and streaming sources/sinks behave quite differently.
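For context, the environment-specific separation could be imagined along these lines; the trait and method names here are assumptions for illustration, not quoted from this PR:
```
import java.util.{Map => JMap}

import org.apache.flink.table.factories.TableFactory
import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource}

// Hypothetical: one factory trait per environment, so a source only
// implements the environment it actually supports.
trait StreamTableSourceFactory[T] extends TableFactory {
  def createStreamTableSource(properties: JMap[String, String]): StreamTableSource[T]
}

trait BatchTableSourceFactory[T] extends TableFactory {
  def createBatchTableSource(properties: JMap[String, String]): BatchTableSource[T]
}
```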
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202506740
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
@@ -18,143 +18,358 @@
package org.apache.flink.table.factories
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
import org.apache.flink.table.api._
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
import org.apache.flink.table.descriptors.FormatDescriptorValidator._
import org.apache.flink.table.descriptors.MetadataValidator._
import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
import _root_.scala.collection.JavaConverters._
import _root_.scala.collection.mutable
/**
- * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
+ * Unified interface to search for a [[TableFactory]] of provided type and properties.
*/
object TableFactoryService extends Logging {
private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
- def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
- find(clz, descriptor, null)
+ /**
+ * Finds a table factory of the given class and descriptor.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor describing the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+ Preconditions.checkNotNull(factoryClass)
--- End diff --
The variable is only passed on once. The internal method does not check for null again.
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202268969
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldMapping.scala ---
@@ -50,7 +50,8 @@ trait DefinedFieldMapping {
* type. It can also provide a mapping for fields which are not in the [[TableSchema]] to make
* fields in the physical [[TypeInformation]] accessible for a [[TimestampExtractor]].
*
- * @return A mapping from [[TableSchema]] fields to [[TypeInformation]] fields.
+ * @return A mapping from [[TableSchema]] fields to [[TypeInformation]] fields or
+ * null if no mapping is necessary.
*/
def getFieldMapping: JMap[String, String]
--- End diff --
annotate with `@Nullable` or change to `Optional`.
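For illustration, the two alternatives could look like this (the trait names are hypothetical; the real trait is `DefinedFieldMapping`):
```
import java.util.{Map => JMap, Optional}
import javax.annotation.Nullable

// Alternative 1: keep the nullable map but state it in the signature.
trait NullableFieldMapping {
  @Nullable def getFieldMapping: JMap[String, String]
}

// Alternative 2: make absence explicit in the type.
trait OptionalFieldMapping {
  def getFieldMapping: Optional[JMap[String, String]]
}
```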
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202332281
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala ---
@@ -16,42 +16,17 @@
* limitations under the License.
*/
-package org.apache.flink.table.formats
+package org.apache.flink.table.factories
import java.util
-import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
-
/**
- * A factory to create different table format instances. This factory is used with Java's Service
- * Provider Interfaces (SPI) for discovering. A factory is called with a set of normalized
- * properties that describe the desired format. The factory allows for matching to the given set of
- * properties. See also [[SerializationSchemaFactory]] and [[DeserializationSchemaFactory]] for
- * creating configured instances of format classes accordingly.
- *
- * Classes that implement this interface need to be added to the
- * "META_INF/services/org.apache.flink.table.formats.TableFormatFactory' file of a JAR file in
- * the current classpath to be found.
+ * A factory to create configured table format instances based on string-based properties. See
+ * also [[TableFactory]] for more information.
*
* @tparam T record type that the format produces or consumes
*/
-trait TableFormatFactory[T] {
-
- /**
- * Specifies the context that this factory has been implemented for. The framework guarantees
- * to only use the factory if the specified set of properties and values are met.
- *
- * Typical properties might be:
- * - format.type
- * - format.version
- *
- * Specified property versions allow the framework to provide backwards compatible properties
- * in case of string format changes:
- * - format.property-version
- *
- * An empty context means that the factory matches for all requests.
- */
- def requiredContext(): util.Map[String, String]
+trait TableFormatFactory[T] extends TableFactory {
--- End diff --
why haven't you removed `supportedProperties` as well?
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202521122
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
@@ -18,143 +18,358 @@
package org.apache.flink.table.factories
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
import org.apache.flink.table.api._
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
import org.apache.flink.table.descriptors.FormatDescriptorValidator._
import org.apache.flink.table.descriptors.MetadataValidator._
import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
import _root_.scala.collection.JavaConverters._
import _root_.scala.collection.mutable
/**
- * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
+ * Unified interface to search for a [[TableFactory]] of provided type and properties.
*/
object TableFactoryService extends Logging {
private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
- def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
- find(clz, descriptor, null)
+ /**
+ * Finds a table factory of the given class and descriptor.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor describing the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+ Preconditions.checkNotNull(factoryClass)
--- End diff --
But this leads to quite a bit of unnecessary code duplication: `checkNotNull(factoryClass)` appears five times here, and the same applies to the other parameters. Doing the check only in the place where the value is actually used would solve, or at least limit, this issue.
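A sketch of the suggested consolidation: all public overloads delegate to `findInternal`, so the checks could live there once. Since `classLoader` is already wrapped in an `Option`, the contained value is checked via `foreach` (illustrative only, body elided with `???`):
```
private def findInternal[T](
    factoryClass: Class[T],
    propertyMap: JMap[String, String],
    classLoader: Option[ClassLoader])
  : T = {

  // validate once, at the single point where all overloads converge
  Preconditions.checkNotNull(factoryClass)
  Preconditions.checkNotNull(propertyMap)
  classLoader.foreach(cl => Preconditions.checkNotNull(cl))

  val properties = propertyMap.asScala.toMap
  // ... discovery and filtering continue as in the PR
  ???
}
```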
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202506625
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala ---
@@ -16,42 +16,17 @@
* limitations under the License.
*/
-package org.apache.flink.table.formats
+package org.apache.flink.table.factories
import java.util
-import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
-
/**
- * A factory to create different table format instances. This factory is used with Java's Service
- * Provider Interfaces (SPI) for discovering. A factory is called with a set of normalized
- * properties that describe the desired format. The factory allows for matching to the given set of
- * properties. See also [[SerializationSchemaFactory]] and [[DeserializationSchemaFactory]] for
- * creating configured instances of format classes accordingly.
- *
- * Classes that implement this interface need to be added to the
- * "META_INF/services/org.apache.flink.table.formats.TableFormatFactory' file of a JAR file in
- * the current classpath to be found.
+ * A factory to create configured table format instances based on string-based properties. See
+ * also [[TableFactory]] for more information.
*
* @tparam T record type that the format produces or consumes
*/
-trait TableFormatFactory[T] {
-
- /**
- * Specifies the context that this factory has been implemented for. The framework guarantees
- * to only use the factory if the specified set of properties and values are met.
- *
- * Typical properties might be:
- * - format.type
- * - format.version
- *
- * Specified property versions allow the framework to provide backwards compatible properties
- * in case of string format changes:
- * - format.property-version
- *
- * An empty context means that the factory matches for all requests.
- */
- def requiredContext(): util.Map[String, String]
+trait TableFormatFactory[T] extends TableFactory {
--- End diff --
Because the Javadoc explains the specific way in which supported format properties are handled.
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202274951
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala ---
@@ -16,14 +16,14 @@
* limitations under the License.
*/
-package org.apache.flink.table.connectors
+package org.apache.flink.table.factories
import java.util
/**
* Common trait for all properties-based discoverable table factories.
*/
-trait DiscoverableTableFactory {
+trait TableFactory {
--- End diff --
The name `TableFactory` is kind of strange: `Table` here is more like a package name and `Factory` is generic. I think I liked the previous name better, although it wasn't very good either; it's hard for me to come up with a better one.
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202351647
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
@@ -465,14 +465,14 @@ abstract class TableEnvironment(val config: TableConfig) {
tableSink: TableSink[_]): Unit
/**
- * Registers an external [[TableSink]] which is already configured in this
- * [[TableEnvironment]]'s catalog.
+ * Registers an external [[TableSink]] with already configured field names and field types in
--- End diff --
ditto?
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202506861
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
@@ -18,143 +18,358 @@
package org.apache.flink.table.factories
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
import org.apache.flink.table.api._
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
import org.apache.flink.table.descriptors.FormatDescriptorValidator._
import org.apache.flink.table.descriptors.MetadataValidator._
import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
import _root_.scala.collection.JavaConverters._
import _root_.scala.collection.mutable
/**
- * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
+ * Unified interface to search for a [[TableFactory]] of provided type and properties.
*/
object TableFactoryService extends Logging {
private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
- def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
- find(clz, descriptor, null)
+ /**
+ * Finds a table factory of the given class and descriptor.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor describing the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(descriptor)
+
+ val descriptorProperties = new DescriptorProperties()
+ descriptor.addProperties(descriptorProperties)
+ findInternal(factoryClass, descriptorProperties.asMap, None)
}
- def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader)
- : TableFactory = {
+ /**
+ * Finds a table factory of the given class, descriptor, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor describing the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(descriptor)
+ Preconditions.checkNotNull(classLoader)
- val properties = new DescriptorProperties()
- descriptor.addProperties(properties)
- find(clz, properties.asMap.asScala.toMap, classLoader)
+ val descriptorProperties = new DescriptorProperties()
+ descriptor.addProperties(descriptorProperties)
+ findInternal(factoryClass, descriptorProperties.asMap, None)
}
- def find(clz: Class[_], properties: Map[String, String]): TableFactory = {
- find(clz: Class[_], properties, null)
+ /**
+ * Finds a table factory of the given class and property map.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(propertyMap)
+
+ findInternal(factoryClass, propertyMap, None)
}
- def find(clz: Class[_], properties: Map[String, String],
- classLoader: ClassLoader): TableFactory = {
+ /**
+ * Finds a table factory of the given class, property map, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](
+ factoryClass: Class[T],
+ propertyMap: JMap[String, String],
+ classLoader: ClassLoader)
+ : T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(propertyMap)
+ Preconditions.checkNotNull(classLoader)
+
+ findInternal(factoryClass, propertyMap, Some(classLoader))
+ }
+
+ /**
+ * Finds a table factory of the given class, property map, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ private def findInternal[T](
+ factoryClass: Class[T],
+ propertyMap: JMap[String, String],
+ classLoader: Option[ClassLoader])
+ : T = {
+
+ val properties = propertyMap.asScala.toMap
+
+ // discover table factories
+ val foundFactories = discoverFactories(classLoader)
- var matchingFactory: Option[(TableFactory, Seq[String])] = None
+ // filter by factory class
+ val classFactories = filterByFactoryClass(
+ factoryClass,
+ properties,
+ foundFactories)
+
+ // find matching context
+ val contextFactories = filterByContext(
+ factoryClass,
+ properties,
+ foundFactories,
+ classFactories)
+
+ // filter by supported keys
+ filterBySupportedProperties(
+ factoryClass,
+ properties,
+ foundFactories,
+ contextFactories)
+ }
+
+ /**
+ * Searches for factories using Java service providers.
+ *
+ * @return all factories in the classpath
+ */
+ private def discoverFactories[T](classLoader: Option[ClassLoader]): Seq[TableFactory] = {
+ val foundFactories = mutable.ArrayBuffer[TableFactory]()
try {
- val iter = if (classLoader == null) {
- defaultLoader.iterator()
- } else {
- val customLoader = ServiceLoader.load(classOf[TableFactory], classLoader)
- customLoader.iterator()
+ val iterator = classLoader match {
+ case Some(customClassLoader) =>
+ val customLoader = ServiceLoader.load(classOf[TableFactory], customClassLoader)
+ customLoader.iterator()
+ case None =>
+ defaultLoader.iterator()
}
- while (iter.hasNext) {
- val factory = iter.next()
-
- if (clz.isAssignableFrom(factory.getClass)) {
- val requiredContextJava = try {
- factory.requiredContext()
- } catch {
- case t: Throwable =>
- throw new TableException(
- s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
- t)
- }
-
- val requiredContext = if (requiredContextJava != null) {
- // normalize properties
- requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2))
- } else {
- Map[String, String]()
- }
-
- val plainContext = mutable.Map[String, String]()
- plainContext ++= requiredContext
- // we remove the versions for now until we have the first backwards compatibility case
- // with the version we can provide mappings in case the format changes
- plainContext.remove(CONNECTOR_PROPERTY_VERSION)
- plainContext.remove(FORMAT_PROPERTY_VERSION)
- plainContext.remove(METADATA_PROPERTY_VERSION)
- plainContext.remove(STATISTICS_PROPERTY_VERSION)
-
- if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) {
- matchingFactory match {
- case Some(_) => throw new AmbiguousTableFactoryException(properties)
- case None => matchingFactory =
- Some((factory.asInstanceOf[TableFactory], requiredContext.keys.toSeq))
- }
- }
- }
+
+ while (iterator.hasNext) {
+ val factory = iterator.next()
+ foundFactories += factory
}
+
+ foundFactories
} catch {
case e: ServiceConfigurationError =>
LOG.error("Could not load service provider for table factories.", e)
throw new TableException("Could not load service provider for table factories.", e)
}
+ }
+
+ /**
+ * Filters for factories with matching context.
+ *
+ * @return all matching factories
+ */
+ private def filterByContext[T](
+ factoryClass: Class[T],
+ properties: Map[String, String],
+ foundFactories: Seq[TableFactory],
+ classFactories: Seq[TableFactory])
+ : Seq[TableFactory] = {
+
+ val matchingFactories = mutable.ArrayBuffer[TableFactory]()
+
+ classFactories.foreach { factory =>
+ val requestedContext = normalizeContext(factory)
+
+ val plainContext = mutable.Map[String, String]()
+ plainContext ++= requestedContext
+ // we remove the version for now until we have the first backwards compatibility case
+ // with the version we can provide mappings in case the format changes
--- End diff --
You are right. This could lead to confusion. I will remove these lines entirely.
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202335205
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
@@ -18,143 +18,358 @@
package org.apache.flink.table.factories
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
import org.apache.flink.table.api._
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
import org.apache.flink.table.descriptors.FormatDescriptorValidator._
import org.apache.flink.table.descriptors.MetadataValidator._
import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
import _root_.scala.collection.JavaConverters._
import _root_.scala.collection.mutable
/**
- * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
+ * Unified interface to search for a [[TableFactory]] of provided type and properties.
*/
object TableFactoryService extends Logging {
private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
- def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
- find(clz, descriptor, null)
+ /**
+ * Finds a table factory of the given class and descriptor.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor describing the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(descriptor)
+
+ val descriptorProperties = new DescriptorProperties()
+ descriptor.addProperties(descriptorProperties)
+ findInternal(factoryClass, descriptorProperties.asMap, None)
}
- def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader)
- : TableFactory = {
+ /**
+ * Finds a table factory of the given class, descriptor, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor describing the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(descriptor)
+ Preconditions.checkNotNull(classLoader)
- val properties = new DescriptorProperties()
- descriptor.addProperties(properties)
- find(clz, properties.asMap.asScala.toMap, classLoader)
+ val descriptorProperties = new DescriptorProperties()
+ descriptor.addProperties(descriptorProperties)
+ findInternal(factoryClass, descriptorProperties.asMap, None)
}
- def find(clz: Class[_], properties: Map[String, String]): TableFactory = {
- find(clz: Class[_], properties, null)
+ /**
+ * Finds a table factory of the given class and property map.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(propertyMap)
+
+ findInternal(factoryClass, propertyMap, None)
}
- def find(clz: Class[_], properties: Map[String, String],
- classLoader: ClassLoader): TableFactory = {
+ /**
+ * Finds a table factory of the given class, property map, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](
+ factoryClass: Class[T],
+ propertyMap: JMap[String, String],
+ classLoader: ClassLoader)
+ : T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(propertyMap)
+ Preconditions.checkNotNull(classLoader)
+
+ findInternal(factoryClass, propertyMap, Some(classLoader))
+ }
+
+ /**
+ * Finds a table factory of the given class, property map, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ private def findInternal[T](
--- End diff --
Huge +1. This method/class looks so much better now :)
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202334688
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
@@ -18,143 +18,358 @@
package org.apache.flink.table.factories
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
import org.apache.flink.table.api._
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
import org.apache.flink.table.descriptors.FormatDescriptorValidator._
import org.apache.flink.table.descriptors.MetadataValidator._
import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
import _root_.scala.collection.JavaConverters._
import _root_.scala.collection.mutable
/**
- * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
+ * Unified interface to search for a [[TableFactory]] of provided type and properties.
*/
object TableFactoryService extends Logging {
private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
- def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
- find(clz, descriptor, null)
+ /**
+ * Finds a table factory of the given class and descriptor.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor describing the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(descriptor)
+
+ val descriptorProperties = new DescriptorProperties()
+ descriptor.addProperties(descriptorProperties)
+ findInternal(factoryClass, descriptorProperties.asMap, None)
}
- def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader)
- : TableFactory = {
+ /**
+ * Finds a table factory of the given class, descriptor, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor describing the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(descriptor)
+ Preconditions.checkNotNull(classLoader)
- val properties = new DescriptorProperties()
- descriptor.addProperties(properties)
- find(clz, properties.asMap.asScala.toMap, classLoader)
+ val descriptorProperties = new DescriptorProperties()
+ descriptor.addProperties(descriptorProperties)
+ findInternal(factoryClass, descriptorProperties.asMap, None)
}
- def find(clz: Class[_], properties: Map[String, String]): TableFactory = {
- find(clz: Class[_], properties, null)
+ /**
+ * Finds a table factory of the given class and property map.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(propertyMap)
+
+ findInternal(factoryClass, propertyMap, None)
}
- def find(clz: Class[_], properties: Map[String, String],
- classLoader: ClassLoader): TableFactory = {
+ /**
+ * Finds a table factory of the given class, property map, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](
+ factoryClass: Class[T],
+ propertyMap: JMap[String, String],
+ classLoader: ClassLoader)
+ : T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(propertyMap)
+ Preconditions.checkNotNull(classLoader)
+
+ findInternal(factoryClass, propertyMap, Some(classLoader))
+ }
+
+ /**
+ * Finds a table factory of the given class, property map, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ private def findInternal[T](
+ factoryClass: Class[T],
+ propertyMap: JMap[String, String],
+ classLoader: Option[ClassLoader])
+ : T = {
+
+ val properties = propertyMap.asScala.toMap
+
+ // discover table factories
--- End diff --
drop the comments?
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202282625
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala ---
@@ -22,9 +22,11 @@ package org.apache.flink.table.descriptors
* Common class for all descriptors describing a table sink.
*/
abstract class TableSinkDescriptor extends TableDescriptor {
+
+ /**
+ * Internal method for properties conversion.
+ */
override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
super.addProperties(properties)
- properties.putString(TableDescriptorValidator.TABLE_TYPE,
- TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE)
--- End diff --
why did those lines disappear?
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202268019
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
@@ -54,51 +56,105 @@
* override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
*/
@Internal
-public abstract class KafkaTableSource
- implements StreamTableSource<Row>, DefinedProctimeAttribute, DefinedRowtimeAttributes {
+public abstract class KafkaTableSource implements
+ StreamTableSource<Row>,
+ DefinedProctimeAttribute,
+ DefinedRowtimeAttributes,
+ DefinedFieldMapping {
+
+ // common table source attributes
+ // TODO make all attributes final once we drop support for format-specific table sources
/** The schema of the table. */
private final TableSchema schema;
+ /** Field name of the processing time attribute, null if no processing time field is defined. */
+ private String proctimeAttribute;
+
+ /** Descriptor for a rowtime attribute. */
+ private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
+
+ /** Mapping for the fields of the table schema to fields of the physical returned type or null. */
+ private Map<String, String> fieldMapping;
+
+ // Kafka-specific attributes
+
/** The Kafka topic to consume. */
private final String topic;
/** Properties for the Kafka consumer. */
private final Properties properties;
- /** Type information describing the result type. */
- private TypeInformation<Row> returnType;
-
- /** Field name of the processing time attribute, null if no processing time field is defined. */
- private String proctimeAttribute;
-
- /** Descriptor for a rowtime attribute. */
- private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
+ /** Deserialization schema for decoding records from Kafka. */
+ private final DeserializationSchema<Row> deserializationSchema;
/** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
- private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
+ private StartupMode startupMode;
/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
private Map<KafkaTopicPartition, Long> specificStartupOffsets;
/**
* Creates a generic Kafka {@link StreamTableSource}.
*
- * @param topic Kafka topic to consume.
- * @param properties Properties for the Kafka consumer.
- * @param schema Schema of the produced table.
- * @param returnType Type information of the produced physical DataStream.
+ * @param schema Schema of the produced table.
+ * @param proctimeAttribute Field name of the processing time attribute, null if no
--- End diff --
> null
Use `Optional`, an overload, or a builder, or disallow null entirely. The same applies to the classes that extend this one.
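As one possible direction, a sketch of modeling the absent attribute without null; the class is hypothetical and written in Scala for brevity, not the connector's actual API:
```
import org.apache.flink.table.api.TableSchema

// Hypothetical: absence of the processing-time attribute is encoded as
// None instead of null, so callers cannot forget the null check.
class SafeSourceConfig(
    val schema: TableSchema,
    val proctimeAttribute: Option[String]) {

  def describeProctime: String =
    proctimeAttribute.getOrElse("<no processing-time attribute>")
}
```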
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202507057
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala ---
@@ -52,12 +52,9 @@ class FlinkLogicalTableSourceScan(
override def deriveRowType(): RelDataType = {
val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
val streamingTable = table.unwrap(classOf[TableSourceSinkTable[_, _]]) match {
- case t: TableSourceSinkTable[_, _] => t.tableSourceTableOpt match {
- case Some(_: StreamTableSourceTable[_]) => true
- case Some(_: BatchTableSourceTable[_]) => false
- case _ => throw TableException(s"Unknown Table type ${t.getClass}.")
- }
- case t => throw TableException(s"Unknown Table type ${t.getClass}.")
+ case t: TableSourceSinkTable[_, _] if t.isStreamSourceTable => true
+ // null
--- End diff --
This information is useful.
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202337455
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
@@ -18,143 +18,358 @@
package org.apache.flink.table.factories
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
import org.apache.flink.table.api._
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
import org.apache.flink.table.descriptors.FormatDescriptorValidator._
import org.apache.flink.table.descriptors.MetadataValidator._
import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
import _root_.scala.collection.JavaConverters._
import _root_.scala.collection.mutable
/**
- * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
+ * Unified interface to search for a [[TableFactory]] of provided type and properties.
*/
object TableFactoryService extends Logging {
private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
- def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
- find(clz, descriptor, null)
+ /**
+ * Finds a table factory of the given class and descriptor.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor describing the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(descriptor)
+
+ val descriptorProperties = new DescriptorProperties()
+ descriptor.addProperties(descriptorProperties)
+ findInternal(factoryClass, descriptorProperties.asMap, None)
}
- def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader)
- : TableFactory = {
+ /**
+ * Finds a table factory of the given class, descriptor, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor describing the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(descriptor)
+ Preconditions.checkNotNull(classLoader)
- val properties = new DescriptorProperties()
- descriptor.addProperties(properties)
- find(clz, properties.asMap.asScala.toMap, classLoader)
+ val descriptorProperties = new DescriptorProperties()
+ descriptor.addProperties(descriptorProperties)
+ findInternal(factoryClass, descriptorProperties.asMap, None)
}
- def find(clz: Class[_], properties: Map[String, String]): TableFactory = {
- find(clz: Class[_], properties, null)
+ /**
+ * Finds a table factory of the given class and property map.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(propertyMap)
+
+ findInternal(factoryClass, propertyMap, None)
}
- def find(clz: Class[_], properties: Map[String, String],
- classLoader: ClassLoader): TableFactory = {
+ /**
+ * Finds a table factory of the given class, property map, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](
+ factoryClass: Class[T],
+ propertyMap: JMap[String, String],
+ classLoader: ClassLoader)
+ : T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(propertyMap)
+ Preconditions.checkNotNull(classLoader)
+
+ findInternal(factoryClass, propertyMap, Some(classLoader))
+ }
+
+ /**
+ * Finds a table factory of the given class, property map, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ private def findInternal[T](
+ factoryClass: Class[T],
+ propertyMap: JMap[String, String],
+ classLoader: Option[ClassLoader])
+ : T = {
+
+ val properties = propertyMap.asScala.toMap
+
+ // discover table factories
+ val foundFactories = discoverFactories(classLoader)
- var matchingFactory: Option[(TableFactory, Seq[String])] = None
+ // filter by factory class
+ val classFactories = filterByFactoryClass(
+ factoryClass,
+ properties,
+ foundFactories)
+
+ // find matching context
+ val contextFactories = filterByContext(
+ factoryClass,
+ properties,
+ foundFactories,
+ classFactories)
+
+ // filter by supported keys
+ filterBySupportedProperties(
+ factoryClass,
+ properties,
+ foundFactories,
+ contextFactories)
+ }
+
+ /**
+ * Searches for factories using Java service providers.
+ *
+ * @return all factories in the classpath
+ */
+ private def discoverFactories[T](classLoader: Option[ClassLoader]): Seq[TableFactory] = {
+ val foundFactories = mutable.ArrayBuffer[TableFactory]()
try {
- val iter = if (classLoader == null) {
- defaultLoader.iterator()
- } else {
- val customLoader = ServiceLoader.load(classOf[TableFactory], classLoader)
- customLoader.iterator()
+ val iterator = classLoader match {
+ case Some(customClassLoader) =>
+ val customLoader = ServiceLoader.load(classOf[TableFactory], customClassLoader)
+ customLoader.iterator()
+ case None =>
+ defaultLoader.iterator()
}
- while (iter.hasNext) {
- val factory = iter.next()
-
- if (clz.isAssignableFrom(factory.getClass)) {
- val requiredContextJava = try {
- factory.requiredContext()
- } catch {
- case t: Throwable =>
- throw new TableException(
- s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
- t)
- }
-
- val requiredContext = if (requiredContextJava != null) {
- // normalize properties
- requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2))
- } else {
- Map[String, String]()
- }
-
- val plainContext = mutable.Map[String, String]()
- plainContext ++= requiredContext
- // we remove the versions for now until we have the first backwards compatibility case
- // with the version we can provide mappings in case the format changes
- plainContext.remove(CONNECTOR_PROPERTY_VERSION)
- plainContext.remove(FORMAT_PROPERTY_VERSION)
- plainContext.remove(METADATA_PROPERTY_VERSION)
- plainContext.remove(STATISTICS_PROPERTY_VERSION)
-
- if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) {
- matchingFactory match {
- case Some(_) => throw new AmbiguousTableFactoryException(properties)
- case None => matchingFactory =
- Some((factory.asInstanceOf[TableFactory], requiredContext.keys.toSeq))
- }
- }
- }
+
+ while (iterator.hasNext) {
--- End diff --
Is there no one-liner way to convert the iterator to a `Seq`? Does this method need to work on a `Seq`?
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202268054
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
@@ -54,51 +56,105 @@
* override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
*/
@Internal
-public abstract class KafkaTableSource
- implements StreamTableSource<Row>, DefinedProctimeAttribute, DefinedRowtimeAttributes {
+public abstract class KafkaTableSource implements
+ StreamTableSource<Row>,
+ DefinedProctimeAttribute,
+ DefinedRowtimeAttributes,
+ DefinedFieldMapping {
+
+ // common table source attributes
+ // TODO make all attributes final once we drop support for format-specific table sources
/** The schema of the table. */
private final TableSchema schema;
+ /** Field name of the processing time attribute, null if no processing time field is defined. */
+ private String proctimeAttribute;
+
+ /** Descriptor for a rowtime attribute. */
+ private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
+
+ /** Mapping for the fields of the table schema to fields of the physical returned type or null. */
+ private Map<String, String> fieldMapping;
+
+ // Kafka-specific attributes
+
/** The Kafka topic to consume. */
private final String topic;
/** Properties for the Kafka consumer. */
private final Properties properties;
- /** Type information describing the result type. */
- private TypeInformation<Row> returnType;
-
- /** Field name of the processing time attribute, null if no processing time field is defined. */
- private String proctimeAttribute;
-
- /** Descriptor for a rowtime attribute. */
- private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
+ /** Deserialization schema for decoding records from Kafka. */
+ private final DeserializationSchema<Row> deserializationSchema;
/** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
- private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
+ private StartupMode startupMode;
/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
private Map<KafkaTopicPartition, Long> specificStartupOffsets;
/**
* Creates a generic Kafka {@link StreamTableSource}.
*
- * @param topic Kafka topic to consume.
- * @param properties Properties for the Kafka consumer.
- * @param schema Schema of the produced table.
- * @param returnType Type information of the produced physical DataStream.
+ * @param schema Schema of the produced table.
+ * @param proctimeAttribute Field name of the processing time attribute, null if no
+ * processing time field is defined.
+ * @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute
+ * @param fieldMapping Mapping for the fields of the table schema to
--- End diff --
>or null
ditto
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202333467
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala ---
@@ -143,118 +143,82 @@ case class CatalogAlreadyExistException(
}
/**
- * Exception for not finding a [[TableFormatFactory]] for the
- * given properties.
+ * Exception for not finding a [[TableFactory]] for the given properties.
*
* @param message message that indicates the current matching step
* @param factoryClass required factory class
- * @param formatFactories all found factories
- * @param properties properties that describe the table format
+ * @param factories all found factories
+ * @param properties properties that describe the configuration
* @param cause the cause
*/
-case class NoMatchingTableFormatException(
+case class NoMatchingTableFactoryException(
message: String,
factoryClass: Class[_],
- formatFactories: Seq[TableFormatFactory[_]],
+ factories: Seq[TableFactory],
properties: Map[String, String],
cause: Throwable)
extends RuntimeException(
--- End diff --
Side note: shouldn't our exceptions inherit from `FlinkTableException` or sth like that?
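For illustration, a minimal sketch of the suggested hierarchy; `FlinkTableException` is the hypothetical name floated above, and the `TableFactory` trait below is a stand-in, not the real Flink class:
```
// hypothetical common base class for table-related exceptions
class FlinkTableException(message: String, cause: Throwable)
  extends RuntimeException(message, cause)

// stand-in for org.apache.flink.table.factories.TableFactory
trait TableFactory

// the factory exception would then extend the common base instead of
// RuntimeException directly
case class NoMatchingTableFactoryException(
    message: String,
    factoryClass: Class[_],
    factories: Seq[TableFactory],
    properties: Map[String, String],
    cause: Throwable)
  extends FlinkTableException(message, cause)
```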
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202354777
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala ---
@@ -21,23 +21,47 @@ package org.apache.flink.table.plan.schema
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
import org.apache.calcite.schema.Statistic
import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.flink.table.api.TableException
-class TableSourceSinkTable[T1, T2](val tableSourceTableOpt: Option[TableSourceTable[T1]],
- val tableSinkTableOpt: Option[TableSinkTable[T2]])
+/**
--- End diff --
ditto: fixup Shuyi's commit
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202521212
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
@@ -18,143 +18,358 @@
package org.apache.flink.table.factories
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
import org.apache.flink.table.api._
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
import org.apache.flink.table.descriptors.FormatDescriptorValidator._
import org.apache.flink.table.descriptors.MetadataValidator._
import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
import _root_.scala.collection.JavaConverters._
import _root_.scala.collection.mutable
/**
- * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
+ * Unified interface to search for a [[TableFactory]] of provided type and properties.
*/
object TableFactoryService extends Logging {
private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
- def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
- find(clz, descriptor, null)
+ /**
+ * Finds a table factory of the given class and descriptor.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor describing the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(descriptor)
+
+ val descriptorProperties = new DescriptorProperties()
+ descriptor.addProperties(descriptorProperties)
+ findInternal(factoryClass, descriptorProperties.asMap, None)
}
- def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader)
- : TableFactory = {
+ /**
+ * Finds a table factory of the given class, descriptor, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor describing the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(descriptor)
+ Preconditions.checkNotNull(classLoader)
- val properties = new DescriptorProperties()
- descriptor.addProperties(properties)
- find(clz, properties.asMap.asScala.toMap, classLoader)
+ val descriptorProperties = new DescriptorProperties()
+ descriptor.addProperties(descriptorProperties)
+ findInternal(factoryClass, descriptorProperties.asMap, None)
}
- def find(clz: Class[_], properties: Map[String, String]): TableFactory = {
- find(clz: Class[_], properties, null)
+ /**
+ * Finds a table factory of the given class and property map.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(propertyMap)
+
+ findInternal(factoryClass, propertyMap, None)
}
- def find(clz: Class[_], properties: Map[String, String],
- classLoader: ClassLoader): TableFactory = {
+ /**
+ * Finds a table factory of the given class, property map, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](
+ factoryClass: Class[T],
+ propertyMap: JMap[String, String],
+ classLoader: ClassLoader)
+ : T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(propertyMap)
+ Preconditions.checkNotNull(classLoader)
+
+ findInternal(factoryClass, propertyMap, Some(classLoader))
+ }
+
+ /**
+ * Finds a table factory of the given class, property map, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ private def findInternal[T](
+ factoryClass: Class[T],
+ propertyMap: JMap[String, String],
+ classLoader: Option[ClassLoader])
+ : T = {
+
+ val properties = propertyMap.asScala.toMap
+
+ // discover table factories
+ val foundFactories = discoverFactories(classLoader)
- var matchingFactory: Option[(TableFactory, Seq[String])] = None
+ // filter by factory class
+ val classFactories = filterByFactoryClass(
+ factoryClass,
+ properties,
+ foundFactories)
+
+ // find matching context
+ val contextFactories = filterByContext(
+ factoryClass,
+ properties,
+ foundFactories,
+ classFactories)
+
+ // filter by supported keys
+ filterBySupportedProperties(
+ factoryClass,
+ properties,
+ foundFactories,
+ contextFactories)
+ }
+
+ /**
+ * Searches for factories using Java service providers.
+ *
+ * @return all factories in the classpath
+ */
+ private def discoverFactories[T](classLoader: Option[ClassLoader]): Seq[TableFactory] = {
+ val foundFactories = mutable.ArrayBuffer[TableFactory]()
try {
- val iter = if (classLoader == null) {
- defaultLoader.iterator()
- } else {
- val customLoader = ServiceLoader.load(classOf[TableFactory], classLoader)
- customLoader.iterator()
+ val iterator = classLoader match {
+ case Some(customClassLoader) =>
+ val customLoader = ServiceLoader.load(classOf[TableFactory], customClassLoader)
+ customLoader.iterator()
+ case None =>
+ defaultLoader.iterator()
}
- while (iter.hasNext) {
- val factory = iter.next()
-
- if (clz.isAssignableFrom(factory.getClass)) {
- val requiredContextJava = try {
- factory.requiredContext()
- } catch {
- case t: Throwable =>
- throw new TableException(
- s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
- t)
- }
-
- val requiredContext = if (requiredContextJava != null) {
- // normalize properties
- requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2))
- } else {
- Map[String, String]()
- }
-
- val plainContext = mutable.Map[String, String]()
- plainContext ++= requiredContext
- // we remove the versions for now until we have the first backwards compatibility case
- // with the version we can provide mappings in case the format changes
- plainContext.remove(CONNECTOR_PROPERTY_VERSION)
- plainContext.remove(FORMAT_PROPERTY_VERSION)
- plainContext.remove(METADATA_PROPERTY_VERSION)
- plainContext.remove(STATISTICS_PROPERTY_VERSION)
-
- if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) {
- matchingFactory match {
- case Some(_) => throw new AmbiguousTableFactoryException(properties)
- case None => matchingFactory =
- Some((factory.asInstanceOf[TableFactory], requiredContext.keys.toSeq))
- }
- }
- }
+
+ while (iterator.hasNext) {
+ val factory = iterator.next()
+ foundFactories += factory
}
+
+ foundFactories
} catch {
case e: ServiceConfigurationError =>
LOG.error("Could not load service provider for table factories.", e)
throw new TableException("Could not load service provider for table factories.", e)
}
+ }
+
+ /**
+ * Filters for factories with matching context.
+ *
+ * @return all matching factories
+ */
+ private def filterByContext[T](
+ factoryClass: Class[T],
+ properties: Map[String, String],
+ foundFactories: Seq[TableFactory],
+ classFactories: Seq[TableFactory])
+ : Seq[TableFactory] = {
+
+ val matchingFactories = mutable.ArrayBuffer[TableFactory]()
+
+ classFactories.foreach { factory =>
+ val requestedContext = normalizeContext(factory)
+
+ val plainContext = mutable.Map[String, String]()
+ plainContext ++= requestedContext
+ // we remove the version for now until we have the first backwards compatibility case
+ // with the version we can provide mappings in case the format changes
--- End diff --
Can you create a follow-up issue with blocker status for `1.6.0` so that it doesn't slip?
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202520938
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala ---
@@ -16,14 +16,14 @@
* limitations under the License.
*/
-package org.apache.flink.table.connectors
+package org.apache.flink.table.factories
import java.util
/**
* Common trait for all properties-based discoverable table factories.
*/
-trait DiscoverableTableFactory {
+trait TableFactory {
--- End diff --
I am not happy with it, but I cannot think of anything better :( `Factory` is way too generic. It implies that this is THE Factory, THE One Factory To Rule Them All, and that's not true. There are plenty of other factories in `flink-table`. The `Table` prefix, as you confirmed, is just a package prefix, so it doesn't improve anything :(
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202521065
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala ---
@@ -16,42 +16,17 @@
* limitations under the License.
*/
-package org.apache.flink.table.formats
+package org.apache.flink.table.factories
import java.util
-import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
-
/**
- * A factory to create different table format instances. This factory is used with Java's Service
- * Provider Interfaces (SPI) for discovering. A factory is called with a set of normalized
- * properties that describe the desired format. The factory allows for matching to the given set of
- * properties. See also [[SerializationSchemaFactory]] and [[DeserializationSchemaFactory]] for
- * creating configured instances of format classes accordingly.
- *
- * Classes that implement this interface need to be added to the
- * "META_INF/services/org.apache.flink.table.formats.TableFormatFactory' file of a JAR file in
- * the current classpath to be found.
+ * A factory to create configured table format instances based on string-based properties. See
+ * also [[TableFactory]] for more information.
*
* @tparam T record type that the format produces or consumes
*/
-trait TableFormatFactory[T] {
-
- /**
- * Specifies the context that this factory has been implemented for. The framework guarantees
- * to only use the factory if the specified set of properties and values are met.
- *
- * Typical properties might be:
- * - format.type
- * - format.version
- *
- * Specified property versions allow the framework to provide backwards compatible properties
- * in case of string format changes:
- * - format.property-version
- *
- * An empty context means that the factory matches for all requests.
- */
- def requiredContext(): util.Map[String, String]
+trait TableFormatFactory[T] extends TableFactory {
--- End diff --
Maybe in that case deduplicate the comment with a `@see` Javadoc pointer? Otherwise there is a huge risk of the comments drifting out of sync.
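For illustration, a sketch of what the deduplicated Scaladoc could look like (the wording is assumed; the `TableFactory` trait is a stand-in for the real one):
```
trait TableFactory // stand-in for the real trait

/**
 * A factory to create configured table format instances based on
 * string-based properties.
 *
 * @see [[TableFactory]] for the discovery mechanism, required context,
 *      and property versioning details
 * @tparam T record type that the format produces or consumes
 */
trait TableFormatFactory[T] extends TableFactory
```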
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202281623
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java ---
@@ -214,15 +216,18 @@ private static TableDescriptor createTableDescriptor(String name, Map<String, Ob
if (typeObject == null || !(typeObject instanceof String)) {
throw new SqlClientException("Invalid 'type' attribute for table '" + name + "'.");
}
- final String type = (String) config.get(TABLE_TYPE);
+ final String type = (String) typeObject;
+ final Map<String, Object> properties = new HashMap<>(config);
--- End diff --
A copy of `config` named `properties`? This is confusing. Rename `config` to `properties` and inline this copy? Or inline `properties` and rename `normalizedProperties` back to `normalizedConfig`.
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202506906
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
@@ -18,143 +18,358 @@
package org.apache.flink.table.factories
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
import org.apache.flink.table.api._
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
import org.apache.flink.table.descriptors.FormatDescriptorValidator._
import org.apache.flink.table.descriptors.MetadataValidator._
import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
import _root_.scala.collection.JavaConverters._
import _root_.scala.collection.mutable
/**
- * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
+ * Unified interface to search for a [[TableFactory]] of provided type and properties.
*/
object TableFactoryService extends Logging {
private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
- def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
- find(clz, descriptor, null)
+ /**
+ * Finds a table factory of the given class and descriptor.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor describing the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(descriptor)
+
+ val descriptorProperties = new DescriptorProperties()
+ descriptor.addProperties(descriptorProperties)
+ findInternal(factoryClass, descriptorProperties.asMap, None)
}
- def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader)
- : TableFactory = {
+ /**
+ * Finds a table factory of the given class, descriptor, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor describing the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(descriptor)
+ Preconditions.checkNotNull(classLoader)
- val properties = new DescriptorProperties()
- descriptor.addProperties(properties)
- find(clz, properties.asMap.asScala.toMap, classLoader)
+ val descriptorProperties = new DescriptorProperties()
+ descriptor.addProperties(descriptorProperties)
+ findInternal(factoryClass, descriptorProperties.asMap, None)
}
- def find(clz: Class[_], properties: Map[String, String]): TableFactory = {
- find(clz: Class[_], properties, null)
+ /**
+ * Finds a table factory of the given class and property map.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(propertyMap)
+
+ findInternal(factoryClass, propertyMap, None)
}
- def find(clz: Class[_], properties: Map[String, String],
- classLoader: ClassLoader): TableFactory = {
+ /**
+ * Finds a table factory of the given class, property map, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](
+ factoryClass: Class[T],
+ propertyMap: JMap[String, String],
+ classLoader: ClassLoader)
+ : T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(propertyMap)
+ Preconditions.checkNotNull(classLoader)
+
+ findInternal(factoryClass, propertyMap, Some(classLoader))
+ }
+
+ /**
+ * Finds a table factory of the given class, property map, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ private def findInternal[T](
+ factoryClass: Class[T],
+ propertyMap: JMap[String, String],
+ classLoader: Option[ClassLoader])
+ : T = {
+
+ val properties = propertyMap.asScala.toMap
+
+ // discover table factories
+ val foundFactories = discoverFactories(classLoader)
- var matchingFactory: Option[(TableFactory, Seq[String])] = None
+ // filter by factory class
+ val classFactories = filterByFactoryClass(
+ factoryClass,
+ properties,
+ foundFactories)
+
+ // find matching context
+ val contextFactories = filterByContext(
+ factoryClass,
+ properties,
+ foundFactories,
+ classFactories)
+
+ // filter by supported keys
+ filterBySupportedProperties(
+ factoryClass,
+ properties,
+ foundFactories,
+ contextFactories)
+ }
+
+ /**
+ * Searches for factories using Java service providers.
+ *
+ * @return all factories in the classpath
+ */
+ private def discoverFactories[T](classLoader: Option[ClassLoader]): Seq[TableFactory] = {
+ val foundFactories = mutable.ArrayBuffer[TableFactory]()
try {
- val iter = if (classLoader == null) {
- defaultLoader.iterator()
- } else {
- val customLoader = ServiceLoader.load(classOf[TableFactory], classLoader)
- customLoader.iterator()
+ val iterator = classLoader match {
+ case Some(customClassLoader) =>
+ val customLoader = ServiceLoader.load(classOf[TableFactory], customClassLoader)
+ customLoader.iterator()
+ case None =>
+ defaultLoader.iterator()
}
- while (iter.hasNext) {
- val factory = iter.next()
-
- if (clz.isAssignableFrom(factory.getClass)) {
- val requiredContextJava = try {
- factory.requiredContext()
- } catch {
- case t: Throwable =>
- throw new TableException(
- s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
- t)
- }
-
- val requiredContext = if (requiredContextJava != null) {
- // normalize properties
- requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2))
- } else {
- Map[String, String]()
- }
-
- val plainContext = mutable.Map[String, String]()
- plainContext ++= requiredContext
- // we remove the versions for now until we have the first backwards compatibility case
- // with the version we can provide mappings in case the format changes
- plainContext.remove(CONNECTOR_PROPERTY_VERSION)
- plainContext.remove(FORMAT_PROPERTY_VERSION)
- plainContext.remove(METADATA_PROPERTY_VERSION)
- plainContext.remove(STATISTICS_PROPERTY_VERSION)
-
- if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) {
- matchingFactory match {
- case Some(_) => throw new AmbiguousTableFactoryException(properties)
- case None => matchingFactory =
- Some((factory.asInstanceOf[TableFactory], requiredContext.keys.toSeq))
- }
- }
- }
+
+ while (iterator.hasNext) {
--- End diff --
+1, an `iterator.asScala.toSeq` does the job
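A minimal sketch of that simplification, assuming a stand-in `TableFactory` trait; the real code would keep the existing default/custom loader handling:
```
import java.util.ServiceLoader

import scala.collection.JavaConverters._

trait TableFactory // stand-in for org.apache.flink.table.factories.TableFactory

object FactoryDiscovery {
  // materializes the ServiceLoader iterator into a Seq in one expression,
  // replacing the mutable ArrayBuffer and the while loop
  def discoverFactories(classLoader: ClassLoader): Seq[TableFactory] =
    ServiceLoader.load(classOf[TableFactory], classLoader).iterator().asScala.toSeq
}
```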
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202350406
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
@@ -114,6 +114,8 @@ abstract class StreamTableEnvironment(
: Unit = {
tableSource match {
+
+ // check for proper stream table source
--- End diff --
ditto?
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202356170
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala ---
@@ -104,17 +100,12 @@ class CsvTableSinkFactory
"Encodings that differ from the schema are not supported yet for CsvTableSink.")
}
- toScala(params.getOptionalString(CONNECTOR_PATH))
- .foreach(csvTableSinkBuilder.path)
- toScala(params.getOptionalInt(NUM_FILES))
- .foreach(n => csvTableSinkBuilder.numFiles(n))
- toScala(params.getOptionalString(WRITE_MODE))
- .foreach(csvTableSinkBuilder.writeMode)
- toScala(params.getOptionalString(FORMAT_FIELD_DELIMITER))
- .foreach(csvTableSinkBuilder.fieldDelimiter)
-
- csvTableSinkBuilder
- .build()
+ val path = params.getString(CONNECTOR_PATH)
--- End diff --
ditto fixup
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202271871
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java ---
@@ -124,110 +123,127 @@
params.putProperties(properties);
// validate
- new SchemaValidator(true).validate(params);
+ // allow Kafka timestamps to be used, watermarks can not be received from source
+ new SchemaValidator(true, supportsKafkaTimestamps(), false).validate(params);
new KafkaValidator().validate(params);
- formatValidator().validate(params);
- // build
- final KafkaTableSource.Builder builder = createBuilderWithFormat(params);
+ // deserialization schema using format discovery
+ final DeserializationSchemaFactory<?> formatFactory = TableFormatFactoryService.find(
+ DeserializationSchemaFactory.class,
+ properties,
+ this.getClass().getClassLoader());
+ @SuppressWarnings("unchecked")
+ final DeserializationSchema<Row> deserializationSchema = (DeserializationSchema<Row>) formatFactory
+ .createDeserializationSchema(properties);
- // topic
- final String topic = params.getString(CONNECTOR_TOPIC);
- builder.forTopic(topic);
+ // schema
+ final TableSchema schema = params.getTableSchema(SCHEMA());
+
+ // proctime
+ final String proctimeAttribute = SchemaValidator.deriveProctimeAttribute(params).orElse(null);
--- End diff --
This is kind of ridiculous :/ Deep inside we work on `Optional` and then we switch to null...
Please carry this `Optional` through to the very end and do the `orElse(null)` conversion in the `org.apache.flink.table.sources.DefinedProctimeAttribute#getProctimeAttribute` implementation.
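A self-contained sketch of the idea (illustrative names, not the actual Flink interfaces): carry the optional value through and convert to the nullable contract in exactly one place.
```
// illustrative stand-in for org.apache.flink.table.sources.DefinedProctimeAttribute
trait DefinedProctimeAttribute {
  /** Returns null if no processing time attribute is defined. */
  def getProctimeAttribute: String
}

class ExampleTableSource(proctimeAttribute: Option[String])
  extends DefinedProctimeAttribute {

  // the Option is carried through the class; the null conversion happens
  // only at the interface boundary
  override def getProctimeAttribute: String = proctimeAttribute.orNull
}
```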
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202347281
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java ---
@@ -329,18 +329,6 @@ public void stop(SessionContext session) {
}
}
- private <T> void executeUpdateInternal(ExecutionContext<T> context, String query) {
--- End diff --
Please squash this change with the commit that introduced this unused method.
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202282838
--- Diff: flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactoryTest.java ---
@@ -21,8 +21,12 @@
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08;
/**
- * Tests for {@link Kafka08JsonTableSourceFactory}.
+ * Tests for legacy Kafka08JsonTableSourceFactory.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
+ * drop support for format-specific table sources.
*/
+@Deprecated
--- End diff --
Does this modification belong to this commit?
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202506494
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java ---
@@ -51,16 +51,10 @@ public void addProperties(DescriptorProperties properties) {
}
public Source toSource() {
- final Map<String, String> newProperties = new HashMap<>(properties);
- newProperties.replace(TableDescriptorValidator.TABLE_TYPE(),
- TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE());
--- End diff --
The table type is a concept of the SQL Client and should not be part of the table descriptor.
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202334214
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
@@ -18,143 +18,358 @@
package org.apache.flink.table.factories
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
import org.apache.flink.table.api._
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
import org.apache.flink.table.descriptors.FormatDescriptorValidator._
import org.apache.flink.table.descriptors.MetadataValidator._
import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
import _root_.scala.collection.JavaConverters._
import _root_.scala.collection.mutable
/**
- * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
+ * Unified interface to search for a [[TableFactory]] of provided type and properties.
*/
object TableFactoryService extends Logging {
private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
- def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
- find(clz, descriptor, null)
+ /**
+ * Finds a table factory of the given class and descriptor.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor describing the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+ Preconditions.checkNotNull(factoryClass)
--- End diff --
If you are not touching variables but only passing them along somewhere, you can safely skip `checkNotNull`. Handy if some variable is being passed multiple times (like here).
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202506766
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
@@ -18,143 +18,358 @@
package org.apache.flink.table.factories
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
import org.apache.flink.table.api._
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
import org.apache.flink.table.descriptors.FormatDescriptorValidator._
import org.apache.flink.table.descriptors.MetadataValidator._
import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
import _root_.scala.collection.JavaConverters._
import _root_.scala.collection.mutable
/**
- * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
+ * Unified interface to search for a [[TableFactory]] of provided type and properties.
*/
object TableFactoryService extends Logging {
private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
- def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
- find(clz, descriptor, null)
+ /**
+ * Finds a table factory of the given class and descriptor.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor describing the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(descriptor)
+
+ val descriptorProperties = new DescriptorProperties()
+ descriptor.addProperties(descriptorProperties)
+ findInternal(factoryClass, descriptorProperties.asMap, None)
}
- def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader)
- : TableFactory = {
+ /**
+ * Finds a table factory of the given class, descriptor, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor describing the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(descriptor)
+ Preconditions.checkNotNull(classLoader)
- val properties = new DescriptorProperties()
- descriptor.addProperties(properties)
- find(clz, properties.asMap.asScala.toMap, classLoader)
+ val descriptorProperties = new DescriptorProperties()
+ descriptor.addProperties(descriptorProperties)
+ findInternal(factoryClass, descriptorProperties.asMap, None)
}
- def find(clz: Class[_], properties: Map[String, String]): TableFactory = {
- find(clz: Class[_], properties, null)
+ /**
+ * Finds a table factory of the given class and property map.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(propertyMap)
+
+ findInternal(factoryClass, propertyMap, None)
}
- def find(clz: Class[_], properties: Map[String, String],
- classLoader: ClassLoader): TableFactory = {
+ /**
+ * Finds a table factory of the given class, property map, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](
+ factoryClass: Class[T],
+ propertyMap: JMap[String, String],
+ classLoader: ClassLoader)
+ : T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(propertyMap)
+ Preconditions.checkNotNull(classLoader)
+
+ findInternal(factoryClass, propertyMap, Some(classLoader))
--- End diff --
I like to have it more explicit at the beginning of a function.
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202336263
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
@@ -18,143 +18,358 @@
package org.apache.flink.table.factories
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
import org.apache.flink.table.api._
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
import org.apache.flink.table.descriptors.FormatDescriptorValidator._
import org.apache.flink.table.descriptors.MetadataValidator._
import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
import _root_.scala.collection.JavaConverters._
import _root_.scala.collection.mutable
/**
- * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
+ * Unified interface to search for a [[TableFactory]] of provided type and properties.
*/
object TableFactoryService extends Logging {
private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
- def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
- find(clz, descriptor, null)
+ /**
+ * Finds a table factory of the given class and descriptor.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor describing the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(descriptor)
+
+ val descriptorProperties = new DescriptorProperties()
+ descriptor.addProperties(descriptorProperties)
+ findInternal(factoryClass, descriptorProperties.asMap, None)
}
- def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader)
- : TableFactory = {
+ /**
+ * Finds a table factory of the given class, descriptor, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor describing the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(descriptor)
+ Preconditions.checkNotNull(classLoader)
- val properties = new DescriptorProperties()
- descriptor.addProperties(properties)
- find(clz, properties.asMap.asScala.toMap, classLoader)
+ val descriptorProperties = new DescriptorProperties()
+ descriptor.addProperties(descriptorProperties)
+ findInternal(factoryClass, descriptorProperties.asMap, None)
}
- def find(clz: Class[_], properties: Map[String, String]): TableFactory = {
- find(clz: Class[_], properties, null)
+ /**
+ * Finds a table factory of the given class and property map.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(propertyMap)
+
+ findInternal(factoryClass, propertyMap, None)
}
- def find(clz: Class[_], properties: Map[String, String],
- classLoader: ClassLoader): TableFactory = {
+ /**
+ * Finds a table factory of the given class, property map, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](
+ factoryClass: Class[T],
+ propertyMap: JMap[String, String],
+ classLoader: ClassLoader)
+ : T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(propertyMap)
+ Preconditions.checkNotNull(classLoader)
+
+ findInternal(factoryClass, propertyMap, Some(classLoader))
+ }
+
+ /**
+ * Finds a table factory of the given class, property map, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ private def findInternal[T](
+ factoryClass: Class[T],
+ propertyMap: JMap[String, String],
+ classLoader: Option[ClassLoader])
+ : T = {
+
+ val properties = propertyMap.asScala.toMap
+
+ // discover table factories
+ val foundFactories = discoverFactories(classLoader)
- var matchingFactory: Option[(TableFactory, Seq[String])] = None
+ // filter by factory class
+ val classFactories = filterByFactoryClass(
+ factoryClass,
+ properties,
+ foundFactories)
+
+ // find matching context
+ val contextFactories = filterByContext(
+ factoryClass,
+ properties,
+ foundFactories,
+ classFactories)
+
+ // filter by supported keys
+ filterBySupportedProperties(
+ factoryClass,
+ properties,
+ foundFactories,
+ contextFactories)
+ }
+
+ /**
+ * Searches for factories using Java service providers.
+ *
+ * @return all factories in the classpath
+ */
+ private def discoverFactories[T](classLoader: Option[ClassLoader]): Seq[TableFactory] = {
+ val foundFactories = mutable.ArrayBuffer[TableFactory]()
try {
- val iter = if (classLoader == null) {
- defaultLoader.iterator()
- } else {
- val customLoader = ServiceLoader.load(classOf[TableFactory], classLoader)
- customLoader.iterator()
+ val iterator = classLoader match {
+ case Some(customClassLoader) =>
+ val customLoader = ServiceLoader.load(classOf[TableFactory], customClassLoader)
+ customLoader.iterator()
+ case None =>
+ defaultLoader.iterator()
}
- while (iter.hasNext) {
- val factory = iter.next()
-
- if (clz.isAssignableFrom(factory.getClass)) {
- val requiredContextJava = try {
- factory.requiredContext()
- } catch {
- case t: Throwable =>
- throw new TableException(
- s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
- t)
- }
-
- val requiredContext = if (requiredContextJava != null) {
- // normalize properties
- requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2))
- } else {
- Map[String, String]()
- }
-
- val plainContext = mutable.Map[String, String]()
- plainContext ++= requiredContext
- // we remove the versions for now until we have the first backwards compatibility case
- // with the version we can provide mappings in case the format changes
- plainContext.remove(CONNECTOR_PROPERTY_VERSION)
- plainContext.remove(FORMAT_PROPERTY_VERSION)
- plainContext.remove(METADATA_PROPERTY_VERSION)
- plainContext.remove(STATISTICS_PROPERTY_VERSION)
-
- if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) {
- matchingFactory match {
- case Some(_) => throw new AmbiguousTableFactoryException(properties)
- case None => matchingFactory =
- Some((factory.asInstanceOf[TableFactory], requiredContext.keys.toSeq))
- }
- }
- }
+
+ while (iterator.hasNext) {
+ val factory = iterator.next()
+ foundFactories += factory
}
+
+ foundFactories
} catch {
case e: ServiceConfigurationError =>
LOG.error("Could not load service provider for table factories.", e)
throw new TableException("Could not load service provider for table factories.", e)
}
+ }
+
+ /**
+ * Filters for factories with matching context.
+ *
+ * @return all matching factories
+ */
+ private def filterByContext[T](
+ factoryClass: Class[T],
+ properties: Map[String, String],
+ foundFactories: Seq[TableFactory],
+ classFactories: Seq[TableFactory])
+ : Seq[TableFactory] = {
+
+ val matchingFactories = mutable.ArrayBuffer[TableFactory]()
+
+ classFactories.foreach { factory =>
+ val requestedContext = normalizeContext(factory)
+
+ val plainContext = mutable.Map[String, String]()
+ plainContext ++= requestedContext
+ // we remove the version for now until we have the first backwards compatibility case
+ // with the version we can provide mappings in case the format changes
--- End diff --
What if a user provides a custom connector or a custom format? Is it documented that we do not support versioning now?
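For context, a sketch of how a custom format factory would declare a property version in its required context today; the `my-csv` format type is made up, and as the code above shows, the version key is currently stripped before matching:
```
import java.util.{HashMap => JHashMap, Map => JMap}

trait TableFactory // stand-in for the real trait

class MyCsvFormatFactory extends TableFactory {
  // declares the context this factory matches on; format.property-version
  // is currently removed before matching, so it has no effect yet
  def requiredContext(): JMap[String, String] = {
    val context = new JHashMap[String, String]()
    context.put("format.type", "my-csv")
    context.put("format.property-version", "1")
    context
  }
}
```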
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202269785
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
@@ -54,51 +56,105 @@
* override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
*/
@Internal
-public abstract class KafkaTableSource
- implements StreamTableSource<Row>, DefinedProctimeAttribute, DefinedRowtimeAttributes {
+public abstract class KafkaTableSource implements
+ StreamTableSource<Row>,
+ DefinedProctimeAttribute,
+ DefinedRowtimeAttributes,
+ DefinedFieldMapping {
+
+ // common table source attributes
+ // TODO make all attributes final once we drop support for format-specific table sources
/** The schema of the table. */
private final TableSchema schema;
+ /** Field name of the processing time attribute, null if no processing time field is defined. */
+ private String proctimeAttribute;
+
+ /** Descriptor for a rowtime attribute. */
+ private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
+
+ /** Mapping for the fields of the table schema to fields of the physical returned type or null. */
+ private Map<String, String> fieldMapping;
+
+ // Kafka-specific attributes
+
/** The Kafka topic to consume. */
private final String topic;
/** Properties for the Kafka consumer. */
private final Properties properties;
- /** Type information describing the result type. */
- private TypeInformation<Row> returnType;
-
- /** Field name of the processing time attribute, null if no processing time field is defined. */
- private String proctimeAttribute;
-
- /** Descriptor for a rowtime attribute. */
- private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
+ /** Deserialization schema for decoding records from Kafka. */
+ private final DeserializationSchema<Row> deserializationSchema;
/** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
- private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
+ private StartupMode startupMode;
/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
private Map<KafkaTopicPartition, Long> specificStartupOffsets;
/**
* Creates a generic Kafka {@link StreamTableSource}.
*
- * @param topic Kafka topic to consume.
- * @param properties Properties for the Kafka consumer.
- * @param schema Schema of the produced table.
- * @param returnType Type information of the produced physical DataStream.
+ * @param schema Schema of the produced table.
+ * @param proctimeAttribute Field name of the processing time attribute, null if no
+ * processing time field is defined.
+ * @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute
+ * @param fieldMapping Mapping for the fields of the table schema to
--- End diff --
How can this field ever be null? `SchemaValidator.deriveFieldMapping` doesn't allow for that.
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202289678
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala ---
@@ -44,32 +43,27 @@ object ExternalTableSourceUtil extends Logging {
val properties = new DescriptorProperties()
externalCatalogTable.addProperties(properties)
val javaMap = properties.asMap
- val source = TableFactoryService.find(classOf[TableSourceFactory[_]], javaMap)
- .asInstanceOf[TableSourceFactory[_]]
- .createTableSource(javaMap)
tableEnv match {
// check for a batch table source in this batch environment
case _: BatchTableEnvironment =>
- source match {
- case bts: BatchTableSource[_] =>
- new TableSourceSinkTable(Some(new BatchTableSourceTable(
- bts,
- new FlinkStatistic(externalCatalogTable.getTableStats))), None)
- case _ => throw new TableException(
- s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
- s"in a batch environment.")
- }
+ val source = TableFactoryService
--- End diff --
1.
This case looks kind of ugly. Shouldn't it be unified to sth like:
```
TableFactoryService.find(classOf[TableSourceFactory], javaMap)
.createTableSource(javaMap, environment.isInstanceOf[StreamTableEnvironment])
```
?
If you really want, you can on top of that define methods:
```
def createBatchTableSource(javaMap: JMap[String, String]) = createTableSource(javaMap, isStream = false)
def createStreamTableSource(javaMap: JMap[String, String]) = createTableSource(javaMap, isStream = true)
```
but I would skip them.
Factories could choose whether or not to support streaming/batch tables. The same applies to similar code in `ExecutionContext.java`.
2. Ditto: Is there any significant value in keeping both `BatchTableSink(Source)Factory` and `StreamTableSink(Source)Factory`?
Both 1. and 2. add quite a lot of boilerplate code for the definition and for callers (a sketch of suggestion 1 follows below).
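A sketch of what suggestion 1 could look like as an interface (simplified stand-in types, not the actual Flink classes):
```
import java.util.{Map => JMap}

trait TableFactory // stand-in for the real trait
trait TableSource  // simplified stand-in

trait TableSourceFactory extends TableFactory {
  // a single entry point instead of separate batch/stream factories;
  // a factory that does not support one mode can reject it at runtime
  def createTableSource(properties: JMap[String, String], isStream: Boolean): TableSource
}
```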
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202334605
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
@@ -18,143 +18,358 @@
package org.apache.flink.table.factories
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
import org.apache.flink.table.api._
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
import org.apache.flink.table.descriptors.FormatDescriptorValidator._
import org.apache.flink.table.descriptors.MetadataValidator._
import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
import _root_.scala.collection.JavaConverters._
import _root_.scala.collection.mutable
/**
- * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
+ * Unified interface to search for a [[TableFactory]] of provided type and properties.
*/
object TableFactoryService extends Logging {
private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
- def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
- find(clz, descriptor, null)
+ /**
+ * Finds a table factory of the given class and descriptor.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor describing the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(descriptor)
+
+ val descriptorProperties = new DescriptorProperties()
+ descriptor.addProperties(descriptorProperties)
+ findInternal(factoryClass, descriptorProperties.asMap, None)
}
- def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader)
- : TableFactory = {
+ /**
+ * Finds a table factory of the given class, descriptor, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor describing the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(descriptor)
+ Preconditions.checkNotNull(classLoader)
- val properties = new DescriptorProperties()
- descriptor.addProperties(properties)
- find(clz, properties.asMap.asScala.toMap, classLoader)
+ val descriptorProperties = new DescriptorProperties()
+ descriptor.addProperties(descriptorProperties)
+ findInternal(factoryClass, descriptorProperties.asMap, Some(classLoader))
}
- def find(clz: Class[_], properties: Map[String, String]): TableFactory = {
- find(clz: Class[_], properties, null)
+ /**
+ * Finds a table factory of the given class and property map.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(propertyMap)
+
+ findInternal(factoryClass, propertyMap, None)
}
- def find(clz: Class[_], properties: Map[String, String],
- classLoader: ClassLoader): TableFactory = {
+ /**
+ * Finds a table factory of the given class, property map, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](
+ factoryClass: Class[T],
+ propertyMap: JMap[String, String],
+ classLoader: ClassLoader)
+ : T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(propertyMap)
+ Preconditions.checkNotNull(classLoader)
+
+ findInternal(factoryClass, propertyMap, Some(classLoader))
--- End diff --
Another hint: sometimes it's handy to inline the `checkNotNull` call:
```
findInternal(factoryClass, propertyMap, Some(checkNotNull(classLoader)))
```
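(The inlined form reads best with a static import: `import org.apache.flink.util.Preconditions.checkNotNull`.)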
---
[GitHub] flink issue #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified table ...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:
https://github.com/apache/flink/pull/6323
Thanks @pnowojski. Merging this...
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202506193
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala ---
@@ -16,14 +16,14 @@
* limitations under the License.
*/
-package org.apache.flink.table.connectors
+package org.apache.flink.table.factories
import java.util
/**
* Common trait for all properties-based discoverable table factories.
*/
-trait DiscoverableTableFactory {
+trait TableFactory {
--- End diff --
Actually, I would like the very generic name `Factory`, but since we have to add some prefix to make it unique in the project, I named it `TableFactory`, because we prefix everything in this module with `Table`.
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202506661
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala ---
@@ -143,118 +143,82 @@ case class CatalogAlreadyExistException(
}
/**
- * Exception for not finding a [[TableFormatFactory]] for the
- * given properties.
+ * Exception for not finding a [[TableFactory]] for the given properties.
*
* @param message message that indicates the current matching step
* @param factoryClass required factory class
- * @param formatFactories all found factories
- * @param properties properties that describe the table format
+ * @param factories all found factories
+ * @param properties properties that describe the configuration
* @param cause the cause
*/
-case class NoMatchingTableFormatException(
+case class NoMatchingTableFactoryException(
message: String,
factoryClass: Class[_],
- formatFactories: Seq[TableFormatFactory[_]],
+ factories: Seq[TableFactory],
properties: Map[String, String],
cause: Throwable)
extends RuntimeException(
--- End diff --
So far we don't have an exception inheritance hierarchy. Case classes don't support inheriting from other case classes in Scala, so we would need to convert them.
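For illustration, a minimal sketch of what such a conversion could look like, assuming hypothetical plain (non-case) classes that are not part of this PR:
```
import org.apache.flink.table.factories.TableFactory

// Hypothetical sketch: plain classes allow a shared exception supertype,
// which case-to-case inheritance in Scala forbids.
class TableFactoryException(message: String, cause: Throwable)
  extends RuntimeException(message, cause)

class NoMatchingTableFactoryException(
    message: String,
    factoryClass: Class[_],
    factories: Seq[TableFactory],
    properties: Map[String, String],
    cause: Throwable)
  extends TableFactoryException(message, cause)

class AmbiguousTableFactoryException(properties: Map[String, String])
  extends TableFactoryException(s"Ambiguous factory for properties: $properties", null)
```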
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202535180
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
@@ -18,143 +18,358 @@
package org.apache.flink.table.factories
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
import org.apache.flink.table.api._
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
import org.apache.flink.table.descriptors.FormatDescriptorValidator._
import org.apache.flink.table.descriptors.MetadataValidator._
import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
import _root_.scala.collection.JavaConverters._
import _root_.scala.collection.mutable
/**
- * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
+ * Unified interface to search for a [[TableFactory]] of provided type and properties.
*/
object TableFactoryService extends Logging {
private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
- def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
- find(clz, descriptor, null)
+ /**
+ * Finds a table factory of the given class and descriptor.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor describing the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(descriptor)
+
+ val descriptorProperties = new DescriptorProperties()
+ descriptor.addProperties(descriptorProperties)
+ findInternal(factoryClass, descriptorProperties.asMap, None)
}
- def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader)
- : TableFactory = {
+ /**
+ * Finds a table factory of the given class, descriptor, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor describing the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(descriptor)
+ Preconditions.checkNotNull(classLoader)
- val properties = new DescriptorProperties()
- descriptor.addProperties(properties)
- find(clz, properties.asMap.asScala.toMap, classLoader)
+ val descriptorProperties = new DescriptorProperties()
+ descriptor.addProperties(descriptorProperties)
+ findInternal(factoryClass, descriptorProperties.asMap, Some(classLoader))
}
- def find(clz: Class[_], properties: Map[String, String]): TableFactory = {
- find(clz: Class[_], properties, null)
+ /**
+ * Finds a table factory of the given class and property map.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(propertyMap)
+
+ findInternal(factoryClass, propertyMap, None)
}
- def find(clz: Class[_], properties: Map[String, String],
- classLoader: ClassLoader): TableFactory = {
+ /**
+ * Finds a table factory of the given class, property map, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](
+ factoryClass: Class[T],
+ propertyMap: JMap[String, String],
+ classLoader: ClassLoader)
+ : T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(propertyMap)
+ Preconditions.checkNotNull(classLoader)
+
+ findInternal(factoryClass, propertyMap, Some(classLoader))
+ }
+
+ /**
+ * Finds a table factory of the given class, property map, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ private def findInternal[T](
+ factoryClass: Class[T],
+ propertyMap: JMap[String, String],
+ classLoader: Option[ClassLoader])
+ : T = {
+
+ val properties = propertyMap.asScala.toMap
+
+ // discover table factories
+ val foundFactories = discoverFactories(classLoader)
- var matchingFactory: Option[(TableFactory, Seq[String])] = None
+ // filter by factory class
+ val classFactories = filterByFactoryClass(
+ factoryClass,
+ properties,
+ foundFactories)
+
+ // find matching context
+ val contextFactories = filterByContext(
+ factoryClass,
+ properties,
+ foundFactories,
+ classFactories)
+
+ // filter by supported keys
+ filterBySupportedProperties(
+ factoryClass,
+ properties,
+ foundFactories,
+ contextFactories)
+ }
+
+ /**
+ * Searches for factories using Java service providers.
+ *
+ * @return all factories in the classpath
+ */
+ private def discoverFactories[T](classLoader: Option[ClassLoader]): Seq[TableFactory] = {
+ val foundFactories = mutable.ArrayBuffer[TableFactory]()
try {
- val iter = if (classLoader == null) {
- defaultLoader.iterator()
- } else {
- val customLoader = ServiceLoader.load(classOf[TableFactory], classLoader)
- customLoader.iterator()
+ val iterator = classLoader match {
+ case Some(customClassLoader) =>
+ val customLoader = ServiceLoader.load(classOf[TableFactory], customClassLoader)
+ customLoader.iterator()
+ case None =>
+ defaultLoader.iterator()
}
- while (iter.hasNext) {
- val factory = iter.next()
-
- if (clz.isAssignableFrom(factory.getClass)) {
- val requiredContextJava = try {
- factory.requiredContext()
- } catch {
- case t: Throwable =>
- throw new TableException(
- s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
- t)
- }
-
- val requiredContext = if (requiredContextJava != null) {
- // normalize properties
- requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2))
- } else {
- Map[String, String]()
- }
-
- val plainContext = mutable.Map[String, String]()
- plainContext ++= requiredContext
- // we remove the versions for now until we have the first backwards compatibility case
- // with the version we can provide mappings in case the format changes
- plainContext.remove(CONNECTOR_PROPERTY_VERSION)
- plainContext.remove(FORMAT_PROPERTY_VERSION)
- plainContext.remove(METADATA_PROPERTY_VERSION)
- plainContext.remove(STATISTICS_PROPERTY_VERSION)
-
- if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) {
- matchingFactory match {
- case Some(_) => throw new AmbiguousTableFactoryException(properties)
- case None => matchingFactory =
- Some((factory.asInstanceOf[TableFactory], requiredContext.keys.toSeq))
- }
- }
- }
+
+ while (iterator.hasNext) {
+ val factory = iterator.next()
+ foundFactories += factory
}
+
+ foundFactories
} catch {
case e: ServiceConfigurationError =>
LOG.error("Could not load service provider for table factories.", e)
throw new TableException("Could not load service provider for table factories.", e)
}
+ }
+
+ /**
+ * Filters for factories with matching context.
+ *
+ * @return all matching factories
+ */
+ private def filterByContext[T](
+ factoryClass: Class[T],
+ properties: Map[String, String],
+ foundFactories: Seq[TableFactory],
+ classFactories: Seq[TableFactory])
+ : Seq[TableFactory] = {
+
+ val matchingFactories = mutable.ArrayBuffer[TableFactory]()
+
+ classFactories.foreach { factory =>
+ val requestedContext = normalizeContext(factory)
+
+ val plainContext = mutable.Map[String, String]()
+ plainContext ++= requestedContext
+ // we remove the version for now until we have the first backwards compatibility case
+ // with the version we can provide mappings in case the format changes
--- End diff --
I opened FLINK-9851 for that.
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202520986
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java ---
@@ -51,16 +51,10 @@ public void addProperties(DescriptorProperties properties) {
}
public Source toSource() {
- final Map<String, String> newProperties = new HashMap<>(properties);
- newProperties.replace(TableDescriptorValidator.TABLE_TYPE(),
- TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE());
--- End diff --
I'm just asking because I cannot find what code has replaced them. Or was this dead code?
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202350170
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala ---
@@ -103,24 +103,40 @@ abstract class BatchTableEnvironment(
: Unit = {
tableSource match {
+
+ // check for proper batch table source
--- End diff --
ditto, those changes do not seem to belong to this commit. If so, please extract them into a separate one:
```
git reset --soft HEAD^
git reset HEAD flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
git commit -m "Clean up and simplify changes"
git commit -a --fixup=42a8a156d4e6f8f3d119c458350b6c897306fc48 # SHA of Shuyi Chen's commit
git rebase -i origin/master --autosquash
```
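(During the interactive rebase, `--autosquash` moves the `fixup!` commit directly after the commit it references and marks it to be squashed into it.)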
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202269870
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
@@ -134,34 +190,60 @@ public String getProctimeAttribute() {
return rowtimeAttributeDescriptors;
}
+ @Override
+ public Map<String, String> getFieldMapping() {
+ return fieldMapping;
+ }
+
@Override
public String explainSource() {
return TableConnectorUtil.generateRuntimeName(this.getClass(), schema.getColumnNames());
}
+ /**
+ * Returns the properties for the Kafka consumer.
+ *
+ * @return properties for the Kafka consumer.
+ */
+ public Properties getProperties() {
+ return properties;
+ }
+
+ /**
+ * Returns the deserialization schema.
+ *
+ * @return The deserialization schema
+ */
+ public DeserializationSchema<Row> getDeserializationSchema(){
+ return deserializationSchema;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
- if (!(o instanceof KafkaTableSource)) {
+ // TODO force classes to be equal once we drop support for format-specific table sources
+ // if (o == null || getClass() != o.getClass()) {
+ if (o == null || !(o instanceof KafkaTableSource)) {
return false;
}
- KafkaTableSource that = (KafkaTableSource) o;
+ final KafkaTableSource that = (KafkaTableSource) o;
return Objects.equals(schema, that.schema) &&
- Objects.equals(topic, that.topic) &&
- Objects.equals(properties, that.properties) &&
- Objects.equals(returnType, that.returnType) &&
Objects.equals(proctimeAttribute, that.proctimeAttribute) &&
Objects.equals(rowtimeAttributeDescriptors, that.rowtimeAttributeDescriptors) &&
+ Objects.equals(fieldMapping, that.fieldMapping) &&
+ Objects.equals(topic, that.topic) &&
+ Objects.equals(properties, that.properties) &&
+ Objects.equals(deserializationSchema, that.deserializationSchema) &&
startupMode == that.startupMode &&
Objects.equals(specificStartupOffsets, that.specificStartupOffsets);
}
@Override
public int hashCode() {
- return Objects.hash(schema, topic, properties, returnType,
- proctimeAttribute, rowtimeAttributeDescriptors, startupMode, specificStartupOffsets);
+ return Objects.hash(schema, proctimeAttribute, rowtimeAttributeDescriptors, fieldMapping,
--- End diff --
nit: format one entry per line, i.e. each argument to `Objects.hash(...)` on its own line
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202335703
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
@@ -18,143 +18,358 @@
package org.apache.flink.table.factories
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
import org.apache.flink.table.api._
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
import org.apache.flink.table.descriptors.FormatDescriptorValidator._
import org.apache.flink.table.descriptors.MetadataValidator._
import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
import _root_.scala.collection.JavaConverters._
import _root_.scala.collection.mutable
/**
- * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
+ * Unified interface to search for a [[TableFactory]] of provided type and properties.
*/
object TableFactoryService extends Logging {
private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
- def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
- find(clz, descriptor, null)
+ /**
+ * Finds a table factory of the given class and descriptor.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor describing the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(descriptor)
+
+ val descriptorProperties = new DescriptorProperties()
+ descriptor.addProperties(descriptorProperties)
+ findInternal(factoryClass, descriptorProperties.asMap, None)
}
- def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader)
- : TableFactory = {
+ /**
+ * Finds a table factory of the given class, descriptor, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor describing the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(descriptor)
+ Preconditions.checkNotNull(classLoader)
- val properties = new DescriptorProperties()
- descriptor.addProperties(properties)
- find(clz, properties.asMap.asScala.toMap, classLoader)
+ val descriptorProperties = new DescriptorProperties()
+ descriptor.addProperties(descriptorProperties)
+ findInternal(factoryClass, descriptorProperties.asMap, Some(classLoader))
}
- def find(clz: Class[_], properties: Map[String, String]): TableFactory = {
- find(clz: Class[_], properties, null)
+ /**
+ * Finds a table factory of the given class and property map.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(propertyMap)
+
+ findInternal(factoryClass, propertyMap, None)
}
- def find(clz: Class[_], properties: Map[String, String],
- classLoader: ClassLoader): TableFactory = {
+ /**
+ * Finds a table factory of the given class, property map, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](
+ factoryClass: Class[T],
+ propertyMap: JMap[String, String],
+ classLoader: ClassLoader)
+ : T = {
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(propertyMap)
+ Preconditions.checkNotNull(classLoader)
+
+ findInternal(factoryClass, propertyMap, Some(classLoader))
+ }
+
+ /**
+ * Finds a table factory of the given class, property map, and classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ private def findInternal[T](
+ factoryClass: Class[T],
+ propertyMap: JMap[String, String],
+ classLoader: Option[ClassLoader])
+ : T = {
+
+ val properties = propertyMap.asScala.toMap
+
+ // discover table factories
+ val foundFactories = discoverFactories(classLoader)
- var matchingFactory: Option[(TableFactory, Seq[String])] = None
+ // filter by factory class
+ val classFactories = filterByFactoryClass(
+ factoryClass,
+ properties,
+ foundFactories)
+
+ // find matching context
+ val contextFactories = filterByContext(
+ factoryClass,
+ properties,
+ foundFactories,
+ classFactories)
+
+ // filter by supported keys
+ filterBySupportedProperties(
+ factoryClass,
+ properties,
+ foundFactories,
+ contextFactories)
+ }
+
+ /**
+ * Searches for factories using Java service providers.
+ *
+ * @return all factories in the classpath
+ */
+ private def discoverFactories[T](classLoader: Option[ClassLoader]): Seq[TableFactory] = {
+ val foundFactories = mutable.ArrayBuffer[TableFactory]()
try {
- val iter = if (classLoader == null) {
- defaultLoader.iterator()
- } else {
- val customLoader = ServiceLoader.load(classOf[TableFactory], classLoader)
- customLoader.iterator()
+ val iterator = classLoader match {
+ case Some(customClassLoader) =>
+ val customLoader = ServiceLoader.load(classOf[TableFactory], customClassLoader)
+ customLoader.iterator()
+ case None =>
+ defaultLoader.iterator()
}
- while (iter.hasNext) {
- val factory = iter.next()
-
- if (clz.isAssignableFrom(factory.getClass)) {
- val requiredContextJava = try {
- factory.requiredContext()
- } catch {
- case t: Throwable =>
- throw new TableException(
- s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
- t)
- }
-
- val requiredContext = if (requiredContextJava != null) {
- // normalize properties
- requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2))
- } else {
- Map[String, String]()
- }
-
- val plainContext = mutable.Map[String, String]()
- plainContext ++= requiredContext
- // we remove the versions for now until we have the first backwards compatibility case
- // with the version we can provide mappings in case the format changes
- plainContext.remove(CONNECTOR_PROPERTY_VERSION)
- plainContext.remove(FORMAT_PROPERTY_VERSION)
- plainContext.remove(METADATA_PROPERTY_VERSION)
- plainContext.remove(STATISTICS_PROPERTY_VERSION)
-
- if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) {
- matchingFactory match {
- case Some(_) => throw new AmbiguousTableFactoryException(properties)
- case None => matchingFactory =
- Some((factory.asInstanceOf[TableFactory], requiredContext.keys.toSeq))
- }
- }
- }
+
+ while (iterator.hasNext) {
+ val factory = iterator.next()
+ foundFactories += factory
}
+
+ foundFactories
} catch {
case e: ServiceConfigurationError =>
LOG.error("Could not load service provider for table factories.", e)
throw new TableException("Could not load service provider for table factories.", e)
}
+ }
+
+ /**
+ * Filters for factories with matching context.
+ *
+ * @return all matching factories
+ */
+ private def filterByContext[T](
--- End diff --
nit: move this method below `filterByFactoryClass`? (to match the order of invocations)
---
[GitHub] flink issue #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified table ...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:
https://github.com/apache/flink/pull/6323
Thank you @pnowojski. I hope I have addressed all your comments. I will clean up the commit history and improve the commit messages during merging.
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202282221
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java ---
@@ -214,15 +216,18 @@ private static TableDescriptor createTableDescriptor(String name, Map<String, Ob
if (typeObject == null || !(typeObject instanceof String)) {
throw new SqlClientException("Invalid 'type' attribute for table '" + name + "'.");
}
- final String type = (String) config.get(TABLE_TYPE);
+ final String type = (String) typeObject;
+ final Map<String, Object> properties = new HashMap<>(config);
config.remove(TABLE_TYPE);
- final Map<String, String> normalizedConfig = ConfigUtil.normalizeYaml(config);
- if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
- return new Source(name, normalizedConfig);
- } else if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) {
- return new Sink(name, normalizedConfig);
- } else if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) {
- return new SourceSink(name, normalizedConfig);
+
+ final Map<String, String> normalizedProperties = ConfigUtil.normalizeYaml(properties);
+ switch (type) {
+ case TABLE_TYPE_VALUE_SOURCE:
+ return new Source(name, normalizedProperties);
+ case TABLE_TYPE_VALUE_SINK:
+ return new Sink(name, normalizedProperties);
+ case TABLE_TYPE_VALUE_BOTH:
+ return new SourceSink(name, normalizedProperties);
--- End diff --
```
default:
	throw new UnsupportedOperationException(
		String.format("Unsupported [%s] value [%s]", TABLE_TYPE, type));
```
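Such a `default` branch makes an unexpected `type` value fail fast with an explicit error instead of falling through silently.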
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202275530
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala ---
@@ -21,7 +21,14 @@ package org.apache.flink.table.factories
import java.util
/**
- * Common trait for all properties-based discoverable table factories.
+ * A factory to create different table-related instances from string-based properties. This
+ * factory is used with Java's Service Provider Interfaces (SPI) for discovering. A factory is
+ * called with a set of normalized properties that describe the desired format. The factory allows
--- End diff --
isn't this comment a bit misleading?
> that describe the desired format.
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202348312
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java ---
@@ -63,65 +62,45 @@ public void run() {
LOG.debug("Submitting job {} with the following environment: \n{}",
jobGraph.getJobID(), context.getMergedEnvironment());
}
- if (result != null) {
- executionResultBucket.add(deployJob(context, jobGraph, result));
- } else {
- deployJob(context, jobGraph, result);
- }
+ executionResultBucket.add(deployJob(context, jobGraph, result));
}
public JobExecutionResult fetchExecutionResult() {
- if (result != null) {
- return executionResultBucket.poll();
- } else {
- return null;
- }
+ return executionResultBucket.poll();
}
/**
- * Deploys a job. Depending on the deployment creates a new job cluster. If result is requested,
- * it saves the cluster id in the result and blocks until job completion.
+ * Deploys a job. Depending on the deployment creates a new job cluster. It saves the cluster id in
--- End diff --
How do those changes in this file relate to the rest of the PR/commit?
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202327344
--- Diff: flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory ---
@@ -13,5 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+<<<<<<< HEAD:flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
--- End diff --
?
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202356462
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala ---
@@ -186,7 +186,7 @@ class TableEnvironmentITCase(
def testInsertIntoMemoryTable(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
- MemoryTableSourceSinkUtil.clear
+ MemoryTableSourceSinkUtil.clear()
--- End diff --
ditto, and the same probably applies to hundreds of other changes in this commit :(
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202506512
--- Diff: flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactoryTest.java ---
@@ -21,8 +21,12 @@
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08;
/**
- * Tests for {@link Kafka08JsonTableSourceFactory}.
+ * Tests for legacy Kafka08JsonTableSourceFactory.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
+ * drop support for format-specific table sources.
*/
+@Deprecated
--- End diff --
No, but we forgot it in the previous commit.
---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202506126
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
@@ -54,51 +56,105 @@
* override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
*/
@Internal
-public abstract class KafkaTableSource
- implements StreamTableSource<Row>, DefinedProctimeAttribute, DefinedRowtimeAttributes {
+public abstract class KafkaTableSource implements
+ StreamTableSource<Row>,
+ DefinedProctimeAttribute,
+ DefinedRowtimeAttributes,
+ DefinedFieldMapping {
+
+ // common table source attributes
+ // TODO make all attributes final once we drop support for format-specific table sources
/** The schema of the table. */
private final TableSchema schema;
+ /** Field name of the processing time attribute, null if no processing time field is defined. */
+ private String proctimeAttribute;
+
+ /** Descriptor for a rowtime attribute. */
+ private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
+
+ /** Mapping for the fields of the table schema to fields of the physical returned type or null. */
+ private Map<String, String> fieldMapping;
+
+ // Kafka-specific attributes
+
/** The Kafka topic to consume. */
private final String topic;
/** Properties for the Kafka consumer. */
private final Properties properties;
- /** Type information describing the result type. */
- private TypeInformation<Row> returnType;
-
- /** Field name of the processing time attribute, null if no processing time field is defined. */
- private String proctimeAttribute;
-
- /** Descriptor for a rowtime attribute. */
- private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
+ /** Deserialization schema for decoding records from Kafka. */
+ private final DeserializationSchema<Row> deserializationSchema;
/** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
- private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
+ private StartupMode startupMode;
/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
private Map<KafkaTopicPartition, Long> specificStartupOffsets;
/**
* Creates a generic Kafka {@link StreamTableSource}.
*
- * @param topic Kafka topic to consume.
- * @param properties Properties for the Kafka consumer.
- * @param schema Schema of the produced table.
- * @param returnType Type information of the produced physical DataStream.
+ * @param schema Schema of the produced table.
+ * @param proctimeAttribute Field name of the processing time attribute, null if no
+ * processing time field is defined.
+ * @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute
+ * @param fieldMapping Mapping for the fields of the table schema to
--- End diff --
Backward compatibility. It could have been null in the past.
---