Posted to issues@flink.apache.org by "Oleksandr Nitavskyi (Jira)" <ji...@apache.org> on 2023/03/30 12:51:00 UTC
[jira] [Commented] (FLINK-29242) Read time out when close write channel [flink-gs-fs-hadoop ]
[ https://issues.apache.org/jira/browse/FLINK-29242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17706863#comment-17706863 ]
Oleksandr Nitavskyi commented on FLINK-29242:
---------------------------------------------
While it seems that Flink could do something to be more resilient to this type of error, here is a link to a probably related issue in the Google Cloud tracker: https://issuetracker.google.com/issues/191071342?pli=1
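For illustration, one shape such resilience could take is a bounded retry with exponential backoff around the failing close() call. This is a minimal, hypothetical sketch, not code from Flink or the GCS client; the class and method names below are invented:

{code:java}
import java.io.IOException;
import java.util.concurrent.Callable;

/** Hypothetical helper: retries an I/O action a bounded number of times with exponential backoff. */
public final class RetryOnTimeout {

    /**
     * Runs {@code action}, retrying on IOException up to {@code maxAttempts} times,
     * sleeping {@code initialBackoffMs} before the first retry and doubling each time.
     * Non-IOException failures propagate immediately.
     */
    public static <T> T callWithRetry(Callable<T> action, int maxAttempts, long initialBackoffMs)
            throws Exception {
        long backoff = initialBackoffMs;
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (IOException e) {
                last = e; // remember the failure; rethrown if all attempts are exhausted
                if (attempt < maxAttempts) {
                    Thread.sleep(backoff);
                    backoff *= 2; // exponential backoff between attempts
                }
            }
        }
        throw last;
    }
}
{code}

With something like this, the channel close could be wrapped as {{callWithRetry(() -> { writeChannel.close(); return null; }, 3, 500)}}, though whether retrying close() is safe for a GCS resumable upload would need to be confirmed against the client's semantics.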
> Read time out when close write channel [flink-gs-fs-hadoop ]
> ------------------------------------------------------------
>
> Key: FLINK-29242
> URL: https://issues.apache.org/jira/browse/FLINK-29242
> Project: Flink
> Issue Type: Bug
> Components: FileSystems
> Affects Versions: 1.15.0
> Environment: flink version: 1.15
> jdk: 1.8
>
> Reporter: Jian Zheng
> Priority: Major
>
> h2. Detail
> See in GSBlobStorageImpl
> {code:java}
> @Override
> public int write(byte[] content, int start, int length) throws IOException {
>     LOGGER.trace("Writing {} bytes to blob {}", length, blobIdentifier);
>     Preconditions.checkNotNull(content);
>     Preconditions.checkArgument(start >= 0);
>     Preconditions.checkArgument(length >= 0);
>     ByteBuffer byteBuffer = ByteBuffer.wrap(content, start, length);
>     int written = writeChannel.write(byteBuffer);
>     LOGGER.trace("Wrote {} bytes to blob {}", written, blobIdentifier);
>     return written;
> }
>
> @Override
> public void close() throws IOException {
>     LOGGER.trace("Closing write channel to blob {}", blobIdentifier);
>     writeChannel.close();
> }
> {code}
> When I write data into Google Cloud Storage via flink-gs-fs-hadoop, the service always throws read time out exceptions, which can be reproduced within a very short time of task execution.
> I traced the code and found that the exception always occurs when the writeChannel close code is executed. I tried retrying by modifying the source code, but it didn't solve the problem; the timeout is 20s, and the checkpoint fails whenever this problem occurs.
> I also tried changing the chunk size, but it didn't help. With this component, I can't write data to GCS via Flink.
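> (For reference, the chunk size can also be set through the Flink configuration rather than by modifying code; the option name below is taken from the flink-gs-fs-hadoop documentation and should be verified for the version in use:)
> {code}
> # flink-conf.yaml
> gs.writer.chunk.size: 512kb
> {code}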
>
> By the way, I found that a 503 Service Unavailable also occurs when creating the writeChannel. This problem occurs less often than the read time out, but it should be checked as well.
> {code:java}
> @Override
> public GSBlobStorage.WriteChannel writeBlob(GSBlobIdentifier blobIdentifier) {
>     LOGGER.trace("Creating writeable blob for identifier {}", blobIdentifier);
>     Preconditions.checkNotNull(blobIdentifier);
>     BlobInfo blobInfo = BlobInfo.newBuilder(blobIdentifier.getBlobId()).build();
>     com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo);
>     return new WriteChannel(blobIdentifier, writeChannel);
> }
>
> @Override
> public GSBlobStorage.WriteChannel writeBlob(
>         GSBlobIdentifier blobIdentifier, MemorySize chunkSize) {
>     LOGGER.trace(
>             "Creating writeable blob for identifier {} with chunk size {}",
>             blobIdentifier,
>             chunkSize);
>     Preconditions.checkNotNull(blobIdentifier);
>     Preconditions.checkArgument(chunkSize.getBytes() > 0);
>     BlobInfo blobInfo = BlobInfo.newBuilder(blobIdentifier.getBlobId()).build();
>     com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo);
>     writeChannel.setChunkSize((int) chunkSize.getBytes());
>     return new WriteChannel(blobIdentifier, writeChannel);
> }
> {code}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)