You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/15 22:45:33 UTC

[GitHub] fhueske commented on a change in pull request #6508: [Flink-10079] [table] Support external sink table in the INSERT INTO clause

fhueske commented on a change in pull request #6508: [Flink-10079] [table] Support external sink table in the INSERT INTO clause 
URL: https://github.com/apache/flink/pull/6508#discussion_r217896469
 
 

 ##########
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##########
 @@ -820,20 +820,59 @@ abstract class TableEnvironment(val config: TableConfig) {
 
   /**
     * Checks if a table is registered under the given name.
+    * Internal and external catalogs are both checked.
     *
     * @param name The table name to check.
     * @return true, if a table is registered under the name, false otherwise.
     */
   protected[flink] def isRegistered(name: String): Boolean = {
-    rootSchema.getTableNames.contains(name)
+    var isRegistered = rootSchema.getTableNames.contains(name)
+
+    // check if the table exists in external catalogs
+    if (!isRegistered) {
+      val (externalSchema, externalTableName) = resolveExternalTable(name)
+      if (externalSchema != null) {
+        isRegistered = externalSchema.getTableNames.contains(externalTableName)
+      }
+    }
+
+    return isRegistered
   }
 
   protected def getTable(name: String): org.apache.calcite.schema.Table = {
-    rootSchema.getTable(name)
+    var table = rootSchema.getTable(name)
+
+    // check if the table exists in external catalogs
+    if (table == null) {
+      val (externalSchema, externalTableName) = resolveExternalTable(name)
+      if (externalSchema != null) {
+        table = externalSchema.getTable(externalTableName)
+      }
+    }
+
+    return table
   }
 
   protected def getRowType(name: String): RelDataType = {
-    rootSchema.getTable(name).getRowType(typeFactory)
+    val table = getTable(name)
+    if (table != null) {
+      table.getRowType(typeFactory)
+    }
+
+    return null
+  }
+
+  protected def resolveExternalTable(name: String): (SchemaPlus, String) = {
 
 Review comment:
   this function can be inlined into `getTable()`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services