Posted to user-zh@flink.apache.org by wx...@163.com on 2020/03/28 09:38:08 UTC

Implementing a KafkaUpsertTableSink

Hi all,

Since the current KafkaTableSink does not support SQL containing GROUP BY, I implemented a KafkaUpsertTableSink following the same pattern as KafkaTableSink and HbaseUpsertTableSink, consisting of:

KafkaUpsertTableSink

KafkaUpsertTableSinkBase

KafkaUpsertTableSourceSinkFactory

KafkaUpsertTableSourceSinkFactoryBase

MyKafkaValidator

However, TableFactoryService.discoverFactories does not load my KafkaUpsertTableSourceSinkFactory. Do I need to register it somewhere?

 


/**
 * Searches for factories using Java service providers.
 *
 * @return all factories in the classpath
 */
private static List<TableFactory> discoverFactories(Optional<ClassLoader> classLoader) {
   try {
      List<TableFactory> result = new LinkedList<>();
      ClassLoader cl = classLoader.orElse(Thread.currentThread().getContextClassLoader());
      ServiceLoader
         .load(TableFactory.class, cl)
         .iterator()
         .forEachRemaining(result::add);
      // TODO: added manually as a workaround
      result.add(new KafkaUpsertTableSourceSinkFactory());
      return result;
   } catch (ServiceConfigurationError e) {
      LOG.error("Could not load service provider for table factories.", e);
      throw new TableException("Could not load service provider for table factories.", e);
   }
}

 

 

Adding the KafkaUpsertTableSourceSinkFactory directly to the returned result, as above, does make it run successfully.

Many thanks.

 

 

------------------

Thanks 

venn

 


Re: Re: Implementing a KafkaUpsertTableSink

Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
I kept only the single KafkaRetractTableSourceSinkFactory. KafkaRetractTableSinkBase implements the RetractStreamTableSink interface, and in consumeDataStream I emit only records whose flag is true. That finally worked.
@Override
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
   // Keep only accumulate messages (f0 == true) and strip the retract flag.
   DataStream<Row> dtNeed = dataStream
      .filter(x -> x.f0 == Boolean.TRUE)
      .map(x -> x.f1);

INSERT INTO table1 SELECT field, count(*) FROM table2 GROUP BY field: this is a retract stream, the results carry true/false flags, and filtering on the flag works.
INSERT INTO table1 SELECT field, 1 FROM table2: as I understand it, this is not a retract stream, so I expected the dataStream.filter(x -> x.f0 == Boolean.TRUE) code above to fail, but in practice it did not.

I don't fully understand this yet; I'll keep looking into it.
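The true/false flag behavior described above can be sketched without Flink. A minimal plain-Java stand-in (hypothetical data) for Tuple2<Boolean, Row>, where true means accumulate and false means retract a previously emitted result:

```java
import java.util.List;

public class RetractDemo {

    // Stand-in for Flink's Tuple2<Boolean, Row>:
    // flag == true means accumulate, flag == false retracts an earlier result.
    record Change(boolean flag, String row) {}

    // Same idea as dataStream.filter(x -> x.f0 == Boolean.TRUE).map(x -> x.f1):
    // keep only accumulate messages and drop the flag.
    static List<String> upsertsOnly(List<Change> stream) {
        return stream.stream()
                .filter(Change::flag)
                .map(Change::row)
                .toList();
    }

    public static void main(String[] args) {
        // A GROUP BY count emits a retraction of the old result before the new one:
        List<Change> retractStream = List.of(
                new Change(true,  "a:1"),   // first 'a' arrives, count = 1
                new Change(false, "a:1"),   // old result retracted
                new Change(true,  "a:2"));  // updated result, count = 2
        System.out.println(upsertsOnly(retractStream)); // prints [a:1, a:2]
    }
}
```

Note that filtering this way forwards every accumulate message, so the sink still receives the superseded a:1 as well as a:2; downstream consumers have to treat later records for the same key as updates.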

Thanks,
Wang Lei


wanglei2@geekplus.com.cn 


Re: Re: Implementing a KafkaUpsertTableSink

Posted by Benchao Li <li...@gmail.com>.
I think you can give KafkaRetractTableSourceSinkFactory properties that differ from KafkaTableSourceSinkFactory's, and use those properties to distinguish the two factories. For example, add a property indicating whether the sink is retract or append.
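How property matching separates two otherwise-identical factories can be sketched without Flink. A minimal plain-Java stand-in for TableFactory#requiredContext() and TableFactoryService's filter step, with hypothetical property names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class FactoryFilterDemo {

    // Minimal stand-in for Flink's TableFactory#requiredContext().
    interface Factory {
        Map<String, String> requiredContext();
    }

    // Hypothetical append-mode Kafka factory.
    static final Factory APPEND = () -> Map.of(
            "connector.type", "kafka",
            "update-mode", "append");

    // Hypothetical retract-mode factory: identical except for update-mode.
    static final Factory RETRACT = () -> Map.of(
            "connector.type", "kafka",
            "update-mode", "retract");

    // Keep only factories whose required context is fully contained in the
    // table's properties; a unique match avoids AmbiguousTableFactoryException.
    static List<Factory> filter(List<Factory> factories, Map<String, String> properties) {
        List<Factory> matched = new ArrayList<>();
        for (Factory f : factories) {
            if (properties.entrySet().containsAll(f.requiredContext().entrySet())) {
                matched.add(f);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        Map<String, String> ddl = Map.of(
                "connector.type", "kafka",
                "update-mode", "retract",
                "topic", "t1");
        List<Factory> matched = filter(List.of(APPEND, RETRACT), ddl);
        System.out.println(matched.size());            // prints 1
        System.out.println(matched.get(0) == RETRACT); // prints true
    }
}
```

With both factories declaring exactly the same required context, both would match and the >1 branch in findSingleInternal would throw; one distinguishing entry is enough to make the match unique.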


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn

Re: Re: Implementing a KafkaUpsertTableSink

Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
It looks like two tableFactories are now being discovered. I copied the whole KafkaTableSourceSinkFactory set in parallel as KafkaRetractTableSourceSinkFactory.
How should I change this to make it work?

    137  private static <T extends TableFactory> T findSingleInternal(
    138          Class<T> factoryClass,
    139          Map<String, String> properties,
    140          Optional<ClassLoader> classLoader) {
    141
    142      List<TableFactory> tableFactories = discoverFactories(classLoader);
    143      List<T> filtered = filter(tableFactories, factoryClass, properties);
    144
    145      if (filtered.size() > 1) {
    146          throw new AmbiguousTableFactoryException(
    147              filtered,
    148              factoryClass,
    149              tableFactories,
    150              properties);
    151      } else {
    152          return filtered.get(0);
    153      }
    154  }


Thanks,
Wang Lei


wanglei2@geekplus.com.cn 

 

Re: RE: Implementing a KafkaUpsertTableSink

Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
I implemented a KafkaRetractTableSink in the same way, packaged it into a jar, put it under the lib directory, and started sql-client:

org.apache.flink.table.planner.delegation.BlinkExecutorFactory
        at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146)
        at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559)
        at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:159)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:118)
        at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742)
        ... 3 more

How should I fix this?

Thanks,
Wang Lei



wanglei2@geekplus.com.cn 

 

RE: Implementing a KafkaUpsertTableSink

Posted by wx...@163.com.
Benchao, thanks a lot. I didn't know the Factory had to be added to that file; after adding it, everything runs correctly.



Re: Implementing a KafkaUpsertTableSink

Posted by Benchao Li <li...@gmail.com>.
Hi,

You need to add your new Factory to the META-INF/services/org.apache.flink.table.factories.TableFactory file under resources. Have you done that step?
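For reference, that registration file is a plain-text service descriptor read by java.util.ServiceLoader, one fully-qualified factory class name per line. A sketch, assuming a hypothetical com.example.connector package:

```
# src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
com.example.connector.KafkaUpsertTableSourceSinkFactory
```

Once the jar is rebuilt with this file included, the ServiceLoader.load(TableFactory.class, cl) call in discoverFactories picks the factory up without any code change.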


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn