Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/06 09:12:47 UTC

[flink-table-store] branch master updated: [FLINK-28221] Store commit user name into Flink sink state and correct endInput behavior

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new dc38c6c8 [FLINK-28221] Store commit user name into Flink sink state and correct endInput behavior
dc38c6c8 is described below

commit dc38c6c8279fa1582ab38c34e34d37aaa09edea2
Author: tsreaper <ts...@gmail.com>
AuthorDate: Wed Jul 6 17:12:42 2022 +0800

    [FLINK-28221] Store commit user name into Flink sink state and correct endInput behavior
    
    This closes #200
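    
    The gist, as a rough sketch (restoredFromStateOr and alreadyCommitted
    are hypothetical helpers, not the actual code): the committer generates
    one random commit user name per job, keeps it in Flink operator state
    so that it survives restarts, and on recovery uses it to filter out
    committables that already made it into a snapshot under that name.
    
        // Sketch only; the helpers stand in for the real state handling.
        String commitUser = restoredFromStateOr(UUID.randomUUID().toString());
        List<ManifestCommittable> retry = new ArrayList<>();
        for (ManifestCommittable committable : restoredCommittables) {
            if (!alreadyCommitted(commitUser, committable)) {
                retry.add(committable); // not yet in a snapshot; commit again
            }
        }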
---
 .../store/connector/sink/CommitterOperator.java    |  67 ++++--
 .../store/connector/sink/FlinkSinkBuilder.java     |   5 +-
 .../table/store/connector/sink/StoreSink.java      |  12 +-
 .../table/store/connector/FileStoreITCase.java     |   8 +-
 .../store/connector/sink/SinkSavepointITCase.java  | 235 +++++++++++++++++++++
 .../flink/table/store/file/AbstractFileStore.java  |   5 +-
 .../table/store/file/AppendOnlyFileStore.java      |   3 +-
 .../apache/flink/table/store/file/FileStore.java   |   2 +-
 .../flink/table/store/file/KeyValueFileStore.java  |   3 +-
 .../table/store/table/AbstractFileStoreTable.java  |   4 +-
 .../store/table/AppendOnlyFileStoreTable.java      |   4 +-
 .../table/ChangelogValueCountFileStoreTable.java   |   3 +-
 .../table/ChangelogWithKeyFileStoreTable.java      |   3 +-
 .../flink/table/store/table/FileStoreTable.java    |   2 +-
 .../table/store/table/FileStoreTableFactory.java   |  20 +-
 .../flink/table/store/file/TestFileStore.java      |   5 +-
 .../flink/table/store/file/data/DataFileTest.java  |   8 +-
 .../store/file/manifest/ManifestFileMetaTest.java  |   7 +-
 .../store/file/manifest/ManifestFileTest.java      |   7 +-
 .../store/file/manifest/ManifestListTest.java      |   7 +-
 .../store/file/operation/FileStoreCommitTest.java  |  10 +-
 .../store/file/operation/TestCommitThread.java     |   3 +-
 .../table/store/file/schema/SchemaManagerTest.java |   6 +-
 .../file/utils/FailingAtomicRenameFileSystem.java  | 130 +++++++++---
 .../store/table/AppendOnlyFileStoreTableTest.java  |  10 +-
 .../ChangelogValueCountFileStoreTableTest.java     |  10 +-
 .../table/ChangelogWithKeyFileStoreTableTest.java  |  18 +-
 .../table/store/table/FileStoreTableTestBase.java  |   9 +-
 .../table/store/table/WritePreemptMemoryTest.java  |   6 +-
 .../flink/table/store/TableStoreJobConf.java       |   9 -
 .../store/hive/TableStoreHiveStorageHandler.java   |   6 +-
 .../table/store/mapred/TableStoreInputFormat.java  |   2 +-
 .../flink/table/store/FileStoreTestUtils.java      |   2 +-
 .../hive/TableStoreHiveStorageHandlerITCase.java   |  26 ++-
 .../store/mapred/TableStoreRecordReaderTest.java   |   7 +-
 .../table/store/spark/SimpleTableTestHelper.java   |   4 +-
 36 files changed, 514 insertions(+), 154 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
index fa7f60a7..0f16adc2 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
@@ -30,6 +30,8 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SerializableFunction;
 import org.apache.flink.util.function.SerializableSupplier;
 
 import java.util.ArrayDeque;
@@ -38,6 +40,7 @@ import java.util.Deque;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;
+import java.util.UUID;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -50,10 +53,13 @@ public class CommitterOperator extends AbstractStreamOperator<Committable>
     /** Record all the inputs until commit. */
     private final Deque<Committable> inputs = new ArrayDeque<>();
 
-    /** The operator's state descriptor. */
-    private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
-            new ListStateDescriptor<>(
-                    "streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE);
+    /**
+     * If checkpointing is enabled, {@link CommitterOperator#endInput} should do nothing; the
+     * remaining data will be committed in {@link CommitterOperator#notifyCheckpointComplete}. If
+     * checkpointing is not enabled, the remaining data must be committed in {@link
+     * CommitterOperator#endInput}.
+     */
+    private final boolean checkpointEnabled;
 
     /** Group the committable by the checkpoint id. */
     private final NavigableMap<Long, ManifestCommittable> committablesPerCheckpoint;
@@ -62,10 +68,10 @@ public class CommitterOperator extends AbstractStreamOperator<Committable>
     private final SerializableSupplier<SimpleVersionedSerializer<ManifestCommittable>>
             committableSerializer;
 
-    /** The operator's state. */
+    /** ManifestCommittable state of this job. Used to filter out previous successful commits. */
     private ListState<ManifestCommittable> streamingCommitterState;
 
-    private final SerializableSupplier<Committer> committerFactory;
+    private final SerializableFunction<String, Committer> committerFactory;
 
     /**
      * Aggregate committables to global committables and commit the global committables to the
@@ -74,9 +80,11 @@ public class CommitterOperator extends AbstractStreamOperator<Committable>
     private Committer committer;
 
     public CommitterOperator(
-            SerializableSupplier<Committer> committerFactory,
+            boolean checkpointEnabled,
+            SerializableFunction<String, Committer> committerFactory,
             SerializableSupplier<SimpleVersionedSerializer<ManifestCommittable>>
                     committableSerializer) {
+        this.checkpointEnabled = checkpointEnabled;
         this.committableSerializer = committableSerializer;
         this.committablesPerCheckpoint = new TreeMap<>();
         this.committerFactory = checkNotNull(committerFactory);
@@ -86,11 +94,41 @@ public class CommitterOperator extends AbstractStreamOperator<Committable>
     @Override
     public void initializeState(StateInitializationContext context) throws Exception {
         super.initializeState(context);
-        committer = committerFactory.get();
+
+        // commit user name state of this job
+        // each job can only have one user name and this name must be consistent across restarts
+        ListState<String> commitUserState =
+                context.getOperatorStateStore()
+                        .getListState(new ListStateDescriptor<>("commit_user_state", String.class));
+        List<String> commitUsers = new ArrayList<>();
+        commitUserState.get().forEach(commitUsers::add);
+        if (context.isRestored()) {
+            Preconditions.checkState(
+                    commitUsers.size() == 1,
+                    "Expecting 1 commit user name when recovering from checkpoint but found "
+                            + commitUsers.size()
+                            + ". This is unexpected.");
+        } else {
+            Preconditions.checkState(
+                    commitUsers.isEmpty(),
+                    "Expecting 0 commit user name for a fresh sink state but found "
+                            + commitUsers.size()
+                            + ". This is unexpected.");
+            String commitUser = UUID.randomUUID().toString();
+            commitUserState.add(commitUser);
+            commitUsers.add(commitUser);
+        }
+        // we cannot use the job id as the commit user name here because the user may change the
+        // job id by creating a savepoint, stopping the job and then resuming from that savepoint
+        committer = committerFactory.apply(commitUsers.get(0));
+
         streamingCommitterState =
                 new SimpleVersionedListState<>(
                         context.getOperatorStateStore()
-                                .getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
+                                .getListState(
+                                        new ListStateDescriptor<>(
+                                                "streaming_committer_raw_states",
+                                                BytePrimitiveArraySerializer.INSTANCE)),
                         committableSerializer.get());
         List<ManifestCommittable> restored = new ArrayList<>();
         streamingCommitterState.get().forEach(restored::add);
@@ -127,13 +165,10 @@ public class CommitterOperator extends AbstractStreamOperator<Committable>
 
     @Override
     public void endInput() throws Exception {
-        // Suppose the last checkpoint before endInput is 5. Flink Streaming Job calling order:
-        // 1. Receives elements from upstream prepareSnapshotPreBarrier(5)
-        // 2. this.snapshotState(5)
-        // 3. Receives elements from upstream endInput
-        // 4. this.endInput
-        // 5. this.notifyCheckpointComplete(5)
-        // So we should submit all the data in the endInput in order to avoid disordered commits.
+        if (checkpointEnabled) {
+            return;
+        }
+
         long checkpointId = Long.MAX_VALUE;
         List<Committable> poll = pollInputs();
         if (!poll.isEmpty()) {
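
Note on the endInput change above: it defers to the checkpointing mechanism.
When checkpointing is enabled, any remaining data is committed via
notifyCheckpointComplete (which stop-with-savepoint also relies on); only
without checkpointing does endInput itself commit, under an artificial
maximal checkpoint id. A minimal sketch of the contract (commitUpTo is a
hypothetical helper standing in for the poll-and-commit logic):

    public void endInput() throws Exception {
        if (checkpointEnabled) {
            // remaining data is committed in notifyCheckpointComplete
            return;
        }
        // no checkpoint will ever come: commit everything under an
        // artificial, maximal checkpoint id
        commitUpTo(Long.MAX_VALUE);
    }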
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
index fa3cfeb4..b0272d95 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.connector.sink;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
@@ -99,15 +100,17 @@ public class FlinkSinkBuilder {
             partitioned.setParallelism(parallelism);
         }
 
+        StreamExecutionEnvironment env = input.getExecutionEnvironment();
         StoreSink sink =
                 new StoreSink(
                         tableIdentifier,
                         table,
+                        env.getCheckpointConfig().isCheckpointingEnabled(),
                         conf.get(FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED),
                         getCompactPartSpec(),
                         lockFactory,
                         overwritePartition,
                         logSinkFunction);
-        return sink.sinkTo(new DataStream<>(input.getExecutionEnvironment(), partitioned));
+        return sink.sinkTo(new DataStream<>(env, partitioned));
     }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index 3a8ed3a3..0f80e62a 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -49,6 +49,8 @@ public class StoreSink implements Serializable {
 
     private final FileStoreTable table;
 
+    private final boolean checkpointEnabled;
+
     private final boolean compactionTask;
 
     @Nullable private final Map<String, String> compactPartitionSpec;
@@ -62,6 +64,7 @@ public class StoreSink implements Serializable {
     public StoreSink(
             ObjectIdentifier tableIdentifier,
             FileStoreTable table,
+            boolean checkpointEnabled,
             boolean compactionTask,
             @Nullable Map<String, String> compactPartitionSpec,
             @Nullable CatalogLock.Factory lockFactory,
@@ -69,6 +72,7 @@ public class StoreSink implements Serializable {
             @Nullable LogSinkFunction logSinkFunction) {
         this.tableIdentifier = tableIdentifier;
         this.table = table;
+        this.checkpointEnabled = checkpointEnabled;
         this.compactionTask = compactionTask;
         this.compactPartitionSpec = compactPartitionSpec;
         this.lockFactory = lockFactory;
@@ -83,7 +87,7 @@ public class StoreSink implements Serializable {
         return new StoreWriteOperator(table, overwritePartition, logSinkFunction);
     }
 
-    private StoreCommitter createCommitter() {
+    private StoreCommitter createCommitter(String user) {
         CatalogLock catalogLock;
         Lock lock;
         if (lockFactory == null) {
@@ -104,7 +108,7 @@ public class StoreSink implements Serializable {
         }
 
         return new StoreCommitter(
-                table.newCommit().withOverwritePartition(overwritePartition).withLock(lock),
+                table.newCommit(user).withOverwritePartition(overwritePartition).withLock(lock),
                 catalogLock);
     }
 
@@ -119,7 +123,9 @@ public class StoreSink implements Serializable {
                                 GLOBAL_COMMITTER_NAME,
                                 typeInfo,
                                 new CommitterOperator(
-                                        this::createCommitter, ManifestCommittableSerializer::new))
+                                        checkpointEnabled,
+                                        this::createCommitter,
+                                        ManifestCommittableSerializer::new))
                         .setParallelism(1)
                         .setMaxParallelism(1);
         return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index 1eb0596c..180fef10 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -66,6 +66,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -390,8 +391,11 @@ public class FileStoreITCase extends AbstractTestBase {
         if (noFail) {
             options.set(PATH, folder.toURI().toString());
         } else {
-            FailingAtomicRenameFileSystem.get().reset(3, 100);
-            options.set(PATH, FailingAtomicRenameFileSystem.getFailingPath(folder.getPath()));
+            String failingName = UUID.randomUUID().toString();
+            FailingAtomicRenameFileSystem.reset(failingName, 3, 100);
+            options.set(
+                    PATH,
+                    FailingAtomicRenameFileSystem.getFailingPath(failingName, folder.getPath()));
         }
         options.set(FILE_FORMAT, "avro");
         return options;
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/SinkSavepointITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/SinkSavepointITCase.java
new file mode 100644
index 00000000..e2ab0782
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/SinkSavepointITCase.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/** IT cases for {@link StoreSink} when writing to the file store with savepoints. */
+public class SinkSavepointITCase extends AbstractTestBase {
+
+    private String path;
+    private String failingName;
+
+    @Before
+    public void before() throws Exception {
+        path = TEMPORARY_FOLDER.newFolder().toPath().toString();
+        // for failure tests
+        failingName = UUID.randomUUID().toString();
+        FailingAtomicRenameFileSystem.reset(failingName, 100, 500);
+    }
+
+    @Test(timeout = 60000)
+    public void testRecoverFromSavepoint() throws Exception {
+        String failingPath = FailingAtomicRenameFileSystem.getFailingPath(failingName, path);
+        String savepointPath = null;
+        JobID jobId;
+        ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        OUTER:
+        while (true) {
+            // start a new job or recover from savepoint
+            jobId = runRecoverFromSavepointJob(failingPath, savepointPath);
+            while (true) {
+                // wait for a random amount of time before stopping with a savepoint
+                Thread.sleep(random.nextInt(5000));
+                if (client.getJobStatus(jobId).get() == JobStatus.FINISHED) {
+                    // job finished, check the result
+                    break OUTER;
+                }
+                try {
+                    // try to stop with savepoint
+                    savepointPath =
+                            client.stopWithSavepoint(
+                                            jobId,
+                                            false,
+                                            path + "/savepoint",
+                                            SavepointFormatType.DEFAULT)
+                                    .get();
+                    break;
+                } catch (Exception e) {
+                    Optional<StopWithSavepointStoppingException> t =
+                            ExceptionUtils.findThrowable(
+                                    e, StopWithSavepointStoppingException.class);
+                    if (t.isPresent()) {
+                        // the savepoint has been created but notifyCheckpointComplete was not called
+                        //
+                        // the user should follow the exception message and recover the job
+                        // from this specific savepoint
+                        savepointPath = t.get().getSavepointPath();
+                        break;
+                    }
+                    // savepoint creation may fail for various reasons (for example, the job is in
+                    // a failing state or has already finished); just wait a while and try again
+                }
+            }
+            // wait for job to stop
+            while (!client.getJobStatus(jobId).get().isGloballyTerminalState()) {
+                Thread.sleep(1000);
+            }
+            // recover from savepoint in the next round
+        }
+
+        checkRecoverFromSavepointResult(failingPath);
+    }
+
+    private JobID runRecoverFromSavepointJob(String failingPath, String savepointPath)
+            throws Exception {
+        Configuration conf = new Configuration();
+        if (savepointPath != null) {
+            SavepointRestoreSettings savepointRestoreSettings =
+                    SavepointRestoreSettings.forPath(savepointPath, false);
+            SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, conf);
+        }
+
+        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
+        tEnv.getConfig()
+                .getConfiguration()
+                .set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMillis(500));
+        tEnv.getConfig().getConfiguration().set(StateBackendOptions.STATE_BACKEND, "filesystem");
+        tEnv.getConfig()
+                .getConfiguration()
+                .set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file://" + path + "/checkpoint");
+        tEnv.getConfig()
+                .getConfiguration()
+                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+        // we're creating multiple table environments in the same process; if we do not set this
+        // option, stream node ids will differ even for the same SQL, and if the stream node ids
+        // differ we cannot recover from the savepoint
+        tEnv.getConfig()
+                .getConfiguration()
+                .set(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS, true);
+
+        tEnv.executeSql(
+                String.join(
+                        "\n",
+                        "CREATE TABLE S (",
+                        "  a INT",
+                        ") WITH (",
+                        "  'connector' = 'datagen',",
+                        "  'rows-per-second' = '10000',",
+                        "  'fields.a.kind' = 'sequence',",
+                        "  'fields.a.start' = '0',",
+                        "  'fields.a.end' = '99999'",
+                        ")"));
+
+        String createCatalogSql =
+                String.join(
+                        "\n",
+                        "CREATE CATALOG my_catalog WITH (",
+                        "  'type' = 'table-store',",
+                        "  'warehouse' = '" + failingPath + "'",
+                        ")");
+        FailingAtomicRenameFileSystem.retryArtificialException(
+                () -> tEnv.executeSql(createCatalogSql));
+
+        tEnv.executeSql("USE CATALOG my_catalog");
+
+        String createSinkSql =
+                String.join(
+                        "\n",
+                        "CREATE TABLE IF NOT EXISTS T (",
+                        "  a INT",
+                        ") WITH (",
+                        "  'bucket' = '2',",
+                        "  'file.format' = 'avro'",
+                        ")");
+        FailingAtomicRenameFileSystem.retryArtificialException(
+                () -> tEnv.executeSql(createSinkSql));
+
+        String insertIntoSql = "INSERT INTO T SELECT * FROM default_catalog.default_database.S";
+        JobID jobId =
+                FailingAtomicRenameFileSystem.retryArtificialException(
+                                () -> tEnv.executeSql(insertIntoSql))
+                        .getJobClient()
+                        .get()
+                        .getJobID();
+
+        ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
+        while (client.getJobStatus(jobId).get() == JobStatus.INITIALIZING) {
+            Thread.sleep(1000);
+        }
+        return jobId;
+    }
+
+    private void checkRecoverFromSavepointResult(String failingPath) throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+        TableEnvironment tEnv = TableEnvironment.create(settings);
+        // no failure should occur when checking the result
+        FailingAtomicRenameFileSystem.reset(failingName, 0, 1);
+
+        String createCatalogSql =
+                String.join(
+                        "\n",
+                        "CREATE CATALOG my_catalog WITH (",
+                        "  'type' = 'table-store',",
+                        "  'warehouse' = '" + failingPath + "'",
+                        ")");
+        tEnv.executeSql(createCatalogSql);
+
+        tEnv.executeSql("USE CATALOG my_catalog");
+
+        List<Integer> actual = new ArrayList<>();
+        try (CloseableIterator<Row> it = tEnv.executeSql("SELECT * FROM T").collect()) {
+            while (it.hasNext()) {
+                Row row = it.next();
+                Assert.assertEquals(1, row.getArity());
+                actual.add((Integer) row.getField(0));
+            }
+        }
+        Collections.sort(actual);
+        Assert.assertEquals(
+                IntStream.range(0, 100000).boxed().collect(Collectors.toList()), actual);
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AbstractFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AbstractFileStore.java
index ff943a19..5773aa7a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AbstractFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AbstractFileStore.java
@@ -39,19 +39,16 @@ public abstract class AbstractFileStore<T> implements FileStore<T> {
     protected final SchemaManager schemaManager;
     protected final long schemaId;
     protected final CoreOptions options;
-    protected final String user;
     protected final RowType partitionType;
 
     public AbstractFileStore(
             SchemaManager schemaManager,
             long schemaId,
             CoreOptions options,
-            String user,
             RowType partitionType) {
         this.schemaManager = schemaManager;
         this.schemaId = schemaId;
         this.options = options;
-        this.user = user;
         this.partitionType = partitionType;
     }
 
@@ -94,7 +91,7 @@ public abstract class AbstractFileStore<T> implements FileStore<T> {
     }
 
     @Override
-    public FileStoreCommitImpl newCommit() {
+    public FileStoreCommitImpl newCommit(String user) {
         return new FileStoreCommitImpl(
                 schemaId,
                 user,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
index 897ab2dc..40d4203e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
@@ -35,10 +35,9 @@ public class AppendOnlyFileStore extends AbstractFileStore<RowData> {
             SchemaManager schemaManager,
             long schemaId,
             CoreOptions options,
-            String user,
             RowType partitionType,
             RowType rowType) {
-        super(schemaManager, schemaId, options, user, partitionType);
+        super(schemaManager, schemaId, options, partitionType);
         this.rowType = rowType;
     }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
index 9df275aa..97f390f7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
@@ -45,7 +45,7 @@ public interface FileStore<T> extends Serializable {
 
     FileStoreWrite<T> newWrite();
 
-    FileStoreCommit newCommit();
+    FileStoreCommit newCommit(String user);
 
     FileStoreExpire newExpire();
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
index 9443ddd2..58b43685 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
@@ -45,12 +45,11 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
             SchemaManager schemaManager,
             long schemaId,
             CoreOptions options,
-            String user,
             RowType partitionType,
             RowType keyType,
             RowType valueType,
             MergeFunction mergeFunction) {
-        super(schemaManager, schemaId, options, user, partitionType);
+        super(schemaManager, schemaId, options, partitionType);
         this.keyType = keyType;
         this.valueType = valueType;
         this.mergeFunction = mergeFunction;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
index 0897416e..61ee3e50 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
@@ -56,8 +56,8 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
     }
 
     @Override
-    public TableCommit newCommit() {
-        return new TableCommit(store().newCommit(), store().newExpire());
+    public TableCommit newCommit(String user) {
+        return new TableCommit(store().newCommit(user), store().newExpire());
     }
 
     @Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index 103dc3a9..4d343810 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -52,15 +52,13 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
 
     private final AppendOnlyFileStore store;
 
-    AppendOnlyFileStoreTable(
-            Path path, SchemaManager schemaManager, TableSchema tableSchema, String user) {
+    AppendOnlyFileStoreTable(Path path, SchemaManager schemaManager, TableSchema tableSchema) {
         super(path, tableSchema);
         this.store =
                 new AppendOnlyFileStore(
                         schemaManager,
                         tableSchema.id(),
                         new CoreOptions(tableSchema.options()),
-                        user,
                         tableSchema.logicalPartitionType(),
                         tableSchema.logicalRowType());
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index 2830700f..855602f5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -57,7 +57,7 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
     private final KeyValueFileStore store;
 
     ChangelogValueCountFileStoreTable(
-            Path path, SchemaManager schemaManager, TableSchema tableSchema, String user) {
+            Path path, SchemaManager schemaManager, TableSchema tableSchema) {
         super(path, tableSchema);
         RowType countType =
                 RowType.of(
@@ -68,7 +68,6 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
                         schemaManager,
                         tableSchema.id(),
                         new CoreOptions(tableSchema.options()),
-                        user,
                         tableSchema.logicalPartitionType(),
                         tableSchema.logicalRowType(),
                         countType,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 7446cf9a..424a1dfb 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -63,7 +63,7 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
     private final KeyValueFileStore store;
 
     ChangelogWithKeyFileStoreTable(
-            Path path, SchemaManager schemaManager, TableSchema tableSchema, String user) {
+            Path path, SchemaManager schemaManager, TableSchema tableSchema) {
         super(path, tableSchema);
         RowType rowType = tableSchema.logicalRowType();
 
@@ -104,7 +104,6 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
                         schemaManager,
                         tableSchema.id(),
                         new CoreOptions(conf),
-                        user,
                         tableSchema.logicalPartitionType(),
                         keyType,
                         rowType,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
index 82074f32..afaa7508 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
@@ -47,7 +47,7 @@ public interface FileStoreTable extends Serializable {
 
     TableWrite newWrite();
 
-    TableCommit newCommit();
+    TableCommit newCommit(String user);
 
     TableCompact newCompact();
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
index 440222f8..f9bb7881 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
@@ -25,8 +25,6 @@ import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
 
-import java.util.UUID;
-
 import static org.apache.flink.table.store.CoreOptions.PATH;
 
 /** Factory to create {@link FileStoreTable}. */
@@ -39,10 +37,6 @@ public class FileStoreTableFactory {
     }
 
     public static FileStoreTable create(Configuration conf) {
-        return create(conf, UUID.randomUUID().toString());
-    }
-
-    public static FileStoreTable create(Configuration conf, String user) {
         Path tablePath = CoreOptions.path(conf);
         TableSchema tableSchema =
                 new SchemaManager(tablePath)
@@ -53,15 +47,15 @@ public class FileStoreTableFactory {
                                                 "Schema file not found in location "
                                                         + tablePath
                                                         + ". Please create table first."));
-        return create(tablePath, tableSchema, conf, user);
+        return create(tablePath, tableSchema, conf);
     }
 
     public static FileStoreTable create(Path tablePath, TableSchema tableSchema) {
-        return create(tablePath, tableSchema, new Configuration(), UUID.randomUUID().toString());
+        return create(tablePath, tableSchema, new Configuration());
     }
 
     public static FileStoreTable create(
-            Path tablePath, TableSchema tableSchema, Configuration dynamicOptions, String user) {
+            Path tablePath, TableSchema tableSchema, Configuration dynamicOptions) {
         // merge dynamic options into schema.options
         Configuration newOptions = Configuration.fromMap(tableSchema.options());
         dynamicOptions.toMap().forEach(newOptions::setString);
@@ -70,14 +64,12 @@ public class FileStoreTableFactory {
 
         SchemaManager schemaManager = new SchemaManager(tablePath);
         if (newOptions.get(CoreOptions.WRITE_MODE) == WriteMode.APPEND_ONLY) {
-            return new AppendOnlyFileStoreTable(tablePath, schemaManager, tableSchema, user);
+            return new AppendOnlyFileStoreTable(tablePath, schemaManager, tableSchema);
         } else {
             if (tableSchema.primaryKeys().isEmpty()) {
-                return new ChangelogValueCountFileStoreTable(
-                        tablePath, schemaManager, tableSchema, user);
+                return new ChangelogValueCountFileStoreTable(tablePath, schemaManager, tableSchema);
             } else {
-                return new ChangelogWithKeyFileStoreTable(
-                        tablePath, schemaManager, tableSchema, user);
+                return new ChangelogWithKeyFileStoreTable(tablePath, schemaManager, tableSchema);
             }
         }
     }
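
With this change the commit user comes from the caller instead of being a
random UUID generated inside the factory. A hedged usage sketch (tablePath,
tableSchema and the UUID choice are illustrative):

    // The caller decides the commit user, e.g. one restored from Flink
    // sink state, so that it stays stable across job restarts.
    FileStoreTable table = FileStoreTableFactory.create(tablePath, tableSchema);
    String commitUser = UUID.randomUUID().toString(); // or restored from state
    TableCommit commit = table.newCommit(commitUser);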
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index a7ff2a86..21ed64a7 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -82,6 +82,7 @@ public class TestFileStore extends KeyValueFileStore {
     private final String root;
     private final RowDataSerializer keySerializer;
     private final RowDataSerializer valueSerializer;
+    private final String user;
 
     public static TestFileStore create(
             String format,
@@ -121,7 +122,6 @@ public class TestFileStore extends KeyValueFileStore {
                 new SchemaManager(options.path()),
                 0L,
                 options,
-                UUID.randomUUID().toString(),
                 partitionType,
                 keyType,
                 valueType,
@@ -129,6 +129,7 @@ public class TestFileStore extends KeyValueFileStore {
         this.root = root;
         this.keySerializer = new RowDataSerializer(keyType);
         this.valueSerializer = new RowDataSerializer(valueType);
+        this.user = UUID.randomUUID().toString();
     }
 
     public FileStoreExpireImpl newExpire(
@@ -220,7 +221,7 @@ public class TestFileStore extends KeyValueFileStore {
                     .write(kv);
         }
 
-        FileStoreCommit commit = newCommit();
+        FileStoreCommit commit = newCommit(user);
         ManifestCommittable committable =
                 new ManifestCommittable(String.valueOf(new Random().nextLong()));
         for (Map.Entry<BinaryRowData, Map<Integer, RecordWriter<KeyValue>>> entryWithPartition :
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
index 81694a3f..6c851525 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
@@ -49,6 +49,7 @@ import org.junit.jupiter.api.io.TempDir;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Function;
 
@@ -100,11 +101,14 @@ public class DataFileTest {
 
     @RepeatedTest(10)
     public void testCleanUpForException() throws IOException {
-        FailingAtomicRenameFileSystem.get().reset(1, 10);
+        String failingName = UUID.randomUUID().toString();
+        FailingAtomicRenameFileSystem.reset(failingName, 1, 10);
         DataFileTestDataGenerator.Data data = gen.next();
         DataFileWriter writer =
                 createDataFileWriter(
-                        FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString()), "avro");
+                        FailingAtomicRenameFileSystem.getFailingPath(
+                                failingName, tempDir.toString()),
+                        "avro");
 
         try {
             writer.write(CloseableIterator.fromList(data.content, kv -> {}), 0);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
index 8bb8194f..2ebdaf40 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
@@ -44,6 +44,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.TreeSet;
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -107,12 +108,14 @@ public class ManifestFileMetaTest {
 
     @RepeatedTest(10)
     public void testCleanUpForException() throws IOException {
-        FailingAtomicRenameFileSystem.get().reset(1, 10);
+        String failingName = UUID.randomUUID().toString();
+        FailingAtomicRenameFileSystem.reset(failingName, 1, 10);
         List<ManifestFileMeta> input = new ArrayList<>();
         createData(ThreadLocalRandom.current().nextInt(5), input, null);
         ManifestFile failingManifestFile =
                 createManifestFile(
-                        FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString()));
+                        FailingAtomicRenameFileSystem.getFailingPath(
+                                failingName, tempDir.toString()));
 
         try {
             ManifestFileMeta.merge(input, failingManifestFile, 500, 3);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
index 98e83fdb..d176ecc4 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
@@ -34,6 +34,7 @@ import org.junit.jupiter.api.io.TempDir;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
@@ -65,11 +66,13 @@ public class ManifestFileTest {
 
     @RepeatedTest(10)
     public void testCleanUpForException() throws IOException {
-        FailingAtomicRenameFileSystem.get().reset(1, 10);
+        String failingName = UUID.randomUUID().toString();
+        FailingAtomicRenameFileSystem.reset(failingName, 1, 10);
         List<ManifestEntry> entries = generateData();
         ManifestFile manifestFile =
                 createManifestFile(
-                        FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString()));
+                        FailingAtomicRenameFileSystem.getFailingPath(
+                                failingName, tempDir.toString()));
 
         try {
             manifestFile.write(entries);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
index 95bfca51..c815be6d 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
@@ -34,6 +34,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -57,11 +58,13 @@ public class ManifestListTest {
 
     @RepeatedTest(10)
     public void testCleanUpForException() throws IOException {
-        FailingAtomicRenameFileSystem.get().reset(1, 3);
+        String failingName = UUID.randomUUID().toString();
+        FailingAtomicRenameFileSystem.reset(failingName, 1, 3);
         List<ManifestFileMeta> metas = generateData();
         ManifestList manifestList =
                 createManifestList(
-                        FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString()));
+                        FailingAtomicRenameFileSystem.getFailingPath(
+                                failingName, tempDir.toString()));
 
         try {
             manifestList.write(metas);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index d75949cf..169dbf18 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -46,6 +46,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -58,13 +59,15 @@ public class FileStoreCommitTest {
     private static final Logger LOG = LoggerFactory.getLogger(FileStoreCommitTest.class);
 
     private TestKeyValueGenerator gen;
+    private String failingName;
     @TempDir java.nio.file.Path tempDir;
 
     @BeforeEach
     public void beforeEach() {
         gen = new TestKeyValueGenerator();
         // for failure tests
-        FailingAtomicRenameFileSystem.get().reset(100, 5000);
+        failingName = UUID.randomUUID().toString();
+        FailingAtomicRenameFileSystem.reset(failingName, 100, 100);
     }
 
     @ParameterizedTest
@@ -105,7 +108,7 @@ public class FileStoreCommitTest {
         Path firstSnapshotPath = snapshotManager.snapshotPath(Snapshot.FIRST_SNAPSHOT_ID);
         FileUtils.deleteOrWarn(firstSnapshotPath);
         // this test succeeds if this call does not fail
-        store.newCommit()
+        store.newCommit(UUID.randomUUID().toString())
                 .filterCommitted(Collections.singletonList(new ManifestCommittable("dummy")));
     }
 
@@ -286,7 +289,8 @@ public class FileStoreCommitTest {
     private TestFileStore createStore(boolean failing, int numBucket) {
         String root =
                 failing
-                        ? FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString())
+                        ? FailingAtomicRenameFileSystem.getFailingPath(
+                                failingName, tempDir.toString())
                         : TestAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString();
         return TestFileStore.create(
                 "avro",
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index 3c927599..fe1b7d7e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -35,6 +35,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
@@ -64,7 +65,7 @@ public class TestCommitThread extends Thread {
         this.writers = new HashMap<>();
 
         this.write = safeStore.newWrite();
-        this.commit = testStore.newCommit();
+        this.commit = testStore.newCommit(UUID.randomUUID().toString());
     }
 
     public Map<BinaryRowData, List<KeyValue>> getResult() {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
index dc86ac95..59f3b959 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
@@ -40,6 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -65,8 +66,9 @@ public class SchemaManagerTest {
     @BeforeEach
     public void beforeEach() throws IOException {
         // for failure tests
-        FailingAtomicRenameFileSystem.get().reset(100, 5000);
-        String root = FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString());
+        String failingName = UUID.randomUUID().toString();
+        FailingAtomicRenameFileSystem.reset(failingName, 100, 100);
+        String root = FailingAtomicRenameFileSystem.getFailingPath(failingName, tempDir.toString());
         manager = new SchemaManager(new Path(root));
     }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java
index 8e1b4ccd..b922423f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java
@@ -18,15 +18,21 @@
 
 package org.apache.flink.table.store.file.utils;
 
+import org.apache.flink.core.fs.BlockLocation;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataInputStreamWrapper;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FSDataOutputStreamWrapper;
+import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.core.fs.LocatedFileStatus;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalBlockLocation;
 import org.apache.flink.util.ExceptionUtils;
 
+import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.util.concurrent.Callable;
@@ -41,29 +47,30 @@ public class FailingAtomicRenameFileSystem extends TestAtomicRenameFileSystem {
 
     public static final String SCHEME = "fail";
 
-    private final String threadName;
+    private final String name;
     private final AtomicInteger failCounter = new AtomicInteger();
     private int failPossibility;
 
-    public FailingAtomicRenameFileSystem(String threadName) {
-        this.threadName = threadName;
+    private FailingAtomicRenameFileSystem(String name) {
+        this.name = name;
     }
 
-    public static FailingAtomicRenameFileSystem get() {
+    public static String getFailingPath(String name, String path) {
+        // set the authority to the given name so that different tests use different instances;
+        // see FileSystem#getUnguardedFileSystem for more information on the caching strategy
+        return SCHEME + "://" + name + path;
+    }
+
+    public static void reset(String name, int maxFails, int failPossibility) {
         try {
-            return (FailingAtomicRenameFileSystem) new Path(getFailingPath("/")).getFileSystem();
+            ((FailingAtomicRenameFileSystem) new Path(getFailingPath(name, "/")).getFileSystem())
+                    .reset(maxFails, failPossibility);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
     }
 
-    public static String getFailingPath(String path) {
-        // set authority as thread name so that different testing threads use different instances
-        // for more information see FileSystem#getUnguardedFileSystem for the caching strategy
-        return SCHEME + "://" + Thread.currentThread().getName() + path;
-    }
-
-    public void reset(int maxFails, int failPossibility) {
+    private void reset(int maxFails, int failPossibility) {
         failCounter.set(maxFails);
         this.failPossibility = failPossibility;
     }
@@ -86,7 +93,22 @@ public class FailingAtomicRenameFileSystem extends TestAtomicRenameFileSystem {
 
     @Override
     public URI getUri() {
-        return URI.create(SCHEME + "://" + threadName + "/");
+        return URI.create(SCHEME + "://" + name + "/");
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path path) throws IOException {
+        File file = this.pathToFile(path);
+        if (file.exists()) {
+            return new FailingLocalFileStatus(file, path);
+        } else {
+            throw new FileNotFoundException(
+                    "File "
+                            + path
+                            + " does not exist or the user running Flink ('"
+                            + System.getProperty("user.name")
+                            + "') has insufficient permissions to access it.");
+        }
     }
 
     /** {@link FileSystemFactory} for {@link FailingAtomicRenameFileSystem}. */
@@ -117,18 +139,11 @@ public class FailingAtomicRenameFileSystem extends TestAtomicRenameFileSystem {
             super(inputStream);
         }
 
-        @Override
-        public int read() throws IOException {
-            if (ThreadLocalRandom.current().nextInt(failPossibility) == 0
-                    && failCounter.getAndDecrement() > 0) {
-                throw new ArtificialException();
-            }
-            return super.read();
-        }
-
         @Override
         public int read(byte[] b, int off, int len) throws IOException {
-            if (ThreadLocalRandom.current().nextInt(failPossibility) == 0
+            // only fail when the read buffer is larger than 1 byte so that we won't fail too often
+            if (b.length > 1
+                    && ThreadLocalRandom.current().nextInt(failPossibility) == 0
                     && failCounter.getAndDecrement() > 0) {
                 throw new ArtificialException();
             }
@@ -143,21 +158,72 @@ public class FailingAtomicRenameFileSystem extends TestAtomicRenameFileSystem {
         }
 
         @Override
-        public void write(int b) throws IOException {
-            if (ThreadLocalRandom.current().nextInt(failPossibility) == 0
+        public void write(byte[] b, int off, int len) throws IOException {
+            // only fail when the write buffer is larger than 1 byte so that we won't fail too often
+            if (b.length > 1
+                    && ThreadLocalRandom.current().nextInt(failPossibility) == 0
                     && failCounter.getAndDecrement() > 0) {
                 throw new ArtificialException();
             }
-            super.write(b);
+            super.write(b, off, len);
+        }
+    }
+
+    private static class FailingLocalFileStatus implements LocatedFileStatus {
+
+        private final File file;
+        private final Path path;
+        private final long len;
+
+        private FailingLocalFileStatus(File file, Path path) {
+            this.file = file;
+            this.path = path;
+            this.len = file.length();
         }
 
         @Override
-        public void write(byte[] b, int off, int len) throws IOException {
-            if (ThreadLocalRandom.current().nextInt(failPossibility) == 0
-                    && failCounter.getAndDecrement() > 0) {
-                throw new ArtificialException();
-            }
-            super.write(b, off, len);
+        public BlockLocation[] getBlockLocations() {
+            return new BlockLocation[] {new LocalBlockLocation(len)};
+        }
+
+        @Override
+        public long getLen() {
+            return len;
+        }
+
+        @Override
+        public long getBlockSize() {
+            return len;
+        }
+
+        @Override
+        public short getReplication() {
+            return 1;
+        }
+
+        @Override
+        public long getModificationTime() {
+            return file.lastModified();
+        }
+
+        @Override
+        public long getAccessTime() {
+            return 0;
+        }
+
+        @Override
+        public boolean isDir() {
+            return file.isDirectory();
+        }
+
+        @Override
+        public Path getPath() {
+            return path;
+        }
+
+        @Override
+        public String toString() {
+            return "FailingLocalFileStatus{file=" + this.file + ", path=" + this.path + '}';
         }
     }
 
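For reference, a test drives the new failure-injection API roughly as follows; this is a sketch, and the name "my-test" plus the numbers are illustrative, not taken from this patch:

    // arm at most 10 artificial failures; each multi-byte read/write then
    // fails with probability 1/100 until the budget is exhausted
    FailingAtomicRenameFileSystem.reset("my-test", 10, 100);
    // the authority ("my-test") keys the cached file system instance,
    // so concurrent tests get independent failure budgets
    String root = FailingAtomicRenameFileSystem.getFailingPath("my-test", "/warehouse");
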
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 67778c2d..b6bfd6bf 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.table.sink.TableCommit;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.TableRead;
@@ -140,21 +141,22 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
     private void writeData() throws Exception {
         FileStoreTable table = createFileStoreTable();
         TableWrite write = table.newWrite();
+        TableCommit commit = table.newCommit("user");
 
         write.write(GenericRowData.of(1, 10, 100L));
         write.write(GenericRowData.of(2, 20, 200L));
         write.write(GenericRowData.of(1, 11, 101L));
-        table.newCommit().commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit());
 
         write.write(GenericRowData.of(1, 12, 102L));
         write.write(GenericRowData.of(2, 21, 201L));
         write.write(GenericRowData.of(2, 22, 202L));
-        table.newCommit().commit("1", write.prepareCommit());
+        commit.commit("1", write.prepareCommit());
 
         write.write(GenericRowData.of(1, 11, 101L));
         write.write(GenericRowData.of(2, 21, 201L));
         write.write(GenericRowData.of(1, 12, 102L));
-        table.newCommit().commit("2", write.prepareCommit());
+        commit.commit("2", write.prepareCommit());
 
         write.close();
     }
@@ -175,6 +177,6 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
                                 Collections.emptyList(),
                                 conf.toMap(),
                                 ""));
-        return new AppendOnlyFileStoreTable(tablePath, schemaManager, tableSchema, "user");
+        return new AppendOnlyFileStoreTable(tablePath, schemaManager, tableSchema);
     }
 }
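
The same migration repeats in the remaining table tests: the commit user moves off the table constructor and into newCommit(String), so each test builds one TableCommit up front and reuses it. In sketch form, with identifiers as in the test above:

    TableWrite write = table.newWrite();
    TableCommit commit = table.newCommit("user");  // user bound to the committer, not the table
    write.write(GenericRowData.of(1, 10, 100L));   // values as in the test above
    commit.commit("0", write.prepareCommit());     // commit identifier + committables
    write.close();
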
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index 7e5422dc..e5169555 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.table.sink.TableCommit;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.TableRead;
@@ -140,24 +141,25 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
     private void writeData() throws Exception {
         FileStoreTable table = createFileStoreTable();
         TableWrite write = table.newWrite();
+        TableCommit commit = table.newCommit("user");
 
         write.write(GenericRowData.of(1, 10, 100L));
         write.write(GenericRowData.of(2, 20, 200L));
         write.write(GenericRowData.of(1, 11, 101L));
-        table.newCommit().commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit());
 
         write.write(GenericRowData.of(2, 21, 201L));
         write.write(GenericRowData.of(1, 12, 102L));
         write.write(GenericRowData.of(2, 21, 201L));
         write.write(GenericRowData.of(2, 21, 201L));
-        table.newCommit().commit("1", write.prepareCommit());
+        commit.commit("1", write.prepareCommit());
 
         write.write(GenericRowData.of(1, 11, 101L));
         write.write(GenericRowData.of(2, 22, 202L));
         write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 21, 201L));
         write.write(GenericRowData.ofKind(RowKind.DELETE, 1, 10, 100L));
         write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 21, 201L));
-        table.newCommit().commit("2", write.prepareCommit());
+        commit.commit("2", write.prepareCommit());
 
         write.close();
     }
@@ -178,6 +180,6 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
                                 Collections.emptyList(),
                                 conf.toMap(),
                                 ""));
-        return new ChangelogValueCountFileStoreTable(tablePath, schemaManager, tableSchema, "user");
+        return new ChangelogValueCountFileStoreTable(tablePath, schemaManager, tableSchema);
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index b4ac1eb1..4c3b999d 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.table.sink.TableCommit;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.TableRead;
@@ -53,12 +54,13 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         FileStoreTable table =
                 createFileStoreTable(conf -> conf.set(CoreOptions.SEQUENCE_FIELD, "b"));
         TableWrite write = table.newWrite();
+        TableCommit commit = table.newCommit("user");
         write.write(GenericRowData.of(1, 10, 200L));
         write.write(GenericRowData.of(1, 10, 100L));
         write.write(GenericRowData.of(1, 11, 101L));
-        table.newCommit().commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit());
         write.write(GenericRowData.of(1, 11, 55L));
-        table.newCommit().commit("1", write.prepareCommit());
+        commit.commit("1", write.prepareCommit());
         write.close();
 
         List<Split> splits = table.newScan().plan().splits;
@@ -161,12 +163,13 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
     public void testStreamingChangelog() throws Exception {
         FileStoreTable table = createFileStoreTable(true);
         TableWrite write = table.newWrite();
+        TableCommit commit = table.newCommit("user");
         write.write(GenericRowData.of(1, 10, 100L));
         write.write(GenericRowData.ofKind(RowKind.DELETE, 1, 10, 100L));
         write.write(GenericRowData.of(1, 10, 101L));
         write.write(GenericRowData.ofKind(RowKind.UPDATE_BEFORE, 1, 10, 101L));
         write.write(GenericRowData.ofKind(RowKind.UPDATE_AFTER, 1, 10, 102L));
-        table.newCommit().commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit());
         write.close();
 
         List<Split> splits = table.newScan().withIncremental(true).plan().splits;
@@ -184,23 +187,24 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
     private void writeData() throws Exception {
         FileStoreTable table = createFileStoreTable();
         TableWrite write = table.newWrite();
+        TableCommit commit = table.newCommit("user");
 
         write.write(GenericRowData.of(1, 10, 100L));
         write.write(GenericRowData.of(2, 20, 200L));
         write.write(GenericRowData.of(1, 11, 101L));
-        table.newCommit().commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit());
 
         write.write(GenericRowData.of(1, 10, 1000L));
         write.write(GenericRowData.of(2, 21, 201L));
         write.write(GenericRowData.of(2, 21, 2001L));
-        table.newCommit().commit("1", write.prepareCommit());
+        commit.commit("1", write.prepareCommit());
 
         write.write(GenericRowData.of(1, 11, 1001L));
         write.write(GenericRowData.of(2, 21, 20001L));
         write.write(GenericRowData.of(2, 22, 202L));
         write.write(GenericRowData.ofKind(RowKind.DELETE, 1, 11, 1001L));
         write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 20, 200L));
-        table.newCommit().commit("2", write.prepareCommit());
+        commit.commit("2", write.prepareCommit());
 
         write.close();
     }
@@ -231,6 +235,6 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
                                 Arrays.asList("pt", "a"),
                                 conf.toMap(),
                                 ""));
-        return new ChangelogWithKeyFileStoreTable(tablePath, schemaManager, tableSchema, "user");
+        return new ChangelogWithKeyFileStoreTable(tablePath, schemaManager, tableSchema);
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index e36a7314..f41de18b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
 import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader.ReaderSupplier;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.table.sink.TableCommit;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.TableRead;
@@ -78,18 +79,18 @@ public abstract class FileStoreTableTestBase {
         FileStoreTable table = createFileStoreTable();
 
         TableWrite write = table.newWrite();
+        TableCommit commit = table.newCommit("user");
         write.write(GenericRowData.of(1, 10, 100L));
         write.write(GenericRowData.of(2, 20, 200L));
-        table.newCommit().commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit());
         write.close();
 
         write = table.newWrite().withOverwrite(true);
+        commit = table.newCommit("user");
         write.write(GenericRowData.of(2, 21, 201L));
         Map<String, String> overwritePartition = new HashMap<>();
         overwritePartition.put("pt", "2");
-        table.newCommit()
-                .withOverwritePartition(overwritePartition)
-                .commit("1", write.prepareCommit());
+        commit.withOverwritePartition(overwritePartition).commit("1", write.prepareCommit());
         write.close();
 
         List<Split> splits = table.newScan().plan().splits;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
index b9f77bbd..85c65757 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
 import org.apache.flink.table.store.table.sink.FileCommittable;
+import org.apache.flink.table.store.table.sink.TableCommit;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.TableRead;
@@ -62,6 +63,7 @@ public class WritePreemptMemoryTest extends FileStoreTableTestBase {
         // write
         FileStoreTable table = createFileStoreTable();
         TableWrite write = table.newWrite();
+        TableCommit commit = table.newCommit("user");
         Random random = new Random();
         List<String> expected = new ArrayList<>();
         for (int i = 0; i < 10_000; i++) {
@@ -71,7 +73,7 @@ public class WritePreemptMemoryTest extends FileStoreTableTestBase {
             expected.add(BATCH_ROW_TO_STRING.apply(row));
         }
         List<FileCommittable> committables = write.prepareCommit();
-        table.newCommit().commit("0", committables);
+        commit.commit("0", committables);
         write.close();
 
         // read
@@ -102,6 +104,6 @@ public class WritePreemptMemoryTest extends FileStoreTableTestBase {
                                 Arrays.asList("pt", "a"),
                                 conf.toMap(),
                                 ""));
-        return new ChangelogWithKeyFileStoreTable(tablePath, schemaManager, schema, "user");
+        return new ChangelogWithKeyFileStoreTable(tablePath, schemaManager, schema);
     }
 }
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
index 7d630b9d..db3f6821 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
@@ -31,7 +31,6 @@ import java.util.Properties;
 public class TableStoreJobConf {
 
     private static final String INTERNAL_LOCATION = "table-store.internal.location";
-    private static final String INTERNAL_FILE_STORE_USER = "table-store.internal.file-store.user";
 
     private final JobConf jobConf;
 
@@ -48,12 +47,4 @@ public class TableStoreJobConf {
     public String getLocation() {
         return jobConf.get(INTERNAL_LOCATION);
     }
-
-    public String getFileStoreUser() {
-        return jobConf.get(INTERNAL_FILE_STORE_USER);
-    }
-
-    public void setFileStoreUser(String user) {
-        jobConf.set(INTERNAL_FILE_STORE_USER, user);
-    }
 }
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java
index 94788716..f93acfc9 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.mapred.OutputFormat;
 
 import java.util.Map;
 import java.util.Properties;
-import java.util.UUID;
 
 /** {@link HiveStorageHandler} for table store. This is the entrance class of Hive API. */
 public class TableStoreHiveStorageHandler
@@ -85,10 +84,7 @@ public class TableStoreHiveStorageHandler
     public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> map) {}
 
     @Override
-    public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
-        TableStoreJobConf wrapper = new TableStoreJobConf(jobConf);
-        wrapper.setFileStoreUser(UUID.randomUUID().toString());
-    }
+    public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {}
 
     @Override
     public void setConf(Configuration configuration) {
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
index 1e63b051..91ef0bdc 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
@@ -72,7 +72,7 @@ public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer
         TableStoreJobConf wrapper = new TableStoreJobConf(jobConf);
         Configuration conf = new Configuration();
         conf.set(CoreOptions.PATH, wrapper.getLocation());
-        return FileStoreTableFactory.create(conf, wrapper.getFileStoreUser());
+        return FileStoreTableFactory.create(conf);
     }
 
     private Optional<Predicate> createPredicate(TableSchema tableSchema, JobConf jobConf) {
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java
index 05ef9fbf..a583c66a 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java
@@ -47,6 +47,6 @@ public class FileStoreTestUtils {
         // only path, other config should be read from file store.
         conf = new Configuration();
         conf.set(PATH, tablePath.toString());
-        return FileStoreTableFactory.create(conf, "user");
+        return FileStoreTableFactory.create(conf);
     }
 }
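
Together with the core changes, end-to-end table creation and committing now reads roughly as follows; this sketch mirrors the test utilities, and the literal "user" is simply the name these tests pass:

    Configuration conf = new Configuration();
    conf.set(CoreOptions.PATH, tablePath.toString());
    FileStoreTable table = FileStoreTableFactory.create(conf); // no user at creation time
    TableCommit commit = table.newCommit("user");              // user supplied at commit time
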
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
index 05cff356..8b691355 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.FileStoreTestUtils;
 import org.apache.flink.table.store.hive.objectinspector.TableStoreObjectInspectorFactory;
 import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
@@ -101,13 +102,14 @@ public class TableStoreHiveStorageHandlerITCase {
                         Arrays.asList("a", "b"));
 
         TableWrite write = table.newWrite();
+        TableCommit commit = table.newCommit("user");
         write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
         write.write(GenericRowData.of(1, 20L, StringData.fromString("Hello")));
         write.write(GenericRowData.of(2, 30L, StringData.fromString("World")));
         write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi Again")));
         write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 30L, StringData.fromString("World")));
         write.write(GenericRowData.of(2, 40L, StringData.fromString("Test")));
-        table.newCommit().commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit());
         write.close();
 
         hiveShell.execute(
@@ -143,13 +145,14 @@ public class TableStoreHiveStorageHandlerITCase {
                         Collections.emptyList());
 
         TableWrite write = table.newWrite();
+        TableCommit commit = table.newCommit("user");
         write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
         write.write(GenericRowData.of(1, 20L, StringData.fromString("Hello")));
         write.write(GenericRowData.of(2, 30L, StringData.fromString("World")));
         write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
         write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 30L, StringData.fromString("World")));
         write.write(GenericRowData.of(2, 40L, StringData.fromString("Test")));
-        table.newCommit().commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit());
         write.close();
 
         hiveShell.execute(
@@ -192,10 +195,11 @@ public class TableStoreHiveStorageHandlerITCase {
         }
 
         TableWrite write = table.newWrite();
+        TableCommit commit = table.newCommit("user");
         for (GenericRowData rowData : input) {
             write.write(rowData);
         }
-        table.newCommit().commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit());
         write.close();
 
         hiveShell.execute(
@@ -298,18 +302,19 @@ public class TableStoreHiveStorageHandlerITCase {
         // TODO add NaN related tests after FLINK-27627 and FLINK-27628 are fixed
 
         TableWrite write = table.newWrite();
+        TableCommit commit = table.newCommit("user");
         write.write(GenericRowData.of(1));
-        table.newCommit().commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit());
         write.write(GenericRowData.of((Object) null));
-        table.newCommit().commit("1", write.prepareCommit());
+        commit.commit("1", write.prepareCommit());
         write.write(GenericRowData.of(2));
         write.write(GenericRowData.of(3));
         write.write(GenericRowData.of((Object) null));
-        table.newCommit().commit("2", write.prepareCommit());
+        commit.commit("2", write.prepareCommit());
         write.write(GenericRowData.of(4));
         write.write(GenericRowData.of(5));
         write.write(GenericRowData.of(6));
-        table.newCommit().commit("3", write.prepareCommit());
+        commit.commit("3", write.prepareCommit());
         write.close();
 
         hiveShell.execute(
@@ -389,20 +394,21 @@ public class TableStoreHiveStorageHandlerITCase {
                         Collections.emptyList());
 
         TableWrite write = table.newWrite();
+        TableCommit commit = table.newCommit("user");
         write.write(
                 GenericRowData.of(
                         375, /* 1971-01-11 */
                         TimestampData.fromLocalDateTime(
                                 LocalDateTime.of(2022, 5, 17, 17, 29, 20))));
-        table.newCommit().commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit());
         write.write(GenericRowData.of(null, null));
-        table.newCommit().commit("1", write.prepareCommit());
+        commit.commit("1", write.prepareCommit());
         write.write(GenericRowData.of(376 /* 1971-01-12 */, null));
         write.write(
                 GenericRowData.of(
                         null,
                         TimestampData.fromLocalDateTime(LocalDateTime.of(2022, 6, 18, 8, 30, 0))));
-        table.newCommit().commit("2", write.prepareCommit());
+        commit.commit("2", write.prepareCommit());
         write.close();
 
         hiveShell.execute(
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
index 610dea4f..fb68c2d5 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.store.RowDataContainer;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -72,12 +73,13 @@ public class TableStoreRecordReaderTest {
                         Collections.singletonList("a"));
 
         TableWrite write = table.newWrite();
+        TableCommit commit = table.newCommit("user");
         write.write(GenericRowData.of(1L, StringData.fromString("Hi")));
         write.write(GenericRowData.of(2L, StringData.fromString("Hello")));
         write.write(GenericRowData.of(3L, StringData.fromString("World")));
         write.write(GenericRowData.of(1L, StringData.fromString("Hi again")));
         write.write(GenericRowData.ofKind(RowKind.DELETE, 2L, StringData.fromString("Hello")));
-        table.newCommit().commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit());
 
         Tuple2<RecordReader<RowData>, Long> tuple = read(table, BinaryRowDataUtil.EMPTY_ROW, 0);
         TableStoreRecordReader reader = new TableStoreRecordReader(tuple.f0, tuple.f1);
@@ -113,13 +115,14 @@ public class TableStoreRecordReaderTest {
                         Collections.emptyList());
 
         TableWrite write = table.newWrite();
+        TableCommit commit = table.newCommit("user");
         write.write(GenericRowData.of(1, StringData.fromString("Hi")));
         write.write(GenericRowData.of(2, StringData.fromString("Hello")));
         write.write(GenericRowData.of(3, StringData.fromString("World")));
         write.write(GenericRowData.of(1, StringData.fromString("Hi")));
         write.write(GenericRowData.ofKind(RowKind.DELETE, 2, StringData.fromString("Hello")));
         write.write(GenericRowData.of(1, StringData.fromString("Hi")));
-        table.newCommit().commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit());
 
         Tuple2<RecordReader<RowData>, Long> tuple = read(table, BinaryRowDataUtil.EMPTY_ROW, 0);
         TableStoreRecordReader reader = new TableStoreRecordReader(tuple.f0, tuple.f1);
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
index 14656cb5..680b8dea 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
@@ -55,9 +55,9 @@ public class SimpleTableTestHelper {
                                 ""));
         Configuration conf = Configuration.fromMap(options);
         conf.setString("path", path.toString());
-        FileStoreTable table = FileStoreTableFactory.create(conf, "user");
+        FileStoreTable table = FileStoreTableFactory.create(conf);
         this.writer = table.newWrite();
-        this.commit = table.newCommit();
+        this.commit = table.newCommit("user");
     }
 
     public void write(RowData row) throws Exception {