You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/01 07:33:33 UTC
[flink-table-store] branch master updated: [hotfix] Set authority as thread name for FailingAtomicRenameFileSystem to support multi-thread testing
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 41c0eef [hotfix] Set authority as thread name for FailingAtomicRenameFileSystem to support multi-thread testing
41c0eef is described below
commit 41c0eefa0fec69aae051c004dc606a7be04f123a
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Mar 1 15:33:30 2022 +0800
[hotfix] Set authority as thread name for FailingAtomicRenameFileSystem to support multi-thread testing
This closes #26
---
.../table/store/connector/FileStoreITCase.java | 5 +--
.../store/file/manifest/ManifestFileMetaTest.java | 6 ++--
.../store/file/manifest/ManifestFileTest.java | 5 ++-
.../store/file/manifest/ManifestListTest.java | 5 ++-
.../store/file/mergetree/sst/SstFileTest.java | 5 ++-
.../store/file/operation/FileStoreCommitTest.java | 32 +++++++----------
.../store/file/operation/FileStoreExpireTest.java | 5 +--
.../store/file/operation/FileStoreScanTest.java | 5 +--
.../store/file/operation/OperationTestUtils.java | 10 ++++--
.../store/file/operation/TestCommitThread.java | 2 +-
.../file/utils/FailingAtomicRenameFileSystem.java | 42 +++++++++++++++-------
11 files changed, 65 insertions(+), 57 deletions(-)
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index 8e7aa42..f278086 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -38,6 +38,7 @@ import org.apache.flink.table.store.connector.source.FileStoreSource;
import org.apache.flink.table.store.file.FileStore;
import org.apache.flink.table.store.file.FileStoreImpl;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
@@ -214,8 +215,8 @@ public class FileStoreITCase extends AbstractTestBase {
if (isBatch) {
options.set(FILE_PATH, folder.toURI().toString());
} else {
- options.set(FILE_PATH, "fail://" + folder.getPath());
- // FailingAtomicRenameFileSystem.setFailPossibility(20);
+ FailingAtomicRenameFileSystem.get().reset(3, 100);
+ options.set(FILE_PATH, FailingAtomicRenameFileSystem.getFailingPath(folder.getPath()));
}
options.set(FILE_FORMAT, "avro");
return options;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
index b594802..8bf995f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
@@ -110,15 +110,13 @@ public class ManifestFileMetaTest {
@RepeatedTest(10)
public void testCleanUpForException() throws IOException {
- FailingAtomicRenameFileSystem.resetFailCounter(1);
- FailingAtomicRenameFileSystem.setFailPossibility(10);
-
+ FailingAtomicRenameFileSystem.get().reset(1, 10);
List<ManifestFileMeta> input = new ArrayList<>();
List<ManifestEntry> entries = new ArrayList<>();
createData(input, entries, null);
ManifestFile failingManifestFile =
createManifestFile(
- FailingAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString());
+ FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString()));
try {
ManifestFileMeta.merge(input, entries, failingManifestFile, 500, 30);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
index 99a8b40..aa65636 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
@@ -60,12 +60,11 @@ public class ManifestFileTest {
@RepeatedTest(10)
public void testCleanUpForException() throws IOException {
- FailingAtomicRenameFileSystem.resetFailCounter(1);
- FailingAtomicRenameFileSystem.setFailPossibility(10);
+ FailingAtomicRenameFileSystem.get().reset(1, 10);
List<ManifestEntry> entries = generateData();
ManifestFile manifestFile =
createManifestFile(
- FailingAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString());
+ FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString()));
try {
manifestFile.write(entries);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
index 6b295c7..39f2529 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
@@ -58,12 +58,11 @@ public class ManifestListTest {
@RepeatedTest(10)
public void testCleanUpForException() throws IOException {
- FailingAtomicRenameFileSystem.resetFailCounter(1);
- FailingAtomicRenameFileSystem.setFailPossibility(3);
+ FailingAtomicRenameFileSystem.get().reset(1, 3);
List<ManifestFileMeta> metas = generateData();
ManifestList manifestList =
createManifestList(
- FailingAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString());
+ FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString()));
try {
manifestList.write(metas);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
index 4441ad3..b83efb4 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
@@ -98,12 +98,11 @@ public class SstFileTest {
@RepeatedTest(10)
public void testCleanUpForException() throws IOException {
- FailingAtomicRenameFileSystem.resetFailCounter(1);
- FailingAtomicRenameFileSystem.setFailPossibility(10);
+ FailingAtomicRenameFileSystem.get().reset(1, 10);
SstTestDataGenerator.Data data = gen.next();
SstFileWriter writer =
createSstFileWriter(
- FailingAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString());
+ FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString()));
try {
writer.write(CloseableIterator.fromList(data.content, kv -> {}), 0);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index 927e211..136dff1 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -67,25 +66,23 @@ public class FileStoreCommitTest {
Path root = new Path(tempDir.toString());
root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
// for failure tests
- FailingAtomicRenameFileSystem.resetFailCounter(100);
- FailingAtomicRenameFileSystem.setFailPossibility(5000);
+ FailingAtomicRenameFileSystem.get().reset(100, 5000);
}
@ParameterizedTest
- @ValueSource(
- strings = {TestAtomicRenameFileSystem.SCHEME, FailingAtomicRenameFileSystem.SCHEME})
- public void testSingleCommitUser(String scheme) throws Exception {
- testRandomConcurrentNoConflict(1, scheme);
+ @ValueSource(booleans = {false, true})
+ public void testSingleCommitUser(boolean failing) throws Exception {
+ testRandomConcurrentNoConflict(1, failing);
}
@ParameterizedTest
- @ValueSource(
- strings = {TestAtomicRenameFileSystem.SCHEME, FailingAtomicRenameFileSystem.SCHEME})
- public void testManyCommitUsersNoConflict(String scheme) throws Exception {
- testRandomConcurrentNoConflict(ThreadLocalRandom.current().nextInt(3) + 2, scheme);
+ @ValueSource(booleans = {false, true})
+ public void testManyCommitUsersNoConflict(boolean failing) throws Exception {
+ testRandomConcurrentNoConflict(ThreadLocalRandom.current().nextInt(3) + 2, failing);
}
- protected void testRandomConcurrentNoConflict(int numThreads, String scheme) throws Exception {
+ protected void testRandomConcurrentNoConflict(int numThreads, boolean failing)
+ throws Exception {
// prepare test data
Map<BinaryRowData, List<KeyValue>> data =
generateData(ThreadLocalRandom.current().nextInt(1000) + 1);
@@ -112,9 +109,8 @@ public class FileStoreCommitTest {
TestCommitThread thread =
new TestCommitThread(
dataPerThread.get(i),
- OperationTestUtils.createPathFactory(scheme, tempDir.toString()),
- OperationTestUtils.createPathFactory(
- TestAtomicRenameFileSystem.SCHEME, tempDir.toString()));
+ OperationTestUtils.createPathFactory(failing, tempDir.toString()),
+ OperationTestUtils.createPathFactory(false, tempDir.toString()));
thread.start();
threads.add(thread);
}
@@ -135,8 +131,7 @@ public class FileStoreCommitTest {
// read actual data and compare
FileStorePathFactory safePathFactory =
- OperationTestUtils.createPathFactory(
- TestAtomicRenameFileSystem.SCHEME, tempDir.toString());
+ OperationTestUtils.createPathFactory(false, tempDir.toString());
Long snapshotId = safePathFactory.latestSnapshotId();
assertThat(snapshotId).isNotNull();
List<KeyValue> actualKvs =
@@ -161,8 +156,7 @@ public class FileStoreCommitTest {
"data1");
FileStorePathFactory pathFactory =
- OperationTestUtils.createPathFactory(
- TestAtomicRenameFileSystem.SCHEME, tempDir.toString());
+ OperationTestUtils.createPathFactory(false, tempDir.toString());
OperationTestUtils.commitData(
data1.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
gen::getPartition,
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
index de8f410..54ba19f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -55,9 +54,7 @@ public class FileStoreExpireTest {
@BeforeEach
public void beforeEach() throws IOException {
gen = new TestKeyValueGenerator();
- pathFactory =
- OperationTestUtils.createPathFactory(
- TestAtomicRenameFileSystem.SCHEME, tempDir.toString());
+ pathFactory = OperationTestUtils.createPathFactory(false, tempDir.toString());
Path root = new Path(tempDir.toString());
root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
index 0d1a76c..6f21c9e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
import org.apache.flink.table.store.file.manifest.ManifestList;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
@@ -62,9 +61,7 @@ public class FileStoreScanTest {
@BeforeEach
public void beforeEach() throws IOException {
gen = new TestKeyValueGenerator();
- pathFactory =
- OperationTestUtils.createPathFactory(
- TestAtomicRenameFileSystem.SCHEME, tempDir.toString());
+ pathFactory = OperationTestUtils.createPathFactory(false, tempDir.toString());
Path root = new Path(tempDir.toString());
root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
index bbec00b..0fe2fe0 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
@@ -34,9 +34,11 @@ import org.apache.flink.table.store.file.mergetree.Increment;
import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
import org.apache.flink.util.function.QuadFunction;
import java.io.IOException;
@@ -129,9 +131,13 @@ public class OperationTestUtils {
pathFactory);
}
- public static FileStorePathFactory createPathFactory(String scheme, String root) {
+ public static FileStorePathFactory createPathFactory(boolean failing, String root) {
+ String path =
+ failing
+ ? FailingAtomicRenameFileSystem.getFailingPath(root)
+ : TestAtomicRenameFileSystem.SCHEME + "://" + root;
return new FileStorePathFactory(
- new Path(scheme + "://" + root), TestKeyValueGenerator.PARTITION_TYPE, "default");
+ new Path(path), TestKeyValueGenerator.PARTITION_TYPE, "default");
}
private static ManifestFile.Factory createManifestFileFactory(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index 43fb2e9..c6b6501 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -137,7 +137,7 @@ public class TestCommitThread extends Thread {
break;
} catch (Throwable e) {
if (LOG.isDebugEnabled()) {
- LOG.warn("Failed to commit because of exception, try again", e);
+ LOG.debug("Failed to commit because of exception, try again", e);
}
writers.clear();
shouldCheckFilter = true;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java
index eb3b32a..4371b01 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java
@@ -39,15 +39,31 @@ public class FailingAtomicRenameFileSystem extends TestAtomicRenameFileSystem {
public static final String SCHEME = "fail";
- private static final AtomicInteger failCounter = new AtomicInteger();
- private static int failPossibility = 1000;
+ private final String threadName;
+ private final AtomicInteger failCounter = new AtomicInteger();
+ private int failPossibility;
- public static void resetFailCounter(int maxValue) {
- failCounter.set(maxValue);
+ public FailingAtomicRenameFileSystem(String threadName) {
+ this.threadName = threadName;
}
- public static void setFailPossibility(int v) {
- failPossibility = v;
+ public static FailingAtomicRenameFileSystem get() {
+ try {
+ return (FailingAtomicRenameFileSystem) new Path(getFailingPath("/")).getFileSystem();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static String getFailingPath(String path) {
+ // Use the thread name as the authority so that different testing threads use different instances.
+ // See FileSystem#getUnguardedFileSystem for details on the caching strategy.
+ return SCHEME + "://" + Thread.currentThread().getName() + path;
+ }
+
+ public void reset(int maxFails, int failPossibility) {
+ failCounter.set(maxFails);
+ this.failPossibility = failPossibility;
}
@Override
@@ -68,7 +84,7 @@ public class FailingAtomicRenameFileSystem extends TestAtomicRenameFileSystem {
@Override
public URI getUri() {
- return URI.create(SCHEME + ":///");
+ return URI.create(SCHEME + "://" + threadName + "/");
}
/** {@link FileSystemFactory} for {@link FailingAtomicRenameFileSystem}. */
@@ -81,7 +97,7 @@ public class FailingAtomicRenameFileSystem extends TestAtomicRenameFileSystem {
@Override
public FileSystem create(URI uri) throws IOException {
- return new FailingAtomicRenameFileSystem();
+ return new FailingAtomicRenameFileSystem(uri.getAuthority());
}
}
@@ -93,7 +109,7 @@ public class FailingAtomicRenameFileSystem extends TestAtomicRenameFileSystem {
}
}
- private static class FailingFSDataInputStreamWrapper extends FSDataInputStreamWrapper {
+ private class FailingFSDataInputStreamWrapper extends FSDataInputStreamWrapper {
public FailingFSDataInputStreamWrapper(FSDataInputStream inputStream) {
super(inputStream);
@@ -118,7 +134,7 @@ public class FailingAtomicRenameFileSystem extends TestAtomicRenameFileSystem {
}
}
- private static class FailingFSDataOutputStreamWrapper extends FSDataOutputStreamWrapper {
+ private class FailingFSDataOutputStreamWrapper extends FSDataOutputStreamWrapper {
public FailingFSDataOutputStreamWrapper(FSDataOutputStream outputStream) {
super(outputStream);
@@ -126,7 +142,8 @@ public class FailingAtomicRenameFileSystem extends TestAtomicRenameFileSystem {
@Override
public void write(int b) throws IOException {
- if (ThreadLocalRandom.current().nextInt(failPossibility) == 0) {
+ if (ThreadLocalRandom.current().nextInt(failPossibility) == 0
+ && failCounter.getAndDecrement() > 0) {
throw new ArtificialException();
}
super.write(b);
@@ -134,7 +151,8 @@ public class FailingAtomicRenameFileSystem extends TestAtomicRenameFileSystem {
@Override
public void write(byte[] b, int off, int len) throws IOException {
- if (ThreadLocalRandom.current().nextInt(failPossibility) == 0) {
+ if (ThreadLocalRandom.current().nextInt(failPossibility) == 0
+ && failCounter.getAndDecrement() > 0) {
throw new ArtificialException();
}
super.write(b, off, len);