You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/01 07:33:33 UTC

[flink-table-store] branch master updated: [hotfix] Set authority as thread name for FailingAtomicRenameFileSystem to support multi-thread testing

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 41c0eef  [hotfix] Set authority as thread name for FailingAtomicRenameFileSystem to support multi-thread testing
41c0eef is described below

commit 41c0eefa0fec69aae051c004dc606a7be04f123a
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Mar 1 15:33:30 2022 +0800

    [hotfix] Set authority as thread name for FailingAtomicRenameFileSystem to support multi-thread testing
    
    This closes #26
---
 .../table/store/connector/FileStoreITCase.java     |  5 +--
 .../store/file/manifest/ManifestFileMetaTest.java  |  6 ++--
 .../store/file/manifest/ManifestFileTest.java      |  5 ++-
 .../store/file/manifest/ManifestListTest.java      |  5 ++-
 .../store/file/mergetree/sst/SstFileTest.java      |  5 ++-
 .../store/file/operation/FileStoreCommitTest.java  | 32 +++++++----------
 .../store/file/operation/FileStoreExpireTest.java  |  5 +--
 .../store/file/operation/FileStoreScanTest.java    |  5 +--
 .../store/file/operation/OperationTestUtils.java   | 10 ++++--
 .../store/file/operation/TestCommitThread.java     |  2 +-
 .../file/utils/FailingAtomicRenameFileSystem.java  | 42 +++++++++++++++-------
 11 files changed, 65 insertions(+), 57 deletions(-)

diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index 8e7aa42..f278086 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -38,6 +38,7 @@ import org.apache.flink.table.store.connector.source.FileStoreSource;
 import org.apache.flink.table.store.file.FileStore;
 import org.apache.flink.table.store.file.FileStoreImpl;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
@@ -214,8 +215,8 @@ public class FileStoreITCase extends AbstractTestBase {
         if (isBatch) {
             options.set(FILE_PATH, folder.toURI().toString());
         } else {
-            options.set(FILE_PATH, "fail://" + folder.getPath());
-            // FailingAtomicRenameFileSystem.setFailPossibility(20);
+            FailingAtomicRenameFileSystem.get().reset(3, 100);
+            options.set(FILE_PATH, FailingAtomicRenameFileSystem.getFailingPath(folder.getPath()));
         }
         options.set(FILE_FORMAT, "avro");
         return options;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
index b594802..8bf995f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
@@ -110,15 +110,13 @@ public class ManifestFileMetaTest {
 
     @RepeatedTest(10)
     public void testCleanUpForException() throws IOException {
-        FailingAtomicRenameFileSystem.resetFailCounter(1);
-        FailingAtomicRenameFileSystem.setFailPossibility(10);
-
+        FailingAtomicRenameFileSystem.get().reset(1, 10);
         List<ManifestFileMeta> input = new ArrayList<>();
         List<ManifestEntry> entries = new ArrayList<>();
         createData(input, entries, null);
         ManifestFile failingManifestFile =
                 createManifestFile(
-                        FailingAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString());
+                        FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString()));
 
         try {
             ManifestFileMeta.merge(input, entries, failingManifestFile, 500, 30);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
index 99a8b40..aa65636 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
@@ -60,12 +60,11 @@ public class ManifestFileTest {
 
     @RepeatedTest(10)
     public void testCleanUpForException() throws IOException {
-        FailingAtomicRenameFileSystem.resetFailCounter(1);
-        FailingAtomicRenameFileSystem.setFailPossibility(10);
+        FailingAtomicRenameFileSystem.get().reset(1, 10);
         List<ManifestEntry> entries = generateData();
         ManifestFile manifestFile =
                 createManifestFile(
-                        FailingAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString());
+                        FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString()));
 
         try {
             manifestFile.write(entries);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
index 6b295c7..39f2529 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
@@ -58,12 +58,11 @@ public class ManifestListTest {
 
     @RepeatedTest(10)
     public void testCleanUpForException() throws IOException {
-        FailingAtomicRenameFileSystem.resetFailCounter(1);
-        FailingAtomicRenameFileSystem.setFailPossibility(3);
+        FailingAtomicRenameFileSystem.get().reset(1, 3);
         List<ManifestFileMeta> metas = generateData();
         ManifestList manifestList =
                 createManifestList(
-                        FailingAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString());
+                        FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString()));
 
         try {
             manifestList.write(metas);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
index 4441ad3..b83efb4 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
@@ -98,12 +98,11 @@ public class SstFileTest {
 
     @RepeatedTest(10)
     public void testCleanUpForException() throws IOException {
-        FailingAtomicRenameFileSystem.resetFailCounter(1);
-        FailingAtomicRenameFileSystem.setFailPossibility(10);
+        FailingAtomicRenameFileSystem.get().reset(1, 10);
         SstTestDataGenerator.Data data = gen.next();
         SstFileWriter writer =
                 createSstFileWriter(
-                        FailingAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString());
+                        FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString()));
 
         try {
             writer.write(CloseableIterator.fromList(data.content, kv -> {}), 0);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index 927e211..136dff1 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -67,25 +66,23 @@ public class FileStoreCommitTest {
         Path root = new Path(tempDir.toString());
         root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
         // for failure tests
-        FailingAtomicRenameFileSystem.resetFailCounter(100);
-        FailingAtomicRenameFileSystem.setFailPossibility(5000);
+        FailingAtomicRenameFileSystem.get().reset(100, 5000);
     }
 
     @ParameterizedTest
-    @ValueSource(
-            strings = {TestAtomicRenameFileSystem.SCHEME, FailingAtomicRenameFileSystem.SCHEME})
-    public void testSingleCommitUser(String scheme) throws Exception {
-        testRandomConcurrentNoConflict(1, scheme);
+    @ValueSource(booleans = {false, true})
+    public void testSingleCommitUser(boolean failing) throws Exception {
+        testRandomConcurrentNoConflict(1, failing);
     }
 
     @ParameterizedTest
-    @ValueSource(
-            strings = {TestAtomicRenameFileSystem.SCHEME, FailingAtomicRenameFileSystem.SCHEME})
-    public void testManyCommitUsersNoConflict(String scheme) throws Exception {
-        testRandomConcurrentNoConflict(ThreadLocalRandom.current().nextInt(3) + 2, scheme);
+    @ValueSource(booleans = {false, true})
+    public void testManyCommitUsersNoConflict(boolean failing) throws Exception {
+        testRandomConcurrentNoConflict(ThreadLocalRandom.current().nextInt(3) + 2, failing);
     }
 
-    protected void testRandomConcurrentNoConflict(int numThreads, String scheme) throws Exception {
+    protected void testRandomConcurrentNoConflict(int numThreads, boolean failing)
+            throws Exception {
         // prepare test data
         Map<BinaryRowData, List<KeyValue>> data =
                 generateData(ThreadLocalRandom.current().nextInt(1000) + 1);
@@ -112,9 +109,8 @@ public class FileStoreCommitTest {
             TestCommitThread thread =
                     new TestCommitThread(
                             dataPerThread.get(i),
-                            OperationTestUtils.createPathFactory(scheme, tempDir.toString()),
-                            OperationTestUtils.createPathFactory(
-                                    TestAtomicRenameFileSystem.SCHEME, tempDir.toString()));
+                            OperationTestUtils.createPathFactory(failing, tempDir.toString()),
+                            OperationTestUtils.createPathFactory(false, tempDir.toString()));
             thread.start();
             threads.add(thread);
         }
@@ -135,8 +131,7 @@ public class FileStoreCommitTest {
 
         // read actual data and compare
         FileStorePathFactory safePathFactory =
-                OperationTestUtils.createPathFactory(
-                        TestAtomicRenameFileSystem.SCHEME, tempDir.toString());
+                OperationTestUtils.createPathFactory(false, tempDir.toString());
         Long snapshotId = safePathFactory.latestSnapshotId();
         assertThat(snapshotId).isNotNull();
         List<KeyValue> actualKvs =
@@ -161,8 +156,7 @@ public class FileStoreCommitTest {
                 "data1");
 
         FileStorePathFactory pathFactory =
-                OperationTestUtils.createPathFactory(
-                        TestAtomicRenameFileSystem.SCHEME, tempDir.toString());
+                OperationTestUtils.createPathFactory(false, tempDir.toString());
         OperationTestUtils.commitData(
                 data1.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
                 gen::getPartition,
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
index de8f410..54ba19f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -55,9 +54,7 @@ public class FileStoreExpireTest {
     @BeforeEach
     public void beforeEach() throws IOException {
         gen = new TestKeyValueGenerator();
-        pathFactory =
-                OperationTestUtils.createPathFactory(
-                        TestAtomicRenameFileSystem.SCHEME, tempDir.toString());
+        pathFactory = OperationTestUtils.createPathFactory(false, tempDir.toString());
         Path root = new Path(tempDir.toString());
         root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
     }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
index 0d1a76c..6f21c9e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
 import org.apache.flink.table.store.file.manifest.ManifestList;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.RepeatedTest;
@@ -62,9 +61,7 @@ public class FileStoreScanTest {
     @BeforeEach
     public void beforeEach() throws IOException {
         gen = new TestKeyValueGenerator();
-        pathFactory =
-                OperationTestUtils.createPathFactory(
-                        TestAtomicRenameFileSystem.SCHEME, tempDir.toString());
+        pathFactory = OperationTestUtils.createPathFactory(false, tempDir.toString());
         Path root = new Path(tempDir.toString());
         root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
     }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
index bbec00b..0fe2fe0 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
@@ -34,9 +34,11 @@ import org.apache.flink.table.store.file.mergetree.Increment;
 import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
 import org.apache.flink.util.function.QuadFunction;
 
 import java.io.IOException;
@@ -129,9 +131,13 @@ public class OperationTestUtils {
                 pathFactory);
     }
 
-    public static FileStorePathFactory createPathFactory(String scheme, String root) {
+    public static FileStorePathFactory createPathFactory(boolean failing, String root) {
+        String path =
+                failing
+                        ? FailingAtomicRenameFileSystem.getFailingPath(root)
+                        : TestAtomicRenameFileSystem.SCHEME + "://" + root;
         return new FileStorePathFactory(
-                new Path(scheme + "://" + root), TestKeyValueGenerator.PARTITION_TYPE, "default");
+                new Path(path), TestKeyValueGenerator.PARTITION_TYPE, "default");
     }
 
     private static ManifestFile.Factory createManifestFileFactory(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index 43fb2e9..c6b6501 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -137,7 +137,7 @@ public class TestCommitThread extends Thread {
                 break;
             } catch (Throwable e) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.warn("Failed to commit because of exception, try again", e);
+                    LOG.debug("Failed to commit because of exception, try again", e);
                 }
                 writers.clear();
                 shouldCheckFilter = true;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java
index eb3b32a..4371b01 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java
@@ -39,15 +39,31 @@ public class FailingAtomicRenameFileSystem extends TestAtomicRenameFileSystem {
 
     public static final String SCHEME = "fail";
 
-    private static final AtomicInteger failCounter = new AtomicInteger();
-    private static int failPossibility = 1000;
+    private final String threadName;
+    private final AtomicInteger failCounter = new AtomicInteger();
+    private int failPossibility;
 
-    public static void resetFailCounter(int maxValue) {
-        failCounter.set(maxValue);
+    public FailingAtomicRenameFileSystem(String threadName) {
+        this.threadName = threadName;
     }
 
-    public static void setFailPossibility(int v) {
-        failPossibility = v;
+    public static FailingAtomicRenameFileSystem get() {
+        try {
+            return (FailingAtomicRenameFileSystem) new Path(getFailingPath("/")).getFileSystem();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static String getFailingPath(String path) {
+        // set authority as thread name so that different testing threads use different instances
+        // for more information see FileSystem#getUnguardedFileSystem for the caching strategy
+        return SCHEME + "://" + Thread.currentThread().getName() + path;
+    }
+
+    public void reset(int maxFails, int failPossibility) {
+        failCounter.set(maxFails);
+        this.failPossibility = failPossibility;
     }
 
     @Override
@@ -68,7 +84,7 @@ public class FailingAtomicRenameFileSystem extends TestAtomicRenameFileSystem {
 
     @Override
     public URI getUri() {
-        return URI.create(SCHEME + ":///");
+        return URI.create(SCHEME + "://" + threadName + "/");
     }
 
     /** {@link FileSystemFactory} for {@link FailingAtomicRenameFileSystem}. */
@@ -81,7 +97,7 @@ public class FailingAtomicRenameFileSystem extends TestAtomicRenameFileSystem {
 
         @Override
         public FileSystem create(URI uri) throws IOException {
-            return new FailingAtomicRenameFileSystem();
+            return new FailingAtomicRenameFileSystem(uri.getAuthority());
         }
     }
 
@@ -93,7 +109,7 @@ public class FailingAtomicRenameFileSystem extends TestAtomicRenameFileSystem {
         }
     }
 
-    private static class FailingFSDataInputStreamWrapper extends FSDataInputStreamWrapper {
+    private class FailingFSDataInputStreamWrapper extends FSDataInputStreamWrapper {
 
         public FailingFSDataInputStreamWrapper(FSDataInputStream inputStream) {
             super(inputStream);
@@ -118,7 +134,7 @@ public class FailingAtomicRenameFileSystem extends TestAtomicRenameFileSystem {
         }
     }
 
-    private static class FailingFSDataOutputStreamWrapper extends FSDataOutputStreamWrapper {
+    private class FailingFSDataOutputStreamWrapper extends FSDataOutputStreamWrapper {
 
         public FailingFSDataOutputStreamWrapper(FSDataOutputStream outputStream) {
             super(outputStream);
@@ -126,7 +142,8 @@ public class FailingAtomicRenameFileSystem extends TestAtomicRenameFileSystem {
 
         @Override
         public void write(int b) throws IOException {
-            if (ThreadLocalRandom.current().nextInt(failPossibility) == 0) {
+            if (ThreadLocalRandom.current().nextInt(failPossibility) == 0
+                    && failCounter.getAndDecrement() > 0) {
                 throw new ArtificialException();
             }
             super.write(b);
@@ -134,7 +151,8 @@ public class FailingAtomicRenameFileSystem extends TestAtomicRenameFileSystem {
 
         @Override
         public void write(byte[] b, int off, int len) throws IOException {
-            if (ThreadLocalRandom.current().nextInt(failPossibility) == 0) {
+            if (ThreadLocalRandom.current().nextInt(failPossibility) == 0
+                    && failCounter.getAndDecrement() > 0) {
                 throw new ArtificialException();
             }
             super.write(b, off, len);