You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Piotr Jagielski <pj...@touk.pl> on 2021/08/14 09:40:09 UTC

Problems with reading ORC files with S3 filesystem

Hi,
I want to use Flink SQL filesystem to read ORC file via S3 filesystem on Flink 1.13. My table definition looks like this:

create or replace table xxx 
 (..., startdate string)
 partitioned by (startdate) with ('connector'='filesystem', 'format'='orc', 'path'='s3://xxx/orc/yyy')

I followed Flink's S3 guide and installed S3 libs as plugin. I have MinIO as S3 provider and it works for Flinks checkpoints and HA files. 
The SQL connector also works when I use CSV or Avro formats. The problems start with ORC

1. If I just put flink-orc on job's classpath I get error on JobManager:
Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
	at org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:121) ~[?:?]
	at org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:88) ~[?:?]
	at org.apache.flink.table.filesystem.FileSystemTableSource.getScanRuntimeProvider(FileSystemTableSource.java:118) ~[flink-table-blink_2.12-1.13.2.jar:1.13.2]

2. I managed to put hadoop common libs on the classpath by this maven setup:

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-orc_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<exclusions>
				<exclusion>
					<groupId>org.apache.orc</groupId>
					<artifactId>orc-core</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.orc</groupId>
			<artifactId>orc-core</artifactId>
			<version>1.5.6</version>
		</dependency>
		<dependency>
			<groupId>org.apache.orc</groupId>
			<artifactId>orc-shims</artifactId>
			<version>1.5.6</version>
		</dependency>
		<dependency>
			<groupId>net.java.dev.jets3t</groupId>
			<artifactId>jets3t</artifactId>
			<version>0.9.0</version>
		</dependency>

No the job is accepted by JobManager, but execution fails with lack of AWS credentials:
Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively).
	at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
	at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.initialize(Jets3tFileSystemStore.java:92)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy76.initialize(Unknown Source)
	at org.apache.hadoop.fs.s3.S3FileSystem.initialize(S3FileSystem.java:92)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2433)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:287)
	at org.apache.orc.impl.ReaderImpl.getFileSystem(ReaderImpl.java:395)
	at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:368)
	at org.apache.orc.OrcFile.createReader(OrcFile.java:343)

I guess that ORC reader tries to recreate s3 filesystem in job's classloader and cannot use credentials from flink-conf.yaml. However I can see in the logs that it earlier managed to list the files on MinIO:

2021-08-14 09:35:48,285 INFO  org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner [] - Assigning remote split to requesting host '172': Optional[FileSourceSplit: s3://xxx/orc/yyy/startdate=2021-08-10/3cf3afae-1050-4591-a5af-98d231879687.orc [0, 144607)  hosts=[localhost] ID=0000000002 position=null]


So I think the issue is in ORCReader when it tries to read specific file.

Any ideas hao can I modify the setup or pass the credentials to Jets3t?

Regards,
Piotr


Re: Problems with reading ORC files with S3 filesystem

Posted by Piotr Jagielski <pj...@touk.pl>.
Hi David,

Thanks for your answer. I finally managed to read ORC files by:
- switching to s3a:// in my Flink SQL table path parameter
- providing all the properties in Hadoop's core-site.xml file (fs.s3a.endpoint, fs.s3a.path.style.access, fs.s3a.aws.credentials.provider, fs.s3a.access.key, fs.s3a.secret.key)
- setting HADOOP_CONF_DIR env variable pointing to directory containing core-site.xml

Regards,
Piotr

On 2021/08/16 09:07:48, David Morávek <dm...@apache.org> wrote: 
> Hi Piotr,
> 
> unfortunately this is a known long-standing issue [1]. The problem is that
> ORC format is not using Flink's filesystem abstraction for actual reading
> of the underlying file, so you have to adjust your Hadoop config
> accordingly. There is also a corresponding SO question [2] covering this.
> 
> I think a proper fix would actually require changing the interface on ORC
> side, because currently there seems to be now easy way to switch the FS
> implementation (I've just quickly checked OrcFile class, so this might not
> be 100% accurate).
> 
> [1] https://issues.apache.org/jira/browse/FLINK-10989
> [2] https://stackoverflow.com/a/53435359
> 
> Best,
> D.
> 
> On Sat, Aug 14, 2021 at 11:40 AM Piotr Jagielski <pj...@touk.pl> wrote:
> 
> > Hi,
> > I want to use Flink SQL filesystem to read ORC file via S3 filesystem on
> > Flink 1.13. My table definition looks like this:
> >
> > create or replace table xxx
> >  (..., startdate string)
> >  partitioned by (startdate) with ('connector'='filesystem',
> > 'format'='orc', 'path'='s3://xxx/orc/yyy')
> >
> > I followed Flink's S3 guide and installed S3 libs as plugin. I have MinIO
> > as S3 provider and it works for Flinks checkpoints and HA files.
> > The SQL connector also works when I use CSV or Avro formats. The problems
> > start with ORC
> >
> > 1. If I just put flink-orc on job's classpath I get error on JobManager:
> > Caused by: java.lang.NoClassDefFoundError:
> > org/apache/hadoop/conf/Configuration
> >         at
> > org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:121)
> > ~[?:?]
> >         at
> > org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:88)
> > ~[?:?]
> >         at
> > org.apache.flink.table.filesystem.FileSystemTableSource.getScanRuntimeProvider(FileSystemTableSource.java:118)
> > ~[flink-table-blink_2.12-1.13.2.jar:1.13.2]
> >
> > 2. I managed to put hadoop common libs on the classpath by this maven
> > setup:
> >
> >                 <dependency>
> >                         <groupId>org.apache.flink</groupId>
> >
> > <artifactId>flink-orc_${scala.binary.version}</artifactId>
> >                         <version>${flink.version}</version>
> >                         <exclusions>
> >                                 <exclusion>
> >                                         <groupId>org.apache.orc</groupId>
> >                                         <artifactId>orc-core</artifactId>
> >                                 </exclusion>
> >                         </exclusions>
> >                 </dependency>
> >                 <dependency>
> >                         <groupId>org.apache.orc</groupId>
> >                         <artifactId>orc-core</artifactId>
> >                         <version>1.5.6</version>
> >                 </dependency>
> >                 <dependency>
> >                         <groupId>org.apache.orc</groupId>
> >                         <artifactId>orc-shims</artifactId>
> >                         <version>1.5.6</version>
> >                 </dependency>
> >                 <dependency>
> >                         <groupId>net.java.dev.jets3t</groupId>
> >                         <artifactId>jets3t</artifactId>
> >                         <version>0.9.0</version>
> >                 </dependency>
> >
> > No the job is accepted by JobManager, but execution fails with lack of AWS
> > credentials:
> > Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and
> > Secret Access Key must be specified as the username or password
> > (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or
> > fs.s3.awsSecretAccessKey properties (respectively).
> >         at
> > org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
> >         at
> > org.apache.hadoop.fs.s3.Jets3tFileSystemStore.initialize(Jets3tFileSystemStore.java:92)
> >         at
> > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> > Method)
> >         at
> > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
> > Source)
> >         at
> > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
> > Source)
> >         at java.base/java.lang.reflect.Method.invoke(Unknown Source)
> >         at
> > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> >         at
> > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> >         at com.sun.proxy.$Proxy76.initialize(Unknown Source)
> >         at
> > org.apache.hadoop.fs.s3.S3FileSystem.initialize(S3FileSystem.java:92)
> >         at
> > org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2433)
> >         at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
> >         at
> > org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
> >         at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
> >         at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
> >         at org.apache.hadoop.fs.Path.getFileSystem(Path.java:287)
> >         at
> > org.apache.orc.impl.ReaderImpl.getFileSystem(ReaderImpl.java:395)
> >         at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:368)
> >         at org.apache.orc.OrcFile.createReader(OrcFile.java:343)
> >
> > I guess that ORC reader tries to recreate s3 filesystem in job's
> > classloader and cannot use credentials from flink-conf.yaml. However I can
> > see in the logs that it earlier managed to list the files on MinIO:
> >
> > 2021-08-14 09:35:48,285 INFO
> > org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner []
> > - Assigning remote split to requesting host '172':
> > Optional[FileSourceSplit:
> > s3://xxx/orc/yyy/startdate=2021-08-10/3cf3afae-1050-4591-a5af-98d231879687.orc
> > [0, 144607)  hosts=[localhost] ID=0000000002 position=null]
> >
> >
> > So I think the issue is in ORCReader when it tries to read specific file.
> >
> > Any ideas hao can I modify the setup or pass the credentials to Jets3t?
> >
> > Regards,
> > Piotr
> >
> >
> 

Re: Problems with reading ORC files with S3 filesystem

Posted by David Morávek <dm...@apache.org>.
Hi Piotr,

unfortunately this is a known long-standing issue [1]. The problem is that
ORC format is not using Flink's filesystem abstraction for actual reading
of the underlying file, so you have to adjust your Hadoop config
accordingly. There is also a corresponding SO question [2] covering this.

I think a proper fix would actually require changing the interface on ORC
side, because currently there seems to be now easy way to switch the FS
implementation (I've just quickly checked OrcFile class, so this might not
be 100% accurate).

[1] https://issues.apache.org/jira/browse/FLINK-10989
[2] https://stackoverflow.com/a/53435359

Best,
D.

On Sat, Aug 14, 2021 at 11:40 AM Piotr Jagielski <pj...@touk.pl> wrote:

> Hi,
> I want to use Flink SQL filesystem to read ORC file via S3 filesystem on
> Flink 1.13. My table definition looks like this:
>
> create or replace table xxx
>  (..., startdate string)
>  partitioned by (startdate) with ('connector'='filesystem',
> 'format'='orc', 'path'='s3://xxx/orc/yyy')
>
> I followed Flink's S3 guide and installed S3 libs as plugin. I have MinIO
> as S3 provider and it works for Flinks checkpoints and HA files.
> The SQL connector also works when I use CSV or Avro formats. The problems
> start with ORC
>
> 1. If I just put flink-orc on job's classpath I get error on JobManager:
> Caused by: java.lang.NoClassDefFoundError:
> org/apache/hadoop/conf/Configuration
>         at
> org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:121)
> ~[?:?]
>         at
> org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:88)
> ~[?:?]
>         at
> org.apache.flink.table.filesystem.FileSystemTableSource.getScanRuntimeProvider(FileSystemTableSource.java:118)
> ~[flink-table-blink_2.12-1.13.2.jar:1.13.2]
>
> 2. I managed to put hadoop common libs on the classpath by this maven
> setup:
>
>                 <dependency>
>                         <groupId>org.apache.flink</groupId>
>
> <artifactId>flink-orc_${scala.binary.version}</artifactId>
>                         <version>${flink.version}</version>
>                         <exclusions>
>                                 <exclusion>
>                                         <groupId>org.apache.orc</groupId>
>                                         <artifactId>orc-core</artifactId>
>                                 </exclusion>
>                         </exclusions>
>                 </dependency>
>                 <dependency>
>                         <groupId>org.apache.orc</groupId>
>                         <artifactId>orc-core</artifactId>
>                         <version>1.5.6</version>
>                 </dependency>
>                 <dependency>
>                         <groupId>org.apache.orc</groupId>
>                         <artifactId>orc-shims</artifactId>
>                         <version>1.5.6</version>
>                 </dependency>
>                 <dependency>
>                         <groupId>net.java.dev.jets3t</groupId>
>                         <artifactId>jets3t</artifactId>
>                         <version>0.9.0</version>
>                 </dependency>
>
> No the job is accepted by JobManager, but execution fails with lack of AWS
> credentials:
> Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and
> Secret Access Key must be specified as the username or password
> (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or
> fs.s3.awsSecretAccessKey properties (respectively).
>         at
> org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
>         at
> org.apache.hadoop.fs.s3.Jets3tFileSystemStore.initialize(Jets3tFileSystemStore.java:92)
>         at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
>         at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
> Source)
>         at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
> Source)
>         at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>         at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
>         at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>         at com.sun.proxy.$Proxy76.initialize(Unknown Source)
>         at
> org.apache.hadoop.fs.s3.S3FileSystem.initialize(S3FileSystem.java:92)
>         at
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2433)
>         at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
>         at
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
>         at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
>         at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
>         at org.apache.hadoop.fs.Path.getFileSystem(Path.java:287)
>         at
> org.apache.orc.impl.ReaderImpl.getFileSystem(ReaderImpl.java:395)
>         at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:368)
>         at org.apache.orc.OrcFile.createReader(OrcFile.java:343)
>
> I guess that ORC reader tries to recreate s3 filesystem in job's
> classloader and cannot use credentials from flink-conf.yaml. However I can
> see in the logs that it earlier managed to list the files on MinIO:
>
> 2021-08-14 09:35:48,285 INFO
> org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner []
> - Assigning remote split to requesting host '172':
> Optional[FileSourceSplit:
> s3://xxx/orc/yyy/startdate=2021-08-10/3cf3afae-1050-4591-a5af-98d231879687.orc
> [0, 144607)  hosts=[localhost] ID=0000000002 position=null]
>
>
> So I think the issue is in ORCReader when it tries to read specific file.
>
> Any ideas hao can I modify the setup or pass the credentials to Jets3t?
>
> Regards,
> Piotr
>
>