Posted to dev@flink.apache.org by "lqjacklee (Jira)" <ji...@apache.org> on 2021/06/23 12:52:00 UTC

[jira] [Created] (FLINK-23122) Provide the Dynamic register converter

lqjacklee created FLINK-23122:
---------------------------------

             Summary: Provide the Dynamic register converter 
                 Key: FLINK-23122
                 URL: https://issues.apache.org/jira/browse/FLINK-23122
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Common, Connectors / HBase, Connectors / Hive, Connectors / JDBC, Connectors / ORC, Table SQL / API
    Affects Versions: 1.14.0
            Reporter: lqjacklee


Background:

Type conversion is at the core of moving data between Flink and external data sources. Flink currently provides type conversion separately for each connector, so the conversion logic is scattered across the implementations of multiple connectors, which makes it hard to reuse. In addition, because data sources vary widely, the existing conversions often need to be extended, but they offer no mechanism for dynamic extension. Finally, the core conversion logic needs to be reused across multiple projects, so it should be abstracted into a unified conversion service. Applications would then depend on a single type conversion system, and individual subcomponents could dynamically register additional conversions.


1, ConvertServiceRegister: provides registration and lookup of conversion services.

{code:java}
public interface ConvertServiceRegister {

    void register(ConversionService conversionService);

    void register(ConversionServiceFactory conversionServiceFactory);

    void register(ConversionServiceSet conversionServiceSet);

    Collection<ConversionService> convertServices();

    Collection<ConversionService> convertServices(String group);

    Collection<ConversionServiceSet> convertServiceSets();

    Collection<ConversionServiceSet> convertServiceSets(String group);
}
{code}
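
For illustration only, a minimal in-memory sketch of the register follows. It covers just the ConversionService methods and makes several assumptions the proposal does not state: Order is taken to expose a single int order() method, the "group" argument is matched against tags(), and lookups return services sorted by ascending order. The names InMemoryConvertServiceRegister and TaggedService are hypothetical, and ConversionService is reduced to the two methods the register needs.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;

// Assumption: the proposal does not define Order; one int-valued method is assumed.
interface Order {
    int order();
}

// Reduced stand-in for the proposed ConversionService (conversion methods omitted).
interface ConversionService extends Order {
    Set<String> tags();
}

// Hypothetical in-memory register; only the ConversionService methods are sketched.
final class InMemoryConvertServiceRegister {

    // Copy-on-write list so that registration and lookup may happen concurrently.
    private final List<ConversionService> services = new CopyOnWriteArrayList<>();

    void register(ConversionService conversionService) {
        if (conversionService == null) {
            throw new IllegalArgumentException("conversionService must not be null");
        }
        services.add(conversionService);
    }

    // Returns a snapshot of all registered services, lowest order() first.
    Collection<ConversionService> convertServices() {
        List<ConversionService> copy = new ArrayList<>(services);
        copy.sort(Comparator.comparingInt(ConversionService::order));
        return copy;
    }

    // Assumption: a service belongs to a group when its tags contain the group name.
    Collection<ConversionService> convertServices(String group) {
        List<ConversionService> matches = new ArrayList<>();
        for (ConversionService service : convertServices()) {
            if (service.tags().contains(group)) {
                matches.add(service);
            }
        }
        return matches;
    }
}

// Tiny example service used only to exercise the register.
final class TaggedService implements ConversionService {
    private final int order;
    private final Set<String> tags;

    TaggedService(int order, Set<String> tags) {
        this.order = order;
        this.tags = tags;
    }

    @Override
    public int order() {
        return order;
    }

    @Override
    public Set<String> tags() {
        return tags;
    }
}
```

A connector module could call register(...) during initialization, and the planner or runtime could then call convertServices("jdbc") to obtain only the converters tagged for that connector.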

2, ConversionService: provides the conversion implementation.

{code:java}
public interface ConversionService<S, T> extends Order {

    Set<String> tags();

    boolean canConvert(TypeInformationHolder source, TypeInformationHolder target)
            throws ConvertException;

    Object convert(
            TypeInformationHolder sourceType,
            Object source,
            TypeInformationHolder targetType,
            Object defaultValue,
            boolean nullable)
            throws ConvertException;
}
{code}
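
As a rough sketch of what an implementation might look like, the example below converts String values to Integer. TypeInformationHolder, Order and ConvertException are not defined in this issue, so the versions here are hypothetical stand-ins: the holder simply wraps a Class, Order exposes int order(), and ConvertException is modeled as unchecked to keep the sketch short. The handling of nullable and defaultValue (a null source yields null when nullable is true, otherwise defaultValue) is likewise an assumption, not something the proposal specifies.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical stand-ins for types the proposal references but does not define.
class ConvertException extends RuntimeException {
    ConvertException(String message, Throwable cause) {
        super(message, cause);
    }
}

final class TypeInformationHolder {
    private final Class<?> type;

    TypeInformationHolder(Class<?> type) {
        this.type = type;
    }

    Class<?> type() {
        return type;
    }
}

interface Order {
    int order();
}

// Interface as proposed in the issue.
interface ConversionService<S, T> extends Order {

    Set<String> tags();

    boolean canConvert(TypeInformationHolder source, TypeInformationHolder target)
            throws ConvertException;

    Object convert(
            TypeInformationHolder sourceType,
            Object source,
            TypeInformationHolder targetType,
            Object defaultValue,
            boolean nullable)
            throws ConvertException;
}

// Example implementation: parses a String into an Integer.
final class StringToIntegerConversionService implements ConversionService<String, Integer> {

    @Override
    public int order() {
        return 0;
    }

    @Override
    public Set<String> tags() {
        return Collections.singleton("jdbc");
    }

    @Override
    public boolean canConvert(TypeInformationHolder source, TypeInformationHolder target) {
        return source.type() == String.class && target.type() == Integer.class;
    }

    @Override
    public Object convert(
            TypeInformationHolder sourceType,
            Object source,
            TypeInformationHolder targetType,
            Object defaultValue,
            boolean nullable) {
        if (source == null) {
            // Assumed semantics: keep null for nullable fields, else use the default.
            return nullable ? null : defaultValue;
        }
        try {
            return Integer.valueOf(((String) source).trim());
        } catch (NumberFormatException e) {
            throw new ConvertException("Cannot convert '" + source + "' to Integer", e);
        }
    }
}
```

Callers would be expected to check canConvert(...) first and only then call convert(...), so that unsupported type pairs are rejected before any value is touched.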


3, ConversionServiceFactory: provides a factory that creates conversion services.

{code:java}
public interface ConversionServiceFactory<T> extends Order {

    Set<String> tags();

    ConversionService<?, T> getConversionService(T target) throws ConvertException;
}
{code}


4, ConversionServiceSet: provides group management.

{code:java}
public interface ConversionServiceSet extends Loadable {

    Set<String> tags();

    Collection<ConversionService> conversionServices();

    boolean support(TypeInformationHolder source, TypeInformationHolder target)
            throws ConvertException;

    Object convert(
            String name,
            TypeInformationHolder typeInformationHolder,
            Object value,
            TypeInformationHolder type,
            Object defaultValue,
            boolean nullable)
            throws ConvertException;
}
{code}
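
One possible way a set could dispatch to its member services is sketched below. This is an assumption-laden illustration rather than the proposed implementation: the stand-in types are reduced and hypothetical (Loadable and tags() are omitted, ConvertException is modeled as unchecked, TypeInformationHolder wraps a Class), the first service by ascending order() that reports canConvert wins, and the name argument is assumed to be a field name used only in error messages. SimpleConversionServiceSet and ToStringConversionService are made-up names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-ins for types the proposal references but does not define.
class ConvertException extends RuntimeException {
    ConvertException(String message) {
        super(message);
    }
}

final class TypeInformationHolder {
    private final Class<?> type;

    TypeInformationHolder(Class<?> type) {
        this.type = type;
    }

    Class<?> type() {
        return type;
    }
}

// Reduced stand-in for the proposed ConversionService.
interface ConversionService {
    int order();

    boolean canConvert(TypeInformationHolder source, TypeInformationHolder target);

    Object convert(
            TypeInformationHolder sourceType,
            Object source,
            TypeInformationHolder targetType,
            Object defaultValue,
            boolean nullable);
}

// Hypothetical set that delegates to the first matching member service.
final class SimpleConversionServiceSet {

    private final List<ConversionService> services = new ArrayList<>();

    SimpleConversionServiceSet(List<ConversionService> members) {
        services.addAll(members);
        // Assumption: a lower order() value means higher priority.
        services.sort(Comparator.comparingInt(ConversionService::order));
    }

    boolean support(TypeInformationHolder source, TypeInformationHolder target) {
        for (ConversionService service : services) {
            if (service.canConvert(source, target)) {
                return true;
            }
        }
        return false;
    }

    Object convert(
            String name,
            TypeInformationHolder sourceType,
            Object value,
            TypeInformationHolder targetType,
            Object defaultValue,
            boolean nullable) {
        for (ConversionService service : services) {
            if (service.canConvert(sourceType, targetType)) {
                return service.convert(sourceType, value, targetType, defaultValue, nullable);
            }
        }
        throw new ConvertException("No conversion service found for field '" + name + "'");
    }
}

// Example member: converts any non-null value to its String form.
final class ToStringConversionService implements ConversionService {

    @Override
    public int order() {
        return 10;
    }

    @Override
    public boolean canConvert(TypeInformationHolder source, TypeInformationHolder target) {
        return target.type() == String.class;
    }

    @Override
    public Object convert(
            TypeInformationHolder sourceType,
            Object source,
            TypeInformationHolder targetType,
            Object defaultValue,
            boolean nullable) {
        if (source == null) {
            return nullable ? null : defaultValue;
        }
        return String.valueOf(source);
    }
}
```

With this shape, a connector only needs to hold a reference to the set, and new member services can be added without changing the calling code.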






--
This message was sent by Atlassian Jira
(v8.3.4#803005)