You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "yinghua_zh@163.com" <yi...@163.com> on 2021/02/02 08:48:12 UTC

Flink SQL关于'connector' = 'filesystem‘的问题求助!

今天在使用Flink 1.11.3版本使用Flink SQL将kafka中数据导入到HDFS上时提示如下的错误
Caused by: org.apache.flink.table.api.TableException: Could not load service provider for factories.  
at org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:346)  
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:221)  
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)  
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)  
... 39 more  
Caused by: java.util.ServiceConfigurationError: org.apache.flink.table.factories.Factory: Provider org.apache.flink.core.fs.HadoopFsFactory not found  
at java.util.ServiceLoader.fail(ServiceLoader.java:239)  
at java.util.ServiceLoader.access$300(ServiceLoader.java:185)  
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:372)  
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)  
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)  
at java.util.Iterator.forEachRemaining(Iterator.java:116)  

SQL语句为:
CREATE TABLE hdfs_table (
   content STRING,
   dt STRING,
   h STRING
) PARTITIONED BY (dt, h) WITH (
  'connector' = 'filesystem',
  'path '= 'hdfs://hdfsCluster/tmp/zyh_test',
  'format' = 'csv'
);

出错后我们在代码中没看到有HDFS实现DynamicTableSinkFactory的相关类,是不是FlinkSQL不支持写入到HDFS中?通过Hive的connector来实现?

在测试前我们按照官方文档如下的操作,添加HDFS的相关类
org.apache.flink.table.factories.Factory 中为:org.apache.flink.core.fs.HadoopFsFactory
org.apache.flink.table.factories.TableFactory中为:org.apache.flink.table.filesystem.FileSystemTableFactory   但是添加后报上述错误

添加新的可插拔文件系统实现
文件系统通过org.apache.flink.core.fs.FileSystem类表示,该类捕获访问和修改该文件系统中文件和对象的方式。
要添加新的文件系统:
添加文件系统实现,它是的子类org.apache.flink.core.fs.FileSystem。
添加一个实例化该文件系统并声明用于注册FileSystem的方案的工厂。这必须是的子类org.apache.flink.core.fs.FileSystemFactory。
添加服务条目。创建一个META-INF/services/org.apache.flink.core.fs.FileSystemFactory包含文件系统工厂类的类名的文件(有关更多详细信息,请参见Java Service Loader文档)。
在插件发现期间,文件系统工厂类将由专用的Java类加载器加载,以避免与其他插件和Flink组件发生类冲突。在文件系统实例化和文件系统操作调用期间,应使用相同的类加载器。









yinghua_zh@163.com