You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/15 09:24:48 UTC

[03/13] flink git commit: [FLINK-8866] [table] Merge table source/sink/format factories

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
new file mode 100644
index 0000000..3baff8e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories
+
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
+
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.MetadataValidator._
+import org.apache.flink.table.descriptors.StatisticsValidator._
+import org.apache.flink.table.descriptors._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
+
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
+
+/**
+  * Unified interface to search for a [[TableFactory]] of provided type and properties.
+  */
+object TableFactoryService extends Logging {
+
+  private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
+
+  /**
+    * Finds a table factory of the given class and descriptor.
+    *
+    * @param factoryClass desired factory class
+    * @param descriptor descriptor describing the factory configuration
+    * @tparam T factory class type
+    * @return the matching factory
+    */
+  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+    Preconditions.checkNotNull(descriptor)
+    // collect the descriptor's properties into a map before searching with the default loader
+    val properties = new DescriptorProperties()
+    descriptor.addProperties(properties)
+    findInternal(factoryClass, properties.asMap, None)
+  }
+
+  /**
+    * Finds a table factory of the given class, descriptor, and classloader.
+    *
+    * @param factoryClass desired factory class
+    * @param descriptor descriptor describing the factory configuration
+    * @param classLoader classloader for service loading
+    * @tparam T factory class type
+    * @return the matching factory
+    */
+  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
+    Preconditions.checkNotNull(descriptor)
+    Preconditions.checkNotNull(classLoader)
+    // collect the descriptor's properties into a map before searching with the custom loader
+    val properties = new DescriptorProperties()
+    descriptor.addProperties(properties)
+    findInternal(factoryClass, properties.asMap, Some(classLoader))
+  }
+
+  /**
+    * Finds a table factory of the given class and property map.
+    *
+    * @param factoryClass desired factory class
+    * @param propertyMap properties that describe the factory configuration
+    * @tparam T factory class type
+    * @return the matching factory
+    */
+  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T =
+    // no custom classloader is given, so the search runs without one
+    findInternal(factoryClass, propertyMap, None)
+
+  /**
+    * Finds a table factory of the given class, property map, and classloader.
+    *
+    * @param factoryClass desired factory class
+    * @param propertyMap properties that describe the factory configuration
+    * @param classLoader classloader for service loading
+    * @tparam T factory class type
+    * @return the matching factory
+    */
+  def find[T](
+      factoryClass: Class[T],
+      propertyMap: JMap[String, String],
+      classLoader: ClassLoader)
+    : T = {
+    Preconditions.checkNotNull(classLoader)
+    val customLoader = Some(classLoader)
+    findInternal(factoryClass, propertyMap, customLoader)
+  }
+
+  /**
+    * Finds a table factory of the given class, property map, and classloader.
+    *
+    * @param factoryClass desired factory class
+    * @param propertyMap properties that describe the factory configuration
+    * @param classLoader classloader for service loading
+    * @tparam T factory class type
+    * @return the matching factory
+    */
+  private def findInternal[T](
+      factoryClass: Class[T],
+      propertyMap: JMap[String, String],
+      classLoader: Option[ClassLoader])
+    : T = {
+
+    Preconditions.checkNotNull(factoryClass)
+    Preconditions.checkNotNull(propertyMap)
+
+    // work on an immutable Scala copy of the requested properties
+    val requested = propertyMap.asScala.toMap
+
+    // step 1: every factory that the service loader can discover
+    val discovered = discoverFactories(classLoader)
+
+    // step 2: keep only factories that implement the requested class
+    val ofRequestedClass =
+      filterByFactoryClass(factoryClass, requested, discovered)
+
+    // step 3: keep only factories whose required context is met
+    val withMatchingContext =
+      filterByContext(factoryClass, requested, discovered, ofRequestedClass)
+
+    // step 4: select the single factory that supports all remaining properties
+    filterBySupportedProperties(
+      factoryClass,
+      requested,
+      discovered,
+      withMatchingContext)
+  }
+
+  /**
+    * Eagerly loads all table factories via Java service providers (custom or default loader).
+    *
+    * @return all factories in the classpath
+    */
+  private def discoverFactories[T](classLoader: Option[ClassLoader]): Seq[TableFactory] = {
+    try {
+      val iterator = classLoader match {
+        case Some(customClassLoader) =>
+          ServiceLoader.load(classOf[TableFactory], customClassLoader).iterator()
+        case None =>
+          defaultLoader.iterator()
+      }
+      // materialize eagerly so that provider loading errors are raised inside this try block
+      iterator.asScala.toList
+    } catch {
+      case e: ServiceConfigurationError =>
+        LOG.error("Could not load service provider for table factories.", e)
+        throw new TableException("Could not load service provider for table factories.", e)
+    }
+  }
+
+  /**
+    * Filters factories with matching context by factory class.
+    */
+  private def filterByFactoryClass[T](
+      factoryClass: Class[T],
+      properties: Map[String, String],
+      foundFactories: Seq[TableFactory])
+    : Seq[TableFactory] = {
+    val candidates = foundFactories.filter(c => factoryClass.isAssignableFrom(c.getClass))
+    if (candidates.nonEmpty) {
+      candidates
+    } else {
+      throw new NoMatchingTableFactoryException(
+        s"No factory implements '${factoryClass.getCanonicalName}'.",
+        factoryClass,
+        foundFactories,
+        properties)
+    }
+  }
+
+  /**
+    * Filters for factories with matching context.
+    *
+    * @return all matching factories
+    */
+  private def filterByContext[T](
+      factoryClass: Class[T],
+      properties: Map[String, String],
+      foundFactories: Seq[TableFactory],
+      classFactories: Seq[TableFactory])
+    : Seq[TableFactory] = {
+
+    // property versions are ignored for now until the first backwards compatibility case;
+    // with the version, mappings can be provided in case the format changes
+    val ignoredVersionKeys = Set(
+      CONNECTOR_PROPERTY_VERSION, FORMAT_PROPERTY_VERSION,
+      METADATA_PROPERTY_VERSION, STATISTICS_PROPERTY_VERSION)
+
+    val contextMatches = classFactories.filter { candidate =>
+      val requiredEntries = normalizeContext(candidate).filterNot {
+        case (key, _) => ignoredVersionKeys.contains(key)
+      }
+      // every remaining context entry must be requested with exactly the same value
+      requiredEntries.forall {
+        case (key, expected) => properties.get(key).contains(expected)
+      }
+    }
+
+    if (contextMatches.nonEmpty) {
+      contextMatches
+    } else {
+      throw new NoMatchingTableFactoryException(
+        "No context matches.",
+        factoryClass,
+        foundFactories,
+        properties)
+    }
+  }
+
+  /**
+    * Prepares the properties of a context to be used for match operations.
+    */
+  private def normalizeContext(factory: TableFactory): Map[String, String] =
+    Option(factory.requiredContext()) match {
+      case Some(context) =>
+        context.asScala.map { case (key, value) => (key.toLowerCase, value) }.toMap
+      case None =>
+        throw new TableException(
+          s"Required context of factory '${factory.getClass.getName}' must not be null.")
+    }
+
+  /**
+    * Selects the single factory that supports all given keys (array indices become ".#").
+    */
+  private def filterBySupportedProperties[T](
+      factoryClass: Class[T],
+      properties: Map[String, String],
+      foundFactories: Seq[TableFactory],
+      classFactories: Seq[TableFactory])
+    : T = {
+
+    val plainGivenKeys = mutable.ArrayBuffer[String]()
+    properties.keys.foreach { k =>
+      // replace array indices with wildcard (dot is escaped so only ".<digits>" is rewritten)
+      val key = k.replaceAll("\\.\\d+", ".#")
+      // ignore duplicates
+      if (!plainGivenKeys.contains(key)) {
+        plainGivenKeys += key
+      }
+    }
+    var lastKey: Option[String] = None
+    val supportedFactories = classFactories.filter { factory =>
+      val requiredContextKeys = normalizeContext(factory).keySet
+      val (supportedKeys, wildcards) = normalizeSupportedProperties(factory)
+      // ignore context keys
+      val givenContextFreeKeys = plainGivenKeys.filter(!requiredContextKeys.contains(_))
+      // perform factory specific filtering of keys
+      val givenFilteredKeys =
+        filterSupportedPropertiesFactorySpecific(factory, givenContextFreeKeys)
+
+      givenFilteredKeys.forall { k =>
+        // remember the checked key; forall stops at the first unsupported one
+        lastKey = Option(k)
+        supportedKeys.contains(k) || wildcards.exists(k.startsWith)
+      }
+    }
+
+    if (supportedFactories.isEmpty && classFactories.length == 1 && lastKey.isDefined) {
+      // special case: when there is only one matching factory but the last property key
+      // was incorrect
+      val factory = classFactories.head
+      val (supportedKeys, _) = normalizeSupportedProperties(factory)
+      throw new NoMatchingTableFactoryException(
+        s"""
+          |The matching factory '${factory.getClass.getName}' doesn't support '${lastKey.get}'.
+          |
+          |Supported properties of this factory are:
+          |${supportedKeys.sorted.mkString("\n")}""".stripMargin,
+        factoryClass,
+        foundFactories,
+        properties)
+    } else if (supportedFactories.isEmpty) {
+      throw new NoMatchingTableFactoryException(
+        s"No factory supports all properties.",
+        factoryClass,
+        foundFactories,
+        properties)
+    } else if (supportedFactories.length > 1) {
+      throw new AmbiguousTableFactoryException(
+        supportedFactories,
+        factoryClass,
+        foundFactories,
+        properties)
+    }
+
+    supportedFactories.head.asInstanceOf[T]
+  }
+
+  /**
+    * Prepares the supported properties of a factory to be used for match operations.
+    */
+  private def normalizeSupportedProperties(factory: TableFactory): (Seq[String], Seq[String]) = {
+    val lowerCasedKeys = Option(factory.supportedProperties()) match {
+      case Some(declaredKeys) =>
+        declaredKeys.asScala.map(_.toLowerCase)
+      case None =>
+        throw new TableException(
+          s"Supported properties of factory '${factory.getClass.getName}' must not be null.")
+    }
+
+    // wildcard prefixes are derived from the already normalized keys
+    val wildcardPrefixes = extractWildcardPrefixes(lowerCasedKeys)
+    (lowerCasedKeys, wildcardPrefixes)
+  }
+
+  /**
+    * Converts the prefix of properties with wildcards (e.g., "format.*").
+    */
+  private def extractWildcardPrefixes(propertyKeys: Seq[String]): Seq[String] = {
+    propertyKeys.collect {
+      case key if key.endsWith("*") => key.dropRight(1)
+    }
+  }
+
+  /**
+    * Performs filtering for special cases (i.e. table format factories with schema derivation).
+    */
+  private def filterSupportedPropertiesFactorySpecific(
+      factory: TableFactory,
+      keys: Seq[String])
+    : Seq[String] = factory match {
+
+    case formatFactory: TableFormatFactory[_] =>
+      // format keys are always considered; schema keys only with schema derivation
+      val formatPrefix = FormatDescriptorValidator.FORMAT + "."
+      val schemaPrefix = SchemaValidator.SCHEMA + "."
+      val includeSchema = formatFactory.supportsSchemaDerivation()
+      keys.filter { key =>
+        val isFormatKey = key.startsWith(formatPrefix)
+        val isSchemaKey = includeSchema && key.startsWith(schemaPrefix)
+        isFormatKey || isSchemaKey
+      }
+
+    case _ =>
+      // all other factories are checked against the complete list of keys
+      keys
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala
new file mode 100644
index 0000000..8fa6fad
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories
+
+import java.util
+
+/**
+  * A factory to create configured table format instances based on string-based properties. See
+  * also [[TableFactory]] for more information.
+  *
+  * @tparam T record type that the format produces or consumes
+  */
+trait TableFormatFactory[T] extends TableFactory {
+
+  /**
+    * Flag to indicate if the given format supports deriving information from a schema. If the
+    * format can handle schema information, those properties must be added to the list of
+    * supported properties.
+    */
+  def supportsSchemaDerivation(): Boolean
+
+  /**
+    * List of format property keys that this factory can handle. This method will be used for
+    * validation. If a property is passed that this factory cannot handle, an exception will be
+    * thrown. The list must not contain the keys that are specified by the context.
+    *
+    * Example format properties might be:
+    *   - format.line-delimiter
+    *   - format.ignore-parse-errors
+    *   - format.fields.#.type
+    *   - format.fields.#.name
+    *
+    * If schema derivation is enabled, the list must include schema properties:
+    *   - schema.#.name
+    *   - schema.#.type
+    *
+    * Note: All supported format properties must be prefixed with "format.". If schema derivation is
+    * enabled, properties with the "schema." prefix can be used as well.
+    *
+    * Use "#" to denote an array of values where "#" represents one or more digits. Property
+    * versions like "format.property-version" must not be part of the supported properties.
+    *
+    * @see [[TableFactory.supportedProperties()]] for more information.
+    */
+  def supportedProperties(): util.List[String]
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableSinkFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableSinkFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableSinkFactory.scala
new file mode 100644
index 0000000..fc7b365
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableSinkFactory.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories
+
+import org.apache.flink.table.sinks.TableSink
+
+import java.util
+
+trait TableSinkFactory[T] {
+  /**
+    * Creates and configures a [[TableSink]] using the given properties.
+    * NOTE(review): unlike TableSourceFactory, this trait does not extend TableFactory - confirm.
+    *
+    * @param properties normalized properties describing a table sink.
+    * @return the configured table sink.
+    */
+  def createTableSink(properties: util.Map[String, String]): TableSink[T]
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableSourceFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableSourceFactory.scala
new file mode 100644
index 0000000..06bab6c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableSourceFactory.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories
+
+import org.apache.flink.table.sources.TableSource
+
+import java.util
+
+trait TableSourceFactory[T] extends TableFactory {
+
+  /**
+    * Creates and configures a [[TableSource]] using the given properties.
+    * The returned source is parameterized with this factory's type parameter `T`.
+    *
+    * @param properties normalized properties describing a table source.
+    * @return the configured table source.
+    */
+  def createTableSource(properties: util.Map[String, String]): TableSource[T]
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/DeserializationSchemaFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/DeserializationSchemaFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/DeserializationSchemaFactory.scala
deleted file mode 100644
index da79185..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/DeserializationSchemaFactory.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.formats
-
-import java.util
-
-import org.apache.flink.api.common.serialization.DeserializationSchema
-
-/**
-  * Factory for creating configured instances of [[DeserializationSchema]].
-  *
-  * @tparam T record type that the format produces or consumes
-  */
-trait DeserializationSchemaFactory[T] extends TableFormatFactory[T] {
-
-  /**
-    * Creates and configures a [[DeserializationSchema]] using the given properties.
-    *
-    * @param properties normalized properties describing the format
-    * @return the configured serialization schema or null if the factory cannot provide an
-    *         instance of this class
-    */
-  def createDeserializationSchema(properties: util.Map[String, String]): DeserializationSchema[T]
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/SerializationSchemaFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/SerializationSchemaFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/SerializationSchemaFactory.scala
deleted file mode 100644
index e4818cd..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/SerializationSchemaFactory.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.formats
-
-import java.util
-
-import org.apache.flink.api.common.serialization.SerializationSchema
-
-/**
-  * Factory for creating configured instances of [[SerializationSchema]].
-  *
-  * @tparam T record type that the format produces or consumes
-  */
-trait SerializationSchemaFactory[T] extends TableFormatFactory[T] {
-
-  /**
-    * Creates and configures a [[SerializationSchema]] using the given properties.
-    *
-    * @param properties normalized properties describing the format
-    * @return the configured serialization schema or null if the factory cannot provide an
-    *         instance of this class
-    */
-  def createSerializationSchema(properties: util.Map[String, String]): SerializationSchema[T]
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactory.scala
deleted file mode 100644
index 3afef83..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactory.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.formats
-
-import java.util
-
-import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
-
-/**
-  * A factory to create different table format instances. This factory is used with Java's Service
-  * Provider Interfaces (SPI) for discovering. A factory is called with a set of normalized
-  * properties that describe the desired format. The factory allows for matching to the given set of
-  * properties. See also [[SerializationSchemaFactory]] and [[DeserializationSchemaFactory]] for
-  * creating configured instances of format classes accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.formats.TableFormatFactory' file of a JAR file in
-  * the current classpath to be found.
-  *
-  * @tparam T record type that the format produces or consumes
-  */
-trait TableFormatFactory[T] {
-
-  /**
-    * Specifies the context that this factory has been implemented for. The framework guarantees
-    * to only use the factory if the specified set of properties and values are met.
-    *
-    * Typical properties might be:
-    *   - format.type
-    *   - format.version
-    *
-    * Specified property versions allow the framework to provide backwards compatible properties
-    * in case of string format changes:
-    *   - format.property-version
-    *
-    * An empty context means that the factory matches for all requests.
-    */
-  def requiredContext(): util.Map[String, String]
-
-  /**
-    * Flag to indicate if the given format supports deriving information from a schema. If the
-    * format can handle schema information, those properties must be added to the list of
-    * supported properties.
-    */
-  def supportsSchemaDerivation(): Boolean
-
-  /**
-    * List of format property keys that this factory can handle. This method will be used for
-    * validation. If a property is passed that this factory cannot handle, an exception will be
-    * thrown. The list must not contain the keys that are specified by the context.
-    *
-    * Example format properties might be:
-    *   - format.line-delimiter
-    *   - format.ignore-parse-errors
-    *   - format.fields.#.type
-    *   - format.fields.#.name
-    *
-    * If schema derivation is enabled, the list must include schema properties:
-    *   - schema.#.name
-    *   - schema.#.type
-    *
-    * Note: Supported format properties must be prefixed with "format.". If schema derivation is
-    * enabled, also properties with "schema." prefix can be used. Use "#" to denote an array of
-    * values where "#" represents one or more digits. Property versions like
-    * "format.property-version" must not be part of the supported properties.
-    */
-  def supportedProperties(): util.List[String]
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactoryService.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactoryService.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactoryService.scala
deleted file mode 100644
index 44911a4..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactoryService.scala
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.formats
-
-import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
-
-import org.apache.flink.table.api._
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION
-import org.apache.flink.table.descriptors._
-import org.apache.flink.table.util.Logging
-import org.apache.flink.util.Preconditions
-
-import _root_.scala.collection.JavaConverters._
-import _root_.scala.collection.mutable
-
-/**
-  * Service provider interface for finding a suitable [[TableFormatFactory]] for the
-  * given properties.
-  */
-object TableFormatFactoryService extends Logging {
-
-  private lazy val defaultLoader = ServiceLoader.load(classOf[TableFormatFactory[_]])
-
-  /**
-    * Finds a table format factory of the given class and creates configured instances from the
-    * given property map.
-    *
-    * @param factoryClass desired format factory
-    * @param propertyMap properties that describes the format
-    * @tparam T factory class type
-    * @return configured instance from factory
-    */
-  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
-    findInternal(factoryClass, propertyMap, None)
-  }
-
-  /**
-    * Finds a table format factory of the given class and creates configured instances from the
-    * given property map and classloader.
-    *
-    * @param factoryClass desired format factory
-    * @param propertyMap properties that describes the format
-    * @param classLoader classloader for service loading
-    * @tparam T factory class type
-    * @return configured instance from factory
-    */
-  def find[T](
-      factoryClass: Class[T],
-      propertyMap: JMap[String, String],
-      classLoader: ClassLoader)
-    : T = {
-    Preconditions.checkNotNull(classLoader)
-    findInternal(factoryClass, propertyMap, Some(classLoader))
-  }
-
-  /**
-    * Finds a table format factory of the given class and creates configured instances from the
-    * given property map and classloader.
-    *
-    * @param factoryClass desired format factory
-    * @param propertyMap properties that describes the format
-    * @param classLoader optional classloader for service loading
-    * @tparam T factory class type
-    * @return configured instance from factory
-    */
-  private def findInternal[T](
-      factoryClass: Class[T],
-      propertyMap: JMap[String, String],
-      classLoader: Option[ClassLoader])
-    : T = {
-
-    Preconditions.checkNotNull(factoryClass)
-    Preconditions.checkNotNull(propertyMap)
-
-    val properties = propertyMap.asScala.toMap
-
-    // find matching context
-    val (foundFactories, contextFactories) = findMatchingContext(
-      factoryClass,
-      properties,
-      classLoader)
-
-    // filter by factory class
-    val classFactories = filterByFactoryClass(
-      factoryClass,
-      properties,
-      foundFactories,
-      contextFactories)
-
-    // filter by supported keys
-    filterBySupportedProperties(
-      factoryClass,
-      properties,
-      foundFactories,
-      classFactories)
-  }
-
-  private def findMatchingContext[T](
-      factoryClass: Class[T],
-      properties: Map[String, String],
-      classLoader: Option[ClassLoader])
-    : (Seq[TableFormatFactory[_]], Seq[TableFormatFactory[_]]) = {
-
-    val foundFactories = mutable.ArrayBuffer[TableFormatFactory[_]]()
-    val matchingFactories = mutable.ArrayBuffer[TableFormatFactory[_]]()
-
-    try {
-      val iter = classLoader match {
-        case Some(customClassLoader) =>
-          val customLoader = ServiceLoader.load(classOf[TableFormatFactory[_]], customClassLoader)
-          customLoader.iterator()
-        case None =>
-          defaultLoader.iterator()
-      }
-
-      while (iter.hasNext) {
-        val factory = iter.next()
-        foundFactories += factory
-
-        val requestedContext = normalizeContext(factory)
-
-        val plainContext = mutable.Map[String, String]()
-        plainContext ++= requestedContext
-        // we remove the version for now until we have the first backwards compatibility case
-        // with the version we can provide mappings in case the format changes
-        plainContext.remove(FORMAT_PROPERTY_VERSION)
-
-        // check if required context is met
-        if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) {
-          matchingFactories += factory
-        }
-      }
-    } catch {
-      case e: ServiceConfigurationError =>
-        LOG.error("Could not load service provider for table format factories.", e)
-        throw new TableException("Could not load service provider for table format factories.", e)
-    }
-
-    if (matchingFactories.isEmpty) {
-      throw new NoMatchingTableFormatException(
-        "No context matches.",
-        factoryClass,
-        foundFactories,
-        properties)
-    }
-
-    (foundFactories, matchingFactories)
-  }
-
-  private def normalizeContext(factory: TableFormatFactory[_]): Map[String, String] = {
-    val requiredContextJava = factory.requiredContext()
-    if (requiredContextJava == null) {
-      throw new TableException(
-        s"Required context of format factory '${factory.getClass.getName}' must not be null.")
-    }
-    requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2)).toMap
-  }
-
-  private def filterByFactoryClass[T](
-      factoryClass: Class[T],
-      properties: Map[String, String],
-      foundFactories: Seq[TableFormatFactory[_]],
-      contextFactories: Seq[TableFormatFactory[_]])
-    : Seq[TableFormatFactory[_]] = {
-
-    val classFactories = contextFactories.filter(f => factoryClass.isAssignableFrom(f.getClass))
-    if (classFactories.isEmpty) {
-      throw new NoMatchingTableFormatException(
-        s"No factory implements '${factoryClass.getCanonicalName}'.",
-        factoryClass,
-        foundFactories,
-        properties)
-    }
-    classFactories
-  }
-
-  private def filterBySupportedProperties[T](
-      factoryClass: Class[T],
-      properties: Map[String, String],
-      foundFactories: Seq[TableFormatFactory[_]],
-      classFactories: Seq[TableFormatFactory[_]])
-    : T = {
-
-    val plainGivenKeys = mutable.ArrayBuffer[String]()
-    properties.keys.foreach { k =>
-      // replace arrays with wildcard
-      val key = k.replaceAll(".\\d+", ".#")
-      // ignore duplicates
-      if (!plainGivenKeys.contains(key)) {
-        plainGivenKeys += key
-      }
-    }
-    var lastKey: Option[String] = None
-    val supportedFactories = classFactories.filter { factory =>
-      val requiredContextKeys = normalizeContext(factory).keySet
-      val includeSchema = factory.supportsSchemaDerivation()
-      val supportedKeys = normalizeSupportedProperties(factory)
-      val givenKeys = plainGivenKeys
-        // ignore context keys
-        .filter(!requiredContextKeys.contains(_))
-        // ignore non-format (or schema) keys
-        .filter { k =>
-          if (includeSchema) {
-            k.startsWith(SchemaValidator.SCHEMA + ".") ||
-              k.startsWith(FormatDescriptorValidator.FORMAT + ".")
-          } else {
-            k.startsWith(FormatDescriptorValidator.FORMAT + ".")
-          }
-        }
-      givenKeys.forall { k =>
-        lastKey = Option(k)
-        supportedKeys.contains(k)
-      }
-    }
-
-    if (supportedFactories.isEmpty && classFactories.length == 1 && lastKey.isDefined) {
-      // special case: when there is only one matching factory but the last property key
-      // was incorrect
-      val factory = classFactories.head
-      val supportedKeys = normalizeSupportedProperties(factory)
-      throw new NoMatchingTableFormatException(
-        s"""
-          |The matching factory '${factory.getClass.getName}' doesn't support '${lastKey.get}'.
-          |
-          |Supported properties of this factory are:
-          |${supportedKeys.sorted.mkString("\n")}""".stripMargin,
-        factoryClass,
-        foundFactories,
-        properties)
-    } else if (supportedFactories.isEmpty) {
-      throw new NoMatchingTableFormatException(
-        s"No factory supports all properties.",
-        factoryClass,
-        foundFactories,
-        properties)
-    } else if (supportedFactories.length > 1) {
-      throw new AmbiguousTableFormatException(
-        supportedFactories,
-        factoryClass,
-        foundFactories,
-        properties)
-    }
-
-    supportedFactories.head.asInstanceOf[T]
-  }
-
-  private def normalizeSupportedProperties(factory: TableFormatFactory[_]): Seq[String] = {
-    val supportedPropertiesJava = factory.supportedProperties()
-    if (supportedPropertiesJava == null) {
-      throw new TableException(
-        s"Supported properties of format factory '${factory.getClass.getName}' must not be null.")
-    }
-    supportedPropertiesJava.asScala.map(_.toLowerCase)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala
index 8d0e8f9..5ecac27 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.sinks
 import java.util
 
 import org.apache.flink.table.api.TableException
-import org.apache.flink.table.connectors.{DiscoverableTableFactory, TableSinkFactory}
+import org.apache.flink.table.factories.{TableFactory, TableSinkFactory}
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
 import org.apache.flink.table.descriptors.CsvValidator._
 import org.apache.flink.table.descriptors.DescriptorProperties._
@@ -34,7 +34,7 @@ import org.apache.flink.types.Row
 /**
   * Factory for creating configured instances of [[CsvTableSink]].
   */
-class CsvTableSinkFactory extends TableSinkFactory[Row] with DiscoverableTableFactory {
+class CsvTableSinkFactory extends TableSinkFactory[Row] with TableFactory {
 
   override def requiredContext(): util.Map[String, String] = {
     val context = new util.HashMap[String, String]()

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
index c8e4503..c513669 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.sources
 import java.util
 
 import org.apache.flink.table.api.TableException
-import org.apache.flink.table.connectors.{DiscoverableTableFactory, TableSourceFactory}
+import org.apache.flink.table.factories.{TableFactory, TableSourceFactory}
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
 import org.apache.flink.table.descriptors.CsvValidator._
 import org.apache.flink.table.descriptors.DescriptorProperties.toScala
@@ -34,7 +34,7 @@ import org.apache.flink.types.Row
 /**
   * Factory for creating configured instances of [[CsvTableSource]].
   */
-class CsvTableSourceFactory extends TableSourceFactory[Row] with DiscoverableTableFactory {
+class CsvTableSourceFactory extends TableSourceFactory[Row] with TableFactory {
 
   override def requiredContext(): util.Map[String, String] = {
     val context = new util.HashMap[String, String]()

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory b/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
deleted file mode 100644
index ee8a444..0000000
--- a/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
+++ /dev/null
@@ -1,19 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.flink.table.connectors.TestFixedFormatTableFactory
-org.apache.flink.table.connectors.TestWildcardFormatTableSourceFactory
-org.apache.flink.table.connectors.TestTableSinkFactory
-org.apache.flink.table.connectors.TestTableSourceFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
new file mode 100644
index 0000000..c97fe8e
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.factories.utils.TestFixedFormatTableFactory
+org.apache.flink.table.factories.utils.TestWildcardFormatTableSourceFactory
+org.apache.flink.table.factories.utils.TestTableSinkFactory
+org.apache.flink.table.factories.utils.TestTableSourceFactory
+org.apache.flink.table.factories.utils.TestTableFormatFactory
+org.apache.flink.table.factories.utils.TestAmbiguousTableFormatFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.formats.TableFormatFactory
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.formats.TableFormatFactory b/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.formats.TableFormatFactory
deleted file mode 100644
index b5646a3..0000000
--- a/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.formats.TableFormatFactory
+++ /dev/null
@@ -1,17 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.flink.table.formats.utils.TestTableFormatFactory
-org.apache.flink.table.formats.utils.TestAmbiguousTableFormatFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TableSinkFactoryServiceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TableSinkFactoryServiceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TableSinkFactoryServiceTest.scala
deleted file mode 100644
index 45d0af8..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TableSinkFactoryServiceTest.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.connectors
-
-import org.apache.flink.table.api.{NoMatchingTableFactoryException, ValidationException}
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
-import org.apache.flink.table.descriptors.FormatDescriptorValidator._
-import org.apache.flink.table.descriptors.TableDescriptorValidator
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-class TableSinkFactoryServiceTest {
-  @Test
-  def testValidProperties(): Unit = {
-    val props = properties()
-    assertTrue(TableFactoryService.find(classOf[TableSinkFactory[_]], props.toMap) != null)
-  }
-
-  @Test(expected = classOf[NoMatchingTableFactoryException])
-  def testInvalidContext(): Unit = {
-    val props = properties()
-    props.put(CONNECTOR_TYPE, "FAIL")
-    TableFactoryService.find(classOf[TableSinkFactory[_]], props.toMap)
-  }
-
-  @Test
-  def testDifferentContextVersion(): Unit = {
-    val props = properties()
-    props.put(CONNECTOR_PROPERTY_VERSION, "2")
-    // the table source should still be found
-    assertTrue(TableFactoryService.find(classOf[TableSinkFactory[_]], props.toMap) != null)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testUnsupportedProperty(): Unit = {
-    val props = properties()
-    props.put("format.path_new", "/new/path")
-    TableFactoryService.find(classOf[TableSinkFactory[_]], props.toMap)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testFailingFactory(): Unit = {
-    val props = properties()
-    props.put("failing", "true")
-    TableFactoryService.find(classOf[TableSinkFactory[_]], props.toMap)
-      .asInstanceOf[TableSinkFactory[_]].createTableSink(props.asJava)
-  }
-
-  private def properties(): mutable.Map[String, String] = {
-    val properties = mutable.Map[String, String]()
-    properties.put(TableDescriptorValidator.TABLE_TYPE, "sink")
-    properties.put(CONNECTOR_TYPE, "test")
-    properties.put(FORMAT_TYPE, "test")
-    properties.put(CONNECTOR_PROPERTY_VERSION, "1")
-    properties.put(FORMAT_PROPERTY_VERSION, "1")
-    properties.put(FORMAT_PATH, "/path/to/target")
-    properties.put("schema.0.name", "a")
-    properties.put("schema.1.name", "b")
-    properties.put("schema.2.name", "c")
-    properties.put("schema.0.field.0.name", "a")
-    properties.put("schema.0.field.1.name", "b")
-    properties.put("schema.0.field.2.name", "c")
-    properties.put("failing", "false")
-    properties
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TableSourceFactoryServiceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TableSourceFactoryServiceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TableSourceFactoryServiceTest.scala
deleted file mode 100644
index b32d70a..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TableSourceFactoryServiceTest.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.connectors
-
-import org.apache.flink.table.api.{NoMatchingTableFactoryException, ValidationException}
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
-import org.apache.flink.table.descriptors.TableDescriptorValidator
-import org.apache.flink.table.sources.TestWildcardFormatTableSourceFactory
-import org.junit.Assert.assertTrue
-import org.junit.Test
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-class TableSourceFactoryServiceTest {
-
-  @Test
-  def testValidProperties(): Unit = {
-    val props = properties()
-    props.put(CONNECTOR_TYPE, "fixed")
-    props.put(FORMAT_TYPE, "test")
-    assertTrue(TableFactoryService.find(classOf[TableSourceFactory[_]], props.toMap) != null)
-  }
-
-  @Test(expected = classOf[NoMatchingTableFactoryException])
-  def testInvalidContext(): Unit = {
-    val props = properties()
-    props.put(CONNECTOR_TYPE, "FAIL")
-    props.put(FORMAT_TYPE, "test")
-    TableFactoryService.find(classOf[TableSourceFactory[_]], props.toMap)
-  }
-
-  @Test
-  def testDifferentContextVersion(): Unit = {
-    val props = properties()
-    props.put(CONNECTOR_TYPE, "fixed")
-    props.put(FORMAT_TYPE, "test")
-    props.put(CONNECTOR_PROPERTY_VERSION, "2")
-    // the table source should still be found
-    assertTrue(TableFactoryService.find(classOf[TableSourceFactory[_]], props.toMap) != null)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testUnsupportedProperty(): Unit = {
-    val props = properties()
-    props.put(CONNECTOR_TYPE, "fixed")
-    props.put(FORMAT_TYPE, "test")
-    props.put("format.path_new", "/new/path")
-    TableFactoryService.find(classOf[TableSourceFactory[_]], props.toMap)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testFailingFactory(): Unit = {
-    val props = properties()
-    props.put(CONNECTOR_TYPE, "fixed")
-    props.put(FORMAT_TYPE, "test")
-    props.put("failing", "true")
-    TableFactoryService.find(classOf[TableSourceFactory[_]], props.toMap)
-      .asInstanceOf[TableSourceFactory[_]]
-      .createTableSource(props.asJava)
-  }
-
-  @Test
-  def testWildcardFormat(): Unit = {
-    val props = properties()
-    props.put(CONNECTOR_TYPE, "wildcard")
-    props.put(FORMAT_TYPE, "test")
-    props.put("format.type", "not-test")
-    props.put("format.not-test-property", "wildcard-property")
-    val actualTableSource = TableFactoryService.find(classOf[TableSourceFactory[_]], props.toMap)
-    assertTrue(actualTableSource.isInstanceOf[TestWildcardFormatTableSourceFactory])
-  }
-
-  private def properties(): mutable.Map[String, String] = {
-    val properties = mutable.Map[String, String]()
-    properties.put(
-      TableDescriptorValidator.TABLE_TYPE,
-      TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE)
-    properties.put(CONNECTOR_TYPE, "test")
-    properties.put(FORMAT_TYPE, "test")
-    properties.put(CONNECTOR_PROPERTY_VERSION, "1")
-    properties.put(FORMAT_PROPERTY_VERSION, "1")
-    properties.put("format.path", "/path/to/target")
-    properties.put("schema.0.name", "a")
-    properties.put("schema.1.name", "b")
-    properties.put("schema.2.name", "c")
-    properties.put("schema.0.field.0.name", "a")
-    properties.put("schema.0.field.1.name", "b")
-    properties.put("schema.0.field.2.name", "c")
-    properties.put("failing", "false")
-    properties
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestFixedFormatTableFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestFixedFormatTableFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestFixedFormatTableFactory.scala
deleted file mode 100644
index 7001d2d..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestFixedFormatTableFactory.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.connectors
-
-import java.util
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.TableSchema
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
-import org.apache.flink.table.sources.TableSource
-import org.apache.flink.types.Row
-
-/**
-  * Table source factory for testing with a fixed format.
-  */
-class TestFixedFormatTableFactory extends TableSourceFactory[Row] with DiscoverableTableFactory {
-
-  override def requiredContext(): util.Map[String, String] = {
-    val context = new util.HashMap[String, String]()
-    context.put(CONNECTOR_TYPE, "fixed")
-    context.put(FORMAT_TYPE, "test")
-    context.put(CONNECTOR_PROPERTY_VERSION, "1")
-    context.put(FORMAT_PROPERTY_VERSION, "1")
-    context
-  }
-
-  override def supportedProperties(): util.List[String] = {
-    val properties = new util.ArrayList[String]()
-    properties.add("format.path")
-    properties.add("schema.#.name")
-    properties.add("schema.#.field.#.name")
-    properties.add("failing")
-    properties
-  }
-
-  override def createTableSource(properties: util.Map[String, String]): TableSource[Row] = {
-    if (properties.get("failing") == "true") {
-      throw new IllegalArgumentException("Error in this factory.")
-    }
-    new TableSource[Row] {
-      override def getTableSchema: TableSchema = throw new UnsupportedOperationException()
-
-      override def getReturnType: TypeInformation[Row] = throw new UnsupportedOperationException()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestTableSinkFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestTableSinkFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestTableSinkFactory.scala
deleted file mode 100644
index 2dfcdb7..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestTableSinkFactory.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.connectors
-
-import java.util
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.connectors.TestTableSinkFactory.FORMAT_PATH
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
-import org.apache.flink.table.descriptors.FormatDescriptorValidator._
-import org.apache.flink.table.sinks.TableSink
-import org.apache.flink.types.Row
-
-class TestTableSinkFactory extends TableSinkFactory[Row] with DiscoverableTableFactory {
-
-  override def requiredContext(): util.Map[String, String] = {
-    val context = new util.HashMap[String, String]()
-    context.put(CONNECTOR_TYPE, "test")
-    context.put(FORMAT_TYPE, "test")
-    context.put(CONNECTOR_PROPERTY_VERSION, "1")
-    context.put(FORMAT_PROPERTY_VERSION, "1")
-    context
-  }
-
-  override def supportedProperties(): util.List[String] = {
-    val properties = new util.ArrayList[String]()
-    // connector
-    properties.add(FORMAT_PATH)
-    properties.add("schema.#.name")
-    properties.add("schema.#.field.#.name")
-    properties.add("failing")
-    properties
-  }
-
-  override def createTableSink(properties: util.Map[String, String]): TableSink[Row] = {
-    if (properties.get("failing") == "true") {
-      throw new IllegalArgumentException("Error in this factory.")
-    }
-    new TableSink[Row] {
-      override def getOutputType: TypeInformation[Row] = throw new UnsupportedOperationException()
-
-      override def getFieldNames: Array[String] = throw new UnsupportedOperationException()
-
-      override def getFieldTypes: Array[TypeInformation[_]] =
-        throw new UnsupportedOperationException()
-
-      override def configure(fieldNames: Array[String],
-                             fieldTypes: Array[TypeInformation[_]]): TableSink[Row] =
-        throw new UnsupportedOperationException()
-    }
-  }
-}
-
-object TestTableSinkFactory {
-  val CONNECTOR_TYPE_VALUE_TEST = "test"
-  val FORMAT_TYPE_VALUE_TEST = "test"
-  val FORMAT_PATH = "format.path"
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestTableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestTableSourceFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestTableSourceFactory.scala
deleted file mode 100644
index 345f47e..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestTableSourceFactory.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.connectors
-
-import java.util
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.TableSchema
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
-import org.apache.flink.table.sources.TableSource
-import org.apache.flink.types.Row
-
-/**
-  * Table source factory for testing.
-  */
-class TestTableSourceFactory extends TableSourceFactory[Row] with DiscoverableTableFactory {
-
-  override def requiredContext(): util.Map[String, String] = {
-    val context = new util.HashMap[String, String]()
-    context.put(CONNECTOR_TYPE, "test")
-    context.put(FORMAT_TYPE, "test")
-    context.put(CONNECTOR_PROPERTY_VERSION, "1")
-    context.put(FORMAT_PROPERTY_VERSION, "1")
-    context
-  }
-
-  override def supportedProperties(): util.List[String] = {
-    val properties = new util.ArrayList[String]()
-    // connector
-    properties.add("format.path")
-    properties.add("schema.#.name")
-    properties.add("schema.#.field.#.name")
-    properties.add("failing")
-    properties
-  }
-
-  override def createTableSource(properties: util.Map[String, String]): TableSource[Row] = {
-    if (properties.get("failing") == "true") {
-      throw new IllegalArgumentException("Error in this factory.")
-    }
-    new TableSource[Row] {
-      override def getTableSchema: TableSchema = throw new UnsupportedOperationException()
-
-      override def getReturnType: TypeInformation[Row] = throw new UnsupportedOperationException()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestWildcardFormatTableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestWildcardFormatTableSourceFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestWildcardFormatTableSourceFactory.scala
deleted file mode 100644
index c0bab21..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestWildcardFormatTableSourceFactory.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.connectors
-
-import java.util
-
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
-import org.apache.flink.table.sources.TableSource
-import org.apache.flink.types.Row
-
-/**
-  * Table source factory for testing with a wildcard format ("format.*").
-  */
-class TestWildcardFormatTableSourceFactory
-  extends TableSourceFactory[Row]
-  with DiscoverableTableFactory  {
-
-  override def requiredContext(): util.Map[String, String] = {
-    val context = new util.HashMap[String, String]()
-    context.put(CONNECTOR_TYPE, "wildcard")
-    context.put(CONNECTOR_PROPERTY_VERSION, "1")
-    context
-  }
-
-  override def supportedProperties(): util.List[String] = {
-    val properties = new util.ArrayList[String]()
-    properties.add("format.*")
-    properties.add("schema.#.name")
-    properties.add("schema.#.field.#.name")
-    properties.add("failing")
-    properties
-  }
-
-  override def createTableSource(properties: util.Map[String, String]): TableSource[Row] = {
-    throw new UnsupportedOperationException()
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/TableFormatFactoryServiceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/TableFormatFactoryServiceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/TableFormatFactoryServiceTest.scala
new file mode 100644
index 0000000..ab5a051
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/TableFormatFactoryServiceTest.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories
+
+import java.util.{HashMap => JHashMap, Map => JMap}
+
+import org.apache.flink.table.api.{AmbiguousTableFactoryException, NoMatchingTableFactoryException}
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
+import org.apache.flink.table.factories.TableFormatFactoryServiceTest._
+import org.apache.flink.table.factories.utils.{TestAmbiguousTableFormatFactory, TestTableFormatFactory}
+import org.junit.Assert.assertTrue
+import org.junit.Test
+
+/**
+  * Tests for testing format discovery using [[TableFactoryService]]. The tests assume the two
+  * format factories [[TestTableFormatFactory]] and [[TestAmbiguousTableFormatFactory]] are
+  * registered.
+  *
+  * The first format does not support SPECIAL_PATH but supports schema derivation. The
+  * latter format supports neither UNIQUE_PROPERTY nor schema derivation. Both formats
+  * have the same context and support COMMON_PATH.
+  */
+class TableFormatFactoryServiceTest {
+
+  @Test
+  def testValidProperties(): Unit = {
+    val props = properties()
+    assertTrue(TableFactoryService.find(classOf[TableFormatFactory[_]], props)
+      .isInstanceOf[TestTableFormatFactory])
+  }
+
+  @Test
+  def testDifferentContextVersion(): Unit = {
+    val props = properties()
+    props.put(FORMAT_PROPERTY_VERSION, "2")
+    // for now we support any property version, the property version should not affect the
+    // discovery at the moment and thus the format should still be found
+    val foundFactory = TableFactoryService.find(classOf[TableFormatFactory[_]], props)
+    assertTrue(foundFactory.isInstanceOf[TestTableFormatFactory])
+  }
+
+  @Test
+  def testAmbiguousMoreSupportSelection(): Unit = {
+    val props = properties()
+    props.remove(UNIQUE_PROPERTY) // both formats match now
+    props.put(SPECIAL_PATH, "/what/ever") // now only TestAmbiguousTableFormatFactory
+    assertTrue(
+      TableFactoryService
+        .find(classOf[TableFormatFactory[_]], props)
+        .isInstanceOf[TestAmbiguousTableFormatFactory])
+  }
+
+  @Test
+  def testAmbiguousClassBasedSelection(): Unit = {
+    val props = properties()
+    props.remove(UNIQUE_PROPERTY) // both formats match now
+    assertTrue(
+      TableFactoryService
+        // we are looking for a particular class
+        .find(classOf[TestAmbiguousTableFormatFactory], props)
+        .isInstanceOf[TestAmbiguousTableFormatFactory])
+  }
+
+  @Test
+  def testAmbiguousSchemaBasedSelection(): Unit = {
+    val props = properties()
+    props.remove(UNIQUE_PROPERTY) // both formats match now
+    // this is unknown to the schema derivation factory
+    props.put("schema.unknown-schema-field", "unknown")
+
+    // the format with schema derivation does not feel responsible because of this field,
+    // but since there is another format that feels responsible, no exception is thrown.
+    assertTrue(
+      TableFactoryService
+        .find(classOf[TableFormatFactory[_]], props)
+        .isInstanceOf[TestAmbiguousTableFormatFactory])
+  }
+
+  @Test(expected = classOf[NoMatchingTableFactoryException])
+  def testMissingClass(): Unit = {
+    val props = properties()
+    // this class is not a valid factory
+    TableFactoryService.find(classOf[TableFormatFactoryServiceTest], props)
+  }
+
+  @Test(expected = classOf[NoMatchingTableFactoryException])
+  def testInvalidContext(): Unit = {
+    val props = properties()
+    // no context specifies this
+    props.put(FORMAT_TYPE, "unknown_format_type")
+    TableFactoryService.find(classOf[TableFormatFactory[_]], props)
+  }
+
+  @Test(expected = classOf[NoMatchingTableFactoryException])
+  def testUnsupportedProperty(): Unit = {
+    val props = properties()
+    props.put("format.property_not_defined_by_any_factory", "/new/path")
+    TableFactoryService.find(classOf[TableFormatFactory[_]], props)
+  }
+
+  @Test(expected = classOf[AmbiguousTableFactoryException])
+  def testAmbiguousFactory(): Unit = {
+    val props = properties()
+    props.remove(UNIQUE_PROPERTY) // now both factories match
+    TableFactoryService.find(classOf[TableFormatFactory[_]], props)
+  }
+
+  private def properties(): JMap[String, String] = {
+    val properties = new JHashMap[String, String]()
+    properties.put(CONNECTOR_TYPE, "test")
+    properties.put(FORMAT_TYPE, TEST_FORMAT_TYPE)
+    properties.put(UNIQUE_PROPERTY, "true")
+    properties.put(CONNECTOR_PROPERTY_VERSION, "1")
+    properties.put(FORMAT_PROPERTY_VERSION, "1")
+    properties.put(COMMON_PATH, "/path/to/target")
+    properties.put("schema.0.name", "a")
+    properties.put("schema.1.name", "b")
+    properties.put("schema.2.name", "c")
+    properties
+  }
+}
+
+object TableFormatFactoryServiceTest {
+
+  val TEST_FORMAT_TYPE = "test-format"
+  val COMMON_PATH = "format.common-path"
+  val SPECIAL_PATH = "format.special-path"
+  val UNIQUE_PROPERTY = "format.unique-property"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/TableSinkFactoryServiceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/TableSinkFactoryServiceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/TableSinkFactoryServiceTest.scala
new file mode 100644
index 0000000..d0cd1e0
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/TableSinkFactoryServiceTest.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories
+
+import java.util.{HashMap => JHashMap, Map => JMap}
+
+import org.apache.flink.table.api.NoMatchingTableFactoryException
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.factories.utils.TestTableSinkFactory
+import org.apache.flink.table.factories.utils.TestTableSinkFactory._
+import org.junit.Assert._
+import org.junit.Test
+
+/**
+  * Tests for testing table sink discovery using [[TableFactoryService]]. The tests assume the
+  * table sink factory [[TestTableSinkFactory]] is registered.
+  */
+class TableSinkFactoryServiceTest {
+
+  @Test
+  def testValidProperties(): Unit = {
+    val props = properties()
+    assertTrue(TableFactoryService.find(classOf[TableSinkFactory[_]], props)
+      .isInstanceOf[TestTableSinkFactory])
+  }
+
+  @Test(expected = classOf[NoMatchingTableFactoryException])
+  def testInvalidContext(): Unit = {
+    val props = properties()
+    props.put(CONNECTOR_TYPE, "unknown-connector-type")
+    TableFactoryService.find(classOf[TableSinkFactory[_]], props)
+  }
+
+  @Test
+  def testDifferentContextVersion(): Unit = {
+    val props = properties()
+    props.put(CONNECTOR_PROPERTY_VERSION, "2")
+    // the table sink should still be found
+    assertTrue(TableFactoryService.find(classOf[TableSinkFactory[_]], props)
+      .isInstanceOf[TestTableSinkFactory])
+  }
+
+  @Test(expected = classOf[NoMatchingTableFactoryException])
+  def testUnsupportedProperty(): Unit = {
+    val props = properties()
+    props.put("format.path_new", "/new/path")
+    TableFactoryService.find(classOf[TableSinkFactory[_]], props)
+  }
+
+  private def properties(): JMap[String, String] = {
+    val properties = new JHashMap[String, String]()
+    properties.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_TEST)
+    properties.put(FORMAT_TYPE, FORMAT_TYPE_VALUE_TEST)
+    properties.put(CONNECTOR_PROPERTY_VERSION, "1")
+    properties.put(FORMAT_PROPERTY_VERSION, "1")
+    properties.put(FORMAT_PATH, "/path/to/target")
+    properties.put("schema.0.name", "a")
+    properties.put("schema.1.name", "b")
+    properties.put("schema.2.name", "c")
+    properties.put("schema.0.field.0.name", "a")
+    properties.put("schema.0.field.1.name", "b")
+    properties.put("schema.0.field.2.name", "c")
+    properties.put("failing", "false")
+    properties
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/TableSourceFactoryServiceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/TableSourceFactoryServiceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/TableSourceFactoryServiceTest.scala
new file mode 100644
index 0000000..16f1853
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/TableSourceFactoryServiceTest.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories
+
+import java.util.{HashMap => JHashMap, Map => JMap}
+
+import org.apache.flink.table.api.NoMatchingTableFactoryException
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
+import org.apache.flink.table.factories.utils.TestFixedFormatTableFactory.{CONNECTOR_TYPE_VALUE_FIXED, FORMAT_TYPE_VALUE_TEST}
+import org.apache.flink.table.factories.utils.TestWildcardFormatTableSourceFactory.CONNECTOR_TYPE_VALUE_WILDCARD
+import org.apache.flink.table.factories.utils.{TestFixedFormatTableFactory, TestWildcardFormatTableSourceFactory}
+import org.junit.Assert.assertTrue
+import org.junit.Test
+
+/**
+  * Tests for testing table source discovery using [[TableFactoryService]]. The tests assume the
+  * two table source factories [[TestFixedFormatTableFactory]] and
+  * [[TestWildcardFormatTableSourceFactory]] are registered.
+  *
+  * The first table source has a [[FORMAT_TYPE_VALUE_TEST]] type whereas the second source uses
+  * a wildcard to match arbitrary formats.
+  */
+class TableSourceFactoryServiceTest {
+
+  @Test
+  def testValidProperties(): Unit = {
+    val props = properties()
+    props.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_FIXED)
+    props.put(FORMAT_TYPE, FORMAT_TYPE_VALUE_TEST)
+    assertTrue(TableFactoryService.find(classOf[TableSourceFactory[_]], props)
+      .isInstanceOf[TestFixedFormatTableFactory])
+  }
+
+  @Test(expected = classOf[NoMatchingTableFactoryException])
+  def testInvalidContext(): Unit = {
+    val props = properties()
+    props.put(CONNECTOR_TYPE, "unknown-connector-type")
+    props.put(FORMAT_TYPE, FORMAT_TYPE_VALUE_TEST)
+    TableFactoryService.find(classOf[TableSourceFactory[_]], props)
+  }
+
+  @Test
+  def testDifferentContextVersion(): Unit = {
+    val props = properties()
+    props.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_FIXED)
+    props.put(FORMAT_TYPE, FORMAT_TYPE_VALUE_TEST)
+    props.put(CONNECTOR_PROPERTY_VERSION, "2")
+    // the table source should still be found
+    assertTrue(TableFactoryService.find(classOf[TableSourceFactory[_]], props)
+      .isInstanceOf[TestFixedFormatTableFactory])
+  }
+
+  @Test(expected = classOf[NoMatchingTableFactoryException])
+  def testUnsupportedProperty(): Unit = {
+    val props = properties()
+    props.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_FIXED)
+    props.put(FORMAT_TYPE, FORMAT_TYPE_VALUE_TEST)
+    props.put("format.unknown-format-type-property", "/new/path")
+    TableFactoryService.find(classOf[TableSourceFactory[_]], props)
+  }
+
+  @Test
+  def testWildcardFormat(): Unit = {
+    val props = properties()
+    props.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_WILDCARD)
+    props.put("format.unknown-format-type-property", "wildcard-property")
+    val actualTableSource = TableFactoryService.find(classOf[TableSourceFactory[_]], props)
+    assertTrue(actualTableSource.isInstanceOf[TestWildcardFormatTableSourceFactory])
+  }
+
+  private def properties(): JMap[String, String] = {
+    val properties = new JHashMap[String, String]()
+    properties.put(CONNECTOR_PROPERTY_VERSION, "1")
+    properties.put(FORMAT_PROPERTY_VERSION, "1")
+    properties.put("format.path", "/path/to/target")
+    properties.put("schema.0.name", "a")
+    properties.put("schema.1.name", "b")
+    properties.put("schema.2.name", "c")
+    properties.put("schema.0.field.0.name", "a")
+    properties.put("schema.0.field.1.name", "b")
+    properties.put("schema.0.field.2.name", "c")
+    properties
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestAmbiguousTableFormatFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestAmbiguousTableFormatFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestAmbiguousTableFormatFactory.scala
new file mode 100644
index 0000000..f8c9078
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestAmbiguousTableFormatFactory.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils
+
+import java.util
+
+import org.apache.flink.table.descriptors.FormatDescriptorValidator
+import org.apache.flink.table.factories.{TableFormatFactory, TableFormatFactoryServiceTest}
+import org.apache.flink.types.Row
+
+/**
+  * Table format factory for testing.
+  *
+  * Compared to [[TestTableFormatFactory]], it supports neither UNIQUE_PROPERTY nor
+  * schema derivation. Both formats have the same context and support COMMON_PATH.
+  */
+class TestAmbiguousTableFormatFactory extends TableFormatFactory[Row] {
+
+  override def requiredContext(): util.Map[String, String] = {
+    val context = new util.HashMap[String, String]()
+    context.put(
+      FormatDescriptorValidator.FORMAT_TYPE,
+      TableFormatFactoryServiceTest.TEST_FORMAT_TYPE)
+    context.put(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION, "1")
+    context
+  }
+
+  override def supportsSchemaDerivation(): Boolean = false // no schema derivation
+
+  override def supportedProperties(): util.List[String] = {
+    val properties = new util.ArrayList[String]()
+    properties.add(TableFormatFactoryServiceTest.COMMON_PATH)
+    properties.add(TableFormatFactoryServiceTest.SPECIAL_PATH)
+    properties
+  }
+}