You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2023/01/09 06:41:20 UTC

[flink-table-store] branch master updated: [FLINK-30602] Remove FileStoreTableITCase in table store

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new b27fa51e [FLINK-30602] Remove FileStoreTableITCase in table store
b27fa51e is described below

commit b27fa51ed05bca3596ef1ba5131e97921ec01e33
Author: shammon <zj...@gmail.com>
AuthorDate: Mon Jan 9 14:41:16 2023 +0800

    [FLINK-30602] Remove FileStoreTableITCase in table store
    
    This closes #470
---
 .../store/connector/AppendOnlyTableITCase.java     |   8 +-
 .../store/connector/BatchFileStoreITCase.java      |   2 +-
 .../table/store/connector/CatalogITCaseBase.java   |  38 ++++-
 .../store/connector/ContinuousFileStoreITCase.java |   7 +-
 .../store/connector/FileStoreTableITCase.java      | 160 ---------------------
 .../store/connector/ForceCompactionITCase.java     |   8 +-
 .../store/connector/PreAggregationITCase.java      |  26 ++--
 .../table/store/connector/RescaleBucketITCase.java |  51 +++----
 8 files changed, 80 insertions(+), 220 deletions(-)

diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
index cfee8bd2..354c7a5f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
@@ -34,7 +34,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test case for append-only managed table. */
-public class AppendOnlyTableITCase extends FileStoreTableITCase {
+public class AppendOnlyTableITCase extends CatalogITCaseBase {
 
     @Test
     public void testCreateTableWithPrimaryKey() {
@@ -103,7 +103,7 @@ public class AppendOnlyTableITCase extends FileStoreTableITCase {
 
         String id = TestValuesTableFactory.registerData(input);
         batchSql(
-                "CREATE TABLE source (id INT, data STRING) WITH ('connector'='values', 'bounded'='true', 'data-id'='%s')",
+                "CREATE TEMPORARY TABLE source (id INT, data STRING) WITH ('connector'='values', 'bounded'='true', 'data-id'='%s')",
                 id);
 
         batchSql("INSERT INTO append_table SELECT * FROM source");
@@ -214,7 +214,7 @@ public class AppendOnlyTableITCase extends FileStoreTableITCase {
 
         String id = TestValuesTableFactory.registerData(input);
         batchSql(
-                "CREATE TABLE source (id INT, data STRING) WITH ('connector'='values', 'bounded'='true', 'data-id'='%s')",
+                "CREATE TEMPORARY TABLE source (id INT, data STRING) WITH ('connector'='values', 'bounded'='true', 'data-id'='%s')",
                 id);
 
         assertThatThrownBy(() -> batchSql("INSERT INTO append_table SELECT * FROM source"))
@@ -225,7 +225,7 @@ public class AppendOnlyTableITCase extends FileStoreTableITCase {
     private void assertAutoCompaction(
             String sql, long expectedSnapshotId, Snapshot.CommitKind expectedCommitKind) {
         batchSql(sql);
-        Snapshot snapshot = findLatestSnapshot("append_table", true);
+        Snapshot snapshot = findLatestSnapshot("append_table");
         assertThat(snapshot.id()).isEqualTo(expectedSnapshotId);
         assertThat(snapshot.commitKind()).isEqualTo(expectedCommitKind);
     }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
index b9560afe..7fdd2278 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
@@ -30,7 +30,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** ITCase for batch file store. */
-public class BatchFileStoreITCase extends FileStoreTableITCase {
+public class BatchFileStoreITCase extends CatalogITCaseBase {
 
     @Override
     protected List<String> ddl() {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
index 0c661ffe..f8032fee 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
@@ -19,14 +19,21 @@
 package org.apache.flink.table.store.connector;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
@@ -35,7 +42,10 @@ import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
 
 import org.junit.Before;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
+import java.net.URI;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
@@ -47,15 +57,18 @@ public abstract class CatalogITCaseBase extends AbstractTestBase {
 
     protected TableEnvironment tEnv;
     protected TableEnvironment sEnv;
+    protected String path;
 
     @Before
     public void before() throws IOException {
         tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
         String catalog = "TABLE_STORE";
+        URI uri = TEMPORARY_FOLDER.newFolder().toURI();
+        path = uri.toString();
         tEnv.executeSql(
                 String.format(
                         "CREATE CATALOG %s WITH (" + "'type'='table-store', 'warehouse'='%s')",
-                        catalog, TEMPORARY_FOLDER.newFolder().toURI()));
+                        catalog, uri));
         tEnv.useCatalog(catalog);
 
         sEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
@@ -65,9 +78,21 @@ public abstract class CatalogITCaseBase extends AbstractTestBase {
 
         prepareConfiguration(tEnv);
         prepareConfiguration(sEnv);
+        prepareEnv();
+    }
 
+    private void prepareEnv() {
+        Parser parser = ((TableEnvironmentImpl) tEnv).getParser();
         for (String ddl : ddl()) {
             tEnv.executeSql(ddl);
+            List<Operation> operations = parser.parse(ddl);
+            if (operations.size() == 1) {
+                Operation operation = operations.get(0);
+                if (operation instanceof CreateCatalogOperation) {
+                    String name = ((CreateCatalogOperation) operation).getCatalogName();
+                    sEnv.registerCatalog(name, tEnv.getCatalog(name).orElse(null));
+                }
+            }
         }
     }
 
@@ -108,4 +133,15 @@ public abstract class CatalogITCaseBase extends AbstractTestBase {
                 catalog.getTable(new ObjectPath(catalog.getDefaultDatabase(), tableName));
         return (CatalogTable) table;
     }
+
+    protected Path getTableDirectory(String tableName) {
+        return new Path(path + String.format("%s.db/%s", tEnv.getCurrentDatabase(), tableName));
+    }
+
+    @Nullable
+    protected Snapshot findLatestSnapshot(String tableName) {
+        SnapshotManager snapshotManager = new SnapshotManager(getTableDirectory(tableName));
+        Long id = snapshotManager.latestSnapshotId();
+        return id == null ? null : snapshotManager.snapshot(id);
+    }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
index 829ed6ca..8646c5f3 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.store.connector;
 
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.utils.BlockingIterator;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
@@ -42,7 +41,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** SQL ITCase for continuous file store. */
 @RunWith(Parameterized.class)
-public class ContinuousFileStoreITCase extends FileStoreTableITCase {
+public class ContinuousFileStoreITCase extends CatalogITCaseBase {
 
     private final boolean changelogFile;
 
@@ -134,9 +133,7 @@ public class ContinuousFileStoreITCase extends FileStoreTableITCase {
                 .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
         iterator.close();
 
-        SnapshotManager snapshotManager =
-                new SnapshotManager(
-                        new Path(path, "default_catalog.catalog/default_database.db/T1"));
+        SnapshotManager snapshotManager = new SnapshotManager(getTableDirectory("T1"));
         List<Snapshot> snapshots =
                 new ArrayList<>(ImmutableList.copyOf(snapshotManager.snapshots()));
         snapshots.sort(Comparator.comparingLong(Snapshot::timeMillis));
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
deleted file mode 100644
index caee8632..00000000
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.api.internal.TableEnvironmentImpl;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.delegation.Parser;
-import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
-import org.apache.flink.table.operations.ddl.CreateTableOperation;
-import org.apache.flink.table.store.file.Snapshot;
-import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
-
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
-
-import org.junit.Before;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.List;
-
-import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
-import static org.apache.flink.table.store.connector.FlinkConnectorOptions.ROOT_PATH;
-import static org.apache.flink.table.store.connector.FlinkConnectorOptions.TABLE_STORE_PREFIX;
-import static org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath;
-import static org.junit.jupiter.api.Assertions.fail;
-
-/** ITCase for file store table api. */
-@Deprecated
-public abstract class FileStoreTableITCase extends AbstractTestBase {
-
-    protected TableEnvironment bEnv;
-    protected TableEnvironment sEnv;
-    protected String path;
-
-    @Before
-    public void before() throws IOException {
-        bEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
-        sEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
-        sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(100));
-        path = TEMPORARY_FOLDER.newFolder().toURI().toString();
-        prepareConfiguration(bEnv, path);
-        prepareConfiguration(sEnv, path);
-        prepareEnv();
-    }
-
-    private void prepareConfiguration(TableEnvironment env, String path) {
-        Configuration config = env.getConfig().getConfiguration();
-        config.set(
-                ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
-                defaultParallelism());
-        config.setString(TABLE_STORE_PREFIX + ROOT_PATH.key(), path);
-    }
-
-    protected int defaultParallelism() {
-        return 2;
-    }
-
-    private void prepareEnv() {
-        Parser parser = ((TableEnvironmentImpl) sEnv).getParser();
-        for (String ddl : ddl()) {
-            sEnv.executeSql(ddl);
-            List<Operation> operations = parser.parse(ddl);
-            if (operations.size() == 1) {
-                Operation operation = operations.get(0);
-                if (operation instanceof CreateCatalogOperation) {
-                    String name = ((CreateCatalogOperation) operation).getCatalogName();
-                    bEnv.registerCatalog(name, sEnv.getCatalog(name).orElse(null));
-                } else if (operation instanceof CreateTableOperation) {
-                    ObjectIdentifier tableIdentifier =
-                            ((CreateTableOperation) operation).getTableIdentifier();
-                    try {
-                        CatalogBaseTable table =
-                                sEnv.getCatalog(tableIdentifier.getCatalogName())
-                                        .get()
-                                        .getTable(tableIdentifier.toObjectPath());
-                        ((TableEnvironmentImpl) bEnv)
-                                .getCatalogManager()
-                                .getCatalog(tableIdentifier.getCatalogName())
-                                .get()
-                                .createTable(tableIdentifier.toObjectPath(), table, true);
-                    } catch (TableNotExistException
-                            | TableAlreadyExistException
-                            | DatabaseNotExistException e) {
-                        fail("This should not happen");
-                    }
-                } else {
-                    bEnv.executeSql(ddl);
-                }
-            }
-        }
-    }
-
-    protected abstract List<String> ddl();
-
-    protected CloseableIterator<Row> streamSqlIter(String query, Object... args) {
-        return sEnv.executeSql(String.format(query, args)).collect();
-    }
-
-    protected List<Row> batchSql(String query, Object... args) {
-        TableResult tableResult = bEnv.executeSql(String.format(query, args));
-
-        try (CloseableIterator<Row> iter = tableResult.collect()) {
-            return ImmutableList.copyOf(iter);
-        } catch (Exception e) {
-            throw new RuntimeException("Failed to collect the table result.", e);
-        }
-    }
-
-    protected Path getTableDirectory(String tableName, boolean managedTable) {
-        return new Path(
-                path
-                        + (managedTable
-                                ? relativeTablePath(
-                                        ObjectIdentifier.of(
-                                                bEnv.getCurrentCatalog(),
-                                                bEnv.getCurrentDatabase(),
-                                                tableName))
-                                : String.format("%s.db/%s", bEnv.getCurrentDatabase(), tableName)));
-    }
-
-    @Nullable
-    protected Snapshot findLatestSnapshot(String tableName, boolean managedTable) {
-        SnapshotManager snapshotManager =
-                new SnapshotManager(getTableDirectory(tableName, managedTable));
-        Long id = snapshotManager.latestSnapshotId();
-        return id == null ? null : snapshotManager.snapshot(id);
-    }
-}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java
index 18c9a9f6..1d81d729 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java
@@ -38,7 +38,7 @@ import java.util.List;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** ITCase for auto-enabling commit.force-compact under batch mode. */
-public class ForceCompactionITCase extends FileStoreTableITCase {
+public class ForceCompactionITCase extends CatalogITCaseBase {
 
     private final FileFormat avro = FileFormat.fromIdentifier("avro", new Configuration());
 
@@ -210,7 +210,7 @@ public class ForceCompactionITCase extends FileStoreTableITCase {
 
     private void assertAppend(String sql, String tableName, long expectSnapshotId) {
         batchSql(sql);
-        Snapshot snapshot = findLatestSnapshot(tableName, true);
+        Snapshot snapshot = findLatestSnapshot(tableName);
         assertThat(snapshot.id()).isEqualTo(expectSnapshotId);
         assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
     }
@@ -218,7 +218,7 @@ public class ForceCompactionITCase extends FileStoreTableITCase {
     private void assertCompact(
             String sql, String tableName, long expectSnapshotId, String... expectPartMinMax) {
         batchSql(sql);
-        Snapshot snapshot = findLatestSnapshot(tableName, true);
+        Snapshot snapshot = findLatestSnapshot(tableName);
         assertThat(snapshot.id()).isEqualTo(expectSnapshotId);
         assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
         RowType partType =
@@ -227,7 +227,7 @@ public class ForceCompactionITCase extends FileStoreTableITCase {
                         : RowType.of();
         FileStorePathFactory pathFactory =
                 new FileStorePathFactory(
-                        getTableDirectory(tableName, true),
+                        getTableDirectory(tableName),
                         partType,
                         "default",
                         CoreOptions.FILE_FORMAT.defaultValue());
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PreAggregationITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PreAggregationITCase.java
index d7c49711..732cd9a2 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PreAggregationITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PreAggregationITCase.java
@@ -34,7 +34,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 /** ITCase for pre-aggregation. */
 public class PreAggregationITCase {
     /** ITCase for bool_or/bool_and aggregate function. */
-    public static class BoolOrAndAggregation extends FileStoreTableITCase {
+    public static class BoolOrAndAggregation extends CatalogITCaseBase {
         @Override
         protected List<String> ddl() {
             return Collections.singletonList(
@@ -105,7 +105,7 @@ public class PreAggregationITCase {
     }
 
     /** ITCase for listagg aggregate function. */
-    public static class ListAggAggregation extends FileStoreTableITCase {
+    public static class ListAggAggregation extends CatalogITCaseBase {
         @Override
         protected int defaultParallelism() {
             // set parallelism to 1 so that the order of input data is determined
@@ -129,7 +129,7 @@ public class PreAggregationITCase {
             // VALUES does not guarantee order, but order is important for list aggregations.
             // So we need to sort the input data.
             batchSql(
-                    "CREATE VIEW myView AS "
+                    "CREATE TABLE myTable AS "
                             + "SELECT b, c, d FROM "
                             + "(VALUES "
                             + "  (1, 1, 2, 'first line'),"
@@ -137,7 +137,7 @@ public class PreAggregationITCase {
                             + "  (3, 1, 2, 'second line')"
                             + ") AS V(a, b, c, d) "
                             + "ORDER BY a");
-            batchSql("INSERT INTO T6 SELECT * FROM myView");
+            batchSql("INSERT INTO T6 SELECT * FROM myTable");
             List<Row> result = batchSql("SELECT * FROM T6");
             assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, "first line,second line"));
         }
@@ -181,7 +181,7 @@ public class PreAggregationITCase {
     }
 
     /** ITCase for last value aggregate function. */
-    public static class LastValueAggregation extends FileStoreTableITCase {
+    public static class LastValueAggregation extends CatalogITCaseBase {
         @Override
         protected int defaultParallelism() {
             // set parallelism to 1 so that the order of input data is determined
@@ -207,7 +207,7 @@ public class PreAggregationITCase {
             // VALUES does not guarantee order, but order is important for last value aggregations.
             // So we need to sort the input data.
             batchSql(
-                    "CREATE VIEW myView AS "
+                    "CREATE TABLE myTable AS "
                             + "SELECT b, c, d, e FROM "
                             + "(VALUES "
                             + "  (1, 1, 2, CAST(NULL AS INT), CAST('2020-01-01' AS DATE)),"
@@ -215,7 +215,7 @@ public class PreAggregationITCase {
                             + "  (3, 1, 2, 3, CAST(NULL AS DATE))"
                             + ") AS V(a, b, c, d, e) "
                             + "ORDER BY a");
-            batchSql("INSERT INTO T5 SELECT * FROM myView");
+            batchSql("INSERT INTO T5 SELECT * FROM myTable");
             List<Row> result = batchSql("SELECT * FROM T5");
             assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, 3, null));
         }
@@ -259,7 +259,7 @@ public class PreAggregationITCase {
     }
 
     /** ITCase for last non-null value aggregate function. */
-    public static class LastNonNullValueAggregation extends FileStoreTableITCase {
+    public static class LastNonNullValueAggregation extends CatalogITCaseBase {
         @Override
         protected int defaultParallelism() {
             // set parallelism to 1 so that the order of input data is determined
@@ -285,7 +285,7 @@ public class PreAggregationITCase {
             // VALUES does not guarantee order, but order is important for last value aggregations.
             // So we need to sort the input data.
             batchSql(
-                    "CREATE VIEW myView AS "
+                    "CREATE TABLE myTable AS "
                             + "SELECT b, c, d, e FROM "
                             + "(VALUES "
                             + "  (1, 1, 2, CAST(NULL AS INT), CAST('2020-01-01' AS DATE)),"
@@ -293,7 +293,7 @@ public class PreAggregationITCase {
                             + "  (3, 1, 2, 3, CAST(NULL AS DATE))"
                             + ") AS V(a, b, c, d, e) "
                             + "ORDER BY a");
-            batchSql("INSERT INTO T4 SELECT * FROM myView");
+            batchSql("INSERT INTO T4 SELECT * FROM myTable");
             List<Row> result = batchSql("SELECT * FROM T4");
             assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, 3, LocalDate.of(2020, 1, 2)));
         }
@@ -338,7 +338,7 @@ public class PreAggregationITCase {
     }
 
     /** ITCase for min aggregate function. */
-    public static class MinAggregation extends FileStoreTableITCase {
+    public static class MinAggregation extends CatalogITCaseBase {
         @Override
         protected List<String> ddl() {
             return Collections.singletonList(
@@ -499,7 +499,7 @@ public class PreAggregationITCase {
     }
 
     /** ITCase for max aggregate function. */
-    public static class MaxAggregation extends FileStoreTableITCase {
+    public static class MaxAggregation extends CatalogITCaseBase {
         @Override
         protected List<String> ddl() {
             return Collections.singletonList(
@@ -663,7 +663,7 @@ public class PreAggregationITCase {
     }
 
     /** ITCase for sum aggregate function. */
-    public static class SumAggregation extends FileStoreTableITCase {
+    public static class SumAggregation extends CatalogITCaseBase {
         @Override
         protected List<String> ddl() {
             return Collections.singletonList(
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
index a14ba63b..328045c4 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
@@ -42,7 +42,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** IT case for overwrite data layout after changing num of bucket. */
-public class RescaleBucketITCase extends FileStoreTableITCase {
+public class RescaleBucketITCase extends CatalogITCaseBase {
 
     private final String alterTableSql = "ALTER TABLE %s SET ('bucket' = '%d')";
 
@@ -51,40 +51,33 @@ public class RescaleBucketITCase extends FileStoreTableITCase {
     @Override
     protected List<String> ddl() {
         return Arrays.asList(
-                "CREATE TABLE IF NOT EXISTS `default_catalog`.`default_database`.`T0` (f0 INT) WITH ('bucket' = '2')",
                 String.format(
                         "CREATE CATALOG `fs_catalog` WITH ('type' = 'table-store', 'warehouse' = '%s')",
                         path),
                 "CREATE TABLE IF NOT EXISTS `fs_catalog`.`default`.`T1` (f0 INT) WITH ('bucket' = '2')");
     }
 
-    @Test
-    public void testRescaleManagedTable() {
-        innerTest("default_catalog", "T0", true);
-    }
-
     @Test
     public void testRescaleCatalogTable() {
-        innerTest("fs_catalog", "T1", false);
+        innerTest("fs_catalog", "T1");
     }
 
     @Test
     public void testSuspendAndRecoverAfterRescaleOverwrite() throws Exception {
-        sEnv.executeSql(
-                "CREATE TABLE IF NOT EXISTS `default_catalog`.`default_database`.`S0` (f0 INT) WITH ('connector' = 'datagen')");
         // register a companion table T4 for T3
         executeBoth(
                 Arrays.asList(
                         "USE CATALOG fs_catalog",
+                        "CREATE TEMPORARY TABLE IF NOT EXISTS `S0` (f0 INT) WITH ('connector' = 'datagen')",
                         "CREATE TABLE IF NOT EXISTS `T3` (f0 INT) WITH ('bucket' = '2')",
                         "CREATE TABLE IF NOT EXISTS `T4` (f0 INT)"));
-        SchemaManager schemaManager = new SchemaManager(getTableDirectory("T3", false));
+        SchemaManager schemaManager = new SchemaManager(getTableDirectory("T3"));
         assertLatestSchema(schemaManager, 0L, 2);
 
         String streamSql =
                 "EXECUTE STATEMENT SET BEGIN\n "
-                        + "INSERT INTO `T3` SELECT * FROM `default_catalog`.`default_database`.`S0`;\n "
-                        + "INSERT INTO `T4` SELECT * FROM `default_catalog`.`default_database`.`S0`;\n"
+                        + "INSERT INTO `T3` SELECT * FROM `S0`;\n "
+                        + "INSERT INTO `T4` SELECT * FROM `S0`;\n"
                         + "END";
 
         sEnv.getConfig().getConfiguration().set(SavepointConfigOptions.SAVEPOINT_PATH, path);
@@ -96,7 +89,7 @@ public class RescaleBucketITCase extends FileStoreTableITCase {
         // step2: stop with savepoint
         stopJobSafely(client, jobId);
 
-        final Snapshot snapshotBeforeRescale = findLatestSnapshot("T3", false);
+        final Snapshot snapshotBeforeRescale = findLatestSnapshot("T3");
         assertThat(snapshotBeforeRescale).isNotNull();
         assertSnapshotSchema(schemaManager, snapshotBeforeRescale.schemaId(), 0L, 2);
         List<Row> committedData = batchSql("SELECT * FROM T3");
@@ -107,7 +100,7 @@ public class RescaleBucketITCase extends FileStoreTableITCase {
 
         // step4: rescale data layout according to the new bucket num
         batchSql(rescaleOverwriteSql, "T3", "T3");
-        Snapshot snapshotAfterRescale = findLatestSnapshot("T3", false);
+        Snapshot snapshotAfterRescale = findLatestSnapshot("T3");
         assertThat(snapshotAfterRescale).isNotNull();
         assertThat(snapshotAfterRescale.id()).isEqualTo(snapshotBeforeRescale.id() + 1);
         assertThat(snapshotAfterRescale.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
@@ -120,9 +113,9 @@ public class RescaleBucketITCase extends FileStoreTableITCase {
         stopJobSafely(client, resumedJobId);
 
         // check snapshot and schema
-        Snapshot lastSnapshot = findLatestSnapshot("T3", false);
+        Snapshot lastSnapshot = findLatestSnapshot("T3");
         assertThat(lastSnapshot).isNotNull();
-        SnapshotManager snapshotManager = new SnapshotManager(getTableDirectory("T3", false));
+        SnapshotManager snapshotManager = new SnapshotManager(getTableDirectory("T3"));
         for (long snapshotId = lastSnapshot.id();
                 snapshotId > snapshotAfterRescale.id();
                 snapshotId--) {
@@ -135,10 +128,10 @@ public class RescaleBucketITCase extends FileStoreTableITCase {
     }
 
     private void waitForTheNextSnapshot(@Nullable Long initSnapshotId) throws InterruptedException {
-        Snapshot snapshot = findLatestSnapshot("T3", false);
+        Snapshot snapshot = findLatestSnapshot("T3");
         while (snapshot == null || new Long(snapshot.id()).equals(initSnapshotId)) {
             Thread.sleep(2000L);
-            snapshot = findLatestSnapshot("T3", false);
+            snapshot = findLatestSnapshot("T3");
         }
     }
 
@@ -178,25 +171,20 @@ public class RescaleBucketITCase extends FileStoreTableITCase {
                 .containsEntry(BUCKET.key(), String.valueOf(expectedBucketNum));
     }
 
-    private void innerTest(String catalogName, String tableName, boolean managedTable) {
+    private void innerTest(String catalogName, String tableName) {
         String useCatalogSql = "USE CATALOG %s";
         batchSql(useCatalogSql, catalogName);
         String insertSql = "INSERT INTO %s VALUES (1), (2), (3), (4), (5)";
         batchSql(insertSql, tableName);
-        Snapshot snapshot = findLatestSnapshot(tableName, managedTable);
+        Snapshot snapshot = findLatestSnapshot(tableName);
         assertThat(snapshot).isNotNull();
 
-        SchemaManager schemaManager = new SchemaManager(getTableDirectory(tableName, managedTable));
+        SchemaManager schemaManager = new SchemaManager(getTableDirectory(tableName));
         assertSnapshotSchema(schemaManager, snapshot.schemaId(), 0L, 2);
 
         // for managed table schema id remains unchanged, for catalog table id increase from 0 to 1
         batchSql(alterTableSql, tableName, 4);
-        if (managedTable) {
-            // managed table cannot update schema
-            assertLatestSchema(schemaManager, 0L, 2);
-        } else {
-            assertLatestSchema(schemaManager, 1L, 4);
-        }
+        assertLatestSchema(schemaManager, 1L, 4);
 
         // check read is not influenced
         List<Row> expected = Arrays.asList(Row.of(1), Row.of(2), Row.of(3), Row.of(4), Row.of(5));
@@ -212,12 +200,11 @@ public class RescaleBucketITCase extends FileStoreTableITCase {
                                 + "Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.");
 
         batchSql(rescaleOverwriteSql, tableName, tableName);
-        snapshot = findLatestSnapshot(tableName, managedTable);
+        snapshot = findLatestSnapshot(tableName);
         assertThat(snapshot).isNotNull();
         assertThat(snapshot.id()).isEqualTo(2L);
         assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
-        assertSnapshotSchema(
-                schemaManager, snapshot.schemaId(), managedTable ? 0L : 1L, managedTable ? 2 : 4);
+        assertSnapshotSchema(schemaManager, snapshot.schemaId(), 1L, 4);
         assertThat(batchSql("SELECT * FROM %s", tableName))
                 .containsExactlyInAnyOrderElementsOf(expected);
 
@@ -232,7 +219,7 @@ public class RescaleBucketITCase extends FileStoreTableITCase {
         sqlList.forEach(
                 sql -> {
                     sEnv.executeSql(sql);
-                    bEnv.executeSql(sql);
+                    tEnv.executeSql(sql);
                 });
     }
 }