You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Ayush Verma <ay...@gmail.com> on 2019/06/03 18:02:20 UTC

Issue using Flink on EMR

Hello,

We have a Flink on EMR setup following this guide
<https://github.com/aws-samples/flink-stream-processing-refarch>. YARN,
apparently changes the io.tmp.dirs property to /mnt/yarn & /mnt1/yarn. When
using these directories, the flink job gets the following error.

2019-05-22 12:23:12,515 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Job
my-flink-job (e92bf814c495e2c713e24f1d37aa3afd) switched from state
RUNNING to FAILING.
java.nio.file.NoSuchFileException:
/mnt/yarn/usercache/hadoop/appcache/application_1558347223117_0001,/mnt1/yarn/usercache/hadoop/appcache/application_1558347223117_0001/.tmp_c729afcc-7bd7-4422-8232-306e28bc62c1
 at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
 at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
 at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
 at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
 at java.nio.file.spi.FileSystemProvider.newOutputStream(FileSystemProvider.java:434)
 at java.nio.file.Files.newOutputStream(Files.java:216)
 at org.apache.flink.fs.s3.common.utils.RefCountedTmpFileCreator.apply(RefCountedTmpFileCreator.java:80)
 at org.apache.flink.fs.s3.common.utils.RefCountedTmpFileCreator.apply(RefCountedTmpFileCreator.java:39)
 at org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream.openNew(RefCountedBufferingFileStream.java:174)
 at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.boundedBufferingFileStream(S3RecoverableFsDataOutputStream.java:271)
 at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.newStream(S3RecoverableFsDataOutputStream.java:236)
 at org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.open(S3RecoverableWriter.java:74)
 at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:221)
 at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:212)
 at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
 at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
 at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:565)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:724)
 at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102)
 at net.skyscanner.data.platform.flink.parquet.ParquetConsumerJob$3.processElement(ParquetConsumerJob.java:96)
 at net.skyscanner.data.platform.flink.parquet.ParquetConsumerJob$3.processElement(ParquetConsumerJob.java:92)
 at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
 at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
 at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
 at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
 at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
 at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:551)
 at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:344)
 at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:231)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)


If I change the io.tmp.dirs to /tmp, for eg., the job works fine. BUT my
understanding is that, YARN creates a *shared* directory, by mounting it on
all the containers so that these files can survive container terminations.
Looking for advice on how to investigate further into this issue and
hopefully resolve it.