Posted to dev@flink.apache.org by "Jingsong Lee (Jira)" <ji...@apache.org> on 2020/02/05 07:58:00 UTC

[jira] [Created] (FLINK-15912) Add Context to improve TableSourceFactory and TableSinkFactory

Jingsong Lee created FLINK-15912:
------------------------------------

             Summary: Add Context to improve TableSourceFactory and TableSinkFactory
                 Key: FLINK-15912
                 URL: https://issues.apache.org/jira/browse/FLINK-15912
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / API
            Reporter: Jingsong Lee
             Fix For: 1.11.0


Discussion in: [http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improve-TableFactory-td36647.html]

Vote in: [http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Improve-TableFactory-to-add-Context-td37211.html]

Motivation:
The main needs and problems today are:
 * A connector cannot access TableConfig[1], yet some connector behavior needs to be controlled by the user's table configuration. Now that tables are defined in catalogs, putting this configuration into the connector properties of each table is too inconvenient.
 * A context class also allows future extensions without changing the TableFactory interface again.

Interface:
{code:java}
  public interface TableSourceFactory<T> extends TableFactory {
   ......

   /**
    * Creates and configures a {@link TableSource} based on the given {@link Context}.
    *
    * @param context context of this table source.
    * @return the configured table source.
    */
   default TableSource<T> createTableSource(Context context) {
      return createTableSource(
            context.getObjectIdentifier().toObjectPath(),
            context.getTable());
   }
   /**
    * Context of table source creation. Contains table information and environment information.
    */
   interface Context {
      /**
       * @return full identifier of the given {@link CatalogTable}.
       */
      ObjectIdentifier getObjectIdentifier();
      /**
       * @return table {@link CatalogTable} instance.
       */
      CatalogTable getTable();
      /**
       * @return readable config of this table environment.
       */
      ReadableConfig getConfiguration();
   }
}

public interface TableSinkFactory<T> extends TableFactory {
   ......
   /**
    * Creates and configures a {@link TableSink} based on the given {@link Context}.
    *
    * @param context context of this table sink.
    * @return the configured table sink.
    */
   default TableSink<T> createTableSink(Context context) {
      return createTableSink(
            context.getObjectIdentifier().toObjectPath(),
            context.getTable());
   }
   /**
    * Context of table sink creation. Contains table information and environment information.
    */
   interface Context {
      /**
       * @return full identifier of the given {@link CatalogTable}.
       */
      ObjectIdentifier getObjectIdentifier();
      /**
       * @return table {@link CatalogTable} instance.
       */
      CatalogTable getTable();
      /**
       * @return readable config of this table environment.
       */
      ReadableConfig getConfiguration();
   }
}
{code}
A nested Context interface is added to both TableSourceFactory and TableSinkFactory. It is defined twice, rather than shared, because source and sink may need different properties in the future.
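
To illustrate how the default createTableSource(Context) method keeps existing factories working while letting new ones read the table environment's configuration, here is a minimal, self-contained sketch. It does not use the real Flink classes: ReadableConfig, Context and SourceFactory below are simplified stand-ins, a "table source" is reduced to a descriptive String, and the option key "sketch.source.parallelism" is hypothetical and only serves the example.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins only: none of these types are the real Flink classes.
final class ContextSketch {

    /** Stand-in for ReadableConfig, reduced to a string lookup with a default. */
    interface ReadableConfig {
        String get(String key, String defaultValue);
    }

    /** Stand-in for the proposed Context. */
    interface Context {
        String getObjectIdentifier();      // stands in for ObjectIdentifier
        Map<String, String> getTable();    // stands in for CatalogTable
        ReadableConfig getConfiguration();
    }

    /** Stand-in for TableSourceFactory; a "source" is just a descriptive String. */
    interface SourceFactory {
        // Pre-existing creation method that factories already implement.
        String createTableSource(String tablePath, Map<String, String> table);

        // New method: by default it delegates to the pre-existing one,
        // so factories written before the change keep working unmodified.
        default String createTableSource(Context context) {
            return createTableSource(context.getObjectIdentifier(), context.getTable());
        }
    }

    /** A factory written before the change; it never sees the configuration. */
    static class LegacyFactory implements SourceFactory {
        @Override
        public String createTableSource(String tablePath, Map<String, String> table) {
            return "legacy:" + tablePath;
        }
    }

    /** A factory that overrides the new method to read the configuration. */
    static class ConfigAwareFactory implements SourceFactory {
        @Override
        public String createTableSource(String tablePath, Map<String, String> table) {
            return "aware:" + tablePath + ":parallelism=1";
        }

        @Override
        public String createTableSource(Context context) {
            // "sketch.source.parallelism" is a hypothetical key for this example.
            String parallelism =
                    context.getConfiguration().get("sketch.source.parallelism", "1");
            return "aware:" + context.getObjectIdentifier() + ":parallelism=" + parallelism;
        }
    }

    /** Builds a Context backed by a plain map acting as the configuration. */
    static Context contextOf(String identifier, Map<String, String> config) {
        return new Context() {
            @Override
            public String getObjectIdentifier() {
                return identifier;
            }

            @Override
            public Map<String, String> getTable() {
                return new HashMap<>();
            }

            @Override
            public ReadableConfig getConfiguration() {
                return (key, defaultValue) -> config.getOrDefault(key, defaultValue);
            }
        };
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("sketch.source.parallelism", "4");
        Context context = contextOf("cat.db.t", config);

        // The legacy factory is reached through the default method.
        System.out.println(new LegacyFactory().createTableSource(context));
        // The config-aware factory reads the value from the context.
        System.out.println(new ConfigAwareFactory().createTableSource(context));
    }
}
```

In this sketch the legacy factory yields "legacy:cat.db.t" and the config-aware factory yields "aware:cat.db.t:parallelism=4". The same pattern would apply to the sink side, and adding another accessor to Context later would not change the signature of the factory methods.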

 

[1] https://issues.apache.org/jira/browse/FLINK-15290


