Posted to issues@flink.apache.org by "Yun Gao (Jira)" <ji...@apache.org> on 2020/10/12 07:24:00 UTC
[jira] [Commented] (FLINK-18915) FIXED_PATH(dummy Hadoop Path) with WriterImpl may cause ORC writer OOM
[ https://issues.apache.org/jira/browse/FLINK-18915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17212187#comment-17212187 ]
Yun Gao commented on FLINK-18915:
---------------------------------
Many thanks to [~wei.wei] and [~lzljs3620320] for reporting and analyzing this issue! I checked the implementation and the issue does exist. Since each sink task creates its own OrcBulkWriterFactory on startup and should never create duplicate writers for the same target path, would it be enough to simply change FIXED_PATH to a random path?
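The suggested change can be sketched in isolation. This assumes that the dummy path only serves as the writer's key in ORC's memory manager (the physical writer writes to the Flink output stream directly), so any per-writer unique value would do. The helper `uniqueDummyPath()`, the string value of `FIXED_PATH`, the use of `java.util.UUID`, and the plain `Map` standing in for the memory manager's path-keyed `writerList` are all illustrative assumptions, not Flink or ORC code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: a shared dummy path versus one random path per writer.
class RandomPathSketch {

    // Hypothetical stand-in for the constant FIXED_PATH (value chosen for illustration).
    static final String FIXED_PATH = "fixed-dummy-path";

    // Hypothetical helper: a fresh, practically unique path for every writer.
    static String uniqueDummyPath() {
        return UUID.randomUUID().toString();
    }

    // Registers `writers` writers in a path-keyed map (mimicking writerList)
    // and returns how many distinct entries survive.
    static int registeredWriters(int writers, boolean uniquePaths) {
        Map<String, String> writerList = new HashMap<>();
        for (int i = 1; i <= writers; i++) {
            String key = uniquePaths ? uniqueDummyPath() : FIXED_PATH;
            writerList.put(key, "writer-" + i);
        }
        return writerList.size();
    }

    public static void main(String[] args) {
        System.out.println(registeredWriters(3, false)); // 1: later writers overwrite the entry
        System.out.println(registeredWriters(3, true));  // 3: every writer keeps its own entry
    }
}
```

With a distinct key per writer, each writer should keep its own callback entry, so the memory manager can reach all of them instead of only the most recently registered one.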
> FIXED_PATH(dummy Hadoop Path) with WriterImpl may cause ORC writer OOM
> ----------------------------------------------------------------------
>
> Key: FLINK-18915
> URL: https://issues.apache.org/jira/browse/FLINK-18915
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.11.0, 1.11.1
> Reporter: wei
> Priority: Critical
> Fix For: 1.12.0, 1.11.3
>
>
> # OrcBulkWriterFactory
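> {code:java}
> @Override
> public BulkWriter<T> create(FSDataOutputStream out) throws IOException {
>     OrcFile.WriterOptions opts = getWriterOptions();
>     opts.physicalWriter(new PhysicalWriterImpl(out, opts));
>     // FIXED_PATH is not used for output (PhysicalWriterImpl writes to `out`),
>     // but WriterImpl registers itself with ORC's memory manager under this path.
>     return new OrcBulkWriter<>(vectorizer, new WriterImpl(null, FIXED_PATH, opts));
> }
> {code}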
>
> # MemoryManagerImpl
> {code:java}
> public void addWriter(Path path, long requestedAllocation,
>                       Callback callback) throws IOException {
>     checkOwner();
>     WriterInfo oldVal = writerList.get(path);
>     // this should always be null, but we handle the case where the memory
>     // manager wasn't told that a writer wasn't still in use and the task
>     // starts writing to the same path.
>     if (oldVal == null) {
>         oldVal = new WriterInfo(requestedAllocation, callback);
>         writerList.put(path, oldVal);
>         totalAllocation += requestedAllocation;
>     } else {
>         // handle a new writer that is writing to the same path
>         totalAllocation += requestedAllocation - oldVal.allocation;
>         oldVal.allocation = requestedAllocation;
>         oldVal.callback = callback;
>     }
>     updateScale(true);
> }
> {code}
> A sink task may create multiple BulkWriters. Because all of them register under FIXED_PATH, each new writer overwrites the previous writer's callback in the memory manager, so the previous writer's WriterImpl#checkMemory is never called.
>
>
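The overwrite described in the report can be reproduced without ORC or Flink. The sketch below is a hypothetical, heavily simplified model of the path-keyed bookkeeping in the quoted `MemoryManagerImpl#addWriter`: `PathKeyedMemoryManager`, its `void`-returning `Callback`, `registeredWriters()`, and `notifyWriters()` are illustrative assumptions, and paths are plain strings rather than Hadoop `Path` objects. Two writers registered under the same path leave a single entry: only the second callback is reachable, and `totalAllocation` counts one allocation instead of two.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a memory manager that keys writers by path.
// It mirrors the quoted addWriter logic; it is NOT the ORC class itself.
class PathKeyedMemoryManager {

    // Simplified stand-in for ORC's writer callback.
    interface Callback {
        void checkMemory(double newScale);
    }

    private static final class WriterInfo {
        long allocation;
        Callback callback;

        WriterInfo(long allocation, Callback callback) {
            this.allocation = allocation;
            this.callback = callback;
        }
    }

    private final Map<String, WriterInfo> writerList = new HashMap<>();
    long totalAllocation = 0;

    void addWriter(String path, long requestedAllocation, Callback callback) {
        WriterInfo oldVal = writerList.get(path);
        if (oldVal == null) {
            writerList.put(path, new WriterInfo(requestedAllocation, callback));
            totalAllocation += requestedAllocation;
        } else {
            // Same path: the previous writer's allocation and callback are replaced.
            totalAllocation += requestedAllocation - oldVal.allocation;
            oldVal.allocation = requestedAllocation;
            oldVal.callback = callback;
        }
    }

    int registeredWriters() {
        return writerList.size();
    }

    // Asks every registered writer to check its memory usage.
    void notifyWriters(double scale) {
        for (WriterInfo info : writerList.values()) {
            info.callback.checkMemory(scale);
        }
    }
}

class FixedPathDemo {
    public static void main(String[] args) {
        PathKeyedMemoryManager manager = new PathKeyedMemoryManager();
        List<String> checked = new ArrayList<>();

        // Both writers use the same dummy path, as with FIXED_PATH.
        manager.addWriter("fixed", 64L, scale -> checked.add("writer-1"));
        manager.addWriter("fixed", 64L, scale -> checked.add("writer-2"));

        manager.notifyWriters(1.0);

        System.out.println(manager.registeredWriters()); // 1
        System.out.println(manager.totalAllocation);     // 64
        System.out.println(checked);                     // [writer-2]
    }
}
```

In this model, "writer-1" is never asked to check its memory again, which matches the report's observation that the earlier writer's checkMemory is never invoked.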
--
This message was sent by Atlassian Jira
(v8.3.4#803005)