Posted to issues@flink.apache.org by beyond1920 <gi...@git.apache.org> on 2017/02/24 06:59:57 UTC

[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

GitHub user beyond1920 opened a pull request:

    https://github.com/apache/flink/pull/3406

    [flink-5568] [Table API & SQL]Introduce interface for catalog, and provide an in-memory implementation. Integrate external catalog with calcite catalog

    This PR introduces an interface for external catalogs, provides an in-memory implementation for testing and development, and integrates the external catalog with the Calcite catalog.
    The main changes are:
    1. Introduce the ExternalCatalog abstraction, including ExternalCatalogDatabase, which represents a database in the catalog, and ExternalCatalogTable, which represents a table in the catalog.
    2. Provide an in-memory implementation for testing and development.
    3. Introduce ExternalCatalogSchema, an implementation of Calcite's Schema interface. It registers each database in the ExternalCatalog as a Calcite Schema and each table in a database as a Calcite Table.
    4. Add the ExternalCatalogCompatible annotation. A TableSource with this annotation can be converted to or from an ExternalCatalogTable. ExternalCatalogTableConverter is the converter between ExternalCatalogTable and TableSource.
    5. Introduce the CatalogTableHelper utility. It has two responsibilities:
       * automatically find the TableSources that carry the ExternalCatalogCompatible annotation;
       * convert an ExternalCatalogTable instance to a TableSourceTable instance.
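
The two-level mapping in item 3 (each catalog database becomes a sub-schema, each table inside it becomes a table) can be illustrated without Calcite. This is a minimal sketch under assumptions: `SketchTable`, `SketchDatabase` and `CatalogSchemaSketch` are hypothetical names, and the real ExternalCatalogSchema implements Calcite's Schema interface rather than these plain methods.

```scala
// Hypothetical, simplified model of the hierarchy in the PR description:
// a catalog holds databases, and each database holds tables.
final case class SketchTable(name: String, tableType: String)
final case class SketchDatabase(name: String, tables: Map[String, SketchTable])

// Hypothetical adapter: the first path segment selects a database
// (exposed as a sub-schema), the second segment selects a table in it.
final class CatalogSchemaSketch(databases: Map[String, SketchDatabase]) {

  // Databases of the catalog, i.e. the sub-schema names.
  def subSchemaNames: Set[String] = databases.keySet

  // Table names of one database; empty if the database is unknown.
  def tableNames(db: String): Set[String] =
    databases.get(db).map(_.tables.keySet).getOrElse(Set.empty)

  // Resolves a two-part name such as Seq("db1", "tb1"); anything else is unresolved.
  def resolve(path: Seq[String]): Option[SketchTable] = path match {
    case Seq(db, table) => databases.get(db).flatMap(_.tables.get(table))
    case _              => None
  }
}
```

A query on `db1.tb1` would resolve through `resolve(Seq("db1", "tb1"))`; a one-part name stays unresolved because in this sketch every table lives inside a database.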

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/alibaba/flink dev

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3406.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3406
    
----
commit d0e1ab20078adc4f788e9c2d2c167f0251ae3476
Author: jingzhang <be...@126.com>
Date:   2017-02-22T11:28:08Z

    Introduce interface for external catalog, and provide an in-memory implementation for test or develop. Integrate with calcite catalog.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104156636
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CatalogTableHelper.scala ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.io.IOException
    +import java.lang.reflect.Modifier
    +import java.net.URL
    +import java.util.Properties
    +
    +import org.apache.flink.table.annotation.ExternalCatalogCompatible
    +import org.apache.flink.table.api.{ExternalCatalogTableTypeAlreadyExistException, ExternalCatalogTableTypeNotExistException}
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.plan.stats.FlinkStatistic
    +import org.apache.flink.table.sources.TableSource
    +import org.apache.flink.util.InstantiationUtil
    +import org.reflections.Reflections
    +import org.slf4j.{Logger, LoggerFactory}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +
    +/**
    +  * The utility class is used to convert ExternalCatalogTable to TableSourceTable.
    +  */
    +object CatalogTableHelper {
    +
    +  // config file to specifier the scan package to search tableSources
    +  // which is compatible with external catalog.
    +  private val tableSourceConfigFileName = "externalcatalogTable.properties"
    +
    +  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
    +
    +  // registered tableSources which are compatible with external catalog.
    +  // Key is table type name, Value is converter class.
    +  private val tableSourceTypeToConvertersClazz = {
    +    val registeredConverters =
    +      new HashMap[String, Class[_ <: TableSourceConverter[_]]]
    +    // scan all config file to find all tableSources which are compatible with external catalog.
    +    val resourceUrls = getClass.getClassLoader.getResources(tableSourceConfigFileName)
    +    while (resourceUrls.hasMoreElements) {
    +      val url = resourceUrls.nextElement()
    +      parseScanPackageFromConfigFile(url) match {
    +        case Some(scanPackage) =>
    +          val clazzWithAnnotations = new Reflections(scanPackage)
    +              .getTypesAnnotatedWith(classOf[ExternalCatalogCompatible])
    +          clazzWithAnnotations.asScala.foreach(clazz =>
    +            if (classOf[TableSource[_]].isAssignableFrom(clazz)) {
    +              if (Modifier.isAbstract(clazz.getModifiers()) ||
    +                  Modifier.isInterface(clazz.getModifiers)) {
    +                LOG.warn(
    +                  s"class :[${clazz.getName}] is also with ExternalCatalogCompatible annotation, " +
    +                      s"but it's an abstract clazz or an interface.")
    +              } else {
    +                val externalCatalogCompatible: ExternalCatalogCompatible =
    +                  clazz.getAnnotation(classOf[ExternalCatalogCompatible])
    +                val tableSourceName = externalCatalogCompatible.tableType()
    +                if (registeredConverters.contains(tableSourceName)) {
    +                  LOG.error(s"The table type [$tableSourceName] is already registered.")
    +                  throw new ExternalCatalogTableTypeAlreadyExistException(tableSourceName)
    --- End diff --
    
    Is this a reason to fail or is a warn log message sufficient?
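
For comparison, a warn-only variant could keep the first registration and record a warning instead of throwing. This is a sketch, not the PR's code: `ConverterRegistry` is hypothetical, converters are represented by class-name strings, and warnings are collected in a buffer instead of being sent to SLF4J.

```scala
import scala.collection.mutable

// Hypothetical registry: table type name -> converter class name.
// On a duplicate table type it keeps the first entry and records a warning.
final class ConverterRegistry {
  private val converters = mutable.LinkedHashMap.empty[String, String]
  private val warningBuffer = mutable.ArrayBuffer.empty[String]

  // Returns true if the converter was registered, false if it was ignored.
  def register(tableType: String, converterClass: String): Boolean =
    converters.get(tableType) match {
      case Some(existing) =>
        warningBuffer += s"Table type [$tableType] is already registered by [$existing]; ignoring [$converterClass]."
        false
      case None =>
        converters.put(tableType, converterClass)
        true
    }

  def lookup(tableType: String): Option[String] = converters.get(tableType)

  def warnings: Seq[String] = warningBuffer.toList
}
```

One trade-off to weigh: which converter counts as "first" depends on the order in which the classpath scan finds the annotated classes, so a warning can hide a surprising choice, whereas throwing makes the conflict explicit.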


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105362292
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util
    +
    +/**
    +  * This class is responsible for interact with external catalog.
    +  * Its main responsibilities including:
    +  * <ul>
    +  * <li> create/drop/alter database or tables for DDL operations
    +  * <li> provide tables for calcite catalog, it looks up databases or tables in the external catalog
    +  * </ul>
    +  */
    +trait ExternalCatalog {
    +
    +  /**
    +    * Adds table into external Catalog
    +    *
    +    * @param table          description of table which to create
    +    * @param ignoreIfExists whether to ignore operation if table already exists
    --- End diff --
    
    Good advice, I will change the comment.


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105138128
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util
    +
    +/**
    +  * This class is responsible for interact with external catalog.
    +  * Its main responsibilities including:
    +  * <ul>
    +  * <li> create/drop/alter database or tables for DDL operations
    +  * <li> provide tables for calcite catalog, it looks up databases or tables in the external catalog
    +  * </ul>
    +  */
    +trait ExternalCatalog {
    +
    +  /**
    +    * Adds table into external Catalog
    +    *
    +    * @param table          description of table which to create
    +    * @param ignoreIfExists whether to ignore operation if table already exists
    --- End diff --
    
    I think we should make the expected behavior of all methods more clear: When is an operation performed, when is an exception thrown, etc.
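
One way to make the contract explicit is to spell out every case in the doc comment and enforce exactly that in the implementation. The sketch below assumes a hypothetical `TableStore` keyed by `(database, table)` with string table definitions, and hypothetical exception classes; it is not the PR's ExternalCatalog. It also states that an existing table is never overwritten.

```scala
import scala.collection.mutable

// Hypothetical exception types; the PR defines its own catalog exceptions.
class TableAlreadyExistException(db: String, table: String)
  extends RuntimeException(s"Table $db.$table already exists.")
class TableNotExistException(db: String, table: String)
  extends RuntimeException(s"Table $db.$table does not exist.")

// Hypothetical store illustrating one explicit createTable / dropTable contract.
final class TableStore {
  private val tables = mutable.HashMap.empty[(String, String), String]

  /**
    * Adds a table.
    *  - If the table does not exist, it is added.
    *  - If it exists and ignoreIfExists is true, nothing happens;
    *    the existing table is NOT overwritten.
    *  - If it exists and ignoreIfExists is false, a
    *    TableAlreadyExistException is thrown.
    */
  def createTable(db: String, table: String, definition: String, ignoreIfExists: Boolean): Unit =
    if (tables.contains((db, table))) {
      if (!ignoreIfExists) throw new TableAlreadyExistException(db, table)
    } else {
      tables.put((db, table), definition)
    }

  /**
    * Removes a table.
    *  - If the table does not exist: no-op when ignoreIfNotExists is true,
    *    otherwise a TableNotExistException is thrown.
    */
  def dropTable(db: String, table: String, ignoreIfNotExists: Boolean): Unit =
    if (tables.remove((db, table)).isEmpty && !ignoreIfNotExists) {
      throw new TableNotExistException(db, table)
    }

  def getTable(db: String, table: String): Option[String] = tables.get((db, table))
}
```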


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104022427
  
    --- Diff: flink-libraries/flink-table/pom.xml ---
    @@ -92,7 +92,12 @@ under the License.
     			</exclusions>
     		</dependency>
     
    -
    +		<dependency>
    --- End diff --
    
    We need to check that the license is compatible with AL2, even though the license says *"Do what you want"*.


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104343679
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala ---
    @@ -68,6 +62,90 @@ object CommonTestData {
         )
       }
     
    +  def getMockedFlinkExternalCatalog: ExternalCatalog = {
    +    val csvRecord1 = Seq(
    --- End diff --
    
    `val csvRecord1 = Seq(...)` is just CSV data; maybe it would be better to name it `csvDataRecords`?


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104167970
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala ---
    @@ -0,0 +1,108 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +/**
    +  * This class is responsible for interact with external catalog.
    +  * Its main responsibilities including:
    +  * <ul>
    +  * <li> create/drop/alter database or tables for DDL operations
    +  * <li> provide tables for calcite catalog, it looks up databases or tables in the external catalog
    +  * </ul>
    +  */
    +trait ExternalCatalog {
    +
    +  /**
    +    * Adds table into external Catalog
    +    *
    +    * @param table          description of table which to create
    +    * @param ignoreIfExists whether to ignore operation if table already exists
    +    */
    +  def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit
    +
    +  /**
    +    * Deletes table from external Catalog
    +    *
    +    * @param dbName            database name
    +    * @param tableName         table name
    +    * @param ignoreIfNotExists whether to ignore operation if table not exist yet
    +    */
    +  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
    +
    +  /**
    +    * Alters existed table into external Catalog
    +    *
    +    * @param table description of table which to alter
    +    */
    +  def alterTable(table: ExternalCatalogTable): Unit
    +
    +  /**
    +    * Gets table from external Catalog
    +    *
    +    * @param dbName    database name
    +    * @param tableName table name
    +    * @return table
    +    */
    +  def getTable(dbName: String, tableName: String): ExternalCatalogTable
    +
    +  /**
    +    * Gets the table name lists from current external Catalog
    +    *
    +    * @param dbName database name
    +    * @return lists of table name
    +    */
    +  def listTables(dbName: String): Seq[String]
    --- End diff --
    
    Java `List` or `Set` instead of `Seq`?
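
A sketch of what a Java collection in the signature could look like, assuming a hypothetical `ListingCatalog` trait and a map-backed implementation that converts at the API boundary with `scala.collection.JavaConverters`, the same converters the PR already imports in CatalogTableHelper:

```scala
import java.util.{Collections, List => JList}
import scala.collection.JavaConverters._

// Hypothetical trait: returning a Java List lets Java code implement and call
// this method without touching Scala collection types.
trait ListingCatalog {
  def listTables(dbName: String): JList[String]
}

// Hypothetical implementation backed by a Scala map; an unknown database
// yields an empty list here (a real catalog might throw instead).
final class MapBackedCatalog(tables: Map[String, Seq[String]]) extends ListingCatalog {
  override def listTables(dbName: String): JList[String] =
    Collections.unmodifiableList(tables.getOrElse(dbName, Seq.empty).asJava)
}
```

A `java.util.Set` would additionally express that table names are unique within a database; a `List` keeps a stable order for display.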


[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3406
  
    Hi @KurtYoung, you are right. Only `requiredProperties()` is needed to verify the properties.
    
    I thought that the other two methods would be a good way to define the parameters of the converter. They could be used to print a usage message or details when the properties do not match. We can also leave them out if you think the implementation overhead outweighs the gains.
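
A minimal sketch of verification based only on the required properties. `PropertyAwareConverter`, `PropertyCheck` and `CsvConverterSketch` are hypothetical; the CSV property keys are taken from the test data in this PR (`path`, `fieldDelim`, `rowDelim`).

```scala
// Hypothetical converter trait: only the required property keys are declared.
trait PropertyAwareConverter {
  def requiredProperties: Set[String]
}

// Hypothetical converter requiring the keys used by the CSV test tables.
object CsvConverterSketch extends PropertyAwareConverter {
  override val requiredProperties: Set[String] = Set("path", "fieldDelim", "rowDelim")
}

object PropertyCheck {
  // Required keys missing from the table properties, sorted for stable messages.
  def missingProperties(converter: PropertyAwareConverter, props: Map[String, String]): Seq[String] =
    converter.requiredProperties.diff(props.keySet).toSeq.sorted

  // Throws IllegalArgumentException listing the missing keys, if any.
  def verify(converter: PropertyAwareConverter, props: Map[String, String]): Unit = {
    val missing = missingProperties(converter, props)
    require(missing.isEmpty, s"Missing required properties: ${missing.mkString(", ")}")
  }
}
```

The error message already names the missing keys, which covers part of what a usage message would provide.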


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104343965
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala ---
    @@ -68,6 +62,90 @@ object CommonTestData {
         )
       }
     
    +  def getMockedFlinkExternalCatalog: ExternalCatalog = {
    +    val csvRecord1 = Seq(
    +      "1#1#Hi",
    +      "2#2#Hello",
    +      "3#2#Hello world"
    +    )
    +    val tempFilePath1 = writeToTempFile(csvRecord1.mkString("$"), "csv-test1", "tmp")
    +    val externalCatalogTable1 = ExternalCatalogTable(
    +      TableIdentifier("db1", "tb1"),
    +      "csv",
    +      DataSchema(
    +        Array(
    +          BasicTypeInfo.INT_TYPE_INFO,
    +          BasicTypeInfo.LONG_TYPE_INFO,
    +          BasicTypeInfo.STRING_TYPE_INFO),
    +        Array("a", "b", "c")
    +      ),
    +      properties = Map(
    +        "path" -> tempFilePath1,
    +        "fieldDelim" -> "#",
    +        "rowDelim" -> "$"
    +      )
    +    )
    +
    +    val csvRecord2 = Seq(
    +      "1#1#0#Hallo#1",
    +      "2#2#1#Hallo Welt#2",
    +      "2#3#2#Hallo Welt wie#1",
    +      "3#4#3#Hallo Welt wie gehts?#2",
    +      "3#5#4#ABC#2",
    +      "3#6#5#BCD#3",
    +      "4#7#6#CDE#2",
    +      "4#8#7#DEF#1",
    +      "4#9#8#EFG#1",
    +      "4#10#9#FGH#2",
    +      "5#11#10#GHI#1",
    +      "5#12#11#HIJ#3",
    +      "5#13#12#IJK#3",
    +      "5#14#13#JKL#2",
    +      "5#15#14#KLM#2"
    +    )
    +    val tempFilePath2 = writeToTempFile(csvRecord2.mkString("$"), "csv-test2", "tmp")
    +    val externalCatalogTable2 = ExternalCatalogTable(
    +      TableIdentifier("db2", "tb2"),
    +      "csv",
    +      DataSchema(
    +        Array(
    +          BasicTypeInfo.INT_TYPE_INFO,
    +          BasicTypeInfo.LONG_TYPE_INFO,
    +          BasicTypeInfo.INT_TYPE_INFO,
    +          BasicTypeInfo.STRING_TYPE_INFO,
    +          BasicTypeInfo.LONG_TYPE_INFO),
    +        Array("d", "e", "f", "g", "h")
    +      ),
    +      properties = Map(
    +        "path" -> tempFilePath2,
    +        "fieldDelim" -> "#",
    +        "rowDelim" -> "$"
    +      )
    +    )
    +    val catalog = mock(classOf[ExternalCatalog])
    --- End diff --
    
    Good idea
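
The mocked test data joins records with `$` (`rowDelim`) and separates fields with `#` (`fieldDelim`). The sketch below, using the hypothetical object `DelimitedTestData` and the `csvDataRecords` name suggested earlier in this review, shows the round trip; it does not claim to reproduce how the CSV table source parses files. Note that `String.split` takes a regex and `$` is a regex metacharacter, hence `Pattern.quote`.

```scala
import java.util.regex.Pattern

// Hypothetical helper mirroring the layout of the mocked CSV test data.
object DelimitedTestData {
  val csvDataRecords: Seq[String] = Seq("1#1#Hi", "2#2#Hello", "3#2#Hello world")

  // Builds the file content the same way the test does with mkString("$").
  def fileContent(records: Seq[String], rowDelim: String): String =
    records.mkString(rowDelim)

  // Splits the content back into rows, then each row into fields.
  def parse(content: String, rowDelim: String, fieldDelim: String): Seq[Seq[String]] =
    content.split(Pattern.quote(rowDelim)).toSeq
      .map(row => row.split(Pattern.quote(fieldDelim)).toSeq)
}
```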


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104188272
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/ExternalCatalogCompatible.java ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.annotation;
    +
    +import org.apache.flink.annotation.Public;
    +import org.apache.flink.table.catalog.TableSourceConverter;
    +
    +import java.lang.annotation.Documented;
    +import java.lang.annotation.ElementType;
    +import java.lang.annotation.Retention;
    +import java.lang.annotation.RetentionPolicy;
    +import java.lang.annotation.Target;
    +
    +/**
    + * A tableSource with this annotation represents it is compatible with external catalog, that is,
    + * an instance of this tableSource can be converted to or converted from external catalog table
    + * instance.
    + * The annotation contains the following information:
    + * <ul>
    + * <li> external catalog table type name for this kind of tableSource </li>
    + * <li> external catalog table <-> tableSource converter class </li>
    + * </ul>
    + */
    +@Documented
    +@Target(ElementType.TYPE)
    +@Retention(RetentionPolicy.RUNTIME)
    +@Public
    +public @interface ExternalCatalogCompatible {
    --- End diff --
    
    I think we should add a version field here as well to support multiple versions of Converters.
    This might become important when the `TableSource` evolves or if we have different `TableSources` for the same table type. 
    
    So either 
    - `[$tableType:$version -> TableConverter]+`, which would allow support for different table types or 
    - `$tableType, [$version -> TableConverter]+`, which limits a `TableSource` to one table type but multiple versions of it.
    
    What do you think @beyond1920, @KurtYoung ?
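
A sketch of the first layout, `[$tableType:$version -> TableConverter]+`, with a composite key. `VersionedTableType` and `VersionedConverterRegistry` are hypothetical, and converters are represented by class-name strings.

```scala
// Hypothetical composite key for the `$tableType:$version` layout.
final case class VersionedTableType(tableType: String, version: String) {
  override def toString: String = s"$tableType:$version"
}

object VersionedTableType {
  // Parses "kafka:0.9" into VersionedTableType("kafka", "0.9"); rejects missing parts.
  def parse(s: String): Option[VersionedTableType] = s.split(":", 2) match {
    case Array(t, v) if t.nonEmpty && v.nonEmpty => Some(VersionedTableType(t, v))
    case _                                       => None
  }
}

// Hypothetical registry: (table type, version) -> converter class name.
final class VersionedConverterRegistry(entries: Map[VersionedTableType, String]) {
  def lookup(tableType: String, version: String): Option[String] =
    entries.get(VersionedTableType(tableType, version))

  // All registered versions of one table type.
  def versionsOf(tableType: String): Set[String] =
    entries.keySet.filter(_.tableType == tableType).map(_.version)
}
```

The second layout would instead nest the maps, `Map[String, Map[String, converter]]`, which keeps one table type per `TableSource` but the same lookup shape.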


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105137846
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util
    +
    +/**
    +  * This class is responsible for interact with external catalog.
    +  * Its main responsibilities including:
    +  * <ul>
    +  * <li> create/drop/alter database or tables for DDL operations
    +  * <li> provide tables for calcite catalog, it looks up databases or tables in the external catalog
    +  * </ul>
    +  */
    +trait ExternalCatalog {
    +
    +  /**
    +    * Adds table into external Catalog
    +    *
    +    * @param table          description of table which to create
    +    * @param ignoreIfExists whether to ignore operation if table already exists
    --- End diff --
    
    The current description suggests that the existing table is overridden if it exists and the flag is set to false.


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104371759
  
    --- Diff: flink-libraries/flink-table/pom.xml ---
    @@ -92,7 +92,12 @@ under the License.
     			</exclusions>
     		</dependency>
     
    -
    +		<dependency>
    --- End diff --
    
    Fabian, about the reflections jar: its license is WTFPL. WTFPL only says 'Everyone is permitted to copy and distribute verbatim or modified
    copies of this license document', but it does not explicitly say that it is compatible with the ASL.
    Besides, I also want to include the classes of this jar in the generated flink-table-{version}.jar, just like the Calcite classes. I don't know whether that is allowed.


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r106347964
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util.{HashMap => JHashMap, Map => JMap}
    +import java.lang.{Long => JLong}
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.plan.stats.TableStats
    +
    +/**
    +  * Table definition of the external catalog.
    +  *
    +  * @param identifier           identifier of external catalog table, including dbName and tableName
    +  * @param tableType            type of external catalog table, e.g csv, hbase, kafka
    +  * @param schema               schema of table data, including column names and column types
    +  * @param properties           properties of external catalog table
    +  * @param stats                statistics of external catalog table
    +  * @param comment              comment of external catalog table
    +  * @param createTime           create time of external catalog table
    +  * @param lastAccessTime       last access time of of external catalog table
    +  */
    +case class ExternalCatalogTable(
    +    identifier: TableIdentifier,
    +    tableType: String,
    +    schema: DataSchema,
    --- End diff --
    
    I would call this `ColumnSchema`, `DataSchema` sounds very generic.
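
A sketch of what a `ColumnSchema` with the suggested name might check. It is hypothetical: column types are modeled as plain strings instead of Flink's `TypeInformation`, and the uniqueness check is an assumption, not something the PR specifies.

```scala
// Hypothetical ColumnSchema: parallel sequences of column types and names.
final case class ColumnSchema(columnTypes: Seq[String], columnNames: Seq[String]) {
  require(columnTypes.length == columnNames.length,
    s"Got ${columnTypes.length} types but ${columnNames.length} names.")
  require(columnNames.distinct.length == columnNames.length, "Column names must be unique.")

  // Position of a column, if present.
  def indexOf(name: String): Option[Int] = {
    val i = columnNames.indexOf(name)
    if (i >= 0) Some(i) else None
  }

  // Type of a column, if present.
  def typeOf(name: String): Option[String] = indexOf(name).map(i => columnTypes(i))
}
```

For the first test table in this PR, the schema would be `ColumnSchema(Seq("INT", "LONG", "STRING"), Seq("a", "b", "c"))`.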


[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on the issue:

    https://github.com/apache/flink/pull/3406
  
    @fhueske, thanks for your review. I updated the PR based on your suggestions, except for one point.
    About adding a version field to ExternalCatalogCompatible: could we define tableType as an identifier that includes the version information? For example, kafka0.8 / kafka0.9 / kafka1.0 or hive1.0 / hive2.0.
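
With this approach, a registry keyed by a plain string works unchanged, and the version can still be recovered if needed. The sketch assumes a hypothetical naming convention (a lowercase system name immediately followed by a dotted version) and a hypothetical helper `VersionedIdentifier`.

```scala
import scala.util.matching.Regex

// Hypothetical convention: "kafka0.9" = system name "kafka" + version "0.9".
object VersionedIdentifier {
  private val Pattern: Regex = """([a-z]+)(\d+(?:\.\d+)*)""".r

  // Splits an identifier into (name, version); identifiers without a version yield None.
  // Regex pattern matching in Scala requires the whole string to match.
  def split(identifier: String): Option[(String, String)] = identifier match {
    case Pattern(name, version) => Some((name, version))
    case _                      => None
  }
}
```

The trade-off is that the version becomes part of an opaque string, so queries such as "all versions of kafka" rely on the naming convention rather than on a dedicated field.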


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r106648264
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util.{HashMap => JHashMap, Map => JMap}
    +import java.lang.{Long => JLong}
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.plan.stats.TableStats
    +
    +/**
    +  * Table definition of the external catalog.
    +  *
    +  * @param identifier           identifier of external catalog table, including dbName and tableName
    +  * @param tableType            type of external catalog table, e.g csv, hbase, kafka
    +  * @param schema               schema of table data, including column names and column types
    +  * @param properties           properties of external catalog table
    +  * @param stats                statistics of external catalog table
    +  * @param comment              comment of external catalog table
    +  * @param createTime           create time of external catalog table
    +  * @param lastAccessTime       last access time of external catalog table
    +  */
    +case class ExternalCatalogTable(
    +    identifier: TableIdentifier,
    +    tableType: String,
    +    schema: DataSchema,
    --- End diff --
    
    Right, thanks for reminding me.



[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104333889
  
    --- Diff: flink-libraries/flink-table/pom.xml ---
    @@ -92,7 +92,12 @@ under the License.
     			</exclusions>
     		</dependency>
     
    -
    +		<dependency>
    --- End diff --
    
    Yes, this problem bothers me, too. The license of the reflections jar is WTFPL. Does that mean we can do anything with it, and how can we check whether it is compatible with AL2? I noticed that the reflections jar is already referenced in flink-parent.pom, but only for tests, so it is not included in any Flink jars. My original thought was not only to use this jar, but also to include it (like Calcite) in the flink-table-{version}.jar.



[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105138617
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.plan.stats.TableStats
    +
    +/**
    +  * Table definition of the external catalog.
    +  *
    +  * @param identifier           identifier of external catalog table, including dbName and tableName
    +  * @param tableType            type of external catalog table, e.g csv, hbase, kafka
    +  * @param schema               schema of table data, including column names and column types
    +  * @param properties           properties of external catalog table
    +  * @param stats                statistics of external catalog table
    +  * @param comment              comment of external catalog table
    +  * @param createTime           create time of external catalog table
    +  * @param lastAccessTime       last access time of external catalog table
    +  */
    +case class ExternalCatalogTable(
    +    identifier: TableIdentifier,
    +    tableType: String,
    +    schema: DataSchema,
    +    properties: Map[String, String] = Map.empty,
    --- End diff --
    
    Use a Java `Map` here.
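A minimal sketch of the suggested change, assuming `properties` becomes a `java.util.Map` so the table can be built from Java without Scala collection conversions. `CatalogTableSketch` and `CatalogTableSketchExample` are hypothetical stand-ins that keep only the relevant fields; the `path` key is illustrative.

```scala
import java.util.{Collections, HashMap => JHashMap, Map => JMap}

// Hypothetical, trimmed-down stand-in for ExternalCatalogTable that keeps only
// the fields needed to show the Java Map parameter.
case class CatalogTableSketch(
    tableType: String,
    // A java.util.Map can be passed straight from Java code; the default is an
    // immutable empty map.
    properties: JMap[String, String] = Collections.emptyMap[String, String]())

object CatalogTableSketchExample {
  def csvTable(path: String): CatalogTableSketch = {
    val props = new JHashMap[String, String]()
    props.put("path", path) // "path" is an illustrative property key only
    CatalogTableSketch("csv", props)
  }
}
```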



[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r106348421
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala ---
    @@ -0,0 +1,153 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.lang.reflect.Modifier
    +import java.net.URL
    +
    +import org.apache.commons.configuration.{ConfigurationException, ConversionException, PropertiesConfiguration}
    +import org.apache.flink.table.annotation.TableType
    +import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, NoMatchedTableSourceConverterException}
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.plan.stats.FlinkStatistic
    +import org.apache.flink.table.sources.TableSource
    +import org.apache.flink.util.InstantiationUtil
    +import org.reflections.Reflections
    +import org.slf4j.{Logger, LoggerFactory}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +/**
    +  * The utility class is used to convert ExternalCatalogTable to TableSourceTable.
    +  */
    +object ExternalTableSourceUtil {
    +
    +  // config file to specify scan package to search TableSourceConverter
    +  private val tableSourceConverterConfigFileName = "tableSourceConverter.properties"
    +
    +  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
    +
    +  // registered table type with the TableSourceConverter.
    +  // Key is table type name, Value is set of converter class.
    +  private val tableTypeToTableSourceConvertersClazz = {
    +    val registeredConverters =
    +      new mutable.HashMap[String, mutable.Set[Class[_ <: TableSourceConverter[_]]]]
    +          with mutable.MultiMap[String, Class[_ <: TableSourceConverter[_]]]
    +    // scan all config files to find TableSourceConverters which are annotated with TableType.
    +    val resourceUrls = getClass.getClassLoader.getResources(tableSourceConverterConfigFileName)
    +    while (resourceUrls.hasMoreElements) {
    +      val url = resourceUrls.nextElement()
    +      val scanPackages = parseScanPackagesFromConfigFile(url)
    +      scanPackages.foreach(scanPackage => {
    +        val clazzWithAnnotations = new Reflections(scanPackage)
    +            .getTypesAnnotatedWith(classOf[TableType])
    +        clazzWithAnnotations.asScala.foreach(clazz =>
    +          if (classOf[TableSourceConverter[_]].isAssignableFrom(clazz)) {
    +            if (Modifier.isAbstract(clazz.getModifiers()) ||
    --- End diff --
    
    Can non-static inner classes be a problem here?
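One way to address this question, sketched under the assumption that converters are created through a public no-argument constructor (the diff uses `InstantiationUtil.instantiate` for this): besides the abstract/interface test, reject non-public classes, non-static member classes, local and anonymous classes, and classes without such a constructor. `ConverterClassCheck`, `DemoOuter`, and `DemoInner` are hypothetical names.

```scala
import java.lang.reflect.Modifier

// Hypothetical scan-time filter for candidate converter classes.
object ConverterClassCheck {

  def isInstantiable(clazz: Class[_]): Boolean = {
    val mods = clazz.getModifiers
    val concrete =
      Modifier.isPublic(mods) && !Modifier.isAbstract(mods) && !Modifier.isInterface(mods)
    // A non-static inner class needs an instance of its enclosing class, so it
    // has no usable no-argument constructor. Local and anonymous classes are
    // excluded too, since they are not meant to be instantiated by name.
    val topLevelOrStatic =
      !clazz.isLocalClass && !clazz.isAnonymousClass &&
        (!clazz.isMemberClass || Modifier.isStatic(mods))
    // getConstructors only returns public constructors.
    val hasPublicNoArgConstructor =
      clazz.getConstructors.exists(_.getParameterCount == 0)
    concrete && topLevelOrStatic && hasPublicNoArgConstructor
  }
}

// Demo types: DemoInner is a non-static inner class of DemoOuter.
class DemoOuter { class DemoInner }
```

A non-static inner converter such as `DemoOuter#DemoInner` would then be skipped with a warning at scan time instead of failing later during instantiation.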



[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r106395036
  
    --- Diff: flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties ---
    @@ -0,0 +1,19 @@
    +################################################################################
    +#  Licensed to the Apache Software Foundation (ASF) under one
    +#  or more contributor license agreements.  See the NOTICE file
    +#  distributed with this work for additional information
    +#  regarding copyright ownership.  The ASF licenses this file
    +#  to you under the Apache License, Version 2.0 (the
    +#  "License"); you may not use this file except in compliance
    +#  with the License.  You may obtain a copy of the License at
    +#
    +#      http://www.apache.org/licenses/LICENSE-2.0
    +#
    +#  Unless required by applicable law or agreed to in writing, software
    +#  distributed under the License is distributed on an "AS IS" BASIS,
    +#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +#  See the License for the specific language governing permissions and
    +# limitations under the License.
    +################################################################################
    +
    --- End diff --
    
    Ok.



[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105159600
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala ---
    @@ -0,0 +1,90 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util.Collections
    +
    +import com.google.common.collect.Lists
    +import org.apache.calcite.jdbc.CalciteSchema
    +import org.apache.calcite.prepare.CalciteCatalogReader
    +import org.apache.calcite.schema.SchemaPlus
    +import org.apache.calcite.sql.validate.SqlMonikerType
    +import org.apache.commons.collections.CollectionUtils
    +import org.apache.flink.table.calcite.{FlinkTypeFactory, FlinkTypeSystem}
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.sources.CsvTableSource
    +import org.apache.flink.table.utils.CommonTestData
    +import org.junit.{Before, Test}
    +import org.junit.Assert._
    +
    +class ExternalCatalogSchemaTest {
    +
    +  private val schemaName: String = "test"
    +  private var externalCatalogSchema: ExternalCatalogSchema = _
    +  private var calciteCatalogReader: CalciteCatalogReader = _
    +  private val db = "db1"
    +  private val tb = "tb1"
    +
    +  @Before
    +  def setUp(): Unit = {
    +    val rootSchemaPlus: SchemaPlus = CalciteSchema.createRootSchema(true, false).plus()
    +    val catalog = CommonTestData.getMockedFlinkExternalCatalog
    +    externalCatalogSchema = ExternalCatalogSchema.create(rootSchemaPlus, schemaName, catalog)
    +    val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem())
    +    calciteCatalogReader = new CalciteCatalogReader(
    +      CalciteSchema.from(rootSchemaPlus),
    +      false,
    +      Collections.emptyList(),
    +      typeFactory)
    +  }
    +
    +  @Test
    +  def testGetSubSchema(): Unit = {
    +    val allSchemaObjectNames = calciteCatalogReader
    +        .getAllSchemaObjectNames(Lists.newArrayList(schemaName))
    +    assertTrue(allSchemaObjectNames.size() == 2)
    +    assertEquals(SqlMonikerType.SCHEMA, allSchemaObjectNames.get(0).getType)
    --- End diff --
    
    Check both entries, not only the first one.



[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104371681
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala ---
    @@ -0,0 +1,166 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util
    +
    +import org.apache.calcite.linq4j.tree.Expression
    +import org.apache.calcite.schema._
    +import org.apache.flink.table.api.{DatabaseNotExistException, TableNotExistException}
    +import org.slf4j.{Logger, LoggerFactory}
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * This class is responsible for connecting the external catalog to the Calcite catalog.
    +  * In this way, it is possible to look-up and access tables in SQL queries
    +  * without registering tables in advance.
    +  * The databases in the external catalog are registered as Calcite sub-schemas of the current schema.
    +  * The tables in a given database are registered as Calcite tables
    +  * of the [[ExternalCatalogDatabaseSchema]].
    +  *
    +  * @param catalogIdentifier external catalog name
    +  * @param catalog           external catalog
    +  */
    +class ExternalCatalogSchema(
    +    catalogIdentifier: String,
    +    catalog: ExternalCatalog) extends Schema {
    +
    +  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
    +
    +  /**
    +    * Looks up database by the given sub-schema name in the external catalog,
    +    * returns it wrapped in an [[ExternalCatalogDatabaseSchema]] with the given database name.
    +    *
    +    * @param name Sub-schema name
    +    * @return Sub-schema with a given name, or null
    +    */
    +  override def getSubSchema(name: String): Schema = {
    +    try {
    +      val db = catalog.getDatabase(name)
    +      if (db != null) {
    +        new ExternalCatalogDatabaseSchema(db.dbName, catalog)
    +      } else {
    +        null
    +      }
    +    } catch {
    +      case e: DatabaseNotExistException =>
    +        LOG.warn(s"database $name does not exist in externalCatalog $catalogIdentifier")
    +        null
    +    }
    +  }
    +
    +  /**
    +    * Lists the databases of the external catalog,
    +    * returns the lists as the names of this schema's sub-schemas.
    +    *
    +    * @return names of this schema's child schemas
    +    */
    +  override def getSubSchemaNames: util.Set[String] = catalog.listDatabases().toSet.asJava
    +
    +  override def getTable(name: String): Table = null
    +
    +  override def isMutable: Boolean = true
    +
    +  override def getFunctions(name: String): util.Collection[Function] =
    +    util.Collections.emptyList[Function]
    +
    +  override def getExpression(parentSchema: SchemaPlus, name: String): Expression =
    +    Schemas.subSchemaExpression(parentSchema, name, getClass)
    +
    +  override def getFunctionNames: util.Set[String] = util.Collections.emptySet[String]
    +
    +  override def getTableNames: util.Set[String] = util.Collections.emptySet[String]
    +
    +  override def contentsHaveChangedSince(lastCheck: Long, now: Long): Boolean = false
    --- End diff --
    
    Right, my understanding of this method is that Calcite asks "Has the content changed?": if we return `false`, it will look into its cache; if we return `true` (the content has changed), it will query the external catalog. That's why I was thinking that `true` is the right return value. Can you check Calcite's behavior for this function?
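To make the caching argument concrete, here is a hypothetical model (not Calcite's actual code) of a reader that caches table names and consults `contentsHaveChangedSince` before each lookup. Under this model, returning `true` forces a fresh query against the external catalog, while `false` lets the cached result be reused; whether Calcite behaves exactly like this is the open question above. All names are invented for illustration.

```scala
// Hypothetical model of the caching behaviour described above.
trait ChangeAwareSource {
  def contentsHaveChangedSince(lastCheck: Long, now: Long): Boolean
  def loadTableNames(): Set[String]
}

final class CachingReader(source: ChangeAwareSource) {
  private var cached: Option[Set[String]] = None
  private var lastCheck: Long = 0L

  def tableNames(now: Long): Set[String] = {
    // Reload on first access, or whenever the source reports a change.
    val mustReload =
      cached.isEmpty || source.contentsHaveChangedSince(lastCheck, now)
    if (mustReload) {
      cached = Some(source.loadTableNames())
    }
    lastCheck = now
    cached.get
  }
}

/** Test double that counts how often the (external) catalog is queried. */
final class CountingSource(changed: Boolean) extends ChangeAwareSource {
  var loads = 0
  override def contentsHaveChangedSince(lastCheck: Long, now: Long): Boolean = changed
  override def loadTableNames(): Set[String] = { loads += 1; Set("tb1") }
}
```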



[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105703767
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CRUDExternalCatalog.scala ---
    @@ -0,0 +1,110 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import org.apache.flink.table.api._
    +
    +/**
    +  * This class is responsible for interacting with the external catalog.
    +  * Its main responsibilities include:
    +  * <ul>
    +  * <li> create/drop/alter database or tables for DDL operations
    +  * <li> provide tables for calcite catalog, it looks up databases or tables in the external catalog
    +  * </ul>
    +  */
    +trait CRUDExternalCatalog extends ReadonlyExternalCatalog {
    --- End diff --
    
    Rename to `CrudExternalCatalog`



[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105121440
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CatalogTableHelper.scala ---
    @@ -0,0 +1,138 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.io.IOException
    +import java.lang.reflect.Modifier
    +import java.net.URL
    +import java.util.Properties
    +
    +import org.apache.flink.table.annotation.ExternalCatalogCompatible
    +import org.apache.flink.table.api.ExternalCatalogTableTypeNotExistException
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.plan.stats.FlinkStatistic
    +import org.apache.flink.table.sources.TableSource
    +import org.apache.flink.util.InstantiationUtil
    +import org.reflections.Reflections
    +import org.slf4j.{Logger, LoggerFactory}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +
    +/**
    +  * The utility class is used to convert ExternalCatalogTable to TableSourceTable.
    +  */
    +object CatalogTableHelper {
    +
    +  // config file to specify the scan package to search tableSources
    +  // which are compatible with external catalog.
    +  private val tableSourceConfigFileName = "externalCatalogTable.properties"
    --- End diff --
    
    Should we configure the path to scan via the `TableConfig` instead of a hard-coded property file?
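A minimal sketch of the alternative, assuming scan packages could be passed in explicitly (for example from a config object such as `TableConfig`) and the property file is only used when nothing is configured. `ScanPackageResolver`, the `scan.packages` key, and the comma-separated format are all assumptions; the diff does not show which key `externalCatalogTable.properties` uses, and no existing `TableConfig` option is implied.

```scala
import java.io.InputStream
import java.util.Properties

object ScanPackageResolver {
  // Hypothetical property key; the real key used by the PR is not shown.
  val ScanPackagesKey = "scan.packages"

  /** Parses a comma-separated list of packages from a properties stream. */
  def parseScanPackages(in: InputStream): Seq[String] = {
    val props = new Properties()
    try props.load(in) finally in.close()
    Option(props.getProperty(ScanPackagesKey)).toSeq
      .flatMap(_.split(",").toSeq)
      .map(_.trim)
      .filter(_.nonEmpty)
  }

  /** Prefers explicitly configured packages, else falls back to the file. */
  def resolve(configured: Option[Seq[String]], fallback: => Seq[String]): Seq[String] =
    configured.filter(_.nonEmpty).getOrElse(fallback)
}
```

Taking the fallback by name means the property file is only read when no packages were configured.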



[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105712740
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala ---
    @@ -0,0 +1,146 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.io.IOException
    +import java.lang.reflect.Modifier
    +import java.net.URL
    +import java.util.Properties
    +
    +import org.apache.flink.table.annotation.TableType
    +import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, NoMatchedTableSourceConverterException}
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.plan.stats.FlinkStatistic
    +import org.apache.flink.table.sources.TableSource
    +import org.apache.flink.util.InstantiationUtil
    +import org.reflections.Reflections
    +import org.slf4j.{Logger, LoggerFactory}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +/**
    +  * The utility class is used to convert ExternalCatalogTable to TableSourceTable.
    +  */
    +object ExternalTableSourceUtil {
    +
    +  // config file to specify scan package to search TableSourceConverter
    +  private val tableSourceConverterConfigFileName = "tableSourceConverter.properties"
    +
    +  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
    +
    +  // registered table type with the TableSourceConverter.
    +  // Key is table type name, Value is set of converter class.
    +  private val tableTypeToTableSourceConvertersClazz = {
    +    val registeredConverters =
    +      new mutable.HashMap[String, mutable.Set[Class[_ <: TableSourceConverter[_]]]]
    +          with mutable.MultiMap[String, Class[_ <: TableSourceConverter[_]]]
    +    // scan all config files to find TableSourceConverters which are annotated with TableType.
    +    val resourceUrls = getClass.getClassLoader.getResources(tableSourceConverterConfigFileName)
    +    while (resourceUrls.hasMoreElements) {
    +      val url = resourceUrls.nextElement()
    +      parseScanPackageFromConfigFile(url) match {
    +        case Some(scanPackage) =>
    +          val clazzWithAnnotations = new Reflections(scanPackage)
    +              .getTypesAnnotatedWith(classOf[TableType])
    +          clazzWithAnnotations.asScala.foreach(clazz =>
    +            if (classOf[TableSourceConverter[_]].isAssignableFrom(clazz)) {
    +              if (Modifier.isAbstract(clazz.getModifiers()) ||
    +                  Modifier.isInterface(clazz.getModifiers)) {
    +                LOG.warn(s"Class ${clazz.getName} is annotated with TableType " +
    +                    s"but is an abstract class or interface.")
    +              } else {
    +                val tableTypeAnnotation: TableType =
    +                  clazz.getAnnotation(classOf[TableType])
    +                val tableType = tableTypeAnnotation.value()
    +                val converterClazz = clazz.asInstanceOf[Class[_ <: TableSourceConverter[_]]]
    +                registeredConverters.addBinding(tableType, converterClazz)
    +                LOG.info(s"Registers the converter ${clazz.getName} to table type [$tableType]. ")
    +              }
    +            } else {
    +              LOG.warn(
    +                s"Class ${clazz.getName} is annotated with TableType, " +
    +                    s"but does not implement the TableSourceConverter interface.")
    +            }
    +          )
    +        case None =>
    +          LOG.warn(s"Failed to get scan package from config file [$url].")
    +      }
    +    }
    +    registeredConverters
    +  }
    +
    +  /**
    +    * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance
    +    *
    +    * @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert
    +    * @return converted [[TableSourceTable]] instance from the input catalog table
    +    */
    +  def fromExternalCatalogTable(externalCatalogTable: ExternalCatalogTable): TableSourceTable[_] = {
    +    val tableType = externalCatalogTable.tableType
    +    val propertyKeys = externalCatalogTable.properties.keySet()
    +    tableTypeToTableSourceConvertersClazz.get(tableType) match {
    +      case Some(converterClasses) =>
    +        val matchedConverters = converterClasses.map(InstantiationUtil.instantiate(_))
    +            .filter(converter => propertyKeys.containsAll(converter.requiredProperties))
    +        if (matchedConverters.isEmpty) {
    +          LOG.error(s"Cannot find any TableSourceConverter binded to table type [$tableType]. " +
    --- End diff --
    
    Mention that the required properties might not have matched.
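A minimal Java sketch of the matching step, with an error message that covers both failure modes the reviewer describes: no converter is registered for the type, or converters exist but none of their required properties matched. `ConverterCandidate`, `ConverterMatching` and the message wording are hypothetical. Only the idea of filtering converters by `requiredProperties` against the table's property keys comes from the diff above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for a registered converter: only what matching needs.
final class ConverterCandidate {
    final String name;
    final Set<String> requiredProperties;

    ConverterCandidate(String name, String... requiredProperties) {
        this.name = name;
        this.requiredProperties = new HashSet<>(Arrays.asList(requiredProperties));
    }
}

final class ConverterMatching {
    // Keeps only the candidates whose required property keys are all present in the table.
    static List<ConverterCandidate> matching(List<ConverterCandidate> candidates, Set<String> propertyKeys) {
        List<ConverterCandidate> result = new ArrayList<>();
        for (ConverterCandidate candidate : candidates) {
            if (propertyKeys.containsAll(candidate.requiredProperties)) {
                result.add(candidate);
            }
        }
        return result;
    }

    // Distinguishes "nothing registered for this type" from "registered, but required properties missing".
    static String noMatchMessage(String tableType, List<ConverterCandidate> candidates, Set<String> propertyKeys) {
        if (candidates.isEmpty()) {
            return "Cannot find any TableSourceConverter bound to table type [" + tableType + "].";
        }
        StringBuilder sb = new StringBuilder("Found ")
            .append(candidates.size())
            .append(" TableSourceConverter(s) bound to table type [").append(tableType)
            .append("], but the table properties ").append(new TreeSet<>(propertyKeys))
            .append(" do not contain all required properties of any of them:");
        for (ConverterCandidate candidate : candidates) {
            // Sorted sets keep the message deterministic.
            sb.append(' ').append(candidate.name)
              .append(" requires ").append(new TreeSet<>(candidate.requiredProperties)).append(';');
        }
        return sb.toString();
    }
}
```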


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104401203
  
    --- Diff: flink-libraries/flink-table/pom.xml ---
    @@ -92,7 +92,12 @@ under the License.
     			</exclusions>
     		</dependency>
     
    -
    +		<dependency>
    --- End diff --
    
    Thanks a lot. I will exclude com.google.code.findbugs.*, whose license is LGPL.


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105124247
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CatalogTableHelper.scala ---
    @@ -0,0 +1,138 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.io.IOException
    +import java.lang.reflect.Modifier
    +import java.net.URL
    +import java.util.Properties
    +
    +import org.apache.flink.table.annotation.ExternalCatalogCompatible
    +import org.apache.flink.table.api.ExternalCatalogTableTypeNotExistException
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.plan.stats.FlinkStatistic
    +import org.apache.flink.table.sources.TableSource
    +import org.apache.flink.util.InstantiationUtil
    +import org.reflections.Reflections
    +import org.slf4j.{Logger, LoggerFactory}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +
    +/**
    +  * The utility class is used to convert ExternalCatalogTable to TableSourceTable.
    +  */
    +object CatalogTableHelper {
    --- End diff --
    
    Rename to `ExternalTableSourceUtil`?


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105362031
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CatalogTableHelper.scala ---
    @@ -0,0 +1,138 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.io.IOException
    +import java.lang.reflect.Modifier
    +import java.net.URL
    +import java.util.Properties
    +
    +import org.apache.flink.table.annotation.ExternalCatalogCompatible
    +import org.apache.flink.table.api.ExternalCatalogTableTypeNotExistException
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.plan.stats.FlinkStatistic
    +import org.apache.flink.table.sources.TableSource
    +import org.apache.flink.util.InstantiationUtil
    +import org.reflections.Reflections
    +import org.slf4j.{Logger, LoggerFactory}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +
    +/**
    +  * The utility class is used to convert ExternalCatalogTable to TableSourceTable.
    +  */
    +object CatalogTableHelper {
    +
    +  // config file to specifier the scan package to search tableSources
    +  // which is compatible with external catalog.
    +  private val tableSourceConfigFileName = "externalCatalogTable.properties"
    --- End diff --
    
    Hi Fabian, I'm a little confused. Which approach do you mean?
    1. We could add a pkgsToScan field to TableConfig, and users would be responsible for specifying its value, e.g. tableConfig.setPkgsToScan("org.apache.flink.table.sources", "org.apache.flink.streaming.connectors.kafka").
    Then users would need to know exactly which module each required converter belongs to.
    2. We could add a converterConfigFileName field to TableConfig, and users would be responsible for specifying its value, e.g. tableConfig.setConverterConfigFileName("externalCatalogTable.properties").
    This approach is a little strange because the file name is fixed by convention and is not expected to change.
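For comparison with option 1, the convention-based approach in the diff (one properties file per module, with all copies found via `ClassLoader.getResources`) only needs each file to name its scan packages. A minimal Java sketch of parsing one such file, assuming a comma-separated value under a hypothetical `scan.packages` key; the thread does not show the key the PR actually uses.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

final class ScanPackageConfig {
    // Hypothetical key name for this sketch.
    static final String SCAN_PACKAGES_KEY = "scan.packages";

    // Reads a comma-separated list of packages to scan from one properties source.
    static List<String> parseScanPackages(Reader source) {
        Properties props = new Properties();
        try {
            props.load(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> packages = new ArrayList<>();
        String raw = props.getProperty(SCAN_PACKAGES_KEY);
        if (raw == null) {
            return packages; // this file declares no packages
        }
        for (String pkg : raw.split(",")) {
            String trimmed = pkg.trim();
            if (!trimmed.isEmpty()) {
                packages.add(trimmed);
            }
        }
        return packages;
    }
}
```

With this approach users never list packages themselves: each module that ships converters also ships its own file, and the results from all files are combined.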


[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...

Posted by KurtYoung <gi...@git.apache.org>.
Github user KurtYoung commented on the issue:

    https://github.com/apache/flink/pull/3406
  
    Hi @fhueske, I like your proposal to move the annotation from `TableSource` to `TableSourceConverter`. Let's do it this way.
    BTW, I noticed that you proposed three possible methods for `TableSourceConverter`. I think only `def requiredProperties: Array[String]` is necessary for now. It can help validate the converter and decide which converter to use when multiple converters have the same `TableType`.
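A minimal Java sketch of the proposed shape, using hypothetical names (`SketchTableType`, `SketchTableSourceConverter`, `SketchCsvConverter`, `TableTypes`): the table type annotation sits on the converter class and is read reflectively, and `requiredProperties` is the only extra method besides the conversion itself. It returns a `Set<String>` rather than an `Array[String]`, matching the `JSet[String]` that `CsvTableSourceConverter` uses later in this thread.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical annotation: runtime retention so it can be read via reflection.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface SketchTableType {
    String value();
}

// Hypothetical converter contract: required property keys plus the conversion.
interface SketchTableSourceConverter<T> {
    Set<String> requiredProperties();

    T fromProperties(Map<String, String> properties);
}

@SketchTableType("csv")
class SketchCsvConverter implements SketchTableSourceConverter<String> {
    @Override
    public Set<String> requiredProperties() {
        return new HashSet<>(Arrays.asList("path", "fieldDelim", "rowDelim"));
    }

    @Override
    public String fromProperties(Map<String, String> properties) {
        // Stand-in for building a CsvTableSource; returns a description instead.
        return "csv source at " + properties.get("path");
    }
}

final class TableTypes {
    // Returns the table type declared on a converter class, or null if it is not annotated.
    static String tableTypeOf(Class<?> converterClass) {
        SketchTableType type = converterClass.getAnnotation(SketchTableType.class);
        return type == null ? null : type.value();
    }
}
```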



[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104159480
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CatalogTableHelper.scala ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.io.IOException
    +import java.lang.reflect.Modifier
    +import java.net.URL
    +import java.util.Properties
    +
    +import org.apache.flink.table.annotation.ExternalCatalogCompatible
    +import org.apache.flink.table.api.{ExternalCatalogTableTypeAlreadyExistException, ExternalCatalogTableTypeNotExistException}
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.plan.stats.FlinkStatistic
    +import org.apache.flink.table.sources.TableSource
    +import org.apache.flink.util.InstantiationUtil
    +import org.reflections.Reflections
    +import org.slf4j.{Logger, LoggerFactory}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +
    +/**
    +  * The utility class is used to convert ExternalCatalogTable to TableSourceTable.
    +  */
    +object CatalogTableHelper {
    +
    +  // config file to specifier the scan package to search tableSources
    +  // which is compatible with external catalog.
    +  private val tableSourceConfigFileName = "externalcatalogTable.properties"
    +
    +  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
    +
    +  // registered tableSources which are compatible with external catalog.
    +  // Key is table type name, Value is converter class.
    +  private val tableSourceTypeToConvertersClazz = {
    +    val registeredConverters =
    +      new HashMap[String, Class[_ <: TableSourceConverter[_]]]
    +    // scan all config file to find all tableSources which are compatible with external catalog.
    +    val resourceUrls = getClass.getClassLoader.getResources(tableSourceConfigFileName)
    +    while (resourceUrls.hasMoreElements) {
    +      val url = resourceUrls.nextElement()
    +      parseScanPackageFromConfigFile(url) match {
    +        case Some(scanPackage) =>
    +          val clazzWithAnnotations = new Reflections(scanPackage)
    +              .getTypesAnnotatedWith(classOf[ExternalCatalogCompatible])
    +          clazzWithAnnotations.asScala.foreach(clazz =>
    +            if (classOf[TableSource[_]].isAssignableFrom(clazz)) {
    +              if (Modifier.isAbstract(clazz.getModifiers()) ||
    +                  Modifier.isInterface(clazz.getModifiers)) {
    +                LOG.warn(
    +                  s"class :[${clazz.getName}] is also with ExternalCatalogCompatible annotation, " +
    --- End diff --
    
    `"Class ${clazz.getName} is annotated with ExternalCatalogCompatible but an abstract class or interface."`
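A small Java sketch of the check with the reviewer's suggested message. `AnnotatedClassCheck` is a hypothetical name; `Modifier.isAbstract` and `Modifier.isInterface` are the same JDK calls used in the diff. The JVM also sets the abstract flag on interfaces, so `isAbstract` alone would already catch them; the second test just makes the intent explicit.

```java
import java.lang.reflect.Modifier;
import java.util.Optional;

final class AnnotatedClassCheck {
    // Returns a warning if an annotated class cannot be registered because it is abstract or an interface.
    static Optional<String> abstractOrInterfaceWarning(Class<?> clazz, String annotationName) {
        int mods = clazz.getModifiers();
        if (Modifier.isAbstract(mods) || Modifier.isInterface(mods)) {
            return Optional.of("Class " + clazz.getName() + " is annotated with " + annotationName
                + " but an abstract class or interface.");
        }
        return Optional.empty();
    }
}
```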


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r106403723
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala ---
    @@ -0,0 +1,153 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.lang.reflect.Modifier
    +import java.net.URL
    +
    +import org.apache.commons.configuration.{ConfigurationException, ConversionException, PropertiesConfiguration}
    +import org.apache.flink.table.annotation.TableType
    +import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, NoMatchedTableSourceConverterException}
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.plan.stats.FlinkStatistic
    +import org.apache.flink.table.sources.TableSource
    +import org.apache.flink.util.InstantiationUtil
    +import org.reflections.Reflections
    +import org.slf4j.{Logger, LoggerFactory}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +/**
    +  * The utility class is used to convert ExternalCatalogTable to TableSourceTable.
    +  */
    +object ExternalTableSourceUtil {
    +
    +  // config file to specify scan package to search TableSourceConverter
    +  private val tableSourceConverterConfigFileName = "tableSourceConverter.properties"
    +
    +  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
    +
    +  // registered table type with the TableSourceConverter.
    +  // Key is table type name, Value is set of converter class.
    +  private val tableTypeToTableSourceConvertersClazz = {
    +    val registeredConverters =
    +      new mutable.HashMap[String, mutable.Set[Class[_ <: TableSourceConverter[_]]]]
    +          with mutable.MultiMap[String, Class[_ <: TableSourceConverter[_]]]
    +    // scan all config files to find TableSourceConverters which are annotationed with TableType.
    +    val resourceUrls = getClass.getClassLoader.getResources(tableSourceConverterConfigFileName)
    +    while (resourceUrls.hasMoreElements) {
    +      val url = resourceUrls.nextElement()
    +      val scanPackages = parseScanPackagesFromConfigFile(url)
    +      scanPackages.foreach(scanPackage => {
    +        val clazzWithAnnotations = new Reflections(scanPackage)
    +            .getTypesAnnotatedWith(classOf[TableType])
    +        clazzWithAnnotations.asScala.foreach(clazz =>
    +          if (classOf[TableSourceConverter[_]].isAssignableFrom(clazz)) {
    +            if (Modifier.isAbstract(clazz.getModifiers()) ||
    --- End diff --
    
    Yes, I will add a check for it based on InstantiationUtil.
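A plain-Java approximation of the kind of checks meant here. The idea is that non-public, abstract, non-static inner, or constructor-less classes get reported before instantiation is attempted. This is not Flink's `InstantiationUtil` code; the class name and the reason strings are hypothetical.

```java
import java.lang.reflect.Modifier;

final class InstantiabilityCheck {
    // Returns null if the class looks instantiable through a public no-arg constructor,
    // otherwise a short reason why it is not.
    static String checkForInstantiationError(Class<?> clazz) {
        int mods = clazz.getModifiers();
        if (!Modifier.isPublic(mods)) {
            return "The class is not public.";
        }
        if (Modifier.isAbstract(mods) || Modifier.isInterface(mods)) {
            return "The class is abstract or an interface.";
        }
        // Non-static inner (and local/anonymous) classes need an enclosing instance.
        if (clazz.getEnclosingClass() != null && !Modifier.isStatic(mods)) {
            return "The class is a non-static inner class.";
        }
        try {
            clazz.getConstructor(); // only finds public constructors
        } catch (NoSuchMethodException e) {
            return "The class has no public nullary constructor.";
        }
        return null;
    }
}
```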


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104401325
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/ExternalCatalogCompatible.java ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.annotation;
    +
    +import org.apache.flink.annotation.Public;
    +import org.apache.flink.table.catalog.TableSourceConverter;
    +
    +import java.lang.annotation.Documented;
    +import java.lang.annotation.ElementType;
    +import java.lang.annotation.Retention;
    +import java.lang.annotation.RetentionPolicy;
    +import java.lang.annotation.Target;
    +
    +/**
    + * A tableSource with this annotation represents it is compatible with external catalog, that is,
    + * an instance of this tableSource can be converted to or converted from external catalog table
    + * instance.
    + * The annotation contains the following information:
    + * <ul>
    + * <li> external catalog table type name for this kind of tableSource </li>
    + * <li> external catalog table <-> tableSource converter class </li>
    + * </ul>
    + */
    +@Documented
    +@Target(ElementType.TYPE)
    +@Retention(RetentionPolicy.RUNTIME)
    +@Public
    +public @interface ExternalCatalogCompatible {
    --- End diff --
    
    Could we define tableType as an identifier that includes version information? For example, kafka0.8 / kafka0.9 / kafka1.0 or hive1.0 / hive2.0.
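One hypothetical way to make such identifiers usable: keep the whole string (e.g. `kafka0.9`) as the registry key, so converters for different versions never collide, and split it only when the system name or version is needed on its own. The format below (letters followed by an optional dotted version) and the `VersionedTableType` name are assumptions for illustration.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class VersionedTableType {
    // Assumed format: system name in letters, then an optional dotted numeric version.
    private static final Pattern FORMAT = Pattern.compile("([A-Za-z]+)(\\d+(?:\\.\\d+)*)?");

    final String system;
    final Optional<String> version;

    private VersionedTableType(String system, Optional<String> version) {
        this.system = system;
        this.version = version;
    }

    // Splits an identifier such as "kafka0.9" into "kafka" and "0.9"; "csv" has no version.
    static VersionedTableType parse(String identifier) {
        Matcher m = FORMAT.matcher(identifier);
        if (!m.matches()) {
            throw new IllegalArgumentException("Invalid table type identifier: " + identifier);
        }
        return new VersionedTableType(m.group(1), Optional.ofNullable(m.group(2)));
    }
}
```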


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105141370
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util
    +
    +/**
    +  * This class is responsible for interact with external catalog.
    +  * Its main responsibilities including:
    +  * <ul>
    +  * <li> create/drop/alter database or tables for DDL operations
    +  * <li> provide tables for calcite catalog, it looks up databases or tables in the external catalog
    +  * </ul>
    +  */
    +trait ExternalCatalog {
    +
    +  /**
    +    * Adds table into external Catalog
    +    *
    +    * @param table          description of table which to create
    +    * @param ignoreIfExists whether to ignore operation if table already exists
    +    */
    +  def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit
    +
    +  /**
    +    * Deletes table from external Catalog
    +    *
    +    * @param dbName            database name
    +    * @param tableName         table name
    +    * @param ignoreIfNotExists whether to ignore operation if table not exist yet
    +    */
    +  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
    +
    +  /**
    +    * Modifies an existing table in the external catalog
    +    *
    +    * @param table             description of table which to modify
    +    * @param ignoreIfNotExists whether to ignore operation if table not exist yet
    +    */
    +  def alterTable(table: ExternalCatalogTable, ignoreIfNotExists: Boolean): Unit
    +
    +  /**
    +    * Gets table from external Catalog
    +    *
    +    * @param dbName    database name
    +    * @param tableName table name
    +    * @return table
    +    */
    +  def getTable(dbName: String, tableName: String): ExternalCatalogTable
    +
    +  /**
    +    * Gets the table name lists from current external Catalog
    +    *
    +    * @param dbName database name
    +    * @return lists of table name
    +    */
    +  def listTables(dbName: String): util.List[String]
    --- End diff --
    
    Do we expect `null` or an exception if the database does not exist?
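A minimal Java sketch of the exception-based option for an in-memory catalog. It distinguishes an unknown database (exception) from an existing but empty one (empty list), so callers never have to check for `null`. `InMemoryCatalogSketch` and `DatabaseNotExistException` are hypothetical names for this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical exception for this sketch.
class DatabaseNotExistException extends RuntimeException {
    DatabaseNotExistException(String db) {
        super("Database " + db + " does not exist.");
    }
}

final class InMemoryCatalogSketch {
    // database name -> (table name -> table description)
    private final Map<String, Map<String, String>> databases = new HashMap<>();

    void createDatabase(String db) {
        databases.putIfAbsent(db, new HashMap<>());
    }

    void createTable(String db, String table, String description) {
        tablesOf(db).put(table, description);
    }

    // Throws for an unknown database; an existing database without tables yields an empty list.
    List<String> listTables(String db) {
        return new ArrayList<>(tablesOf(db).keySet());
    }

    private Map<String, String> tablesOf(String db) {
        Map<String, String> tables = databases.get(db);
        if (tables == null) {
            throw new DatabaseNotExistException(db);
        }
        return tables;
    }
}
```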


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104160007
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CatalogTableHelper.scala ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.io.IOException
    +import java.lang.reflect.Modifier
    +import java.net.URL
    +import java.util.Properties
    +
    +import org.apache.flink.table.annotation.ExternalCatalogCompatible
    +import org.apache.flink.table.api.{ExternalCatalogTableTypeAlreadyExistException, ExternalCatalogTableTypeNotExistException}
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.plan.stats.FlinkStatistic
    +import org.apache.flink.table.sources.TableSource
    +import org.apache.flink.util.InstantiationUtil
    +import org.reflections.Reflections
    +import org.slf4j.{Logger, LoggerFactory}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +
    +/**
    +  * The utility class is used to convert ExternalCatalogTable to TableSourceTable.
    +  */
    +object CatalogTableHelper {
    +
    +  // config file to specifier the scan package to search tableSources
    +  // which is compatible with external catalog.
    +  private val tableSourceConfigFileName = "externalcatalogTable.properties"
    +
    +  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
    +
    +  // registered tableSources which are compatible with external catalog.
    +  // Key is table type name, Value is converter class.
    +  private val tableSourceTypeToConvertersClazz = {
    +    val registeredConverters =
    +      new HashMap[String, Class[_ <: TableSourceConverter[_]]]
    +    // scan all config file to find all tableSources which are compatible with external catalog.
    +    val resourceUrls = getClass.getClassLoader.getResources(tableSourceConfigFileName)
    +    while (resourceUrls.hasMoreElements) {
    +      val url = resourceUrls.nextElement()
    +      parseScanPackageFromConfigFile(url) match {
    +        case Some(scanPackage) =>
    +          val clazzWithAnnotations = new Reflections(scanPackage)
    +              .getTypesAnnotatedWith(classOf[ExternalCatalogCompatible])
    +          clazzWithAnnotations.asScala.foreach(clazz =>
    +            if (classOf[TableSource[_]].isAssignableFrom(clazz)) {
    +              if (Modifier.isAbstract(clazz.getModifiers()) ||
    +                  Modifier.isInterface(clazz.getModifiers)) {
    +                LOG.warn(
    +                  s"class :[${clazz.getName}] is also with ExternalCatalogCompatible annotation, " +
    +                      s"but it's an abstract clazz or an interface.")
    +              } else {
    +                val externalCatalogCompatible: ExternalCatalogCompatible =
    +                  clazz.getAnnotation(classOf[ExternalCatalogCompatible])
    +                val tableSourceName = externalCatalogCompatible.tableType()
    +                if (registeredConverters.contains(tableSourceName)) {
    +                  LOG.error(s"The table type [$tableSourceName] is already registered.")
    +                  throw new ExternalCatalogTableTypeAlreadyExistException(tableSourceName)
    +                }
    +                registeredConverters.put(tableSourceName, externalCatalogCompatible.converter())
    +              }
    +            } else {
    +              LOG.warn(
    +                s"class :[${clazz.getName}] is also with ExternalCatalogCompatible annotation, " +
    +                    s"but it's not sub-class of tableSource.")
    +            }
    +          )
    +        case None =>
    +      }
    +    }
    +    registeredConverters
    +  }
    +
    +  /**
    +    * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance
    +    *
    +    * @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert
    +    * @return converted [[TableSourceTable]] instance from the input catalog table
    +    */
    +  def fromExternalCatalogTable(externalCatalogTable: ExternalCatalogTable): TableSourceTable[_] = {
    +    val tableType = externalCatalogTable.tableType
    +    tableSourceTypeToConvertersClazz.get(tableType) match {
    +      case Some(converterClazz) =>
    +        val converter = InstantiationUtil.instantiate(converterClazz)
    +        val convertedTableSource: TableSource[_] =
    +          converter.fromExternalCatalogTable(externalCatalogTable) match {
    +            case ts: TableSource[_] => ts
    +            case _ => throw new RuntimeException(
    --- End diff --
    
    This case cannot happen since `fromExternalCatalogTable()` must return a `TableSource`.
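A Java sketch of why the extra `case` is unreachable. When the converter's type parameter is bounded by the source type, every implementation must return that type, so even a caller holding a wildcard-typed converter gets a source without a cast or pattern match. All names are hypothetical stand-ins for `TableSource` and `TableSourceConverter`.

```java
import java.util.Map;

// Hypothetical stand-in for TableSource.
interface SketchSource {
    String describe();
}

// Hypothetical stand-in for TableSourceConverter: the bound forces implementations to return a SketchSource.
interface SketchSourceConverter<T extends SketchSource> {
    T fromProperties(Map<String, String> properties);
}

final class SketchCsvSource implements SketchSource {
    private final String path;

    SketchCsvSource(String path) {
        this.path = path;
    }

    @Override
    public String describe() {
        return "csv:" + path;
    }
}

final class SketchCsvSourceConverter implements SketchSourceConverter<SketchCsvSource> {
    @Override
    public SketchCsvSource fromProperties(Map<String, String> properties) {
        return new SketchCsvSource(properties.get("path"));
    }
}

final class SourceConversion {
    // The wildcard's upper bound is SketchSource, so the result needs no runtime type check.
    static SketchSource convert(SketchSourceConverter<?> converter, Map<String, String> properties) {
        return converter.fromProperties(properties);
    }
}
```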


[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3406
  
    Good point about the documentation. I think this should be added with PR #3409 when registering external catalogs is exposed via the `TableEnvironment`.


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r106349244
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceConverter.scala ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sources
    +
    +import org.apache.flink.table.annotation.TableType
    +import org.apache.flink.table.catalog.{ExternalCatalogTable, TableSourceConverter}
    +
    +import scala.collection.JavaConverters._
    +import java.util.{Set => JSet}
    +
    +import com.google.common.collect.ImmutableSet
    +
    +/**
    +  * The class defines a converter used to convert [[CsvTableSource]] to
    +  * or from [[ExternalCatalogTable]].
    +  */
    +@TableType(value = "csv")
    +class CsvTableSourceConverter extends TableSourceConverter[CsvTableSource] {
    +
    +  private val required: JSet[String] = ImmutableSet.of("path", "fieldDelim", "rowDelim")
    +
    +  override def requiredProperties: JSet[String] = required
    +
    +  override def fromExternalCatalogTable(
    +      externalCatalogTable: ExternalCatalogTable): CsvTableSource = {
    +    val params = externalCatalogTable.properties.asScala
    +    val csvTableSourceBuilder = new CsvTableSource.Builder
    +
    +    params.get("path").foreach(csvTableSourceBuilder.path)
    +    params.get("fieldDelim").foreach(csvTableSourceBuilder.fieldDelimiter)
    +    params.get("rowDelim").foreach(csvTableSourceBuilder.lineDelimiter)
    +    params.get("quoteCharacter").foreach(quoteStr =>
    +      if (quoteStr.length != 1) {
    +        throw new IllegalArgumentException("the value of param quoteCharacter is invalid")
    --- End diff --
    
    This exception message could be improved, e.g. "must only contain one character".
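    A small, self-contained sketch of the suggested improvement. It moves the `quoteCharacter` check into a hypothetical `QuoteCharacterParser` helper (not part of the PR) and reports both the expected constraint and the offending value. The property name comes from the diff; the helper name and exact message are assumptions.

```scala
// Hypothetical helper, not part of the PR: validates the optional
// "quoteCharacter" property and converts it to a single Char.
object QuoteCharacterParser {

  def parseQuoteCharacter(params: Map[String, String]): Option[Char] =
    params.get("quoteCharacter").map { quoteStr =>
      if (quoteStr.length != 1) {
        // Say what is expected and what was actually given.
        throw new IllegalArgumentException(
          s"The value of property 'quoteCharacter' must only contain one character, " +
            s"but was '$quoteStr' (length ${quoteStr.length}).")
      }
      quoteStr.charAt(0)
    }
}
```

    An absent property yields `None`, so the converter only calls the builder's quote setter when a valid value is present.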


---

[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104163154
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala ---
    @@ -0,0 +1,108 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +/**
    +  * This class is responsible for interact with external catalog.
    +  * Its main responsibilities including:
    +  * <ul>
    +  * <li> create/drop/alter database or tables for DDL operations
    +  * <li> provide tables for calcite catalog, it looks up databases or tables in the external catalog
    +  * </ul>
    +  */
    +trait ExternalCatalog {
    +
    +  /**
    +    * Adds table into external Catalog
    +    *
    +    * @param table          description of table which to create
    +    * @param ignoreIfExists whether to ignore operation if table already exists
    +    */
    +  def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit
    +
    +  /**
    +    * Deletes table from external Catalog
    +    *
    +    * @param dbName            database name
    +    * @param tableName         table name
    +    * @param ignoreIfNotExists whether to ignore operation if table not exist yet
    +    */
    +  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
    +
    +  /**
    +    * Alters existed table into external Catalog
    --- End diff --
    
    "Modify an existing table in the external catalog"


---

[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104194149
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala ---
    @@ -68,6 +62,90 @@ object CommonTestData {
         )
       }
     
    +  def getMockedFlinkExternalCatalog: ExternalCatalog = {
    +    val csvRecord1 = Seq(
    +      "1#1#Hi",
    +      "2#2#Hello",
    +      "3#2#Hello world"
    +    )
    +    val tempFilePath1 = writeToTempFile(csvRecord1.mkString("$"), "csv-test1", "tmp")
    +    val externalCatalogTable1 = ExternalCatalogTable(
    +      TableIdentifier("db1", "tb1"),
    +      "csv",
    +      DataSchema(
    +        Array(
    +          BasicTypeInfo.INT_TYPE_INFO,
    +          BasicTypeInfo.LONG_TYPE_INFO,
    +          BasicTypeInfo.STRING_TYPE_INFO),
    +        Array("a", "b", "c")
    +      ),
    +      properties = Map(
    +        "path" -> tempFilePath1,
    +        "fieldDelim" -> "#",
    +        "rowDelim" -> "$"
    +      )
    +    )
    +
    +    val csvRecord2 = Seq(
    +      "1#1#0#Hallo#1",
    +      "2#2#1#Hallo Welt#2",
    +      "2#3#2#Hallo Welt wie#1",
    +      "3#4#3#Hallo Welt wie gehts?#2",
    +      "3#5#4#ABC#2",
    +      "3#6#5#BCD#3",
    +      "4#7#6#CDE#2",
    +      "4#8#7#DEF#1",
    +      "4#9#8#EFG#1",
    +      "4#10#9#FGH#2",
    +      "5#11#10#GHI#1",
    +      "5#12#11#HIJ#3",
    +      "5#13#12#IJK#3",
    +      "5#14#13#JKL#2",
    +      "5#15#14#KLM#2"
    +    )
    +    val tempFilePath2 = writeToTempFile(csvRecord2.mkString("$"), "csv-test2", "tmp")
    +    val externalCatalogTable2 = ExternalCatalogTable(
    +      TableIdentifier("db2", "tb2"),
    +      "csv",
    +      DataSchema(
    +        Array(
    +          BasicTypeInfo.INT_TYPE_INFO,
    +          BasicTypeInfo.LONG_TYPE_INFO,
    +          BasicTypeInfo.INT_TYPE_INFO,
    +          BasicTypeInfo.STRING_TYPE_INFO,
    +          BasicTypeInfo.LONG_TYPE_INFO),
    +        Array("d", "e", "f", "g", "h")
    +      ),
    +      properties = Map(
    +        "path" -> tempFilePath2,
    +        "fieldDelim" -> "#",
    +        "rowDelim" -> "$"
    +      )
    +    )
    +    val catalog = mock(classOf[ExternalCatalog])
    --- End diff --
    
    Couldn't we use the `InMemoryExternalCatalog` here instead of mocking one?
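    To show why a real in-memory catalog can replace a Mockito mock in test data, here is a minimal sketch of such a catalog. `TableIdentifier` and `CatalogTable` are simplified, hypothetical stand-ins for the PR's `TableIdentifier` / `ExternalCatalogTable`, and the method names only loosely follow the `ExternalCatalog` trait. This is not the PR's `InMemoryExternalCatalog`. Standard JDK exceptions stand in for the PR's catalog exceptions.

```scala
import scala.collection.mutable

// Simplified, hypothetical stand-ins for the PR's catalog types.
case class TableIdentifier(dbName: String, tableName: String)
case class CatalogTable(
    identifier: TableIdentifier,
    tableType: String,
    properties: Map[String, String])

/** Minimal in-memory catalog: database name -> (table name -> table). */
class SimpleInMemoryCatalog {
  private val databases =
    mutable.HashMap.empty[String, mutable.HashMap[String, CatalogTable]]

  def createDatabase(dbName: String, ignoreIfExists: Boolean): Unit = {
    if (databases.contains(dbName)) {
      if (!ignoreIfExists) throw new IllegalStateException(s"Database $dbName already exists.")
    } else {
      databases.put(dbName, mutable.HashMap.empty[String, CatalogTable])
    }
  }

  def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = {
    // Fails with NoSuchElementException if the table's database is unknown.
    val tables = tablesOf(table.identifier.dbName)
    val name = table.identifier.tableName
    if (tables.contains(name)) {
      if (!ignoreIfExists) throw new IllegalStateException(s"Table ${table.identifier} already exists.")
    } else {
      tables.put(name, table)
    }
  }

  def getTable(dbName: String, tableName: String): CatalogTable =
    tablesOf(dbName).getOrElse(tableName,
      throw new NoSuchElementException(s"Table $dbName.$tableName does not exist."))

  def listTables(dbName: String): Seq[String] = tablesOf(dbName).keys.toSeq.sorted

  private def tablesOf(dbName: String): mutable.HashMap[String, CatalogTable] =
    databases.getOrElse(dbName,
      throw new NoSuchElementException(s"Database $dbName does not exist."))
}
```

    With something like this, `getMockedFlinkExternalCatalog` could create `db1`/`db2` and register the two CSV tables directly. Lookups of missing databases or tables would then behave like a real catalog instead of returning whatever the mock was stubbed to return.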


---

[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104158678
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CatalogTableHelper.scala ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.io.IOException
    +import java.lang.reflect.Modifier
    +import java.net.URL
    +import java.util.Properties
    +
    +import org.apache.flink.table.annotation.ExternalCatalogCompatible
    +import org.apache.flink.table.api.{ExternalCatalogTableTypeAlreadyExistException, ExternalCatalogTableTypeNotExistException}
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.plan.stats.FlinkStatistic
    +import org.apache.flink.table.sources.TableSource
    +import org.apache.flink.util.InstantiationUtil
    +import org.reflections.Reflections
    +import org.slf4j.{Logger, LoggerFactory}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +
    +/**
    +  * The utility class is used to convert ExternalCatalogTable to TableSourceTable.
    +  */
    +object CatalogTableHelper {
    +
    +  // config file to specifier the scan package to search tableSources
    +  // which is compatible with external catalog.
    +  private val tableSourceConfigFileName = "externalcatalogTable.properties"
    --- End diff --
    
    `externalCatalogTable` (uppercase `C`)


---

[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3406


---

[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r106346971
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala ---
    @@ -0,0 +1,110 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import org.apache.flink.table.api._
    +
    +/**
    +  * This class is responsible for interact with external catalog.
    +  * Its main responsibilities including:
    +  * <ul>
    +  * <li> create/drop/alter database or tables for DDL operations
    +  * <li> provide tables for calcite catalog, it looks up databases or tables in the external catalog
    +  * </ul>
    +  */
    +trait CrudExternalCatalog extends ExternalCatalog {
    +
    +  /**
    +    * Adds table into external Catalog
    +    *
    +    * @param table          description of table which to create
    +    * @param ignoreIfExists if table already exists in the catalog, not throw exception and leave
    +    *                       the existed table if ignoreIfExists is true;
    +    *                       else throw a TableAlreadyExistException.
    +    * @throws DatabaseNotExistException  if database does not exist in the catalog yet
    +    * @throws TableAlreadyExistException if table already exists in the catalog and
    +    *                                    ignoreIfExists is false
    +    */
    +  @throws[DatabaseNotExistException]
    +  @throws[TableAlreadyExistException]
    +  def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit
    +
    +  /**
    +    * Deletes table from external Catalog
    +    *
    +    * @param dbName            database name
    +    * @param tableName         table name
    +    * @param ignoreIfNotExists if table not exist yet, not throw exception if ignoreIfNotExists is
    +    *                          true; else throw TableNotExistException
    +    * @throws DatabaseNotExistException if database does not exist in the catalog yet
    +    * @throws TableNotExistException    if table does not exist in the catalog yet
    +    */
    +  @throws[DatabaseNotExistException]
    +  @throws[TableNotExistException]
    +  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
    --- End diff --
    
    Why is a database name necessary for dropping but not for creating or altering?
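    In the diff, `createTable` receives an `ExternalCatalogTable`, which already carries its database and table name in its identifier. `dropTable` takes the two names separately. The following is one possible way to make the signatures symmetric, offered only as a hypothetical sketch. `TableRef`, `TableOps`, and `RecordingTableOps` are invented names. Every operation is keyed by the same identifier type, and the name-based `dropTable` is kept as a convenience overload.

```scala
import scala.collection.mutable

// Hypothetical identifier type shared by all table operations.
case class TableRef(dbName: String, tableName: String)

trait TableOps {
  def createTable(ref: TableRef, tableType: String, ignoreIfExists: Boolean): Unit

  def dropTable(ref: TableRef, ignoreIfNotExists: Boolean): Unit

  // Convenience overload with the same shape as the PR's dropTable(dbName, tableName, ...).
  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit =
    dropTable(TableRef(dbName, tableName), ignoreIfNotExists)
}

/** Map-backed implementation used only to exercise the trait. */
class RecordingTableOps extends TableOps {
  val tables = mutable.LinkedHashMap.empty[TableRef, String]

  override def createTable(ref: TableRef, tableType: String, ignoreIfExists: Boolean): Unit =
    if (tables.contains(ref)) {
      if (!ignoreIfExists) throw new IllegalStateException(s"Table $ref already exists.")
    } else {
      tables.put(ref, tableType)
    }

  override def dropTable(ref: TableRef, ignoreIfNotExists: Boolean): Unit =
    if (tables.remove(ref).isEmpty && !ignoreIfNotExists) {
      throw new NoSuchElementException(s"Table $ref does not exist.")
    }
}
```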


---

[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105316858
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/ExternalCatalogCompatible.java ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.annotation;
    +
    +import org.apache.flink.annotation.Public;
    +import org.apache.flink.table.catalog.TableSourceConverter;
    +
    +import java.lang.annotation.Documented;
    +import java.lang.annotation.ElementType;
    +import java.lang.annotation.Retention;
    +import java.lang.annotation.RetentionPolicy;
    +import java.lang.annotation.Target;
    +
    +/**
    + * A tableSource with this annotation represents it is compatible with external catalog, that is,
    + * an instance of this tableSource can be converted to or converted from external catalog table
    + * instance.
    + * The annotation contains the following information:
    + * <ul>
    + * <li> external catalog table type name for this kind of tableSource </li>
    + * <li> external catalog table <-> tableSource converter class </li>
    + * </ul>
    + */
    +@Documented
    +@Target(ElementType.TYPE)
    +@Retention(RetentionPolicy.RUNTIME)
    +@Public
    +public @interface ExternalCatalogCompatible {
    --- End diff --
    
    This suggestion is pretty good, thanks


---

[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104176638
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala ---
    @@ -0,0 +1,166 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util
    +
    +import org.apache.calcite.linq4j.tree.Expression
    +import org.apache.calcite.schema._
    +import org.apache.flink.table.api.{DatabaseNotExistException, TableNotExistException}
    +import org.slf4j.{Logger, LoggerFactory}
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * This class is responsible for connect external catalog to calcite catalog.
    +  * In this way, it is possible to look-up and access tables in SQL queries
    +  * without registering tables in advance.
    +  * The databases in the external catalog registers as calcite sub-Schemas of current schema.
    +  * The tables in a given database registers as calcite tables
    +  * of the [[ExternalCatalogDatabaseSchema]].
    +  *
    +  * @param catalogIdentifier external catalog name
    +  * @param catalog           external catalog
    +  */
    +class ExternalCatalogSchema(
    +    catalogIdentifier: String,
    +    catalog: ExternalCatalog) extends Schema {
    +
    +  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
    +
    +  /**
    +    * Looks up database by the given sub-schema name in the external catalog,
    +    * returns it Wrapped in a [[ExternalCatalogDatabaseSchema]] with the given database name.
    +    *
    +    * @param name Sub-schema name
    +    * @return Sub-schema with a given name, or null
    +    */
    +  override def getSubSchema(name: String): Schema = {
    +    try {
    +      val db = catalog.getDatabase(name)
    +      if (db != null) {
    +        new ExternalCatalogDatabaseSchema(db.dbName, catalog)
    +      } else {
    +        null
    +      }
    +    } catch {
    +      case e: DatabaseNotExistException =>
    +        LOG.warn(s"database $name does not exist in externalCatalog $catalogIdentifier")
    +        null
    +    }
    +  }
    +
    +  /**
    +    * Lists the databases of the external catalog,
    +    * returns the lists as the names of this schema's sub-schemas.
    +    *
    +    * @return names of this schema's child schemas
    +    */
    +  override def getSubSchemaNames: util.Set[String] = catalog.listDatabases().toSet.asJava
    +
    +  override def getTable(name: String): Table = null
    +
    +  override def isMutable: Boolean = true
    +
    +  override def getFunctions(name: String): util.Collection[Function] =
    +    util.Collections.emptyList[Function]
    +
    +  override def getExpression(parentSchema: SchemaPlus, name: String): Expression =
    +    Schemas.subSchemaExpression(parentSchema, name, getClass)
    +
    +  override def getFunctionNames: util.Set[String] = util.Collections.emptySet[String]
    +
    +  override def getTableNames: util.Set[String] = util.Collections.emptySet[String]
    +
    +  override def contentsHaveChangedSince(lastCheck: Long, now: Long): Boolean = false
    --- End diff --
    
    Shouldn't the default be `true`? If Calcite caches schema information, we should return `true` to ensure that we always work with the most recent information, no?
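    Calcite's `Schema.contentsHaveChangedSince(lastCheck, now)` is documented as telling Calcite whether to rebuild its caches for the schema. The simplified model below (hypothetical types, no Calcite dependency, not Calcite's actual caching code) shows the consequence the reviewer describes. A caller that caches sub-schema names and trusts `contentsHaveChangedSince` never notices a database added to the external catalog if the method always returns `false`.

```scala
// Simplified, hypothetical model of a schema and a caching caller.
trait VersionedSchema {
  def subSchemaNames: Set[String]
  def contentsHaveChangedSince(lastCheck: Long, now: Long): Boolean
}

/** Caches sub-schema names and refreshes them only when the schema reports a change. */
class CachingView(schema: VersionedSchema) {
  private var cached: Set[String] = schema.subSchemaNames
  private var lastCheck: Long = 0L

  def names(now: Long): Set[String] = {
    if (schema.contentsHaveChangedSince(lastCheck, now)) {
      cached = schema.subSchemaNames
    }
    lastCheck = now
    cached
  }
}

/** Stand-in for an external-catalog-backed schema whose databases can change at any time. */
class MutableCatalogSchema(alwaysChanged: Boolean) extends VersionedSchema {
  var databases: Set[String] = Set("db1")
  override def subSchemaNames: Set[String] = databases
  override def contentsHaveChangedSince(lastCheck: Long, now: Long): Boolean = alwaysChanged
}
```

    With `alwaysChanged = false` the view keeps serving the stale set `{db1}` after `db2` is added. With `true` it picks up `db2` on the next lookup, at the cost of re-reading the catalog each time.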


---

[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104158740
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CatalogTableHelper.scala ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.io.IOException
    +import java.lang.reflect.Modifier
    +import java.net.URL
    +import java.util.Properties
    +
    +import org.apache.flink.table.annotation.ExternalCatalogCompatible
    +import org.apache.flink.table.api.{ExternalCatalogTableTypeAlreadyExistException, ExternalCatalogTableTypeNotExistException}
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.plan.stats.FlinkStatistic
    +import org.apache.flink.table.sources.TableSource
    +import org.apache.flink.util.InstantiationUtil
    +import org.reflections.Reflections
    +import org.slf4j.{Logger, LoggerFactory}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +
    +/**
    +  * The utility class is used to convert ExternalCatalogTable to TableSourceTable.
    +  */
    +object CatalogTableHelper {
    +
    +  // config file to specifier the scan package to search tableSources
    +  // which is compatible with external catalog.
    +  private val tableSourceConfigFileName = "externalcatalogTable.properties"
    +
    +  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
    +
    +  // registered tableSources which are compatible with external catalog.
    +  // Key is table type name, Value is converter class.
    +  private val tableSourceTypeToConvertersClazz = {
    +    val registeredConverters =
    +      new HashMap[String, Class[_ <: TableSourceConverter[_]]]
    +    // scan all config file to find all tableSources which are compatible with external catalog.
    +    val resourceUrls = getClass.getClassLoader.getResources(tableSourceConfigFileName)
    +    while (resourceUrls.hasMoreElements) {
    +      val url = resourceUrls.nextElement()
    +      parseScanPackageFromConfigFile(url) match {
    +        case Some(scanPackage) =>
    +          val clazzWithAnnotations = new Reflections(scanPackage)
    +              .getTypesAnnotatedWith(classOf[ExternalCatalogCompatible])
    +          clazzWithAnnotations.asScala.foreach(clazz =>
    +            if (classOf[TableSource[_]].isAssignableFrom(clazz)) {
    +              if (Modifier.isAbstract(clazz.getModifiers()) ||
    +                  Modifier.isInterface(clazz.getModifiers)) {
    +                LOG.warn(
    +                  s"class :[${clazz.getName}] is also with ExternalCatalogCompatible annotation, " +
    +                      s"but it's an abstract clazz or an interface.")
    +              } else {
    +                val externalCatalogCompatible: ExternalCatalogCompatible =
    +                  clazz.getAnnotation(classOf[ExternalCatalogCompatible])
    +                val tableSourceName = externalCatalogCompatible.tableType()
    +                if (registeredConverters.contains(tableSourceName)) {
    +                  LOG.error(s"The table type [$tableSourceName] is already registered.")
    +                  throw new ExternalCatalogTableTypeAlreadyExistException(tableSourceName)
    +                }
    +                registeredConverters.put(tableSourceName, externalCatalogCompatible.converter())
    +              }
    +            } else {
    +              LOG.warn(
    +                s"class :[${clazz.getName}] is also with ExternalCatalogCompatible annotation, " +
    +                    s"but it's not sub-class of tableSource.")
    +            }
    +          )
    +        case None =>
    +      }
    +    }
    +    registeredConverters
    +  }
    +
    +  /**
    +    * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance
    +    *
    +    * @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert
    +    * @return converted [[TableSourceTable]] instance from the input catalog table
    +    */
    +  def fromExternalCatalogTable(externalCatalogTable: ExternalCatalogTable): TableSourceTable[_] = {
    +    val tableType = externalCatalogTable.tableType
    +    tableSourceTypeToConvertersClazz.get(tableType) match {
    +      case Some(converterClazz) =>
    +        val converter = InstantiationUtil.instantiate(converterClazz)
    +        val convertedTableSource: TableSource[_] =
    +          converter.fromExternalCatalogTable(externalCatalogTable) match {
    +            case ts: TableSource[_] => ts
    +            case _ => throw new RuntimeException(
    +              s"the converted result from converter ${converterClazz.getName} " +
    +                  s"is not tableSource instance")
    +          }
    +
    +        val flinkStatistic = externalCatalogTable.stats match {
    +          case Some(stats) => FlinkStatistic.of(stats)
    +          case None => FlinkStatistic.UNKNOWN
    +        }
    +        new TableSourceTable(convertedTableSource, flinkStatistic)
    +      case None =>
    +        LOG.error(s"the table type [$tableType] does not exist.")
    --- End diff --
    
    `s"Cannot find TableSource to scan table of type [$tableType]. Register TableSources via externalCatalogTable.properties file."`
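    A minimal sketch of where the suggested message would go. `ConverterRegistry` is a hypothetical, simplified stand-in for the `tableSourceTypeToConvertersClazz` lookup in `CatalogTableHelper`. It maps table types to converter class names as plain strings, and `IllegalArgumentException` stands in for the PR's `ExternalCatalogTableTypeNotExistException`.

```scala
// Hypothetical, simplified registry: table type -> converter class name.
class ConverterRegistry(converters: Map[String, String]) {

  def converterFor(tableType: String): String =
    converters.get(tableType) match {
      case Some(converterClassName) => converterClassName
      case None =>
        // The message tells the user both what failed and how to fix it.
        throw new IllegalArgumentException(
          s"Cannot find TableSource to scan table of type [$tableType]. " +
            "Register TableSources via externalCatalogTable.properties file.")
    }
}
```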


---

[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r106395005
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util.{HashMap => JHashMap, Map => JMap}
    +import java.lang.{Long => JLong}
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.plan.stats.TableStats
    +
    +/**
    +  * Table definition of the external catalog.
    +  *
    +  * @param identifier           identifier of external catalog table, including dbName and tableName
    +  * @param tableType            type of external catalog table, e.g csv, hbase, kafka
    +  * @param schema               schema of table data, including column names and column types
    +  * @param properties           properties of external catalog table
    +  * @param stats                statistics of external catalog table
    +  * @param comment              comment of external catalog table
    +  * @param createTime           create time of external catalog table
    +  * @param lastAccessTime       last access time of of external catalog table
    +  */
    +case class ExternalCatalogTable(
    +    identifier: TableIdentifier,
    +    tableType: String,
    +    schema: DataSchema,
    --- End diff --
    
    `ColumnSchema` sounds like a schema that describes a single column rather than a whole table. `TableSchema` would fit better, but a class named `TableSchema` already exists, so I chose `DataSchema`.
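    To make the naming discussion concrete: in the test data above, `DataSchema` pairs an array of column types with an array of column names, so it describes a whole table's columns rather than a single column. The hypothetical `SimpleDataSchema` below is a dependency-free stand-in that uses type names as strings instead of Flink `TypeInformation`. It shows the invariants such a table-level schema would typically enforce. These checks are assumptions and are not taken from the PR.

```scala
// Hypothetical, simplified stand-in for DataSchema: column types are plain
// strings here instead of Flink TypeInformation, to keep the sketch dependency-free.
case class SimpleDataSchema(columnTypes: Seq[String], columnNames: Seq[String]) {
  // One type per column name.
  require(columnTypes.length == columnNames.length,
    s"Number of column types (${columnTypes.length}) and names (${columnNames.length}) must match.")
  // Column names must be unique within a table.
  require(columnNames.distinct.length == columnNames.length,
    s"Column names must be unique: ${columnNames.mkString(", ")}")

  /** Returns the type of the given column, if the column exists. */
  def typeOf(column: String): Option[String] = {
    val idx = columnNames.indexOf(column)
    if (idx >= 0) Some(columnTypes(idx)) else None
  }
}
```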


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105138498
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogDatabase.scala ---
    @@ -0,0 +1,29 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +/**
    +  * Database definition of the external catalog.
    +  *
    +  * @param dbName     database name
    +  * @param properties database properties
    +  */
    +case class ExternalCatalogDatabase(
    +    dbName: String,
    +    properties: Map[String, String] = Map.empty)
    --- End diff --
    
    `Map` should be a Java `Map`.
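A minimal sketch of that suggestion, assuming only the type of `properties` changes: the field becomes a `java.util.Map` (imported as `JMap`, as the `ExternalCatalogTable` diff already does) with an empty default. The helper object and its `charset` key are hypothetical and only show how a Java-style map is built and passed in.

```scala
import java.util.{Collections, HashMap => JHashMap, Map => JMap}

// Sketch: properties typed as java.util.Map so Java callers can pass their
// own maps directly. The default is an immutable, empty Java map.
case class ExternalCatalogDatabase(
    dbName: String,
    properties: JMap[String, String] = Collections.emptyMap[String, String]())

object ExternalCatalogDatabaseExample {
  // Builds a database description with one illustrative property;
  // the "charset" key is hypothetical and not defined by the PR.
  def withCharset(dbName: String, charset: String): ExternalCatalogDatabase = {
    val props: JMap[String, String] = new JHashMap[String, String]()
    props.put("charset", charset)
    ExternalCatalogDatabase(dbName, props)
  }
}
```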



[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104191562
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala ---
    @@ -68,6 +62,90 @@ object CommonTestData {
         )
       }
     
    +  def getMockedFlinkExternalCatalog: ExternalCatalog = {
    +    val csvRecord1 = Seq(
    --- End diff --
    
    `csvRecord1` -> `csvTable1`



[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105384191
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CatalogTableHelper.scala ---
    @@ -0,0 +1,138 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.io.IOException
    +import java.lang.reflect.Modifier
    +import java.net.URL
    +import java.util.Properties
    +
    +import org.apache.flink.table.annotation.ExternalCatalogCompatible
    +import org.apache.flink.table.api.ExternalCatalogTableTypeNotExistException
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.plan.stats.FlinkStatistic
    +import org.apache.flink.table.sources.TableSource
    +import org.apache.flink.util.InstantiationUtil
    +import org.reflections.Reflections
    +import org.slf4j.{Logger, LoggerFactory}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +
    +/**
    +  * The utility class is used to convert ExternalCatalogTable to TableSourceTable.
    +  */
    +object CatalogTableHelper {
    +
    +  // config file to specifier the scan package to search tableSources
    +  // which is compatible with external catalog.
    +  private val tableSourceConfigFileName = "externalCatalogTable.properties"
    --- End diff --
    
    Fabian, registering all needed converter classes explicitly instead of scanning would work, too. However, if somebody adds new TableSourceConverters, such as parquetTableSourceConverter or others, wouldn't users need to change the code or a config file to include the newly added converters?
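For comparison, explicit registration could look roughly like the sketch below. `CatalogTable`, `TableSource`, `TableSourceConverter`, and `ConverterRegistry` are simplified, hypothetical stand-ins (not the PR's actual classes), defined only so the sketch compiles on its own. The trade-off discussed above shows up in usage: a new converter needs one extra `register(...)` call instead of being found by classpath scanning.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the PR's types.
final case class CatalogTable(tableType: String, properties: Map[String, String])
trait TableSource
trait TableSourceConverter[T <: TableSource] {
  def requiredProperties: Set[String]
  def fromExternalCatalogTable(table: CatalogTable): T
}

// Converters are registered explicitly by table type (case-insensitive)
// instead of being discovered through classpath scanning.
final class ConverterRegistry {
  private val converters =
    mutable.HashMap.empty[String, TableSourceConverter[_ <: TableSource]]

  def register(tableType: String, converter: TableSourceConverter[_ <: TableSource]): this.type = {
    converters.update(tableType.toLowerCase, converter)
    this
  }

  // Looks up the converter for the table's type, checks that all required
  // properties are present, then delegates the conversion.
  def toTableSource(table: CatalogTable): TableSource = {
    val converter = converters.getOrElse(
      table.tableType.toLowerCase,
      throw new IllegalArgumentException(s"No converter registered for type '${table.tableType}'"))
    val missing = converter.requiredProperties -- table.properties.keySet
    require(missing.isEmpty, s"Missing required properties: ${missing.mkString(", ")}")
    converter.fromExternalCatalogTable(table)
  }
}
```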



[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105158798
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala ---
    @@ -68,6 +64,87 @@ object CommonTestData {
         )
       }
     
    +  def getMockedFlinkExternalCatalog: ExternalCatalog = {
    --- End diff --
    
    rename to `getInMemoryTestCatalog()`?



[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104340593
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala ---
    @@ -0,0 +1,166 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util
    +
    +import org.apache.calcite.linq4j.tree.Expression
    +import org.apache.calcite.schema._
    +import org.apache.flink.table.api.{DatabaseNotExistException, TableNotExistException}
    +import org.slf4j.{Logger, LoggerFactory}
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * This class is responsible for connect external catalog to calcite catalog.
    +  * In this way, it is possible to look-up and access tables in SQL queries
    +  * without registering tables in advance.
    +  * The databases in the external catalog registers as calcite sub-Schemas of current schema.
    +  * The tables in a given database registers as calcite tables
    +  * of the [[ExternalCatalogDatabaseSchema]].
    +  *
    +  * @param catalogIdentifier external catalog name
    +  * @param catalog           external catalog
    +  */
    +class ExternalCatalogSchema(
    +    catalogIdentifier: String,
    +    catalog: ExternalCatalog) extends Schema {
    +
    +  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
    +
    +  /**
    +    * Looks up database by the given sub-schema name in the external catalog,
    +    * returns it Wrapped in a [[ExternalCatalogDatabaseSchema]] with the given database name.
    +    *
    +    * @param name Sub-schema name
    +    * @return Sub-schema with a given name, or null
    +    */
    +  override def getSubSchema(name: String): Schema = {
    +    try {
    +      val db = catalog.getDatabase(name)
    +      if (db != null) {
    +        new ExternalCatalogDatabaseSchema(db.dbName, catalog)
    +      } else {
    +        null
    +      }
    +    } catch {
    +      case e: DatabaseNotExistException =>
    +        LOG.warn(s"database $name does not exist in externalCatalog $catalogIdentifier")
    +        null
    +    }
    +  }
    +
    +  /**
    +    * Lists the databases of the external catalog,
    +    * returns the lists as the names of this schema's sub-schemas.
    +    *
    +    * @return names of this schema's child schemas
    +    */
    +  override def getSubSchemaNames: util.Set[String] = catalog.listDatabases().toSet.asJava
    +
    +  override def getTable(name: String): Table = null
    +
    +  override def isMutable: Boolean = true
    +
    +  override def getFunctions(name: String): util.Collection[Function] =
    +    util.Collections.emptyList[Function]
    +
    +  override def getExpression(parentSchema: SchemaPlus, name: String): Expression =
    +    Schemas.subSchemaExpression(parentSchema, name, getClass)
    +
    +  override def getFunctionNames: util.Set[String] = util.Collections.emptySet[String]
    +
    +  override def getTableNames: util.Set[String] = util.Collections.emptySet[String]
    +
    +  override def contentsHaveChangedSince(lastCheck: Long, now: Long): Boolean = false
    --- End diff --
    
    `contentsHaveChangedSince` should return `true`: we want to always fetch the latest information from the underlying ExternalCatalog instead of using Calcite's caches, because the lists of databases and tables in the ExternalCatalog may change at any time.
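Per the Calcite `Schema` javadoc, returning `true` from `contentsHaveChangedSince` tells Calcite to rebuild its caches (when caching is enabled), so always reading fresh data from the external catalog requires `true`; the later revision of `ExternalCatalogSchema` quoted further down returns `true`. The sketch below is a simplified, hypothetical model of such a caching layer (`VersionedSchema` and `CachingSchemaView` are invented names), not Calcite's actual code.

```scala
// Simplified model of a schema that can report whether it has changed.
trait VersionedSchema {
  def listNames(): Set[String]
  def contentsHaveChangedSince(lastCheck: Long, now: Long): Boolean
}

// Caches the names and only re-reads them when the schema reports a change,
// mimicking how a cache consults contentsHaveChangedSince.
final class CachingSchemaView(underlying: VersionedSchema) {
  private var cached: Option[Set[String]] = None
  private var lastCheck: Long = Long.MinValue

  def names(now: Long): Set[String] = {
    val stale = cached.isEmpty || underlying.contentsHaveChangedSince(lastCheck, now)
    if (stale) cached = Some(underlying.listNames())
    lastCheck = now
    cached.get
  }
}
```

With `false`, databases added to the external catalog after the first lookup stay invisible; with `true`, every lookup goes back to the catalog.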



[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104184922
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala ---
    @@ -0,0 +1,108 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +/**
    +  * This class is responsible for interact with external catalog.
    +  * Its main responsibilities including:
    +  * <ul>
    +  * <li> create/drop/alter database or tables for DDL operations
    +  * <li> provide tables for calcite catalog, it looks up databases or tables in the external catalog
    +  * </ul>
    +  */
    +trait ExternalCatalog {
    +
    +  /**
    +    * Adds table into external Catalog
    +    *
    +    * @param table          description of table which to create
    +    * @param ignoreIfExists whether to ignore operation if table already exists
    +    */
    +  def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit
    +
    +  /**
    +    * Deletes table from external Catalog
    +    *
    +    * @param dbName            database name
    +    * @param tableName         table name
    +    * @param ignoreIfNotExists whether to ignore operation if table not exist yet
    +    */
    +  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
    +
    +  /**
    +    * Alters existed table into external Catalog
    +    *
    +    * @param table description of table which to alter
    +    */
    +  def alterTable(table: ExternalCatalogTable): Unit
    --- End diff --
    
    Add an `ignoreIfNotExists` option?
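A minimal in-memory sketch of that option, assuming `alterTable` gains an `ignoreIfNotExists: Boolean` parameter analogous to `dropTable`. The types are simplified, hypothetical stand-ins for `TableIdentifier`, `ExternalCatalogTable`, and `TableNotExistException`.

```scala
import scala.collection.mutable

// Hypothetical, trimmed-down stand-ins for the PR's types.
final case class TableIdentifier(database: String, table: String)
final case class CatalogTable(identifier: TableIdentifier, properties: Map[String, String])
final class TableNotExistException(id: TableIdentifier)
  extends RuntimeException(s"Table $id does not exist")

final class InMemoryTables {
  private val tables = mutable.HashMap.empty[TableIdentifier, CatalogTable]

  def createTable(table: CatalogTable): Unit = tables.update(table.identifier, table)

  // Replaces an existing table definition. A missing table is silently
  // skipped if ignoreIfNotExists is true; otherwise an exception is thrown.
  def alterTable(table: CatalogTable, ignoreIfNotExists: Boolean): Unit =
    if (tables.contains(table.identifier)) tables.update(table.identifier, table)
    else if (!ignoreIfNotExists) throw new TableNotExistException(table.identifier)

  def getTable(id: TableIdentifier): Option[CatalogTable] = tables.get(id)
}
```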



[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on the issue:

    https://github.com/apache/flink/pull/3406
  
    @fhueske, thanks for your review. I updated the PR based on your comments.
    Your suggestion to move the annotation from TableSource to TableSourceConverter is good; I changed the PR accordingly.
    About not using scanning at all and instead explicitly specifying the converter classes: that works, too. However, if somebody adds new TableSourceConverters, such as parquetTableSourceConverter or others, users would need to change the code or a config file to register the newly added converters, right? The scanning approach, in contrast, finds all converters automatically.
    Let me know your opinion, thanks.




[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105140265
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util
    +
    +/**
    +  * This class is responsible for interact with external catalog.
    +  * Its main responsibilities including:
    +  * <ul>
    +  * <li> create/drop/alter database or tables for DDL operations
    +  * <li> provide tables for calcite catalog, it looks up databases or tables in the external catalog
    +  * </ul>
    +  */
    +trait ExternalCatalog {
    +
    +  /**
    +    * Adds table into external Catalog
    +    *
    +    * @param table          description of table which to create
    +    * @param ignoreIfExists whether to ignore operation if table already exists
    +    */
    +  def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit
    --- End diff --
    
    I'm wondering whether the `createX`, `dropX`, and `alterX` methods are actually required.
    For the integration with Calcite, `getTable()`, `getDatabase()`, and `listTables()` are sufficient.
    I could imagine an `ExternalCatalog` which does not have the `createX`, `dropX`, and `alterX` methods but only the required ones.
    
    Should we separate this trait into two traits? `ExternalCatalog` would have only the required methods, and a `DdlExternalCatalog` would extend `ExternalCatalog` and offer the additional `createX`, `dropX`, and `alterX` methods.
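The proposed split could look roughly like the sketch below (a later revision of the PR, quoted further down, names the DDL trait `CrudExternalCatalog`). Method sets and types are trimmed, hypothetical stand-ins; the point is that a read-only catalog only has to implement the lookup methods.

```scala
// Hypothetical minimal types so the sketch compiles on its own.
final case class ExternalCatalogDatabase(dbName: String)
final case class ExternalCatalogTable(database: String, table: String)

// Read-only part: the lookups the Calcite integration relies on.
trait ExternalCatalog {
  def getDatabase(dbName: String): ExternalCatalogDatabase
  def listDatabases(): Seq[String]
  def getTable(dbName: String, tableName: String): ExternalCatalogTable
  def listTables(dbName: String): Seq[String]
}

// Optional DDL part for catalogs that also support create/drop/alter.
trait DdlExternalCatalog extends ExternalCatalog {
  def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit
  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
  def alterTable(table: ExternalCatalogTable, ignoreIfNotExists: Boolean): Unit
}

// A fixed, read-only catalog implements only the first trait.
final class StaticCatalog(tables: Map[String, Seq[String]]) extends ExternalCatalog {
  def getDatabase(dbName: String): ExternalCatalogDatabase =
    if (tables.contains(dbName)) ExternalCatalogDatabase(dbName)
    else throw new NoSuchElementException(s"Database $dbName does not exist")

  def listDatabases(): Seq[String] = tables.keys.toSeq.sorted

  def getTable(dbName: String, tableName: String): ExternalCatalogTable =
    if (listTables(dbName).contains(tableName)) ExternalCatalogTable(dbName, tableName)
    else throw new NoSuchElementException(s"Table $dbName.$tableName does not exist")

  def listTables(dbName: String): Seq[String] = tables.getOrElse(dbName, Seq.empty)
}
```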



[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r106404460
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceConverter.scala ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sources
    +
    +import org.apache.flink.table.annotation.TableType
    +import org.apache.flink.table.catalog.{ExternalCatalogTable, TableSourceConverter}
    +
    +import scala.collection.JavaConverters._
    +import java.util.{Set => JSet}
    +
    +import com.google.common.collect.ImmutableSet
    +
    +/**
    +  * The class defines a converter used to convert [[CsvTableSource]] to
    +  * or from [[ExternalCatalogTable]].
    +  */
    +@TableType(value = "csv")
    +class CsvTableSourceConverter extends TableSourceConverter[CsvTableSource] {
    +
    +  private val required: JSet[String] = ImmutableSet.of("path", "fieldDelim", "rowDelim")
    +
    +  override def requiredProperties: JSet[String] = required
    +
    +  override def fromExternalCatalogTable(
    +      externalCatalogTable: ExternalCatalogTable): CsvTableSource = {
    +    val params = externalCatalogTable.properties.asScala
    +    val csvTableSourceBuilder = new CsvTableSource.Builder
    +
    +    params.get("path").foreach(csvTableSourceBuilder.path)
    +    params.get("fieldDelim").foreach(csvTableSourceBuilder.fieldDelimiter)
    +    params.get("rowDelim").foreach(csvTableSourceBuilder.lineDelimiter)
    +    params.get("quoteCharacter").foreach(quoteStr =>
    +      if (quoteStr.length != 1) {
    +        throw new IllegalArgumentException("the value of param quoteCharacter is invalid")
    --- End diff --
    
    Ok
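The property handling in the quoted `CsvTableSourceConverter` diff (declared required keys plus a `quoteCharacter` that must be exactly one character) can be sketched standalone as below. It works on a plain Scala `Map` rather than `ExternalCatalogTable.properties`; `CsvPropertyCheck` is a hypothetical name and the error messages are illustrative.

```scala
object CsvPropertyCheck {
  // Same required keys as the converter in the diff.
  val required: Set[String] = Set("path", "fieldDelim", "rowDelim")

  /** Returns the configured quote character, if any; throws on invalid input. */
  def validate(props: Map[String, String]): Option[Char] = {
    val missing = required.filterNot(props.contains)
    if (missing.nonEmpty)
      throw new IllegalArgumentException(
        s"Missing required properties: ${missing.toSeq.sorted.mkString(", ")}")
    props.get("quoteCharacter").map { quoteStr =>
      if (quoteStr.length != 1)
        throw new IllegalArgumentException(
          s"quoteCharacter must be exactly one character, got '$quoteStr'")
      quoteStr.charAt(0)
    }
  }
}
```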



[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r106394128
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala ---
    @@ -0,0 +1,110 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import org.apache.flink.table.api._
    +
    +/**
    +  * This class is responsible for interact with external catalog.
    +  * Its main responsibilities including:
    +  * <ul>
    +  * <li> create/drop/alter database or tables for DDL operations
    +  * <li> provide tables for calcite catalog, it looks up databases or tables in the external catalog
    +  * </ul>
    +  */
    +trait CrudExternalCatalog extends ExternalCatalog {
    +
    +  /**
    +    * Adds table into external Catalog
    +    *
    +    * @param table          description of table which to create
    +    * @param ignoreIfExists if table already exists in the catalog, not throw exception and leave
    +    *                       the existed table if ignoreIfExists is true;
    +    *                       else throw a TableAlreadyExistException.
    +    * @throws DatabaseNotExistException  if database does not exist in the catalog yet
    +    * @throws TableAlreadyExistException if table already exists in the catalog and
    +    *                                    ignoreIfExists is false
    +    */
    +  @throws[DatabaseNotExistException]
    +  @throws[TableAlreadyExistException]
    +  def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit
    +
    +  /**
    +    * Deletes table from external Catalog
    +    *
    +    * @param dbName            database name
    +    * @param tableName         table name
    +    * @param ignoreIfNotExists if table not exist yet, not throw exception if ignoreIfNotExists is
    +    *                          true; else throw TableNotExistException
    +    * @throws DatabaseNotExistException if database does not exist in the catalog yet
    +    * @throws TableNotExistException    if table does not exist in the catalog yet
    +    */
    +  @throws[DatabaseNotExistException]
    +  @throws[TableNotExistException]
    +  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
    --- End diff --
    
    Considering the usage: when we create or alter a database, we may need to specify other properties besides the database name. So createDatabase and alterDatabase take an ExternalCatalogDatabase instance, which includes the database name and other properties.
    However, when we drop a database, the name is enough.
    Take DDL for example:
    CREATE DATABASE czech_slovak_names
      CHARACTER SET = 'keybcs2'
      COLLATE = 'keybcs2_bin';
    
    DROP DATABASE [IF EXISTS] database_name;
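A hypothetical in-memory sketch of the signatures described above: `createDatabase` and `alterDatabase` take a full `ExternalCatalogDatabase` (name plus properties), while `dropDatabase` takes only the name. The `ignoreIfExists`/`ignoreIfNotExists` flags mirror the table methods, the exception types are generic placeholders, and the `charset` property key is illustrative.

```scala
import scala.collection.mutable

final case class ExternalCatalogDatabase(
    dbName: String,
    properties: Map[String, String] = Map.empty)

final class InMemoryDatabases {
  // Insertion-ordered so listDatabases() is deterministic.
  private val dbs = mutable.LinkedHashMap.empty[String, ExternalCatalogDatabase]

  // Full description needed: name plus properties such as a character set.
  def createDatabase(db: ExternalCatalogDatabase, ignoreIfExists: Boolean): Unit =
    if (!dbs.contains(db.dbName)) dbs.update(db.dbName, db)
    else if (!ignoreIfExists)
      throw new IllegalStateException(s"Database ${db.dbName} already exists")

  def alterDatabase(db: ExternalCatalogDatabase, ignoreIfNotExists: Boolean): Unit =
    if (dbs.contains(db.dbName)) dbs.update(db.dbName, db)
    else if (!ignoreIfNotExists)
      throw new NoSuchElementException(s"Database ${db.dbName} does not exist")

  // Only the name is needed to drop a database.
  def dropDatabase(dbName: String, ignoreIfNotExists: Boolean): Unit =
    if (dbs.remove(dbName).isEmpty && !ignoreIfNotExists)
      throw new NoSuchElementException(s"Database $dbName does not exist")

  def listDatabases(): Seq[String] = dbs.keys.toSeq
}
```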




[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104166130
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.plan.stats.TableStats
    +
    +/**
    +  * Table definition of the external catalog.
    +  *
    +  * @param identifier           identifier of external catalog table, including dbName and tableName
    +  * @param tableType            type of external catalog table, e.g csv, hbase, kafka
    +  * @param schema               schema of table data, including column names and column types
    +  * @param properties           properties of external catalog table
    +  * @param stats                statistics of external catalog table
    +  * @param comment              comment of external catalog table
    +  * @param createTime           create time of external catalog table
    +  * @param lastAccessTime       last access time of of external catalog table
    +  */
    +case class ExternalCatalogTable(
    +    identifier: TableIdentifier,
    +    tableType: String,
    +    schema: DataSchema,
    +    properties: Map[String, String] = Map.empty,
    +    stats: Option[TableStats] = None,
    +    comment: Option[String] = None,
    +    createTime: Long = System.currentTimeMillis,
    +    lastAccessTime: Long = -1)
    +
    +/**
    +  * Identifier of external catalog table
    +  *
    +  * @param database database name
    +  * @param table    table name
    +  */
    +case class TableIdentifier(
    +    database: String,
    +    table: String) {
    +
    +  override def toString: String = s"$database.$table"
    +
    +}
    +
    +/**
    +  * Schema of External catalog table's columns
    +  *
    +  * @param columnTypes types of each column
    +  * @param columnNames names of each column
    +  */
    +case class DataSchema(
    +    columnTypes: Array[TypeInformation[_]],
    +    columnNames: Array[String]) {
    +
    +  override def toString: String =
    +    s"column types: [${columnTypes.mkString(",")}], column names: [${columnNames.mkString(",")}]"
    --- End diff --
    
    Change this to something like:
    ```
    columnNames.zip(columnTypes).map { case (name, tpe) => s"$name: $tpe" }.mkString(", ")
    ```
    
    To have something like "name: String, age: Int, salary: Float"?
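A runnable sketch of the suggested `toString`. In `s"$x._1"` the interpolator substitutes only `x` and keeps `._1` as literal text, so the tuple fields need `${x._1}` or a pattern match. Column types are plain strings here, a simplification standing in for Flink's `TypeInformation`.

```scala
// Stand-in for DataSchema with string type names instead of TypeInformation.
final case class DataSchema(columnTypes: Array[String], columnNames: Array[String]) {
  require(columnTypes.length == columnNames.length, "one type per column name")

  // Renders "name: Type" pairs, e.g. "name: String, age: Int".
  override def toString: String =
    columnNames.zip(columnTypes).map { case (name, tpe) => s"$name: $tpe" }.mkString(", ")
}
```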



[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105142163
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala ---
    @@ -0,0 +1,164 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util
    +
    +import org.apache.calcite.linq4j.tree.Expression
    +import org.apache.calcite.schema._
    +import org.apache.flink.table.api.{DatabaseNotExistException, TableNotExistException}
    +import org.slf4j.{Logger, LoggerFactory}
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * This class is responsible for connect external catalog to calcite catalog.
    +  * In this way, it is possible to look-up and access tables in SQL queries
    +  * without registering tables in advance.
    +  * The databases in the external catalog registers as calcite sub-Schemas of current schema.
    +  * The tables in a given database registers as calcite tables
    +  * of the [[ExternalCatalogDatabaseSchema]].
    +  *
    +  * @param catalogIdentifier external catalog name
    +  * @param catalog           external catalog
    +  */
    +class ExternalCatalogSchema(
    +    catalogIdentifier: String,
    +    catalog: ExternalCatalog) extends Schema {
    +
    +  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
    +
    +  /**
    +    * Looks up database by the given sub-schema name in the external catalog,
    +    * returns it Wrapped in a [[ExternalCatalogDatabaseSchema]] with the given database name.
    +    *
    +    * @param name Sub-schema name
    +    * @return Sub-schema with a given name, or null
    +    */
    +  override def getSubSchema(name: String): Schema = {
    +    try {
    +      val db = catalog.getDatabase(name)
    +      if (db != null) {
    +        new ExternalCatalogDatabaseSchema(db.dbName, catalog)
    +      } else {
    +        null
    +      }
    +    } catch {
    +      case e: DatabaseNotExistException =>
    +        LOG.warn(s"Database $name does not exist in externalCatalog $catalogIdentifier")
    +        null
    +    }
    +  }
    +
    +  /**
    +    * Lists the databases of the external catalog,
    +    * returns the lists as the names of this schema's sub-schemas.
    +    *
    +    * @return names of this schema's child schemas
    +    */
    +  override def getSubSchemaNames: util.Set[String] = new util.LinkedHashSet(catalog.listDatabases())
    +
    +  override def getTable(name: String): Table = null
    +
    +  override def isMutable: Boolean = true
    +
    +  override def getFunctions(name: String): util.Collection[Function] =
    +    util.Collections.emptyList[Function]
    +
    +  override def getExpression(parentSchema: SchemaPlus, name: String): Expression =
    +    Schemas.subSchemaExpression(parentSchema, name, getClass)
    +
    +  override def getFunctionNames: util.Set[String] = util.Collections.emptySet[String]
    +
    +  override def getTableNames: util.Set[String] = util.Collections.emptySet[String]
    +
    +  override def contentsHaveChangedSince(lastCheck: Long, now: Long): Boolean = true
    +
    +  /**
    +    * Registers sub-Schemas to current schema plus
    +    *
    +    * @param plusOfThis
    +    */
    +  def registerSubSchemas(plusOfThis: SchemaPlus) {
    +    catalog.listDatabases().asScala.foreach(db => plusOfThis.add(db, getSubSchema(db)))
    +  }
    +
    +  private class ExternalCatalogDatabaseSchema(
    +      schemaName: String,
    +      flinkExternalCatalog: ExternalCatalog) extends Schema {
    +
    +    override def getTable(name: String): Table = {
    +      try {
    +        val externalCatalogTable = flinkExternalCatalog.getTable(schemaName, name)
    --- End diff --
    
    Logic can be simplified if we specify the behavior of `getTable()` more precisely.
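A minimal sketch of the simplification, assuming the contract is tightened so that `getTable()` never returns `null` and instead throws `TableNotExistException` for a missing table. The exception, `CatalogTable`, `TableLookup`, `InMemoryLookup` and `SchemaAdapter` below are local, hypothetical stand-ins, not the PR's classes. Calcite's `Schema.getTable` reports a missing table by returning `null`, so the adapter translates the exception in exactly one place and needs no separate null check:

```scala
// Hypothetical stand-ins; the real PR classes may have different shapes.
class TableNotExistException(db: String, table: String)
  extends RuntimeException(s"Table $db.$table does not exist")

final case class CatalogTable(dbName: String, tableName: String)

// Assumed contract: getTable never returns null and throws
// TableNotExistException when the table is missing.
trait TableLookup {
  def getTable(dbName: String, tableName: String): CatalogTable
}

class InMemoryLookup(tables: Map[(String, String), CatalogTable]) extends TableLookup {
  override def getTable(dbName: String, tableName: String): CatalogTable =
    tables.getOrElse((dbName, tableName), throw new TableNotExistException(dbName, tableName))
}

object SchemaAdapter {
  // Calcite's Schema.getTable signals "not found" by returning null, so the
  // exception is translated here once; no additional null check is required.
  def lookup(catalog: TableLookup, dbName: String, tableName: String): CatalogTable =
    try catalog.getTable(dbName, tableName)
    catch { case _: TableNotExistException => null }
}
```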


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105137565
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util
    +
    +/**
    +  * This class is responsible for interact with external catalog.
    +  * Its main responsibilities including:
    +  * <ul>
    +  * <li> create/drop/alter database or tables for DDL operations
    +  * <li> provide tables for calcite catalog, it looks up databases or tables in the external catalog
    +  * </ul>
    +  */
    +trait ExternalCatalog {
    +
    +  /**
    +    * Adds table into external Catalog
    +    *
    +    * @param table          description of table which to create
    +    * @param ignoreIfExists whether to ignore operation if table already exists
    +    */
    +  def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit
    +
    +  /**
    +    * Deletes table from external Catalog
    +    *
    +    * @param dbName            database name
    +    * @param tableName         table name
    +    * @param ignoreIfNotExists whether to ignore operation if table not exist yet
    +    */
    +  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
    +
    +  /**
    +    * Modifies an existing table in the external catalog
    +    *
    +    * @param table             description of table which to modify
    +    * @param ignoreIfNotExists whether to ignore operation if table not exist yet
    +    */
    +  def alterTable(table: ExternalCatalogTable, ignoreIfNotExists: Boolean): Unit
    +
    +  /**
    +    * Gets table from external Catalog
    +    *
    +    * @param dbName    database name
    +    * @param tableName table name
    +    * @return table
    +    */
    +  def getTable(dbName: String, tableName: String): ExternalCatalogTable
    +
    +  /**
    +    * Gets the table name lists from current external Catalog
    +    *
    +    * @param dbName database name
    +    * @return lists of table name
    +    */
    +  def listTables(dbName: String): util.List[String]
    +
    +  /**
    +    * Adds database into external Catalog
    +    *
    +    * @param db             description of database which to create
    +    * @param ignoreIfExists whether to ignore operation if database already exists
    --- End diff --
    
    What happens if DB exists and `ignoreIfExists = true`?
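One way to answer this, sketched with hypothetical in-memory stand-ins (`CatalogDatabase`, `DatabaseAlreadyExistException` and `InMemoryDatabases` are assumptions, not the PR's types): with `ignoreIfExists = true` the call is a no-op that keeps the existing definition, and with `ignoreIfExists = false` it throws.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the PR's database type and exception.
final case class CatalogDatabase(dbName: String, properties: Map[String, String] = Map.empty)

class DatabaseAlreadyExistException(dbName: String)
  extends RuntimeException(s"Database $dbName already exists")

class InMemoryDatabases {
  private val databases = mutable.LinkedHashMap.empty[String, CatalogDatabase]

  /**
    * Adds a database.
    *
    * @param db             description of the database to create
    * @param ignoreIfExists if true and the database already exists, do nothing and keep
    *                       the existing definition; if false, throw instead
    * @throws DatabaseAlreadyExistException if the database exists and ignoreIfExists is false
    */
  def createDatabase(db: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
    if (!databases.contains(db.dbName)) {
      databases(db.dbName) = db
    } else if (!ignoreIfExists) {
      throw new DatabaseAlreadyExistException(db.dbName)
    }
  }

  def getDatabase(dbName: String): Option[CatalogDatabase] = databases.get(dbName)
}
```

Replacing the existing database would also be a valid contract; what matters is that the Scaladoc states which behavior applies.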


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104185348
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala ---
    @@ -0,0 +1,108 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +/**
    +  * This class is responsible for interact with external catalog.
    +  * Its main responsibilities including:
    +  * <ul>
    +  * <li> create/drop/alter database or tables for DDL operations
    +  * <li> provide tables for calcite catalog, it looks up databases or tables in the external catalog
    +  * </ul>
    +  */
    +trait ExternalCatalog {
    +
    +  /**
    +    * Adds table into external Catalog
    +    *
    +    * @param table          description of table which to create
    +    * @param ignoreIfExists whether to ignore operation if table already exists
    +    */
    +  def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit
    +
    +  /**
    +    * Deletes table from external Catalog
    +    *
    +    * @param dbName            database name
    +    * @param tableName         table name
    +    * @param ignoreIfNotExists whether to ignore operation if table not exist yet
    +    */
    +  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
    +
    +  /**
    +    * Alters existed table into external Catalog
    +    *
    +    * @param table description of table which to alter
    +    */
    +  def alterTable(table: ExternalCatalogTable): Unit
    +
    +  /**
    +    * Gets table from external Catalog
    +    *
    +    * @param dbName    database name
    +    * @param tableName table name
    +    * @return table
    +    */
    +  def getTable(dbName: String, tableName: String): ExternalCatalogTable
    +
    +  /**
    +    * Gets the table name lists from current external Catalog
    +    *
    +    * @param dbName database name
    +    * @return lists of table name
    +    */
    +  def listTables(dbName: String): Seq[String]
    +
    +  /**
    +    * Adds database into external Catalog
    +    *
    +    * @param db             database name
    +    * @param ignoreIfExists whether to ignore operation if database already exists
    +    */
    +  def createDatabase(db: ExternalCatalogDatabase, ignoreIfExists: Boolean): Unit
    +
    +  /**
    +    * Deletes database from external Catalog
    +    *
    +    * @param dbName            database name
    +    * @param ignoreIfNotExists whether to ignore operation if table not exist yet
    +    */
    +  def dropDatabase(dbName: String, ignoreIfNotExists: Boolean): Unit
    +
    +  /**
    +    * Alters existed database into external Catalog
    +    */
    +  def alterDatabase(db: ExternalCatalogDatabase): Unit
    --- End diff --
    
    Add an `ignoreIfNotExists` option?
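A minimal sketch of what the suggested signature could look like, mirroring `dropDatabase`. The types are hypothetical stand-ins; the assumed semantics are that an existing database is replaced, while a missing one is either skipped (`ignoreIfNotExists = true`) or reported with `DatabaseNotExistException`:

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the PR's database type and exception.
final case class CatalogDatabase(dbName: String, properties: Map[String, String] = Map.empty)

class DatabaseNotExistException(dbName: String)
  extends RuntimeException(s"Database $dbName does not exist")

class AlterableDatabases {
  private val databases = mutable.Map.empty[String, CatalogDatabase]

  def add(db: CatalogDatabase): Unit = {
    databases(db.dbName) = db
  }

  /**
    * Modifies an existing database.
    *
    * @param db                new description of the database
    * @param ignoreIfNotExists if true, silently skip a missing database; if false, throw
    */
  def alterDatabase(db: CatalogDatabase, ignoreIfNotExists: Boolean): Unit = {
    if (databases.contains(db.dbName)) {
      databases(db.dbName) = db
    } else if (!ignoreIfNotExists) {
      throw new DatabaseNotExistException(db.dbName)
    }
  }

  def get(dbName: String): Option[CatalogDatabase] = databases.get(dbName)
}
```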


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105382341
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CatalogTableHelper.scala ---
    @@ -0,0 +1,138 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.io.IOException
    +import java.lang.reflect.Modifier
    +import java.net.URL
    +import java.util.Properties
    +
    +import org.apache.flink.table.annotation.ExternalCatalogCompatible
    +import org.apache.flink.table.api.ExternalCatalogTableTypeNotExistException
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.plan.stats.FlinkStatistic
    +import org.apache.flink.table.sources.TableSource
    +import org.apache.flink.util.InstantiationUtil
    +import org.reflections.Reflections
    +import org.slf4j.{Logger, LoggerFactory}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +
    +/**
    +  * The utility class is used to convert ExternalCatalogTable to TableSourceTable.
    +  */
    +object CatalogTableHelper {
    +
    +  // config file to specifier the scan package to search tableSources
    +  // which is compatible with external catalog.
    +  private val tableSourceConfigFileName = "externalCatalogTable.properties"
    --- End diff --
    
    Sorry, I think I confused something there. 
    The approach of having a resource file in each module/JAR that offers converters is good. Hard-coding the name of the resource file is fine.
    
    Thinking this one step further, would it make sense to not use scanning at all and instead list the converter classes explicitly? Then we would not need the reflection library to scan the class path. Just an idea.
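A rough sketch of the explicit-listing idea, assuming each JAR ships a properties file whose (hypothetical) `converters` key holds a comma-separated list of fully qualified converter class names. `TableSourceConverter`, `CsvConverter` and `ConverterRegistry` are illustrative names only, not the PR's API. Each listed class is loaded with `Class.forName` and instantiated through its no-arg constructor; `ClassLoader.getResources` returns one URL per copy of the file on the class path, so converters from several JARs are merged without any package scanning:

```scala
import java.io.InputStream
import java.util.Properties

import scala.collection.JavaConverters._

// Hypothetical converter interface; the PR's ExternalCatalogTableConverter may differ.
trait TableSourceConverter {
  def tableType: String
}

class CsvConverter extends TableSourceConverter {
  override def tableType: String = "csv"
}

object ConverterRegistry {
  // Hypothetical key holding comma-separated, fully qualified class names.
  val ConvertersKey = "converters"

  /** Instantiates the converter classes listed in one properties file, keyed by table type. */
  def fromProperties(props: Properties, loader: ClassLoader): Map[String, TableSourceConverter] =
    Option(props.getProperty(ConvertersKey)).toSeq
      .flatMap(_.split(","))
      .map(_.trim)
      .filter(_.nonEmpty)
      .map { className =>
        val converter = Class.forName(className, true, loader)
          .getDeclaredConstructor()
          .newInstance()
          .asInstanceOf[TableSourceConverter]
        converter.tableType -> converter
      }
      .toMap

  /** Merges the lists from every copy of the resource file on the class path (one per JAR). */
  def fromClassPath(resourceName: String, loader: ClassLoader): Map[String, TableSourceConverter] =
    loader.getResources(resourceName).asScala.flatMap { url =>
      val in: InputStream = url.openStream()
      try {
        val props = new Properties()
        props.load(in)
        fromProperties(props, loader)
      } finally in.close()
    }.toMap
}
```

The trade-off: a converter class that is not listed in any file is never loaded, whereas scanning would discover it; in return, startup no longer depends on the reflections library or on scanning the whole class path. If two files register the same table type, the one read last wins in this sketch.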


[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/3406
  
    Not to forget: we also need some good documentation for the website.


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105141923
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util
    +
    +/**
    +  * This class is responsible for interact with external catalog.
    +  * Its main responsibilities including:
    +  * <ul>
    +  * <li> create/drop/alter database or tables for DDL operations
    +  * <li> provide tables for calcite catalog, it looks up databases or tables in the external catalog
    +  * </ul>
    +  */
    +trait ExternalCatalog {
    +
    +  /**
    +    * Adds table into external Catalog
    +    *
    +    * @param table          description of table which to create
    +    * @param ignoreIfExists whether to ignore operation if table already exists
    +    */
    +  def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit
    +
    +  /**
    +    * Deletes table from external Catalog
    +    *
    +    * @param dbName            database name
    +    * @param tableName         table name
    +    * @param ignoreIfNotExists whether to ignore operation if table not exist yet
    +    */
    +  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
    +
    +  /**
    +    * Modifies an existing table in the external catalog
    +    *
    +    * @param table             description of table which to modify
    +    * @param ignoreIfNotExists whether to ignore operation if table not exist yet
    +    */
    +  def alterTable(table: ExternalCatalogTable, ignoreIfNotExists: Boolean): Unit
    +
    +  /**
    +    * Gets table from external Catalog
    +    *
    +    * @param dbName    database name
    +    * @param tableName table name
    +    * @return table
    +    */
    +  def getTable(dbName: String, tableName: String): ExternalCatalogTable
    --- End diff --
    
    Do we expect `null` or an exception if the table or database does not exist?
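For illustration, a sketch of the exception-based option with the contract written into the Scaladoc. `CatalogTable`, `TableReader` and `InMemoryTableReader` are hypothetical stand-ins, and the two exception classes only mirror the names imported in the PR. A missing database and a missing table raise different exceptions, so callers can tell them apart, and the method never returns `null`:

```scala
import scala.collection.mutable

// Hypothetical stand-ins mirroring the exception names used in the PR.
class DatabaseNotExistException(db: String)
  extends RuntimeException(s"Database $db does not exist")

class TableNotExistException(db: String, table: String)
  extends RuntimeException(s"Table $db.$table does not exist")

final case class CatalogTable(dbName: String, tableName: String, tableType: String)

trait TableReader {
  /**
    * Gets a table from the external catalog.
    *
    * @param dbName    database name
    * @param tableName table name
    * @return the requested table, never null
    * @throws DatabaseNotExistException if the database does not exist
    * @throws TableNotExistException    if the database exists but the table does not
    */
  def getTable(dbName: String, tableName: String): CatalogTable
}

class InMemoryTableReader extends TableReader {
  private val dbs = mutable.Map.empty[String, mutable.Map[String, CatalogTable]]

  def put(table: CatalogTable): Unit =
    dbs.getOrElseUpdate(table.dbName, mutable.Map.empty[String, CatalogTable])
      .update(table.tableName, table)

  override def getTable(dbName: String, tableName: String): CatalogTable = {
    // Distinguish "no such database" from "no such table in an existing database".
    val tables = dbs.getOrElse(dbName, throw new DatabaseNotExistException(dbName))
    tables.getOrElse(tableName, throw new TableNotExistException(dbName, tableName))
  }
}
```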


[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on the issue:

    https://github.com/apache/flink/pull/3406
  
    @twalthr, thanks for the review. I have updated the PR based on your suggestions. I will add documentation later in PR #3409. As for the name ExternalCatalog, Flink currently has three kinds of catalog: the Calcite catalog, the function catalog, and the external catalog. I added the 'external' prefix to distinguish them.


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104387807
  
    --- Diff: flink-libraries/flink-table/pom.xml ---
    @@ -92,7 +92,12 @@ under the License.
     			</exclusions>
     		</dependency>
     
    -
    +		<dependency>
    --- End diff --
    
    Here is what I found out. WTFPL is compatible with AL2 (https://www.apache.org/legal/resolved.html).
    
    Maven Central lists the dependency as BSD-licensed (https://search.maven.org/#artifactdetails%7Corg.reflections%7Creflections%7C0.9.10%7Cjar), which is also OK.
    However, it seems that some of its transitive dependencies are LGPL-licensed, which would not work. So we may have to exclude those.
    
    Copying a file and relicensing should be OK under WTFPL. However, we should clearly indicate the origin of the code.


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105704013
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ReadonlyExternalCatalog.scala ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util.{List => JList}
    +
    +import org.apache.flink.table.api._
    +
    +/**
    +  * This class is responsible for read table/database from external catalog.
    +  * Its main responsibilities is provide tables for calcite catalog, it looks up databases or tables
    +  * in the external catalog.
    +  */
    +trait ReadonlyExternalCatalog {
    --- End diff --
    
    Rename to `ExternalCatalog`
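A sketch of the naming this suggests, assuming the read-only lookup methods keep the plain `ExternalCatalog` name and DDL operations move to an extending trait. `MutableExternalCatalog`, `CatalogTable`, `InMemoryCatalog` and `Lookup` are hypothetical names, not the PR's. Code that only reads, such as the Calcite schema adapter, can then depend on the narrower trait:

```scala
import scala.collection.mutable

// Hypothetical stand-in for the PR's table description.
final case class CatalogTable(dbName: String, tableName: String)

/** Read-only view: the lookups a Calcite schema adapter needs. */
trait ExternalCatalog {
  def listTables(dbName: String): Seq[String]
  def getTable(dbName: String, tableName: String): CatalogTable
}

/** Hypothetical name for the DDL-capable extension of the read-only trait. */
trait MutableExternalCatalog extends ExternalCatalog {
  def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
}

class InMemoryCatalog extends MutableExternalCatalog {
  // Insertion-ordered so that listTables returns tables in creation order.
  private val tables = mutable.LinkedHashMap.empty[(String, String), CatalogTable]

  override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = {
    val key = (table.dbName, table.tableName)
    if (!tables.contains(key)) {
      tables(key) = table
    } else if (!ignoreIfExists) {
      throw new IllegalArgumentException(s"Table ${table.dbName}.${table.tableName} already exists")
    }
  }

  override def listTables(dbName: String): Seq[String] =
    tables.valuesIterator.filter(_.dbName == dbName).map(_.tableName).toList

  override def getTable(dbName: String, tableName: String): CatalogTable =
    tables.getOrElse(
      (dbName, tableName),
      throw new NoSuchElementException(s"Table $dbName.$tableName does not exist"))
}

// A read-only consumer depends only on the narrower trait.
object Lookup {
  def tableNames(catalog: ExternalCatalog, dbName: String): Seq[String] =
    catalog.listTables(dbName)
}
```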


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105136850
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util
    +
    +/**
    +  * This class is responsible for interact with external catalog.
    +  * Its main responsibilities including:
    +  * <ul>
    +  * <li> create/drop/alter database or tables for DDL operations
    +  * <li> provide tables for calcite catalog, it looks up databases or tables in the external catalog
    +  * </ul>
    +  */
    +trait ExternalCatalog {
    +
    +  /**
    +    * Adds table into external Catalog
    +    *
    +    * @param table          description of table which to create
    +    * @param ignoreIfExists whether to ignore operation if table already exists
    --- End diff --
    
    We should specify what happens if `ignoreIfExists = true`. Will it replace the existing table, or just not throw an exception and leave the existing table unchanged?


[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on the issue:

    https://github.com/apache/flink/pull/3406
  
    @fhueske, thanks for your review. I have updated the PR based on your comments.


[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...

Posted by KurtYoung <gi...@git.apache.org>.
Github user KurtYoung commented on the issue:

    https://github.com/apache/flink/pull/3406
  
    Seems good to merge now, merging...


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r106415566
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util.{HashMap => JHashMap, Map => JMap}
    +import java.lang.{Long => JLong}
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.plan.stats.TableStats
    +
    +/**
    +  * Table definition of the external catalog.
    +  *
    +  * @param identifier           identifier of external catalog table, including dbName and tableName
    +  * @param tableType            type of external catalog table, e.g csv, hbase, kafka
    +  * @param schema               schema of table data, including column names and column types
    +  * @param properties           properties of external catalog table
    +  * @param stats                statistics of external catalog table
    +  * @param comment              comment of external catalog table
    +  * @param createTime           create time of external catalog table
    +  * @param lastAccessTime       last access time of of external catalog table
    +  */
    +case class ExternalCatalogTable(
    +    identifier: TableIdentifier,
    +    tableType: String,
    +    schema: DataSchema,
    --- End diff --
    
    Actually, I think the available `TableSchema` class can be reused here.
    Would that work @beyond1920 ?


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104394065
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala ---
    @@ -0,0 +1,166 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util
    +
    +import org.apache.calcite.linq4j.tree.Expression
    +import org.apache.calcite.schema._
    +import org.apache.flink.table.api.{DatabaseNotExistException, TableNotExistException}
    +import org.slf4j.{Logger, LoggerFactory}
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * This class is responsible for connect external catalog to calcite catalog.
    +  * In this way, it is possible to look-up and access tables in SQL queries
    +  * without registering tables in advance.
    +  * The databases in the external catalog registers as calcite sub-Schemas of current schema.
    +  * The tables in a given database registers as calcite tables
    +  * of the [[ExternalCatalogDatabaseSchema]].
    +  *
    +  * @param catalogIdentifier external catalog name
    +  * @param catalog           external catalog
    +  */
    +class ExternalCatalogSchema(
    +    catalogIdentifier: String,
    +    catalog: ExternalCatalog) extends Schema {
    +
    +  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
    +
    +  /**
    +    * Looks up database by the given sub-schema name in the external catalog,
    +    * returns it Wrapped in a [[ExternalCatalogDatabaseSchema]] with the given database name.
    +    *
    +    * @param name Sub-schema name
    +    * @return Sub-schema with a given name, or null
    +    */
    +  override def getSubSchema(name: String): Schema = {
    +    try {
    +      val db = catalog.getDatabase(name)
    +      if (db != null) {
    +        new ExternalCatalogDatabaseSchema(db.dbName, catalog)
    +      } else {
    +        null
    +      }
    +    } catch {
    +      case e: DatabaseNotExistException =>
    +        LOG.warn(s"database $name does not exist in externalCatalog $catalogIdentifier")
    +        null
    +    }
    +  }
    +
    +  /**
    +    * Lists the databases of the external catalog,
    +    * returns the lists as the names of this schema's sub-schemas.
    +    *
    +    * @return names of this schema's child schemas
    +    */
    +  override def getSubSchemaNames: util.Set[String] = catalog.listDatabases().toSet.asJava
    +
    +  override def getTable(name: String): Table = null
    +
    +  override def isMutable: Boolean = true
    +
    +  override def getFunctions(name: String): util.Collection[Function] =
    +    util.Collections.emptyList[Function]
    +
    +  override def getExpression(parentSchema: SchemaPlus, name: String): Expression =
    +    Schemas.subSchemaExpression(parentSchema, name, getClass)
    +
    +  override def getFunctionNames: util.Set[String] = util.Collections.emptySet[String]
    +
    +  override def getTableNames: util.Set[String] = util.Collections.emptySet[String]
    +
    +  override def contentsHaveChangedSince(lastCheck: Long, now: Long): Boolean = false
    --- End diff --
    
    Oh, yes, my bad. I will change it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104371244
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala ---
    @@ -68,6 +62,90 @@ object CommonTestData {
         )
       }
     
    +  def getMockedFlinkExternalCatalog: ExternalCatalog = {
    +    val csvRecord1 = Seq(
    --- End diff --
    
    fine with me


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105363598
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util
    +
    +/**
    +  * This class is responsible for interact with external catalog.
    +  * Its main responsibilities including:
    +  * <ul>
    +  * <li> create/drop/alter database or tables for DDL operations
    +  * <li> provide tables for calcite catalog, it looks up databases or tables in the external catalog
    +  * </ul>
    +  */
    +trait ExternalCatalog {
    +
    +  /**
    +    * Adds table into external Catalog
    +    *
    +    * @param table          description of table which to create
    +    * @param ignoreIfExists whether to ignore operation if table already exists
    +    */
    +  def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit
    --- End diff --
    
    Hi Fabian. Although getTable(), getDatabase(), and listTables() are sufficient for the integration with Calcite, a complete ExternalCatalog should be responsible for CRUD operations on databases and tables. A read-only ExternalCatalog could choose not to support the createX, dropX, and alterX methods, for example by throwing an UnsupportedOperationException.
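    A minimal sketch of the read-only case described above, assuming a simplified catalog trait: `SimpleExternalCatalog`, `CatalogTable`, and `ReadOnlyCatalog` are hypothetical stand-ins, not the PR's actual `ExternalCatalog` signatures. Lookups are served from a fixed map, while DDL methods throw `UnsupportedOperationException`.

    ```scala
    // Hypothetical, simplified stand-ins for the PR's catalog types.
    final case class CatalogTable(dbName: String, tableName: String, tableType: String)

    trait SimpleExternalCatalog {
      def getTable(dbName: String, tableName: String): CatalogTable
      def listTables(dbName: String): Seq[String]
      def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
      def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
    }

    // Read-only catalog backed by a fixed map: lookups work, DDL is rejected.
    class ReadOnlyCatalog(tables: Map[(String, String), CatalogTable])
      extends SimpleExternalCatalog {

      override def getTable(dbName: String, tableName: String): CatalogTable =
        tables.getOrElse((dbName, tableName),
          throw new NoSuchElementException(s"Table $dbName.$tableName does not exist"))

      // Names of all tables registered under the given database, sorted.
      override def listTables(dbName: String): Seq[String] =
        tables.keys.collect { case (`dbName`, t) => t }.toSeq.sorted

      override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit =
        throw new UnsupportedOperationException("Read-only catalog: createTable is not supported")

      override def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit =
        throw new UnsupportedOperationException("Read-only catalog: dropTable is not supported")
    }
    ```

    With this split, the Calcite integration only ever calls the lookup methods, so a read-only catalog remains fully usable for queries.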


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105179034
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/ExternalCatalogCompatible.java ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.annotation;
    +
    +import org.apache.flink.annotation.Public;
    +import org.apache.flink.table.catalog.TableSourceConverter;
    +
    +import java.lang.annotation.Documented;
    +import java.lang.annotation.ElementType;
    +import java.lang.annotation.Retention;
    +import java.lang.annotation.RetentionPolicy;
    +import java.lang.annotation.Target;
    +
    +/**
    + * A tableSource with this annotation represents it is compatible with external catalog, that is,
    + * an instance of this tableSource can be converted to or converted from external catalog table
    + * instance.
    + * The annotation contains the following information:
    + * <ul>
    + * <li> external catalog table type name for this kind of tableSource </li>
    + * <li> external catalog table <-> tableSource converter class </li>
    + * </ul>
    + */
    +@Documented
    +@Target(ElementType.TYPE)
    +@Retention(RetentionPolicy.RUNTIME)
    +@Public
    +public @interface ExternalCatalogCompatible {
    --- End diff --
    
    I thought about the version and agree that we do not necessarily need a dedicated version field.
    However, I think we need a bit more "safety" in the translation process, since the converters will rely on certain properties being set and the link between a catalog table and its converter is just a string. IMO, we need some kind of verification that the properties of an `ExternalCatalogTable` are compatible with the requirements of a converter.
    
    What do you think about the following approach:
    - We extend the `TableSourceConverter` interface by a few methods to describe its requirements:
    ```
    def requiredProperties: Array[String]
    def optionalProperties: Array[String]
    def getPropertyDescription(p: String): String
    ```
    - We remove the `ExternalCatalogCompatible` annotation.
    - We add a `TableType` annotation which carries a String for the table type name and which is attached to the `TableSourceConverter` (not to the TableSource!).
    
    By doing this, we scan for converters instead of table sources. IMO, this has the benefit that the required properties of a table type (i.e., the identifier string) are tied to the converter, which knows how to handle the `ExternalCatalogTable` and convert it into a TableSource. The type of the TableSource is not really relevant. It also means that there is one place to go when looking for the requirements of a table type (the converter).
    
    What do you think, @beyond1920 and @KurtYoung?
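    A sketch of how the proposed requirement methods could drive the verification step, under stated assumptions: `PropertyAwareConverter`, its `fromProperties` method, `PathConverter`, and `ConverterValidation` are hypothetical names for illustration, and table properties are modeled as a plain `Map[String, String]`. The catalog checks for missing required properties and reports them with their descriptions before any conversion is attempted.

    ```scala
    // Hypothetical sketch of the proposed converter contract (not the PR's API).
    trait PropertyAwareConverter[T] {
      def requiredProperties: Array[String]
      def optionalProperties: Array[String]
      def getPropertyDescription(p: String): String
      def fromProperties(props: Map[String, String]): T
    }

    // Example converter (hypothetical) that only needs a "path" property.
    object PathConverter extends PropertyAwareConverter[String] {
      override def requiredProperties: Array[String] = Array("path")
      override def optionalProperties: Array[String] = Array.empty[String]
      override def getPropertyDescription(p: String): String = p match {
        case "path" => "location of the data"
        case _      => "unknown property"
      }
      override def fromProperties(props: Map[String, String]): String = props("path")
    }

    object ConverterValidation {
      /** Required properties of the converter that the table does not set. */
      def missing(conv: PropertyAwareConverter[_], props: Map[String, String]): Set[String] =
        conv.requiredProperties.toSet -- props.keySet

      /** Verifies the table's properties against the converter, then converts. */
      def convert[T](conv: PropertyAwareConverter[T], props: Map[String, String]): T = {
        val absent = missing(conv, props)
        if (absent.nonEmpty) {
          val details = absent.toSeq.sorted.map(p => s"$p (${conv.getPropertyDescription(p)})")
          throw new IllegalArgumentException(
            s"Missing required properties: ${details.mkString(", ")}")
        }
        conv.fromProperties(props)
      }
    }
    ```

    Because the descriptions live on the converter, the error message can tell the user what each missing property means, which fits the "one place to go" argument above.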


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105714201
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceConverter.scala ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sources
    +
    +import org.apache.flink.table.annotation.TableType
    +import org.apache.flink.table.catalog.{ExternalCatalogTable, TableSourceConverter}
    +
    +import scala.collection.JavaConverters._
    +import java.util.{Set => JSet}
    +
    +import com.google.common.collect.ImmutableSet
    +
    +/**
    +  * The class defines a converter used to convert [[CsvTableSource]] to
    +  * or from [[ExternalCatalogTable]].
    +  */
    +@TableType(value = "csv")
    +class CsvTableSourceConverter extends TableSourceConverter[CsvTableSource] {
    +
    +  private val required: JSet[String] = ImmutableSet.of("path")
    --- End diff --
    
    I'd add fieldDelim and rowDelim to the required parameters as well. 
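    A minimal sketch of the suggestion, assuming properties are a `Map[String, String]`: `CsvConfig` is a hypothetical stand-in for `CsvTableSource`, and `CsvPropertiesConverter` is illustrative only. Making both delimiters required means a catalog table must state them explicitly instead of silently relying on defaults.

    ```scala
    // Hypothetical stand-in for the parameters a CsvTableSource needs.
    final case class CsvConfig(path: String, fieldDelim: String, rowDelim: String)

    object CsvPropertiesConverter {
      // "path" plus both delimiters are required, as suggested in the review.
      val required: Set[String] = Set("path", "fieldDelim", "rowDelim")

      def fromProperties(props: Map[String, String]): CsvConfig = {
        val missing = required -- props.keySet
        // require throws IllegalArgumentException when the condition is false.
        require(missing.isEmpty,
          s"Missing required CSV properties: ${missing.toSeq.sorted.mkString(", ")}")
        CsvConfig(props("path"), props("fieldDelim"), props("rowDelim"))
      }

      // Inverse direction: write a config back as catalog table properties.
      def toProperties(c: CsvConfig): Map[String, String] =
        Map("path" -> c.path, "fieldDelim" -> c.fieldDelim, "rowDelim" -> c.rowDelim)
    }
    ```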


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r106348184
  
    --- Diff: flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties ---
    @@ -0,0 +1,19 @@
    +################################################################################
    +#  Licensed to the Apache Software Foundation (ASF) under one
    +#  or more contributor license agreements.  See the NOTICE file
    +#  distributed with this work for additional information
    +#  regarding copyright ownership.  The ASF licenses this file
    +#  to you under the Apache License, Version 2.0 (the
    +#  "License"); you may not use this file except in compliance
    +#  with the License.  You may obtain a copy of the License at
    +#
    +#      http://www.apache.org/licenses/LICENSE-2.0
    +#
    +#  Unless required by applicable law or agreed to in writing, software
    +#  distributed under the License is distributed on an "AS IS" BASIS,
    +#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +#  See the License for the specific language governing permissions and
    +# limitations under the License.
    +################################################################################
    +
    --- End diff --
    
    Can you add some comments to this file? What is it used for, and where?
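    In the CatalogTableHelper revision quoted in this thread, a similar properties file (`externalcatalogTable.properties`) is located on every classpath entry via `ClassLoader.getResources` and parsed for a package to scan. A minimal sketch of that lookup, which the requested file comment could describe; the key name `scan.packages` and the comma-separated format are assumptions, not taken from the PR.

    ```scala
    import java.io.InputStream
    import java.net.URL
    import java.util.Properties

    import scala.collection.JavaConverters._

    object ConverterConfigLoader {
      // Hypothetical key name; the PR's actual key is not shown in this diff.
      val ScanPackagesKey = "scan.packages"

      /** Parses comma-separated package names; empty if the key is absent. */
      def parseScanPackages(props: Properties): Seq[String] =
        Option(props.getProperty(ScanPackagesKey)).toSeq
          .flatMap(_.split(","))
          .map(_.trim)
          .filter(_.nonEmpty)

      /** Loads one properties file, always closing the stream. */
      def load(url: URL): Properties = {
        val in: InputStream = url.openStream()
        try {
          val p = new Properties()
          p.load(in)
          p
        } finally in.close()
      }

      /** Collects scan packages from every copy of the file on the classpath. */
      def scanPackagesOnClasspath(fileName: String, loader: ClassLoader): Seq[String] =
        loader.getResources(fileName).asScala.toList.flatMap(u => parseScanPackages(load(u)))
    }
    ```

    Since `getResources` returns one URL per JAR that ships the file, each module can contribute its own packages without editing a central list.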


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104178022
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala ---
    @@ -0,0 +1,166 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util
    +
    +import org.apache.calcite.linq4j.tree.Expression
    +import org.apache.calcite.schema._
    +import org.apache.flink.table.api.{DatabaseNotExistException, TableNotExistException}
    +import org.slf4j.{Logger, LoggerFactory}
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * This class is responsible for connect external catalog to calcite catalog.
    +  * In this way, it is possible to look-up and access tables in SQL queries
    +  * without registering tables in advance.
    +  * The databases in the external catalog registers as calcite sub-Schemas of current schema.
    +  * The tables in a given database registers as calcite tables
    +  * of the [[ExternalCatalogDatabaseSchema]].
    +  *
    +  * @param catalogIdentifier external catalog name
    +  * @param catalog           external catalog
    +  */
    +class ExternalCatalogSchema(
    +    catalogIdentifier: String,
    +    catalog: ExternalCatalog) extends Schema {
    +
    +  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
    +
    +  /**
    +    * Looks up database by the given sub-schema name in the external catalog,
    +    * returns it Wrapped in a [[ExternalCatalogDatabaseSchema]] with the given database name.
    +    *
    +    * @param name Sub-schema name
    +    * @return Sub-schema with a given name, or null
    +    */
    +  override def getSubSchema(name: String): Schema = {
    +    try {
    +      val db = catalog.getDatabase(name)
    +      if (db != null) {
    +        new ExternalCatalogDatabaseSchema(db.dbName, catalog)
    +      } else {
    +        null
    +      }
    +    } catch {
    +      case e: DatabaseNotExistException =>
    +        LOG.warn(s"database $name does not exist in externalCatalog $catalogIdentifier")
    +        null
    +    }
    +  }
    +
    +  /**
    +    * Lists the databases of the external catalog,
    +    * returns the lists as the names of this schema's sub-schemas.
    +    *
    +    * @return names of this schema's child schemas
    +    */
    +  override def getSubSchemaNames: util.Set[String] = catalog.listDatabases().toSet.asJava
    +
    +  override def getTable(name: String): Table = null
    +
    +  override def isMutable: Boolean = true
    +
    +  override def getFunctions(name: String): util.Collection[Function] =
    +    util.Collections.emptyList[Function]
    +
    +  override def getExpression(parentSchema: SchemaPlus, name: String): Expression =
    +    Schemas.subSchemaExpression(parentSchema, name, getClass)
    +
    +  override def getFunctionNames: util.Set[String] = util.Collections.emptySet[String]
    +
    +  override def getTableNames: util.Set[String] = util.Collections.emptySet[String]
    +
    +  override def contentsHaveChangedSince(lastCheck: Long, now: Long): Boolean = false
    +
    +  /**
    +    * Registers sub-Schemas to current schema plus
    +    *
    +    * @param plusOfThis
    +    */
    +  def registerSubSchemas(plusOfThis: SchemaPlus) {
    +    catalog.listDatabases().foreach(
    +      dbName => plusOfThis.add(dbName, getSubSchema(dbName))
    +    )
    +  }
    +
    +  private class ExternalCatalogDatabaseSchema(
    +      schemaName: String,
    +      flinkExternalCatalog: ExternalCatalog) extends Schema {
    +
    +    override def getTable(name: String): Table = {
    +      try {
    +        val externalCatalogTable = flinkExternalCatalog.getTable(schemaName, name)
    +        if (externalCatalogTable != null) {
    +          CatalogTableHelper.fromExternalCatalogTable(externalCatalogTable)
    +        } else {
    +          null
    +        }
    +      } catch {
    +        case TableNotExistException(db, table, cause) => {
    +          LOG.warn(s"table $db.$table does not exist in externalCatalog $catalogIdentifier")
    --- End diff --
    
    **T**able


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104159675
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CatalogTableHelper.scala ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.io.IOException
    +import java.lang.reflect.Modifier
    +import java.net.URL
    +import java.util.Properties
    +
    +import org.apache.flink.table.annotation.ExternalCatalogCompatible
    +import org.apache.flink.table.api.{ExternalCatalogTableTypeAlreadyExistException, ExternalCatalogTableTypeNotExistException}
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.plan.stats.FlinkStatistic
    +import org.apache.flink.table.sources.TableSource
    +import org.apache.flink.util.InstantiationUtil
    +import org.reflections.Reflections
    +import org.slf4j.{Logger, LoggerFactory}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +
    +/**
    +  * The utility class is used to convert ExternalCatalogTable to TableSourceTable.
    +  */
    +object CatalogTableHelper {
    +
    +  // config file to specifier the scan package to search tableSources
    +  // which is compatible with external catalog.
    +  private val tableSourceConfigFileName = "externalcatalogTable.properties"
    +
    +  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
    +
    +  // registered tableSources which are compatible with external catalog.
    +  // Key is table type name, Value is converter class.
    +  private val tableSourceTypeToConvertersClazz = {
    +    val registeredConverters =
    +      new HashMap[String, Class[_ <: TableSourceConverter[_]]]
    +    // scan all config file to find all tableSources which are compatible with external catalog.
    +    val resourceUrls = getClass.getClassLoader.getResources(tableSourceConfigFileName)
    +    while (resourceUrls.hasMoreElements) {
    +      val url = resourceUrls.nextElement()
    +      parseScanPackageFromConfigFile(url) match {
    +        case Some(scanPackage) =>
    +          val clazzWithAnnotations = new Reflections(scanPackage)
    +              .getTypesAnnotatedWith(classOf[ExternalCatalogCompatible])
    +          clazzWithAnnotations.asScala.foreach(clazz =>
    +            if (classOf[TableSource[_]].isAssignableFrom(clazz)) {
    +              if (Modifier.isAbstract(clazz.getModifiers()) ||
    +                  Modifier.isInterface(clazz.getModifiers)) {
    +                LOG.warn(
    +                  s"class :[${clazz.getName}] is also with ExternalCatalogCompatible annotation, " +
    +                      s"but it's an abstract clazz or an interface.")
    +              } else {
    +                val externalCatalogCompatible: ExternalCatalogCompatible =
    +                  clazz.getAnnotation(classOf[ExternalCatalogCompatible])
    +                val tableSourceName = externalCatalogCompatible.tableType()
    +                if (registeredConverters.contains(tableSourceName)) {
    +                  LOG.error(s"The table type [$tableSourceName] is already registered.")
    +                  throw new ExternalCatalogTableTypeAlreadyExistException(tableSourceName)
    +                }
    +                registeredConverters.put(tableSourceName, externalCatalogCompatible.converter())
    +              }
    +            } else {
    +              LOG.warn(
    +                s"class :[${clazz.getName}] is also with ExternalCatalogCompatible annotation, " +
    --- End diff --
    
    `"Class ${clazz.getName} is annotated with ExternalCatalogCompatible but does not implement the TableSource interface."`
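    A sketch of the class checks from the diff above, reworked to use the suggested warning text. `Marker` stands in for the `TableSource` interface, and `AnnotatedClassCheck` with its sample classes is hypothetical. It uses `getName` rather than `getSimpleName`, because `getSimpleName` can fail on nested Scala classes under Java 8.

    ```scala
    import java.lang.reflect.Modifier

    object AnnotatedClassCheck {
      // Hypothetical stand-ins: Marker plays the role of TableSource.
      trait Marker
      class GoodSource extends Marker
      abstract class AbstractSource extends Marker
      class NotASource

      /** Right(clazz) if it can be registered, otherwise Left(warning message). */
      def validate(
          clazz: Class[_],
          required: Class[_],
          annotationName: String): Either[String, Class[_]] = {
        if (!required.isAssignableFrom(clazz)) {
          Left(s"Class ${clazz.getName} is annotated with $annotationName " +
            s"but does not implement the ${required.getName} interface.")
        } else if (Modifier.isAbstract(clazz.getModifiers) || clazz.isInterface) {
          Left(s"Class ${clazz.getName} is annotated with $annotationName " +
            "but is abstract or an interface.")
        } else {
          Right(clazz)
        }
      }
    }
    ```

    Returning the message instead of logging inside the check keeps the logic testable; the caller can still pass `Left` values to `LOG.warn`.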


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105141214
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala ---
    @@ -0,0 +1,164 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util
    +
    +import org.apache.calcite.linq4j.tree.Expression
    +import org.apache.calcite.schema._
    +import org.apache.flink.table.api.{DatabaseNotExistException, TableNotExistException}
    +import org.slf4j.{Logger, LoggerFactory}
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * This class is responsible for connect external catalog to calcite catalog.
    +  * In this way, it is possible to look-up and access tables in SQL queries
    +  * without registering tables in advance.
    +  * The databases in the external catalog registers as calcite sub-Schemas of current schema.
    +  * The tables in a given database registers as calcite tables
    +  * of the [[ExternalCatalogDatabaseSchema]].
    +  *
    +  * @param catalogIdentifier external catalog name
    +  * @param catalog           external catalog
    +  */
    +class ExternalCatalogSchema(
    +    catalogIdentifier: String,
    +    catalog: ExternalCatalog) extends Schema {
    +
    +  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
    +
    +  /**
    +    * Looks up database by the given sub-schema name in the external catalog,
    +    * returns it Wrapped in a [[ExternalCatalogDatabaseSchema]] with the given database name.
    +    *
    +    * @param name Sub-schema name
    +    * @return Sub-schema with a given name, or null
    +    */
    +  override def getSubSchema(name: String): Schema = {
    +    try {
    +      val db = catalog.getDatabase(name)
    --- End diff --
    
    Shouldn't we specify the behavior of `getDatabase()` more clearly? 
    Is it expected to return `null` or to throw an exception if the database does not exist? Right now we handle both cases, which would not be necessary with a clear interface definition.
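    One way to settle this, sketched under assumptions: `getDatabase` documents a single contract (throw if missing, never return `null`), so the caller needs exactly one failure path. `DatabaseLookup`, `InMemoryDatabaseLookup`, `CatalogDatabase`, and `SubSchemaResolver` are hypothetical; the local `DatabaseNotExistException` only mirrors the name used in the PR.

    ```scala
    // Hypothetical local mirror of the PR's exception type.
    final case class DatabaseNotExistException(db: String)
      extends RuntimeException(s"Database $db does not exist")

    final case class CatalogDatabase(dbName: String)

    trait DatabaseLookup {
      /**
        * @throws DatabaseNotExistException if no database with this name exists.
        *         Never returns null.
        */
      def getDatabase(dbName: String): CatalogDatabase
    }

    class InMemoryDatabaseLookup(names: Set[String]) extends DatabaseLookup {
      override def getDatabase(dbName: String): CatalogDatabase =
        if (names.contains(dbName)) CatalogDatabase(dbName)
        else throw DatabaseNotExistException(dbName)
    }

    object SubSchemaResolver {
      // Exactly one failure mode to handle; no additional null check is needed.
      def resolve(lookup: DatabaseLookup, name: String): Option[CatalogDatabase] =
        try Some(lookup.getDatabase(name))
        catch { case _: DatabaseNotExistException => None }
    }
    ```

    Where Calcite expects `null` from `getSubSchema`, the `Option` can be converted at that boundary (for example with `.orNull` after mapping to a schema).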


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105159735
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala ---
    @@ -0,0 +1,90 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util.Collections
    +
    +import com.google.common.collect.Lists
    +import org.apache.calcite.jdbc.CalciteSchema
    +import org.apache.calcite.prepare.CalciteCatalogReader
    +import org.apache.calcite.schema.SchemaPlus
    +import org.apache.calcite.sql.validate.SqlMonikerType
    +import org.apache.commons.collections.CollectionUtils
    +import org.apache.flink.table.calcite.{FlinkTypeFactory, FlinkTypeSystem}
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.sources.CsvTableSource
    +import org.apache.flink.table.utils.CommonTestData
    +import org.junit.{Before, Test}
    +import org.junit.Assert._
    +
    +class ExternalCatalogSchemaTest {
    +
    +  private val schemaName: String = "test"
    +  private var externalCatalogSchema: ExternalCatalogSchema = _
    +  private var calciteCatalogReader: CalciteCatalogReader = _
    +  private val db = "db1"
    +  private val tb = "tb1"
    +
    +  @Before
    +  def setUp(): Unit = {
    +    val rootSchemaPlus: SchemaPlus = CalciteSchema.createRootSchema(true, false).plus()
    +    val catalog = CommonTestData.getMockedFlinkExternalCatalog
    +    externalCatalogSchema = ExternalCatalogSchema.create(rootSchemaPlus, schemaName, catalog)
    +    val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem())
    +    calciteCatalogReader = new CalciteCatalogReader(
    +      CalciteSchema.from(rootSchemaPlus),
    +      false,
    +      Collections.emptyList(),
    +      typeFactory)
    +  }
    +
    +  @Test
    +  def testGetSubSchema(): Unit = {
    +    val allSchemaObjectNames = calciteCatalogReader
    +        .getAllSchemaObjectNames(Lists.newArrayList(schemaName))
    +    assertTrue(allSchemaObjectNames.size() == 2)
    +    assertEquals(SqlMonikerType.SCHEMA, allSchemaObjectNames.get(0).getType)
    +    assertTrue(
    +      CollectionUtils.isEqualCollection(
    +        allSchemaObjectNames.get(0).getFullyQualifiedNames,
    --- End diff --
    
    Check both entries
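    A sketch of an assertion helper that checks every returned entry rather than only entry 0. `Moniker` is a hypothetical stand-in for Calcite's `SqlMoniker` (type plus fully qualified names), and the helper compares entries order-insensitively while still counting duplicates.

    ```scala
    // Hypothetical stand-in for SqlMoniker: kind plus fully qualified names.
    final case class Moniker(kind: String, names: List[String])

    object MonikerAssertions {
      // Multiset view: each entry mapped to how often it occurs.
      private def counts(ms: Seq[Moniker]): Map[Moniker, Int] =
        ms.groupBy(identity).map { case (m, group) => m -> group.size }

      /** Fails unless actual and expected contain the same entries, in any order. */
      def assertSameEntries(actual: Seq[Moniker], expected: Seq[Moniker]): Unit = {
        assert(actual.size == expected.size,
          s"expected ${expected.size} entries, got ${actual.size}")
        assert(counts(actual) == counts(expected),
          s"entries differ: $actual vs $expected")
      }
    }
    ```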


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104167894
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala ---
    @@ -0,0 +1,108 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +/**
    +  * This class is responsible for interacting with an external catalog.
    +  * Its main responsibilities include:
    +  * <ul>
    +  * <li> create/drop/alter database or tables for DDL operations
    +  * <li> provide tables for calcite catalog, it looks up databases or tables in the external catalog
    +  * </ul>
    +  */
    +trait ExternalCatalog {
    +
    +  /**
    +    * Adds table into external Catalog
    +    *
    +    * @param table          description of table which to create
    +    * @param ignoreIfExists whether to ignore operation if table already exists
    +    */
    +  def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit
    +
    +  /**
    +    * Deletes table from external Catalog
    +    *
    +    * @param dbName            database name
    +    * @param tableName         table name
    +    * @param ignoreIfNotExists whether to ignore operation if table not exist yet
    +    */
    +  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
    +
    +  /**
    +    * Alters existed table into external Catalog
    +    *
    +    * @param table description of table which to alter
    +    */
    +  def alterTable(table: ExternalCatalogTable): Unit
    +
    +  /**
    +    * Gets table from external Catalog
    +    *
    +    * @param dbName    database name
    +    * @param tableName table name
    +    * @return table
    +    */
    +  def getTable(dbName: String, tableName: String): ExternalCatalogTable
    +
    +  /**
    +    * Gets the table name lists from current external Catalog
    +    *
    +    * @param dbName database name
    +    * @return lists of table name
    +    */
    +  def listTables(dbName: String): Seq[String]
    +
    +  /**
    +    * Adds database into external Catalog
    +    *
    +    * @param db             database name
    +    * @param ignoreIfExists whether to ignore operation if database already exists
    +    */
    +  def createDatabase(db: ExternalCatalogDatabase, ignoreIfExists: Boolean): Unit
    +
    +  /**
    +    * Deletes database from external Catalog
    +    *
    +    * @param dbName            database name
    +    * @param ignoreIfNotExists whether to ignore operation if database does not exist
    +    */
    +  def dropDatabase(dbName: String, ignoreIfNotExists: Boolean): Unit
    +
    +  /**
    +    * Alters existed database into external Catalog
    +    */
    +  def alterDatabase(db: ExternalCatalogDatabase): Unit
    +
    +  /**
    +    * Gets database from external Catalog
    +    *
    +    * @param dbName database name
    +    * @return database
    +    */
    +  def getDatabase(dbName: String): ExternalCatalogDatabase
    +
    +  /**
    +    * Gets the database name lists from current external Catalog
    +    *
    +    * @return
    +    */
    +  def listDatabases(): Seq[String]
    --- End diff --
    
    Java `List` or `Set` instead of `Seq`?
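The sketch below illustrates the suggestion. The trait exposes a `java.util.List`, so Java implementers and callers never see Scala collections. The Scala implementation keeps Scala collections internally and converts only at the boundary. `ListingCatalog` and `InMemoryListingCatalog` are hypothetical, trimmed-down names, not the PR's classes. The sketch uses `scala.collection.JavaConverters`, as the PR does. On Scala 2.13+, `scala.jdk.CollectionConverters` is the non-deprecated equivalent.

```scala
import java.util.{List => JList}

import scala.collection.JavaConverters._
import scala.collection.mutable

// Hypothetical, trimmed-down catalog trait: only the listing method, typed with
// java.util.List so that Java implementations and callers need no Scala collections.
trait ListingCatalog {
  def listDatabases(): JList[String]
}

// Minimal Scala implementation that keeps Scala collections internally and
// converts only at the API boundary.
class InMemoryListingCatalog extends ListingCatalog {
  // LinkedHashSet keeps insertion order, which makes the listing deterministic.
  private val databases = mutable.LinkedHashSet[String]()

  def addDatabase(name: String): Unit = synchronized { databases += name }

  override def listDatabases(): JList[String] = synchronized {
    // toList takes an immutable snapshot; asJava wraps it as a java.util.List.
    databases.toList.asJava
  }
}
```

A `Set` would encode name uniqueness in the type. The later revision of the trait in this thread (`listTables(dbName: String): util.List[String]`) took the `List` route.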



[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104150546
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/ExternalCatalogCompatible.java ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.annotation;
    +
    +import org.apache.flink.annotation.Public;
    +import org.apache.flink.table.catalog.TableSourceConverter;
    +
    +import java.lang.annotation.Documented;
    +import java.lang.annotation.ElementType;
    +import java.lang.annotation.Retention;
    +import java.lang.annotation.RetentionPolicy;
    +import java.lang.annotation.Target;
    +
    +/**
    + * A tableSource with this annotation represents it is compatible with external catalog, that is,
    --- End diff --
    
    **T**ableSource (uppercase)



[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105138652
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.plan.stats.TableStats
    +
    +/**
    +  * Table definition of the external catalog.
    +  *
    +  * @param identifier           identifier of external catalog table, including dbName and tableName
    +  * @param tableType            type of external catalog table, e.g csv, hbase, kafka
    +  * @param schema               schema of table data, including column names and column types
    +  * @param properties           properties of external catalog table
    +  * @param stats                statistics of external catalog table
    +  * @param comment              comment of external catalog table
    +  * @param createTime           create time of external catalog table
    +  * @param lastAccessTime       last access time of external catalog table
    +  */
    +case class ExternalCatalogTable(
    +    identifier: TableIdentifier,
    +    tableType: String,
    +    schema: DataSchema,
    +    properties: Map[String, String] = Map.empty,
    +    stats: Option[TableStats] = None,
    --- End diff --
    
    No `Option`
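The sketch below assumes the concern is that `Option` is awkward for Java callers; the comment itself does not say so. Under that assumption, `stats` becomes a plain nullable field with a `null` default, and Scala code can still wrap it on demand. `CatalogTableSketch` is a hypothetical, trimmed-down version of `ExternalCatalogTable`. The `TableStats` here is a local stand-in, not `org.apache.flink.table.plan.stats.TableStats`.

```scala
// Hypothetical stand-in for org.apache.flink.table.plan.stats.TableStats.
final case class TableStats(rowCount: Long)

// Trimmed-down variant of ExternalCatalogTable: `stats` is a plain (nullable)
// reference with a null default instead of Option[TableStats], so Java callers
// can pass a TableStats or null directly.
final case class CatalogTableSketch(
    tableType: String,
    properties: Map[String, String] = Map.empty,
    stats: TableStats = null) {

  // Scala-side convenience: wrap the nullable field when Option semantics are wanted.
  def statsOption: Option[TableStats] = Option(stats)
}
```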



[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104177526
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala ---
    @@ -0,0 +1,166 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util
    +
    +import org.apache.calcite.linq4j.tree.Expression
    +import org.apache.calcite.schema._
    +import org.apache.flink.table.api.{DatabaseNotExistException, TableNotExistException}
    +import org.slf4j.{Logger, LoggerFactory}
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * This class is responsible for connecting an external catalog to the Calcite catalog.
    +  * In this way, it is possible to look-up and access tables in SQL queries
    +  * without registering tables in advance.
    +  * The databases in the external catalog are registered as Calcite sub-schemas of the current schema.
    +  * The tables in a given database are registered as Calcite tables
    +  * of the [[ExternalCatalogDatabaseSchema]].
    +  *
    +  * @param catalogIdentifier external catalog name
    +  * @param catalog           external catalog
    +  */
    +class ExternalCatalogSchema(
    +    catalogIdentifier: String,
    +    catalog: ExternalCatalog) extends Schema {
    +
    +  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
    +
    +  /**
    +    * Looks up database by the given sub-schema name in the external catalog,
    +    * returns it wrapped in an [[ExternalCatalogDatabaseSchema]] with the given database name.
    +    *
    +    * @param name Sub-schema name
    +    * @return Sub-schema with a given name, or null
    +    */
    +  override def getSubSchema(name: String): Schema = {
    +    try {
    +      val db = catalog.getDatabase(name)
    +      if (db != null) {
    +        new ExternalCatalogDatabaseSchema(db.dbName, catalog)
    +      } else {
    +        null
    +      }
    +    } catch {
    +      case e: DatabaseNotExistException =>
    +        LOG.warn(s"database $name does not exist in externalCatalog $catalogIdentifier")
    --- End diff --
    
    **D**atabase



[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r104191673
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala ---
    @@ -68,6 +62,90 @@ object CommonTestData {
         )
       }
     
    +  def getMockedFlinkExternalCatalog: ExternalCatalog = {
    +    val csvRecord1 = Seq(
    +      "1#1#Hi",
    +      "2#2#Hello",
    +      "3#2#Hello world"
    +    )
    +    val tempFilePath1 = writeToTempFile(csvRecord1.mkString("$"), "csv-test1", "tmp")
    +    val externalCatalogTable1 = ExternalCatalogTable(
    +      TableIdentifier("db1", "tb1"),
    +      "csv",
    +      DataSchema(
    +        Array(
    +          BasicTypeInfo.INT_TYPE_INFO,
    +          BasicTypeInfo.LONG_TYPE_INFO,
    +          BasicTypeInfo.STRING_TYPE_INFO),
    +        Array("a", "b", "c")
    +      ),
    +      properties = Map(
    +        "path" -> tempFilePath1,
    +        "fieldDelim" -> "#",
    +        "rowDelim" -> "$"
    +      )
    +    )
    +
    +    val csvRecord2 = Seq(
    --- End diff --
    
    `csvRecord2` -> `csvTable2` 



[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105141318
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util
    +
    +/**
    +  * This class is responsible for interacting with an external catalog.
    +  * Its main responsibilities include:
    +  * <ul>
    +  * <li> create/drop/alter database or tables for DDL operations
    +  * <li> provide tables for calcite catalog, it looks up databases or tables in the external catalog
    +  * </ul>
    +  */
    +trait ExternalCatalog {
    +
    +  /**
    +    * Adds table into external Catalog
    +    *
    +    * @param table          description of table which to create
    +    * @param ignoreIfExists whether to ignore operation if table already exists
    +    */
    +  def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit
    +
    +  /**
    +    * Deletes table from external Catalog
    +    *
    +    * @param dbName            database name
    +    * @param tableName         table name
    +    * @param ignoreIfNotExists whether to ignore operation if table not exist yet
    +    */
    +  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
    +
    +  /**
    +    * Modifies an existing table in the external catalog
    +    *
    +    * @param table             description of table which to modify
    +    * @param ignoreIfNotExists whether to ignore operation if table not exist yet
    +    */
    +  def alterTable(table: ExternalCatalogTable, ignoreIfNotExists: Boolean): Unit
    +
    +  /**
    +    * Gets table from external Catalog
    +    *
    +    * @param dbName    database name
    +    * @param tableName table name
    +    * @return table
    +    */
    +  def getTable(dbName: String, tableName: String): ExternalCatalogTable
    +
    +  /**
    +    * Gets the table name lists from current external Catalog
    +    *
    +    * @param dbName database name
    +    * @return lists of table name
    +    */
    +  def listTables(dbName: String): util.List[String]
    +
    +  /**
    +    * Adds database into external Catalog
    +    *
    +    * @param db             description of database which to create
    +    * @param ignoreIfExists whether to ignore operation if database already exists
    +    */
    +  def createDatabase(db: ExternalCatalogDatabase, ignoreIfExists: Boolean): Unit
    +
    +  /**
    +    * Deletes database from external Catalog
    +    *
    +    * @param dbName            database name
    +    * @param ignoreIfNotExists whether to ignore operation if database does not exist
    +    */
    +  def dropDatabase(dbName: String, ignoreIfNotExists: Boolean): Unit
    +
    +  /**
    +    * Modifies an existing database in the external catalog
    +    *
    +    * @param db                description of database which to modify
    +    * @param ignoreIfNotExists whether to ignore operation if database does not exist
    +    */
    +  def alterDatabase(db: ExternalCatalogDatabase, ignoreIfNotExists: Boolean): Unit
    +
    +  /**
    +    * Gets database from external Catalog
    +    *
    +    * @param dbName database name
    +    * @return database
    +    */
    +  def getDatabase(dbName: String): ExternalCatalogDatabase
    --- End diff --
    
    Do we expect `null` or an exception if the database does not exist?
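One possible contract for this question is sketched below: `getDatabase` never returns `null` and documents a `DatabaseNotExistException` instead. The Calcite-facing adapter translates that exception into `null` only at its own boundary, because Calcite's `Schema#getSubSchema` uses `null` for "no such sub-schema". `ExternalCatalogSchema.getSubSchema` in this PR already catches the exception. `DatabaseLookup` and `SubSchemaAdapter` are hypothetical names. `ExternalCatalogDatabase` and `DatabaseNotExistException` are local stand-ins, not the Flink classes.

```scala
// Local stand-ins for the Flink classes discussed in the thread (not the real ones).
final case class ExternalCatalogDatabase(dbName: String)

class DatabaseNotExistException(db: String)
  extends RuntimeException(s"Database $db does not exist.")

trait DatabaseLookup {
  /**
    * Gets a database from the external catalog.
    *
    * @param dbName database name
    * @return the requested database; never null
    * @throws DatabaseNotExistException if no database with this name exists
    */
  @throws[DatabaseNotExistException]
  def getDatabase(dbName: String): ExternalCatalogDatabase
}

object SubSchemaAdapter {
  // Calcite's Schema#getSubSchema uses null for "no such sub-schema", so the
  // exception is translated into null only at this boundary.
  def databaseOrNull(catalog: DatabaseLookup, name: String): ExternalCatalogDatabase =
    try catalog.getDatabase(name)
    catch { case _: DatabaseNotExistException => null }
}
```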



[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3406
  
    Thanks for the update @beyond1920.
    The PR looks good to me. 
    @twalthr do you also want to have a look at this PR?



[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105363822
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util
    +
    +/**
    +  * This class is responsible for interacting with an external catalog.
    +  * Its main responsibilities include:
    +  * <ul>
    +  * <li> create/drop/alter database or tables for DDL operations
    +  * <li> provide tables for calcite catalog, it looks up databases or tables in the external catalog
    +  * </ul>
    +  */
    +trait ExternalCatalog {
    +
    +  /**
    +    * Adds table into external Catalog
    +    *
    +    * @param table          description of table which to create
    +    * @param ignoreIfExists whether to ignore operation if table already exists
    +    */
    +  def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit
    +
    +  /**
    +    * Deletes table from external Catalog
    +    *
    +    * @param dbName            database name
    +    * @param tableName         table name
    +    * @param ignoreIfNotExists whether to ignore operation if table not exist yet
    +    */
    +  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
    +
    +  /**
    +    * Modifies an existing table in the external catalog
    +    *
    +    * @param table             description of table which to modify
    +    * @param ignoreIfNotExists whether to ignore operation if table not exist yet
    +    */
    +  def alterTable(table: ExternalCatalogTable, ignoreIfNotExists: Boolean): Unit
    +
    +  /**
    +    * Gets table from external Catalog
    +    *
    +    * @param dbName    database name
    +    * @param tableName table name
    +    * @return table
    +    */
    +  def getTable(dbName: String, tableName: String): ExternalCatalogTable
    --- End diff --
    
    I will add more comments to the method. Thanks!



[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3406#discussion_r105703538
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala ---
    @@ -0,0 +1,155 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import org.apache.flink.table.api.{DatabaseAlreadyExistException, DatabaseNotExistException, TableAlreadyExistException, TableNotExistException}
    +import java.util.{List => JList}
    +
    +import scala.collection.mutable.HashMap
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * This class is an in-memory implementation of [[ReadonlyExternalCatalog]].
    +  *
    +  * It is intended for testing and development, not for use in a production environment.
    +  */
    +class InMemoryExternalCatalog extends CRUDExternalCatalog {
    +
    +  private val databases = new HashMap[String, Database]
    +
    +  @throws[DatabaseNotExistException]
    +  @throws[TableAlreadyExistException]
    +  override def createTable(
    +      table: ExternalCatalogTable,
    +      ignoreIfExists: Boolean): Unit = synchronized {
    +    val dbName = table.identifier.database
    +    val tables = getTables(dbName)
    +    val tableName = table.identifier.table
    +    if (tables.contains(tableName)) {
    +      if (!ignoreIfExists) {
    +        throw new TableAlreadyExistException(dbName, tableName)
    +      }
    +    } else {
    +      tables.put(tableName, table)
    +    }
    +  }
    +
    +  @throws[DatabaseNotExistException]
    +  @throws[TableNotExistException]
    +  override def dropTable(
    +      dbName: String,
    +      tableName: String,
    +      ignoreIfNotExists: Boolean): Unit = synchronized {
    +    val tables = getTables(dbName)
    +    if (tables.remove(tableName).isEmpty && !ignoreIfNotExists) {
    +      throw new TableNotExistException(dbName, tableName)
    +    }
    +  }
    +
    +  @throws[DatabaseNotExistException]
    +  @throws[TableNotExistException]
    +  override def alterTable(
    +      table: ExternalCatalogTable,
    +      ignoreIfNotExists: Boolean): Unit = synchronized {
    +    val dbName = table.identifier.database
    +    val tables = getTables(dbName)
    +    val tableName = table.identifier.table
    +    if (tables.contains(tableName)) {
    +      tables.put(tableName, table)
    +    } else if (!ignoreIfNotExists) {
    +      throw new TableNotExistException(dbName, tableName)
    +    }
    +  }
    +
    +  @throws[DatabaseNotExistException]
    +  override def listTables(dbName: String): JList[String] = synchronized {
    +    val tables = getTables(dbName)
    +    tables.keys.toList.asJava
    +  }
    +
    +  @throws[DatabaseNotExistException]
    +  @throws[TableNotExistException]
    +  override def getTable(dbName: String, tableName: String): ExternalCatalogTable = synchronized {
    +    val tables = getTables(dbName)
    +    tables.get(tableName) match {
    +      case Some(table) => table
    +      case None => throw new TableNotExistException(dbName, tableName)
    +    }
    +  }
    +
    +  @throws[DatabaseAlreadyExistException]
    +  override def createDatabase(
    +      db: ExternalCatalogDatabase,
    +      ignoreIfExists: Boolean): Unit = synchronized {
    +    val dbName = db.dbName
    +    if (databases.contains(dbName)) {
    +      if (!ignoreIfExists) {
    +        throw new DatabaseAlreadyExistException(dbName)
    +      }
    +    } else {
    +      databases.put(dbName, new Database(db))
    +    }
    +  }
    +
    +  @throws[DatabaseNotExistException]
    +  override def alterDatabase(
    +      db: ExternalCatalogDatabase,
    +      ignoreIfNotExists: Boolean): Unit = synchronized {
    +    val dbName = db.dbName
    +    databases.get(dbName) match {
    +      case Some(database) => database.db = db
    +      case None =>
    +        if (!ignoreIfNotExists) {
    +          throw new DatabaseNotExistException(dbName)
    +        }
    +    }
    +  }
    +
    +  @throws[DatabaseNotExistException]
    +  override def dropDatabase(
    +      dbName: String,
    +      ignoreIfNotExists: Boolean): Unit = synchronized {
    +    if (databases.remove(dbName).isEmpty && !ignoreIfNotExists) {
    +      throw new DatabaseNotExistException(dbName)
    +    }
    +  }
    +
    +  override def listDatabases(): JList[String] = synchronized {
    +    databases.keys.toList.asJava
    +  }
    +
    +  @throws[DatabaseNotExistException]
    +  override def getDatabase(dbName: String): ExternalCatalogDatabase = synchronized {
    +    databases.get(dbName) match {
    +      case Some(database) => database.db
    +      case None => null
    --- End diff --
    
    throw `DatabaseNotExistException`
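The sketch below shows what the suggested change could look like. It mirrors `getTable` in the same class: a missing entry raises `DatabaseNotExistException` instead of returning `null`. `InMemoryDatabases` is a hypothetical, simplified class. It stores `ExternalCatalogDatabase` values directly rather than the PR's internal `Database` wrapper. The exception and database classes are local stand-ins.

```scala
import scala.collection.mutable

// Local stand-ins; the real classes live in org.apache.flink.table.catalog / .api.
final case class ExternalCatalogDatabase(dbName: String)

class DatabaseNotExistException(db: String)
  extends RuntimeException(s"Database $db does not exist.")

class InMemoryDatabases {
  private val databases = new mutable.HashMap[String, ExternalCatalogDatabase]

  def createDatabase(db: ExternalCatalogDatabase): Unit = synchronized {
    databases.put(db.dbName, db)
  }

  // Same pattern as getTable: a missing entry raises an exception
  // instead of returning null.
  @throws[DatabaseNotExistException]
  def getDatabase(dbName: String): ExternalCatalogDatabase = synchronized {
    databases.get(dbName) match {
      case Some(db) => db
      case None => throw new DatabaseNotExistException(dbName)
    }
  }
}
```

With this change, the `db != null` check in `ExternalCatalogSchema.getSubSchema` becomes redundant. Its existing `catch` of `DatabaseNotExistException` already covers the missing-database case.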

