You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/06 09:12:47 UTC
[flink-table-store] branch master updated: [FLINK-28221] Store commit user name into Flink sink state and correct endInput behavior
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new dc38c6c8 [FLINK-28221] Store commit user name into Flink sink state and correct endInput behavior
dc38c6c8 is described below
commit dc38c6c8279fa1582ab38c34e34d37aaa09edea2
Author: tsreaper <ts...@gmail.com>
AuthorDate: Wed Jul 6 17:12:42 2022 +0800
[FLINK-28221] Store commit user name into Flink sink state and correct endInput behavior
This closes #200
---
.../store/connector/sink/CommitterOperator.java | 67 ++++--
.../store/connector/sink/FlinkSinkBuilder.java | 5 +-
.../table/store/connector/sink/StoreSink.java | 12 +-
.../table/store/connector/FileStoreITCase.java | 8 +-
.../store/connector/sink/SinkSavepointITCase.java | 235 +++++++++++++++++++++
.../flink/table/store/file/AbstractFileStore.java | 5 +-
.../table/store/file/AppendOnlyFileStore.java | 3 +-
.../apache/flink/table/store/file/FileStore.java | 2 +-
.../flink/table/store/file/KeyValueFileStore.java | 3 +-
.../table/store/table/AbstractFileStoreTable.java | 4 +-
.../store/table/AppendOnlyFileStoreTable.java | 4 +-
.../table/ChangelogValueCountFileStoreTable.java | 3 +-
.../table/ChangelogWithKeyFileStoreTable.java | 3 +-
.../flink/table/store/table/FileStoreTable.java | 2 +-
.../table/store/table/FileStoreTableFactory.java | 20 +-
.../flink/table/store/file/TestFileStore.java | 5 +-
.../flink/table/store/file/data/DataFileTest.java | 8 +-
.../store/file/manifest/ManifestFileMetaTest.java | 7 +-
.../store/file/manifest/ManifestFileTest.java | 7 +-
.../store/file/manifest/ManifestListTest.java | 7 +-
.../store/file/operation/FileStoreCommitTest.java | 10 +-
.../store/file/operation/TestCommitThread.java | 3 +-
.../table/store/file/schema/SchemaManagerTest.java | 6 +-
.../file/utils/FailingAtomicRenameFileSystem.java | 130 +++++++++---
.../store/table/AppendOnlyFileStoreTableTest.java | 10 +-
.../ChangelogValueCountFileStoreTableTest.java | 10 +-
.../table/ChangelogWithKeyFileStoreTableTest.java | 18 +-
.../table/store/table/FileStoreTableTestBase.java | 9 +-
.../table/store/table/WritePreemptMemoryTest.java | 6 +-
.../flink/table/store/TableStoreJobConf.java | 9 -
.../store/hive/TableStoreHiveStorageHandler.java | 6 +-
.../table/store/mapred/TableStoreInputFormat.java | 2 +-
.../flink/table/store/FileStoreTestUtils.java | 2 +-
.../hive/TableStoreHiveStorageHandlerITCase.java | 26 ++-
.../store/mapred/TableStoreRecordReaderTest.java | 7 +-
.../table/store/spark/SimpleTableTestHelper.java | 4 +-
36 files changed, 514 insertions(+), 154 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
index fa7f60a7..0f16adc2 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
@@ -30,6 +30,8 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SerializableFunction;
import org.apache.flink.util.function.SerializableSupplier;
import java.util.ArrayDeque;
@@ -38,6 +40,7 @@ import java.util.Deque;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
+import java.util.UUID;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -50,10 +53,13 @@ public class CommitterOperator extends AbstractStreamOperator<Committable>
/** Record all the inputs until commit. */
private final Deque<Committable> inputs = new ArrayDeque<>();
- /** The operator's state descriptor. */
- private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
- new ListStateDescriptor<>(
- "streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE);
+ /**
+ * If checkpoint is enabled we should do nothing in {@link CommitterOperator#endInput}.
+ * Remaining data will be committed in {@link CommitterOperator#notifyCheckpointComplete}. If
+ * checkpoint is not enabled we need to commit remaining data in {@link
+ * CommitterOperator#endInput}.
+ */
+ private final boolean checkpointEnabled;
/** Group the committable by the checkpoint id. */
private final NavigableMap<Long, ManifestCommittable> committablesPerCheckpoint;
@@ -62,10 +68,10 @@ public class CommitterOperator extends AbstractStreamOperator<Committable>
private final SerializableSupplier<SimpleVersionedSerializer<ManifestCommittable>>
committableSerializer;
- /** The operator's state. */
+ /** ManifestCommittable state of this job. Used to filter out previous successful commits. */
private ListState<ManifestCommittable> streamingCommitterState;
- private final SerializableSupplier<Committer> committerFactory;
+ private final SerializableFunction<String, Committer> committerFactory;
/**
* Aggregate committables to global committables and commit the global committables to the
@@ -74,9 +80,11 @@ public class CommitterOperator extends AbstractStreamOperator<Committable>
private Committer committer;
public CommitterOperator(
- SerializableSupplier<Committer> committerFactory,
+ boolean checkpointEnabled,
+ SerializableFunction<String, Committer> committerFactory,
SerializableSupplier<SimpleVersionedSerializer<ManifestCommittable>>
committableSerializer) {
+ this.checkpointEnabled = checkpointEnabled;
this.committableSerializer = committableSerializer;
this.committablesPerCheckpoint = new TreeMap<>();
this.committerFactory = checkNotNull(committerFactory);
@@ -86,11 +94,41 @@ public class CommitterOperator extends AbstractStreamOperator<Committable>
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
- committer = committerFactory.get();
+
+ // commit user name state of this job
+ // each job can only have one user name and this name must be consistent across restarts
+ ListState<String> commitUserState =
+ context.getOperatorStateStore()
+ .getListState(new ListStateDescriptor<>("commit_user_state", String.class));
+ List<String> commitUsers = new ArrayList<>();
+ commitUserState.get().forEach(commitUsers::add);
+ if (context.isRestored()) {
+ Preconditions.checkState(
+ commitUsers.size() == 1,
+ "Expecting 1 commit user name when recovering from checkpoint but found "
+ + commitUsers.size()
+ + ". This is unexpected.");
+ } else {
+ Preconditions.checkState(
+ commitUsers.isEmpty(),
+ "Expecting 0 commit user name for a fresh sink state but found "
+ + commitUsers.size()
+ + ". This is unexpected.");
+ String commitUser = UUID.randomUUID().toString();
+ commitUserState.add(commitUser);
+ commitUsers.add(commitUser);
+ }
+ // we cannot use job id as commit user name here because user may change job id by creating
+ // a savepoint, stopping the job and then resuming from the savepoint
+ committer = committerFactory.apply(commitUsers.get(0));
+
streamingCommitterState =
new SimpleVersionedListState<>(
context.getOperatorStateStore()
- .getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
+ .getListState(
+ new ListStateDescriptor<>(
+ "streaming_committer_raw_states",
+ BytePrimitiveArraySerializer.INSTANCE)),
committableSerializer.get());
List<ManifestCommittable> restored = new ArrayList<>();
streamingCommitterState.get().forEach(restored::add);
@@ -127,13 +165,10 @@ public class CommitterOperator extends AbstractStreamOperator<Committable>
@Override
public void endInput() throws Exception {
- // Suppose the last checkpoint before endInput is 5. Flink Streaming Job calling order:
- // 1. Receives elements from upstream prepareSnapshotPreBarrier(5)
- // 2. this.snapshotState(5)
- // 3. Receives elements from upstream endInput
- // 4. this.endInput
- // 5. this.notifyCheckpointComplete(5)
- // So we should submit all the data in the endInput in order to avoid disordered commits.
+ if (checkpointEnabled) {
+ return;
+ }
+
long checkpointId = Long.MAX_VALUE;
List<Committable> poll = pollInputs();
if (!poll.isEmpty()) {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
index fa3cfeb4..b0272d95 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.connector.sink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
@@ -99,15 +100,17 @@ public class FlinkSinkBuilder {
partitioned.setParallelism(parallelism);
}
+ StreamExecutionEnvironment env = input.getExecutionEnvironment();
StoreSink sink =
new StoreSink(
tableIdentifier,
table,
+ env.getCheckpointConfig().isCheckpointingEnabled(),
conf.get(FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED),
getCompactPartSpec(),
lockFactory,
overwritePartition,
logSinkFunction);
- return sink.sinkTo(new DataStream<>(input.getExecutionEnvironment(), partitioned));
+ return sink.sinkTo(new DataStream<>(env, partitioned));
}
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index 3a8ed3a3..0f80e62a 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -49,6 +49,8 @@ public class StoreSink implements Serializable {
private final FileStoreTable table;
+ private final boolean checkpointEnabled;
+
private final boolean compactionTask;
@Nullable private final Map<String, String> compactPartitionSpec;
@@ -62,6 +64,7 @@ public class StoreSink implements Serializable {
public StoreSink(
ObjectIdentifier tableIdentifier,
FileStoreTable table,
+ boolean checkpointEnabled,
boolean compactionTask,
@Nullable Map<String, String> compactPartitionSpec,
@Nullable CatalogLock.Factory lockFactory,
@@ -69,6 +72,7 @@ public class StoreSink implements Serializable {
@Nullable LogSinkFunction logSinkFunction) {
this.tableIdentifier = tableIdentifier;
this.table = table;
+ this.checkpointEnabled = checkpointEnabled;
this.compactionTask = compactionTask;
this.compactPartitionSpec = compactPartitionSpec;
this.lockFactory = lockFactory;
@@ -83,7 +87,7 @@ public class StoreSink implements Serializable {
return new StoreWriteOperator(table, overwritePartition, logSinkFunction);
}
- private StoreCommitter createCommitter() {
+ private StoreCommitter createCommitter(String user) {
CatalogLock catalogLock;
Lock lock;
if (lockFactory == null) {
@@ -104,7 +108,7 @@ public class StoreSink implements Serializable {
}
return new StoreCommitter(
- table.newCommit().withOverwritePartition(overwritePartition).withLock(lock),
+ table.newCommit(user).withOverwritePartition(overwritePartition).withLock(lock),
catalogLock);
}
@@ -119,7 +123,9 @@ public class StoreSink implements Serializable {
GLOBAL_COMMITTER_NAME,
typeInfo,
new CommitterOperator(
- this::createCommitter, ManifestCommittableSerializer::new))
+ checkpointEnabled,
+ this::createCommitter,
+ ManifestCommittableSerializer::new))
.setParallelism(1)
.setMaxParallelism(1);
return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index 1eb0596c..180fef10 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -66,6 +66,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -390,8 +391,11 @@ public class FileStoreITCase extends AbstractTestBase {
if (noFail) {
options.set(PATH, folder.toURI().toString());
} else {
- FailingAtomicRenameFileSystem.get().reset(3, 100);
- options.set(PATH, FailingAtomicRenameFileSystem.getFailingPath(folder.getPath()));
+ String failingName = UUID.randomUUID().toString();
+ FailingAtomicRenameFileSystem.reset(failingName, 3, 100);
+ options.set(
+ PATH,
+ FailingAtomicRenameFileSystem.getFailingPath(failingName, folder.getPath()));
}
options.set(FILE_FORMAT, "avro");
return options;
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/SinkSavepointITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/SinkSavepointITCase.java
new file mode 100644
index 00000000..e2ab0782
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/SinkSavepointITCase.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/** IT cases for {@link StoreSink} when writing file store and with savepoints. */
+public class SinkSavepointITCase extends AbstractTestBase {
+
+ private String path;
+ private String failingName;
+
+ @Before
+ public void before() throws Exception {
+ path = TEMPORARY_FOLDER.newFolder().toPath().toString();
+ // for failure tests
+ failingName = UUID.randomUUID().toString();
+ FailingAtomicRenameFileSystem.reset(failingName, 100, 500);
+ }
+
+ @Test(timeout = 60000)
+ public void testRecoverFromSavepoint() throws Exception {
+ String failingPath = FailingAtomicRenameFileSystem.getFailingPath(failingName, path);
+ String savepointPath = null;
+ JobID jobId;
+ ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ OUTER:
+ while (true) {
+ // start a new job or recover from savepoint
+ jobId = runRecoverFromSavepointJob(failingPath, savepointPath);
+ while (true) {
+ // wait for a random amount of time before stopping with savepoint
+ Thread.sleep(random.nextInt(5000));
+ if (client.getJobStatus(jobId).get() == JobStatus.FINISHED) {
+ // job finished, check for result
+ break OUTER;
+ }
+ try {
+ // try to stop with savepoint
+ savepointPath =
+ client.stopWithSavepoint(
+ jobId,
+ false,
+ path + "/savepoint",
+ SavepointFormatType.DEFAULT)
+ .get();
+ break;
+ } catch (Exception e) {
+ Optional<StopWithSavepointStoppingException> t =
+ ExceptionUtils.findThrowable(
+ e, StopWithSavepointStoppingException.class);
+ if (t.isPresent()) {
+ // savepoint has been created but notifyCheckpointComplete is not called
+ //
+ // user should follow the exception message and recover job from the
+ // specific savepoint
+ savepointPath = t.get().getSavepointPath();
+ break;
+ }
+ // savepoint creation may fail due to various reasons (for example the job is in
+ // failing state, or the job has finished), just wait for a while and try again
+ }
+ }
+ // wait for job to stop
+ while (!client.getJobStatus(jobId).get().isGloballyTerminalState()) {
+ Thread.sleep(1000);
+ }
+ // recover from savepoint in the next round
+ }
+
+ checkRecoverFromSavepointResult(failingPath);
+ }
+
+ private JobID runRecoverFromSavepointJob(String failingPath, String savepointPath)
+ throws Exception {
+ Configuration conf = new Configuration();
+ if (savepointPath != null) {
+ SavepointRestoreSettings savepointRestoreSettings =
+ SavepointRestoreSettings.forPath(savepointPath, false);
+ SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, conf);
+ }
+
+ EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
+ tEnv.getConfig()
+ .getConfiguration()
+ .set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMillis(500));
+ tEnv.getConfig().getConfiguration().set(StateBackendOptions.STATE_BACKEND, "filesystem");
+ tEnv.getConfig()
+ .getConfiguration()
+ .set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file://" + path + "/checkpoint");
+ tEnv.getConfig()
+ .getConfiguration()
+ .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+ // we're creating multiple table environments in the same process
+ // if we do not set this option, stream node id will be different even with the same SQL
+ // if stream node id is different then we can't recover from savepoint
+ tEnv.getConfig()
+ .getConfiguration()
+ .set(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS, true);
+
+ tEnv.executeSql(
+ String.join(
+ "\n",
+ "CREATE TABLE S (",
+ " a INT",
+ ") WITH (",
+ " 'connector' = 'datagen',",
+ " 'rows-per-second' = '10000',",
+ " 'fields.a.kind' = 'sequence',",
+ " 'fields.a.start' = '0',",
+ " 'fields.a.end' = '99999'",
+ ")"));
+
+ String createCatalogSql =
+ String.join(
+ "\n",
+ "CREATE CATALOG my_catalog WITH (",
+ " 'type' = 'table-store',",
+ " 'warehouse' = '" + failingPath + "'",
+ ")");
+ FailingAtomicRenameFileSystem.retryArtificialException(
+ () -> tEnv.executeSql(createCatalogSql));
+
+ tEnv.executeSql("USE CATALOG my_catalog");
+
+ String createSinkSql =
+ String.join(
+ "\n",
+ "CREATE TABLE IF NOT EXISTS T (",
+ " a INT",
+ ") WITH (",
+ " 'bucket' = '2',",
+ " 'file.format' = 'avro'",
+ ")");
+ FailingAtomicRenameFileSystem.retryArtificialException(
+ () -> tEnv.executeSql(createSinkSql));
+
+ String insertIntoSql = "INSERT INTO T SELECT * FROM default_catalog.default_database.S";
+ JobID jobId =
+ FailingAtomicRenameFileSystem.retryArtificialException(
+ () -> tEnv.executeSql(insertIntoSql))
+ .getJobClient()
+ .get()
+ .getJobID();
+
+ ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
+ while (client.getJobStatus(jobId).get() == JobStatus.INITIALIZING) {
+ Thread.sleep(1000);
+ }
+ return jobId;
+ }
+
+ private void checkRecoverFromSavepointResult(String failingPath) throws Exception {
+ EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+ TableEnvironment tEnv = TableEnvironment.create(settings);
+ // no failure should occur when checking for answer
+ FailingAtomicRenameFileSystem.reset(failingName, 0, 1);
+
+ String createCatalogSql =
+ String.join(
+ "\n",
+ "CREATE CATALOG my_catalog WITH (",
+ " 'type' = 'table-store',",
+ " 'warehouse' = '" + failingPath + "'",
+ ")");
+ tEnv.executeSql(createCatalogSql);
+
+ tEnv.executeSql("USE CATALOG my_catalog");
+
+ List<Integer> actual = new ArrayList<>();
+ try (CloseableIterator<Row> it = tEnv.executeSql("SELECT * FROM T").collect()) {
+ while (it.hasNext()) {
+ Row row = it.next();
+ Assert.assertEquals(1, row.getArity());
+ actual.add((Integer) row.getField(0));
+ }
+ }
+ Collections.sort(actual);
+ Assert.assertEquals(
+ IntStream.range(0, 100000).boxed().collect(Collectors.toList()), actual);
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AbstractFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AbstractFileStore.java
index ff943a19..5773aa7a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AbstractFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AbstractFileStore.java
@@ -39,19 +39,16 @@ public abstract class AbstractFileStore<T> implements FileStore<T> {
protected final SchemaManager schemaManager;
protected final long schemaId;
protected final CoreOptions options;
- protected final String user;
protected final RowType partitionType;
public AbstractFileStore(
SchemaManager schemaManager,
long schemaId,
CoreOptions options,
- String user,
RowType partitionType) {
this.schemaManager = schemaManager;
this.schemaId = schemaId;
this.options = options;
- this.user = user;
this.partitionType = partitionType;
}
@@ -94,7 +91,7 @@ public abstract class AbstractFileStore<T> implements FileStore<T> {
}
@Override
- public FileStoreCommitImpl newCommit() {
+ public FileStoreCommitImpl newCommit(String user) {
return new FileStoreCommitImpl(
schemaId,
user,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
index 897ab2dc..40d4203e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
@@ -35,10 +35,9 @@ public class AppendOnlyFileStore extends AbstractFileStore<RowData> {
SchemaManager schemaManager,
long schemaId,
CoreOptions options,
- String user,
RowType partitionType,
RowType rowType) {
- super(schemaManager, schemaId, options, user, partitionType);
+ super(schemaManager, schemaId, options, partitionType);
this.rowType = rowType;
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
index 9df275aa..97f390f7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
@@ -45,7 +45,7 @@ public interface FileStore<T> extends Serializable {
FileStoreWrite<T> newWrite();
- FileStoreCommit newCommit();
+ FileStoreCommit newCommit(String user);
FileStoreExpire newExpire();
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
index 9443ddd2..58b43685 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
@@ -45,12 +45,11 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
SchemaManager schemaManager,
long schemaId,
CoreOptions options,
- String user,
RowType partitionType,
RowType keyType,
RowType valueType,
MergeFunction mergeFunction) {
- super(schemaManager, schemaId, options, user, partitionType);
+ super(schemaManager, schemaId, options, partitionType);
this.keyType = keyType;
this.valueType = valueType;
this.mergeFunction = mergeFunction;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
index 0897416e..61ee3e50 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
@@ -56,8 +56,8 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
}
@Override
- public TableCommit newCommit() {
- return new TableCommit(store().newCommit(), store().newExpire());
+ public TableCommit newCommit(String user) {
+ return new TableCommit(store().newCommit(user), store().newExpire());
}
@Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index 103dc3a9..4d343810 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -52,15 +52,13 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
private final AppendOnlyFileStore store;
- AppendOnlyFileStoreTable(
- Path path, SchemaManager schemaManager, TableSchema tableSchema, String user) {
+ AppendOnlyFileStoreTable(Path path, SchemaManager schemaManager, TableSchema tableSchema) {
super(path, tableSchema);
this.store =
new AppendOnlyFileStore(
schemaManager,
tableSchema.id(),
new CoreOptions(tableSchema.options()),
- user,
tableSchema.logicalPartitionType(),
tableSchema.logicalRowType());
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index 2830700f..855602f5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -57,7 +57,7 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
private final KeyValueFileStore store;
ChangelogValueCountFileStoreTable(
- Path path, SchemaManager schemaManager, TableSchema tableSchema, String user) {
+ Path path, SchemaManager schemaManager, TableSchema tableSchema) {
super(path, tableSchema);
RowType countType =
RowType.of(
@@ -68,7 +68,6 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
schemaManager,
tableSchema.id(),
new CoreOptions(tableSchema.options()),
- user,
tableSchema.logicalPartitionType(),
tableSchema.logicalRowType(),
countType,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 7446cf9a..424a1dfb 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -63,7 +63,7 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
private final KeyValueFileStore store;
ChangelogWithKeyFileStoreTable(
- Path path, SchemaManager schemaManager, TableSchema tableSchema, String user) {
+ Path path, SchemaManager schemaManager, TableSchema tableSchema) {
super(path, tableSchema);
RowType rowType = tableSchema.logicalRowType();
@@ -104,7 +104,6 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
schemaManager,
tableSchema.id(),
new CoreOptions(conf),
- user,
tableSchema.logicalPartitionType(),
keyType,
rowType,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
index 82074f32..afaa7508 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
@@ -47,7 +47,7 @@ public interface FileStoreTable extends Serializable {
TableWrite newWrite();
- TableCommit newCommit();
+ TableCommit newCommit(String user);
TableCompact newCompact();
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
index 440222f8..f9bb7881 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
@@ -25,8 +25,6 @@ import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
-import java.util.UUID;
-
import static org.apache.flink.table.store.CoreOptions.PATH;
/** Factory to create {@link FileStoreTable}. */
@@ -39,10 +37,6 @@ public class FileStoreTableFactory {
}
public static FileStoreTable create(Configuration conf) {
- return create(conf, UUID.randomUUID().toString());
- }
-
- public static FileStoreTable create(Configuration conf, String user) {
Path tablePath = CoreOptions.path(conf);
TableSchema tableSchema =
new SchemaManager(tablePath)
@@ -53,15 +47,15 @@ public class FileStoreTableFactory {
"Schema file not found in location "
+ tablePath
+ ". Please create table first."));
- return create(tablePath, tableSchema, conf, user);
+ return create(tablePath, tableSchema, conf);
}
public static FileStoreTable create(Path tablePath, TableSchema tableSchema) {
- return create(tablePath, tableSchema, new Configuration(), UUID.randomUUID().toString());
+ return create(tablePath, tableSchema, new Configuration());
}
public static FileStoreTable create(
- Path tablePath, TableSchema tableSchema, Configuration dynamicOptions, String user) {
+ Path tablePath, TableSchema tableSchema, Configuration dynamicOptions) {
// merge dynamic options into schema.options
Configuration newOptions = Configuration.fromMap(tableSchema.options());
dynamicOptions.toMap().forEach(newOptions::setString);
@@ -70,14 +64,12 @@ public class FileStoreTableFactory {
SchemaManager schemaManager = new SchemaManager(tablePath);
if (newOptions.get(CoreOptions.WRITE_MODE) == WriteMode.APPEND_ONLY) {
- return new AppendOnlyFileStoreTable(tablePath, schemaManager, tableSchema, user);
+ return new AppendOnlyFileStoreTable(tablePath, schemaManager, tableSchema);
} else {
if (tableSchema.primaryKeys().isEmpty()) {
- return new ChangelogValueCountFileStoreTable(
- tablePath, schemaManager, tableSchema, user);
+ return new ChangelogValueCountFileStoreTable(tablePath, schemaManager, tableSchema);
} else {
- return new ChangelogWithKeyFileStoreTable(
- tablePath, schemaManager, tableSchema, user);
+ return new ChangelogWithKeyFileStoreTable(tablePath, schemaManager, tableSchema);
}
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index a7ff2a86..21ed64a7 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -82,6 +82,7 @@ public class TestFileStore extends KeyValueFileStore {
private final String root;
private final RowDataSerializer keySerializer;
private final RowDataSerializer valueSerializer;
+ private final String user;
public static TestFileStore create(
String format,
@@ -121,7 +122,6 @@ public class TestFileStore extends KeyValueFileStore {
new SchemaManager(options.path()),
0L,
options,
- UUID.randomUUID().toString(),
partitionType,
keyType,
valueType,
@@ -129,6 +129,7 @@ public class TestFileStore extends KeyValueFileStore {
this.root = root;
this.keySerializer = new RowDataSerializer(keyType);
this.valueSerializer = new RowDataSerializer(valueType);
+ this.user = UUID.randomUUID().toString();
}
public FileStoreExpireImpl newExpire(
@@ -220,7 +221,7 @@ public class TestFileStore extends KeyValueFileStore {
.write(kv);
}
- FileStoreCommit commit = newCommit();
+ FileStoreCommit commit = newCommit(user);
ManifestCommittable committable =
new ManifestCommittable(String.valueOf(new Random().nextLong()));
for (Map.Entry<BinaryRowData, Map<Integer, RecordWriter<KeyValue>>> entryWithPartition :
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
index 81694a3f..6c851525 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
@@ -49,6 +49,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
@@ -100,11 +101,14 @@ public class DataFileTest {
@RepeatedTest(10)
public void testCleanUpForException() throws IOException {
- FailingAtomicRenameFileSystem.get().reset(1, 10);
+ String failingName = UUID.randomUUID().toString();
+ FailingAtomicRenameFileSystem.reset(failingName, 1, 10);
DataFileTestDataGenerator.Data data = gen.next();
DataFileWriter writer =
createDataFileWriter(
- FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString()), "avro");
+ FailingAtomicRenameFileSystem.getFailingPath(
+ failingName, tempDir.toString()),
+ "avro");
try {
writer.write(CloseableIterator.fromList(data.content, kv -> {}), 0);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
index 8bb8194f..2ebdaf40 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
@@ -44,6 +44,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;
+import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -107,12 +108,14 @@ public class ManifestFileMetaTest {
@RepeatedTest(10)
public void testCleanUpForException() throws IOException {
- FailingAtomicRenameFileSystem.get().reset(1, 10);
+ String failingName = UUID.randomUUID().toString();
+ FailingAtomicRenameFileSystem.reset(failingName, 1, 10);
List<ManifestFileMeta> input = new ArrayList<>();
createData(ThreadLocalRandom.current().nextInt(5), input, null);
ManifestFile failingManifestFile =
createManifestFile(
- FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString()));
+ FailingAtomicRenameFileSystem.getFailingPath(
+ failingName, tempDir.toString()));
try {
ManifestFileMeta.merge(input, failingManifestFile, 500, 3);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
index 98e83fdb..d176ecc4 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
@@ -34,6 +34,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
@@ -65,11 +66,13 @@ public class ManifestFileTest {
@RepeatedTest(10)
public void testCleanUpForException() throws IOException {
- FailingAtomicRenameFileSystem.get().reset(1, 10);
+ String failingName = UUID.randomUUID().toString();
+ FailingAtomicRenameFileSystem.reset(failingName, 1, 10);
List<ManifestEntry> entries = generateData();
ManifestFile manifestFile =
createManifestFile(
- FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString()));
+ FailingAtomicRenameFileSystem.getFailingPath(
+ failingName, tempDir.toString()));
try {
manifestFile.write(entries);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
index 95bfca51..c815be6d 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
@@ -34,6 +34,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
@@ -57,11 +58,13 @@ public class ManifestListTest {
@RepeatedTest(10)
public void testCleanUpForException() throws IOException {
- FailingAtomicRenameFileSystem.get().reset(1, 3);
+ String failingName = UUID.randomUUID().toString();
+ FailingAtomicRenameFileSystem.reset(failingName, 1, 3);
List<ManifestFileMeta> metas = generateData();
ManifestList manifestList =
createManifestList(
- FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString()));
+ FailingAtomicRenameFileSystem.getFailingPath(
+ failingName, tempDir.toString()));
try {
manifestList.write(metas);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index d75949cf..169dbf18 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -46,6 +46,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -58,13 +59,15 @@ public class FileStoreCommitTest {
private static final Logger LOG = LoggerFactory.getLogger(FileStoreCommitTest.class);
private TestKeyValueGenerator gen;
+ private String failingName;
@TempDir java.nio.file.Path tempDir;
@BeforeEach
public void beforeEach() {
gen = new TestKeyValueGenerator();
// for failure tests
- FailingAtomicRenameFileSystem.get().reset(100, 5000);
+ failingName = UUID.randomUUID().toString();
+ FailingAtomicRenameFileSystem.reset(failingName, 100, 100);
}
@ParameterizedTest
@@ -105,7 +108,7 @@ public class FileStoreCommitTest {
Path firstSnapshotPath = snapshotManager.snapshotPath(Snapshot.FIRST_SNAPSHOT_ID);
FileUtils.deleteOrWarn(firstSnapshotPath);
// this test succeeds if this call does not fail
- store.newCommit()
+ store.newCommit(UUID.randomUUID().toString())
.filterCommitted(Collections.singletonList(new ManifestCommittable("dummy")));
}
@@ -286,7 +289,8 @@ public class FileStoreCommitTest {
private TestFileStore createStore(boolean failing, int numBucket) {
String root =
failing
- ? FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString())
+ ? FailingAtomicRenameFileSystem.getFailingPath(
+ failingName, tempDir.toString())
: TestAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString();
return TestFileStore.create(
"avro",
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index 3c927599..fe1b7d7e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -35,6 +35,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
@@ -64,7 +65,7 @@ public class TestCommitThread extends Thread {
this.writers = new HashMap<>();
this.write = safeStore.newWrite();
- this.commit = testStore.newCommit();
+ this.commit = testStore.newCommit(UUID.randomUUID().toString());
}
public Map<BinaryRowData, List<KeyValue>> getResult() {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
index dc86ac95..59f3b959 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
@@ -40,6 +40,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -65,8 +66,9 @@ public class SchemaManagerTest {
@BeforeEach
public void beforeEach() throws IOException {
// for failure tests
- FailingAtomicRenameFileSystem.get().reset(100, 5000);
- String root = FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString());
+ String failingName = UUID.randomUUID().toString();
+ FailingAtomicRenameFileSystem.reset(failingName, 100, 100);
+ String root = FailingAtomicRenameFileSystem.getFailingPath(failingName, tempDir.toString());
manager = new SchemaManager(new Path(root));
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java
index 8e1b4ccd..b922423f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java
@@ -18,15 +18,21 @@
package org.apache.flink.table.store.file.utils;
+import org.apache.flink.core.fs.BlockLocation;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataInputStreamWrapper;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FSDataOutputStreamWrapper;
+import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.core.fs.LocatedFileStatus;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalBlockLocation;
import org.apache.flink.util.ExceptionUtils;
+import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.Callable;
@@ -41,29 +47,30 @@ public class FailingAtomicRenameFileSystem extends TestAtomicRenameFileSystem {
public static final String SCHEME = "fail";
- private final String threadName;
+ private final String name;
private final AtomicInteger failCounter = new AtomicInteger();
private int failPossibility;
- public FailingAtomicRenameFileSystem(String threadName) {
- this.threadName = threadName;
+ private FailingAtomicRenameFileSystem(String name) {
+ this.name = name;
}
- public static FailingAtomicRenameFileSystem get() {
+ public static String getFailingPath(String name, String path) {
+ // set authority as given name so that different tests use different instances
+ // for more information see FileSystem#getUnguardedFileSystem for the caching strategy
+ return SCHEME + "://" + name + path;
+ }
+
+ public static void reset(String name, int maxFails, int failPossibility) {
try {
- return (FailingAtomicRenameFileSystem) new Path(getFailingPath("/")).getFileSystem();
+ ((FailingAtomicRenameFileSystem) new Path(getFailingPath(name, "/")).getFileSystem())
+ .reset(maxFails, failPossibility);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
- public static String getFailingPath(String path) {
- // set authority as thread name so that different testing threads use different instances
- // for more information see FileSystem#getUnguardedFileSystem for the caching strategy
- return SCHEME + "://" + Thread.currentThread().getName() + path;
- }
-
- public void reset(int maxFails, int failPossibility) {
+ private void reset(int maxFails, int failPossibility) {
failCounter.set(maxFails);
this.failPossibility = failPossibility;
}
@@ -86,7 +93,22 @@ public class FailingAtomicRenameFileSystem extends TestAtomicRenameFileSystem {
@Override
public URI getUri() {
- return URI.create(SCHEME + "://" + threadName + "/");
+ return URI.create(SCHEME + "://" + name + "/");
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path path) throws IOException {
+ File file = this.pathToFile(path);
+ if (file.exists()) {
+ return new FailingLocalFileStatus(file, path);
+ } else {
+ throw new FileNotFoundException(
+ "File "
+ + path
+ + " does not exist or the user running Flink ('"
+ + System.getProperty("user.name")
+ + "') has insufficient permissions to access it.");
+ }
}
/** {@link FileSystemFactory} for {@link FailingAtomicRenameFileSystem}. */
@@ -117,18 +139,11 @@ public class FailingAtomicRenameFileSystem extends TestAtomicRenameFileSystem {
super(inputStream);
}
- @Override
- public int read() throws IOException {
- if (ThreadLocalRandom.current().nextInt(failPossibility) == 0
- && failCounter.getAndDecrement() > 0) {
- throw new ArtificialException();
- }
- return super.read();
- }
-
@Override
public int read(byte[] b, int off, int len) throws IOException {
- if (ThreadLocalRandom.current().nextInt(failPossibility) == 0
+ // only fail when the buffer is larger than 1 byte so that we won't fail too often
+ if (b.length > 1
+ && ThreadLocalRandom.current().nextInt(failPossibility) == 0
&& failCounter.getAndDecrement() > 0) {
throw new ArtificialException();
}
@@ -143,21 +158,72 @@ public class FailingAtomicRenameFileSystem extends TestAtomicRenameFileSystem {
}
@Override
- public void write(int b) throws IOException {
- if (ThreadLocalRandom.current().nextInt(failPossibility) == 0
+ public void write(byte[] b, int off, int len) throws IOException {
+ // only fail when the buffer is larger than 1 byte so that we won't fail too often
+ if (b.length > 1
+ && ThreadLocalRandom.current().nextInt(failPossibility) == 0
&& failCounter.getAndDecrement() > 0) {
throw new ArtificialException();
}
- super.write(b);
+ super.write(b, off, len);
+ }
+ }
+
+ private static class FailingLocalFileStatus implements LocatedFileStatus {
+
+ private final File file;
+ private final Path path;
+ private final long len;
+
+ private FailingLocalFileStatus(File file, Path path) {
+ this.file = file;
+ this.path = path;
+ this.len = file.length();
}
@Override
- public void write(byte[] b, int off, int len) throws IOException {
- if (ThreadLocalRandom.current().nextInt(failPossibility) == 0
- && failCounter.getAndDecrement() > 0) {
- throw new ArtificialException();
- }
- super.write(b, off, len);
+ public BlockLocation[] getBlockLocations() {
+ return new BlockLocation[] {new LocalBlockLocation(len)};
+ }
+
+ @Override
+ public long getLen() {
+ return len;
+ }
+
+ @Override
+ public long getBlockSize() {
+ return len;
+ }
+
+ @Override
+ public short getReplication() {
+ return 1;
+ }
+
+ @Override
+ public long getModificationTime() {
+ return file.lastModified();
+ }
+
+ @Override
+ public long getAccessTime() {
+ return 0;
+ }
+
+ @Override
+ public boolean isDir() {
+ return file.isDirectory();
+ }
+
+ @Override
+ public Path getPath() {
+ return path;
+ }
+
+ @Override
+ public String toString() {
+ return "FailingLocalFileStatus{file=" + this.file + ", path=" + this.path + '}';
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 67778c2d..b6bfd6bf 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableRead;
@@ -140,21 +141,22 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
private void writeData() throws Exception {
FileStoreTable table = createFileStoreTable();
TableWrite write = table.newWrite();
+ TableCommit commit = table.newCommit("user");
write.write(GenericRowData.of(1, 10, 100L));
write.write(GenericRowData.of(2, 20, 200L));
write.write(GenericRowData.of(1, 11, 101L));
- table.newCommit().commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit());
write.write(GenericRowData.of(1, 12, 102L));
write.write(GenericRowData.of(2, 21, 201L));
write.write(GenericRowData.of(2, 22, 202L));
- table.newCommit().commit("1", write.prepareCommit());
+ commit.commit("1", write.prepareCommit());
write.write(GenericRowData.of(1, 11, 101L));
write.write(GenericRowData.of(2, 21, 201L));
write.write(GenericRowData.of(1, 12, 102L));
- table.newCommit().commit("2", write.prepareCommit());
+ commit.commit("2", write.prepareCommit());
write.close();
}
@@ -175,6 +177,6 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
Collections.emptyList(),
conf.toMap(),
""));
- return new AppendOnlyFileStoreTable(tablePath, schemaManager, tableSchema, "user");
+ return new AppendOnlyFileStoreTable(tablePath, schemaManager, tableSchema);
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index 7e5422dc..e5169555 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableRead;
@@ -140,24 +141,25 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
private void writeData() throws Exception {
FileStoreTable table = createFileStoreTable();
TableWrite write = table.newWrite();
+ TableCommit commit = table.newCommit("user");
write.write(GenericRowData.of(1, 10, 100L));
write.write(GenericRowData.of(2, 20, 200L));
write.write(GenericRowData.of(1, 11, 101L));
- table.newCommit().commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit());
write.write(GenericRowData.of(2, 21, 201L));
write.write(GenericRowData.of(1, 12, 102L));
write.write(GenericRowData.of(2, 21, 201L));
write.write(GenericRowData.of(2, 21, 201L));
- table.newCommit().commit("1", write.prepareCommit());
+ commit.commit("1", write.prepareCommit());
write.write(GenericRowData.of(1, 11, 101L));
write.write(GenericRowData.of(2, 22, 202L));
write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 21, 201L));
write.write(GenericRowData.ofKind(RowKind.DELETE, 1, 10, 100L));
write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 21, 201L));
- table.newCommit().commit("2", write.prepareCommit());
+ commit.commit("2", write.prepareCommit());
write.close();
}
@@ -178,6 +180,6 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
Collections.emptyList(),
conf.toMap(),
""));
- return new ChangelogValueCountFileStoreTable(tablePath, schemaManager, tableSchema, "user");
+ return new ChangelogValueCountFileStoreTable(tablePath, schemaManager, tableSchema);
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index b4ac1eb1..4c3b999d 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableRead;
@@ -53,12 +54,13 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
FileStoreTable table =
createFileStoreTable(conf -> conf.set(CoreOptions.SEQUENCE_FIELD, "b"));
TableWrite write = table.newWrite();
+ TableCommit commit = table.newCommit("user");
write.write(GenericRowData.of(1, 10, 200L));
write.write(GenericRowData.of(1, 10, 100L));
write.write(GenericRowData.of(1, 11, 101L));
- table.newCommit().commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit());
write.write(GenericRowData.of(1, 11, 55L));
- table.newCommit().commit("1", write.prepareCommit());
+ commit.commit("1", write.prepareCommit());
write.close();
List<Split> splits = table.newScan().plan().splits;
@@ -161,12 +163,13 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
public void testStreamingChangelog() throws Exception {
FileStoreTable table = createFileStoreTable(true);
TableWrite write = table.newWrite();
+ TableCommit commit = table.newCommit("user");
write.write(GenericRowData.of(1, 10, 100L));
write.write(GenericRowData.ofKind(RowKind.DELETE, 1, 10, 100L));
write.write(GenericRowData.of(1, 10, 101L));
write.write(GenericRowData.ofKind(RowKind.UPDATE_BEFORE, 1, 10, 101L));
write.write(GenericRowData.ofKind(RowKind.UPDATE_AFTER, 1, 10, 102L));
- table.newCommit().commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit());
write.close();
List<Split> splits = table.newScan().withIncremental(true).plan().splits;
@@ -184,23 +187,24 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
private void writeData() throws Exception {
FileStoreTable table = createFileStoreTable();
TableWrite write = table.newWrite();
+ TableCommit commit = table.newCommit("user");
write.write(GenericRowData.of(1, 10, 100L));
write.write(GenericRowData.of(2, 20, 200L));
write.write(GenericRowData.of(1, 11, 101L));
- table.newCommit().commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit());
write.write(GenericRowData.of(1, 10, 1000L));
write.write(GenericRowData.of(2, 21, 201L));
write.write(GenericRowData.of(2, 21, 2001L));
- table.newCommit().commit("1", write.prepareCommit());
+ commit.commit("1", write.prepareCommit());
write.write(GenericRowData.of(1, 11, 1001L));
write.write(GenericRowData.of(2, 21, 20001L));
write.write(GenericRowData.of(2, 22, 202L));
write.write(GenericRowData.ofKind(RowKind.DELETE, 1, 11, 1001L));
write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 20, 200L));
- table.newCommit().commit("2", write.prepareCommit());
+ commit.commit("2", write.prepareCommit());
write.close();
}
@@ -231,6 +235,6 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
Arrays.asList("pt", "a"),
conf.toMap(),
""));
- return new ChangelogWithKeyFileStoreTable(tablePath, schemaManager, tableSchema, "user");
+ return new ChangelogWithKeyFileStoreTable(tablePath, schemaManager, tableSchema);
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index e36a7314..f41de18b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader.ReaderSupplier;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableRead;
@@ -78,18 +79,18 @@ public abstract class FileStoreTableTestBase {
FileStoreTable table = createFileStoreTable();
TableWrite write = table.newWrite();
+ TableCommit commit = table.newCommit("user");
write.write(GenericRowData.of(1, 10, 100L));
write.write(GenericRowData.of(2, 20, 200L));
- table.newCommit().commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit());
write.close();
write = table.newWrite().withOverwrite(true);
+ commit = table.newCommit("user");
write.write(GenericRowData.of(2, 21, 201L));
Map<String, String> overwritePartition = new HashMap<>();
overwritePartition.put("pt", "2");
- table.newCommit()
- .withOverwritePartition(overwritePartition)
- .commit("1", write.prepareCommit());
+ commit.withOverwritePartition(overwritePartition).commit("1", write.prepareCommit());
write.close();
List<Split> splits = table.newScan().plan().splits;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
index b9f77bbd..85c65757 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.table.sink.FileCommittable;
+import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableRead;
@@ -62,6 +63,7 @@ public class WritePreemptMemoryTest extends FileStoreTableTestBase {
// write
FileStoreTable table = createFileStoreTable();
TableWrite write = table.newWrite();
+ TableCommit commit = table.newCommit("user");
Random random = new Random();
List<String> expected = new ArrayList<>();
for (int i = 0; i < 10_000; i++) {
@@ -71,7 +73,7 @@ public class WritePreemptMemoryTest extends FileStoreTableTestBase {
expected.add(BATCH_ROW_TO_STRING.apply(row));
}
List<FileCommittable> committables = write.prepareCommit();
- table.newCommit().commit("0", committables);
+ commit.commit("0", committables);
write.close();
// read
@@ -102,6 +104,6 @@ public class WritePreemptMemoryTest extends FileStoreTableTestBase {
Arrays.asList("pt", "a"),
conf.toMap(),
""));
- return new ChangelogWithKeyFileStoreTable(tablePath, schemaManager, schema, "user");
+ return new ChangelogWithKeyFileStoreTable(tablePath, schemaManager, schema);
}
}
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
index 7d630b9d..db3f6821 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
@@ -31,7 +31,6 @@ import java.util.Properties;
public class TableStoreJobConf {
private static final String INTERNAL_LOCATION = "table-store.internal.location";
- private static final String INTERNAL_FILE_STORE_USER = "table-store.internal.file-store.user";
private final JobConf jobConf;
@@ -48,12 +47,4 @@ public class TableStoreJobConf {
public String getLocation() {
return jobConf.get(INTERNAL_LOCATION);
}
-
- public String getFileStoreUser() {
- return jobConf.get(INTERNAL_FILE_STORE_USER);
- }
-
- public void setFileStoreUser(String user) {
- jobConf.set(INTERNAL_FILE_STORE_USER, user);
- }
}
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java
index 94788716..f93acfc9 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.mapred.OutputFormat;
import java.util.Map;
import java.util.Properties;
-import java.util.UUID;
/** {@link HiveStorageHandler} for table store. This is the entrance class of Hive API. */
public class TableStoreHiveStorageHandler
@@ -85,10 +84,7 @@ public class TableStoreHiveStorageHandler
public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> map) {}
@Override
- public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
- TableStoreJobConf wrapper = new TableStoreJobConf(jobConf);
- wrapper.setFileStoreUser(UUID.randomUUID().toString());
- }
+ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {}
@Override
public void setConf(Configuration configuration) {
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
index 1e63b051..91ef0bdc 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
@@ -72,7 +72,7 @@ public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer
TableStoreJobConf wrapper = new TableStoreJobConf(jobConf);
Configuration conf = new Configuration();
conf.set(CoreOptions.PATH, wrapper.getLocation());
- return FileStoreTableFactory.create(conf, wrapper.getFileStoreUser());
+ return FileStoreTableFactory.create(conf);
}
private Optional<Predicate> createPredicate(TableSchema tableSchema, JobConf jobConf) {
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java
index 05ef9fbf..a583c66a 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java
@@ -47,6 +47,6 @@ public class FileStoreTestUtils {
// only path, other config should be read from file store.
conf = new Configuration();
conf.set(PATH, tablePath.toString());
- return FileStoreTableFactory.create(conf, "user");
+ return FileStoreTableFactory.create(conf);
}
}
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
index 05cff356..8b691355 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.FileStoreTestUtils;
import org.apache.flink.table.store.hive.objectinspector.TableStoreObjectInspectorFactory;
import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
@@ -101,13 +102,14 @@ public class TableStoreHiveStorageHandlerITCase {
Arrays.asList("a", "b"));
TableWrite write = table.newWrite();
+ TableCommit commit = table.newCommit("user");
write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
write.write(GenericRowData.of(1, 20L, StringData.fromString("Hello")));
write.write(GenericRowData.of(2, 30L, StringData.fromString("World")));
write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi Again")));
write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 30L, StringData.fromString("World")));
write.write(GenericRowData.of(2, 40L, StringData.fromString("Test")));
- table.newCommit().commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit());
write.close();
hiveShell.execute(
@@ -143,13 +145,14 @@ public class TableStoreHiveStorageHandlerITCase {
Collections.emptyList());
TableWrite write = table.newWrite();
+ TableCommit commit = table.newCommit("user");
write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
write.write(GenericRowData.of(1, 20L, StringData.fromString("Hello")));
write.write(GenericRowData.of(2, 30L, StringData.fromString("World")));
write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 30L, StringData.fromString("World")));
write.write(GenericRowData.of(2, 40L, StringData.fromString("Test")));
- table.newCommit().commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit());
write.close();
hiveShell.execute(
@@ -192,10 +195,11 @@ public class TableStoreHiveStorageHandlerITCase {
}
TableWrite write = table.newWrite();
+ TableCommit commit = table.newCommit("user");
for (GenericRowData rowData : input) {
write.write(rowData);
}
- table.newCommit().commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit());
write.close();
hiveShell.execute(
@@ -298,18 +302,19 @@ public class TableStoreHiveStorageHandlerITCase {
// TODO add NaN related tests after FLINK-27627 and FLINK-27628 are fixed
TableWrite write = table.newWrite();
+ TableCommit commit = table.newCommit("user");
write.write(GenericRowData.of(1));
- table.newCommit().commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit());
write.write(GenericRowData.of((Object) null));
- table.newCommit().commit("1", write.prepareCommit());
+ commit.commit("1", write.prepareCommit());
write.write(GenericRowData.of(2));
write.write(GenericRowData.of(3));
write.write(GenericRowData.of((Object) null));
- table.newCommit().commit("2", write.prepareCommit());
+ commit.commit("2", write.prepareCommit());
write.write(GenericRowData.of(4));
write.write(GenericRowData.of(5));
write.write(GenericRowData.of(6));
- table.newCommit().commit("3", write.prepareCommit());
+ commit.commit("3", write.prepareCommit());
write.close();
hiveShell.execute(
@@ -389,20 +394,21 @@ public class TableStoreHiveStorageHandlerITCase {
Collections.emptyList());
TableWrite write = table.newWrite();
+ TableCommit commit = table.newCommit("user");
write.write(
GenericRowData.of(
375, /* 1971-01-11 */
TimestampData.fromLocalDateTime(
LocalDateTime.of(2022, 5, 17, 17, 29, 20))));
- table.newCommit().commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit());
write.write(GenericRowData.of(null, null));
- table.newCommit().commit("1", write.prepareCommit());
+ commit.commit("1", write.prepareCommit());
write.write(GenericRowData.of(376 /* 1971-01-12 */, null));
write.write(
GenericRowData.of(
null,
TimestampData.fromLocalDateTime(LocalDateTime.of(2022, 6, 18, 8, 30, 0))));
- table.newCommit().commit("2", write.prepareCommit());
+ commit.commit("2", write.prepareCommit());
write.close();
hiveShell.execute(
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
index 610dea4f..fb68c2d5 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.store.RowDataContainer;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.types.logical.LogicalType;
@@ -72,12 +73,13 @@ public class TableStoreRecordReaderTest {
Collections.singletonList("a"));
TableWrite write = table.newWrite();
+ TableCommit commit = table.newCommit("user");
write.write(GenericRowData.of(1L, StringData.fromString("Hi")));
write.write(GenericRowData.of(2L, StringData.fromString("Hello")));
write.write(GenericRowData.of(3L, StringData.fromString("World")));
write.write(GenericRowData.of(1L, StringData.fromString("Hi again")));
write.write(GenericRowData.ofKind(RowKind.DELETE, 2L, StringData.fromString("Hello")));
- table.newCommit().commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit());
Tuple2<RecordReader<RowData>, Long> tuple = read(table, BinaryRowDataUtil.EMPTY_ROW, 0);
TableStoreRecordReader reader = new TableStoreRecordReader(tuple.f0, tuple.f1);
@@ -113,13 +115,14 @@ public class TableStoreRecordReaderTest {
Collections.emptyList());
TableWrite write = table.newWrite();
+ TableCommit commit = table.newCommit("user");
write.write(GenericRowData.of(1, StringData.fromString("Hi")));
write.write(GenericRowData.of(2, StringData.fromString("Hello")));
write.write(GenericRowData.of(3, StringData.fromString("World")));
write.write(GenericRowData.of(1, StringData.fromString("Hi")));
write.write(GenericRowData.ofKind(RowKind.DELETE, 2, StringData.fromString("Hello")));
write.write(GenericRowData.of(1, StringData.fromString("Hi")));
- table.newCommit().commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit());
Tuple2<RecordReader<RowData>, Long> tuple = read(table, BinaryRowDataUtil.EMPTY_ROW, 0);
TableStoreRecordReader reader = new TableStoreRecordReader(tuple.f0, tuple.f1);
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
index 14656cb5..680b8dea 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
@@ -55,9 +55,9 @@ public class SimpleTableTestHelper {
""));
Configuration conf = Configuration.fromMap(options);
conf.setString("path", path.toString());
- FileStoreTable table = FileStoreTableFactory.create(conf, "user");
+ FileStoreTable table = FileStoreTableFactory.create(conf);
this.writer = table.newWrite();
- this.commit = table.newCommit();
+ this.commit = table.newCommit("user");
}
public void write(RowData row) throws Exception {