You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/05/31 11:41:47 UTC
flink git commit: [FLINK-6780] [table] ExternalTableSource should add
time attributes in the row type.
Repository: flink
Updated Branches:
refs/heads/master e38c9fb80 -> 1beab3813
[FLINK-6780] [table] ExternalTableSource should add time attributes in the row type.
This closes #4023.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1beab381
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1beab381
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1beab381
Branch: refs/heads/master
Commit: 1beab38134a77ccebe35ea2c199bce5c13db8c60
Parents: e38c9fb
Author: Haohui Mai <wh...@apache.org>
Authored: Wed May 31 01:40:53 2017 -0700
Committer: twalthr <tw...@apache.org>
Committed: Wed May 31 13:40:47 2017 +0200
----------------------------------------------------------------------
.../table/catalog/ExternalTableSourceUtil.scala | 18 ++++--
.../catalog/ExternalTableSourceUtilTest.scala | 63 ++++++++++++++++++++
2 files changed, 77 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1beab381/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
index 0964bbc..ccc2e9e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
@@ -18,15 +18,15 @@
package org.apache.flink.table.catalog
-import java.lang.reflect.Modifier
import java.net.URL
import org.apache.commons.configuration.{ConfigurationException, ConversionException, PropertiesConfiguration}
+import org.apache.flink.annotation.VisibleForTesting
import org.apache.flink.table.annotation.TableType
import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, NoMatchedTableSourceConverterException}
-import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.plan.schema.{StreamTableSourceTable, TableSourceTable}
import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.TableSource
+import org.apache.flink.table.sources.{StreamTableSource, TableSource}
import org.apache.flink.util.InstantiationUtil
import org.reflections.Reflections
import org.slf4j.{Logger, LoggerFactory}
@@ -83,6 +83,13 @@ object ExternalTableSourceUtil {
registeredConverters
}
+ @VisibleForTesting
+ private[flink] def injectTableSourceConverter(
+ tableType: String,
+ converterClazz: Class[_ <: TableSourceConverter[_]]) = {
+ tableTypeToTableSourceConvertersClazz.addBinding(tableType, converterClazz)
+ }
+
/**
* Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance
*
@@ -119,7 +126,10 @@ object ExternalTableSourceUtil {
} else {
FlinkStatistic.UNKNOWN
}
- new TableSourceTable(convertedTableSource, flinkStatistic)
+ convertedTableSource match {
+ case s : StreamTableSource[_] => new StreamTableSourceTable(s, flinkStatistic)
+ case _ => new TableSourceTable(convertedTableSource, flinkStatistic)
+ }
}
case None =>
LOG.error(s"Cannot find any TableSourceConverter binded to table type [$tableType]. " +
http://git-wip-us.apache.org/repos/asf/flink/blob/1beab381/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala
new file mode 100644
index 0000000..82bfd8d
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util.{Collections => JCollections, Set => JSet}
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableSchema, Types}
+import org.apache.flink.table.plan.schema.StreamTableSourceTable
+import org.apache.flink.table.sources.StreamTableSource
+import org.apache.flink.types.Row
+import org.junit.Assert.assertTrue
+import org.junit.{Before, Test}
+
+class ExternalTableSourceUtilTest {
+
+ @Before
+ def setUp() : Unit = {
+ ExternalTableSourceUtil.injectTableSourceConverter("mock", classOf[MockTableSourceConverter])
+ }
+
+ @Test
+ def testExternalStreamTable() = {
+ val schema = new TableSchema(Array("foo"), Array(BasicTypeInfo.INT_TYPE_INFO))
+ val table = ExternalCatalogTable("mock", schema)
+ val tableSource = ExternalTableSourceUtil.fromExternalCatalogTable(table)
+ assertTrue(tableSource.isInstanceOf[StreamTableSourceTable[_]])
+ }
+}
+
+class MockTableSourceConverter extends TableSourceConverter[StreamTableSource[Row]] {
+ override def requiredProperties: JSet[String] = JCollections.emptySet()
+ override def fromExternalCatalogTable(externalCatalogTable: ExternalCatalogTable)
+ : StreamTableSource[Row] = {
+ new StreamTableSource[Row] {
+ override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] =
+ throw new UnsupportedOperationException
+
+ override def getReturnType: TypeInformation[Row] = {
+ val schema = externalCatalogTable.schema
+ Types.ROW(schema.getColumnNames, schema.getTypes)
+ }
+ }
+ }
+}