You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/05/31 11:41:47 UTC

flink git commit: [FLINK-6780] [table] ExternalTableSource should add time attributes in the row type.

Repository: flink
Updated Branches:
  refs/heads/master e38c9fb80 -> 1beab3813


[FLINK-6780] [table] ExternalTableSource should add time attributes in the row type.

This closes #4023.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1beab381
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1beab381
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1beab381

Branch: refs/heads/master
Commit: 1beab38134a77ccebe35ea2c199bce5c13db8c60
Parents: e38c9fb
Author: Haohui Mai <wh...@apache.org>
Authored: Wed May 31 01:40:53 2017 -0700
Committer: twalthr <tw...@apache.org>
Committed: Wed May 31 13:40:47 2017 +0200

----------------------------------------------------------------------
 .../table/catalog/ExternalTableSourceUtil.scala | 18 ++++--
 .../catalog/ExternalTableSourceUtilTest.scala   | 63 ++++++++++++++++++++
 2 files changed, 77 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1beab381/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
index 0964bbc..ccc2e9e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
@@ -18,15 +18,15 @@
 
 package org.apache.flink.table.catalog
 
-import java.lang.reflect.Modifier
 import java.net.URL
 
 import org.apache.commons.configuration.{ConfigurationException, ConversionException, PropertiesConfiguration}
+import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.table.annotation.TableType
 import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, NoMatchedTableSourceConverterException}
-import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.plan.schema.{StreamTableSourceTable, TableSourceTable}
 import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.TableSource
+import org.apache.flink.table.sources.{StreamTableSource, TableSource}
 import org.apache.flink.util.InstantiationUtil
 import org.reflections.Reflections
 import org.slf4j.{Logger, LoggerFactory}
@@ -83,6 +83,13 @@ object ExternalTableSourceUtil {
     registeredConverters
   }
 
+  @VisibleForTesting
+  private[flink] def injectTableSourceConverter(
+    tableType: String,
+    converterClazz: Class[_ <: TableSourceConverter[_]]) = {
+    tableTypeToTableSourceConvertersClazz.addBinding(tableType, converterClazz)
+  }
+
   /**
     * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance
     *
@@ -119,7 +126,10 @@ object ExternalTableSourceUtil {
           } else {
             FlinkStatistic.UNKNOWN
           }
-          new TableSourceTable(convertedTableSource, flinkStatistic)
+          convertedTableSource match {
+            case s : StreamTableSource[_] => new StreamTableSourceTable(s, flinkStatistic)
+            case _ => new TableSourceTable(convertedTableSource, flinkStatistic)
+          }
         }
       case None =>
         LOG.error(s"Cannot find any TableSourceConverter binded to table type [$tableType]. " +

http://git-wip-us.apache.org/repos/asf/flink/blob/1beab381/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala
new file mode 100644
index 0000000..82bfd8d
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util.{Collections => JCollections, Set => JSet}
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableSchema, Types}
+import org.apache.flink.table.plan.schema.StreamTableSourceTable
+import org.apache.flink.table.sources.StreamTableSource
+import org.apache.flink.types.Row
+import org.junit.Assert.assertTrue
+import org.junit.{Before, Test}
+
+class ExternalTableSourceUtilTest {
+
+  @Before
+  def setUp() : Unit = {
+    ExternalTableSourceUtil.injectTableSourceConverter("mock", classOf[MockTableSourceConverter])
+  }
+
+  @Test
+  def testExternalStreamTable() = {
+    val schema = new TableSchema(Array("foo"), Array(BasicTypeInfo.INT_TYPE_INFO))
+    val table = ExternalCatalogTable("mock", schema)
+    val tableSource = ExternalTableSourceUtil.fromExternalCatalogTable(table)
+    assertTrue(tableSource.isInstanceOf[StreamTableSourceTable[_]])
+  }
+}
+
+class MockTableSourceConverter extends TableSourceConverter[StreamTableSource[Row]] {
+  override def requiredProperties: JSet[String] = JCollections.emptySet()
+  override def fromExternalCatalogTable(externalCatalogTable: ExternalCatalogTable)
+    : StreamTableSource[Row] = {
+    new StreamTableSource[Row] {
+      override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] =
+        throw new UnsupportedOperationException
+
+      override def getReturnType: TypeInformation[Row] = {
+        val schema = externalCatalogTable.schema
+        Types.ROW(schema.getColumnNames, schema.getTypes)
+      }
+    }
+  }
+}