Posted to user-zh@flink.apache.org by z y xing <ch...@gmail.com> on 2022/05/19 08:05:52 UTC

s3p: how to debug locally

Hi all,
I understand that for a real deployment the jar has to be copied into the plugins directory, but how should the s3p file system be initialized when debugging locally?

Flink version 1.14, Windows 10.
The project was created with flink-quick-start, and the following dependency was added to the pom:

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-s3-fs-presto</artifactId>
   <version>${flink.version}</version>
</dependency>

The initialization code looks roughly like this:

Configuration fileSystemConf = new Configuration();

fileSystemConf.setBoolean("presto.s3.connection.ssl.enabled", false);
fileSystemConf.setString("presto.s3.access-key", "minioadmin");
fileSystemConf.setString("presto.s3.secret-key", "minioadmin");
fileSystemConf.setString("presto.s3.endpoint", "http://127.0.0.1:9000");

FileSystem.initialize(fileSystemConf);

Path path = new Path("s3p://test/");
System.out.println(path.getFileSystem().exists(path));

But the following exception is thrown:
Exception in thread "main" org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3p'. The scheme is directly supported by Flink through the following plugin: flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:515)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
    at org.example.StreamingJob.main(StreamingJob.java:58)

Strangely enough, though, s3a works for me.
The initialization configuration is as follows:

fileSystemConf.setBoolean("fs.s3a.connection.ssl.enabled", false);
fileSystemConf.setString("fs.s3a.endpoint", "http://127.0.0.1:9000");
fileSystemConf.setString("fs.s3a.access.key", "minioadmin");
fileSystemConf.setString("fs.s3a.secret.key", "minioadmin");
fileSystemConf.setString("fs.s3a.path.style.access", "true");
fileSystemConf.setString("fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");


Thanks!

Re: s3p: how to debug locally

Posted by 邢振宇 <zy...@gmail.com>.
Sorry, I had forgotten to subscribe with my earlier email address, so I didn't see the reply.
I did reload the project.
However, I have now found the cause: when debugging directly in IDEA with both flink-s3-fs-presto and
flink-s3-fs-hadoop on the classpath, *make sure the flink-s3-fs-presto dependency is declared before
flink-s3-fs-hadoop*. Otherwise, because of the dependency loading order, the wrong dependency is picked
up and the s3p FileSystemFactory fails to load. This is what the stack trace showed after I turned on full logging.
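
For reference, a minimal pom sketch of the ordering described above (the flink-s3-fs-hadoop entry is shown only because it was also on the classpath in this setup; both versions are assumed to come from the project's ${flink.version} property):

<!-- Keep flink-s3-fs-presto declared ahead of flink-s3-fs-hadoop so that the s3p
     FileSystemFactory is the one that gets loaded when debugging in the IDE. -->
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-s3-fs-presto</artifactId>
   <version>${flink.version}</version>
</dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-s3-fs-hadoop</artifactId>
   <version>${flink.version}</version>
</dependency>

Note that this ordering only matters for IDE runs; on a real cluster each of these jars goes into its own subfolder under the plugins directory, as the exception message above points out.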

Weihua Hu <hu...@gmail.com> wrote on Thu, May 19, 2022 at 21:00:

> Hi,
> Are you running this in IDEA? After adding the relevant pom dependency I can run it fine in the
> wordcount example; you could try an IDEA Maven reload project.
>
> Best,
> Weihua
>

Re: s3p: how to debug locally

Posted by Weihua Hu <hu...@gmail.com>.
Hi,
Are you running this in IDEA? After adding the relevant pom dependency I can run it fine in the wordcount example; you could try an IDEA Maven reload project.

Best,
Weihua
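
Putting the thread together, here is a minimal, self-contained sketch of the local-debug setup that ends up working: the class name is illustrative, the MinIO endpoint and credentials are the ones from the original post, and flink-s3-fs-presto is assumed to be declared ahead of flink-s3-fs-hadoop in the pom.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class S3pLocalDebug {

    public static void main(String[] args) throws Exception {
        // Same settings as in the original post: a local MinIO at 127.0.0.1:9000,
        // plain HTTP, and the minioadmin credentials.
        Configuration fileSystemConf = new Configuration();
        fileSystemConf.setBoolean("presto.s3.connection.ssl.enabled", false);
        fileSystemConf.setString("presto.s3.access-key", "minioadmin");
        fileSystemConf.setString("presto.s3.secret-key", "minioadmin");
        fileSystemConf.setString("presto.s3.endpoint", "http://127.0.0.1:9000");

        // Make this configuration available to Flink's pluggable file systems.
        FileSystem.initialize(fileSystemConf);

        // With flink-s3-fs-presto loaded first, s3p:// resolves to the Presto-based file system.
        Path path = new Path("s3p://test/");
        System.out.println(path.getFileSystem().exists(path));
    }
}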
