You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/15 09:24:56 UTC
[11/13] flink git commit: [FLINK-8866] [table] Create unified
interfaces to configure and instantiate TableSinks
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
index bf7b84b..c558057 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.descriptors
import java.util.Optional
-import org.apache.flink.table.api.{TableSchema, Types}
+import org.apache.flink.table.api.{TableException, TableSchema, Types}
import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp}
import org.apache.flink.table.sources.wmstrategies.PreserveWatermarks
import org.junit.Assert.{assertEquals, assertTrue}
@@ -78,6 +78,40 @@ class SchemaValidatorTest {
assertEquals(expectedFormatSchema, formatSchema)
}
+  /**
+    * Derives a table sink schema from a schema whose rowtime attribute takes its
+    * timestamps from the source. The test passes only if a [[TableException]] is thrown.
+    */
+  @Test(expected = classOf[TableException])
+  def testDeriveTableSinkSchemaWithRowtimeFromSource(): Unit = {
+    // rowtime definition that relies on source-provided timestamps and watermarks
+    val rowtimeFromSource = Rowtime().timestampsFromSource().watermarksFromSource()
+
+    val schema = Schema()
+      .field("otherField", Types.STRING).from("csvField")
+      .field("abcField", Types.STRING)
+      .field("p", Types.SQL_TIMESTAMP).proctime()
+      .field("r", Types.SQL_TIMESTAMP).rowtime(rowtimeFromSource)
+
+    val properties = new DescriptorProperties()
+    schema.addProperties(properties)
+
+    // presumably the call that raises the expected exception
+    SchemaValidator.deriveTableSinkSchema(properties)
+  }
+
+  /**
+    * Derives a table sink schema from a schema whose rowtime attribute reads its
+    * timestamps from the field "myTime" and compares it with the expected schema.
+    */
+  @Test
+  def testDeriveTableSinkSchemaWithRowtimeFromField(): Unit = {
+    val rowtimeFromField = Rowtime().timestampsFromField("myTime").watermarksFromSource()
+
+    val schema = Schema()
+      .field("otherField", Types.STRING).from("csvField")
+      .field("abcField", Types.STRING)
+      .field("p", Types.SQL_TIMESTAMP).proctime()
+      .field("r", Types.SQL_TIMESTAMP).rowtime(rowtimeFromField)
+
+    val properties = new DescriptorProperties()
+    schema.addProperties(properties)
+
+    // expected: the aliased field appears under its origin name "csvField", the rowtime
+    // field "r" appears as its timestamp field "myTime", and the proctime field "p" is absent
+    val expectedSinkSchema = TableSchema.builder()
+      .field("csvField", Types.STRING)
+      .field("abcField", Types.STRING)
+      .field("myTime", Types.SQL_TIMESTAMP)
+      .build()
+
+    val actualSinkSchema = SchemaValidator.deriveTableSinkSchema(properties)
+    assertEquals(expectedSinkSchema, actualSinkSchema)
+  }
+
@Test
def testSchemaWithRowtimeFromField(): Unit = {
val desc1 = Schema()
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableSourceDescriptorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableSourceDescriptorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableSourceDescriptorTest.scala
index 91f3aac..a7dd644 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableSourceDescriptorTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableSourceDescriptorTest.scala
@@ -25,6 +25,9 @@ import org.junit.Test
import scala.collection.JavaConverters._
+/**
+ * Tests for [[TableSourceDescriptor]].
+ */
class TableSourceDescriptorTest extends TableTestBase {
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
index be8278f..07839e6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase
import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.utils.MemoryTableSinkUtil
+import org.apache.flink.table.utils.MemoryTableSourceSinkUtil
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.types.Row
import org.junit.Assert.assertEquals
@@ -120,14 +120,14 @@ class TableEnvironmentITCase(
def testInsertIntoMemoryTable(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
- MemoryTableSinkUtil.clear
+ MemoryTableSourceSinkUtil.clear()
val t = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("sourceTable", t)
val fieldNames = Array("d", "e", "f")
val fieldTypes = tEnv.scan("sourceTable").getSchema.getTypes
- val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
@@ -135,7 +135,7 @@ class TableEnvironmentITCase(
env.execute()
val expected = List("1,1,Hi", "2,2,Hello", "3,2,Hello world")
- assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+ assertEquals(expected.sorted, MemoryTableSourceSinkUtil.tableDataStrings.sorted)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
index 5295e7c..67fde38 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.table.runtime.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase}
-import org.apache.flink.table.utils.MemoryTableSinkUtil
+import org.apache.flink.table.utils.MemoryTableSourceSinkUtil
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.types.Row
import org.junit.Assert.assertEquals
@@ -186,14 +186,14 @@ class TableEnvironmentITCase(
def testInsertIntoMemoryTable(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
- MemoryTableSinkUtil.clear
+ MemoryTableSourceSinkUtil.clear()
val t = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("sourceTable", t)
val fieldNames = Array("d", "e", "f")
val fieldTypes = tEnv.scan("sourceTable").getSchema.getTypes
- val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
tEnv.scan("sourceTable")
@@ -202,7 +202,7 @@ class TableEnvironmentITCase(
env.execute()
val expected = List("1,1,Hi", "2,2,Hello", "3,2,Hello world")
- assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+ assertEquals(expected.sorted, MemoryTableSourceSinkUtil.tableDataStrings.sorted)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index c553ee6..85a9647 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -35,7 +35,7 @@ import org.apache.flink.table.plan.TimeIndicatorConversionTest.TableFunc
import org.apache.flink.table.runtime.stream.TimeAttributesITCase.{AtomicTimestampWithEqualWatermark, TestPojo, TimestampWithEqualWatermark, TimestampWithEqualWatermarkPojo}
import org.apache.flink.table.runtime.utils.JavaPojos.Pojo1
import org.apache.flink.table.runtime.utils.StreamITCase
-import org.apache.flink.table.utils.{MemoryTableSinkUtil, TestTableSourceWithTime}
+import org.apache.flink.table.utils.{MemoryTableSourceSinkUtil, TestTableSourceWithTime}
import org.apache.flink.test.util.AbstractTestBase
import org.apache.flink.types.Row
import org.junit.Assert._
@@ -183,7 +183,7 @@ class TimeAttributesITCase extends AbstractTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
- MemoryTableSinkUtil.clear
+ MemoryTableSourceSinkUtil.clear()
val stream = env
.fromCollection(data)
@@ -191,7 +191,7 @@ class TimeAttributesITCase extends AbstractTestBase {
stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
.filter('rowtime.cast(Types.LONG) > 4)
.select('rowtime, 'rowtime.floor(TimeIntervalUnit.DAY), 'rowtime.ceil(TimeIntervalUnit.DAY))
- .writeToSink(new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink)
+ .writeToSink(new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink)
env.execute()
@@ -199,7 +199,7 @@ class TimeAttributesITCase extends AbstractTestBase {
"1970-01-01 00:00:00.007,1970-01-01 00:00:00.0,1970-01-02 00:00:00.0",
"1970-01-01 00:00:00.008,1970-01-01 00:00:00.0,1970-01-02 00:00:00.0",
"1970-01-01 00:00:00.016,1970-01-01 00:00:00.0,1970-01-02 00:00:00.0")
- assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+ assertEquals(expected.sorted, MemoryTableSourceSinkUtil.tableDataStrings.sorted)
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index e132349..2228bdd 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -27,13 +27,14 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala._
+import org.apache.flink.table.descriptors.{DescriptorProperties, Rowtime, Schema}
import org.apache.flink.table.expressions.utils.SplitUDF
import org.apache.flink.table.expressions.utils.Func15
import org.apache.flink.table.runtime.stream.sql.SqlITCase.TimestampAndWatermarkWithOffset
import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
import org.apache.flink.table.runtime.utils.{JavaUserDefinedTableFunctions, StreamITCase, StreamTestData, StreamingWithStateTestBase}
import org.apache.flink.types.Row
-import org.apache.flink.table.utils.MemoryTableSinkUtil
+import org.apache.flink.table.utils.{InMemoryTableFactory, MemoryTableSourceSinkUtil}
import org.junit.Assert._
import org.junit._
@@ -691,7 +692,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
- MemoryTableSinkUtil.clear
+ MemoryTableSourceSinkUtil.clear()
val t = StreamTestData.getSmall3TupleDataStream(env)
.assignAscendingTimestamps(x => x._2)
@@ -701,7 +702,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val fieldNames = Array("d", "e", "f", "t")
val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
.asInstanceOf[Array[TypeInformation[_]]]
- val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable"
@@ -712,7 +713,46 @@ class SqlITCase extends StreamingWithStateTestBase {
"1,1,Hi,1970-01-01 00:00:00.001",
"2,2,Hello,1970-01-01 00:00:00.002",
"3,2,Hello world,1970-01-01 00:00:00.002")
- assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+ assertEquals(expected.sorted, MemoryTableSourceSinkUtil.tableDataStrings.sorted)
+ }
+
+  /**
+    * Registers an in-memory table source and an in-memory table sink under the same
+    * name "targetTable", inserts three rows into it via SQL and queries them back.
+    */
+  @Test
+  def testWriteReadTableSourceSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSourceSinkUtil.clear()
+    // NOTE(review): StreamITCase.testResults is asserted below but is not reset in this
+    // method -- confirm that no earlier test can leave results behind.
+
+    // schema of "targetTable": rowtime read from field "t" plus a proctime attribute
+    val rowtime = Rowtime().timestampsFromField("t").watermarksPeriodicAscending()
+    val schema = Schema()
+      .field("a", Types.INT)
+      .field("e", Types.LONG)
+      .field("f", Types.STRING)
+      .field("t", Types.SQL_TIMESTAMP).rowtime(rowtime)
+      .field("proctime", Types.SQL_TIMESTAMP).proctime()
+    val properties = new DescriptorProperties()
+    schema.addProperties(properties)
+
+    val sourceTable = StreamTestData.getSmall3TupleDataStream(env)
+      .assignAscendingTimestamps(_._2)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+    tEnv.registerTable("sourceTable", sourceTable)
+
+    // termination count 3 equals the number of expected result rows below
+    val factory = new InMemoryTableFactory(3)
+    val factoryProperties = properties.asMap
+    tEnv.registerTableSource("targetTable", factory.createTableSource(factoryProperties))
+    tEnv.registerTableSink("targetTable", factory.createTableSink(factoryProperties))
+
+    tEnv.sqlUpdate("INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable")
+    tEnv.sqlQuery("SELECT a, e, f, t from targetTable")
+      .addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List(
+      "1,1,Hi,1970-01-01 00:00:00.001",
+      "2,2,Hello,1970-01-01 00:00:00.002",
+      "3,2,Hello world,1970-01-01 00:00:00.002")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index 467d9d3..70e59f3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -35,7 +35,7 @@ import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData}
import org.apache.flink.table.sinks._
-import org.apache.flink.table.utils.MemoryTableSinkUtil
+import org.apache.flink.table.utils.MemoryTableSourceSinkUtil
import org.apache.flink.test.util.{AbstractTestBase, TestBaseUtils}
import org.apache.flink.types.Row
import org.apache.flink.util.Collector
@@ -54,13 +54,13 @@ class TableSinkITCase extends AbstractTestBase {
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
- MemoryTableSinkUtil.clear
+ MemoryTableSourceSinkUtil.clear
val input = StreamTestData.get3TupleDataStream(env)
.assignAscendingTimestamps(r => r._2)
val fieldNames = Array("d", "e", "t")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.SQL_TIMESTAMP, Types.LONG)
- val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
input.toTable(tEnv, 'a, 'b, 'c, 't.rowtime)
@@ -75,7 +75,7 @@ class TableSinkITCase extends AbstractTestBase {
"Comment#14,1970-01-01 00:00:00.006,6",
"Comment#15,1970-01-01 00:00:00.006,6").mkString("\n")
- TestBaseUtils.compareResultAsText(MemoryTableSinkUtil.results.asJava, expected)
+ TestBaseUtils.compareResultAsText(MemoryTableSourceSinkUtil.tableData.asJava, expected)
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
deleted file mode 100644
index e70c4f1..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-import org.apache.flink.table.api.{NoMatchingTableSourceException, TableException, ValidationException}
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
-import org.junit.Assert.assertTrue
-import org.junit.Test
-
-import scala.collection.mutable
-
-class TableSourceFactoryServiceTest {
-
- @Test
- def testValidProperties(): Unit = {
- val props = properties()
- props.put(CONNECTOR_TYPE, "fixed")
- props.put(FORMAT_TYPE, "test")
- assertTrue(TableSourceFactoryService.findAndCreateTableSource(props.toMap) != null)
- }
-
- @Test(expected = classOf[NoMatchingTableSourceException])
- def testInvalidContext(): Unit = {
- val props = properties()
- props.put(CONNECTOR_TYPE, "FAIL")
- props.put(FORMAT_TYPE, "test")
- TableSourceFactoryService.findAndCreateTableSource(props.toMap)
- }
-
- @Test
- def testDifferentContextVersion(): Unit = {
- val props = properties()
- props.put(CONNECTOR_TYPE, "fixed")
- props.put(FORMAT_TYPE, "test")
- props.put(CONNECTOR_PROPERTY_VERSION, "2")
- // the table source should still be found
- assertTrue(TableSourceFactoryService.findAndCreateTableSource(props.toMap) != null)
- }
-
- @Test(expected = classOf[ValidationException])
- def testUnsupportedProperty(): Unit = {
- val props = properties()
- props.put(CONNECTOR_TYPE, "fixed")
- props.put(FORMAT_TYPE, "test")
- props.put("format.path_new", "/new/path")
- TableSourceFactoryService.findAndCreateTableSource(props.toMap)
- }
-
- @Test(expected = classOf[TableException])
- def testFailingFactory(): Unit = {
- val props = properties()
- props.put(CONNECTOR_TYPE, "fixed")
- props.put(FORMAT_TYPE, "test")
- props.put("failing", "true")
- TableSourceFactoryService.findAndCreateTableSource(props.toMap)
- }
-
- @Test
- def testWildcardFormat(): Unit = {
- val props = properties()
- props.put(CONNECTOR_TYPE, "wildcard")
- props.put(FORMAT_TYPE, "test")
- props.put("format.type", "not-test")
- props.put("format.not-test-property", "wildcard-property")
- val actualTableSource = TableSourceFactoryService.findAndCreateTableSource(props.toMap)
- assertTrue(actualTableSource.isInstanceOf[TestWildcardFormatTableSourceFactory])
- }
-
- private def properties(): mutable.Map[String, String] = {
- val properties = mutable.Map[String, String]()
- properties.put(CONNECTOR_PROPERTY_VERSION, "1")
- properties.put(FORMAT_PROPERTY_VERSION, "1")
- properties.put("format.path", "/path/to/target")
- properties.put("schema.0.name", "a")
- properties.put("schema.1.name", "b")
- properties.put("schema.2.name", "c")
- properties.put("schema.0.field.0.name", "a")
- properties.put("schema.0.field.1.name", "b")
- properties.put("schema.0.field.2.name", "c")
- properties.put("failing", "false")
- properties
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestFixedFormatTableFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestFixedFormatTableFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestFixedFormatTableFactory.scala
deleted file mode 100644
index 6a8041b..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestFixedFormatTableFactory.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-import java.util
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.TableSchema
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
-import org.apache.flink.types.Row
-
-/**
- * Table source factory for testing with a fixed format.
- */
-class TestFixedFormatTableFactory extends TableSourceFactory[Row] {
-
- override def requiredContext(): util.Map[String, String] = {
- val context = new util.HashMap[String, String]()
- context.put(CONNECTOR_TYPE, "fixed")
- context.put(FORMAT_TYPE, "test")
- context.put(CONNECTOR_PROPERTY_VERSION, "1")
- context.put(FORMAT_PROPERTY_VERSION, "1")
- context
- }
-
- override def supportedProperties(): util.List[String] = {
- val properties = new util.ArrayList[String]()
- properties.add("format.path")
- properties.add("schema.#.name")
- properties.add("schema.#.field.#.name")
- properties.add("failing")
- properties
- }
-
- override def create(properties: util.Map[String, String]): TableSource[Row] = {
- if (properties.get("failing") == "true") {
- throw new IllegalArgumentException("Error in this factory.")
- }
- new TableSource[Row] {
- override def getTableSchema: TableSchema = throw new UnsupportedOperationException()
-
- override def getReturnType: TypeInformation[Row] = throw new UnsupportedOperationException()
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestWildcardFormatTableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestWildcardFormatTableSourceFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestWildcardFormatTableSourceFactory.scala
deleted file mode 100644
index 91598a6..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestWildcardFormatTableSourceFactory.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-import java.util
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.TableSchema
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
-import org.apache.flink.types.Row
-
-/**
- * Table source factory for testing with a wildcard format ("format.*").
- */
-class TestWildcardFormatTableSourceFactory extends TableSourceFactory[Row] with TableSource[Row] {
-
- override def requiredContext(): util.Map[String, String] = {
- val context = new util.HashMap[String, String]()
- context.put(CONNECTOR_TYPE, "wildcard")
- context.put(CONNECTOR_PROPERTY_VERSION, "1")
- context
- }
-
- override def supportedProperties(): util.List[String] = {
- val properties = new util.ArrayList[String]()
- properties.add("format.*")
- properties.add("schema.#.name")
- properties.add("schema.#.field.#.name")
- properties.add("failing")
- properties
- }
-
- override def create(properties: util.Map[String, String]): TableSource[Row] = {
- this
- }
-
- override def getTableSchema: TableSchema = throw new UnsupportedOperationException()
-
- override def getReturnType: TypeInformation[Row] = throw new UnsupportedOperationException()
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
new file mode 100644
index 0000000..3689965
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import java.util
+
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.connectors.{DiscoverableTableFactory, TableSinkFactory, TableSourceFactory}
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
+import org.apache.flink.table.descriptors.RowtimeValidator._
+import org.apache.flink.table.descriptors.SchemaValidator._
+import org.apache.flink.table.descriptors.{DescriptorProperties, SchemaValidator}
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
+import org.apache.flink.types.Row
+
+/**
+  * Factory for creating stream table sources and sinks from string properties.
+  *
+  * See [[MemoryTableSourceSinkUtil.UnsafeMemoryTableSource]] and
+  * [[MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink]].
+  *
+  * @param terminationCount determines when to shutdown the streaming source function
+  */
+class InMemoryTableFactory(terminationCount: Int) extends TableSourceFactory[Row]
+  with TableSinkFactory[Row] with DiscoverableTableFactory {
+
+  /**
+    * Creates an append table sink that is configured with the column names and types of
+    * the sink schema derived from the given properties.
+    *
+    * @param properties string properties describing the schema
+    * @return the configured table sink
+    */
+  override def createTableSink(properties: util.Map[String, String]): TableSink[Row] = {
+    val params = validatedProperties(properties)
+    val tableSchema = SchemaValidator.deriveTableSinkSchema(params)
+
+    new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink()
+      .configure(tableSchema.getColumnNames, tableSchema.getTypes)
+  }
+
+  /**
+    * Creates a table source from the source schema, the rowtime attributes and the
+    * (optional) proctime attribute derived from the given properties.
+    *
+    * @param properties string properties describing the schema
+    * @return the table source
+    */
+  override def createTableSource(properties: util.Map[String, String]): TableSource[Row] = {
+    val params = validatedProperties(properties)
+    val tableSchema = SchemaValidator.deriveTableSourceSchema(params)
+
+    // proctime: the derived Optional is empty for a schema without a proctime field and
+    // get() would then throw NoSuchElementException, so fall back to null instead
+    // NOTE(review): assumes UnsafeMemoryTableSource accepts a null proctime name -- confirm
+    val proctimeAttribute: String =
+      SchemaValidator.deriveProctimeAttribute(params).orElse(null)
+
+    // the returned row type consists of all schema columns except the proctime attribute
+    val (names, types) = tableSchema.getColumnNames.zip(tableSchema.getTypes)
+      .filter(_._1 != proctimeAttribute).unzip
+
+    // rowtime
+    val rowtimeDescriptors = SchemaValidator.deriveRowtimeAttributes(params)
+
+    new MemoryTableSourceSinkUtil.UnsafeMemoryTableSource(
+      tableSchema,
+      new RowTypeInfo(types, names),
+      rowtimeDescriptors,
+      proctimeAttribute,
+      terminationCount)
+  }
+
+  /** Properties that must be present for this factory to be selected. */
+  override def requiredContext(): util.Map[String, String] = {
+    val context: util.Map[String, String] = new util.HashMap[String, String]
+    context.put(CONNECTOR_TYPE, "memory")
+    context.put(CONNECTOR_PROPERTY_VERSION, "1") // backwards compatibility
+
+    context
+  }
+
+  /** Property keys this factory supports; "#" stands for the index of a schema field. */
+  override def supportedProperties(): util.List[String] = {
+    val properties = new util.ArrayList[String]()
+
+    // schema
+    properties.add(SCHEMA + ".#." + SCHEMA_TYPE)
+    properties.add(SCHEMA + ".#." + SCHEMA_NAME)
+    properties.add(SCHEMA + ".#." + SCHEMA_FROM)
+
+    // time attributes
+    properties.add(SCHEMA + ".#." + SCHEMA_PROCTIME)
+    properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE)
+    properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_FROM)
+    properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_CLASS)
+    properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED)
+    properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_TYPE)
+    properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_CLASS)
+    properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_SERIALIZED)
+    properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_DELAY)
+
+    properties
+  }
+
+  /** Copies the given properties into [[DescriptorProperties]] and validates the schema. */
+  private def validatedProperties(
+      properties: util.Map[String, String]): DescriptorProperties = {
+    val params = new DescriptorProperties(true)
+    params.putProperties(properties)
+    new SchemaValidator(true, true, true).validate(params)
+    params
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala
deleted file mode 100644
index 6d6307b..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.utils
-
-import org.apache.flink.api.common.io.RichOutputFormat
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
-import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, TableSinkBase}
-import org.apache.flink.table.util.TableConnectorUtil
-import org.apache.flink.types.Row
-
-import scala.collection.mutable
-
-object MemoryTableSinkUtil {
- var results: mutable.MutableList[String] = mutable.MutableList.empty[String]
-
- def clear = {
- MemoryTableSinkUtil.results.clear()
- }
-
- final class UnsafeMemoryAppendTableSink
- extends TableSinkBase[Row] with BatchTableSink[Row]
- with AppendStreamTableSink[Row] {
-
- override def getOutputType: TypeInformation[Row] = {
- new RowTypeInfo(getFieldTypes, getFieldNames)
- }
-
- override protected def copy: TableSinkBase[Row] = {
- new UnsafeMemoryAppendTableSink
- }
-
- override def emitDataSet(dataSet: DataSet[Row]): Unit = {
- dataSet
- .output(new MemoryCollectionOutputFormat)
- .name(TableConnectorUtil.generateRuntimeName(this.getClass, getFieldNames))
- }
-
- override def emitDataStream(dataStream: DataStream[Row]): Unit = {
- dataStream
- .addSink(new MemoryAppendSink)
- .name(TableConnectorUtil.generateRuntimeName(this.getClass, getFieldNames))
- }
- }
-
- private class MemoryAppendSink extends RichSinkFunction[Row]() {
-
- override def invoke(value: Row): Unit = {
- results.synchronized {
- results += value.toString
- }
- }
- }
-
- private class MemoryCollectionOutputFormat extends RichOutputFormat[Row] {
-
- override def configure(parameters: Configuration): Unit = {}
-
- override def open(taskNumber: Int, numTasks: Int): Unit = {}
-
- override def writeRecord(record: Row): Unit = {
- results.synchronized {
- results += record.toString
- }
- }
-
- override def close(): Unit = {}
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
new file mode 100644
index 0000000..cb0ad43
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import java.util
+
+import org.apache.flink.api.common.io.RichOutputFormat
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, TableSinkBase}
+import org.apache.flink.table.sources._
+import org.apache.flink.table.util.TableConnectorUtil
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+ * Utilities to ingest and retrieve results into and from a table program.
+ */
+object MemoryTableSourceSinkUtil {
+
+ /**
+  * Shared in-memory buffer of rows. The sinks in this object append to it and the
+  * sources read from it; those accesses are guarded by `tableData.synchronized`.
+  */
+ val tableData: mutable.ListBuffer[Row] = mutable.ListBuffer[Row]()
+
+ /**
+  * Returns the `toString` representation of every buffered row, in buffer order.
+  * The snapshot is taken under the same monitor the sinks use when appending.
+  */
+ def tableDataStrings: Seq[String] = tableData.synchronized {
+   tableData.map(_.toString)
+ }
+
+ /**
+  * Removes all buffered rows, holding the same monitor the sinks use when appending.
+  */
+ def clear(): Unit = tableData.synchronized {
+   MemoryTableSourceSinkUtil.tableData.clear()
+ }
+
+ /**
+  * Table source backed by the shared `tableData` buffer. It can be used as a batch
+  * source (the buffer is handed to the environment as a collection) and as a streaming
+  * source (rows are removed from the head of the buffer and emitted).
+  *
+  * @param tableSchema schema returned by `getTableSchema`
+  * @param returnType type information of the produced rows
+  * @param rowtimeAttributeDescriptor descriptors returned by `getRowtimeAttributeDescriptors`
+  * @param proctime attribute name returned by `getProctimeAttribute`
+  * @param terminationCount default number of rows the streaming source function emits
+  *                         before `run` returns
+  */
+ class UnsafeMemoryTableSource(
+     tableSchema: TableSchema,
+     returnType: TypeInformation[Row],
+     rowtimeAttributeDescriptor: util.List[RowtimeAttributeDescriptor],
+     proctime: String,
+     val terminationCount: Int)
+   extends BatchTableSource[Row]
+   with StreamTableSource[Row]
+   with DefinedProctimeAttribute
+   with DefinedRowtimeAttributes {
+
+   override def getReturnType: TypeInformation[Row] = returnType
+
+   override def getTableSchema: TableSchema = tableSchema
+
+   override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
+     execEnv.fromCollection(tableData.asJava, returnType)
+   }
+
+   /**
+    * Source function that moves rows from the head of `tableData` into the stream until
+    * `count` rows have been emitted or the source has been cancelled.
+    *
+    * @param count number of rows still to emit; decremented after every emitted row
+    */
+   final class InMemorySourceFunction(var count: Int = terminationCount)
+     extends SourceFunction[Row] {
+
+     // Cleared by cancel(). Volatile because the SourceFunction contract allows cancel()
+     // to be called from a thread other than the one executing run().
+     @volatile private var running: Boolean = true
+
+     /** Asks `run` to leave its loop instead of throwing. */
+     override def cancel(): Unit = {
+       running = false
+     }
+
+     override def run(ctx: SourceContext[Row]): Unit = {
+       // NOTE(review): this still spins while the buffer is empty; only the exit
+       // condition changed.
+       while (running && count > 0) {
+         tableData.synchronized {
+           if (tableData.nonEmpty) {
+             val r = tableData.remove(0)
+             ctx.collect(r)
+             count -= 1
+           }
+         }
+       }
+     }
+   }
+
+   override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
+     execEnv.addSource(new InMemorySourceFunction, returnType)
+   }
+
+   override def getProctimeAttribute: String = proctime
+
+   override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
+     rowtimeAttributeDescriptor
+   }
+ }
+
+ /**
+  * Append-only sink for batch and streaming programs. Batch output goes through
+  * `MemoryCollectionOutputFormat`, streaming output through `MemoryAppendSink`.
+  */
+ final class UnsafeMemoryAppendTableSink
+   extends TableSinkBase[Row]
+   with BatchTableSink[Row]
+   with AppendStreamTableSink[Row] {
+
+   override def getOutputType: TypeInformation[Row] =
+     new RowTypeInfo(getFieldTypes, getFieldNames)
+
+   override protected def copy: TableSinkBase[Row] = new UnsafeMemoryAppendTableSink
+
+   override def emitDataSet(dataSet: DataSet[Row]): Unit = {
+     // attach the output format first, then label the resulting sink operator
+     val batchSink = dataSet.output(new MemoryCollectionOutputFormat)
+     batchSink.name(TableConnectorUtil.generateRuntimeName(this.getClass, getFieldNames))
+   }
+
+   override def emitDataStream(dataStream: DataStream[Row]): Unit = {
+     // attach the sink function first, then label the resulting sink operator
+     val streamSink = dataStream.addSink(new MemoryAppendSink)
+     streamSink.name(TableConnectorUtil.generateRuntimeName(this.getClass, getFieldNames))
+   }
+ }
+
+ /** Streaming sink function that appends a copy of every incoming row to `tableData`. */
+ private class MemoryAppendSink extends RichSinkFunction[Row]() {
+
+   override def invoke(value: Row): Unit = tableData.synchronized {
+     // copy the row while holding the lock, then append the copy
+     val copied = Row.copy(value)
+     tableData += copied
+   }
+ }
+
+ /**
+  * Batch output format that appends a copy of every written record to `tableData`.
+  * `configure`, `open` and `close` do nothing.
+  */
+ private class MemoryCollectionOutputFormat extends RichOutputFormat[Row] {
+
+   override def configure(parameters: Configuration): Unit = ()
+
+   override def open(taskNumber: Int, numTasks: Int): Unit = ()
+
+   override def writeRecord(record: Row): Unit = tableData.synchronized {
+     // copy the record while holding the lock, then append the copy
+     val copied = Row.copy(record)
+     tableData += copied
+   }
+
+   override def close(): Unit = ()
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index 308ead2..c25f30f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -46,6 +46,8 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) {
fieldTypes: Array[TypeInformation[_]],
tableSink: TableSink[_]): Unit = ???
+ // Left unimplemented in this mock: `???` throws scala.NotImplementedError when invoked.
+ override def registerTableSink(name: String, tableSink: TableSink[_]): Unit = ???
+
override protected def createUniqueTableName(): String = ???
override protected def registerTableSourceInternal(name: String, tableSource: TableSource[_])