Posted to issues@flink.apache.org by "Chesnay Schepler (Jira)" <ji...@apache.org> on 2020/01/29 09:34:00 UTC

[jira] [Updated] (FLINK-14574) flink-s3-fs-hadoop doesn't work with plugins mechanism

     [ https://issues.apache.org/jira/browse/FLINK-14574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chesnay Schepler updated FLINK-14574:
-------------------------------------
    Issue Type: Bug  (was: Task)

>  flink-s3-fs-hadoop doesn't work with plugins mechanism
> -------------------------------------------------------
>
>                 Key: FLINK-14574
>                 URL: https://issues.apache.org/jira/browse/FLINK-14574
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystems
>    Affects Versions: 1.9.0
>            Reporter: Piotr Nowojski
>            Assignee: Arvid Heise
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.9.2, 1.10.0
>
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> As reported by a user via [mailing list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/No-FileSystem-for-scheme-quot-file-quot-for-S3A-in-and-state-processor-api-in-1-9-td30704.html]:
> {noformat}
> We've added the flink-s3-fs-hadoop library to the plugins folder and are trying to
> bootstrap state to S3 using the S3A protocol. The following exception happens
> (unless the Hadoop library is put in the lib folder instead of plugins). It looks like
> the S3A filesystem is trying to use the "local" filesystem for temporary files and
> fails:
> java.lang.Exception: Could not write timer service of MapPartition (d2976134f80849779b7a94b7e6218476) (4/4) to checkpoint state stream.
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
> 	at org.apache.flink.state.api.output.SnapshotUtils.snapshot(SnapshotUtils.java:59)
> 	at org.apache.flink.state.api.output.operators.KeyedStateBootstrapOperator.endInput(KeyedStateBootstrapOperator.java:84)
> 	at org.apache.flink.state.api.output.BoundedStreamTask.performDefaultAction(BoundedStreamTask.java:85)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
> 	at org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner.mapPartition(BoundedOneInputStreamTaskRunner.java:76)
> 	at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
> 	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
> 	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Could not open output stream for state backend
> 	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
> 	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
> 	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
> 	at org.apache.flink.runtime.state.NonClosingCheckpointOutputStream.write(NonClosingCheckpointOutputStream.java:61)
> 	at java.io.DataOutputStream.write(DataOutputStream.java:107)
> 	at java.io.DataOutputStream.writeUTF(DataOutputStream.java:401)
> 	at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
> 	at org.apache.flink.util.LinkedOptionalMapSerializer.lambda$writeOptionalMap$0(LinkedOptionalMapSerializer.java:58)
> 	at org.apache.flink.util.LinkedOptionalMap.forEach(LinkedOptionalMap.java:163)
> 	at org.apache.flink.util.LinkedOptionalMapSerializer.writeOptionalMap(LinkedOptionalMapSerializer.java:57)
> 	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.writeKryoRegistrations(KryoSerializerSnapshotData.java:141)
> 	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.writeSnapshotData(KryoSerializerSnapshotData.java:128)
> 	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.writeSnapshot(KryoSerializerSnapshot.java:72)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.writeVersionedSnapshot(TypeSerializerSnapshot.java:153)
> 	at org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$InternalTimersSnapshotWriterV2.writeKeyAndNamespaceSerializers(InternalTimersSnapshotReaderWriters.java:199)
> 	at org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$AbstractInternalTimersSnapshotWriter.writeTimersSnapshot(InternalTimersSnapshotReaderWriters.java:117)
> 	at org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.write(InternalTimerServiceSerializationProxy.java:101)
> 	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.snapshotStateForKeyGroup(InternalTimeServiceManager.java:139)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:462)
> 	... 14 common frames omitted
> Caused by: org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "file"
> 	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3332)
> 	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
> 	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
> 	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
> 	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
> 	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477)
> 	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.getLocal(FileSystem.java:433)
> 	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:301)
> 	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:378)
> 	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:456)
> 	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
> 	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:572)
> 	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:811)
> 	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:190)
> 	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.<init>(S3ABlockOutputStream.java:168)
> 	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:778)
> 	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169)
> 	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149)
> 	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038)
> 	at org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:141)
> 	at org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:37)
> 	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
> 	at org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61)
> 	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356)
> 	... 32 common frames omitted
> {noformat}
> I think the problem is caused by the {{org.apache.hadoop.fs.FileSystem#loadFileSystems}} method inside {{flink-s3-fs-hadoop}}, which uses {{ServiceLoader.load(FileSystem.class);}} to discover FileSystem implementations through {{Thread.currentThread().getContextClassLoader();}}. At that point in time, {{getContextClassLoader()}} probably already returns the user class loader instead of the plugin's class loader, so the shaded Hadoop "file" FileSystem registration is not visible.
> We should investigate why this {{loadFileSystems}} method is called so late (after we have already restored the user's class loader) and how we can work around this.
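
The class-loader behaviour suspected above can be reproduced in isolation. The following is a minimal, self-contained Java sketch, not Flink or Hadoop code: Greeter and LocalGreeter are hypothetical stand-ins for Hadoop's FileSystem SPI and its "file" implementation, and a temporary directory wrapped in a URLClassLoader plays the role of the plugin class loader. It relies only on the documented JDK behaviour that ServiceLoader.load(Class) consults the thread context class loader. The withContextClassLoader helper is likewise hypothetical; it illustrates one common workaround pattern (install the plugin loader as context class loader around a call, then restore the previous one).

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ServiceLoader;
import java.util.concurrent.Callable;

public class ContextClassLoaderDemo {

    /** Hypothetical service interface standing in for Hadoop's FileSystem SPI. */
    public interface Greeter {
        String greet();
    }

    /** Hypothetical provider standing in for the "file" (local) FileSystem. */
    public static class LocalGreeter implements Greeter {
        public LocalGreeter() {}

        @Override
        public String greet() {
            return "file";
        }
    }

    /** Counts Greeter providers visible through the thread context class loader. */
    public static int countGreeters() {
        // ServiceLoader.load(Class) is equivalent to
        // ServiceLoader.load(service, Thread.currentThread().getContextClassLoader()).
        int count = 0;
        for (Greeter ignored : ServiceLoader.load(Greeter.class)) {
            count++;
        }
        return count;
    }

    /** Hypothetical helper: runs an action with the given context class loader, then restores the old one. */
    public static <T> T withContextClassLoader(ClassLoader loader, Callable<T> action) throws Exception {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.call();
        } finally {
            // Restore even if the action throws, so the swap never leaks to the caller.
            thread.setContextClassLoader(previous);
        }
    }

    /**
     * Builds a stand-in "plugin" class loader whose only addition is a
     * META-INF/services registration for LocalGreeter (temp dir is not cleaned up).
     */
    public static URLClassLoader pluginLoader() throws IOException {
        Path root = Files.createTempDirectory("plugin");
        Path services = root.resolve("META-INF").resolve("services");
        Files.createDirectories(services);
        Files.write(services.resolve(Greeter.class.getName()),
                LocalGreeter.class.getName().getBytes(StandardCharsets.UTF_8));
        return new URLClassLoader(new URL[] {root.toUri().toURL()},
                ContextClassLoaderDemo.class.getClassLoader());
    }

    public static void main(String[] args) throws Exception {
        try (URLClassLoader plugin = pluginLoader()) {
            // Context loader is the application ("user") loader: registration not visible.
            int withoutPlugin = countGreeters();
            // Context loader temporarily set to the plugin loader: registration is found.
            Integer withPlugin = withContextClassLoader(plugin, ContextClassLoaderDemo::countGreeters);
            System.out.println("without plugin TCCL: " + withoutPlugin);
            System.out.println("with plugin TCCL: " + withPlugin);
        }
    }
}
```

Under these assumptions, main should report 0 providers for the application loader and 1 for the plugin loader, which mirrors the reported symptom: code that resolves providers through the context class loader only sees what that loader can see, so it works when the jar is in lib and fails when the context loader is one that cannot see the plugin's registrations.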



--
This message was sent by Atlassian Jira
(v8.3.4#803005)