Posted to user@flink.apache.org by Rohan Kumar <ro...@gmail.com> on 2021/12/29 07:48:42 UTC

Unable to read S3 data using the filesystem connector

Hello,

I tried to read Parquet data from S3 using the filesystem connector but got
the below error; the jobmanager is not starting.
I am running the standalone-job entrypoint in Docker.
I have already included flink-s3-fs-hadoop and flink-s3-fs-presto as
plugins, and they work fine for checkpointing and Kubernetes HA. The
issue only appears when I read files from S3 using the Table API connector.

Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
at jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source) ~[?:?]
at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source) ~[?:?]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
at org.apache.flink.formats.parquet.ParquetFileFormatFactory.getParquetConfiguration(ParquetFileFormatFactory.java:116) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at org.apache.flink.formats.parquet.ParquetFileFormatFactory.access$000(ParquetFileFormatFactory.java:51) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at org.apache.flink.formats.parquet.ParquetFileFormatFactory$1.createRuntimeDecoder(ParquetFileFormatFactory.java:79) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at org.apache.flink.formats.parquet.ParquetFileFormatFactory$1.createRuntimeDecoder(ParquetFileFormatFactory.java:67) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at org.apache.flink.table.filesystem.FileSystemTableSource.getScanRuntimeProvider(FileSystemTableSource.java:118) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:452) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:160) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:124) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:85) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:495) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1099) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1123) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:351) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:154) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:68) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:151) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:133) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:68) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:184) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:258) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:182) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at scala.collection.Iterator.foreach(Iterator.scala:943) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at scala.collection.Iterator.foreach$(Iterator.scala:943) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at scala.collection.IterableLike.foreach(IterableLike.scala:74) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at scala.collection.IterableLike.foreach$(IterableLike.scala:73) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at scala.collection.AbstractIterable.foreach(Iterable.scala:56) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at scala.collection.TraversableLike.map(TraversableLike.scala:285) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at scala.collection.TraversableLike.map$(TraversableLike.scala:278) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at scala.collection.AbstractTraversable.map(Traversable.scala:108) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:182) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.scala:297) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.scala:288) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:213) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:190) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.table.api.bridge.scala.TableConversions.toDataStream(TableConversions.scala:52) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.example.Job$.main(Job.scala:53) ~[?:?]
at org.example.Job.main(Job.scala) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:253) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
... 13 more



Here's my code:

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, Schema, TableDescriptor}
import org.apache.flink.table.api.bridge.scala._

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

env.setRuntimeMode(RuntimeExecutionMode.BATCH)

// filesystem connector reading Parquet files from S3
val source = tEnv.from(TableDescriptor
  .forConnector("filesystem")
  .option("path", "s3a://source/data")
  .option("format", "parquet")
  .schema(Schema.newBuilder()
    .column("InvoiceNo", DataTypes.BIGINT())
    .column("InvoiceDate", DataTypes.BIGINT())
    .column("Quantity", DataTypes.BIGINT())
    .column("UnitPrice", DataTypes.BIGINT())
    .column("Description", DataTypes.STRING())
    .column("CustomerID", DataTypes.STRING())
    .column("Country", DataTypes.STRING())
    .column("currentTs", DataTypes.TIMESTAMP(3))
    .build())
  .build())

source.toDataStream.print()

env.execute()

Then I added a hadoop-common dependency (sketched below) and got the error
shown after it; the jobmanager didn't start.
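
As a rough illustration (the Hadoop version here is a placeholder, not
necessarily the exact one I used), the build.sbt change was along these lines:

libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "3.2.2"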

Exception in thread "main" java.lang.NoSuchMethodError: 'org.apache.commons.cli.Option$Builder org.apache.commons.cli.Option.builder(java.lang.String)'
at org.apache.flink.container.entrypoint.StandaloneApplicationClusterConfigurationParserFactory.<clinit>(StandaloneApplicationClusterConfigurationParserFactory.java:49)
at org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:57)



Then I built flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar from the Flink source
and placed it in the lib directory. The jobmanager started, but I got the
below error and the job kept restarting.

Caused by: java.lang.ClassNotFoundException: Class org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2310) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2398) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2420) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.hadoop.security.Groups.<init>(Groups.java:107) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.hadoop.security.Groups.<init>(Groups.java:102) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:451) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:337) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:304) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.hadoop.security.UserGroupInformation.doSubjectLogin(UserGroupInformation.java:1860) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.hadoop.security.UserGroupInformation.createLoginUser(UserGroupInformation.java:718) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:668) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:579) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:3564) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:3554) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3391) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:38) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:456) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:112) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:73) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
... 1 more

Re: Unable to read S3 data using the filesystem connector

Posted by David Morávek <dm...@apache.org>.
Hi Rohan,

Setting this up is currently not really straightforward :( I think we need
to improve on this.

For supporting the S3 filesystem, you did it right by placing the S3 jars
into the plugins directory. Please note that these are loaded in a separate
classloader and also contain a shaded version of Hadoop.

Parquet unfortunately needs some Hadoop classes as well, but it cannot
reuse the ones bundled with the filesystem plugins, because plugin classes
are not visible to the parent classloader.

The shaded Hadoop filesystem jar is not really meant to be visible to the
parent loader (you've seen this by running into the ServiceLoader-related
issues), so I'd discourage you from going down this path.

I think adding a hadoop-common dependency (you may even need
hadoop-mapreduce-client-core instead) is the way to go here. You may just
have to set up some exclusions so you don't get conflicts from transitive
dependencies (commons-cli in your case, but there might be others); see the
sketch below.
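
As a rough sketch only (the Hadoop version and the exact exclusion set here
are examples; you'd have to adapt them to your build), in build.sbt this
could look like:

libraryDependencies += ("org.apache.hadoop" % "hadoop-common" % "3.2.2")
  .exclude("commons-cli", "commons-cli")

// hypothetical alternative, if the mapreduce classes turn out to be needed:
// libraryDependencies += ("org.apache.hadoop" % "hadoop-mapreduce-client-core" % "3.2.2")
//   .exclude("commons-cli", "commons-cli")

If other NoSuchMethodErrors show up, the colliding transitive dependency can
be excluded the same way.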

Best,
D.

On Wed, Dec 29, 2021 at 10:53 AM Rohan Kumar <ro...@gmail.com>
wrote:

> I am running flink 1.14.2
>
> Thanks

Re: Unable to read S3 data using the filesystem connector

Posted by Rohan Kumar <ro...@gmail.com>.
I am running flink 1.14.2

Thanks
