You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2021/02/02 10:36:12 UTC

[GitHub] [hadoop] mehakmeet commented on a change in pull request #2587: HADOOP-13327 Output Stream Specification.

mehakmeet commented on a change in pull request #2587:
URL: https://github.com/apache/hadoop/pull/2587#discussion_r568481286



##########
File path: hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCreateTest.java
##########
@@ -436,4 +447,110 @@ private void createFile(Path path) throws IOException {
     writeDataset(fs, path, data, data.length, 1024 * 1024,
         true);
   }
+
+  @Test
+  public void testSyncable() throws Throwable {
+    describe("test declared and actual Syncable behaviors");
+    FileSystem fs = getFileSystem();
+    boolean supportsFlush = isSupported(SUPPORTS_HFLUSH);
+    boolean supportsSync = isSupported(SUPPORTS_HSYNC);
+
+    validateSyncableSemantics(fs, supportsSync, supportsFlush);
+  }
+
+  /**
+   * Validate the semantics of syncable.
+   * @param fs filesystem
+   * @param supportsSync sync is present
+   * @param supportsFlush flush is present.
+   * @throws IOException failure
+   */
+  protected void validateSyncableSemantics(final FileSystem fs,
+      final boolean supportsSync, final boolean supportsFlush) throws IOException {
+    Path path = methodPath();
+    LOG.info("Expecting files under {} to have supportsSync={}"
+            + " and supportsFlush={}",
+        path, supportsSync, supportsFlush);
+
+
+    try (FSDataOutputStream out = fs.create(path, true)) {
+      LOG.info("Created output stream {}", out);
+
+      // probe stream for support for flush/sync, whose capabilities
+      // of supports/does not support must match what is expected
+      String[] hflushCapabilities = {
+          StreamCapabilities.HFLUSH
+      };
+      String[] hsyncCapabilities = {
+          StreamCapabilities.HSYNC
+      };
+      if (supportsFlush) {
+        assertCapabilities(out, hflushCapabilities, null);
+      } else {
+        assertCapabilities(out, null, hflushCapabilities);
+      }
+      if (supportsSync) {
+        assertCapabilities(out, hsyncCapabilities, null);
+      } else {
+        assertCapabilities(out, null, hsyncCapabilities);
+      }
+      out.write('a');
+      try {
+        out.hflush();
+        if (!supportsFlush) {
+          // hsync not ignored

Review comment:
       This comment is a little confusing; maybe we can change it to "hsync tests not ignored"?

##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
##########
@@ -137,8 +139,13 @@ public void initialize(URI uri, Configuration conf) throws IOException {
             STREAM_READ_SKIP_BYTES)
         .build();
 
+    /** Reference to the bytes read counter for slightly faster counting. */
+    private final AtomicLong bytesRead;
+
     public LocalFSFileInputStream(Path f) throws IOException {
       fis = new FileInputStream(pathToFile(f));
+      bytesRead = ioStatistics.getCounterReference(

Review comment:
       No issues here; just confirming that these changes weren't included from a different PR by mistake (the IOStats PR).

##########
File path: hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCreateTest.java
##########
@@ -436,4 +447,110 @@ private void createFile(Path path) throws IOException {
     writeDataset(fs, path, data, data.length, 1024 * 1024,
         true);
   }
+
+  @Test
+  public void testSyncable() throws Throwable {
+    describe("test declared and actual Syncable behaviors");
+    FileSystem fs = getFileSystem();
+    boolean supportsFlush = isSupported(SUPPORTS_HFLUSH);
+    boolean supportsSync = isSupported(SUPPORTS_HSYNC);
+
+    validateSyncableSemantics(fs, supportsSync, supportsFlush);
+  }
+
+  /**
+   * Validate the semantics of syncable.
+   * @param fs filesystem
+   * @param supportsSync sync is present
+   * @param supportsFlush flush is present.
+   * @throws IOException failure
+   */
+  protected void validateSyncableSemantics(final FileSystem fs,
+      final boolean supportsSync, final boolean supportsFlush) throws IOException {
+    Path path = methodPath();
+    LOG.info("Expecting files under {} to have supportsSync={}"
+            + " and supportsFlush={}",
+        path, supportsSync, supportsFlush);
+

Review comment:
       nit: extra blank line.

##########
File path: hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCreateTest.java
##########
@@ -436,4 +447,110 @@ private void createFile(Path path) throws IOException {
     writeDataset(fs, path, data, data.length, 1024 * 1024,
         true);
   }
+
+  @Test
+  public void testSyncable() throws Throwable {
+    describe("test declared and actual Syncable behaviors");
+    FileSystem fs = getFileSystem();
+    boolean supportsFlush = isSupported(SUPPORTS_HFLUSH);
+    boolean supportsSync = isSupported(SUPPORTS_HSYNC);
+
+    validateSyncableSemantics(fs, supportsSync, supportsFlush);
+  }
+
+  /**
+   * Validate the semantics of syncable.
+   * @param fs filesystem
+   * @param supportsSync sync is present
+   * @param supportsFlush flush is present.
+   * @throws IOException failure
+   */
+  protected void validateSyncableSemantics(final FileSystem fs,
+      final boolean supportsSync, final boolean supportsFlush) throws IOException {
+    Path path = methodPath();
+    LOG.info("Expecting files under {} to have supportsSync={}"
+            + " and supportsFlush={}",
+        path, supportsSync, supportsFlush);
+
+
+    try (FSDataOutputStream out = fs.create(path, true)) {
+      LOG.info("Created output stream {}", out);
+
+      // probe stream for support for flush/sync, whose capabilities
+      // of supports/does not support must match what is expected
+      String[] hflushCapabilities = {
+          StreamCapabilities.HFLUSH
+      };
+      String[] hsyncCapabilities = {
+          StreamCapabilities.HSYNC
+      };
+      if (supportsFlush) {
+        assertCapabilities(out, hflushCapabilities, null);
+      } else {
+        assertCapabilities(out, null, hflushCapabilities);
+      }
+      if (supportsSync) {
+        assertCapabilities(out, hsyncCapabilities, null);
+      } else {
+        assertCapabilities(out, null, hsyncCapabilities);
+      }
+      out.write('a');
+      try {
+        out.hflush();
+        if (!supportsFlush) {
+          // hsync not ignored
+          LOG.warn("FS doesn't support Syncable.hflush(),"
+              + " but doesn't reject it either.");
+        }
+      } catch (UnsupportedOperationException e) {
+        if (supportsFlush) {
+          throw new AssertionError("hflush not supported", e);
+        }
+      }
+      out.write('b');
+      try {
+        out.hsync();
+      } catch (UnsupportedOperationException e) {
+        if (supportsSync) {
+          throw new AssertionError("HSync not supported", e);
+        }
+      }
+
+      if (supportsSync) {
+        // if sync really worked, data is visible here
+
+        try(FSDataInputStream in = fs.open(path)) {
+          assertEquals('a', in.read());
+          assertEquals('b', in.read());
+          assertEquals(-1, in.read());
+          LOG.info("Successfully read synced data on a new reader {}", in);
+        }
+      } else {
+        // np sync. Let's do a flush and see what happens.
+        out.flush();
+        // Now look at the filesystem.
+        try (FSDataInputStream in = fs.open(path)) {
+
+          int c = in.read();
+          if (c == -1) {
+            // nothing was synced; sync and flush really aren't there.
+            LOG.info("sync and flush are declared unsupported"
+                + " -flushed changes were not saved");
+
+          } else {
+            LOG.info("sync and flush are declared unsupported"
+                + " - but the stream does offer some sync/flush semantics");
+          }
+        } catch (FileNotFoundException e) {
+          // that's OK if it's an object store, but not if its a real
+          // FS
+          if (!isSupported(IS_BLOBSTORE)) {
+            throw e;
+          }
+        }

Review comment:
       Maybe add a LOG.warn() about the FileNotFoundException if it is an object store?

##########
File path: hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCreateTest.java
##########
@@ -436,4 +447,110 @@ private void createFile(Path path) throws IOException {
     writeDataset(fs, path, data, data.length, 1024 * 1024,
         true);
   }
+
+  @Test
+  public void testSyncable() throws Throwable {
+    describe("test declared and actual Syncable behaviors");
+    FileSystem fs = getFileSystem();
+    boolean supportsFlush = isSupported(SUPPORTS_HFLUSH);
+    boolean supportsSync = isSupported(SUPPORTS_HSYNC);
+
+    validateSyncableSemantics(fs, supportsSync, supportsFlush);
+  }
+
+  /**
+   * Validate the semantics of syncable.
+   * @param fs filesystem
+   * @param supportsSync sync is present
+   * @param supportsFlush flush is present.
+   * @throws IOException failure
+   */
+  protected void validateSyncableSemantics(final FileSystem fs,
+      final boolean supportsSync, final boolean supportsFlush) throws IOException {
+    Path path = methodPath();
+    LOG.info("Expecting files under {} to have supportsSync={}"
+            + " and supportsFlush={}",
+        path, supportsSync, supportsFlush);
+
+
+    try (FSDataOutputStream out = fs.create(path, true)) {
+      LOG.info("Created output stream {}", out);
+
+      // probe stream for support for flush/sync, whose capabilities
+      // of supports/does not support must match what is expected
+      String[] hflushCapabilities = {
+          StreamCapabilities.HFLUSH
+      };
+      String[] hsyncCapabilities = {
+          StreamCapabilities.HSYNC
+      };
+      if (supportsFlush) {
+        assertCapabilities(out, hflushCapabilities, null);
+      } else {
+        assertCapabilities(out, null, hflushCapabilities);
+      }
+      if (supportsSync) {
+        assertCapabilities(out, hsyncCapabilities, null);
+      } else {
+        assertCapabilities(out, null, hsyncCapabilities);
+      }
+      out.write('a');
+      try {
+        out.hflush();
+        if (!supportsFlush) {
+          // hsync not ignored
+          LOG.warn("FS doesn't support Syncable.hflush(),"
+              + " but doesn't reject it either.");
+        }
+      } catch (UnsupportedOperationException e) {
+        if (supportsFlush) {
+          throw new AssertionError("hflush not supported", e);
+        }
+      }
+      out.write('b');
+      try {
+        out.hsync();
+      } catch (UnsupportedOperationException e) {
+        if (supportsSync) {
+          throw new AssertionError("HSync not supported", e);
+        }
+      }
+
+      if (supportsSync) {
+        // if sync really worked, data is visible here
+
+        try(FSDataInputStream in = fs.open(path)) {
+          assertEquals('a', in.read());
+          assertEquals('b', in.read());
+          assertEquals(-1, in.read());
+          LOG.info("Successfully read synced data on a new reader {}", in);
+        }
+      } else {
+        // np sync. Let's do a flush and see what happens.

Review comment:
       nit: typo in "np" -> "no"




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org