You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@flink.apache.org by cz...@apache.org on 2022/11/08 09:19:18 UTC
[flink-table-store] branch release-0.2 updated: [FLINK-29876] Explicitly throw exception from Table Store sink when unaligned checkpoint is enabled or at least once checkpoint mode is used
This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/release-0.2 by this push:
new 2af8c0a4 [FLINK-29876] Explicitly throw exception from Table Store sink when unaligned checkpoint is enabled or at least once checkpoint mode is used
2af8c0a4 is described below
commit 2af8c0a4603807a1a2d21cfcefb1a6c6dee07a7f
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Nov 8 17:19:13 2022 +0800
[FLINK-29876] Explicitly throw exception from Table Store sink when unaligned checkpoint is enabled or at least once checkpoint mode is used
This closes #362.
---
docs/content/docs/development/write-table.md | 2 +-
.../table/store/connector/sink/StoreSink.java | 20 ++++++++
.../table/store/connector/CatalogITCaseBase.java | 4 +-
.../CompositePkAndMultiPartitionedTableITCase.java | 19 ++++----
.../store/connector/FileStoreTableITCase.java | 8 +++-
.../table/store/connector/LookupJoinITCase.java | 4 +-
.../store/connector/ReadWriteTableITCase.java | 17 +++----
.../store/connector/ReadWriteTableTestBase.java | 5 +-
.../store/connector/StreamingWarehouseITCase.java | 4 +-
.../store/connector/TableEnvironmentTestUtils.java | 54 ++++++++++++++++++++++
.../table/store/connector/TableStoreTestBase.java | 3 +-
.../store/connector/sink/SinkSavepointITCase.java | 5 +-
.../table/store/kafka/KafkaTableTestBase.java | 4 ++
13 files changed, 120 insertions(+), 29 deletions(-)
diff --git a/docs/content/docs/development/write-table.md b/docs/content/docs/development/write-table.md
index 9bbf0e7e..7c99a751 100644
--- a/docs/content/docs/development/write-table.md
+++ b/docs/content/docs/development/write-table.md
@@ -41,7 +41,7 @@ column_list:
__IMPORTANT:__
- Checkpointing needs to be enabled when writing to the Table Store in STREAMING mode.
- `execution.checkpointing.unaligned=true` is not supported when writing to the Table Store in STREAMING mode.
-- `execution.checkpointing.mode=AT_LEAST_ONCE` is not supported when writing to the Table Store in STREAMING mode.
+- `execution.checkpointing.mode=at-least-once` is not supported when writing to the Table Store in STREAMING mode.
{{< /hint >}}
## Parallelism
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index 8613d6e4..c81faeb7 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -20,9 +20,11 @@ package org.apache.flink.table.store.connector.sink;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -34,6 +36,7 @@ import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.LogSinkFunction;
+import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
@@ -108,6 +111,10 @@ public class StoreSink implements Serializable {
.get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING
&& env.getCheckpointConfig().isCheckpointingEnabled();
+ if (streamingCheckpointEnabled) {
+ assertCheckpointConfiguration(env);
+ }
+
SingleOutputStreamOperator<?> committed =
written.transform(
GLOBAL_COMMITTER_NAME,
@@ -124,4 +131,17 @@ public class StoreSink implements Serializable {
.setMaxParallelism(1);
return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
}
+
+ private void assertCheckpointConfiguration(StreamExecutionEnvironment env) {
+ Preconditions.checkArgument(
+ !env.getCheckpointConfig().isUnalignedCheckpointsEnabled(),
+ "Table Store sink currently does not support unaligned checkpoints. Please set "
+ + ExecutionCheckpointingOptions.ENABLE_UNALIGNED.key()
+ + " to false.");
+ Preconditions.checkArgument(
+ env.getCheckpointConfig().getCheckpointingMode() == CheckpointingMode.EXACTLY_ONCE,
+ "Table Store sink currently only supports EXACTLY_ONCE checkpoint mode. Please set "
+ + ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key()
+ + " to exactly-once");
+ }
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
index feffad76..c705e21f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
@@ -43,7 +43,9 @@ public abstract class CatalogITCaseBase extends AbstractTestBase {
@Before
public void before() throws IOException {
- tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+ tEnv =
+ TableEnvironmentTestUtils.create(
+ EnvironmentSettings.newInstance().inBatchMode().build());
tEnv.executeSql(
String.format(
"CREATE CATALOG TABLE_STORE WITH ("
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CompositePkAndMultiPartitionedTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CompositePkAndMultiPartitionedTableITCase.java
index a0a6b3f6..f0886d05 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CompositePkAndMultiPartitionedTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CompositePkAndMultiPartitionedTableITCase.java
@@ -87,7 +87,7 @@ public class CompositePkAndMultiPartitionedTableITCase extends ReadWriteTableTes
// test streaming read
final StreamTableEnvironment streamTableEnv =
- StreamTableEnvironment.create(buildStreamEnv());
+ TableEnvironmentTestUtils.create(buildStreamEnv());
registerTable(streamTableEnv, managedTable);
BlockingIterator<Row, Row> streamIter =
collectAndCheck(
@@ -273,7 +273,7 @@ public class CompositePkAndMultiPartitionedTableITCase extends ReadWriteTableTes
// test streaming read
final StreamTableEnvironment streamTableEnv =
- StreamTableEnvironment.create(buildStreamEnv());
+ TableEnvironmentTestUtils.create(buildStreamEnv());
registerTable(streamTableEnv, managedTable);
BlockingIterator<Row, Row> streamIter =
collectAndCheck(
@@ -402,7 +402,7 @@ public class CompositePkAndMultiPartitionedTableITCase extends ReadWriteTableTes
// test streaming read
final StreamTableEnvironment streamTableEnv =
- StreamTableEnvironment.create(buildStreamEnv());
+ TableEnvironmentTestUtils.create(buildStreamEnv());
registerTable(streamTableEnv, managedTable);
BlockingIterator<Row, Row> streamIter =
collectAndCheck(
@@ -495,7 +495,7 @@ public class CompositePkAndMultiPartitionedTableITCase extends ReadWriteTableTes
// test streaming read
final StreamTableEnvironment streamTableEnv =
- StreamTableEnvironment.create(buildStreamEnv());
+ TableEnvironmentTestUtils.create(buildStreamEnv());
registerTable(streamTableEnv, managedTable);
BlockingIterator<Row, Row> streamIter =
collectAndCheck(
@@ -598,7 +598,7 @@ public class CompositePkAndMultiPartitionedTableITCase extends ReadWriteTableTes
// test streaming read
final StreamTableEnvironment streamTableEnv =
- StreamTableEnvironment.create(buildStreamEnv());
+ TableEnvironmentTestUtils.create(buildStreamEnv());
registerTable(streamTableEnv, managedTable);
BlockingIterator<Row, Row> streamIter =
collectAndCheck(
@@ -737,7 +737,8 @@ public class CompositePkAndMultiPartitionedTableITCase extends ReadWriteTableTes
// batch read to check data refresh
final StreamTableEnvironment batchTableEnv =
- StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+ TableEnvironmentTestUtils.create(
+ buildBatchEnv(), EnvironmentSettings.inBatchMode());
registerTable(batchTableEnv, managedTable);
collectAndCheck(
batchTableEnv,
@@ -842,7 +843,8 @@ public class CompositePkAndMultiPartitionedTableITCase extends ReadWriteTableTes
// batch read to check data refresh
final StreamTableEnvironment batchTableEnv =
- StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+ TableEnvironmentTestUtils.create(
+ buildBatchEnv(), EnvironmentSettings.inBatchMode());
registerTable(batchTableEnv, managedTable);
collectAndCheck(
batchTableEnv,
@@ -923,7 +925,8 @@ public class CompositePkAndMultiPartitionedTableITCase extends ReadWriteTableTes
// batch read to check data refresh
final StreamTableEnvironment batchTableEnv =
- StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+ TableEnvironmentTestUtils.create(
+ buildBatchEnv(), EnvironmentSettings.inBatchMode());
registerTable(batchTableEnv, managedTable);
collectAndCheck(
batchTableEnv,
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
index 754d1354..799c36cb 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
@@ -65,8 +65,12 @@ public abstract class FileStoreTableITCase extends AbstractTestBase {
@Before
public void before() throws IOException {
- bEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
- sEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
+ bEnv =
+ TableEnvironmentTestUtils.create(
+ EnvironmentSettings.newInstance().inBatchMode().build());
+ sEnv =
+ TableEnvironmentTestUtils.create(
+ EnvironmentSettings.newInstance().inStreamingMode().build());
sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(100));
path = TEMPORARY_FOLDER.newFolder().toURI().toString();
prepareConfiguration(bEnv, path);
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
index 6b06aef0..9b38dbc3 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
@@ -42,7 +42,9 @@ public class LookupJoinITCase extends AbstractTestBase {
@Before
public void before() throws Exception {
- env = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
+ env =
+ TableEnvironmentTestUtils.create(
+ EnvironmentSettings.newInstance().inStreamingMode().build());
env.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(100));
env.getConfig()
.getConfiguration()
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index 0cc8fc07..b4a56c39 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -92,7 +92,7 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
// test streaming read
final StreamTableEnvironment streamTableEnv =
- StreamTableEnvironment.create(buildStreamEnv());
+ TableEnvironmentTestUtils.create(buildStreamEnv());
registerTable(streamTableEnv, managedTable);
BlockingIterator<Row, Row> streamIter =
collectAndCheck(
@@ -211,7 +211,7 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
// test streaming read
final StreamTableEnvironment streamTableEnv =
- StreamTableEnvironment.create(buildStreamEnv());
+ TableEnvironmentTestUtils.create(buildStreamEnv());
registerTable(streamTableEnv, managedTable);
collectAndCheck(
streamTableEnv,
@@ -428,7 +428,8 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
// batch read to check data refresh
final StreamTableEnvironment batchTableEnv =
- StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+ TableEnvironmentTestUtils.create(
+ buildBatchEnv(), EnvironmentSettings.inBatchMode());
registerTable(batchTableEnv, managedTable);
collectAndCheck(
batchTableEnv,
@@ -1207,7 +1208,7 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
@Test
public void testQueryContainsDefaultFieldName() throws Exception {
rootPath = TEMPORARY_FOLDER.newFolder().getPath();
- tEnv = StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+ tEnv = TableEnvironmentTestUtils.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
String id = registerData(Collections.singletonList(changelogRow("+I", 1, "abc")));
tEnv.executeSql(
String.format(
@@ -1233,7 +1234,7 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
@Test
public void testLike() throws Exception {
rootPath = TEMPORARY_FOLDER.newFolder().getPath();
- tEnv = StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+ tEnv = TableEnvironmentTestUtils.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
List<Row> input =
Arrays.asList(
changelogRow("+I", 1, "test_1"),
@@ -1334,7 +1335,7 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
@Test
public void testIn() throws Exception {
rootPath = TEMPORARY_FOLDER.newFolder().getPath();
- tEnv = StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+ tEnv = TableEnvironmentTestUtils.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
List<Row> input =
Arrays.asList(
changelogRow("+I", 1, "aaa"),
@@ -1415,7 +1416,7 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
@Test
public void testChangeBucketNumber() throws Exception {
rootPath = TEMPORARY_FOLDER.newFolder().getPath();
- tEnv = StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+ tEnv = TableEnvironmentTestUtils.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
tEnv.executeSql(
String.format(
"CREATE TABLE IF NOT EXISTS rates (\n"
@@ -1483,7 +1484,7 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
public void testStreamingInsertOverwrite() throws Exception {
rootPath = TEMPORARY_FOLDER.newFolder().getPath();
tEnv =
- StreamTableEnvironment.create(
+ TableEnvironmentTestUtils.create(
buildStreamEnv(), EnvironmentSettings.inStreamingMode());
tEnv.executeSql(
String.format(
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
index 3c9fcb76..1da0c1ed 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
@@ -313,7 +313,7 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase {
env = buildBatchEnv();
builder.inBatchMode();
}
- tEnv = StreamTableEnvironment.create(env, builder.build());
+ tEnv = TableEnvironmentTestUtils.create(env, builder.build());
tEnv.executeSql(helperTableDdl);
String managedTableDdl;
@@ -406,7 +406,8 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase {
protected void prepareEnvAndOverwrite(String managedTable, String query) throws Exception {
final StreamTableEnvironment batchEnv =
- StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+ TableEnvironmentTestUtils.create(
+ buildBatchEnv(), EnvironmentSettings.inBatchMode());
registerTable(batchEnv, managedTable);
batchEnv.executeSql(query).await();
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java
index faa8eef7..7516df9d 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java
@@ -36,9 +36,9 @@ import static org.assertj.core.api.Assertions.assertThat;
public class StreamingWarehouseITCase extends ReadWriteTableTestBase {
private final StreamTableEnvironment streamTableEnv =
- StreamTableEnvironment.create(buildStreamEnv(1));
+ TableEnvironmentTestUtils.create(buildStreamEnv(1));
private final StreamTableEnvironment batchTableEnv =
- StreamTableEnvironment.create(buildBatchEnv(1), EnvironmentSettings.inBatchMode());
+ TableEnvironmentTestUtils.create(buildBatchEnv(1), EnvironmentSettings.inBatchMode());
@Test
public void testUserStory() throws Exception {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableEnvironmentTestUtils.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableEnvironmentTestUtils.java
new file mode 100644
index 00000000..f8588ec2
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableEnvironmentTestUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+/** Utility class for creating {@link TableEnvironment} in tests. */
+public class TableEnvironmentTestUtils {
+
+ public static TableEnvironment create(EnvironmentSettings settings) {
+ TableEnvironment tEnv = TableEnvironment.create(settings);
+ disableUnalignedCheckpoint(tEnv);
+ return tEnv;
+ }
+
+ public static StreamTableEnvironment create(StreamExecutionEnvironment env) {
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+ disableUnalignedCheckpoint(tEnv);
+ return tEnv;
+ }
+
+ public static StreamTableEnvironment create(
+ StreamExecutionEnvironment env, EnvironmentSettings settings) {
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
+ disableUnalignedCheckpoint(tEnv);
+ return tEnv;
+ }
+
+ private static void disableUnalignedCheckpoint(TableEnvironment tEnv) {
+ tEnv.getConfig()
+ .getConfiguration()
+ .set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false);
+ }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
index f8728183..0d5868fa 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
@@ -91,7 +90,7 @@ public abstract class TableStoreTestBase extends KafkaTableTestBase {
env.enableCheckpointing(100);
builder.inStreamingMode();
}
- tEnv = StreamTableEnvironment.create(env, builder.build());
+ tEnv = TableEnvironmentTestUtils.create(env, builder.build());
((TableEnvironmentImpl) tEnv)
.getCatalogManager()
.registerCatalog(
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/SinkSavepointITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/SinkSavepointITCase.java
index 1aa4546f..ca4614da 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/SinkSavepointITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/SinkSavepointITCase.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.store.connector.TableEnvironmentTestUtils;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
@@ -133,7 +134,7 @@ public class SinkSavepointITCase extends AbstractTestBase {
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
- StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
+ StreamTableEnvironment tEnv = TableEnvironmentTestUtils.create(env, settings);
tEnv.getConfig()
.getConfiguration()
.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMillis(500));
@@ -205,7 +206,7 @@ public class SinkSavepointITCase extends AbstractTestBase {
private void checkRecoverFromSavepointResult(String failingPath) throws Exception {
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
- TableEnvironment tEnv = TableEnvironment.create(settings);
+ TableEnvironment tEnv = TableEnvironmentTestUtils.create(settings);
// no failure should occur when checking for answer
FailingAtomicRenameFileSystem.reset(failingName, 0, 1);
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
index 1284fcff..aff25de2 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.store.kafka;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.AbstractTestBase;
@@ -95,6 +96,9 @@ public abstract class KafkaTableTestBase extends AbstractTestBase {
env = StreamExecutionEnvironment.getExecutionEnvironment();
tEnv = StreamTableEnvironment.create(env);
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+ tEnv.getConfig()
+ .getConfiguration()
+ .set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false);
// Probe Kafka broker status per 30 seconds
scheduleTimeoutLogger(