You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/15 09:24:47 UTC
[02/13] flink git commit: [FLINK-8866] [table] Merge table
source/sink/format factories
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestDeserializationSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestDeserializationSchema.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestDeserializationSchema.scala
new file mode 100644
index 0000000..dd723e2
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestDeserializationSchema.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils
+
+import org.apache.flink.api.common.serialization.DeserializationSchema
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.types.Row
+
+/**
+ * Deserialization schema for testing purposes.
+ */
+class TestDeserializationSchema(val typeInfo: TypeInformation[Row])
+ extends DeserializationSchema[Row] {
+
+ override def deserialize(message: Array[Byte]): Row = throw new UnsupportedOperationException()
+
+ override def isEndOfStream(nextElement: Row): Boolean = throw new UnsupportedOperationException()
+
+ override def getProducedType: TypeInformation[Row] = typeInfo
+
+ def canEqual(other: Any): Boolean = other.isInstanceOf[TestDeserializationSchema]
+
+ override def equals(other: Any): Boolean = other match {
+ case that: TestDeserializationSchema =>
+ (that canEqual this) &&
+ typeInfo == that.typeInfo
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ val state = Seq(typeInfo)
+ state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestFixedFormatTableFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestFixedFormatTableFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestFixedFormatTableFactory.scala
new file mode 100644
index 0000000..07692be
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestFixedFormatTableFactory.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
+import org.apache.flink.table.factories.TableSourceFactory
+import org.apache.flink.table.factories.utils.TestFixedFormatTableFactory._
+import org.apache.flink.table.sources.TableSource
+import org.apache.flink.types.Row
+
+/**
+ * Table source factory for testing with a fixed format.
+ */
+class TestFixedFormatTableFactory extends TableSourceFactory[Row] {
+
+ override def requiredContext(): util.Map[String, String] = {
+ val context = new util.HashMap[String, String]()
+ context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_FIXED)
+ context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE_TEST)
+ context.put(CONNECTOR_PROPERTY_VERSION, "1")
+ context.put(FORMAT_PROPERTY_VERSION, "1")
+ context
+ }
+
+ override def supportedProperties(): util.List[String] = {
+ val properties = new util.ArrayList[String]()
+ properties.add("format.path")
+ properties.add("schema.#.name")
+ properties.add("schema.#.field.#.name")
+ properties
+ }
+
+ override def createTableSource(properties: util.Map[String, String]): TableSource[Row] = {
+ new TableSource[Row] {
+ override def getTableSchema: TableSchema = throw new UnsupportedOperationException()
+
+ override def getReturnType: TypeInformation[Row] = throw new UnsupportedOperationException()
+ }
+ }
+}
+
+object TestFixedFormatTableFactory {
+ val CONNECTOR_TYPE_VALUE_FIXED = "fixed"
+ val FORMAT_TYPE_VALUE_TEST = "test"
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala
new file mode 100644
index 0000000..ab613a9
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils
+
+import org.apache.flink.api.common.serialization.SerializationSchema
+import org.apache.flink.types.Row
+
+/**
+ * Serialization schema for testing purposes.
+ */
+class TestSerializationSchema extends SerializationSchema[Row] {
+
+ override def serialize(element: Row): Array[Byte] = throw new UnsupportedOperationException()
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormat.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormat.scala
new file mode 100644
index 0000000..5e26995
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormat.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils
+
+import org.apache.flink.table.descriptors.{DescriptorProperties, FormatDescriptor}
+
+/**
+ * Format descriptor for testing purposes.
+ */
+class TestTableFormat extends FormatDescriptor("test-format", 1) {
+
+ override protected def addFormatProperties(properties: DescriptorProperties): Unit = {
+ properties.putString("format.important", "this is important")
+ properties.putString("format.path", "/path/to/sth")
+ properties.putString("format.derive-schema", "true")
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala
new file mode 100644
index 0000000..475cff9
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils
+
+import java.util
+
+import org.apache.flink.api.common.serialization.DeserializationSchema
+import org.apache.flink.table.descriptors.{DescriptorProperties, FormatDescriptorValidator, SchemaValidator}
+import org.apache.flink.table.factories.{DeserializationSchemaFactory, TableFormatFactoryServiceTest}
+import org.apache.flink.types.Row
+
+/**
+ * Table format factory for testing.
+ *
+ * It has the same context as [[TestAmbiguousTableFormatFactory]] and both support COMMON_PATH.
+ * This format does not support SPECIAL_PATH but supports schema derivation.
+ */
+class TestTableFormatFactory extends DeserializationSchemaFactory[Row] {
+
+ override def requiredContext(): util.Map[String, String] = {
+ val context = new util.HashMap[String, String]()
+ context.put(
+ FormatDescriptorValidator.FORMAT_TYPE,
+ TableFormatFactoryServiceTest.TEST_FORMAT_TYPE)
+ context.put(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION, "1")
+ context
+ }
+
+ override def supportsSchemaDerivation(): Boolean = true
+
+ override def supportedProperties(): util.List[String] = {
+ val properties = new util.ArrayList[String]()
+ properties.add(TableFormatFactoryServiceTest.UNIQUE_PROPERTY)
+ properties.add(TableFormatFactoryServiceTest.COMMON_PATH)
+ properties.add(FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA)
+ properties.addAll(SchemaValidator.getSchemaDerivationKeys)
+ properties
+ }
+
+ override def createDeserializationSchema(
+ properties: util.Map[String, String])
+ : DeserializationSchema[Row] = {
+
+ val props = new DescriptorProperties(true)
+ props.putProperties(properties)
+ val schema = SchemaValidator.deriveFormatFields(props)
+ new TestDeserializationSchema(schema.toRowType)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableSinkFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableSinkFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableSinkFactory.scala
new file mode 100644
index 0000000..bbffd31
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableSinkFactory.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.factories.utils.TestTableSinkFactory._
+import org.apache.flink.table.factories.{TableFactory, TableSinkFactory}
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.types.Row
+
+/**
+ * Test table sink factory.
+ */
+class TestTableSinkFactory extends TableSinkFactory[Row] with TableFactory {
+
+ override def requiredContext(): util.Map[String, String] = {
+ val context = new util.HashMap[String, String]()
+ context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_TEST)
+ context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE_TEST)
+ context.put(CONNECTOR_PROPERTY_VERSION, "1")
+ context.put(FORMAT_PROPERTY_VERSION, "1")
+ context
+ }
+
+ override def supportedProperties(): util.List[String] = {
+ val properties = new util.ArrayList[String]()
+ // connector
+ properties.add(FORMAT_PATH)
+ properties.add("schema.#.name")
+ properties.add("schema.#.field.#.name")
+ properties.add("failing")
+ properties
+ }
+
+ override def createTableSink(properties: util.Map[String, String]): TableSink[Row] = {
+ if (properties.get("failing") == "true") {
+ throw new IllegalArgumentException("Error in this factory.")
+ }
+ new TableSink[Row] {
+ override def getOutputType: TypeInformation[Row] = throw new UnsupportedOperationException()
+
+ override def getFieldNames: Array[String] = throw new UnsupportedOperationException()
+
+ override def getFieldTypes: Array[TypeInformation[_]] =
+ throw new UnsupportedOperationException()
+
+ override def configure(fieldNames: Array[String],
+ fieldTypes: Array[TypeInformation[_]]): TableSink[Row] =
+ throw new UnsupportedOperationException()
+ }
+ }
+}
+
+object TestTableSinkFactory {
+ val CONNECTOR_TYPE_VALUE_TEST = "test"
+ val FORMAT_TYPE_VALUE_TEST = "test"
+ val FORMAT_PATH = "format.path"
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableSourceFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableSourceFactory.scala
new file mode 100644
index 0000000..ff3b24a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableSourceFactory.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
+import org.apache.flink.table.factories.utils.TestTableSinkFactory.{CONNECTOR_TYPE_VALUE_TEST, FORMAT_TYPE_VALUE_TEST}
+import org.apache.flink.table.factories.{TableFactory, TableSourceFactory}
+import org.apache.flink.table.sources.TableSource
+import org.apache.flink.types.Row
+
+/**
+ * Table source factory for testing.
+ */
+class TestTableSourceFactory extends TableSourceFactory[Row] with TableFactory {
+
+ override def requiredContext(): util.Map[String, String] = {
+ val context = new util.HashMap[String, String]()
+ context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_TEST)
+ context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE_TEST)
+ context.put(CONNECTOR_PROPERTY_VERSION, "1")
+ context.put(FORMAT_PROPERTY_VERSION, "1")
+ context
+ }
+
+ override def supportedProperties(): util.List[String] = {
+ val properties = new util.ArrayList[String]()
+ // connector
+ properties.add("format.path")
+ properties.add("schema.#.name")
+ properties.add("schema.#.field.#.name")
+ properties.add("failing")
+ properties
+ }
+
+ override def createTableSource(properties: util.Map[String, String]): TableSource[Row] = {
+ if (properties.get("failing") == "true") {
+ throw new IllegalArgumentException("Error in this factory.")
+ }
+ new TableSource[Row] {
+ override def getTableSchema: TableSchema = throw new UnsupportedOperationException()
+
+ override def getReturnType: TypeInformation[Row] = throw new UnsupportedOperationException()
+ }
+ }
+}
+
+object TestTableSourceFactory {
+ val CONNECTOR_TYPE_VALUE_TEST = "test"
+ val FORMAT_TYPE_VALUE_TEST = "test"
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestWildcardFormatTableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestWildcardFormatTableSourceFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestWildcardFormatTableSourceFactory.scala
new file mode 100644
index 0000000..5463422
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestWildcardFormatTableSourceFactory.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils
+
+import java.util
+
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
+import org.apache.flink.table.factories.TableSourceFactory
+import org.apache.flink.table.factories.utils.TestWildcardFormatTableSourceFactory.CONNECTOR_TYPE_VALUE_WILDCARD
+import org.apache.flink.table.sources.TableSource
+import org.apache.flink.types.Row
+
+/**
+ * Table source factory for testing with a wildcard format ("format.*").
+ */
+class TestWildcardFormatTableSourceFactory
+ extends TableSourceFactory[Row] {
+
+ override def requiredContext(): util.Map[String, String] = {
+ val context = new util.HashMap[String, String]()
+ context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_WILDCARD)
+ context.put(CONNECTOR_PROPERTY_VERSION, "1")
+ context
+ }
+
+ override def supportedProperties(): util.List[String] = {
+ val properties = new util.ArrayList[String]()
+ properties.add("format.*")
+ properties.add("schema.#.name")
+ properties.add("schema.#.field.#.name")
+ properties
+ }
+
+ override def createTableSource(properties: util.Map[String, String]): TableSource[Row] = {
+ throw new UnsupportedOperationException()
+ }
+}
+
+object TestWildcardFormatTableSourceFactory {
+ val CONNECTOR_TYPE_VALUE_WILDCARD = "wildcard"
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/TableFormatFactoryServiceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/TableFormatFactoryServiceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/TableFormatFactoryServiceTest.scala
deleted file mode 100644
index 7d7d2d2..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/TableFormatFactoryServiceTest.scala
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.formats
-
-import java.util.{HashMap => JHashMap, Map => JMap}
-
-import org.apache.flink.table.api.{AmbiguousTableFormatException, NoMatchingTableFormatException}
-import org.apache.flink.table.descriptors.FormatDescriptorValidator
-import org.apache.flink.table.formats.TableFormatFactoryServiceTest.{COMMON_PATH, SPECIAL_PATH, TEST_FORMAT_TYPE, UNIQUE_PROPERTY}
-import org.apache.flink.table.formats.utils.{TestAmbiguousTableFormatFactory, TestTableFormatFactory}
-import org.junit.Assert.{assertNotNull, assertTrue}
-import org.junit.Test
-
-/**
- * Tests for [[TableFormatFactoryService]]. The tests assume the two format factories
- * [[TestTableFormatFactory]] and [[TestAmbiguousTableFormatFactory]] are registered.
- *
- * The first format does not support SPECIAL_PATH but supports schema derivation. The
- * latter format does not support UNIQUE_PROPERTY nor schema derivation. Both formats
- * have the same context and support COMMON_PATH.
- */
-class TableFormatFactoryServiceTest {
-
- @Test
- def testValidProperties(): Unit = {
- val props = properties()
- assertNotNull(
- TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props))
- }
-
- @Test
- def testDifferentContextVersion(): Unit = {
- val props = properties()
- props.put(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION, "2")
- // for now we support any property version, the property version should not affect the
- // discovery at the moment and thus the format should still be found
- val foundFactory = TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props)
- assertTrue(foundFactory.isInstanceOf[TestTableFormatFactory])
- }
-
- @Test
- def testAmbiguousMoreSupportSelection(): Unit = {
- val props = properties()
- props.remove(UNIQUE_PROPERTY) // both formats match now
- props.put(SPECIAL_PATH, "/what/ever") // now only TestAmbiguousTableFormatFactory
- assertTrue(
- TableFormatFactoryService
- .find(classOf[TableFormatFactory[_]], props)
- .isInstanceOf[TestAmbiguousTableFormatFactory])
- }
-
- @Test
- def testAmbiguousClassBasedSelection(): Unit = {
- val props = properties()
- props.remove(UNIQUE_PROPERTY) // both formats match now
- assertTrue(
- TableFormatFactoryService
- // we are looking for a particular class
- .find(classOf[TestAmbiguousTableFormatFactory], props)
- .isInstanceOf[TestAmbiguousTableFormatFactory])
- }
-
- @Test
- def testAmbiguousSchemaBasedSelection(): Unit = {
- val props = properties()
- props.remove(UNIQUE_PROPERTY) // both formats match now
- // this is unknown to the schema derivation factory
- props.put("schema.unknown-schema-field", "unknown")
-
- // the format with schema derivation feels not responsible because of this field,
- // but since there is another format that feels responsible, no exception is thrown.
- assertTrue(
- TableFormatFactoryService
- .find(classOf[TableFormatFactory[_]], props)
- .isInstanceOf[TestAmbiguousTableFormatFactory])
- }
-
- @Test(expected = classOf[NoMatchingTableFormatException])
- def testMissingClass(): Unit = {
- val props = properties()
- // this class is not a valid factory
- TableFormatFactoryService.find(classOf[TableFormatFactoryServiceTest], props)
- }
-
- @Test(expected = classOf[NoMatchingTableFormatException])
- def testInvalidContext(): Unit = {
- val props = properties()
- // no context specifies this
- props.put(FormatDescriptorValidator.FORMAT_TYPE, "unknown_format_type")
- TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props)
- }
-
- @Test(expected = classOf[NoMatchingTableFormatException])
- def testUnsupportedProperty(): Unit = {
- val props = properties()
- props.put("format.property_not_defined_by_any_factory", "/new/path")
- TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props)
- }
-
- @Test(expected = classOf[AmbiguousTableFormatException])
- def testAmbiguousFactory(): Unit = {
- val props = properties()
- props.remove(UNIQUE_PROPERTY) // now both factories match
- TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props)
- }
-
- private def properties(): JMap[String, String] = {
- val properties = new JHashMap[String, String]()
- properties.put("connector.type", "test")
- properties.put("format.type", TEST_FORMAT_TYPE)
- properties.put(UNIQUE_PROPERTY, "true")
- properties.put("connector.property-version", "1")
- properties.put("format.property-version", "1")
- properties.put(COMMON_PATH, "/path/to/target")
- properties.put("schema.0.name", "a")
- properties.put("schema.1.name", "b")
- properties.put("schema.2.name", "c")
- properties
- }
-}
-
-object TableFormatFactoryServiceTest {
-
- val TEST_FORMAT_TYPE = "test-format"
- val COMMON_PATH = "format.common-path"
- val SPECIAL_PATH = "format.special-path"
- val UNIQUE_PROPERTY = "format.unique-property"
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestAmbiguousTableFormatFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestAmbiguousTableFormatFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestAmbiguousTableFormatFactory.scala
deleted file mode 100644
index 1a30ac0..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestAmbiguousTableFormatFactory.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.formats.utils
-
-import java.util
-
-import org.apache.flink.table.descriptors.FormatDescriptorValidator
-import org.apache.flink.table.formats.{TableFormatFactory, TableFormatFactoryServiceTest}
-import org.apache.flink.types.Row
-
-/**
- * Table format factory for testing.
- *
- * It does not support UNIQUE_PROPERTY compared to [[TestTableFormatFactory]] nor
- * schema derivation. Both formats have the same context and support COMMON_PATH.
- */
-class TestAmbiguousTableFormatFactory extends TableFormatFactory[Row] {
-
- override def requiredContext(): util.Map[String, String] = {
- val context = new util.HashMap[String, String]()
- context.put(
- FormatDescriptorValidator.FORMAT_TYPE,
- TableFormatFactoryServiceTest.TEST_FORMAT_TYPE)
- context.put(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION, "1")
- context
- }
-
- override def supportsSchemaDerivation(): Boolean = false // no schema derivation
-
- override def supportedProperties(): util.List[String] = {
- val properties = new util.ArrayList[String]()
- properties.add(TableFormatFactoryServiceTest.COMMON_PATH)
- properties.add(TableFormatFactoryServiceTest.SPECIAL_PATH)
- properties
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestDeserializationSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestDeserializationSchema.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestDeserializationSchema.scala
deleted file mode 100644
index 0b519ef..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestDeserializationSchema.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.formats.utils
-
-import org.apache.flink.api.common.serialization.DeserializationSchema
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.types.Row
-
-/**
- * Deserialization schema for testing purposes.
- */
-class TestDeserializationSchema(val typeInfo: TypeInformation[Row])
- extends DeserializationSchema[Row] {
-
- override def deserialize(message: Array[Byte]): Row = throw new UnsupportedOperationException()
-
- override def isEndOfStream(nextElement: Row): Boolean = throw new UnsupportedOperationException()
-
- override def getProducedType: TypeInformation[Row] = typeInfo
-
- def canEqual(other: Any): Boolean = other.isInstanceOf[TestDeserializationSchema]
-
- override def equals(other: Any): Boolean = other match {
- case that: TestDeserializationSchema =>
- (that canEqual this) &&
- typeInfo == that.typeInfo
- case _ => false
- }
-
- override def hashCode(): Int = {
- val state = Seq(typeInfo)
- state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestSerializationSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestSerializationSchema.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestSerializationSchema.scala
deleted file mode 100644
index 7043eca..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestSerializationSchema.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.formats.utils
-
-import org.apache.flink.api.common.serialization.SerializationSchema
-import org.apache.flink.types.Row
-
-/**
- * Serialization schema for testing purposes.
- */
-class TestSerializationSchema extends SerializationSchema[Row] {
-
- override def serialize(element: Row): Array[Byte] = throw new UnsupportedOperationException()
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestTableFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestTableFormat.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestTableFormat.scala
deleted file mode 100644
index 38cef05..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestTableFormat.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.formats.utils
-
-import org.apache.flink.table.descriptors.{DescriptorProperties, FormatDescriptor}
-
-/**
- * Format descriptor for testing purposes.
- */
-class TestTableFormat extends FormatDescriptor("test-format", 1) {
-
- override protected def addFormatProperties(properties: DescriptorProperties): Unit = {
- properties.putString("format.important", "this is important")
- properties.putString("format.path", "/path/to/sth")
- properties.putString("format.derive-schema", "true")
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestTableFormatFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestTableFormatFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestTableFormatFactory.scala
deleted file mode 100644
index efd9afe..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestTableFormatFactory.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.formats.utils
-
-import java.util
-
-import org.apache.flink.api.common.serialization.DeserializationSchema
-import org.apache.flink.table.descriptors.{DescriptorProperties, FormatDescriptorValidator, SchemaValidator}
-import org.apache.flink.table.formats.{DeserializationSchemaFactory, TableFormatFactoryServiceTest}
-import org.apache.flink.types.Row
-
-/**
- * Table format factory for testing.
- *
- * It has the same context as [[TestAmbiguousTableFormatFactory]] and both support COMMON_PATH.
- * This format does not support SPECIAL_PATH but supports schema derivation.
- */
-class TestTableFormatFactory extends DeserializationSchemaFactory[Row] {
-
- override def requiredContext(): util.Map[String, String] = {
- val context = new util.HashMap[String, String]()
- context.put(
- FormatDescriptorValidator.FORMAT_TYPE,
- TableFormatFactoryServiceTest.TEST_FORMAT_TYPE)
- context.put(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION, "1")
- context
- }
-
- override def supportsSchemaDerivation(): Boolean = true
-
- override def supportedProperties(): util.List[String] = {
- val properties = new util.ArrayList[String]()
- properties.add(TableFormatFactoryServiceTest.UNIQUE_PROPERTY)
- properties.add(TableFormatFactoryServiceTest.COMMON_PATH)
- properties.add(FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA)
- properties.addAll(SchemaValidator.getSchemaDerivationKeys)
- properties
- }
-
- override def createDeserializationSchema(
- properties: util.Map[String, String])
- : DeserializationSchema[Row] = {
-
- val props = new DescriptorProperties(true)
- props.putProperties(properties)
- val schema = SchemaValidator.deriveFormatFields(props)
- new TestDeserializationSchema(schema.toRowType)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb8905/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
index 3689965..8062b48 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
@@ -21,11 +21,11 @@ package org.apache.flink.table.utils
import java.util
import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.connectors.{DiscoverableTableFactory, TableSinkFactory, TableSourceFactory}
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
import org.apache.flink.table.descriptors.RowtimeValidator._
import org.apache.flink.table.descriptors.SchemaValidator._
import org.apache.flink.table.descriptors.{DescriptorProperties, SchemaValidator}
+import org.apache.flink.table.factories.{TableFactory, TableSinkFactory, TableSourceFactory}
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.sources.TableSource
import org.apache.flink.types.Row
@@ -39,7 +39,7 @@ import org.apache.flink.types.Row
* @param terminationCount determines when to shutdown the streaming source function
*/
class InMemoryTableFactory(terminationCount: Int) extends TableSourceFactory[Row]
- with TableSinkFactory[Row] with DiscoverableTableFactory {
+ with TableSinkFactory[Row] with TableFactory {
override def createTableSink(properties: util.Map[String, String]): TableSink[Row] = {
val params: DescriptorProperties = new DescriptorProperties(true)
params.putProperties(properties)