You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/01 12:38:35 UTC

[GitHub] [flink] dmvk commented on a change in pull request #19304: [FLINK-26957][runtime] Removes flush in FileSystemJobResultStore.createDirtyResultInternal

dmvk commented on a change in pull request #19304:
URL: https://github.com/apache/flink/pull/19304#discussion_r840534110



##########
File path: flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
##########
@@ -35,6 +38,8 @@
     /** The file output stream used to write data. */
     private final FileOutputStream fos;
 
+    private volatile boolean isClosed = false;

Review comment:
       accessing a volatile field on these code paths is very expensive; in general output streams are not thread safe, so it's safe to use a non-volatile variable here

##########
File path: flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
##########
@@ -47,31 +52,49 @@ public LocalDataOutputStream(final File file) throws IOException {
 
     @Override
     public void write(final int b) throws IOException {
+        mayThrowClosedChannelException();
+        fos.write(b);
+    }
+
+    @Override
+    public void write(@Nonnull final byte[] b) throws IOException {
+        mayThrowClosedChannelException();
         fos.write(b);
     }
 
     @Override
     public void write(final byte[] b, final int off, final int len) throws IOException {
+        mayThrowClosedChannelException();
         fos.write(b, off, len);
     }
 
     @Override
     public void close() throws IOException {
         fos.close();
+        isClosed = true;

Review comment:
       should we switch the order here?

##########
File path: flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
##########
@@ -35,6 +38,8 @@
     /** The file output stream used to write data. */
     private final FileOutputStream fos;
 
+    private volatile boolean isClosed = false;

Review comment:
       `isClosed` -> `closed`; the prefix `is` prefix is for boolean getters
   
   ```suggestion
       private boolean closed = false;
   ```

##########
File path: flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
##########
@@ -47,31 +52,49 @@ public LocalDataOutputStream(final File file) throws IOException {
 
     @Override
     public void write(final int b) throws IOException {
+        mayThrowClosedChannelException();
+        fos.write(b);
+    }
+
+    @Override
+    public void write(@Nonnull final byte[] b) throws IOException {
+        mayThrowClosedChannelException();
         fos.write(b);
     }
 
     @Override
     public void write(final byte[] b, final int off, final int len) throws IOException {
+        mayThrowClosedChannelException();
         fos.write(b, off, len);
     }
 
     @Override
     public void close() throws IOException {
         fos.close();
+        isClosed = true;
     }
 
     @Override
     public void flush() throws IOException {
+        mayThrowClosedChannelException();
         fos.flush();
     }
 
     @Override
     public void sync() throws IOException {
+        mayThrowClosedChannelException();
         fos.getFD().sync();
     }
 
     @Override
     public long getPos() throws IOException {
+        mayThrowClosedChannelException();
         return fos.getChannel().position();
     }
+
+    private void mayThrowClosedChannelException() throws IOException {

Review comment:
       ```suggestion
       private void checkOpen() throws IOException {
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
##########
@@ -138,7 +138,6 @@ public void createDirtyResultInternal(JobResultEntry jobResultEntry) throws IOEx
         final Path path = constructDirtyPath(jobResultEntry.getJobId());
         try (OutputStream os = fileSystem.create(path, FileSystem.WriteMode.NO_OVERWRITE)) {
             mapper.writeValue(os, new JsonJobResultEntry(jobResultEntry));

Review comment:
       Hmm, I'm still not sure about this double close thingy 🤔 Can we simply make suure the `writeValue` doesn't close anything instead?
   
   ```suggestion
               mapper.writeValue(new NonClosingOutpusStreamDecorator(os), new JsonJobResultEntry(jobResultEntry));
   ```
   
   nit: there is a typo in `NonClosingOutpusStreamDecorator` class name




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org