You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/15 09:24:48 UTC
[03/13] flink git commit: [FLINK-8866] [table] Merge table
source/sink/format factories
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
new file mode 100644
index 0000000..3baff8e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories
+
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
+
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.MetadataValidator._
+import org.apache.flink.table.descriptors.StatisticsValidator._
+import org.apache.flink.table.descriptors._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
+
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
+
+/**
+ * Unified interface to search for a [[TableFactory]] of provided type and properties.
+ */
+object TableFactoryService extends Logging {
+
+ private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
+
+ /**
+ * Looks up the table factory that matches the given class and descriptor.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor whose properties describe the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+ Preconditions.checkNotNull(descriptor)
+
+ val properties = new DescriptorProperties()
+ descriptor.addProperties(properties)
+ findInternal(factoryClass, properties.asMap, None)
+ }
+
+ /**
+ * Looks up the table factory matching the given class and descriptor via the given classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param descriptor descriptor whose properties describe the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
+ Preconditions.checkNotNull(descriptor)
+ Preconditions.checkNotNull(classLoader)
+
+ val properties = new DescriptorProperties()
+ descriptor.addProperties(properties)
+ findInternal(factoryClass, properties.asMap, Some(classLoader))
+ }
+
+ /**
+ * Looks up the table factory that matches the given class and property map.
+ * Factories are discovered with the default service loader.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T =
+ findInternal(factoryClass, propertyMap, None)
+
+ /**
+ * Looks up the table factory matching the given class and property map via the given
+ * classloader.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ def find[T](
+ factoryClass: Class[T],
+ propertyMap: JMap[String, String],
+ classLoader: ClassLoader)
+ : T = {
+ Preconditions.checkNotNull(classLoader)
+ findInternal(factoryClass, propertyMap, Some(classLoader))
+ }
+
+ /**
+ * Finds a table factory of the given class, property map, and optional classloader.
+ *
+ * The discovered factories are narrowed in three steps: by factory class, by required
+ * context, and finally by supported properties.
+ *
+ * @param factoryClass desired factory class
+ * @param propertyMap properties that describe the factory configuration
+ * @param classLoader optional classloader for service loading
+ * @tparam T factory class type
+ * @return the matching factory
+ */
+ private def findInternal[T](
+ factoryClass: Class[T],
+ propertyMap: JMap[String, String],
+ classLoader: Option[ClassLoader])
+ : T = {
+
+ Preconditions.checkNotNull(factoryClass)
+ Preconditions.checkNotNull(propertyMap)
+
+ val properties = propertyMap.asScala.toMap
+ val allFactories = discoverFactories(classLoader)
+
+ // step 1: keep the factories that implement the requested class
+ val byClass = filterByFactoryClass(factoryClass, properties, allFactories)
+ // step 2: keep the factories whose required context is met
+ val byContext = filterByContext(
+ factoryClass,
+ properties,
+ allFactories,
+ byClass)
+ // step 3: select the single factory that supports all remaining properties
+ filterBySupportedProperties(
+ factoryClass,
+ properties,
+ allFactories,
+ byContext)
+ }
+
+ /**
+ * Searches for factories using Java service providers.
+ *
+ * @param classLoader classloader for service loading (default loader if not defined)
+ * @return all factories in the classpath (eagerly loaded)
+ */
+ private def discoverFactories[T](classLoader: Option[ClassLoader]): Seq[TableFactory] = {
+ try {
+ val loader = classLoader match {
+ case Some(customClassLoader) => ServiceLoader.load(classOf[TableFactory], customClassLoader)
+ case None => defaultLoader
+ }
+ // materialize eagerly: Iterator.toSeq is a lazy Stream on Scala 2.11/2.12, which would
+ // defer provider instantiation (and its ServiceConfigurationError) past this try block
+ loader.iterator().asScala.toList
+ } catch {
+ case e: ServiceConfigurationError =>
+ LOG.error("Could not load service provider for table factories.", e)
+ throw new TableException("Could not load service provider for table factories.", e)
+ }
+ }
+
+ /**
+ * Keeps only the factories that are instances of the requested factory class.
+ * Fails with a [[NoMatchingTableFactoryException]] if none remains.
+ */
+ private def filterByFactoryClass[T](
+ factoryClass: Class[T],
+ properties: Map[String, String],
+ foundFactories: Seq[TableFactory])
+ : Seq[TableFactory] = {
+
+ val assignable = foundFactories.filter(factory => factoryClass.isInstance(factory))
+ if (assignable.nonEmpty) {
+ assignable
+ } else {
+ throw new NoMatchingTableFactoryException(
+ s"No factory implements '${factoryClass.getCanonicalName}'.",
+ factoryClass, foundFactories, properties)
+ }
+ }
+
+ /**
+ * Keeps the factories whose required context is fully contained in the given properties.
+ * Property versions are not part of the comparison. Fails with a
+ * [[NoMatchingTableFactoryException]] if no factory remains.
+ *
+ * @return all matching factories
+ */
+ private def filterByContext[T](
+ factoryClass: Class[T],
+ properties: Map[String, String],
+ foundFactories: Seq[TableFactory],
+ classFactories: Seq[TableFactory])
+ : Seq[TableFactory] = {
+
+ // we ignore the version for now until we have the first backwards compatibility case
+ // with the version we can provide mappings in case the format changes
+ val ignoredKeys = Set(
+ CONNECTOR_PROPERTY_VERSION,
+ FORMAT_PROPERTY_VERSION,
+ METADATA_PROPERTY_VERSION,
+ STATISTICS_PROPERTY_VERSION)
+
+ val matchingFactories = classFactories.filter { factory =>
+ val plainContext = normalizeContext(factory) -- ignoredKeys
+ // every required entry must be present with exactly the same value
+ plainContext.forall { case (key, value) => properties.get(key).contains(value) }
+ }
+
+ if (matchingFactories.nonEmpty) {
+ matchingFactories
+ } else {
+ throw new NoMatchingTableFactoryException(
+ "No context matches.",
+ factoryClass,
+ foundFactories,
+ properties)
+ }
+ }
+
+ /**
+ * Returns the required context of a factory with lower-cased keys for match operations.
+ */
+ private def normalizeContext(factory: TableFactory): Map[String, String] =
+ Option(factory.requiredContext()) match {
+ case Some(context) =>
+ context.asScala.map { case (key, value) => key.toLowerCase -> value }.toMap
+ case None =>
+ throw new TableException(
+ s"Required context of factory '${factory.getClass.getName}' must not be null.")
+ }
+
+ /**
+ * Filters the matching class factories by supported properties.
+ *
+ * @param factoryClass desired factory class (used for error reporting)
+ * @param properties properties that describe the factory configuration
+ * @param foundFactories all discovered factories (used for error reporting)
+ * @param classFactories candidate factories whose context already matched
+ * @return the single factory that supports all given properties
+ */
+ private def filterBySupportedProperties[T](
+ factoryClass: Class[T],
+ properties: Map[String, String],
+ foundFactories: Seq[TableFactory],
+ classFactories: Seq[TableFactory])
+ : T = {
+
+ // replace array indices with the "#" wildcard and drop duplicates; the dot is escaped
+ // because an unescaped "." would also rewrite keys such as "format.utf8" to "format.ut.#"
+ val plainGivenKeys = properties.keys.toSeq.map(_.replaceAll("\\.\\d+", ".#")).distinct
+ var lastKey: Option[String] = None
+ val supportedFactories = classFactories.filter { factory =>
+ val requiredContextKeys = normalizeContext(factory).keySet
+ val (supportedKeys, wildcards) = normalizeSupportedProperties(factory)
+ // ignore context keys
+ val givenContextFreeKeys = plainGivenKeys.filter(!requiredContextKeys.contains(_))
+ // perform factory specific filtering of keys
+ val givenFilteredKeys = filterSupportedPropertiesFactorySpecific(
+ factory,
+ givenContextFreeKeys)
+
+ givenFilteredKeys.forall { k =>
+ lastKey = Option(k)
+ supportedKeys.contains(k) || wildcards.exists(k.startsWith)
+ }
+ }
+
+ if (supportedFactories.isEmpty && classFactories.length == 1 && lastKey.isDefined) {
+ // special case: when there is only one matching factory but the last property key
+ // was incorrect
+ val factory = classFactories.head
+ val (supportedKeys, _) = normalizeSupportedProperties(factory)
+ throw new NoMatchingTableFactoryException(
+ s"""
+ |The matching factory '${factory.getClass.getName}' doesn't support '${lastKey.get}'.
+ |
+ |Supported properties of this factory are:
+ |${supportedKeys.sorted.mkString("\n")}""".stripMargin,
+ factoryClass,
+ foundFactories,
+ properties)
+ } else if (supportedFactories.isEmpty) {
+ throw new NoMatchingTableFactoryException(
+ s"No factory supports all properties.",
+ factoryClass,
+ foundFactories,
+ properties)
+ } else if (supportedFactories.length > 1) {
+ throw new AmbiguousTableFactoryException(
+ supportedFactories,
+ factoryClass,
+ foundFactories,
+ properties)
+ }
+
+ supportedFactories.head.asInstanceOf[T]
+ }
+
+ /**
+ * Prepares the supported properties of a factory to be used for match operations.
+ *
+ * @param factory factory that declares the supported property keys
+ * @return the lower-cased supported keys and the prefixes of all keys ending with "*"
+ * @throws TableException if the factory returns null as supported properties
+ */
+ private def normalizeSupportedProperties(factory: TableFactory): (Seq[String], Seq[String]) = {
+ val declaredKeys = Option(factory.supportedProperties()).getOrElse {
+ throw new TableException(
+ s"Supported properties of factory '${factory.getClass.getName}' must not be null.")
+ }
+ val supportedKeys = declaredKeys.asScala.map(_.toLowerCase)
+ // keys that end with "*" additionally act as prefix wildcards
+ (supportedKeys, extractWildcardPrefixes(supportedKeys))
+ }
+
+ /**
+ * Returns the prefixes of all property keys that end with a wildcard (e.g., "format.*").
+ */
+ private def extractWildcardPrefixes(propertyKeys: Seq[String]): Seq[String] = {
+ propertyKeys.collect {
+ case key if key.endsWith("*") => key.dropRight(1)
+ }
+ }
+
+ /**
+ * Performs filtering for special cases (i.e. table format factories with schema derivation).
+ *
+ * @param factory factory that decides which keys are relevant
+ * @param keys given property keys without context keys
+ * @return the keys the factory must support; for format factories only "format." keys
+ * (and "schema." keys if schema derivation is supported), otherwise all keys
+ */
+ private def filterSupportedPropertiesFactorySpecific(
+ factory: TableFactory,
+ keys: Seq[String])
+ : Seq[String] = factory match {
+
+ case formatFactory: TableFormatFactory[_] =>
+ val includeSchema = formatFactory.supportsSchemaDerivation()
+ val formatPrefix = FormatDescriptorValidator.FORMAT + "."
+ val schemaPrefix = SchemaValidator.SCHEMA + "."
+ // ignore non-format (or schema) keys
+ keys.filter(k => k.startsWith(formatPrefix) || (includeSchema && k.startsWith(schemaPrefix)))
+
+ case _ =>
+ keys
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala
new file mode 100644
index 0000000..8fa6fad
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories
+
+import java.util
+
+/**
+ * A factory to create configured table format instances based on string-based properties. See
+ * also [[TableFactory]] for more information.
+ *
+ * @tparam T record type that the format produces or consumes
+ */
+trait TableFormatFactory[T] extends TableFactory {
+
+ /**
+ * Flag to indicate if the given format supports deriving information from a schema. If so,
+ * the schema properties must be added to the list of supported properties.
+ * @return true if the format can derive information from a schema
+ */
+ def supportsSchemaDerivation(): Boolean
+
+ /**
+ * List of format property keys that this factory can handle. This method will be used for
+ * validation. If a property is passed that this factory cannot handle, an exception will be
+ * thrown. The list must not contain the keys that are specified by the context.
+ *
+ * Example format properties might be:
+ * - format.line-delimiter
+ * - format.ignore-parse-errors
+ * - format.fields.#.type
+ * - format.fields.#.name
+ *
+ * If schema derivation is enabled, the list must include schema properties:
+ * - schema.#.name
+ * - schema.#.type
+ *
+ * Note: All supported format properties must be prefixed with "format.". If schema derivation is
+ * enabled, properties with the "schema." prefix can be used as well.
+ *
+ * Use "#" to denote an array of values where "#" represents one or more digits. Property
+ * versions like "format.property-version" must not be part of the supported properties.
+ *
+ * @see [[TableFactory.supportedProperties()]] for more information.
+ */
+ def supportedProperties(): util.List[String]
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableSinkFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableSinkFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableSinkFactory.scala
new file mode 100644
index 0000000..fc7b365
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableSinkFactory.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories
+
+import org.apache.flink.table.sinks.TableSink
+
+import java.util
+
+/** A [[TableFactory]] that creates configured table sinks from string-based properties. */
+trait TableSinkFactory[T] extends TableFactory {
+ /**
+ * Creates and configures a [[TableSink]] using the given properties.
+ *
+ * @param properties normalized properties describing a table sink.
+ * @return the configured table sink.
+ */
+ def createTableSink(properties: util.Map[String, String]): TableSink[T]
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableSourceFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableSourceFactory.scala
new file mode 100644
index 0000000..06bab6c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableSourceFactory.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories
+
+import org.apache.flink.table.sources.TableSource
+
+import java.util
+
+/** A [[TableFactory]] that creates configured table sources from string-based properties. */
+trait TableSourceFactory[T] extends TableFactory {
+
+ /**
+ * Creates and configures a [[TableSource]] using the given properties.
+ *
+ * @param properties normalized properties describing a table source.
+ * @return the configured table source.
+ */
+ def createTableSource(properties: util.Map[String, String]): TableSource[T]
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/DeserializationSchemaFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/DeserializationSchemaFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/DeserializationSchemaFactory.scala
deleted file mode 100644
index da79185..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/DeserializationSchemaFactory.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.formats
-
-import java.util
-
-import org.apache.flink.api.common.serialization.DeserializationSchema
-
-/**
- * Factory for creating configured instances of [[DeserializationSchema]].
- *
- * @tparam T record type that the format produces or consumes
- */
-trait DeserializationSchemaFactory[T] extends TableFormatFactory[T] {
-
- /**
- * Creates and configures a [[DeserializationSchema]] using the given properties.
- *
- * @param properties normalized properties describing the format
- * @return the configured serialization schema or null if the factory cannot provide an
- * instance of this class
- */
- def createDeserializationSchema(properties: util.Map[String, String]): DeserializationSchema[T]
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/SerializationSchemaFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/SerializationSchemaFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/SerializationSchemaFactory.scala
deleted file mode 100644
index e4818cd..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/SerializationSchemaFactory.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.formats
-
-import java.util
-
-import org.apache.flink.api.common.serialization.SerializationSchema
-
-/**
- * Factory for creating configured instances of [[SerializationSchema]].
- *
- * @tparam T record type that the format produces or consumes
- */
-trait SerializationSchemaFactory[T] extends TableFormatFactory[T] {
-
- /**
- * Creates and configures a [[SerializationSchema]] using the given properties.
- *
- * @param properties normalized properties describing the format
- * @return the configured serialization schema or null if the factory cannot provide an
- * instance of this class
- */
- def createSerializationSchema(properties: util.Map[String, String]): SerializationSchema[T]
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactory.scala
deleted file mode 100644
index 3afef83..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactory.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.formats
-
-import java.util
-
-import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
-
-/**
- * A factory to create different table format instances. This factory is used with Java's Service
- * Provider Interfaces (SPI) for discovering. A factory is called with a set of normalized
- * properties that describe the desired format. The factory allows for matching to the given set of
- * properties. See also [[SerializationSchemaFactory]] and [[DeserializationSchemaFactory]] for
- * creating configured instances of format classes accordingly.
- *
- * Classes that implement this interface need to be added to the
- * "META_INF/services/org.apache.flink.table.formats.TableFormatFactory' file of a JAR file in
- * the current classpath to be found.
- *
- * @tparam T record type that the format produces or consumes
- */
-trait TableFormatFactory[T] {
-
- /**
- * Specifies the context that this factory has been implemented for. The framework guarantees
- * to only use the factory if the specified set of properties and values are met.
- *
- * Typical properties might be:
- * - format.type
- * - format.version
- *
- * Specified property versions allow the framework to provide backwards compatible properties
- * in case of string format changes:
- * - format.property-version
- *
- * An empty context means that the factory matches for all requests.
- */
- def requiredContext(): util.Map[String, String]
-
- /**
- * Flag to indicate if the given format supports deriving information from a schema. If the
- * format can handle schema information, those properties must be added to the list of
- * supported properties.
- */
- def supportsSchemaDerivation(): Boolean
-
- /**
- * List of format property keys that this factory can handle. This method will be used for
- * validation. If a property is passed that this factory cannot handle, an exception will be
- * thrown. The list must not contain the keys that are specified by the context.
- *
- * Example format properties might be:
- * - format.line-delimiter
- * - format.ignore-parse-errors
- * - format.fields.#.type
- * - format.fields.#.name
- *
- * If schema derivation is enabled, the list must include schema properties:
- * - schema.#.name
- * - schema.#.type
- *
- * Note: Supported format properties must be prefixed with "format.". If schema derivation is
- * enabled, also properties with "schema." prefix can be used. Use "#" to denote an array of
- * values where "#" represents one or more digits. Property versions like
- * "format.property-version" must not be part of the supported properties.
- */
- def supportedProperties(): util.List[String]
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactoryService.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactoryService.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactoryService.scala
deleted file mode 100644
index 44911a4..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactoryService.scala
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.formats
-
-import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
-
-import org.apache.flink.table.api._
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION
-import org.apache.flink.table.descriptors._
-import org.apache.flink.table.util.Logging
-import org.apache.flink.util.Preconditions
-
-import _root_.scala.collection.JavaConverters._
-import _root_.scala.collection.mutable
-
-/**
- * Service provider interface for finding a suitable [[TableFormatFactory]] for the
- * given properties.
- */
-object TableFormatFactoryService extends Logging {
-
- private lazy val defaultLoader = ServiceLoader.load(classOf[TableFormatFactory[_]])
-
- /**
- * Finds a table format factory of the given class and creates configured instances from the
- * given property map.
- *
- * @param factoryClass desired format factory
- * @param propertyMap properties that describes the format
- * @tparam T factory class type
- * @return configured instance from factory
- */
- def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
- findInternal(factoryClass, propertyMap, None)
- }
-
- /**
- * Finds a table format factory of the given class and creates configured instances from the
- * given property map and classloader.
- *
- * @param factoryClass desired format factory
- * @param propertyMap properties that describes the format
- * @param classLoader classloader for service loading
- * @tparam T factory class type
- * @return configured instance from factory
- */
- def find[T](
- factoryClass: Class[T],
- propertyMap: JMap[String, String],
- classLoader: ClassLoader)
- : T = {
- Preconditions.checkNotNull(classLoader)
- findInternal(factoryClass, propertyMap, Some(classLoader))
- }
-
- /**
- * Finds a table format factory of the given class and creates configured instances from the
- * given property map and classloader.
- *
- * @param factoryClass desired format factory
- * @param propertyMap properties that describes the format
- * @param classLoader optional classloader for service loading
- * @tparam T factory class type
- * @return configured instance from factory
- */
- private def findInternal[T](
- factoryClass: Class[T],
- propertyMap: JMap[String, String],
- classLoader: Option[ClassLoader])
- : T = {
-
- Preconditions.checkNotNull(factoryClass)
- Preconditions.checkNotNull(propertyMap)
-
- val properties = propertyMap.asScala.toMap
-
- // find matching context
- val (foundFactories, contextFactories) = findMatchingContext(
- factoryClass,
- properties,
- classLoader)
-
- // filter by factory class
- val classFactories = filterByFactoryClass(
- factoryClass,
- properties,
- foundFactories,
- contextFactories)
-
- // filter by supported keys
- filterBySupportedProperties(
- factoryClass,
- properties,
- foundFactories,
- classFactories)
- }
-
- private def findMatchingContext[T](
- factoryClass: Class[T],
- properties: Map[String, String],
- classLoader: Option[ClassLoader])
- : (Seq[TableFormatFactory[_]], Seq[TableFormatFactory[_]]) = {
-
- val foundFactories = mutable.ArrayBuffer[TableFormatFactory[_]]()
- val matchingFactories = mutable.ArrayBuffer[TableFormatFactory[_]]()
-
- try {
- val iter = classLoader match {
- case Some(customClassLoader) =>
- val customLoader = ServiceLoader.load(classOf[TableFormatFactory[_]], customClassLoader)
- customLoader.iterator()
- case None =>
- defaultLoader.iterator()
- }
-
- while (iter.hasNext) {
- val factory = iter.next()
- foundFactories += factory
-
- val requestedContext = normalizeContext(factory)
-
- val plainContext = mutable.Map[String, String]()
- plainContext ++= requestedContext
- // we remove the version for now until we have the first backwards compatibility case
- // with the version we can provide mappings in case the format changes
- plainContext.remove(FORMAT_PROPERTY_VERSION)
-
- // check if required context is met
- if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) {
- matchingFactories += factory
- }
- }
- } catch {
- case e: ServiceConfigurationError =>
- LOG.error("Could not load service provider for table format factories.", e)
- throw new TableException("Could not load service provider for table format factories.", e)
- }
-
- if (matchingFactories.isEmpty) {
- throw new NoMatchingTableFormatException(
- "No context matches.",
- factoryClass,
- foundFactories,
- properties)
- }
-
- (foundFactories, matchingFactories)
- }
-
- private def normalizeContext(factory: TableFormatFactory[_]): Map[String, String] = {
- val requiredContextJava = factory.requiredContext()
- if (requiredContextJava == null) {
- throw new TableException(
- s"Required context of format factory '${factory.getClass.getName}' must not be null.")
- }
- requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2)).toMap
- }
-
- private def filterByFactoryClass[T](
- factoryClass: Class[T],
- properties: Map[String, String],
- foundFactories: Seq[TableFormatFactory[_]],
- contextFactories: Seq[TableFormatFactory[_]])
- : Seq[TableFormatFactory[_]] = {
-
- val classFactories = contextFactories.filter(f => factoryClass.isAssignableFrom(f.getClass))
- if (classFactories.isEmpty) {
- throw new NoMatchingTableFormatException(
- s"No factory implements '${factoryClass.getCanonicalName}'.",
- factoryClass,
- foundFactories,
- properties)
- }
- classFactories
- }
-
- private def filterBySupportedProperties[T](
- factoryClass: Class[T],
- properties: Map[String, String],
- foundFactories: Seq[TableFormatFactory[_]],
- classFactories: Seq[TableFormatFactory[_]])
- : T = {
-
- val plainGivenKeys = mutable.ArrayBuffer[String]()
- properties.keys.foreach { k =>
- // replace arrays with wildcard
- val key = k.replaceAll(".\\d+", ".#")
- // ignore duplicates
- if (!plainGivenKeys.contains(key)) {
- plainGivenKeys += key
- }
- }
- var lastKey: Option[String] = None
- val supportedFactories = classFactories.filter { factory =>
- val requiredContextKeys = normalizeContext(factory).keySet
- val includeSchema = factory.supportsSchemaDerivation()
- val supportedKeys = normalizeSupportedProperties(factory)
- val givenKeys = plainGivenKeys
- // ignore context keys
- .filter(!requiredContextKeys.contains(_))
- // ignore non-format (or schema) keys
- .filter { k =>
- if (includeSchema) {
- k.startsWith(SchemaValidator.SCHEMA + ".") ||
- k.startsWith(FormatDescriptorValidator.FORMAT + ".")
- } else {
- k.startsWith(FormatDescriptorValidator.FORMAT + ".")
- }
- }
- givenKeys.forall { k =>
- lastKey = Option(k)
- supportedKeys.contains(k)
- }
- }
-
- if (supportedFactories.isEmpty && classFactories.length == 1 && lastKey.isDefined) {
- // special case: when there is only one matching factory but the last property key
- // was incorrect
- val factory = classFactories.head
- val supportedKeys = normalizeSupportedProperties(factory)
- throw new NoMatchingTableFormatException(
- s"""
- |The matching factory '${factory.getClass.getName}' doesn't support '${lastKey.get}'.
- |
- |Supported properties of this factory are:
- |${supportedKeys.sorted.mkString("\n")}""".stripMargin,
- factoryClass,
- foundFactories,
- properties)
- } else if (supportedFactories.isEmpty) {
- throw new NoMatchingTableFormatException(
- s"No factory supports all properties.",
- factoryClass,
- foundFactories,
- properties)
- } else if (supportedFactories.length > 1) {
- throw new AmbiguousTableFormatException(
- supportedFactories,
- factoryClass,
- foundFactories,
- properties)
- }
-
- supportedFactories.head.asInstanceOf[T]
- }
-
- private def normalizeSupportedProperties(factory: TableFormatFactory[_]): Seq[String] = {
- val supportedPropertiesJava = factory.supportedProperties()
- if (supportedPropertiesJava == null) {
- throw new TableException(
- s"Supported properties of format factory '${factory.getClass.getName}' must not be null.")
- }
- supportedPropertiesJava.asScala.map(_.toLowerCase)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala
index 8d0e8f9..5ecac27 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.sinks
import java.util
import org.apache.flink.table.api.TableException
-import org.apache.flink.table.connectors.{DiscoverableTableFactory, TableSinkFactory}
+import org.apache.flink.table.factories.{TableFactory, TableSinkFactory}
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
import org.apache.flink.table.descriptors.CsvValidator._
import org.apache.flink.table.descriptors.DescriptorProperties._
@@ -34,7 +34,7 @@ import org.apache.flink.types.Row
/**
* Factory for creating configured instances of [[CsvTableSink]].
*/
-class CsvTableSinkFactory extends TableSinkFactory[Row] with DiscoverableTableFactory {
+class CsvTableSinkFactory extends TableSinkFactory[Row] with TableFactory {
override def requiredContext(): util.Map[String, String] = {
val context = new util.HashMap[String, String]()
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
index c8e4503..c513669 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.sources
import java.util
import org.apache.flink.table.api.TableException
-import org.apache.flink.table.connectors.{DiscoverableTableFactory, TableSourceFactory}
+import org.apache.flink.table.factories.{TableFactory, TableSourceFactory}
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
import org.apache.flink.table.descriptors.CsvValidator._
import org.apache.flink.table.descriptors.DescriptorProperties.toScala
@@ -34,7 +34,7 @@ import org.apache.flink.types.Row
/**
* Factory for creating configured instances of [[CsvTableSource]].
*/
-class CsvTableSourceFactory extends TableSourceFactory[Row] with DiscoverableTableFactory {
+class CsvTableSourceFactory extends TableSourceFactory[Row] with TableFactory {
override def requiredContext(): util.Map[String, String] = {
val context = new util.HashMap[String, String]()
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory b/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
deleted file mode 100644
index ee8a444..0000000
--- a/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
+++ /dev/null
@@ -1,19 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.flink.table.connectors.TestFixedFormatTableFactory
-org.apache.flink.table.connectors.TestWildcardFormatTableSourceFactory
-org.apache.flink.table.connectors.TestTableSinkFactory
-org.apache.flink.table.connectors.TestTableSourceFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
new file mode 100644
index 0000000..c97fe8e
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.factories.utils.TestFixedFormatTableFactory
+org.apache.flink.table.factories.utils.TestWildcardFormatTableSourceFactory
+org.apache.flink.table.factories.utils.TestTableSinkFactory
+org.apache.flink.table.factories.utils.TestTableSourceFactory
+org.apache.flink.table.factories.utils.TestTableFormatFactory
+org.apache.flink.table.factories.utils.TestAmbiguousTableFormatFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.formats.TableFormatFactory
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.formats.TableFormatFactory b/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.formats.TableFormatFactory
deleted file mode 100644
index b5646a3..0000000
--- a/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.formats.TableFormatFactory
+++ /dev/null
@@ -1,17 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.flink.table.formats.utils.TestTableFormatFactory
-org.apache.flink.table.formats.utils.TestAmbiguousTableFormatFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TableSinkFactoryServiceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TableSinkFactoryServiceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TableSinkFactoryServiceTest.scala
deleted file mode 100644
index 45d0af8..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TableSinkFactoryServiceTest.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.connectors
-
-import org.apache.flink.table.api.{NoMatchingTableFactoryException, ValidationException}
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
-import org.apache.flink.table.descriptors.FormatDescriptorValidator._
-import org.apache.flink.table.descriptors.TableDescriptorValidator
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-class TableSinkFactoryServiceTest {
- @Test
- def testValidProperties(): Unit = {
- val props = properties()
- assertTrue(TableFactoryService.find(classOf[TableSinkFactory[_]], props.toMap) != null)
- }
-
- @Test(expected = classOf[NoMatchingTableFactoryException])
- def testInvalidContext(): Unit = {
- val props = properties()
- props.put(CONNECTOR_TYPE, "FAIL")
- TableFactoryService.find(classOf[TableSinkFactory[_]], props.toMap)
- }
-
- @Test
- def testDifferentContextVersion(): Unit = {
- val props = properties()
- props.put(CONNECTOR_PROPERTY_VERSION, "2")
- // the table source should still be found
- assertTrue(TableFactoryService.find(classOf[TableSinkFactory[_]], props.toMap) != null)
- }
-
- @Test(expected = classOf[ValidationException])
- def testUnsupportedProperty(): Unit = {
- val props = properties()
- props.put("format.path_new", "/new/path")
- TableFactoryService.find(classOf[TableSinkFactory[_]], props.toMap)
- }
-
- @Test(expected = classOf[IllegalArgumentException])
- def testFailingFactory(): Unit = {
- val props = properties()
- props.put("failing", "true")
- TableFactoryService.find(classOf[TableSinkFactory[_]], props.toMap)
- .asInstanceOf[TableSinkFactory[_]].createTableSink(props.asJava)
- }
-
- private def properties(): mutable.Map[String, String] = {
- val properties = mutable.Map[String, String]()
- properties.put(TableDescriptorValidator.TABLE_TYPE, "sink")
- properties.put(CONNECTOR_TYPE, "test")
- properties.put(FORMAT_TYPE, "test")
- properties.put(CONNECTOR_PROPERTY_VERSION, "1")
- properties.put(FORMAT_PROPERTY_VERSION, "1")
- properties.put(FORMAT_PATH, "/path/to/target")
- properties.put("schema.0.name", "a")
- properties.put("schema.1.name", "b")
- properties.put("schema.2.name", "c")
- properties.put("schema.0.field.0.name", "a")
- properties.put("schema.0.field.1.name", "b")
- properties.put("schema.0.field.2.name", "c")
- properties.put("failing", "false")
- properties
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TableSourceFactoryServiceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TableSourceFactoryServiceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TableSourceFactoryServiceTest.scala
deleted file mode 100644
index b32d70a..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TableSourceFactoryServiceTest.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.connectors
-
-import org.apache.flink.table.api.{NoMatchingTableFactoryException, ValidationException}
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
-import org.apache.flink.table.descriptors.TableDescriptorValidator
-import org.apache.flink.table.sources.TestWildcardFormatTableSourceFactory
-import org.junit.Assert.assertTrue
-import org.junit.Test
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-class TableSourceFactoryServiceTest {
-
- @Test
- def testValidProperties(): Unit = {
- val props = properties()
- props.put(CONNECTOR_TYPE, "fixed")
- props.put(FORMAT_TYPE, "test")
- assertTrue(TableFactoryService.find(classOf[TableSourceFactory[_]], props.toMap) != null)
- }
-
- @Test(expected = classOf[NoMatchingTableFactoryException])
- def testInvalidContext(): Unit = {
- val props = properties()
- props.put(CONNECTOR_TYPE, "FAIL")
- props.put(FORMAT_TYPE, "test")
- TableFactoryService.find(classOf[TableSourceFactory[_]], props.toMap)
- }
-
- @Test
- def testDifferentContextVersion(): Unit = {
- val props = properties()
- props.put(CONNECTOR_TYPE, "fixed")
- props.put(FORMAT_TYPE, "test")
- props.put(CONNECTOR_PROPERTY_VERSION, "2")
- // the table source should still be found
- assertTrue(TableFactoryService.find(classOf[TableSourceFactory[_]], props.toMap) != null)
- }
-
- @Test(expected = classOf[ValidationException])
- def testUnsupportedProperty(): Unit = {
- val props = properties()
- props.put(CONNECTOR_TYPE, "fixed")
- props.put(FORMAT_TYPE, "test")
- props.put("format.path_new", "/new/path")
- TableFactoryService.find(classOf[TableSourceFactory[_]], props.toMap)
- }
-
- @Test(expected = classOf[IllegalArgumentException])
- def testFailingFactory(): Unit = {
- val props = properties()
- props.put(CONNECTOR_TYPE, "fixed")
- props.put(FORMAT_TYPE, "test")
- props.put("failing", "true")
- TableFactoryService.find(classOf[TableSourceFactory[_]], props.toMap)
- .asInstanceOf[TableSourceFactory[_]]
- .createTableSource(props.asJava)
- }
-
- @Test
- def testWildcardFormat(): Unit = {
- val props = properties()
- props.put(CONNECTOR_TYPE, "wildcard")
- props.put(FORMAT_TYPE, "test")
- props.put("format.type", "not-test")
- props.put("format.not-test-property", "wildcard-property")
- val actualTableSource = TableFactoryService.find(classOf[TableSourceFactory[_]], props.toMap)
- assertTrue(actualTableSource.isInstanceOf[TestWildcardFormatTableSourceFactory])
- }
-
- private def properties(): mutable.Map[String, String] = {
- val properties = mutable.Map[String, String]()
- properties.put(
- TableDescriptorValidator.TABLE_TYPE,
- TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE)
- properties.put(CONNECTOR_TYPE, "test")
- properties.put(FORMAT_TYPE, "test")
- properties.put(CONNECTOR_PROPERTY_VERSION, "1")
- properties.put(FORMAT_PROPERTY_VERSION, "1")
- properties.put("format.path", "/path/to/target")
- properties.put("schema.0.name", "a")
- properties.put("schema.1.name", "b")
- properties.put("schema.2.name", "c")
- properties.put("schema.0.field.0.name", "a")
- properties.put("schema.0.field.1.name", "b")
- properties.put("schema.0.field.2.name", "c")
- properties.put("failing", "false")
- properties
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestFixedFormatTableFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestFixedFormatTableFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestFixedFormatTableFactory.scala
deleted file mode 100644
index 7001d2d..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestFixedFormatTableFactory.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.connectors
-
-import java.util
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.TableSchema
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
-import org.apache.flink.table.sources.TableSource
-import org.apache.flink.types.Row
-
-/**
- * Table source factory for testing with a fixed format.
- */
-class TestFixedFormatTableFactory extends TableSourceFactory[Row] with DiscoverableTableFactory {
-
- override def requiredContext(): util.Map[String, String] = {
- val context = new util.HashMap[String, String]()
- context.put(CONNECTOR_TYPE, "fixed")
- context.put(FORMAT_TYPE, "test")
- context.put(CONNECTOR_PROPERTY_VERSION, "1")
- context.put(FORMAT_PROPERTY_VERSION, "1")
- context
- }
-
- override def supportedProperties(): util.List[String] = {
- val properties = new util.ArrayList[String]()
- properties.add("format.path")
- properties.add("schema.#.name")
- properties.add("schema.#.field.#.name")
- properties.add("failing")
- properties
- }
-
- override def createTableSource(properties: util.Map[String, String]): TableSource[Row] = {
- if (properties.get("failing") == "true") {
- throw new IllegalArgumentException("Error in this factory.")
- }
- new TableSource[Row] {
- override def getTableSchema: TableSchema = throw new UnsupportedOperationException()
-
- override def getReturnType: TypeInformation[Row] = throw new UnsupportedOperationException()
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestTableSinkFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestTableSinkFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestTableSinkFactory.scala
deleted file mode 100644
index 2dfcdb7..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestTableSinkFactory.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.connectors
-
-import java.util
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.connectors.TestTableSinkFactory.FORMAT_PATH
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
-import org.apache.flink.table.descriptors.FormatDescriptorValidator._
-import org.apache.flink.table.sinks.TableSink
-import org.apache.flink.types.Row
-
-class TestTableSinkFactory extends TableSinkFactory[Row] with DiscoverableTableFactory {
-
- override def requiredContext(): util.Map[String, String] = {
- val context = new util.HashMap[String, String]()
- context.put(CONNECTOR_TYPE, "test")
- context.put(FORMAT_TYPE, "test")
- context.put(CONNECTOR_PROPERTY_VERSION, "1")
- context.put(FORMAT_PROPERTY_VERSION, "1")
- context
- }
-
- override def supportedProperties(): util.List[String] = {
- val properties = new util.ArrayList[String]()
- // connector
- properties.add(FORMAT_PATH)
- properties.add("schema.#.name")
- properties.add("schema.#.field.#.name")
- properties.add("failing")
- properties
- }
-
- override def createTableSink(properties: util.Map[String, String]): TableSink[Row] = {
- if (properties.get("failing") == "true") {
- throw new IllegalArgumentException("Error in this factory.")
- }
- new TableSink[Row] {
- override def getOutputType: TypeInformation[Row] = throw new UnsupportedOperationException()
-
- override def getFieldNames: Array[String] = throw new UnsupportedOperationException()
-
- override def getFieldTypes: Array[TypeInformation[_]] =
- throw new UnsupportedOperationException()
-
- override def configure(fieldNames: Array[String],
- fieldTypes: Array[TypeInformation[_]]): TableSink[Row] =
- throw new UnsupportedOperationException()
- }
- }
-}
-
-object TestTableSinkFactory {
- val CONNECTOR_TYPE_VALUE_TEST = "test"
- val FORMAT_TYPE_VALUE_TEST = "test"
- val FORMAT_PATH = "format.path"
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestTableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestTableSourceFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestTableSourceFactory.scala
deleted file mode 100644
index 345f47e..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestTableSourceFactory.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.connectors
-
-import java.util
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.TableSchema
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
-import org.apache.flink.table.sources.TableSource
-import org.apache.flink.types.Row
-
-/**
- * Table source factory for testing.
- */
-class TestTableSourceFactory extends TableSourceFactory[Row] with DiscoverableTableFactory {
-
- override def requiredContext(): util.Map[String, String] = {
- val context = new util.HashMap[String, String]()
- context.put(CONNECTOR_TYPE, "test")
- context.put(FORMAT_TYPE, "test")
- context.put(CONNECTOR_PROPERTY_VERSION, "1")
- context.put(FORMAT_PROPERTY_VERSION, "1")
- context
- }
-
- override def supportedProperties(): util.List[String] = {
- val properties = new util.ArrayList[String]()
- // connector
- properties.add("format.path")
- properties.add("schema.#.name")
- properties.add("schema.#.field.#.name")
- properties.add("failing")
- properties
- }
-
- override def createTableSource(properties: util.Map[String, String]): TableSource[Row] = {
- if (properties.get("failing") == "true") {
- throw new IllegalArgumentException("Error in this factory.")
- }
- new TableSource[Row] {
- override def getTableSchema: TableSchema = throw new UnsupportedOperationException()
-
- override def getReturnType: TypeInformation[Row] = throw new UnsupportedOperationException()
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestWildcardFormatTableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestWildcardFormatTableSourceFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestWildcardFormatTableSourceFactory.scala
deleted file mode 100644
index c0bab21..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestWildcardFormatTableSourceFactory.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.connectors
-
-import java.util
-
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
-import org.apache.flink.table.sources.TableSource
-import org.apache.flink.types.Row
-
-/**
- * Table source factory for testing with a wildcard format ("format.*").
- */
-class TestWildcardFormatTableSourceFactory
- extends TableSourceFactory[Row]
- with DiscoverableTableFactory {
-
- override def requiredContext(): util.Map[String, String] = {
- val context = new util.HashMap[String, String]()
- context.put(CONNECTOR_TYPE, "wildcard")
- context.put(CONNECTOR_PROPERTY_VERSION, "1")
- context
- }
-
- override def supportedProperties(): util.List[String] = {
- val properties = new util.ArrayList[String]()
- properties.add("format.*")
- properties.add("schema.#.name")
- properties.add("schema.#.field.#.name")
- properties.add("failing")
- properties
- }
-
- override def createTableSource(properties: util.Map[String, String]): TableSource[Row] = {
- throw new UnsupportedOperationException()
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/TableFormatFactoryServiceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/TableFormatFactoryServiceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/TableFormatFactoryServiceTest.scala
new file mode 100644
index 0000000..ab5a051
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/TableFormatFactoryServiceTest.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories
+
+import java.util.{HashMap => JHashMap, Map => JMap}
+
+import org.apache.flink.table.api.{AmbiguousTableFactoryException, NoMatchingTableFactoryException}
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
+import org.apache.flink.table.factories.TableFormatFactoryServiceTest._
+import org.apache.flink.table.factories.utils.{TestAmbiguousTableFormatFactory, TestTableFormatFactory}
+import org.junit.Assert.assertTrue
+import org.junit.Test
+
+/**
+ * Tests for testing format discovery using [[TableFactoryService]]. The tests assume the two
+ * format factories [[TestTableFormatFactory]] and [[TestAmbiguousTableFormatFactory]] are
+ * registered.
+ *
+ * The first format does not support SPECIAL_PATH but supports schema derivation. The
+ * latter format supports neither UNIQUE_PROPERTY nor schema derivation. Both formats
+ * have the same context and support COMMON_PATH.
+ */
+class TableFormatFactoryServiceTest {
+
+ @Test
+ def testValidProperties(): Unit = {
+ val props = properties()
+ assertTrue(TableFactoryService.find(classOf[TableFormatFactory[_]], props)
+ .isInstanceOf[TestTableFormatFactory])
+ }
+
+ @Test
+ def testDifferentContextVersion(): Unit = {
+ val props = properties()
+ props.put(FORMAT_PROPERTY_VERSION, "2")
+ // for now we support any property version, the property version should not affect the
+ // discovery at the moment and thus the format should still be found
+ val foundFactory = TableFactoryService.find(classOf[TableFormatFactory[_]], props)
+ assertTrue(foundFactory.isInstanceOf[TestTableFormatFactory])
+ }
+
+ @Test
+ def testAmbiguousMoreSupportSelection(): Unit = {
+ val props = properties()
+ props.remove(UNIQUE_PROPERTY) // both formats match now
+ props.put(SPECIAL_PATH, "/what/ever") // now only TestAmbiguousTableFormatFactory
+ assertTrue(
+ TableFactoryService
+ .find(classOf[TableFormatFactory[_]], props)
+ .isInstanceOf[TestAmbiguousTableFormatFactory])
+ }
+
+ @Test
+ def testAmbiguousClassBasedSelection(): Unit = {
+ val props = properties()
+ props.remove(UNIQUE_PROPERTY) // both formats match now
+ assertTrue(
+ TableFactoryService
+ // we are looking for a particular class
+ .find(classOf[TestAmbiguousTableFormatFactory], props)
+ .isInstanceOf[TestAmbiguousTableFormatFactory])
+ }
+
+ @Test
+ def testAmbiguousSchemaBasedSelection(): Unit = {
+ val props = properties()
+ props.remove(UNIQUE_PROPERTY) // both formats match now
+ // this is unknown to the schema derivation factory
+ props.put("schema.unknown-schema-field", "unknown")
+
+ // the format with schema derivation does not feel responsible because of this field,
+ // but since there is another format that feels responsible, no exception is thrown.
+ assertTrue(
+ TableFactoryService
+ .find(classOf[TableFormatFactory[_]], props)
+ .isInstanceOf[TestAmbiguousTableFormatFactory])
+ }
+
+ @Test(expected = classOf[NoMatchingTableFactoryException])
+ def testMissingClass(): Unit = {
+ val props = properties()
+ // this class is not a valid factory
+ TableFactoryService.find(classOf[TableFormatFactoryServiceTest], props)
+ }
+
+ @Test(expected = classOf[NoMatchingTableFactoryException])
+ def testInvalidContext(): Unit = {
+ val props = properties()
+ // no context specifies this
+ props.put(FORMAT_TYPE, "unknown_format_type")
+ TableFactoryService.find(classOf[TableFormatFactory[_]], props)
+ }
+
+ @Test(expected = classOf[NoMatchingTableFactoryException])
+ def testUnsupportedProperty(): Unit = {
+ val props = properties()
+ props.put("format.property_not_defined_by_any_factory", "/new/path")
+ TableFactoryService.find(classOf[TableFormatFactory[_]], props)
+ }
+
+ @Test(expected = classOf[AmbiguousTableFactoryException])
+ def testAmbiguousFactory(): Unit = {
+ val props = properties()
+ props.remove(UNIQUE_PROPERTY) // now both factories match
+ TableFactoryService.find(classOf[TableFormatFactory[_]], props)
+ }
+
+ private def properties(): JMap[String, String] = {
+ val properties = new JHashMap[String, String]()
+ properties.put(CONNECTOR_TYPE, "test")
+ properties.put(FORMAT_TYPE, TEST_FORMAT_TYPE)
+ properties.put(UNIQUE_PROPERTY, "true")
+ properties.put(CONNECTOR_PROPERTY_VERSION, "1")
+ properties.put(FORMAT_PROPERTY_VERSION, "1")
+ properties.put(COMMON_PATH, "/path/to/target")
+ properties.put("schema.0.name", "a")
+ properties.put("schema.1.name", "b")
+ properties.put("schema.2.name", "c")
+ properties
+ }
+}
+
+object TableFormatFactoryServiceTest {
+
+ val TEST_FORMAT_TYPE = "test-format"
+ val COMMON_PATH = "format.common-path"
+ val SPECIAL_PATH = "format.special-path"
+ val UNIQUE_PROPERTY = "format.unique-property"
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/TableSinkFactoryServiceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/TableSinkFactoryServiceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/TableSinkFactoryServiceTest.scala
new file mode 100644
index 0000000..d0cd1e0
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/TableSinkFactoryServiceTest.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories
+
+import java.util.{HashMap => JHashMap, Map => JMap}
+
+import org.apache.flink.table.api.NoMatchingTableFactoryException
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.factories.utils.TestTableSinkFactory
+import org.apache.flink.table.factories.utils.TestTableSinkFactory._
+import org.junit.Assert._
+import org.junit.Test
+
+/**
+ * Tests for testing table sink discovery using [[TableFactoryService]]. The tests assume the
+ * table sink factory [[TestTableSinkFactory]] is registered.
+ */
+class TableSinkFactoryServiceTest {
+
+ @Test
+ def testValidProperties(): Unit = {
+ val props = properties()
+ assertTrue(TableFactoryService.find(classOf[TableSinkFactory[_]], props)
+ .isInstanceOf[TestTableSinkFactory])
+ }
+
+ @Test(expected = classOf[NoMatchingTableFactoryException])
+ def testInvalidContext(): Unit = {
+ val props = properties()
+ props.put(CONNECTOR_TYPE, "unknown-connector-type")
+ TableFactoryService.find(classOf[TableSinkFactory[_]], props)
+ }
+
+ @Test
+ def testDifferentContextVersion(): Unit = {
+ val props = properties()
+ props.put(CONNECTOR_PROPERTY_VERSION, "2")
+ // the table sink should still be found
+ assertTrue(TableFactoryService.find(classOf[TableSinkFactory[_]], props)
+ .isInstanceOf[TestTableSinkFactory])
+ }
+
+ @Test(expected = classOf[NoMatchingTableFactoryException])
+ def testUnsupportedProperty(): Unit = {
+ val props = properties()
+ props.put("format.path_new", "/new/path")
+ TableFactoryService.find(classOf[TableSinkFactory[_]], props)
+ }
+
+ private def properties(): JMap[String, String] = {
+ val properties = new JHashMap[String, String]()
+ properties.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_TEST)
+ properties.put(FORMAT_TYPE, FORMAT_TYPE_VALUE_TEST)
+ properties.put(CONNECTOR_PROPERTY_VERSION, "1")
+ properties.put(FORMAT_PROPERTY_VERSION, "1")
+ properties.put(FORMAT_PATH, "/path/to/target")
+ properties.put("schema.0.name", "a")
+ properties.put("schema.1.name", "b")
+ properties.put("schema.2.name", "c")
+ properties.put("schema.0.field.0.name", "a")
+ properties.put("schema.0.field.1.name", "b")
+ properties.put("schema.0.field.2.name", "c")
+ properties.put("failing", "false")
+ properties
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/TableSourceFactoryServiceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/TableSourceFactoryServiceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/TableSourceFactoryServiceTest.scala
new file mode 100644
index 0000000..16f1853
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/TableSourceFactoryServiceTest.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories
+
+import java.util.{HashMap => JHashMap, Map => JMap}
+
+import org.apache.flink.table.api.NoMatchingTableFactoryException
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
+import org.apache.flink.table.factories.utils.TestFixedFormatTableFactory.{CONNECTOR_TYPE_VALUE_FIXED, FORMAT_TYPE_VALUE_TEST}
+import org.apache.flink.table.factories.utils.TestWildcardFormatTableSourceFactory.CONNECTOR_TYPE_VALUE_WILDCARD
+import org.apache.flink.table.factories.utils.{TestFixedFormatTableFactory, TestWildcardFormatTableSourceFactory}
+import org.junit.Assert.assertTrue
+import org.junit.Test
+
+/**
+ * Tests for testing table source discovery using [[TableFactoryService]]. The tests assume the
+ * two table source factories [[TestFixedFormatTableFactory]] and
+ * [[TestWildcardFormatTableSourceFactory]] are registered.
+ *
+ * The first table source has a [[FORMAT_TYPE_VALUE_TEST]] type whereas the second source uses
+ * a wildcard to match arbitrary formats.
+ */
+class TableSourceFactoryServiceTest {
+
+ @Test
+ def testValidProperties(): Unit = {
+ val props = properties()
+ props.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_FIXED)
+ props.put(FORMAT_TYPE, FORMAT_TYPE_VALUE_TEST)
+ assertTrue(TableFactoryService.find(classOf[TableSourceFactory[_]], props)
+ .isInstanceOf[TestFixedFormatTableFactory])
+ }
+
+ @Test(expected = classOf[NoMatchingTableFactoryException])
+ def testInvalidContext(): Unit = {
+ val props = properties()
+ props.put(CONNECTOR_TYPE, "unknown-connector-type")
+ props.put(FORMAT_TYPE, FORMAT_TYPE_VALUE_TEST)
+ TableFactoryService.find(classOf[TableSourceFactory[_]], props)
+ }
+
+ @Test
+ def testDifferentContextVersion(): Unit = {
+ val props = properties()
+ props.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_FIXED)
+ props.put(FORMAT_TYPE, FORMAT_TYPE_VALUE_TEST)
+ props.put(CONNECTOR_PROPERTY_VERSION, "2")
+ // the table source should still be found
+ assertTrue(TableFactoryService.find(classOf[TableSourceFactory[_]], props)
+ .isInstanceOf[TestFixedFormatTableFactory])
+ }
+
+ @Test(expected = classOf[NoMatchingTableFactoryException])
+ def testUnsupportedProperty(): Unit = {
+ val props = properties()
+ props.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_FIXED)
+ props.put(FORMAT_TYPE, FORMAT_TYPE_VALUE_TEST)
+ props.put("format.unknown-format-type-property", "/new/path")
+ TableFactoryService.find(classOf[TableSourceFactory[_]], props)
+ }
+
+ @Test
+ def testWildcardFormat(): Unit = {
+ val props = properties()
+ props.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_WILDCARD)
+ props.put("format.unknown-format-type-property", "wildcard-property")
+ val actualTableSource = TableFactoryService.find(classOf[TableSourceFactory[_]], props)
+ assertTrue(actualTableSource.isInstanceOf[TestWildcardFormatTableSourceFactory])
+ }
+
+ private def properties(): JMap[String, String] = {
+ val properties = new JHashMap[String, String]()
+ properties.put(CONNECTOR_PROPERTY_VERSION, "1")
+ properties.put(FORMAT_PROPERTY_VERSION, "1")
+ properties.put("format.path", "/path/to/target")
+ properties.put("schema.0.name", "a")
+ properties.put("schema.1.name", "b")
+ properties.put("schema.2.name", "c")
+ properties.put("schema.0.field.0.name", "a")
+ properties.put("schema.0.field.1.name", "b")
+ properties.put("schema.0.field.2.name", "c")
+ properties
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestAmbiguousTableFormatFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestAmbiguousTableFormatFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestAmbiguousTableFormatFactory.scala
new file mode 100644
index 0000000..f8c9078
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestAmbiguousTableFormatFactory.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils
+
+import java.util
+
+import org.apache.flink.table.descriptors.FormatDescriptorValidator
+import org.apache.flink.table.factories.{TableFormatFactory, TableFormatFactoryServiceTest}
+import org.apache.flink.types.Row
+
+/**
+ * Table format factory for testing.
+ *
+ * Unlike [[TestTableFormatFactory]], it supports neither UNIQUE_PROPERTY nor
+ * schema derivation. Both formats have the same context and support COMMON_PATH.
+ */
+class TestAmbiguousTableFormatFactory extends TableFormatFactory[Row] {
+
+ override def requiredContext(): util.Map[String, String] = {
+ val context = new util.HashMap[String, String]()
+ context.put(
+ FormatDescriptorValidator.FORMAT_TYPE,
+ TableFormatFactoryServiceTest.TEST_FORMAT_TYPE)
+ context.put(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION, "1")
+ context
+ }
+
+ override def supportsSchemaDerivation(): Boolean = false // no schema derivation
+
+ override def supportedProperties(): util.List[String] = {
+ val properties = new util.ArrayList[String]()
+ properties.add(TableFormatFactoryServiceTest.COMMON_PATH)
+ properties.add(TableFormatFactoryServiceTest.SPECIAL_PATH)
+ properties
+ }
+}