Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/12 09:45:13 UTC
[flink-table-store] branch master updated: [FLINK-28483] Basic schema evolution for table store
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new a73350c3 [FLINK-28483] Basic schema evolution for table store
a73350c3 is described below
commit a73350c3694d78f3bff4900c8be7f3f02983b6e1
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Jul 12 17:45:07 2022 +0800
[FLINK-28483] Basic schema evolution for table store
This closes #208
---
docs/content/docs/engines/spark.md | 26 +++
.../flink/table/store/connector/FlinkCatalog.java | 115 +++++++++---
.../table/store/connector/sink/StoreCommitter.java | 20 +-
.../table/store/connector/sink/StoreSink.java | 24 +--
...PredicateITCase.java => CatalogITCaseBase.java} | 47 ++---
.../table/store/connector/PredicateITCase.java | 31 +--
.../table/store/connector/SchemaChangeITCase.java | 47 +++++
.../flink/table/store/file/catalog/Catalog.java | 7 +-
.../store/file/catalog/FileSystemCatalog.java | 20 +-
.../flink/table/store/file/operation/Lock.java | 47 ++++-
.../table/store/file/schema/SchemaChange.java | 208 +++++++++++++++++++++
.../table/store/file/schema/SchemaManager.java | 127 +++++++++++--
.../flink/table/store/file/schema/TableSchema.java | 2 +-
.../flink/table/store/table/sink/TableCommit.java | 11 +-
.../apache/flink/table/store/hive/HiveCatalog.java | 61 +++---
.../flink/table/store/hive/HiveCatalogLock.java | 32 ++--
.../flink/table/store/spark/SparkCatalog.java | 72 +++++--
.../flink/table/store/spark/SparkTypeUtils.java | 132 +++++++++++--
.../table/store/spark/SparkInternalRowTest.java | 23 +--
.../flink/table/store/spark/SparkReadITCase.java | 58 +++++-
.../flink/table/store/spark/SparkTypeTest.java | 33 ++--
21 files changed, 884 insertions(+), 259 deletions(-)
diff --git a/docs/content/docs/engines/spark.md b/docs/content/docs/engines/spark.md
index b831a67f..7657f118 100644
--- a/docs/content/docs/engines/spark.md
+++ b/docs/content/docs/engines/spark.md
@@ -70,3 +70,29 @@ OPTIONS (
```sql
SELECT * FROM table_store.default.myTable;
```
+
+## DDL
+
+`ALTER TABLE ... SET TBLPROPERTIES`
+```sql
+ALTER TABLE table_store.default.myTable SET TBLPROPERTIES (
+ 'write-buffer-size'='256 MB'
+)
+```
+
+`ALTER TABLE ... UNSET TBLPROPERTIES`
+```sql
+ALTER TABLE table_store.default.myTable UNSET TBLPROPERTIES ('write-buffer-size')
+```
+
+`ALTER TABLE ... ADD COLUMN`
+```sql
+ALTER TABLE table_store.default.myTable
+ADD COLUMNS (new_column STRING)
+```
+
+`ALTER TABLE ... ALTER COLUMN ... TYPE`
+```sql
+ALTER TABLE table_store.default.myTable
+ALTER COLUMN column_name TYPE BIGINT
+```
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
index 15fefe42..e4dcefb7 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
@@ -41,12 +41,15 @@ import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.store.file.catalog.Catalog;
+import org.apache.flink.table.store.file.schema.SchemaChange;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.schema.UpdateSchema;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
@@ -132,7 +135,7 @@ public class FlinkCatalog extends AbstractCatalog {
}
@Override
- public CatalogBaseTable getTable(ObjectPath tablePath)
+ public CatalogTable getTable(ObjectPath tablePath)
throws TableNotExistException, CatalogException {
TableSchema schema;
try {
@@ -165,8 +168,33 @@ public class FlinkCatalog extends AbstractCatalog {
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+ if (!(table instanceof CatalogTable)) {
+ throw new UnsupportedOperationException(
+ "Only support CatalogTable, but is: " + table.getClass());
+ }
+ CatalogTable catalogTable = (CatalogTable) table;
+ Map<String, String> options = table.getOptions();
+ if (options.containsKey(CONNECTOR.key())) {
+ throw new CatalogException(
+ String.format(
+ "Table Store Catalog only supports table store tables, not '%s' connector."
+ + " You can create TEMPORARY table instead.",
+ options.get(CONNECTOR.key())));
+ }
+
+ // remove table path
+ String specific = options.remove(PATH.key());
+ if (specific != null) {
+ if (!catalog.getTableLocation(tablePath).equals(new Path(specific))) {
+ throw new IllegalArgumentException(
+ "Illegal table path in table options: " + specific);
+ }
+ catalogTable = catalogTable.copy(options);
+ }
+
try {
- catalog.createTable(tablePath, convertTableToSchema(tablePath, table), ignoreIfExists);
+ catalog.createTable(
+ tablePath, UpdateSchema.fromCatalogTable(catalogTable), ignoreIfExists);
} catch (Catalog.TableAlreadyExistException e) {
throw new TableAlreadyExistException(getName(), e.tablePath());
} catch (Catalog.DatabaseNotExistException e) {
@@ -178,40 +206,75 @@ public class FlinkCatalog extends AbstractCatalog {
public void alterTable(
ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
+ if (ignoreIfNotExists && !tableExists(tablePath)) {
+ return;
+ }
+
+ CatalogTable table = getTable(tablePath);
+
+ // Currently, Flink SQL only supports altering table properties.
+ validateAlterTable(table, (CatalogTable) newTable);
+
+ List<SchemaChange> changes = new ArrayList<>();
+ Map<String, String> oldProperties = table.getOptions();
+ for (Map.Entry<String, String> entry : newTable.getOptions().entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+
+ if (Objects.equals(value, oldProperties.get(key))) {
+ continue;
+ }
+
+ if (PATH.key().equalsIgnoreCase(key)) {
+ throw new IllegalArgumentException("Illegal table path in table options: " + value);
+ }
+
+ changes.add(SchemaChange.setOption(key, value));
+ }
+
+ oldProperties
+ .keySet()
+ .forEach(
+ k -> {
+ if (!newTable.getOptions().containsKey(k)) {
+ changes.add(SchemaChange.removeOption(k));
+ }
+ });
+
try {
- catalog.alterTable(
- tablePath, convertTableToSchema(tablePath, newTable), ignoreIfNotExists);
+ catalog.alterTable(tablePath, changes, ignoreIfNotExists);
} catch (Catalog.TableNotExistException e) {
throw new TableNotExistException(getName(), e.tablePath());
}
}
- private UpdateSchema convertTableToSchema(ObjectPath tablePath, CatalogBaseTable baseTable) {
- if (!(baseTable instanceof CatalogTable)) {
- throw new UnsupportedOperationException(
- "Only support CatalogTable, but is: " + baseTable.getClass());
- }
- CatalogTable table = (CatalogTable) baseTable;
- Map<String, String> options = table.getOptions();
- if (options.containsKey(CONNECTOR.key())) {
- throw new CatalogException(
- String.format(
- "Table Store Catalog only supports table store tables, not '%s' connector."
- + " You can create TEMPORARY table instead.",
- options.get(CONNECTOR.key())));
+ private static void validateAlterTable(CatalogTable ct1, CatalogTable ct2) {
+ org.apache.flink.table.api.TableSchema ts1 = ct1.getSchema();
+ org.apache.flink.table.api.TableSchema ts2 = ct2.getSchema();
+ boolean pkEquality = false;
+
+ if (ts1.getPrimaryKey().isPresent() && ts2.getPrimaryKey().isPresent()) {
+ pkEquality =
+ Objects.equals(
+ ts1.getPrimaryKey().get().getType(),
+ ts2.getPrimaryKey().get().getType())
+ && Objects.equals(
+ ts1.getPrimaryKey().get().getColumns(),
+ ts2.getPrimaryKey().get().getColumns());
+ } else if (!ts1.getPrimaryKey().isPresent() && !ts2.getPrimaryKey().isPresent()) {
+ pkEquality = true;
}
- // remove table path
- String specific = options.remove(PATH.key());
- if (specific != null) {
- if (!catalog.getTableLocation(tablePath).equals(new Path(specific))) {
- throw new IllegalArgumentException(
- "Illegal table path in table options: " + specific);
- }
- table = table.copy(options);
+ if (!(Objects.equals(ts1.getTableColumns(), ts2.getTableColumns())
+ && Objects.equals(ts1.getWatermarkSpecs(), ts2.getWatermarkSpecs())
+ && pkEquality)) {
+ throw new UnsupportedOperationException("Altering schema is not supported yet.");
}
- return UpdateSchema.fromCatalogTable(table);
+ if (!ct1.getPartitionKeys().equals(ct2.getPartitionKeys())) {
+ throw new UnsupportedOperationException(
+ "Altering partition keys is not supported yet.");
+ }
}
@Override
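
To make the new alterTable flow concrete, here is a self-contained sketch (JDK only; class and method names are hypothetical, not from the patch) of the option-diff logic above: every added or changed key becomes a setOption change, and every key absent from the new options becomes a removeOption change.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class OptionDiffSketch {

    // Mirrors the diff logic in FlinkCatalog#alterTable: changed or added keys
    // map to setOption, keys dropped by the new table map to removeOption.
    static List<String> toChanges(Map<String, String> oldOptions, Map<String, String> newOptions) {
        List<String> changes = new ArrayList<>();
        for (Map.Entry<String, String> entry : newOptions.entrySet()) {
            if (!Objects.equals(entry.getValue(), oldOptions.get(entry.getKey()))) {
                changes.add("setOption(" + entry.getKey() + "=" + entry.getValue() + ")");
            }
        }
        for (String key : oldOptions.keySet()) {
            if (!newOptions.containsKey(key)) {
                changes.add("removeOption(" + key + ")");
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Map<String, String> oldOptions = new HashMap<>();
        oldOptions.put("write-buffer-size", "128 MB");
        oldOptions.put("bucket", "4");

        Map<String, String> newOptions = new HashMap<>();
        newOptions.put("write-buffer-size", "256 MB");

        // expected: [setOption(write-buffer-size=256 MB), removeOption(bucket)]
        System.out.println(toChanges(oldOptions, newOptions));
    }
}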
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
index d06e09fd..cfe0551b 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
@@ -18,13 +18,10 @@
package org.apache.flink.table.store.connector.sink;
-import org.apache.flink.table.store.file.catalog.CatalogLock;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.table.store.table.sink.TableCommit;
-import javax.annotation.Nullable;
-
import java.io.IOException;
import java.util.List;
@@ -33,18 +30,8 @@ public class StoreCommitter implements Committer {
private final TableCommit commit;
- @Nullable private final CatalogLock lock;
-
- public StoreCommitter(TableCommit commit, @Nullable CatalogLock lock) {
+ public StoreCommitter(TableCommit commit) {
this.commit = commit;
- this.lock = lock;
- }
-
- @Override
- public void close() throws Exception {
- if (lock != null) {
- lock.close();
- }
}
@Override
@@ -78,4 +65,9 @@ public class StoreCommitter implements Committer {
throws IOException, InterruptedException {
commit.commit(committables);
}
+
+ @Override
+ public void close() throws Exception {
+ commit.close();
+ }
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index 544675a6..3f205871 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -38,7 +38,6 @@ import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.Map;
-import java.util.concurrent.Callable;
/** Sink of dynamic store. */
public class StoreSink implements Serializable {
@@ -88,28 +87,9 @@ public class StoreSink implements Serializable {
}
private StoreCommitter createCommitter(String user) {
- CatalogLock catalogLock;
- Lock lock;
- if (lockFactory == null) {
- catalogLock = null;
- lock = null;
- } else {
- catalogLock = lockFactory.create();
- lock =
- new Lock() {
- @Override
- public <T> T runWithLock(Callable<T> callable) throws Exception {
- return catalogLock.runWithLock(
- tableIdentifier.getDatabaseName(),
- tableIdentifier.getObjectName(),
- callable);
- }
- };
- }
-
+ Lock lock = Lock.fromCatalog(lockFactory, tableIdentifier.toObjectPath());
return new StoreCommitter(
- table.newCommit(user).withOverwritePartition(overwritePartition).withLock(lock),
- catalogLock);
+ table.newCommit(user).withOverwritePartition(overwritePartition).withLock(lock));
}
public DataStreamSink<?> sinkTo(DataStream<RowData> input) {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
similarity index 63%
copy from flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java
copy to flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
index 4e56fec8..feffad76 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
@@ -20,6 +20,11 @@ package org.apache.flink.table.store.connector;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
@@ -27,17 +32,14 @@ import org.apache.flink.util.CloseableIterator;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
import org.junit.Before;
-import org.junit.Test;
import java.io.IOException;
import java.util.List;
-import static org.assertj.core.api.Assertions.assertThat;
+/** ITCase for catalog. */
+public abstract class CatalogITCaseBase extends AbstractTestBase {
-/** Predicate ITCase. */
-public class PredicateITCase extends AbstractTestBase {
-
- private TableEnvironment tEnv;
+ protected TableEnvironment tEnv;
@Before
public void before() throws IOException {
@@ -50,33 +52,16 @@ public class PredicateITCase extends AbstractTestBase {
tEnv.useCatalog("TABLE_STORE");
}
- @Test
- public void testPkFilterBucket() throws Exception {
- sql("CREATE TABLE T (a INT PRIMARY KEY NOT ENFORCED, b INT) WITH ('bucket' = '5')");
- innerTest();
- }
-
- @Test
- public void testNoPkFilterBucket() throws Exception {
- sql("CREATE TABLE T (a INT, b INT) WITH ('bucket' = '5', 'bucket-key'='a')");
- innerTest();
- }
-
- @Test
- public void testAppendFilterBucket() throws Exception {
- sql(
- "CREATE TABLE T (a INT, b INT) WITH ('bucket' = '5', 'bucket-key'='a', 'write-mode'='append-only')");
- innerTest();
- }
-
- private void innerTest() throws Exception {
- sql("INSERT INTO T VALUES (1, 2), (3, 4), (5, 6), (7, 8), (9, 10)");
- assertThat(sql("SELECT * FROM T WHERE a = 5")).containsExactlyInAnyOrder(Row.of(5, 6));
- }
-
- private List<Row> sql(String query, Object... args) throws Exception {
+ protected List<Row> sql(String query, Object... args) throws Exception {
try (CloseableIterator<Row> iter = tEnv.executeSql(String.format(query, args)).collect()) {
return ImmutableList.copyOf(iter);
}
}
+
+ protected CatalogTable table(String tableName) throws TableNotExistException {
+ Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get();
+ CatalogBaseTable table =
+ catalog.getTable(new ObjectPath(catalog.getDefaultDatabase(), tableName));
+ return (CatalogTable) table;
+ }
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java
index 4e56fec8..ab122791 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java
@@ -18,37 +18,14 @@
package org.apache.flink.table.store.connector;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
-
-import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
-import java.util.List;
-
import static org.assertj.core.api.Assertions.assertThat;
/** Predicate ITCase. */
-public class PredicateITCase extends AbstractTestBase {
-
- private TableEnvironment tEnv;
-
- @Before
- public void before() throws IOException {
- tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
- tEnv.executeSql(
- String.format(
- "CREATE CATALOG TABLE_STORE WITH ("
- + "'type'='table-store', 'warehouse'='%s')",
- TEMPORARY_FOLDER.newFolder().toURI()));
- tEnv.useCatalog("TABLE_STORE");
- }
+public class PredicateITCase extends CatalogITCaseBase {
@Test
public void testPkFilterBucket() throws Exception {
@@ -73,10 +50,4 @@ public class PredicateITCase extends AbstractTestBase {
sql("INSERT INTO T VALUES (1, 2), (3, 4), (5, 6), (7, 8), (9, 10)");
assertThat(sql("SELECT * FROM T WHERE a = 5")).containsExactlyInAnyOrder(Row.of(5, 6));
}
-
- private List<Row> sql(String query, Object... args) throws Exception {
- try (CloseableIterator<Row> iter = tEnv.executeSql(String.format(query, args)).collect()) {
- return ImmutableList.copyOf(iter);
- }
- }
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/SchemaChangeITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/SchemaChangeITCase.java
new file mode 100644
index 00000000..378957a5
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/SchemaChangeITCase.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for schema changes. */
+public class SchemaChangeITCase extends CatalogITCaseBase {
+
+ // TODO cover more cases once Flink supports more ALTER operations.
+
+ @Test
+ public void testSetAndRemoveOption() throws Exception {
+ sql("CREATE TABLE T (a STRING, b STRING, c STRING)");
+ sql("ALTER TABLE T SET ('xyc'='unknown1', 'abc'='unknown2')");
+
+ Map<String, String> options = table("T").getOptions();
+ assertThat(options).containsEntry("xyc", "unknown1");
+ assertThat(options).containsEntry("abc", "unknown2");
+
+ sql("ALTER TABLE T RESET ('xyc', 'abc')");
+
+ options = table("T").getOptions();
+ assertThat(options).doesNotContainKey("xyc");
+ assertThat(options).doesNotContainKey("abc");
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
index 1642d4ff..5d46aa07 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.file.catalog;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.store.file.schema.SchemaChange;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.table.FileStoreTable;
@@ -150,15 +151,15 @@ public interface Catalog extends AutoCloseable {
throws TableAlreadyExistException, DatabaseNotExistException;
/**
- * Modify an existing table.
+ * Modify an existing table by applying {@link SchemaChange}s.
*
* @param tablePath path of the table to be modified
- * @param newTableSchema the new table definition
+ * @param changes the schema changes
* @param ignoreIfNotExists flag to specify behavior when the table does not exist: if set to
* false, throw an exception, if set to true, do nothing.
* @throws TableNotExistException if the table does not exist
*/
- void alterTable(ObjectPath tablePath, UpdateSchema newTableSchema, boolean ignoreIfNotExists)
+ void alterTable(ObjectPath tablePath, List<SchemaChange> changes, boolean ignoreIfNotExists)
throws TableNotExistException;
/** Exception for trying to drop on a database that is not empty. */
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
index 23c55131..6dae077e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.store.file.schema.SchemaChange;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.schema.UpdateSchema;
@@ -159,22 +160,17 @@ public class FileSystemCatalog extends AbstractCatalog {
throw new TableAlreadyExistException(tablePath);
}
- commitTableChange(path, table);
+ uncheck(() -> new SchemaManager(path).commitNewVersion(table));
}
@Override
- public void alterTable(ObjectPath tablePath, UpdateSchema newTable, boolean ignoreIfNotExists)
+ public void alterTable(
+ ObjectPath tablePath, List<SchemaChange> changes, boolean ignoreIfNotExists)
throws TableNotExistException {
- Path path = getTableLocation(tablePath);
- if (!tableExists(path)) {
- if (ignoreIfNotExists) {
- return;
- }
-
+ if (!tableExists(tablePath)) {
throw new TableNotExistException(tablePath);
}
-
- commitTableChange(path, newTable);
+ uncheck(() -> new SchemaManager(getTableLocation(tablePath)).commitChanges(changes));
}
private static <T> T uncheck(Callable<T> callable) {
@@ -194,10 +190,6 @@ public class FileSystemCatalog extends AbstractCatalog {
return name.substring(0, name.length() - DB_SUFFIX.length());
}
- private void commitTableChange(Path tablePath, UpdateSchema table) {
- uncheck(() -> new SchemaManager(tablePath).commitNewVersion(table));
- }
-
@Override
public void close() throws Exception {}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java
index 9f28f833..9f28769c 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java
@@ -18,11 +18,56 @@
package org.apache.flink.table.store.file.operation;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.store.file.catalog.CatalogLock;
+
+import javax.annotation.Nullable;
+
import java.util.concurrent.Callable;
/** An interface that allows file store to use global lock to some transaction-related things. */
-public interface Lock {
+public interface Lock extends AutoCloseable {
/** Run with lock. */
<T> T runWithLock(Callable<T> callable) throws Exception;
+
+ @Nullable
+ static Lock fromCatalog(CatalogLock.Factory lockFactory, ObjectPath tablePath) {
+ if (lockFactory == null) {
+ return null;
+ }
+
+ return fromCatalog(lockFactory.create(), tablePath);
+ }
+
+ @Nullable
+ static Lock fromCatalog(CatalogLock lock, ObjectPath tablePath) {
+ if (lock == null) {
+ return null;
+ }
+ return new CatalogLockImpl(lock, tablePath);
+ }
+
+ /** A {@link Lock} to wrap {@link CatalogLock}. */
+ class CatalogLockImpl implements Lock {
+
+ private final CatalogLock catalogLock;
+ private final ObjectPath tablePath;
+
+ private CatalogLockImpl(CatalogLock catalogLock, ObjectPath tablePath) {
+ this.catalogLock = catalogLock;
+ this.tablePath = tablePath;
+ }
+
+ @Override
+ public <T> T runWithLock(Callable<T> callable) throws Exception {
+ return catalogLock.runWithLock(
+ tablePath.getDatabaseName(), tablePath.getObjectName(), callable);
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.catalogLock.close();
+ }
+ }
}
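
A minimal stand-in sketch (not the patch's classes) of the Lock contract introduced above: an AutoCloseable lock that runs a callable inside a critical section, the way CatalogLockImpl scopes a CatalogLock to one database/table pair.

import java.util.concurrent.Callable;

public class LockSketch {

    // Same shape as the patched interface: runWithLock plus AutoCloseable.
    interface Lock extends AutoCloseable {
        <T> T runWithLock(Callable<T> callable) throws Exception;
    }

    public static void main(String[] args) throws Exception {
        Object monitor = new Object();
        Lock lock =
                new Lock() {
                    @Override
                    public <T> T runWithLock(Callable<T> callable) throws Exception {
                        synchronized (monitor) {
                            return callable.call();
                        }
                    }

                    @Override
                    public void close() {
                        System.out.println("lock released");
                    }
                };

        // Callers can now release the lock with try-with-resources.
        try (Lock l = lock) {
            System.out.println(l.runWithLock(() -> "schema file written under lock"));
        }
    }
}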
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java
new file mode 100644
index 00000000..58a3a6c6
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.schema;
+
+import org.apache.flink.table.types.logical.LogicalType;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/** Schema change to table. */
+public interface SchemaChange {
+
+ // TODO support more changes, e.g. updateColumnNullability and updateColumnComment.
+
+ static SchemaChange setOption(String key, String value) {
+ return new SetOption(key, value);
+ }
+
+ static SchemaChange removeOption(String key) {
+ return new RemoveOption(key);
+ }
+
+ static SchemaChange addColumn(
+ String fieldName, LogicalType logicalType, boolean isNullable, String comment) {
+ return new AddColumn(fieldName, logicalType, isNullable, comment);
+ }
+
+ static SchemaChange updateColumnType(String fieldName, LogicalType newLogicalType) {
+ return new UpdateColumnType(fieldName, newLogicalType);
+ }
+
+ /** A SchemaChange to set a table option. */
+ final class SetOption implements SchemaChange {
+ private final String key;
+ private final String value;
+
+ private SetOption(String key, String value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public String key() {
+ return key;
+ }
+
+ public String value() {
+ return value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SetOption that = (SetOption) o;
+ return key.equals(that.key) && value.equals(that.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key, value);
+ }
+ }
+
+ /** A SchemaChange to remove a table option. */
+ final class RemoveOption implements SchemaChange {
+ private final String key;
+
+ private RemoveOption(String key) {
+ this.key = key;
+ }
+
+ public String key() {
+ return key;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RemoveOption that = (RemoveOption) o;
+ return key.equals(that.key);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key);
+ }
+ }
+
+ /** A SchemaChange to add a field. */
+ final class AddColumn implements SchemaChange {
+ private final String fieldName;
+ private final LogicalType logicalType;
+ private final boolean isNullable;
+ private final String description;
+
+ private AddColumn(
+ String fieldName, LogicalType logicalType, boolean isNullable, String description) {
+ this.fieldName = fieldName;
+ this.logicalType = logicalType;
+ this.isNullable = isNullable;
+ this.description = description;
+ }
+
+ public String fieldName() {
+ return fieldName;
+ }
+
+ public LogicalType logicalType() {
+ return logicalType;
+ }
+
+ public boolean isNullable() {
+ return isNullable;
+ }
+
+ @Nullable
+ public String description() {
+ return description;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ AddColumn addColumn = (AddColumn) o;
+ return isNullable == addColumn.isNullable
+ && Objects.equals(fieldName, addColumn.fieldName)
+ && logicalType.equals(addColumn.logicalType)
+ && Objects.equals(description, addColumn.description);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(logicalType, isNullable, description);
+ result = 31 * result + Objects.hashCode(fieldName);
+ return result;
+ }
+ }
+
+ /** A SchemaChange to update the field type. */
+ final class UpdateColumnType implements SchemaChange {
+ private final String fieldName;
+ private final LogicalType newLogicalType;
+
+ private UpdateColumnType(String fieldName, LogicalType newLogicalType) {
+ this.fieldName = fieldName;
+ this.newLogicalType = newLogicalType;
+ }
+
+ public String fieldName() {
+ return fieldName;
+ }
+
+ public LogicalType newLogicalType() {
+ return newLogicalType;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ UpdateColumnType that = (UpdateColumnType) o;
+ return Objects.equals(fieldName, that.fieldName)
+ && newLogicalType.equals(that.newLogicalType);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(newLogicalType);
+ result = 31 * result + Objects.hashCode(fieldName);
+ return result;
+ }
+ }
+}
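
A short usage sketch of the new SchemaChange factories, assuming this module and the Flink type classes are on the classpath (the wrapping main class and the option/column names are illustrative; a real caller hands the list to Catalog#alterTable):

import org.apache.flink.table.store.file.schema.SchemaChange;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.VarCharType;

import java.util.Arrays;
import java.util.List;

public class SchemaChangeSketch {
    public static void main(String[] args) {
        // The four change kinds supported so far; SchemaManager#commitChanges
        // applies them in list order against the latest schema.
        List<SchemaChange> changes =
                Arrays.asList(
                        SchemaChange.setOption("write-buffer-size", "256 MB"),
                        SchemaChange.removeOption("some-old-key"),
                        SchemaChange.addColumn(
                                "new_column", new VarCharType(VarCharType.MAX_LENGTH), true, null),
                        SchemaChange.updateColumnType("column_name", new BigIntType()));
        System.out.println(changes.size() + " changes staged");
    }
}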
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index ad4a13c5..b5a0c775 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -21,19 +21,28 @@ package org.apache.flink.table.store.file.schema;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.store.file.schema.SchemaChange.AddColumn;
+import org.apache.flink.table.store.file.schema.SchemaChange.RemoveOption;
+import org.apache.flink.table.store.file.schema.SchemaChange.SetOption;
+import org.apache.flink.table.store.file.schema.SchemaChange.UpdateColumnType;
import org.apache.flink.table.store.file.utils.AtomicFileWriter;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.apache.flink.table.store.file.utils.FileUtils.listVersionedFiles;
@@ -45,10 +54,17 @@ public class SchemaManager implements Serializable {
private final Path tableRoot;
+ @Nullable private transient Lock lock;
+
public SchemaManager(Path tableRoot) {
this.tableRoot = tableRoot;
}
+ public SchemaManager withLock(@Nullable Lock lock) {
+ this.lock = lock;
+ return this;
+ }
+
/** @return latest schema. */
public Optional<TableSchema> latest() {
try {
@@ -77,11 +93,6 @@ public class SchemaManager implements Serializable {
/** Create a new schema from {@link UpdateSchema}. */
public TableSchema commitNewVersion(UpdateSchema updateSchema) throws Exception {
- return commitNewVersion(Callable::call, updateSchema);
- }
-
- /** Create a new schema from {@link UpdateSchema}. */
- public TableSchema commitNewVersion(Lock lock, UpdateSchema updateSchema) throws Exception {
RowType rowType = updateSchema.rowType();
List<String> partitionKeys = updateSchema.partitionKeys();
List<String> primaryKeys = updateSchema.primaryKeys();
@@ -119,7 +130,7 @@ public class SchemaManager implements Serializable {
id = 0;
}
- TableSchema tableSchema =
+ TableSchema newSchema =
new TableSchema(
id,
fields,
@@ -129,25 +140,105 @@ public class SchemaManager implements Serializable {
options,
updateSchema.comment());
- Path schemaPath = toSchemaPath(id);
+ boolean success = commit(newSchema);
+ if (success) {
+ return newSchema;
+ }
+ }
+ }
- FileSystem fs = schemaPath.getFileSystem();
- boolean success =
- lock.runWithLock(
- () -> {
- if (fs.exists(schemaPath)) {
- return false;
- }
+ /** Apply and commit a list of {@link SchemaChange}s. */
+ public TableSchema commitChanges(List<SchemaChange> changes) throws Exception {
+ while (true) {
+ TableSchema schema =
+ latest().orElseThrow(
+ () -> new RuntimeException("Table does not exist: " + tableRoot));
+ Map<String, String> newOptions = new HashMap<>(schema.options());
+ List<DataField> newFields = new ArrayList<>(schema.fields());
+ AtomicInteger highestFieldId = new AtomicInteger(schema.highestFieldId());
+ for (SchemaChange change : changes) {
+ if (change instanceof SetOption) {
+ SetOption setOption = (SetOption) change;
+ newOptions.put(setOption.key(), setOption.value());
+ } else if (change instanceof RemoveOption) {
+ newOptions.remove(((RemoveOption) change).key());
+ } else if (change instanceof AddColumn) {
+ AddColumn addColumn = (AddColumn) change;
+ int id = highestFieldId.incrementAndGet();
+ DataType dataType =
+ TableSchema.toDataType(addColumn.logicalType(), highestFieldId);
+ newFields.add(
+ new DataField(
+ id, addColumn.fieldName(), dataType, addColumn.description()));
+ } else if (change instanceof UpdateColumnType) {
+ UpdateColumnType update = (UpdateColumnType) change;
+ boolean found = false;
+ for (int i = 0; i < newFields.size(); i++) {
+ DataField field = newFields.get(i);
+ if (field.name().equals(update.fieldName())) {
+ AtomicInteger dummyId = new AtomicInteger(0);
+ DataType newType =
+ TableSchema.toDataType(update.newLogicalType(), dummyId);
+ if (dummyId.get() != 0) {
+ throw new RuntimeException(
+ String.format(
+ "Update column to nested row type '%s' is not supported.",
+ update.newLogicalType()));
+ }
+ newFields.set(
+ i,
+ new DataField(
+ field.id(),
+ field.name(),
+ newType,
+ field.description()));
+ found = true;
+ break;
+ }
+ }
+
+ if (!found) {
+ throw new RuntimeException("Can not find column: " + update.fieldName());
+ }
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported change: " + change.getClass());
+ }
+ }
- return AtomicFileWriter.writeFileUtf8(
- schemaPath, tableSchema.toString());
- });
+ TableSchema newSchema =
+ new TableSchema(
+ schema.id() + 1,
+ newFields,
+ highestFieldId.get(),
+ schema.partitionKeys(),
+ schema.primaryKeys(),
+ newOptions,
+ schema.comment());
+
+ boolean success = commit(newSchema);
if (success) {
- return tableSchema;
+ return newSchema;
}
}
}
+ private boolean commit(TableSchema newSchema) throws Exception {
+ Path schemaPath = toSchemaPath(newSchema.id());
+ FileSystem fs = schemaPath.getFileSystem();
+ Callable<Boolean> callable =
+ () -> {
+ if (fs.exists(schemaPath)) {
+ return false;
+ }
+ return AtomicFileWriter.writeFileUtf8(schemaPath, newSchema.toString());
+ };
+ if (lock == null) {
+ return callable.call();
+ }
+ return lock.runWithLock(callable);
+ }
+
/** Read schema for schema id. */
public TableSchema schema(long id) {
try {
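
The commitChanges loop above is optimistic concurrency control: re-read the latest schema, build a candidate with id + 1, and retry when another writer creates that schema file first. A JDK-only sketch of the same retry pattern (an AtomicLong stands in for the schema directory):

import java.util.concurrent.atomic.AtomicLong;

public class OptimisticCommitSketch {
    public static void main(String[] args) {
        // Stand-in for the schema directory: the highest committed schema id.
        AtomicLong committedId = new AtomicLong(3);

        while (true) {
            long latest = committedId.get(); // latest().get().id()
            long candidate = latest + 1;     // apply the changes, bump the id

            // commit(newSchema) succeeds only if nobody claimed the id first,
            // mirroring the fs.exists(schemaPath) check run under the lock.
            if (committedId.compareAndSet(latest, candidate)) {
                System.out.println("committed schema " + candidate);
                break;
            }
            // Lost the race: loop back, re-read the latest schema, re-apply.
        }
    }
}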
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
index bf7d0068..dbdeccf1 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
@@ -249,7 +249,7 @@ public class TableSchema implements Serializable {
return ((RowDataType) toDataType(rowType, new AtomicInteger(-1))).fields();
}
- private static DataType toDataType(LogicalType type, AtomicInteger currentHighestFieldId) {
+ public static DataType toDataType(LogicalType type, AtomicInteger currentHighestFieldId) {
if (type instanceof ArrayType) {
DataType element =
toDataType(((ArrayType) type).getElementType(), currentHighestFieldId);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
index 7fe87165..78c318cc 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
@@ -34,12 +34,13 @@ import java.util.Map;
* An abstraction layer above {@link FileStoreCommit} and {@link FileStoreExpire} to provide
* snapshot commit and expiration.
*/
-public class TableCommit {
+public class TableCommit implements AutoCloseable {
private final FileStoreCommit commit;
private final FileStoreExpire expire;
@Nullable private Map<String, String> overwritePartition = null;
+ @Nullable private Lock lock;
public TableCommit(FileStoreCommit commit, FileStoreExpire expire) {
this.commit = commit;
@@ -54,6 +55,7 @@ public class TableCommit {
public TableCommit withLock(Lock lock) {
commit.withLock(lock);
expire.withLock(lock);
+ this.lock = lock;
return this;
}
@@ -84,4 +86,11 @@ public class TableCommit {
}
expire.expire();
}
+
+ @Override
+ public void close() throws Exception {
+ if (lock != null) {
+ lock.close();
+ }
+ }
}
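
With TableCommit now AutoCloseable, lock ownership moves into the commit object, so callers such as StoreCommitter can release the catalog lock with a single close(). An illustrative stand-in (not the patch's classes) of that ownership pattern:

public class CommitCloseSketch {

    interface Lock extends AutoCloseable {}

    static class TableCommit implements AutoCloseable {
        private Lock lock; // stays null when no catalog lock is configured

        TableCommit withLock(Lock lock) {
            this.lock = lock;
            return this;
        }

        void commit(String committable) {
            System.out.println("committed " + committable);
        }

        @Override
        public void close() throws Exception {
            if (lock != null) {
                lock.close(); // the commit object now owns and releases the lock
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Lock lock = () -> System.out.println("lock released");
        try (TableCommit commit = new TableCommit().withLock(lock)) {
            commit.commit("manifest-1");
        }
    }
}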
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
index f6a49057..b649c872 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
@@ -22,7 +22,9 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.store.file.catalog.AbstractCatalog;
import org.apache.flink.table.store.file.catalog.CatalogLock;
+import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.SchemaChange;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.schema.UpdateSchema;
@@ -53,6 +55,8 @@ import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.flink.table.store.CatalogOptions.LOCK_ENABLED;
+import static org.apache.flink.table.store.hive.HiveCatalogLock.acquireTimeout;
+import static org.apache.flink.table.store.hive.HiveCatalogLock.checkMaxSleep;
/** A catalog implementation for Hive. */
public class HiveCatalog extends AbstractCatalog {
@@ -78,14 +82,16 @@ public class HiveCatalog extends AbstractCatalog {
@Override
public Optional<CatalogLock.Factory> lockFactory() {
- boolean lockEnabled =
- Boolean.parseBoolean(
- hiveConf.get(LOCK_ENABLED.key(), LOCK_ENABLED.defaultValue().toString()));
- return lockEnabled
+ return lockEnabled()
? Optional.of(HiveCatalogLock.createFactory(hiveConf))
: Optional.empty();
}
+ private boolean lockEnabled() {
+ return Boolean.parseBoolean(
+ hiveConf.get(LOCK_ENABLED.key(), LOCK_ENABLED.defaultValue().toString()));
+ }
+
@Override
public List<String> listDatabases() {
try {
@@ -211,7 +217,16 @@ public class HiveCatalog extends AbstractCatalog {
// first commit changes to underlying files
// if changes on Hive fails there is no harm to perform the same changes to files again
- TableSchema schema = commitToUnderlyingFiles(tablePath, updateSchema);
+ TableSchema schema;
+ try {
+ schema = schemaManager(tablePath).commitNewVersion(updateSchema);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to commit changes of table "
+ + tablePath.getFullName()
+ + " to underlying files",
+ e);
+ }
Table table = newHmsTable(tablePath);
updateHmsTable(table, tablePath, schema);
try {
@@ -223,7 +238,7 @@ public class HiveCatalog extends AbstractCatalog {
@Override
public void alterTable(
- ObjectPath tablePath, UpdateSchema updateSchema, boolean ignoreIfNotExists)
+ ObjectPath tablePath, List<SchemaChange> changes, boolean ignoreIfNotExists)
throws TableNotExistException {
if (isTableStoreTableNotExisted(tablePath)) {
if (ignoreIfNotExists) {
@@ -233,20 +248,21 @@ public class HiveCatalog extends AbstractCatalog {
}
}
- // first commit changes to underlying files
- // if changes on Hive fails there is no harm to perform the same changes to files again
- TableSchema schema = commitToUnderlyingFiles(tablePath, updateSchema);
try {
+ // first commit changes to underlying files
+ TableSchema schema = schemaManager(tablePath).commitChanges(changes);
+
+ // sync to Hive HMS
Table table = client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
updateHmsTable(table, tablePath, schema);
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table);
- } catch (TException e) {
- throw new RuntimeException("Failed to alter table " + tablePath.getFullName(), e);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
@Override
- public void close() throws Exception {
+ public void close() {
client.close();
}
@@ -343,17 +359,18 @@ public class HiveCatalog extends AbstractCatalog {
return false;
}
- private TableSchema commitToUnderlyingFiles(ObjectPath tablePath, UpdateSchema schema) {
- Path path = getTableLocation(tablePath);
- try {
- return new SchemaManager(path).commitNewVersion(schema);
- } catch (Exception e) {
- throw new RuntimeException(
- "Failed to commit changes of table "
- + tablePath.getFullName()
- + " to underlying files",
- e);
+ private SchemaManager schemaManager(ObjectPath tablePath) {
+ return new SchemaManager(getTableLocation(tablePath)).withLock(lock(tablePath));
+ }
+
+ private Lock lock(ObjectPath tablePath) {
+ if (!lockEnabled()) {
+ return null;
}
+
+ HiveCatalogLock lock =
+ new HiveCatalogLock(client, checkMaxSleep(hiveConf), acquireTimeout(hiveConf));
+ return Lock.fromCatalog(lock, tablePath);
}
static IMetaStoreClient createClient(HiveConf hiveConf) {
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogLock.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogLock.java
index 7dd8232c..64352278 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogLock.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogLock.java
@@ -129,22 +129,24 @@ public class HiveCatalogLock implements CatalogLock {
@Override
public CatalogLock create() {
HiveConf conf = hiveConf.conf();
- long checkMaxSleep =
- TimeUtils.parseDuration(
- conf.get(
- LOCK_CHECK_MAX_SLEEP.key(),
- TimeUtils.getStringInMillis(
- LOCK_CHECK_MAX_SLEEP.defaultValue())))
- .toMillis();
- long acquireTimeout =
- TimeUtils.parseDuration(
- conf.get(
- LOCK_ACQUIRE_TIMEOUT.key(),
- TimeUtils.getStringInMillis(
- LOCK_ACQUIRE_TIMEOUT.defaultValue())))
- .toMillis();
return new HiveCatalogLock(
- HiveCatalog.createClient(conf), checkMaxSleep, acquireTimeout);
+ HiveCatalog.createClient(conf), checkMaxSleep(conf), acquireTimeout(conf));
}
}
+
+ public static long checkMaxSleep(HiveConf conf) {
+ return TimeUtils.parseDuration(
+ conf.get(
+ LOCK_CHECK_MAX_SLEEP.key(),
+ TimeUtils.getStringInMillis(LOCK_CHECK_MAX_SLEEP.defaultValue())))
+ .toMillis();
+ }
+
+ public static long acquireTimeout(HiveConf conf) {
+ return TimeUtils.parseDuration(
+ conf.get(
+ LOCK_ACQUIRE_TIMEOUT.key(),
+ TimeUtils.getStringInMillis(LOCK_ACQUIRE_TIMEOUT.defaultValue())))
+ .toMillis();
+ }
}
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index ae8e2f51..b53cb33f 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.store.file.catalog.Catalog;
import org.apache.flink.table.store.file.catalog.CatalogFactory;
+import org.apache.flink.table.store.file.schema.SchemaChange;
import org.apache.flink.util.Preconditions;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
@@ -32,6 +33,10 @@ import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.catalog.TableChange.AddColumn;
+import org.apache.spark.sql.connector.catalog.TableChange.RemoveProperty;
+import org.apache.spark.sql.connector.catalog.TableChange.SetProperty;
+import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnType;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -40,6 +45,9 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.store.spark.SparkTypeUtils.toFlinkType;
/** Spark {@link TableCatalog} for table store. */
public class SparkCatalog implements TableCatalog, SupportsNamespaces {
@@ -101,28 +109,70 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
}
@Override
- public Table loadTable(Identifier ident) throws NoSuchTableException {
- if (!isValidateNamespace(ident.namespace())) {
+ public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
+ try {
+ return new SparkTable(catalog.getTable(objectPath(ident)));
+ } catch (Catalog.TableNotExistException e) {
throw new NoSuchTableException(ident);
}
+ }
+ @Override
+ public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
+ List<SchemaChange> schemaChanges =
+ Arrays.stream(changes).map(this::toSchemaChange).collect(Collectors.toList());
try {
- return new SparkTable(
- catalog.getTable(new ObjectPath(ident.namespace()[0], ident.name())));
+ catalog.alterTable(objectPath(ident), schemaChanges, false);
+ return loadTable(ident);
} catch (Catalog.TableNotExistException e) {
throw new NoSuchTableException(ident);
}
}
- @Override
- public void renameTable(Identifier oldIdent, Identifier newIdent) {
- throw new UnsupportedOperationException();
+ private SchemaChange toSchemaChange(TableChange change) {
+ if (change instanceof SetProperty) {
+ SetProperty set = (SetProperty) change;
+ return SchemaChange.setOption(set.property(), set.value());
+ } else if (change instanceof RemoveProperty) {
+ return SchemaChange.removeOption(((RemoveProperty) change).property());
+ } else if (change instanceof AddColumn) {
+ AddColumn add = (AddColumn) change;
+ validateAlterNestedField(add.fieldNames());
+ return SchemaChange.addColumn(
+ add.fieldNames()[0],
+ toFlinkType(add.dataType()),
+ add.isNullable(),
+ add.comment());
+ } else if (change instanceof UpdateColumnType) {
+ UpdateColumnType update = (UpdateColumnType) change;
+ validateAlterNestedField(update.fieldNames());
+ return SchemaChange.updateColumnType(
+ update.fieldNames()[0], toFlinkType(update.newDataType()));
+ } else {
+ throw new UnsupportedOperationException(
+ "Change is not supported: " + change.getClass());
+ }
+ }
+
+ private void validateAlterNestedField(String[] fieldNames) {
+ if (fieldNames.length > 1) {
+ throw new UnsupportedOperationException(
+ "Alter nested column is not supported: " + Arrays.toString(fieldNames));
+ }
}
private boolean isValidateNamespace(String[] namespace) {
return namespace.length == 1;
}
+ private ObjectPath objectPath(Identifier ident) throws NoSuchTableException {
+ if (!isValidateNamespace(ident.namespace())) {
+ throw new NoSuchTableException(ident);
+ }
+
+ return new ObjectPath(ident.namespace()[0], ident.name());
+ }
+
// --------------------- unsupported methods ----------------------------
@Override
@@ -150,12 +200,12 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
}
@Override
- public Table alterTable(Identifier ident, TableChange... changes) {
- throw new UnsupportedOperationException("Alter table in Spark is not supported yet.");
+ public boolean dropTable(Identifier ident) {
+ throw new UnsupportedOperationException("Drop table in Spark is not supported yet.");
}
@Override
- public boolean dropTable(Identifier ident) {
- throw new UnsupportedOperationException("Drop table in Spark is not supported yet.");
+ public void renameTable(Identifier oldIdent, Identifier newIdent) {
+ throw new UnsupportedOperationException();
}
}
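
For orientation, the TableChange values that Spark hands to SparkCatalog#alterTable for the DDL statements documented in spark.md can be built with Spark's own factory methods. A hedged sketch (property and column names are illustrative):

import org.apache.spark.sql.connector.catalog.TableChange;
import org.apache.spark.sql.types.DataTypes;

public class SparkAlterSketch {
    public static void main(String[] args) {
        // ALTER TABLE ... SET TBLPROPERTIES ('write-buffer-size'='256 MB')
        TableChange set = TableChange.setProperty("write-buffer-size", "256 MB");

        // ALTER TABLE ... UNSET TBLPROPERTIES ('write-buffer-size')
        TableChange unset = TableChange.removeProperty("write-buffer-size");

        // ALTER TABLE ... ADD COLUMNS (new_column STRING)
        TableChange add = TableChange.addColumn(new String[] {"new_column"}, DataTypes.StringType);

        // ALTER TABLE ... ALTER COLUMN column_name TYPE BIGINT
        TableChange retype =
                TableChange.updateColumnType(new String[] {"column_name"}, DataTypes.LongType);

        System.out.printf("%s %s %s %s%n", set, unset, add, retype);
    }
}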
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
index 65481c21..db607fa3 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
@@ -42,11 +43,13 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.LongType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.UserDefinedType;
+import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Collectors;
/** Utils for spark {@link DataType}. */
public class SparkTypeUtils {
@@ -58,12 +61,16 @@ public class SparkTypeUtils {
}
public static DataType fromFlinkType(LogicalType type) {
- return type.accept(FlinkToSparkTypeVistor.INSTANCE);
+ return type.accept(FlinkToSparkTypeVisitor.INSTANCE);
}
- private static class FlinkToSparkTypeVistor extends LogicalTypeDefaultVisitor<DataType> {
+ public static LogicalType toFlinkType(DataType dataType) {
+ return SparkToFlinkTypeVisitor.visit(dataType);
+ }
+
+ private static class FlinkToSparkTypeVisitor extends LogicalTypeDefaultVisitor<DataType> {
- private static final FlinkToSparkTypeVistor INSTANCE = new FlinkToSparkTypeVistor();
+ private static final FlinkToSparkTypeVisitor INSTANCE = new FlinkToSparkTypeVisitor();
@Override
public DataType visit(CharType charType) {
@@ -162,15 +169,17 @@ public class SparkTypeUtils {
@Override
public DataType visit(RowType rowType) {
- List<StructField> fields =
- rowType.getFields().stream()
- .map(
- field ->
- DataTypes.createStructField(
- field.getName(),
- field.getType().accept(this),
- field.getType().isNullable()))
- .collect(Collectors.toList());
+ List<StructField> fields = new ArrayList<>(rowType.getFieldCount());
+ for (RowField field : rowType.getFields()) {
+ StructField structField =
+ DataTypes.createStructField(
+ field.getName(),
+ field.getType().accept(this),
+ field.getType().isNullable());
+ structField =
+ field.getDescription().map(structField::withComment).orElse(structField);
+ fields.add(structField);
+ }
return DataTypes.createStructType(fields);
}
@@ -179,4 +188,101 @@ public class SparkTypeUtils {
throw new UnsupportedOperationException("Unsupported type: " + logicalType);
}
}
+
+ private static class SparkToFlinkTypeVisitor {
+
+ static LogicalType visit(DataType type) {
+ return visit(type, new SparkToFlinkTypeVisitor());
+ }
+
+ static LogicalType visit(DataType type, SparkToFlinkTypeVisitor visitor) {
+ if (type instanceof StructType) {
+ StructField[] fields = ((StructType) type).fields();
+ List<LogicalType> fieldResults = new ArrayList<>(fields.length);
+
+ for (StructField field : fields) {
+ fieldResults.add(visit(field.dataType(), visitor));
+ }
+
+ return visitor.struct((StructType) type, fieldResults);
+
+ } else if (type instanceof org.apache.spark.sql.types.MapType) {
+ return visitor.map(
+ (org.apache.spark.sql.types.MapType) type,
+ visit(((org.apache.spark.sql.types.MapType) type).keyType(), visitor),
+ visit(((org.apache.spark.sql.types.MapType) type).valueType(), visitor));
+
+ } else if (type instanceof org.apache.spark.sql.types.ArrayType) {
+ return visitor.array(
+ (org.apache.spark.sql.types.ArrayType) type,
+ visit(
+ ((org.apache.spark.sql.types.ArrayType) type).elementType(),
+ visitor));
+
+ } else if (type instanceof UserDefinedType) {
+ throw new UnsupportedOperationException("User-defined types are not supported");
+
+ } else {
+ return visitor.atomic(type);
+ }
+ }
+
+ public LogicalType struct(StructType struct, List<LogicalType> fieldResults) {
+ StructField[] fields = struct.fields();
+ List<RowField> newFields = new ArrayList<>(fields.length);
+ for (int i = 0; i < fields.length; i += 1) {
+ StructField field = fields[i];
+ LogicalType fieldType = fieldResults.get(i).copy(field.nullable());
+ String comment = field.getComment().getOrElse(() -> null);
+ newFields.add(new RowField(field.name(), fieldType, comment));
+ }
+
+ return new RowType(newFields);
+ }
+
+ public LogicalType array(
+ org.apache.spark.sql.types.ArrayType array, LogicalType elementResult) {
+ return new ArrayType(elementResult.copy(array.containsNull()));
+ }
+
+ public LogicalType map(
+ org.apache.spark.sql.types.MapType map,
+ LogicalType keyResult,
+ LogicalType valueResult) {
+ return new MapType(keyResult.copy(false), valueResult.copy(map.valueContainsNull()));
+ }
+
+ public LogicalType atomic(DataType atomic) {
+ if (atomic instanceof org.apache.spark.sql.types.BooleanType) {
+ return new BooleanType();
+ } else if (atomic instanceof org.apache.spark.sql.types.ByteType) {
+ return new TinyIntType();
+ } else if (atomic instanceof org.apache.spark.sql.types.ShortType) {
+ return new SmallIntType();
+ } else if (atomic instanceof org.apache.spark.sql.types.IntegerType) {
+ return new IntType();
+ } else if (atomic instanceof LongType) {
+ return new BigIntType();
+ } else if (atomic instanceof org.apache.spark.sql.types.FloatType) {
+ return new FloatType();
+ } else if (atomic instanceof org.apache.spark.sql.types.DoubleType) {
+ return new DoubleType();
+ } else if (atomic instanceof org.apache.spark.sql.types.StringType) {
+ return new VarCharType(VarCharType.MAX_LENGTH);
+ } else if (atomic instanceof org.apache.spark.sql.types.DateType) {
+ return new DateType();
+ } else if (atomic instanceof org.apache.spark.sql.types.TimestampType) {
+ return new TimestampType();
+ } else if (atomic instanceof org.apache.spark.sql.types.DecimalType) {
+ return new DecimalType(
+ ((org.apache.spark.sql.types.DecimalType) atomic).precision(),
+ ((org.apache.spark.sql.types.DecimalType) atomic).scale());
+ } else if (atomic instanceof org.apache.spark.sql.types.BinaryType) {
+ return new VarBinaryType(VarBinaryType.MAX_LENGTH);
+ }
+
+ throw new UnsupportedOperationException(
+ "Not a supported type: " + atomic.catalogString());
+ }
+ }
}
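
A note on the reworked RowType handling above: field descriptions now travel in both directions, via withComment on the way to Spark and getComment on the way back. Below is a minimal sketch of that behavior using only the helpers exercised by SparkTypeTest further down (fromFlinkRowType/toFlinkType); the wrapper class and main method are illustrative, not part of the commit.

```java
import java.util.Collections;

import org.apache.flink.table.store.spark.SparkTypeUtils;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.spark.sql.types.StructType;

public class FieldCommentSketch {
    public static void main(String[] args) {
        // A Flink row with one described field.
        RowType rowType =
                new RowType(
                        Collections.singletonList(
                                new RowType.RowField("posX", new DoubleType(), "X field")));

        // Flink -> Spark: the description becomes the StructField comment.
        StructType sparkType = SparkTypeUtils.fromFlinkRowType(rowType);
        System.out.println(sparkType.fields()[0].getComment()); // Some(X field)

        // Spark -> Flink: struct(...) copies the comment back into the RowField.
        System.out.println(SparkTypeUtils.toFlinkType(sparkType));
    }
}
```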
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
index f1311dd0..ba8b5ddb 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
@@ -28,7 +28,6 @@ import org.junit.jupiter.api.Test;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
-import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.AbstractMap;
@@ -57,24 +56,16 @@ public class SparkInternalRowTest {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)),
new String[] {"v1", "v5"},
new Integer[] {10, 30},
- "char_v",
- "varchar_v",
true,
(byte) 22,
(short) 356,
23567222L,
- "binary_v".getBytes(StandardCharsets.UTF_8),
"varbinary_v".getBytes(StandardCharsets.UTF_8),
LocalDateTime.parse("2007-12-03T10:15:30"),
- Instant.parse("2007-12-03T10:15:30.00Z"),
LocalDate.parse("2022-05-02"),
BigDecimal.valueOf(0.21),
BigDecimal.valueOf(65782123123.01),
- BigDecimal.valueOf(62123123.5),
- Stream.of(
- new AbstractMap.SimpleEntry<>("key1", 5),
- new AbstractMap.SimpleEntry<>("key2", 2))
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+ BigDecimal.valueOf(62123123.5));
RowRowConverter flinkConverter =
RowRowConverter.create(TypeConversions.fromLogicalToDataType(ALL_TYPES));
@@ -96,21 +87,17 @@ public class SparkInternalRowTest {
+ "\"locations\":{\"key1\":{\"posX\":1.2,\"posY\":2.3},\"key2\":{\"posX\":2.4,\"posY\":3.5}},"
+ "\"strArray\":[\"v1\",\"v5\"],"
+ "\"intArray\":[10,30],"
- + "\"char\":\"char_v\","
- + "\"varchar\":\"varchar_v\","
+ "\"boolean\":true,"
+ "\"tinyint\":22,"
+ "\"smallint\":356,"
+ "\"bigint\":23567222,"
- + "\"varbinary\":\"YmluYXJ5X3Y=\","
- + "\"binary\":\"dmFyYmluYXJ5X3Y=\","
- + "\"timestampWithoutZone\":\"2007-12-03 10:15:30\","
- + "\"timestampWithZone\":\"2007-12-03 10:15:30\","
+ + "\"bytes\":\"dmFyYmluYXJ5X3Y=\","
+ + "\"timestamp\":\"2007-12-03 10:15:30\","
+ "\"date\":\"2022-05-02\","
+ "\"decimal\":0.21,"
+ "\"decimal2\":65782123123.01,"
- + "\"decimal3\":62123123.5,"
- + "\"multiset\":{\"key1\":5,\"key2\":2}}";
+ + "\"decimal3\":62123123.5"
+ + "}";
assertThat(sparkRow.json()).isEqualTo(expected);
}
}
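
A note on the shrinking fixture above: with the two-way mapping now in place, Spark's single StringType/BinaryType and its lone TimestampType cannot carry Flink's CHAR/VARCHAR lengths, fixed BINARY, or TIMESTAMP WITH LOCAL TIME ZONE back out, and MULTISET has no Spark counterpart at all, so the tests keep only round-trippable types. A hypothetical probe of the lossy case (class name illustrative):

```java
import org.apache.flink.table.store.spark.SparkTypeUtils;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.spark.sql.types.StructType;

public class LossyTypesSketch {
    public static void main(String[] args) {
        // CHAR(10) and VARCHAR(10) both land on Spark's lone StringType...
        RowType lossy = RowType.of(new CharType(10), new VarCharType(10));
        StructType sparkType = SparkTypeUtils.fromFlinkRowType(lossy);
        System.out.println(sparkType.fields()[0].dataType()); // StringType
        System.out.println(sparkType.fields()[1].dataType()); // StringType

        // ...so the reverse mapping can only restore VARCHAR(MAX_LENGTH);
        // the original lengths are gone, hence no char/varchar in the fixtures.
        System.out.println(SparkTypeUtils.toFlinkType(sparkType));
    }
}
```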
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 26567405..b7b2ffae 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -21,6 +21,8 @@ package org.apache.flink.table.store.spark;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
@@ -39,6 +41,7 @@ import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@@ -49,6 +53,8 @@ public class SparkReadITCase {
private static SparkSession spark = null;
+ private static Path warehousePath = null;
+
private static Path tablePath1;
private static Path tablePath2;
@@ -57,7 +63,7 @@ public class SparkReadITCase {
public static void startMetastoreAndSpark() throws Exception {
warehouse = File.createTempFile("warehouse", null);
assertThat(warehouse.delete()).isTrue();
- Path warehousePath = new Path("file:" + warehouse);
+ warehousePath = new Path("file:" + warehouse);
spark = SparkSession.builder().master("local[2]").getOrCreate();
spark.conf().set("spark.sql.catalog.table_store", SparkCatalog.class.getName());
spark.conf().set("spark.sql.catalog.table_store.warehouse", warehousePath.toString());
@@ -122,6 +128,56 @@ public class SparkReadITCase {
innerTestFilterPushDown(spark.table("table_store.default.t2"));
}
+ @Test
+ public void testSetAndRemoveOption() {
+ spark.sql("ALTER TABLE table_store.default.t1 SET TBLPROPERTIES('xyc' 'unknown1')");
+
+ Map<String, String> options = schema1().options();
+ assertThat(options).containsEntry("xyc", "unknown1");
+
+ spark.sql("ALTER TABLE table_store.default.t1 UNSET TBLPROPERTIES('xyc')");
+
+ options = schema1().options();
+ assertThat(options).doesNotContainKey("xyc");
+ }
+
+ @Test
+ public void testAddColumn() throws Exception {
+ Path tablePath = new Path(warehousePath, "default.db/testAddColumn");
+ SimpleTableTestHelper testHelper1 = createTestHelper(tablePath);
+ testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
+ testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
+ testHelper1.commit();
+
+ spark.sql("ALTER TABLE table_store.default.testAddColumn ADD COLUMN d STRING");
+
+ Dataset<Row> table = spark.table("table_store.default.testAddColumn");
+ List<Row> results = table.collectAsList();
+ assertThat(results.toString()).isEqualTo("[[1,2,1,null], [5,6,3,null]]");
+
+ results = table.select("a", "c").collectAsList();
+ assertThat(results.toString()).isEqualTo("[[1,1], [5,3]]");
+
+ results = table.groupBy().sum("b").collectAsList();
+ assertThat(results.toString()).isEqualTo("[[8]]");
+ }
+
+ @Test
+ public void testAlterColumnType() throws Exception {
+ Path tablePath = new Path(warehousePath, "default.db/testAlterColumnType");
+ SimpleTableTestHelper testHelper1 = createTestHelper(tablePath);
+ testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
+ testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
+ testHelper1.commit();
+
+ spark.sql("ALTER TABLE table_store.default.testAddColumn ALTER COLUMN a TYPE BIGINT");
+ innerTestNormal(spark.table("table_store.default.testAddColumn"));
+ }
+
+ private TableSchema schema1() {
+ return FileStoreTableFactory.create(tablePath1).schema();
+ }
+
private void innerTestNormal(Dataset<Row> dataset) {
List<Row> results = dataset.collectAsList();
assertThat(results.toString()).isEqualTo("[[1,2,1], [5,6,3]]");
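
Taken together, the new tests cover the full loop: create a table on the filesystem, evolve it through Spark DDL, and check the stored schema. A condensed sketch of that flow follows; the warehouse path and the pre-existing table t1 are assumed, and verification mirrors the schema1() helper above.

```java
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.spark.SparkCatalog;
import org.apache.flink.table.store.table.FileStoreTableFactory;
import org.apache.spark.sql.SparkSession;

public class SchemaEvolutionSketch {
    public static void main(String[] args) {
        // Point Spark at a table store warehouse, as startMetastoreAndSpark() does.
        SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();
        spark.conf().set("spark.sql.catalog.table_store", SparkCatalog.class.getName());
        spark.conf().set("spark.sql.catalog.table_store.warehouse", "file:/tmp/warehouse");

        // Evolve the table through Spark DDL, as testSetAndRemoveOption() does.
        spark.sql("ALTER TABLE table_store.default.t1 SET TBLPROPERTIES ('xyc' = 'unknown1')");

        // Verify by reading the latest TableSchema straight from the filesystem.
        TableSchema schema =
                FileStoreTableFactory.create(new Path("file:/tmp/warehouse/default.db/t1"))
                        .schema();
        System.out.println(schema.options().get("xyc")); // unknown1
    }
}
```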
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
index 12c8694a..abae4e2d 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
@@ -22,8 +22,11 @@ import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Test;
+import static org.apache.flink.table.store.spark.SparkTypeUtils.fromFlinkRowType;
+import static org.apache.flink.table.store.spark.SparkTypeUtils.toFlinkType;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link SparkTypeUtils}. */
@@ -38,7 +41,7 @@ public class SparkTypeTest {
.field(
"locations",
DataTypes.MAP(
- DataTypes.STRING(),
+ DataTypes.STRING().notNull(),
DataTypes.ROW(
DataTypes.FIELD(
"posX",
@@ -50,24 +53,20 @@ public class SparkTypeTest {
"Y field"))))
.field("strArray", DataTypes.ARRAY(DataTypes.STRING()).nullable())
.field("intArray", DataTypes.ARRAY(DataTypes.INT()).nullable())
- .field("char", DataTypes.CHAR(10).notNull())
- .field("varchar", DataTypes.VARCHAR(10).notNull())
.field("boolean", DataTypes.BOOLEAN().nullable())
.field("tinyint", DataTypes.TINYINT())
.field("smallint", DataTypes.SMALLINT())
.field("bigint", DataTypes.BIGINT())
- .field("varbinary", DataTypes.VARBINARY(10))
- .field("binary", DataTypes.BINARY(10))
- .field("timestampWithoutZone", DataTypes.TIMESTAMP())
- .field("timestampWithZone", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+ .field("bytes", DataTypes.BYTES())
+ .field("timestamp", DataTypes.TIMESTAMP())
.field("date", DataTypes.DATE())
.field("decimal", DataTypes.DECIMAL(2, 2))
.field("decimal2", DataTypes.DECIMAL(38, 2))
.field("decimal3", DataTypes.DECIMAL(10, 1))
- .field("multiset", DataTypes.MULTISET(DataTypes.STRING().notNull()))
.build()
.toRowDataType()
- .getLogicalType();
+ .getLogicalType()
+ .copy(true);
@Test
public void testAllTypes() {
@@ -84,22 +83,20 @@ public class SparkTypeTest {
+ ", "
+ "StructField(strArray,ArrayType(StringType,true),true), "
+ "StructField(intArray,ArrayType(IntegerType,true),true), "
- + "StructField(char,StringType,false), "
- + "StructField(varchar,StringType,false), "
+ "StructField(boolean,BooleanType,true), "
+ "StructField(tinyint,ByteType,true), "
+ "StructField(smallint,ShortType,true), "
+ "StructField(bigint,LongType,true), "
- + "StructField(varbinary,BinaryType,true), "
- + "StructField(binary,BinaryType,true), "
- + "StructField(timestampWithoutZone,TimestampType,true), "
- + "StructField(timestampWithZone,TimestampType,true), "
+ + "StructField(bytes,BinaryType,true), "
+ + "StructField(timestamp,TimestampType,true), "
+ "StructField(date,DateType,true), "
+ "StructField(decimal,DecimalType(2,2),true), "
+ "StructField(decimal2,DecimalType(38,2),true), "
- + "StructField(decimal3,DecimalType(10,1),true), "
- + "StructField(multiset,MapType(StringType,IntegerType,false),true))";
+ + "StructField(decimal3,DecimalType(10,1),true))";
- assertThat(SparkTypeUtils.fromFlinkRowType(ALL_TYPES).toString()).isEqualTo(expected);
+ StructType sparkType = fromFlinkRowType(ALL_TYPES);
+ assertThat(sparkType.toString()).isEqualTo(expected);
+
+ assertThat(toFlinkType(sparkType)).isEqualTo(ALL_TYPES);
}
}
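
The closing assertion is the contract worth calling out: after this commit, fromFlinkRowType and toFlinkType are mutual inverses on the supported subset of types. A standalone sketch of that guarantee (wrapper class illustrative):

```java
import org.apache.flink.table.store.spark.SparkTypeUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.spark.sql.types.StructType;

public class RoundTripSketch {
    public static void main(String[] args) {
        RowType flinkType = RowType.of(new IntType(), new BigIntType());

        StructType sparkType = SparkTypeUtils.fromFlinkRowType(flinkType); // Flink -> Spark
        LogicalType restored = SparkTypeUtils.toFlinkType(sparkType);      // Spark -> Flink

        System.out.println(flinkType.equals(restored)); // true on the supported subset
    }
}
```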