Posted to commits@flink.apache.org by cz...@apache.org on 2022/11/08 09:19:18 UTC

[flink-table-store] branch release-0.2 updated: [FLINK-29876] Explicitly throw exception from Table Store sink when unaligned checkpoint is enabled or at least once checkpoint mode is used

This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.2 by this push:
     new 2af8c0a4 [FLINK-29876] Explicitly throw exception from Table Store sink when unaligned checkpoint is enabled or at least once checkpoint mode is used
2af8c0a4 is described below

commit 2af8c0a4603807a1a2d21cfcefb1a6c6dee07a7f
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Nov 8 17:19:13 2022 +0800

    [FLINK-29876] Explicitly throw exception from Table Store sink when unaligned checkpoint is enabled or at least once checkpoint mode is used
    
    This closes #362.
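    
    For reference, a checkpoint configuration that the sink accepts in
    STREAMING mode looks like the following (a minimal sketch, not part
    of this commit; the interval value is illustrative):
    
        import org.apache.flink.streaming.api.CheckpointingMode;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing must be enabled when writing in STREAMING mode.
        env.enableCheckpointing(60_000);
        // The sink only accepts aligned, exactly-once checkpoints.
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().enableUnalignedCheckpoints(false);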
---
 docs/content/docs/development/write-table.md       |  2 +-
 .../table/store/connector/sink/StoreSink.java      | 20 ++++++++
 .../table/store/connector/CatalogITCaseBase.java   |  4 +-
 .../CompositePkAndMultiPartitionedTableITCase.java | 19 ++++----
 .../store/connector/FileStoreTableITCase.java      |  8 +++-
 .../table/store/connector/LookupJoinITCase.java    |  4 +-
 .../store/connector/ReadWriteTableITCase.java      | 17 +++----
 .../store/connector/ReadWriteTableTestBase.java    |  5 +-
 .../store/connector/StreamingWarehouseITCase.java  |  4 +-
 .../store/connector/TableEnvironmentTestUtils.java | 54 ++++++++++++++++++++++
 .../table/store/connector/TableStoreTestBase.java  |  3 +-
 .../store/connector/sink/SinkSavepointITCase.java  |  5 +-
 .../table/store/kafka/KafkaTableTestBase.java      |  4 ++
 13 files changed, 120 insertions(+), 29 deletions(-)

diff --git a/docs/content/docs/development/write-table.md b/docs/content/docs/development/write-table.md
index 9bbf0e7e..7c99a751 100644
--- a/docs/content/docs/development/write-table.md
+++ b/docs/content/docs/development/write-table.md
@@ -41,7 +41,7 @@ column_list:
 __IMPORTANT:__ 
 - Checkpointing needs to be enabled when writing to the Table Store in STREAMING mode.
 - `execution.checkpointing.unaligned=true` is not supported when writing to the Table Store in STREAMING mode.
-- `execution.checkpointing.mode=AT_LEAST_ONCE` is not supported when writing to the Table Store in STREAMING mode.
+- `execution.checkpointing.mode=at-least-once` is not supported when writing to the Table Store in STREAMING mode.
 {{< /hint >}}
 
 ## Parallelism
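
For Table API users, the documented constraints above translate into session
configuration; the new TableEnvironmentTestUtils in this commit disables
unaligned checkpoints via the same option. A minimal sketch (the option keys
are Flink's documented checkpointing keys; the interval value is illustrative):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    // Enable checkpointing and keep the settings the sink requires:
    // aligned checkpoints in exactly-once mode.
    tEnv.getConfig().getConfiguration()
            .setString("execution.checkpointing.interval", "1 min");
    tEnv.getConfig().getConfiguration()
            .setString("execution.checkpointing.unaligned", "false");
    tEnv.getConfig().getConfiguration()
            .setString("execution.checkpointing.mode", "exactly-once");
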
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index 8613d6e4..c81faeb7 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -20,9 +20,11 @@ package org.apache.flink.table.store.connector.sink;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -34,6 +36,7 @@ import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
 import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.sink.LogSinkFunction;
+import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
@@ -108,6 +111,10 @@ public class StoreSink implements Serializable {
                                         .get(ExecutionOptions.RUNTIME_MODE)
                                 == RuntimeExecutionMode.STREAMING
                         && env.getCheckpointConfig().isCheckpointingEnabled();
+        if (streamingCheckpointEnabled) {
+            assertCheckpointConfiguration(env);
+        }
+
         SingleOutputStreamOperator<?> committed =
                 written.transform(
                                 GLOBAL_COMMITTER_NAME,
@@ -124,4 +131,17 @@ public class StoreSink implements Serializable {
                         .setMaxParallelism(1);
         return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
     }
+
+    private void assertCheckpointConfiguration(StreamExecutionEnvironment env) {
+        Preconditions.checkArgument(
+                !env.getCheckpointConfig().isUnalignedCheckpointsEnabled(),
+                "Table Store sink currently does not support unaligned checkpoints. Please set "
+                        + ExecutionCheckpointingOptions.ENABLE_UNALIGNED.key()
+                        + " to false.");
+        Preconditions.checkArgument(
+                env.getCheckpointConfig().getCheckpointingMode() == CheckpointingMode.EXACTLY_ONCE,
+                "Table Store sink currently only supports EXACTLY_ONCE checkpoint mode. Please set "
+                        + ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key()
+                        + " to exactly-once.");
+    }
 }
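
With this check in place, a streaming job configured like the following now
fails fast when the Table Store sink is built, rather than running with an
unsupported checkpoint setup (a sketch; the pipeline writing to the table is
elided):

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(1_000);
    env.getCheckpointConfig().enableUnalignedCheckpoints(true);
    // Building the sink now throws IllegalArgumentException:
    // "Table Store sink currently does not support unaligned checkpoints.
    //  Please set execution.checkpointing.unaligned to false."
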
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
index feffad76..c705e21f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
@@ -43,7 +43,9 @@ public abstract class CatalogITCaseBase extends AbstractTestBase {
 
     @Before
     public void before() throws IOException {
-        tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+        tEnv =
+                TableEnvironmentTestUtils.create(
+                        EnvironmentSettings.newInstance().inBatchMode().build());
         tEnv.executeSql(
                 String.format(
                         "CREATE CATALOG TABLE_STORE WITH ("
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CompositePkAndMultiPartitionedTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CompositePkAndMultiPartitionedTableITCase.java
index a0a6b3f6..f0886d05 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CompositePkAndMultiPartitionedTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CompositePkAndMultiPartitionedTableITCase.java
@@ -87,7 +87,7 @@ public class CompositePkAndMultiPartitionedTableITCase extends ReadWriteTableTes
 
         // test streaming read
         final StreamTableEnvironment streamTableEnv =
-                StreamTableEnvironment.create(buildStreamEnv());
+                TableEnvironmentTestUtils.create(buildStreamEnv());
         registerTable(streamTableEnv, managedTable);
         BlockingIterator<Row, Row> streamIter =
                 collectAndCheck(
@@ -273,7 +273,7 @@ public class CompositePkAndMultiPartitionedTableITCase extends ReadWriteTableTes
 
         // test streaming read
         final StreamTableEnvironment streamTableEnv =
-                StreamTableEnvironment.create(buildStreamEnv());
+                TableEnvironmentTestUtils.create(buildStreamEnv());
         registerTable(streamTableEnv, managedTable);
         BlockingIterator<Row, Row> streamIter =
                 collectAndCheck(
@@ -402,7 +402,7 @@ public class CompositePkAndMultiPartitionedTableITCase extends ReadWriteTableTes
 
         // test streaming read
         final StreamTableEnvironment streamTableEnv =
-                StreamTableEnvironment.create(buildStreamEnv());
+                TableEnvironmentTestUtils.create(buildStreamEnv());
         registerTable(streamTableEnv, managedTable);
         BlockingIterator<Row, Row> streamIter =
                 collectAndCheck(
@@ -495,7 +495,7 @@ public class CompositePkAndMultiPartitionedTableITCase extends ReadWriteTableTes
 
         // test streaming read
         final StreamTableEnvironment streamTableEnv =
-                StreamTableEnvironment.create(buildStreamEnv());
+                TableEnvironmentTestUtils.create(buildStreamEnv());
         registerTable(streamTableEnv, managedTable);
         BlockingIterator<Row, Row> streamIter =
                 collectAndCheck(
@@ -598,7 +598,7 @@ public class CompositePkAndMultiPartitionedTableITCase extends ReadWriteTableTes
 
         // test streaming read
         final StreamTableEnvironment streamTableEnv =
-                StreamTableEnvironment.create(buildStreamEnv());
+                TableEnvironmentTestUtils.create(buildStreamEnv());
         registerTable(streamTableEnv, managedTable);
         BlockingIterator<Row, Row> streamIter =
                 collectAndCheck(
@@ -737,7 +737,8 @@ public class CompositePkAndMultiPartitionedTableITCase extends ReadWriteTableTes
 
         // batch read to check data refresh
         final StreamTableEnvironment batchTableEnv =
-                StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+                TableEnvironmentTestUtils.create(
+                        buildBatchEnv(), EnvironmentSettings.inBatchMode());
         registerTable(batchTableEnv, managedTable);
         collectAndCheck(
                 batchTableEnv,
@@ -842,7 +843,8 @@ public class CompositePkAndMultiPartitionedTableITCase extends ReadWriteTableTes
 
         // batch read to check data refresh
         final StreamTableEnvironment batchTableEnv =
-                StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+                TableEnvironmentTestUtils.create(
+                        buildBatchEnv(), EnvironmentSettings.inBatchMode());
         registerTable(batchTableEnv, managedTable);
         collectAndCheck(
                 batchTableEnv,
@@ -923,7 +925,8 @@ public class CompositePkAndMultiPartitionedTableITCase extends ReadWriteTableTes
 
         // batch read to check data refresh
         final StreamTableEnvironment batchTableEnv =
-                StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+                TableEnvironmentTestUtils.create(
+                        buildBatchEnv(), EnvironmentSettings.inBatchMode());
         registerTable(batchTableEnv, managedTable);
         collectAndCheck(
                 batchTableEnv,
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
index 754d1354..799c36cb 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
@@ -65,8 +65,12 @@ public abstract class FileStoreTableITCase extends AbstractTestBase {
 
     @Before
     public void before() throws IOException {
-        bEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
-        sEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
+        bEnv =
+                TableEnvironmentTestUtils.create(
+                        EnvironmentSettings.newInstance().inBatchMode().build());
+        sEnv =
+                TableEnvironmentTestUtils.create(
+                        EnvironmentSettings.newInstance().inStreamingMode().build());
         sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(100));
         path = TEMPORARY_FOLDER.newFolder().toURI().toString();
         prepareConfiguration(bEnv, path);
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
index 6b06aef0..9b38dbc3 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
@@ -42,7 +42,9 @@ public class LookupJoinITCase extends AbstractTestBase {
 
     @Before
     public void before() throws Exception {
-        env = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
+        env =
+                TableEnvironmentTestUtils.create(
+                        EnvironmentSettings.newInstance().inStreamingMode().build());
         env.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(100));
         env.getConfig()
                 .getConfiguration()
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index 0cc8fc07..b4a56c39 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -92,7 +92,7 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
 
         // test streaming read
         final StreamTableEnvironment streamTableEnv =
-                StreamTableEnvironment.create(buildStreamEnv());
+                TableEnvironmentTestUtils.create(buildStreamEnv());
         registerTable(streamTableEnv, managedTable);
         BlockingIterator<Row, Row> streamIter =
                 collectAndCheck(
@@ -211,7 +211,7 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
 
         // test streaming read
         final StreamTableEnvironment streamTableEnv =
-                StreamTableEnvironment.create(buildStreamEnv());
+                TableEnvironmentTestUtils.create(buildStreamEnv());
         registerTable(streamTableEnv, managedTable);
         collectAndCheck(
                         streamTableEnv,
@@ -428,7 +428,8 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
 
         // batch read to check data refresh
         final StreamTableEnvironment batchTableEnv =
-                StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+                TableEnvironmentTestUtils.create(
+                        buildBatchEnv(), EnvironmentSettings.inBatchMode());
         registerTable(batchTableEnv, managedTable);
         collectAndCheck(
                 batchTableEnv,
@@ -1207,7 +1208,7 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
     @Test
     public void testQueryContainsDefaultFieldName() throws Exception {
         rootPath = TEMPORARY_FOLDER.newFolder().getPath();
-        tEnv = StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+        tEnv = TableEnvironmentTestUtils.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
         String id = registerData(Collections.singletonList(changelogRow("+I", 1, "abc")));
         tEnv.executeSql(
                 String.format(
@@ -1233,7 +1234,7 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
     @Test
     public void testLike() throws Exception {
         rootPath = TEMPORARY_FOLDER.newFolder().getPath();
-        tEnv = StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+        tEnv = TableEnvironmentTestUtils.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
         List<Row> input =
                 Arrays.asList(
                         changelogRow("+I", 1, "test_1"),
@@ -1334,7 +1335,7 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
     @Test
     public void testIn() throws Exception {
         rootPath = TEMPORARY_FOLDER.newFolder().getPath();
-        tEnv = StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+        tEnv = TableEnvironmentTestUtils.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
         List<Row> input =
                 Arrays.asList(
                         changelogRow("+I", 1, "aaa"),
@@ -1415,7 +1416,7 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
     @Test
     public void testChangeBucketNumber() throws Exception {
         rootPath = TEMPORARY_FOLDER.newFolder().getPath();
-        tEnv = StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+        tEnv = TableEnvironmentTestUtils.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
         tEnv.executeSql(
                 String.format(
                         "CREATE TABLE IF NOT EXISTS rates (\n"
@@ -1483,7 +1484,7 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
     public void testStreamingInsertOverwrite() throws Exception {
         rootPath = TEMPORARY_FOLDER.newFolder().getPath();
         tEnv =
-                StreamTableEnvironment.create(
+                TableEnvironmentTestUtils.create(
                         buildStreamEnv(), EnvironmentSettings.inStreamingMode());
         tEnv.executeSql(
                 String.format(
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
index 3c9fcb76..1da0c1ed 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
@@ -313,7 +313,7 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase {
             env = buildBatchEnv();
             builder.inBatchMode();
         }
-        tEnv = StreamTableEnvironment.create(env, builder.build());
+        tEnv = TableEnvironmentTestUtils.create(env, builder.build());
         tEnv.executeSql(helperTableDdl);
 
         String managedTableDdl;
@@ -406,7 +406,8 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase {
 
     protected void prepareEnvAndOverwrite(String managedTable, String query) throws Exception {
         final StreamTableEnvironment batchEnv =
-                StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+                TableEnvironmentTestUtils.create(
+                        buildBatchEnv(), EnvironmentSettings.inBatchMode());
         registerTable(batchEnv, managedTable);
         batchEnv.executeSql(query).await();
     }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java
index faa8eef7..7516df9d 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java
@@ -36,9 +36,9 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class StreamingWarehouseITCase extends ReadWriteTableTestBase {
 
     private final StreamTableEnvironment streamTableEnv =
-            StreamTableEnvironment.create(buildStreamEnv(1));
+            TableEnvironmentTestUtils.create(buildStreamEnv(1));
     private final StreamTableEnvironment batchTableEnv =
-            StreamTableEnvironment.create(buildBatchEnv(1), EnvironmentSettings.inBatchMode());
+            TableEnvironmentTestUtils.create(buildBatchEnv(1), EnvironmentSettings.inBatchMode());
 
     @Test
     public void testUserStory() throws Exception {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableEnvironmentTestUtils.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableEnvironmentTestUtils.java
new file mode 100644
index 00000000..f8588ec2
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableEnvironmentTestUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+/** Utility class for creating {@link TableEnvironment} in tests. */
+public class TableEnvironmentTestUtils {
+
+    public static TableEnvironment create(EnvironmentSettings settings) {
+        TableEnvironment tEnv = TableEnvironment.create(settings);
+        disableUnalignedCheckpoint(tEnv);
+        return tEnv;
+    }
+
+    public static StreamTableEnvironment create(StreamExecutionEnvironment env) {
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+        disableUnalignedCheckpoint(tEnv);
+        return tEnv;
+    }
+
+    public static StreamTableEnvironment create(
+            StreamExecutionEnvironment env, EnvironmentSettings settings) {
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
+        disableUnalignedCheckpoint(tEnv);
+        return tEnv;
+    }
+
+    private static void disableUnalignedCheckpoint(TableEnvironment tEnv) {
+        tEnv.getConfig()
+                .getConfiguration()
+                .set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false);
+    }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
index f8728183..0d5868fa 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
@@ -91,7 +90,7 @@ public abstract class TableStoreTestBase extends KafkaTableTestBase {
             env.enableCheckpointing(100);
             builder.inStreamingMode();
         }
-        tEnv = StreamTableEnvironment.create(env, builder.build());
+        tEnv = TableEnvironmentTestUtils.create(env, builder.build());
         ((TableEnvironmentImpl) tEnv)
                 .getCatalogManager()
                 .registerCatalog(
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/SinkSavepointITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/SinkSavepointITCase.java
index 1aa4546f..ca4614da 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/SinkSavepointITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/SinkSavepointITCase.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.store.connector.TableEnvironmentTestUtils;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
@@ -133,7 +134,7 @@ public class SinkSavepointITCase extends AbstractTestBase {
 
         EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
-        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
+        StreamTableEnvironment tEnv = TableEnvironmentTestUtils.create(env, settings);
         tEnv.getConfig()
                 .getConfiguration()
                 .set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMillis(500));
@@ -205,7 +206,7 @@ public class SinkSavepointITCase extends AbstractTestBase {
 
     private void checkRecoverFromSavepointResult(String failingPath) throws Exception {
         EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
-        TableEnvironment tEnv = TableEnvironment.create(settings);
+        TableEnvironment tEnv = TableEnvironmentTestUtils.create(settings);
         // no failure should occur when checking for answer
         FailingAtomicRenameFileSystem.reset(failingName, 0, 1);
 
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
index 1284fcff..aff25de2 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.store.kafka;
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.test.util.AbstractTestBase;
@@ -95,6 +96,9 @@ public abstract class KafkaTableTestBase extends AbstractTestBase {
         env = StreamExecutionEnvironment.getExecutionEnvironment();
         tEnv = StreamTableEnvironment.create(env);
         env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+        tEnv.getConfig()
+                .getConfiguration()
+                .set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false);
 
         // Probe Kafka broker status per 30 seconds
         scheduleTimeoutLogger(