Posted to commits@flink.apache.org by lz...@apache.org on 2023/01/09 06:41:20 UTC
[flink-table-store] branch master updated: [FLINK-30602] Remove FileStoreTableITCase in table store
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new b27fa51e [FLINK-30602] Remove FileStoreTableITCase in table store
b27fa51e is described below
commit b27fa51ed05bca3596ef1ba5131e97921ec01e33
Author: shammon <zj...@gmail.com>
AuthorDate: Mon Jan 9 14:41:16 2023 +0800
[FLINK-30602] Remove FileStoreTableITCase in table store
This closes #470
---
.../store/connector/AppendOnlyTableITCase.java | 8 +-
.../store/connector/BatchFileStoreITCase.java | 2 +-
.../table/store/connector/CatalogITCaseBase.java | 38 ++++-
.../store/connector/ContinuousFileStoreITCase.java | 7 +-
.../store/connector/FileStoreTableITCase.java | 160 ---------------------
.../store/connector/ForceCompactionITCase.java | 8 +-
.../store/connector/PreAggregationITCase.java | 26 ++--
.../table/store/connector/RescaleBucketITCase.java | 51 +++----
8 files changed, 80 insertions(+), 220 deletions(-)
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
index cfee8bd2..354c7a5f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
@@ -34,7 +34,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test case for append-only managed table. */
-public class AppendOnlyTableITCase extends FileStoreTableITCase {
+public class AppendOnlyTableITCase extends CatalogITCaseBase {
@Test
public void testCreateTableWithPrimaryKey() {
@@ -103,7 +103,7 @@ public class AppendOnlyTableITCase extends FileStoreTableITCase {
String id = TestValuesTableFactory.registerData(input);
batchSql(
- "CREATE TABLE source (id INT, data STRING) WITH ('connector'='values', 'bounded'='true', 'data-id'='%s')",
+ "CREATE TEMPORARY TABLE source (id INT, data STRING) WITH ('connector'='values', 'bounded'='true', 'data-id'='%s')",
id);
batchSql("INSERT INTO append_table SELECT * FROM source");
@@ -214,7 +214,7 @@ public class AppendOnlyTableITCase extends FileStoreTableITCase {
String id = TestValuesTableFactory.registerData(input);
batchSql(
- "CREATE TABLE source (id INT, data STRING) WITH ('connector'='values', 'bounded'='true', 'data-id'='%s')",
+ "CREATE TEMPORARY TABLE source (id INT, data STRING) WITH ('connector'='values', 'bounded'='true', 'data-id'='%s')",
id);
assertThatThrownBy(() -> batchSql("INSERT INTO append_table SELECT * FROM source"))
@@ -225,7 +225,7 @@ public class AppendOnlyTableITCase extends FileStoreTableITCase {
private void assertAutoCompaction(
String sql, long expectedSnapshotId, Snapshot.CommitKind expectedCommitKind) {
batchSql(sql);
- Snapshot snapshot = findLatestSnapshot("append_table", true);
+ Snapshot snapshot = findLatestSnapshot("append_table");
assertThat(snapshot.id()).isEqualTo(expectedSnapshotId);
assertThat(snapshot.commitKind()).isEqualTo(expectedCommitKind);
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
index b9560afe..7fdd2278 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
@@ -30,7 +30,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** ITCase for batch file store. */
-public class BatchFileStoreITCase extends FileStoreTableITCase {
+public class BatchFileStoreITCase extends CatalogITCaseBase {
@Override
protected List<String> ddl() {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
index 0c661ffe..f8032fee 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
@@ -19,14 +19,21 @@
package org.apache.flink.table.store.connector;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
@@ -35,7 +42,10 @@ import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
import org.junit.Before;
+import javax.annotation.Nullable;
+
import java.io.IOException;
+import java.net.URI;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
@@ -47,15 +57,18 @@ public abstract class CatalogITCaseBase extends AbstractTestBase {
protected TableEnvironment tEnv;
protected TableEnvironment sEnv;
+ protected String path;
@Before
public void before() throws IOException {
tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
String catalog = "TABLE_STORE";
+ URI uri = TEMPORARY_FOLDER.newFolder().toURI();
+ path = uri.toString();
tEnv.executeSql(
String.format(
"CREATE CATALOG %s WITH (" + "'type'='table-store', 'warehouse'='%s')",
- catalog, TEMPORARY_FOLDER.newFolder().toURI()));
+ catalog, uri));
tEnv.useCatalog(catalog);
sEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
@@ -65,9 +78,21 @@ public abstract class CatalogITCaseBase extends AbstractTestBase {
prepareConfiguration(tEnv);
prepareConfiguration(sEnv);
+ prepareEnv();
+ }
+ private void prepareEnv() {
+ Parser parser = ((TableEnvironmentImpl) tEnv).getParser();
for (String ddl : ddl()) {
tEnv.executeSql(ddl);
+ List<Operation> operations = parser.parse(ddl);
+ if (operations.size() == 1) {
+ Operation operation = operations.get(0);
+ if (operation instanceof CreateCatalogOperation) {
+ String name = ((CreateCatalogOperation) operation).getCatalogName();
+ sEnv.registerCatalog(name, tEnv.getCatalog(name).orElse(null));
+ }
+ }
}
}
@@ -108,4 +133,15 @@ public abstract class CatalogITCaseBase extends AbstractTestBase {
catalog.getTable(new ObjectPath(catalog.getDefaultDatabase(), tableName));
return (CatalogTable) table;
}
+
+ protected Path getTableDirectory(String tableName) {
+ return new Path(path + String.format("%s.db/%s", tEnv.getCurrentDatabase(), tableName));
+ }
+
+ @Nullable
+ protected Snapshot findLatestSnapshot(String tableName) {
+ SnapshotManager snapshotManager = new SnapshotManager(getTableDirectory(tableName));
+ Long id = snapshotManager.latestSnapshotId();
+ return id == null ? null : snapshotManager.snapshot(id);
+ }
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
index 829ed6ca..8646c5f3 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.store.connector;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.table.store.file.utils.SnapshotManager;
@@ -42,7 +41,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** SQL ITCase for continuous file store. */
@RunWith(Parameterized.class)
-public class ContinuousFileStoreITCase extends FileStoreTableITCase {
+public class ContinuousFileStoreITCase extends CatalogITCaseBase {
private final boolean changelogFile;
@@ -134,9 +133,7 @@ public class ContinuousFileStoreITCase extends FileStoreTableITCase {
.containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
iterator.close();
- SnapshotManager snapshotManager =
- new SnapshotManager(
- new Path(path, "default_catalog.catalog/default_database.db/T1"));
+ SnapshotManager snapshotManager = new SnapshotManager(getTableDirectory("T1"));
List<Snapshot> snapshots =
new ArrayList<>(ImmutableList.copyOf(snapshotManager.snapshots()));
snapshots.sort(Comparator.comparingLong(Snapshot::timeMillis));
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
deleted file mode 100644
index caee8632..00000000
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.api.internal.TableEnvironmentImpl;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.delegation.Parser;
-import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
-import org.apache.flink.table.operations.ddl.CreateTableOperation;
-import org.apache.flink.table.store.file.Snapshot;
-import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
-
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
-
-import org.junit.Before;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.List;
-
-import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
-import static org.apache.flink.table.store.connector.FlinkConnectorOptions.ROOT_PATH;
-import static org.apache.flink.table.store.connector.FlinkConnectorOptions.TABLE_STORE_PREFIX;
-import static org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath;
-import static org.junit.jupiter.api.Assertions.fail;
-
-/** ITCase for file store table api. */
-@Deprecated
-public abstract class FileStoreTableITCase extends AbstractTestBase {
-
- protected TableEnvironment bEnv;
- protected TableEnvironment sEnv;
- protected String path;
-
- @Before
- public void before() throws IOException {
- bEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
- sEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
- sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(100));
- path = TEMPORARY_FOLDER.newFolder().toURI().toString();
- prepareConfiguration(bEnv, path);
- prepareConfiguration(sEnv, path);
- prepareEnv();
- }
-
- private void prepareConfiguration(TableEnvironment env, String path) {
- Configuration config = env.getConfig().getConfiguration();
- config.set(
- ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
- defaultParallelism());
- config.setString(TABLE_STORE_PREFIX + ROOT_PATH.key(), path);
- }
-
- protected int defaultParallelism() {
- return 2;
- }
-
- private void prepareEnv() {
- Parser parser = ((TableEnvironmentImpl) sEnv).getParser();
- for (String ddl : ddl()) {
- sEnv.executeSql(ddl);
- List<Operation> operations = parser.parse(ddl);
- if (operations.size() == 1) {
- Operation operation = operations.get(0);
- if (operation instanceof CreateCatalogOperation) {
- String name = ((CreateCatalogOperation) operation).getCatalogName();
- bEnv.registerCatalog(name, sEnv.getCatalog(name).orElse(null));
- } else if (operation instanceof CreateTableOperation) {
- ObjectIdentifier tableIdentifier =
- ((CreateTableOperation) operation).getTableIdentifier();
- try {
- CatalogBaseTable table =
- sEnv.getCatalog(tableIdentifier.getCatalogName())
- .get()
- .getTable(tableIdentifier.toObjectPath());
- ((TableEnvironmentImpl) bEnv)
- .getCatalogManager()
- .getCatalog(tableIdentifier.getCatalogName())
- .get()
- .createTable(tableIdentifier.toObjectPath(), table, true);
- } catch (TableNotExistException
- | TableAlreadyExistException
- | DatabaseNotExistException e) {
- fail("This should not happen");
- }
- } else {
- bEnv.executeSql(ddl);
- }
- }
- }
- }
-
- protected abstract List<String> ddl();
-
- protected CloseableIterator<Row> streamSqlIter(String query, Object... args) {
- return sEnv.executeSql(String.format(query, args)).collect();
- }
-
- protected List<Row> batchSql(String query, Object... args) {
- TableResult tableResult = bEnv.executeSql(String.format(query, args));
-
- try (CloseableIterator<Row> iter = tableResult.collect()) {
- return ImmutableList.copyOf(iter);
- } catch (Exception e) {
- throw new RuntimeException("Failed to collect the table result.", e);
- }
- }
-
- protected Path getTableDirectory(String tableName, boolean managedTable) {
- return new Path(
- path
- + (managedTable
- ? relativeTablePath(
- ObjectIdentifier.of(
- bEnv.getCurrentCatalog(),
- bEnv.getCurrentDatabase(),
- tableName))
- : String.format("%s.db/%s", bEnv.getCurrentDatabase(), tableName)));
- }
-
- @Nullable
- protected Snapshot findLatestSnapshot(String tableName, boolean managedTable) {
- SnapshotManager snapshotManager =
- new SnapshotManager(getTableDirectory(tableName, managedTable));
- Long id = snapshotManager.latestSnapshotId();
- return id == null ? null : snapshotManager.snapshot(id);
- }
-}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java
index 18c9a9f6..1d81d729 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java
@@ -38,7 +38,7 @@ import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
/** ITCase for auto-enabling commit.force-compact under batch mode. */
-public class ForceCompactionITCase extends FileStoreTableITCase {
+public class ForceCompactionITCase extends CatalogITCaseBase {
private final FileFormat avro = FileFormat.fromIdentifier("avro", new Configuration());
@@ -210,7 +210,7 @@ public class ForceCompactionITCase extends FileStoreTableITCase {
private void assertAppend(String sql, String tableName, long expectSnapshotId) {
batchSql(sql);
- Snapshot snapshot = findLatestSnapshot(tableName, true);
+ Snapshot snapshot = findLatestSnapshot(tableName);
assertThat(snapshot.id()).isEqualTo(expectSnapshotId);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
}
@@ -218,7 +218,7 @@ public class ForceCompactionITCase extends FileStoreTableITCase {
private void assertCompact(
String sql, String tableName, long expectSnapshotId, String... expectPartMinMax) {
batchSql(sql);
- Snapshot snapshot = findLatestSnapshot(tableName, true);
+ Snapshot snapshot = findLatestSnapshot(tableName);
assertThat(snapshot.id()).isEqualTo(expectSnapshotId);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
RowType partType =
@@ -227,7 +227,7 @@ public class ForceCompactionITCase extends FileStoreTableITCase {
: RowType.of();
FileStorePathFactory pathFactory =
new FileStorePathFactory(
- getTableDirectory(tableName, true),
+ getTableDirectory(tableName),
partType,
"default",
CoreOptions.FILE_FORMAT.defaultValue());
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PreAggregationITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PreAggregationITCase.java
index d7c49711..732cd9a2 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PreAggregationITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PreAggregationITCase.java
@@ -34,7 +34,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** ITCase for pre-aggregation. */
public class PreAggregationITCase {
/** ITCase for bool_or/bool_and aggregate function. */
- public static class BoolOrAndAggregation extends FileStoreTableITCase {
+ public static class BoolOrAndAggregation extends CatalogITCaseBase {
@Override
protected List<String> ddl() {
return Collections.singletonList(
@@ -105,7 +105,7 @@ public class PreAggregationITCase {
}
/** ITCase for listagg aggregate function. */
- public static class ListAggAggregation extends FileStoreTableITCase {
+ public static class ListAggAggregation extends CatalogITCaseBase {
@Override
protected int defaultParallelism() {
// set parallelism to 1 so that the order of input data is determined
@@ -129,7 +129,7 @@ public class PreAggregationITCase {
// VALUES does not guarantee order, but order is important for list aggregations.
// So we need to sort the input data.
batchSql(
- "CREATE VIEW myView AS "
+ "CREATE TABLE myTable AS "
+ "SELECT b, c, d FROM "
+ "(VALUES "
+ " (1, 1, 2, 'first line'),"
@@ -137,7 +137,7 @@ public class PreAggregationITCase {
+ " (3, 1, 2, 'second line')"
+ ") AS V(a, b, c, d) "
+ "ORDER BY a");
- batchSql("INSERT INTO T6 SELECT * FROM myView");
+ batchSql("INSERT INTO T6 SELECT * FROM myTable");
List<Row> result = batchSql("SELECT * FROM T6");
assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, "first line,second line"));
}
@@ -181,7 +181,7 @@ public class PreAggregationITCase {
}
/** ITCase for last value aggregate function. */
- public static class LastValueAggregation extends FileStoreTableITCase {
+ public static class LastValueAggregation extends CatalogITCaseBase {
@Override
protected int defaultParallelism() {
// set parallelism to 1 so that the order of input data is determined
@@ -207,7 +207,7 @@ public class PreAggregationITCase {
// VALUES does not guarantee order, but order is important for last value aggregations.
// So we need to sort the input data.
batchSql(
- "CREATE VIEW myView AS "
+ "CREATE TABLE myTable AS "
+ "SELECT b, c, d, e FROM "
+ "(VALUES "
+ " (1, 1, 2, CAST(NULL AS INT), CAST('2020-01-01' AS DATE)),"
@@ -215,7 +215,7 @@ public class PreAggregationITCase {
+ " (3, 1, 2, 3, CAST(NULL AS DATE))"
+ ") AS V(a, b, c, d, e) "
+ "ORDER BY a");
- batchSql("INSERT INTO T5 SELECT * FROM myView");
+ batchSql("INSERT INTO T5 SELECT * FROM myTable");
List<Row> result = batchSql("SELECT * FROM T5");
assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, 3, null));
}
@@ -259,7 +259,7 @@ public class PreAggregationITCase {
}
/** ITCase for last non-null value aggregate function. */
- public static class LastNonNullValueAggregation extends FileStoreTableITCase {
+ public static class LastNonNullValueAggregation extends CatalogITCaseBase {
@Override
protected int defaultParallelism() {
// set parallelism to 1 so that the order of input data is determined
@@ -285,7 +285,7 @@ public class PreAggregationITCase {
// VALUES does not guarantee order, but order is important for last value aggregations.
// So we need to sort the input data.
batchSql(
- "CREATE VIEW myView AS "
+ "CREATE TABLE myTable AS "
+ "SELECT b, c, d, e FROM "
+ "(VALUES "
+ " (1, 1, 2, CAST(NULL AS INT), CAST('2020-01-01' AS DATE)),"
@@ -293,7 +293,7 @@ public class PreAggregationITCase {
+ " (3, 1, 2, 3, CAST(NULL AS DATE))"
+ ") AS V(a, b, c, d, e) "
+ "ORDER BY a");
- batchSql("INSERT INTO T4 SELECT * FROM myView");
+ batchSql("INSERT INTO T4 SELECT * FROM myTable");
List<Row> result = batchSql("SELECT * FROM T4");
assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, 3, LocalDate.of(2020, 1, 2)));
}
@@ -338,7 +338,7 @@ public class PreAggregationITCase {
}
/** ITCase for min aggregate function. */
- public static class MinAggregation extends FileStoreTableITCase {
+ public static class MinAggregation extends CatalogITCaseBase {
@Override
protected List<String> ddl() {
return Collections.singletonList(
@@ -499,7 +499,7 @@ public class PreAggregationITCase {
}
/** ITCase for max aggregate function. */
- public static class MaxAggregation extends FileStoreTableITCase {
+ public static class MaxAggregation extends CatalogITCaseBase {
@Override
protected List<String> ddl() {
return Collections.singletonList(
@@ -663,7 +663,7 @@ public class PreAggregationITCase {
}
/** ITCase for sum aggregate function. */
- public static class SumAggregation extends FileStoreTableITCase {
+ public static class SumAggregation extends CatalogITCaseBase {
@Override
protected List<String> ddl() {
return Collections.singletonList(
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
index a14ba63b..328045c4 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
@@ -42,7 +42,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** IT case for overwrite data layout after changing num of bucket. */
-public class RescaleBucketITCase extends FileStoreTableITCase {
+public class RescaleBucketITCase extends CatalogITCaseBase {
private final String alterTableSql = "ALTER TABLE %s SET ('bucket' = '%d')";
@@ -51,40 +51,33 @@ public class RescaleBucketITCase extends FileStoreTableITCase {
@Override
protected List<String> ddl() {
return Arrays.asList(
- "CREATE TABLE IF NOT EXISTS `default_catalog`.`default_database`.`T0` (f0 INT) WITH ('bucket' = '2')",
String.format(
"CREATE CATALOG `fs_catalog` WITH ('type' = 'table-store', 'warehouse' = '%s')",
path),
"CREATE TABLE IF NOT EXISTS `fs_catalog`.`default`.`T1` (f0 INT) WITH ('bucket' = '2')");
}
- @Test
- public void testRescaleManagedTable() {
- innerTest("default_catalog", "T0", true);
- }
-
@Test
public void testRescaleCatalogTable() {
- innerTest("fs_catalog", "T1", false);
+ innerTest("fs_catalog", "T1");
}
@Test
public void testSuspendAndRecoverAfterRescaleOverwrite() throws Exception {
- sEnv.executeSql(
- "CREATE TABLE IF NOT EXISTS `default_catalog`.`default_database`.`S0` (f0 INT) WITH ('connector' = 'datagen')");
// register a companion table T4 for T3
executeBoth(
Arrays.asList(
"USE CATALOG fs_catalog",
+ "CREATE TEMPORARY TABLE IF NOT EXISTS `S0` (f0 INT) WITH ('connector' = 'datagen')",
"CREATE TABLE IF NOT EXISTS `T3` (f0 INT) WITH ('bucket' = '2')",
"CREATE TABLE IF NOT EXISTS `T4` (f0 INT)"));
- SchemaManager schemaManager = new SchemaManager(getTableDirectory("T3", false));
+ SchemaManager schemaManager = new SchemaManager(getTableDirectory("T3"));
assertLatestSchema(schemaManager, 0L, 2);
String streamSql =
"EXECUTE STATEMENT SET BEGIN\n "
- + "INSERT INTO `T3` SELECT * FROM `default_catalog`.`default_database`.`S0`;\n "
- + "INSERT INTO `T4` SELECT * FROM `default_catalog`.`default_database`.`S0`;\n"
+ + "INSERT INTO `T3` SELECT * FROM `S0`;\n "
+ + "INSERT INTO `T4` SELECT * FROM `S0`;\n"
+ "END";
sEnv.getConfig().getConfiguration().set(SavepointConfigOptions.SAVEPOINT_PATH, path);
@@ -96,7 +89,7 @@ public class RescaleBucketITCase extends FileStoreTableITCase {
// step2: stop with savepoint
stopJobSafely(client, jobId);
- final Snapshot snapshotBeforeRescale = findLatestSnapshot("T3", false);
+ final Snapshot snapshotBeforeRescale = findLatestSnapshot("T3");
assertThat(snapshotBeforeRescale).isNotNull();
assertSnapshotSchema(schemaManager, snapshotBeforeRescale.schemaId(), 0L, 2);
List<Row> committedData = batchSql("SELECT * FROM T3");
@@ -107,7 +100,7 @@ public class RescaleBucketITCase extends FileStoreTableITCase {
// step4: rescale data layout according to the new bucket num
batchSql(rescaleOverwriteSql, "T3", "T3");
- Snapshot snapshotAfterRescale = findLatestSnapshot("T3", false);
+ Snapshot snapshotAfterRescale = findLatestSnapshot("T3");
assertThat(snapshotAfterRescale).isNotNull();
assertThat(snapshotAfterRescale.id()).isEqualTo(snapshotBeforeRescale.id() + 1);
assertThat(snapshotAfterRescale.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
@@ -120,9 +113,9 @@ public class RescaleBucketITCase extends FileStoreTableITCase {
stopJobSafely(client, resumedJobId);
// check snapshot and schema
- Snapshot lastSnapshot = findLatestSnapshot("T3", false);
+ Snapshot lastSnapshot = findLatestSnapshot("T3");
assertThat(lastSnapshot).isNotNull();
- SnapshotManager snapshotManager = new SnapshotManager(getTableDirectory("T3", false));
+ SnapshotManager snapshotManager = new SnapshotManager(getTableDirectory("T3"));
for (long snapshotId = lastSnapshot.id();
snapshotId > snapshotAfterRescale.id();
snapshotId--) {
@@ -135,10 +128,10 @@ public class RescaleBucketITCase extends FileStoreTableITCase {
}
private void waitForTheNextSnapshot(@Nullable Long initSnapshotId) throws InterruptedException {
- Snapshot snapshot = findLatestSnapshot("T3", false);
+ Snapshot snapshot = findLatestSnapshot("T3");
while (snapshot == null || new Long(snapshot.id()).equals(initSnapshotId)) {
Thread.sleep(2000L);
- snapshot = findLatestSnapshot("T3", false);
+ snapshot = findLatestSnapshot("T3");
}
}
@@ -178,25 +171,20 @@ public class RescaleBucketITCase extends FileStoreTableITCase {
.containsEntry(BUCKET.key(), String.valueOf(expectedBucketNum));
}
- private void innerTest(String catalogName, String tableName, boolean managedTable) {
+ private void innerTest(String catalogName, String tableName) {
String useCatalogSql = "USE CATALOG %s";
batchSql(useCatalogSql, catalogName);
String insertSql = "INSERT INTO %s VALUES (1), (2), (3), (4), (5)";
batchSql(insertSql, tableName);
- Snapshot snapshot = findLatestSnapshot(tableName, managedTable);
+ Snapshot snapshot = findLatestSnapshot(tableName);
assertThat(snapshot).isNotNull();
- SchemaManager schemaManager = new SchemaManager(getTableDirectory(tableName, managedTable));
+ SchemaManager schemaManager = new SchemaManager(getTableDirectory(tableName));
assertSnapshotSchema(schemaManager, snapshot.schemaId(), 0L, 2);
// for managed table schema id remains unchanged, for catalog table id increase from 0 to 1
batchSql(alterTableSql, tableName, 4);
- if (managedTable) {
- // managed table cannot update schema
- assertLatestSchema(schemaManager, 0L, 2);
- } else {
- assertLatestSchema(schemaManager, 1L, 4);
- }
+ assertLatestSchema(schemaManager, 1L, 4);
// check read is not influenced
List<Row> expected = Arrays.asList(Row.of(1), Row.of(2), Row.of(3), Row.of(4), Row.of(5));
@@ -212,12 +200,11 @@ public class RescaleBucketITCase extends FileStoreTableITCase {
+ "Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.");
batchSql(rescaleOverwriteSql, tableName, tableName);
- snapshot = findLatestSnapshot(tableName, managedTable);
+ snapshot = findLatestSnapshot(tableName);
assertThat(snapshot).isNotNull();
assertThat(snapshot.id()).isEqualTo(2L);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
- assertSnapshotSchema(
- schemaManager, snapshot.schemaId(), managedTable ? 0L : 1L, managedTable ? 2 : 4);
+ assertSnapshotSchema(schemaManager, snapshot.schemaId(), 1L, 4);
assertThat(batchSql("SELECT * FROM %s", tableName))
.containsExactlyInAnyOrderElementsOf(expected);
@@ -232,7 +219,7 @@ public class RescaleBucketITCase extends FileStoreTableITCase {
sqlList.forEach(
sql -> {
sEnv.executeSql(sql);
- bEnv.executeSql(sql);
+ tEnv.executeSql(sql);
});
}
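
For context, a test migrated by this commit only needs the slimmed-down CatalogITCaseBase helpers. Below is a minimal sketch, not part of the commit: it assumes CatalogITCaseBase already provides batchSql(...) and the TABLE_STORE catalog setup shown in the hunks above, and the table name "words" plus the class name ExampleITCase are hypothetical.

package org.apache.flink.table.store.connector;

import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.types.Row;

import org.junit.Test;

import java.util.Collections;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

/** Example ITCase built directly on CatalogITCaseBase (sketch). */
public class ExampleITCase extends CatalogITCaseBase {

    @Override
    protected List<String> ddl() {
        // Tables are created in the TABLE_STORE catalog set up by the base
        // class; no managed-table/ROOT_PATH handling is needed any more.
        return Collections.singletonList("CREATE TABLE words (f0 INT, f1 STRING)");
    }

    @Test
    public void testWriteAndReadLatestSnapshot() {
        batchSql("INSERT INTO words VALUES (1, 'a'), (2, 'b')");

        // findLatestSnapshot now takes only the table name, because the
        // managed-table code path was removed in this commit.
        Snapshot snapshot = findLatestSnapshot("words");
        assertThat(snapshot).isNotNull();
        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);

        List<Row> rows = batchSql("SELECT * FROM words");
        assertThat(rows).containsExactlyInAnyOrder(Row.of(1, "a"), Row.of(2, "b"));
    }
}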
}