Posted to dev@flink.apache.org by "Congxian Qiu(klion26) (Jira)" <ji...@apache.org> on 2020/09/21 11:02:00 UTC

[jira] [Created] (FLINK-19325) Optimize the consumed time for checkpoint completion

Congxian Qiu(klion26) created FLINK-19325:
---------------------------------------------

             Summary: Optimize the consumed time for checkpoint completion
                 Key: FLINK-19325
                 URL: https://issues.apache.org/jira/browse/FLINK-19325
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Checkpointing
            Reporter: Congxian Qiu(klion26)


Currently, when completing a checkpoint, we write out each state handle in {{MetadataV2V3SerializerBase.java#serializeStreamStateHandle}}:
{code:java}
static void serializeStreamStateHandle(StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
   if (stateHandle == null) {
      dos.writeByte(NULL_HANDLE);

   } else if (stateHandle instanceof RelativeFileStateHandle) {
      dos.writeByte(RELATIVE_STREAM_STATE_HANDLE);
      RelativeFileStateHandle relativeFileStateHandle = (RelativeFileStateHandle) stateHandle;
      dos.writeUTF(relativeFileStateHandle.getRelativePath());
      dos.writeLong(relativeFileStateHandle.getStateSize());
   } else if (stateHandle instanceof FileStateHandle) {
      dos.writeByte(FILE_STREAM_STATE_HANDLE);
      FileStateHandle fileStateHandle = (FileStateHandle) stateHandle;
      dos.writeLong(stateHandle.getStateSize());
      dos.writeUTF(fileStateHandle.getFilePath().toString());

   } else if (stateHandle instanceof ByteStreamStateHandle) {
      dos.writeByte(BYTE_STREAM_STATE_HANDLE);
      ByteStreamStateHandle byteStreamStateHandle = (ByteStreamStateHandle) stateHandle;
      dos.writeUTF(byteStreamStateHandle.getHandleName());
      byte[] internalData = byteStreamStateHandle.getData();
      dos.writeInt(internalData.length);
      dos.write(byteStreamStateHandle.getData());
   } else {
      throw new IOException("Unknown implementation of StreamStateHandle: " + stateHandle.getClass());
   }

   dos.flush();
}

{code}

We call {{dos.flush()}} after every state handle is written out. This can consume a lot of time and is not needed, because we close the output stream after everything has been written out, and closing the stream flushes any data that is still buffered.

I propose removing the {{dos.flush()}} call here to reduce the time spent on checkpoint completion.
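
A minimal, self-contained sketch of the effect, not Flink code: the class {{FlushCostSketch}}, the {{CountingStream}} helper, and the handle layout written in the loop are all hypothetical and only stand in for the real serializer and checkpoint output stream. It writes the same records twice, once flushing after each record and once relying on {{close()}} alone, and counts how often the underlying stream is flushed. On a remote file system each of those extra flushes could be a comparatively expensive call, which is an assumption about the backing stream rather than something measured here.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

public class FlushCostSketch {

    /** Hypothetical in-memory sink that counts how many times flush() reaches it. */
    static final class CountingStream extends ByteArrayOutputStream {
        int flushCalls = 0;

        @Override
        public void flush() {
            flushCalls++;
        }
    }

    /**
     * Writes handleCount made-up "file handle" records. When flushPerHandle is
     * true, it mimics the current behaviour of flushing after every handle.
     */
    static CountingStream writeHandles(int handleCount, boolean flushPerHandle) throws IOException {
        CountingStream sink = new CountingStream();
        // try-with-resources closes the DataOutputStream, which flushes it first.
        try (DataOutputStream dos = new DataOutputStream(sink)) {
            for (int i = 0; i < handleCount; i++) {
                dos.writeByte(2);                    // made-up type tag
                dos.writeLong(1024L);                // made-up state size
                dos.writeUTF("chk-1/handle-" + i);   // made-up file path
                if (flushPerHandle) {
                    dos.flush();
                }
            }
        }
        return sink;
    }

    public static void main(String[] args) throws IOException {
        CountingStream eager = writeHandles(1000, true);
        CountingStream lazy = writeHandles(1000, false);

        System.out.println("identical bytes: "
                + Arrays.equals(eager.toByteArray(), lazy.toByteArray()));
        System.out.println("flush calls with per-handle flush: " + eager.flushCalls);
        System.out.println("flush calls with close() only:     " + lazy.flushCalls);
    }
}
```

Both runs produce the same bytes; the only difference is one additional flush on the underlying stream per handle, which is the overhead the proposal removes.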


