Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/16 12:11:25 UTC

[GitHub] [flink] zentol commented on a change in pull request #18191: [FLINK-25405] Let BlobServer/BlobCache check and delete corrupted blobs at start up

zentol commented on a change in pull request #18191:
URL: https://github.com/apache/flink/pull/18191#discussion_r785430902



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
##########
@@ -71,4 +77,44 @@ public void testTaskManagerFallbackBlobStorageDirectory1() throws IOException {
     public void testBlobUtilsFailIfNoStorageDirectoryIsSpecified() throws IOException {
         BlobUtils.createBlobStorageDirectory(new Configuration(), null);
     }
+
+    @Test
+    public void testCheckAndDeleteCorruptedBlobsDeletesCorruptedBlobs() throws IOException {
+        final File storageDir = temporaryFolder.newFolder();
+
+        final BlobKey corruptedBlobKey = BlobKey.createKey(BlobKey.BlobType.PERMANENT_BLOB);
+        final byte[] corruptedContent = "corrupted".getBytes(StandardCharsets.UTF_8);
+
+        final byte[] validContent = "valid".getBytes(StandardCharsets.UTF_8);
+        final byte[] validKey = BlobUtils.createMessageDigest().digest(validContent);
+        final BlobKey validPermanentBlobKey =
+                BlobKey.createKey(BlobKey.BlobType.PERMANENT_BLOB, validKey);
+        final BlobKey validTransientBlobKey =
+                BlobKey.createKey(BlobKey.BlobType.TRANSIENT_BLOB, validKey);
+
+        writeBlob(storageDir, corruptedBlobKey, corruptedContent);
+        writeBlob(storageDir, validPermanentBlobKey, validContent);
+        writeBlob(storageDir, validTransientBlobKey, validContent);
+
+        BlobUtils.checkAndDeleteCorruptedBlobs(storageDir.toPath(), log);
+
+        assertThat(
+                BlobUtils.listBlobsInDirectory(storageDir.toPath()).stream()
+                        .map(BlobUtils.Blob::getBlobKey)
+                        .collect(Collectors.toList()),
+                containsInAnyOrder(validPermanentBlobKey, validTransientBlobKey));
+    }
+
+    private void writeBlob(File storageDir, BlobKey corruptedBlobKey, byte[] fileContent)

Review comment:
       Rename `corruptedBlobKey`: `writeBlob` is also called with the valid keys, so the parameter name is misleading.
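
The test above treats a blob as corrupted when the digest of its stored bytes differs from the hash carried by its `BlobKey`. The valid keys are built from `createMessageDigest().digest(validContent)`, while `corruptedBlobKey` is a random key stored with unrelated content. The following is a standalone sketch of that kind of check, not Flink's `checkAndDeleteCorruptedBlobs`. It assumes SHA-1 and a hypothetical layout in which each file is named after the hex digest of its contents; `CorruptedBlobCheck` and `deleteCorruptedBlobs` are illustrative names only.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical layout (assumption): each blob file is named after the lowercase
// hex SHA-1 digest of its contents. Flink's real blob file naming differs.
final class CorruptedBlobCheck {

    // Encodes bytes as lowercase hex, two characters per byte.
    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    // Deletes every regular file whose name is not the SHA-1 digest of its
    // contents and returns the names of the deleted files.
    static List<String> deleteCorruptedBlobs(Path storageDir)
            throws IOException, NoSuchAlgorithmException {
        // Snapshot the directory listing first so that deleting files does not
        // interfere with the directory iteration.
        List<Path> files = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(storageDir)) {
            for (Path file : stream) {
                files.add(file);
            }
        }

        List<String> deleted = new ArrayList<>();
        for (Path file : files) {
            if (!Files.isRegularFile(file)) {
                continue;
            }
            // Recompute the digest of the stored bytes and compare it with the
            // digest encoded in the file name (the "key").
            byte[] actual = MessageDigest.getInstance("SHA-1").digest(Files.readAllBytes(file));
            String storedName = file.getFileName().toString();
            if (!hex(actual).equals(storedName)) {
                Files.delete(file);
                deleted.add(storedName);
            }
        }
        return deleted;
    }
}
```

Like the test, the sketch leaves intact any blob whose content matches its key, so two valid files with the same content but different names (in Flink, different blob types) would both survive.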

##########
File path: flink-architecture-tests/violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5
##########
@@ -8,6 +8,7 @@ Method <org.apache.flink.orc.nohive.shim.OrcNoHiveShim.createRecordReader(org.ap
 Method <org.apache.flink.runtime.blob.BlobInputStream.read()> calls method <org.apache.flink.runtime.blob.BlobKey.getHash()> in (BlobInputStream.java:127)
 Method <org.apache.flink.runtime.blob.BlobInputStream.read([B, int, int)> calls method <org.apache.flink.runtime.blob.BlobKey.getHash()> in (BlobInputStream.java:163)
 Method <org.apache.flink.runtime.blob.BlobOutputStream.receiveAndCheckPutResponse(java.io.InputStream, java.security.MessageDigest, org.apache.flink.runtime.blob.BlobKey$BlobType)> calls method <org.apache.flink.runtime.blob.BlobKey.getHash()> in (BlobOutputStream.java:155)
+Method <org.apache.flink.runtime.blob.BlobUtils.checkAndDeleteCorruptedBlobs(java.nio.file.Path, org.slf4j.Logger)> calls method <org.apache.flink.runtime.blob.BlobKey.getHash()> in (BlobUtils.java:491)

Review comment:
       Is this a false positive caused by the public/package-private issue?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
##########
@@ -63,16 +64,12 @@
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.get;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;

Review comment:
       🎉 

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
##########
@@ -71,4 +77,44 @@ public void testTaskManagerFallbackBlobStorageDirectory1() throws IOException {
     public void testBlobUtilsFailIfNoStorageDirectoryIsSpecified() throws IOException {
         BlobUtils.createBlobStorageDirectory(new Configuration(), null);
     }
+
+    @Test
+    public void testCheckAndDeleteCorruptedBlobsDeletesCorruptedBlobs() throws IOException {
+        final File storageDir = temporaryFolder.newFolder();
+
+        final BlobKey corruptedBlobKey = BlobKey.createKey(BlobKey.BlobType.PERMANENT_BLOB);
+        final byte[] corruptedContent = "corrupted".getBytes(StandardCharsets.UTF_8);
+
+        final byte[] validContent = "valid".getBytes(StandardCharsets.UTF_8);
+        final byte[] validKey = BlobUtils.createMessageDigest().digest(validContent);
+        final BlobKey validPermanentBlobKey =
+                BlobKey.createKey(BlobKey.BlobType.PERMANENT_BLOB, validKey);
+        final BlobKey validTransientBlobKey =
+                BlobKey.createKey(BlobKey.BlobType.TRANSIENT_BLOB, validKey);
+
+        writeBlob(storageDir, corruptedBlobKey, corruptedContent);
+        writeBlob(storageDir, validPermanentBlobKey, validContent);
+        writeBlob(storageDir, validTransientBlobKey, validContent);
+
+        BlobUtils.checkAndDeleteCorruptedBlobs(storageDir.toPath(), log);
+
+        assertThat(
+                BlobUtils.listBlobsInDirectory(storageDir.toPath()).stream()
+                        .map(BlobUtils.Blob::getBlobKey)
+                        .collect(Collectors.toList()),
+                containsInAnyOrder(validPermanentBlobKey, validTransientBlobKey));
+    }
+
+    private void writeBlob(File storageDir, BlobKey corruptedBlobKey, byte[] fileContent)
+            throws IOException {
+        final File corruptedFile = new File(storageDir, "corrupted");
+        Files.write(corruptedFile.toPath(), fileContent);

Review comment:
       Why don't we just do `Files.write(storageLocation, fileContent);` at the end instead of the move?
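
A minimal sketch of the suggested simplification, using only `java.nio.file`. The part of the hunk that derives `storageLocation` from the storage directory and the blob key is not shown, so here the caller passes the final path directly; the class name `BlobTestFiles` and this two-argument `writeBlob` signature are assumptions for illustration. The content parameter keeps a neutral name because the helper writes valid blobs as well as corrupted ones.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical test helper (assumed shape, not the PR's actual code).
final class BlobTestFiles {

    // Writes fileContent straight to its final location, creating any missing
    // parent directories first. No intermediate file or move is involved.
    static void writeBlob(Path storageLocation, byte[] fileContent) throws IOException {
        Path parent = storageLocation.getParent();
        if (parent != null) {
            Files.createDirectories(parent);
        }
        // Default options create the file or truncate an existing one.
        Files.write(storageLocation, fileContent);
    }
}
```

A write-to-temp-file-then-move pattern is typically used so that a file appears atomically to concurrent readers; a single-threaded test that runs the check only after all writes have finished does not need that guarantee.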




-- 
This is an automated message from the Apache Git Service.