You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/15 09:24:47 UTC

[02/13] flink git commit: [FLINK-8866] [table] Merge table source/sink/format factories

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestDeserializationSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestDeserializationSchema.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestDeserializationSchema.scala
new file mode 100644
index 0000000..dd723e2
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestDeserializationSchema.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils
+
+import org.apache.flink.api.common.serialization.DeserializationSchema
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.types.Row
+
+/**
+  * Deserialization schema for tests; deserialize and isEndOfStream always throw.
+  */
+class TestDeserializationSchema(val typeInfo: TypeInformation[Row])
+  extends DeserializationSchema[Row] {
+
+  override def deserialize(message: Array[Byte]): Row = throw new UnsupportedOperationException()
+
+  override def isEndOfStream(nextElement: Row): Boolean = throw new UnsupportedOperationException()
+
+  override def getProducedType: TypeInformation[Row] = typeInfo
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[TestDeserializationSchema]
+
+  override def equals(other: Any): Boolean = other match {
+    case that: TestDeserializationSchema =>
+      (that canEqual this) &&
+        typeInfo == that.typeInfo
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    val state = Seq(typeInfo)
+    state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestFixedFormatTableFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestFixedFormatTableFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestFixedFormatTableFactory.scala
new file mode 100644
index 0000000..07692be
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestFixedFormatTableFactory.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
+import org.apache.flink.table.factories.TableSourceFactory
+import org.apache.flink.table.factories.utils.TestFixedFormatTableFactory._
+import org.apache.flink.table.sources.TableSource
+import org.apache.flink.types.Row
+
+/**
+  * Table source factory for testing with a fixed format (connector "fixed", format "test").
+  */
+class TestFixedFormatTableFactory extends TableSourceFactory[Row] {
+
+  override def requiredContext(): util.Map[String, String] = {
+    val context = new util.HashMap[String, String]()
+    context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_FIXED)
+    context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE_TEST)
+    context.put(CONNECTOR_PROPERTY_VERSION, "1")
+    context.put(FORMAT_PROPERTY_VERSION, "1")
+    context
+  }
+
+  override def supportedProperties(): util.List[String] = {
+    val properties = new util.ArrayList[String]()
+    properties.add("format.path")
+    properties.add("schema.#.name")
+    properties.add("schema.#.field.#.name")
+    properties
+  }
+
+  override def createTableSource(properties: util.Map[String, String]): TableSource[Row] = {
+    new TableSource[Row] {
+      override def getTableSchema: TableSchema = throw new UnsupportedOperationException()
+
+      override def getReturnType: TypeInformation[Row] = throw new UnsupportedOperationException()
+    }
+  }
+}
+
+object TestFixedFormatTableFactory {
+  val CONNECTOR_TYPE_VALUE_FIXED = "fixed" // connector type put into the required context
+  val FORMAT_TYPE_VALUE_TEST = "test" // format type put into the required context
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala
new file mode 100644
index 0000000..ab613a9
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils
+
+import org.apache.flink.api.common.serialization.SerializationSchema
+import org.apache.flink.types.Row
+
+/**
+  * Serialization schema for tests; serialize always throws UnsupportedOperationException.
+  */
+class TestSerializationSchema extends SerializationSchema[Row] {
+
+  override def serialize(element: Row): Array[Byte] = throw new UnsupportedOperationException()
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormat.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormat.scala
new file mode 100644
index 0000000..5e26995
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormat.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils
+
+import org.apache.flink.table.descriptors.{DescriptorProperties, FormatDescriptor}
+
+/**
+  * Format descriptor for tests that always adds the same three hard-coded format properties.
+  */
+class TestTableFormat extends FormatDescriptor("test-format", 1) {
+
+  override protected def addFormatProperties(properties: DescriptorProperties): Unit = {
+    properties.putString("format.important", "this is important")
+    properties.putString("format.path", "/path/to/sth")
+    properties.putString("format.derive-schema", "true")
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala
new file mode 100644
index 0000000..475cff9
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils
+
+import java.util
+
+import org.apache.flink.api.common.serialization.DeserializationSchema
+import org.apache.flink.table.descriptors.{DescriptorProperties, FormatDescriptorValidator, SchemaValidator}
+import org.apache.flink.table.factories.{DeserializationSchemaFactory, TableFormatFactoryServiceTest}
+import org.apache.flink.types.Row
+
+/**
+  * Table format factory for testing that creates a [[TestDeserializationSchema]].
+  *
+  * It has the same context as [[TestAmbiguousTableFormatFactory]] and both support COMMON_PATH.
+  * This format does not support SPECIAL_PATH but supports schema derivation.
+  */
+class TestTableFormatFactory extends DeserializationSchemaFactory[Row] {
+
+  override def requiredContext(): util.Map[String, String] = {
+    val context = new util.HashMap[String, String]()
+    context.put(
+      FormatDescriptorValidator.FORMAT_TYPE,
+      TableFormatFactoryServiceTest.TEST_FORMAT_TYPE)
+    context.put(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION, "1")
+    context
+  }
+
+  override def supportsSchemaDerivation(): Boolean = true
+
+  override def supportedProperties(): util.List[String] = {
+    val properties = new util.ArrayList[String]()
+    properties.add(TableFormatFactoryServiceTest.UNIQUE_PROPERTY)
+    properties.add(TableFormatFactoryServiceTest.COMMON_PATH)
+    properties.add(FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA)
+    properties.addAll(SchemaValidator.getSchemaDerivationKeys)
+    properties
+  }
+
+  override def createDeserializationSchema(
+      properties: util.Map[String, String])
+    : DeserializationSchema[Row] = {
+
+    val props = new DescriptorProperties(true)
+    props.putProperties(properties)
+    val schema = SchemaValidator.deriveFormatFields(props)
+    new TestDeserializationSchema(schema.toRowType)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableSinkFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableSinkFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableSinkFactory.scala
new file mode 100644
index 0000000..bbffd31
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableSinkFactory.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.factories.utils.TestTableSinkFactory._
+import org.apache.flink.table.factories.{TableFactory, TableSinkFactory}
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.types.Row
+
+/**
+  * Table sink factory for testing; creation fails if property "failing" is set to "true".
+  */
+class TestTableSinkFactory extends TableSinkFactory[Row] with TableFactory {
+
+  override def requiredContext(): util.Map[String, String] = {
+    val context = new util.HashMap[String, String]()
+    context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_TEST)
+    context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE_TEST)
+    context.put(CONNECTOR_PROPERTY_VERSION, "1")
+    context.put(FORMAT_PROPERTY_VERSION, "1")
+    context
+  }
+
+  override def supportedProperties(): util.List[String] = {
+    val properties = new util.ArrayList[String]()
+    // format path, schema names and the switch that makes creation fail
+    properties.add(FORMAT_PATH)
+    properties.add("schema.#.name")
+    properties.add("schema.#.field.#.name")
+    properties.add("failing")
+    properties
+  }
+
+  override def createTableSink(properties: util.Map[String, String]): TableSink[Row] = {
+    if (properties.get("failing") == "true") {
+      throw new IllegalArgumentException("Error in this factory.")
+    }
+    new TableSink[Row] {
+      override def getOutputType: TypeInformation[Row] = throw new UnsupportedOperationException()
+
+      override def getFieldNames: Array[String] = throw new UnsupportedOperationException()
+
+      override def getFieldTypes: Array[TypeInformation[_]] =
+        throw new UnsupportedOperationException()
+
+      override def configure(fieldNames: Array[String],
+                             fieldTypes: Array[TypeInformation[_]]): TableSink[Row] =
+        throw new UnsupportedOperationException()
+    }
+  }
+}
+
+object TestTableSinkFactory {
+  val CONNECTOR_TYPE_VALUE_TEST = "test" // connector type put into the required context
+  val FORMAT_TYPE_VALUE_TEST = "test" // format type put into the required context
+  val FORMAT_PATH = "format.path" // property key listed in supportedProperties
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableSourceFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableSourceFactory.scala
new file mode 100644
index 0000000..ff3b24a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableSourceFactory.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
+import org.apache.flink.table.factories.utils.TestTableSinkFactory.{CONNECTOR_TYPE_VALUE_TEST, FORMAT_TYPE_VALUE_TEST}
+import org.apache.flink.table.factories.{TableFactory, TableSourceFactory}
+import org.apache.flink.table.sources.TableSource
+import org.apache.flink.types.Row
+
+/**
+  * Table source factory for testing; creation fails if property "failing" is set to "true".
+  */
+class TestTableSourceFactory extends TableSourceFactory[Row] with TableFactory {
+
+  override def requiredContext(): util.Map[String, String] = {
+    val context = new util.HashMap[String, String]() // filled from this factory's own constants
+    context.put(CONNECTOR_TYPE, TestTableSourceFactory.CONNECTOR_TYPE_VALUE_TEST)
+    context.put(FORMAT_TYPE, TestTableSourceFactory.FORMAT_TYPE_VALUE_TEST)
+    context.put(CONNECTOR_PROPERTY_VERSION, "1")
+    context.put(FORMAT_PROPERTY_VERSION, "1")
+    context
+  }
+
+  override def supportedProperties(): util.List[String] = {
+    val properties = new util.ArrayList[String]()
+    // format path, schema names and the switch that makes creation fail
+    properties.add("format.path")
+    properties.add("schema.#.name")
+    properties.add("schema.#.field.#.name")
+    properties.add("failing")
+    properties
+  }
+
+  override def createTableSource(properties: util.Map[String, String]): TableSource[Row] = {
+    if (properties.get("failing") == "true") {
+      throw new IllegalArgumentException("Error in this factory.")
+    }
+    new TableSource[Row] {
+      override def getTableSchema: TableSchema = throw new UnsupportedOperationException()
+
+      override def getReturnType: TypeInformation[Row] = throw new UnsupportedOperationException()
+    }
+  }
+}
+
+object TestTableSourceFactory {
+  val CONNECTOR_TYPE_VALUE_TEST = "test" // same value as in TestTableSinkFactory
+  val FORMAT_TYPE_VALUE_TEST = "test" // same value as in TestTableSinkFactory
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestWildcardFormatTableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestWildcardFormatTableSourceFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestWildcardFormatTableSourceFactory.scala
new file mode 100644
index 0000000..5463422
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestWildcardFormatTableSourceFactory.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils
+
+import java.util
+
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
+import org.apache.flink.table.factories.TableSourceFactory
+import org.apache.flink.table.factories.utils.TestWildcardFormatTableSourceFactory.CONNECTOR_TYPE_VALUE_WILDCARD
+import org.apache.flink.table.sources.TableSource
+import org.apache.flink.types.Row
+
+/**
+  * Table source factory for testing with a wildcard format ("format.*"); creation always throws.
+  */
+class TestWildcardFormatTableSourceFactory
+  extends TableSourceFactory[Row] {
+
+  override def requiredContext(): util.Map[String, String] = {
+    val context = new util.HashMap[String, String]()
+    context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_WILDCARD)
+    context.put(CONNECTOR_PROPERTY_VERSION, "1")
+    context
+  }
+
+  override def supportedProperties(): util.List[String] = {
+    val properties = new util.ArrayList[String]()
+    properties.add("format.*")
+    properties.add("schema.#.name")
+    properties.add("schema.#.field.#.name")
+    properties
+  }
+
+  override def createTableSource(properties: util.Map[String, String]): TableSource[Row] = {
+    throw new UnsupportedOperationException()
+  }
+}
+
+object TestWildcardFormatTableSourceFactory {
+  val CONNECTOR_TYPE_VALUE_WILDCARD = "wildcard" // connector type put into the required context
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/TableFormatFactoryServiceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/TableFormatFactoryServiceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/TableFormatFactoryServiceTest.scala
deleted file mode 100644
index 7d7d2d2..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/TableFormatFactoryServiceTest.scala
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.formats
-
-import java.util.{HashMap => JHashMap, Map => JMap}
-
-import org.apache.flink.table.api.{AmbiguousTableFormatException, NoMatchingTableFormatException}
-import org.apache.flink.table.descriptors.FormatDescriptorValidator
-import org.apache.flink.table.formats.TableFormatFactoryServiceTest.{COMMON_PATH, SPECIAL_PATH, TEST_FORMAT_TYPE, UNIQUE_PROPERTY}
-import org.apache.flink.table.formats.utils.{TestAmbiguousTableFormatFactory, TestTableFormatFactory}
-import org.junit.Assert.{assertNotNull, assertTrue}
-import org.junit.Test
-
-/**
-  * Tests for [[TableFormatFactoryService]]. The tests assume the two format factories
-  * [[TestTableFormatFactory]] and [[TestAmbiguousTableFormatFactory]] are registered.
-  *
-  * The first format does not support SPECIAL_PATH but supports schema derivation. The
-  * latter format does not support UNIQUE_PROPERTY nor schema derivation. Both formats
-  * have the same context and support COMMON_PATH.
-  */
-class TableFormatFactoryServiceTest {
-
-  @Test
-  def testValidProperties(): Unit = {
-    val props = properties()
-    assertNotNull(
-      TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props))
-  }
-
-  @Test
-  def testDifferentContextVersion(): Unit = {
-    val props = properties()
-    props.put(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION, "2")
-    // for now we support any property version, the property version should not affect the
-    // discovery at the moment and thus the format should still be found
-    val foundFactory = TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props)
-    assertTrue(foundFactory.isInstanceOf[TestTableFormatFactory])
-  }
-
-  @Test
-  def testAmbiguousMoreSupportSelection(): Unit = {
-    val props = properties()
-    props.remove(UNIQUE_PROPERTY) // both formats match now
-    props.put(SPECIAL_PATH, "/what/ever") // now only TestAmbiguousTableFormatFactory
-    assertTrue(
-      TableFormatFactoryService
-        .find(classOf[TableFormatFactory[_]], props)
-        .isInstanceOf[TestAmbiguousTableFormatFactory])
-  }
-
-  @Test
-  def testAmbiguousClassBasedSelection(): Unit = {
-    val props = properties()
-    props.remove(UNIQUE_PROPERTY) // both formats match now
-    assertTrue(
-      TableFormatFactoryService
-        // we are looking for a particular class
-        .find(classOf[TestAmbiguousTableFormatFactory], props)
-        .isInstanceOf[TestAmbiguousTableFormatFactory])
-  }
-
-  @Test
-  def testAmbiguousSchemaBasedSelection(): Unit = {
-    val props = properties()
-    props.remove(UNIQUE_PROPERTY) // both formats match now
-    // this is unknown to the schema derivation factory
-    props.put("schema.unknown-schema-field", "unknown")
-
-    // the format with schema derivation feels not responsible because of this field,
-    // but since there is another format that feels responsible, no exception is thrown.
-    assertTrue(
-      TableFormatFactoryService
-        .find(classOf[TableFormatFactory[_]], props)
-        .isInstanceOf[TestAmbiguousTableFormatFactory])
-  }
-
-  @Test(expected = classOf[NoMatchingTableFormatException])
-  def testMissingClass(): Unit = {
-    val props = properties()
-    // this class is not a valid factory
-    TableFormatFactoryService.find(classOf[TableFormatFactoryServiceTest], props)
-  }
-
-  @Test(expected = classOf[NoMatchingTableFormatException])
-  def testInvalidContext(): Unit = {
-    val props = properties()
-    // no context specifies this
-    props.put(FormatDescriptorValidator.FORMAT_TYPE, "unknown_format_type")
-    TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props)
-  }
-
-  @Test(expected = classOf[NoMatchingTableFormatException])
-  def testUnsupportedProperty(): Unit = {
-    val props = properties()
-    props.put("format.property_not_defined_by_any_factory", "/new/path")
-    TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props)
-  }
-
-  @Test(expected = classOf[AmbiguousTableFormatException])
-  def testAmbiguousFactory(): Unit = {
-    val props = properties()
-    props.remove(UNIQUE_PROPERTY) // now both factories match
-    TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props)
-  }
-
-  private def properties(): JMap[String, String] = {
-    val properties = new JHashMap[String, String]()
-    properties.put("connector.type", "test")
-    properties.put("format.type", TEST_FORMAT_TYPE)
-    properties.put(UNIQUE_PROPERTY, "true")
-    properties.put("connector.property-version", "1")
-    properties.put("format.property-version", "1")
-    properties.put(COMMON_PATH, "/path/to/target")
-    properties.put("schema.0.name", "a")
-    properties.put("schema.1.name", "b")
-    properties.put("schema.2.name", "c")
-    properties
-  }
-}
-
-object TableFormatFactoryServiceTest {
-
-  val TEST_FORMAT_TYPE = "test-format"
-  val COMMON_PATH = "format.common-path"
-  val SPECIAL_PATH = "format.special-path"
-  val UNIQUE_PROPERTY = "format.unique-property"
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestAmbiguousTableFormatFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestAmbiguousTableFormatFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestAmbiguousTableFormatFactory.scala
deleted file mode 100644
index 1a30ac0..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestAmbiguousTableFormatFactory.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.formats.utils
-
-import java.util
-
-import org.apache.flink.table.descriptors.FormatDescriptorValidator
-import org.apache.flink.table.formats.{TableFormatFactory, TableFormatFactoryServiceTest}
-import org.apache.flink.types.Row
-
-/**
-  * Table format factory for testing.
-  *
-  * It does not support UNIQUE_PROPERTY compared to [[TestTableFormatFactory]] nor
-  * schema derivation. Both formats have the same context and support COMMON_PATH.
-  */
-class TestAmbiguousTableFormatFactory extends TableFormatFactory[Row] {
-
-  override def requiredContext(): util.Map[String, String] = {
-    val context = new util.HashMap[String, String]()
-    context.put(
-      FormatDescriptorValidator.FORMAT_TYPE,
-      TableFormatFactoryServiceTest.TEST_FORMAT_TYPE)
-    context.put(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION, "1")
-    context
-  }
-
-  override def supportsSchemaDerivation(): Boolean = false // no schema derivation
-
-  override def supportedProperties(): util.List[String] = {
-    val properties = new util.ArrayList[String]()
-    properties.add(TableFormatFactoryServiceTest.COMMON_PATH)
-    properties.add(TableFormatFactoryServiceTest.SPECIAL_PATH)
-    properties
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestDeserializationSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestDeserializationSchema.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestDeserializationSchema.scala
deleted file mode 100644
index 0b519ef..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestDeserializationSchema.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.formats.utils
-
-import org.apache.flink.api.common.serialization.DeserializationSchema
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.types.Row
-
-/**
-  * Deserialization schema for testing purposes.
-  */
-class TestDeserializationSchema(val typeInfo: TypeInformation[Row])
-  extends DeserializationSchema[Row] {
-
-  override def deserialize(message: Array[Byte]): Row = throw new UnsupportedOperationException()
-
-  override def isEndOfStream(nextElement: Row): Boolean = throw new UnsupportedOperationException()
-
-  override def getProducedType: TypeInformation[Row] = typeInfo
-
-  def canEqual(other: Any): Boolean = other.isInstanceOf[TestDeserializationSchema]
-
-  override def equals(other: Any): Boolean = other match {
-    case that: TestDeserializationSchema =>
-      (that canEqual this) &&
-        typeInfo == that.typeInfo
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    val state = Seq(typeInfo)
-    state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestSerializationSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestSerializationSchema.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestSerializationSchema.scala
deleted file mode 100644
index 7043eca..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestSerializationSchema.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.formats.utils
-
-import org.apache.flink.api.common.serialization.SerializationSchema
-import org.apache.flink.types.Row
-
-/**
-  * Serialization schema for testing purposes.
-  */
-class TestSerializationSchema extends SerializationSchema[Row] {
-
-  override def serialize(element: Row): Array[Byte] = throw new UnsupportedOperationException()
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestTableFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestTableFormat.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestTableFormat.scala
deleted file mode 100644
index 38cef05..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestTableFormat.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.formats.utils
-
-import org.apache.flink.table.descriptors.{DescriptorProperties, FormatDescriptor}
-
-/**
-  * Format descriptor for testing purposes.
-  */
-class TestTableFormat extends FormatDescriptor("test-format", 1) {
-
-  override protected def addFormatProperties(properties: DescriptorProperties): Unit = {
-    properties.putString("format.important", "this is important")
-    properties.putString("format.path", "/path/to/sth")
-    properties.putString("format.derive-schema", "true")
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestTableFormatFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestTableFormatFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestTableFormatFactory.scala
deleted file mode 100644
index efd9afe..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestTableFormatFactory.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.formats.utils
-
-import java.util
-
-import org.apache.flink.api.common.serialization.DeserializationSchema
-import org.apache.flink.table.descriptors.{DescriptorProperties, FormatDescriptorValidator, SchemaValidator}
-import org.apache.flink.table.formats.{DeserializationSchemaFactory, TableFormatFactoryServiceTest}
-import org.apache.flink.types.Row
-
-/**
-  * Table format factory for testing.
-  *
-  * It has the same context as [[TestAmbiguousTableFormatFactory]] and both support COMMON_PATH.
-  * This format does not support SPECIAL_PATH but supports schema derivation.
-  */
-class TestTableFormatFactory extends DeserializationSchemaFactory[Row] {
-
-  override def requiredContext(): util.Map[String, String] = {
-    val context = new util.HashMap[String, String]()
-    context.put(
-      FormatDescriptorValidator.FORMAT_TYPE,
-      TableFormatFactoryServiceTest.TEST_FORMAT_TYPE)
-    context.put(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION, "1")
-    context
-  }
-
-  override def supportsSchemaDerivation(): Boolean = true
-
-  override def supportedProperties(): util.List[String] = {
-    val properties = new util.ArrayList[String]()
-    properties.add(TableFormatFactoryServiceTest.UNIQUE_PROPERTY)
-    properties.add(TableFormatFactoryServiceTest.COMMON_PATH)
-    properties.add(FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA)
-    properties.addAll(SchemaValidator.getSchemaDerivationKeys)
-    properties
-  }
-
-  override def createDeserializationSchema(
-      properties: util.Map[String, String])
-    : DeserializationSchema[Row] = {
-
-    val props = new DescriptorProperties(true)
-    props.putProperties(properties)
-    val schema = SchemaValidator.deriveFormatFields(props)
-    new TestDeserializationSchema(schema.toRowType)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
index 3689965..8062b48 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
@@ -21,11 +21,11 @@ package org.apache.flink.table.utils
 import java.util
 
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.connectors.{DiscoverableTableFactory, TableSinkFactory, TableSourceFactory}
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
 import org.apache.flink.table.descriptors.RowtimeValidator._
 import org.apache.flink.table.descriptors.SchemaValidator._
 import org.apache.flink.table.descriptors.{DescriptorProperties, SchemaValidator}
+import org.apache.flink.table.factories.{TableFactory, TableSinkFactory, TableSourceFactory}
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.TableSource
 import org.apache.flink.types.Row
@@ -39,7 +39,7 @@ import org.apache.flink.types.Row
   * @param terminationCount determines when to shutdown the streaming source function
   */
 class InMemoryTableFactory(terminationCount: Int) extends TableSourceFactory[Row]
-  with TableSinkFactory[Row] with DiscoverableTableFactory {
+  with TableSinkFactory[Row] with TableFactory {
   override def createTableSink(properties: util.Map[String, String]): TableSink[Row] = {
     val params: DescriptorProperties = new DescriptorProperties(true)
     params.putProperties(properties)