Posted to issues@flink.apache.org by "Oleksandr Nitavskyi (Jira)" <ji...@apache.org> on 2023/03/30 12:51:00 UTC

[jira] [Commented] (FLINK-29242) Read timeout when closing write channel [flink-gs-fs-hadoop]

    [ https://issues.apache.org/jira/browse/FLINK-29242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17706863#comment-17706863 ] 

Oleksandr Nitavskyi commented on FLINK-29242:
---------------------------------------------

While it seems that Flink could do something to be more resilient to this type of error, here is a link to a probably related issue in the Google Cloud issue tracker: https://issuetracker.google.com/issues/191071342?pli=1
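
For context, the read timeout and retry behavior of the underlying google-cloud-storage client can in principle be tuned when the Storage instance is built. flink-gs-fs-hadoop constructs its Storage instance internally, so the following is only an illustrative sketch of the client API; the timeout and retry values are assumptions, not recommendations:

{code:java}
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.http.HttpTransportOptions;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.threeten.bp.Duration;

public class GcsClientTuningSketch {

    // Builds a Storage client with a longer HTTP read timeout and more
    // generous retries for transient errors. All values are illustrative.
    public static Storage storageWithRelaxedTimeouts() {
        HttpTransportOptions transportOptions =
                HttpTransportOptions.newBuilder()
                        .setConnectTimeout(60_000) // milliseconds
                        .setReadTimeout(60_000) // milliseconds
                        .build();

        RetrySettings retrySettings =
                RetrySettings.newBuilder()
                        .setMaxAttempts(10)
                        .setTotalTimeout(Duration.ofMinutes(5))
                        .build();

        return StorageOptions.newBuilder()
                .setTransportOptions(transportOptions)
                .setRetrySettings(retrySettings)
                .build()
                .getService();
    }
} {code}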

> Read timeout when closing write channel [flink-gs-fs-hadoop]
> ------------------------------------------------------------
>
>                 Key: FLINK-29242
>                 URL: https://issues.apache.org/jira/browse/FLINK-29242
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystems
>    Affects Versions: 1.15.0
>         Environment: flink version: 1.15
> jdk: 1.8
>  
>            Reporter: Jian Zheng
>            Priority: Major
>
> h2. Detail
> See in GSBlobStorageImpl
> {code:java}
> @Override
> public int write(byte[] content, int start, int length) throws IOException {
>     LOGGER.trace("Writing {} bytes to blob {}", length, blobIdentifier);
>     Preconditions.checkNotNull(content);
>     Preconditions.checkArgument(start >= 0);
>     Preconditions.checkArgument(length >= 0);
>     ByteBuffer byteBuffer = ByteBuffer.wrap(content, start, length);
>     int written = writeChannel.write(byteBuffer);
>     LOGGER.trace("Wrote {} bytes to blob {}", written, blobIdentifier);
>     return written;
> }
> @Override
> public void close() throws IOException {
>     LOGGER.trace("Closing write channel to blob {}", blobIdentifier);
>     writeChannel.close();
> } {code}
> When I write data into Google Cloud Storage via flink-gs-fs-hadoop, the service consistently throws read timeout exceptions, which can be reproduced within a very short time of task execution.
> I traced the code and found that the exception always occurs when the writeChannel close code above is executed. I tried retrying by modifying the source code, but that did not solve the problem; the timeout is 20s, and the checkpoint fails whenever this occurs.
> I also tried changing the chunk size, but it did not help. With this component, I cannot write data to GCS via Flink.
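> 
> As an illustration, the retry that was tried around the close call might look roughly like the sketch below (hypothetical, not the actual patch; retrying close() on the same channel is unlikely to help, since the resumable upload session behind it may already be in a bad state):
> {code:java}
> // Hypothetical retry wrapper for close(); requires java.io.IOException.
> // Not the actual modification that was tested.
> @Override
> public void close() throws IOException {
>     LOGGER.trace("Closing write channel to blob {}", blobIdentifier);
>     IOException lastFailure = null;
>     for (int attempt = 1; attempt <= 3; attempt++) {
>         try {
>             writeChannel.close();
>             return;
>         } catch (IOException e) {
>             lastFailure = e;
>             LOGGER.warn("Attempt {} to close blob {} failed", attempt, blobIdentifier, e);
>         }
>     }
>     throw lastFailure;
> } {code}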
>  
> By the way, I also found that a 503 Service Unavailable occurs when creating the writeChannel. This happens less often than the read timeout, but it should be investigated as well; a hedged retry sketch follows the code below.
> {code:java}
> @Override
> public GSBlobStorage.WriteChannel writeBlob(GSBlobIdentifier blobIdentifier) {
>     LOGGER.trace("Creating writeable blob for identifier {}", blobIdentifier);
>     Preconditions.checkNotNull(blobIdentifier);
>     BlobInfo blobInfo = BlobInfo.newBuilder(blobIdentifier.getBlobId()).build();
>     com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo);
>     return new WriteChannel(blobIdentifier, writeChannel);
> }
> @Override
> public GSBlobStorage.WriteChannel writeBlob(
>         GSBlobIdentifier blobIdentifier, MemorySize chunkSize) {
>     LOGGER.trace(
>             "Creating writeable blob for identifier {} with chunk size {}",
>             blobIdentifier,
>             chunkSize);
>     Preconditions.checkNotNull(blobIdentifier);
>     Preconditions.checkArgument(chunkSize.getBytes() > 0);
>     BlobInfo blobInfo = BlobInfo.newBuilder(blobIdentifier.getBlobId()).build();
>     com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo);
>     writeChannel.setChunkSize((int) chunkSize.getBytes());
>     return new WriteChannel(blobIdentifier, writeChannel);
> } {code}
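> 
> As a hedged sketch, the creation path could be guarded against transient 503s with a hypothetical helper like the one below (not part of flink-gs-fs-hadoop; storage.writer(blobInfo) opens the resumable upload session, which is where the 503 surfaces):
> {code:java}
> // Hypothetical helper; requires com.google.cloud.storage.BlobInfo and
> // com.google.cloud.storage.StorageException.
> private com.google.cloud.WriteChannel openWriterWithRetry(BlobInfo blobInfo) {
>     StorageException lastFailure = null;
>     for (int attempt = 1; attempt <= 3; attempt++) {
>         try {
>             return storage.writer(blobInfo);
>         } catch (StorageException e) {
>             if (!e.isRetryable() && e.getCode() != 503) {
>                 throw e; // not transient, fail immediately
>             }
>             lastFailure = e;
>         }
>     }
>     throw lastFailure;
> } {code}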
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)