You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/01 07:45:09 UTC
[flink-table-store] branch master updated: [FLINK-28339] Introduce SparkCatalog in table store
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new e303ec7a [FLINK-28339] Introduce SparkCatalog in table store
e303ec7a is described below
commit e303ec7a913601389dc323d9dc719f9c937f6ee2
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Fri Jul 1 15:45:04 2022 +0800
[FLINK-28339] Introduce SparkCatalog in table store
This closes #185
---
.../flink/table/store/connector/FlinkCatalog.java | 2 +-
.../table/store/connector/FlinkCatalogFactory.java | 51 +------
.../flink/table/store/file/catalog/Catalog.java | 16 +-
.../table/store/file/catalog/CatalogFactory.java | 55 +++++++
.../store/file/catalog/FileSystemCatalog.java | 2 +-
.../table/store/table/FileStoreTableFactory.java | 23 ++-
.../apache/flink/table/store/hive/HiveCatalog.java | 2 +-
.../flink/table/store/spark/SparkCatalog.java | 161 +++++++++++++++++++++
.../flink/table/store/spark/SparkReadITCase.java | 97 ++++++++-----
9 files changed, 311 insertions(+), 98 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
index 56840aa3..6077e57d 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
@@ -140,7 +140,7 @@ public class FlinkCatalog extends AbstractCatalog {
throws TableNotExistException, CatalogException {
TableSchema schema;
try {
- schema = catalog.getTable(tablePath);
+ schema = catalog.getTableSchema(tablePath);
} catch (Catalog.TableNotExistException e) {
throw new TableNotExistException(getName(), e.tablePath());
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
index 6021260f..8ff157da 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
@@ -23,30 +23,15 @@ import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.store.file.catalog.CatalogFactory;
-import org.apache.flink.table.store.file.catalog.FileSystemCatalogFactory;
-import org.apache.flink.util.Preconditions;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
-import java.util.ServiceLoader;
import java.util.Set;
-import java.util.stream.Collectors;
/** Factory for {@link FlinkCatalog}. */
public class FlinkCatalogFactory implements org.apache.flink.table.factories.CatalogFactory {
public static final String IDENTIFIER = "table-store";
- public static final ConfigOption<String> METASTORE =
- ConfigOptions.key("metastore")
- .stringType()
- .defaultValue(FileSystemCatalogFactory.IDENTIFIER);
- private static final ConfigOption<String> WAREHOUSE =
- ConfigOptions.key("warehouse")
- .stringType()
- .noDefaultValue()
- .withDescription("The warehouse root path of catalog.");
public static final ConfigOption<String> DEFAULT_DATABASE =
ConfigOptions.key("default-database").stringType().defaultValue("default");
@@ -74,41 +59,7 @@ public class FlinkCatalogFactory implements org.apache.flink.table.factories.Cat
}
public static FlinkCatalog createCatalog(String catalogName, ReadableConfig options) {
- // manual validation
- // because different catalog types may have different options
- // we can't list them all in the optionalOptions() method
- String warehouse =
- Preconditions.checkNotNull(
- options.get(WAREHOUSE),
- "Table store '" + WAREHOUSE.key() + "' path must be set");
-
- String metastore = options.get(METASTORE);
- List<CatalogFactory> factories = new ArrayList<>();
- ServiceLoader.load(CatalogFactory.class, Thread.currentThread().getContextClassLoader())
- .iterator()
- .forEachRemaining(
- f -> {
- if (f.identifier().equals(metastore)) {
- factories.add(f);
- }
- });
- if (factories.size() != 1) {
- throw new RuntimeException(
- "Found "
- + factories.size()
- + " classes implementing "
- + CatalogFactory.class.getName()
- + " with metastore "
- + metastore
- + ". They are:\n"
- + factories.stream()
- .map(t -> t.getClass().getName())
- .collect(Collectors.joining("\n")));
- }
-
return new FlinkCatalog(
- factories.get(0).create(warehouse, options),
- catalogName,
- options.get(DEFAULT_DATABASE));
+ CatalogFactory.createCatalog(options), catalogName, options.get(DEFAULT_DATABASE));
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
index 65c099f4..a7db7694 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
@@ -22,6 +22,8 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
import java.util.List;
@@ -92,10 +94,22 @@ public interface Catalog extends AutoCloseable {
* Return a {@link TableSchema} identified by the given {@link ObjectPath}.
*
* @param tablePath Path of the table
+ * @return The requested table schema
+ * @throws TableNotExistException if the target does not exist
+ */
+ TableSchema getTableSchema(ObjectPath tablePath) throws TableNotExistException;
+
+ /**
+ * Return a {@link FileStoreTable} identified by the given {@link ObjectPath}.
+ *
+ * @param tablePath Path of the table
* @return The requested table
* @throws TableNotExistException if the target does not exist
*/
- TableSchema getTable(ObjectPath tablePath) throws TableNotExistException;
+ default FileStoreTable getTable(ObjectPath tablePath) throws TableNotExistException {
+ TableSchema tableSchema = getTableSchema(tablePath);
+ return FileStoreTableFactory.create(getTableLocation(tablePath), tableSchema);
+ }
/**
* Check if a table exists in this catalog.
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
index 5e93a3ed..403c8be0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
@@ -18,7 +18,15 @@
package org.apache.flink.table.store.file.catalog;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
/** Factory to create {@link Catalog}. Each factory should have a unique identifier. */
public interface CatalogFactory {
@@ -26,4 +34,51 @@ public interface CatalogFactory {
String identifier();
Catalog create(String warehouse, ReadableConfig options);
+
+ ConfigOption<String> METASTORE =
+ ConfigOptions.key("metastore")
+ .stringType()
+ .defaultValue(FileSystemCatalogFactory.IDENTIFIER);
+
+ ConfigOption<String> WAREHOUSE =
+ ConfigOptions.key("warehouse")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The warehouse root path of catalog.");
+
+ static Catalog createCatalog(ReadableConfig options) {
+ // manual validation
+ // because different catalog types may have different options
+ // we can't list them all in the optionalOptions() method
+ String warehouse =
+ Preconditions.checkNotNull(
+ options.get(WAREHOUSE),
+ "Table store '" + WAREHOUSE.key() + "' path must be set");
+
+ String metastore = options.get(METASTORE);
+ List<CatalogFactory> factories = new ArrayList<>();
+ ServiceLoader.load(CatalogFactory.class, Thread.currentThread().getContextClassLoader())
+ .iterator()
+ .forEachRemaining(
+ f -> {
+ if (f.identifier().equals(metastore)) {
+ factories.add(f);
+ }
+ });
+ if (factories.size() != 1) {
+ throw new RuntimeException(
+ "Found "
+ + factories.size()
+ + " classes implementing "
+ + CatalogFactory.class.getName()
+ + " with metastore "
+ + metastore
+ + ". They are:\n"
+ + factories.stream()
+ .map(t -> t.getClass().getName())
+ .collect(Collectors.joining("\n")));
+ }
+
+ return factories.get(0).create(warehouse, options);
+ }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
index 451ad8f5..6c16366a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
@@ -106,7 +106,7 @@ public class FileSystemCatalog extends AbstractCatalog {
}
@Override
- public TableSchema getTable(ObjectPath tablePath) throws TableNotExistException {
+ public TableSchema getTableSchema(ObjectPath tablePath) throws TableNotExistException {
Path path = getTableLocation(tablePath);
return new SchemaManager(path)
.latest()
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
index e94c38ff..82a1a039 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
@@ -25,8 +25,6 @@ import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
-import java.util.HashMap;
-import java.util.Map;
import java.util.UUID;
import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
@@ -46,9 +44,8 @@ public class FileStoreTableFactory {
public static FileStoreTable create(Configuration conf, String user) {
Path tablePath = FileStoreOptions.path(conf);
- SchemaManager schemaManager = new SchemaManager(tablePath);
TableSchema tableSchema =
- schemaManager
+ new SchemaManager(tablePath)
.latest()
.orElseThrow(
() ->
@@ -56,13 +53,23 @@ public class FileStoreTableFactory {
"Schema file not found in location "
+ tablePath
+ ". Please create table first."));
+ return create(tablePath, tableSchema, conf, user);
+ }
+ public static FileStoreTable create(Path tablePath, TableSchema tableSchema) {
+ return create(tablePath, tableSchema, new Configuration(), UUID.randomUUID().toString());
+ }
+
+ public static FileStoreTable create(
+ Path tablePath, TableSchema tableSchema, Configuration dynamicOptions, String user) {
// merge dynamic options into schema.options
- Map<String, String> newOptions = new HashMap<>(tableSchema.options());
- newOptions.putAll(conf.toMap());
- tableSchema = tableSchema.copy(newOptions);
+ Configuration newOptions = Configuration.fromMap(tableSchema.options());
+ dynamicOptions.toMap().forEach(newOptions::setString);
+ newOptions.set(PATH, tablePath.toString());
+ tableSchema = tableSchema.copy(newOptions.toMap());
- if (conf.get(FileStoreOptions.WRITE_MODE) == WriteMode.APPEND_ONLY) {
+ SchemaManager schemaManager = new SchemaManager(tablePath);
+ if (newOptions.get(FileStoreOptions.WRITE_MODE) == WriteMode.APPEND_ONLY) {
return new AppendOnlyFileStoreTable(tablePath, schemaManager, tableSchema, user);
} else {
if (tableSchema.primaryKeys().isEmpty()) {
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
index 50a3f635..1ad4327d 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
@@ -149,7 +149,7 @@ public class HiveCatalog extends AbstractCatalog {
}
@Override
- public TableSchema getTable(ObjectPath tablePath) throws TableNotExistException {
+ public TableSchema getTableSchema(ObjectPath tablePath) throws TableNotExistException {
if (isTableStoreTableNotExisted(tablePath)) {
throw new TableNotExistException(tablePath);
}
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
new file mode 100644
index 00000000..ae8e2f51
--- /dev/null
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.store.file.catalog.Catalog;
+import org.apache.flink.table.store.file.catalog.CatalogFactory;
+
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.NamespaceChange;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Spark {@link TableCatalog} for table store.
+ *
+ * <p>Spark catalog options are converted into a Flink {@link Configuration} and handed to {@link
+ * CatalogFactory#createCatalog}. Each table store database is exposed as a single-level Spark
+ * namespace. Only listing and loading are supported; every DDL operation throws {@link
+ * UnsupportedOperationException}.
+ */
+public class SparkCatalog implements TableCatalog, SupportsNamespaces {
+
+    private String name;
+    private Catalog catalog;
+
+    @Override
+    public void initialize(String name, CaseInsensitiveStringMap options) {
+        this.name = name;
+        this.catalog = CatalogFactory.createCatalog(Configuration.fromMap(options));
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public String[][] listNamespaces() {
+        List<String> databases = catalog.listDatabases();
+        String[][] namespaces = new String[databases.size()][];
+        for (int i = 0; i < databases.size(); i++) {
+            namespaces[i] = new String[] {databases.get(i)};
+        }
+        return namespaces;
+    }
+
+    @Override
+    public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException {
+        if (namespace.length == 0) {
+            return listNamespaces();
+        }
+        checkNamespaceExists(namespace);
+        // A database has no nested namespaces.
+        return new String[0][];
+    }
+
+    @Override
+    public Map<String, String> loadNamespaceMetadata(String[] namespace)
+            throws NoSuchNamespaceException {
+        // Spark's default namespaceExists() is implemented by calling this method and catching
+        // NoSuchNamespaceException, so unknown namespaces must be rejected here.
+        checkNamespaceExists(namespace);
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
+        if (!isValidNamespace(namespace)) {
+            throw new NoSuchNamespaceException(namespace);
+        }
+
+        try {
+            return catalog.listTables(namespace[0]).stream()
+                    .map(table -> Identifier.of(namespace, table))
+                    .toArray(Identifier[]::new);
+        } catch (Catalog.DatabaseNotExistException e) {
+            throw new NoSuchNamespaceException(namespace);
+        }
+    }
+
+    @Override
+    public Table loadTable(Identifier ident) throws NoSuchTableException {
+        if (!isValidNamespace(ident.namespace())) {
+            throw new NoSuchTableException(ident);
+        }
+
+        try {
+            return new SparkTable(
+                    catalog.getTable(new ObjectPath(ident.namespace()[0], ident.name())));
+        } catch (Catalog.TableNotExistException e) {
+            throw new NoSuchTableException(ident);
+        }
+    }
+
+    /** Returns whether the namespace has the single-level shape of a database name. */
+    private boolean isValidNamespace(String[] namespace) {
+        return namespace.length == 1;
+    }
+
+    /**
+     * Throws {@link NoSuchNamespaceException} unless the namespace is single-level and names a
+     * database returned by {@link Catalog#listDatabases()}.
+     */
+    private void checkNamespaceExists(String[] namespace) throws NoSuchNamespaceException {
+        if (!isValidNamespace(namespace) || !catalog.listDatabases().contains(namespace[0])) {
+            throw new NoSuchNamespaceException(namespace);
+        }
+    }
+
+    // --------------------- unsupported methods ----------------------------
+
+    @Override
+    public void createNamespace(String[] namespace, Map<String, String> metadata) {
+        throw new UnsupportedOperationException("Create namespace in Spark is not supported yet.");
+    }
+
+    @Override
+    public void alterNamespace(String[] namespace, NamespaceChange... changes) {
+        throw new UnsupportedOperationException("Alter namespace in Spark is not supported yet.");
+    }
+
+    @Override
+    public boolean dropNamespace(String[] namespace) {
+        throw new UnsupportedOperationException("Drop namespace in Spark is not supported yet.");
+    }
+
+    @Override
+    public Table createTable(
+            Identifier ident,
+            StructType schema,
+            Transform[] partitions,
+            Map<String, String> properties) {
+        throw new UnsupportedOperationException("Create table in Spark is not supported yet.");
+    }
+
+    @Override
+    public Table alterTable(Identifier ident, TableChange... changes) {
+        throw new UnsupportedOperationException("Alter table in Spark is not supported yet.");
+    }
+
+    @Override
+    public boolean dropTable(Identifier ident) {
+        throw new UnsupportedOperationException("Drop table in Spark is not supported yet.");
+    }
+
+    @Override
+    public void renameTable(Identifier oldIdent, Identifier newIdent) {
+        throw new UnsupportedOperationException("Rename table in Spark is not supported yet.");
+    }
+}
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 08de5ce5..26567405 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -27,15 +27,16 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind;
+import org.apache.commons.io.FileUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
+import java.io.File;
+import java.io.IOException;
import java.util.Arrays;
import java.util.List;
@@ -44,48 +45,84 @@ import static org.assertj.core.api.Assertions.assertThat;
/** ITCase for spark reader. */
public class SparkReadITCase {
+ private static File warehouse = null;
+
private static SparkSession spark = null;
- @TempDir java.nio.file.Path tempDir;
+ private static Path tablePath1;
- private Path path;
+ private static Path tablePath2;
- private SimpleTableTestHelper testHelper;
+ @BeforeAll
+ public static void startMetastoreAndSpark() throws Exception {
+ warehouse = File.createTempFile("warehouse", null);
+ assertThat(warehouse.delete()).isTrue();
+ Path warehousePath = new Path("file:" + warehouse);
+ spark = SparkSession.builder().master("local[2]").getOrCreate();
+ spark.conf().set("spark.sql.catalog.table_store", SparkCatalog.class.getName());
+ spark.conf().set("spark.sql.catalog.table_store.warehouse", warehousePath.toString());
+
+ // flink sink
+ tablePath1 = new Path(warehousePath, "default.db/t1");
+ SimpleTableTestHelper testHelper1 = createTestHelper(tablePath1);
+ testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
+ testHelper1.write(GenericRowData.of(3, 4L, StringData.fromString("2")));
+ testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
+ testHelper1.write(GenericRowData.ofKind(RowKind.DELETE, 3, 4L, StringData.fromString("2")));
+ testHelper1.commit();
+
+ tablePath2 = new Path(warehousePath, "default.db/t2");
+ SimpleTableTestHelper testHelper2 = createTestHelper(tablePath2);
+ testHelper2.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
+ testHelper2.write(GenericRowData.of(3, 4L, StringData.fromString("2")));
+ testHelper2.commit();
+ testHelper2.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
+ testHelper2.write(GenericRowData.of(7, 8L, StringData.fromString("4")));
+ testHelper2.commit();
+ }
- @BeforeEach
- public void beforeEach() throws Exception {
- this.path = new Path(tempDir.toUri().toString(), "my_table");
+ private static SimpleTableTestHelper createTestHelper(Path tablePath) throws Exception {
RowType rowType =
new RowType(
Arrays.asList(
new RowType.RowField("a", new IntType()),
new RowType.RowField("b", new BigIntType()),
new RowType.RowField("c", new VarCharType())));
- testHelper = new SimpleTableTestHelper(path, rowType);
- }
-
- @BeforeAll
- public static void startMetastoreAndSpark() {
- spark = SparkSession.builder().master("local[2]").getOrCreate();
+ return new SimpleTableTestHelper(tablePath, rowType);
}
@AfterAll
- public static void stopMetastoreAndSpark() {
+ public static void stopMetastoreAndSpark() throws IOException {
+ if (warehouse != null && warehouse.exists()) {
+ FileUtils.deleteDirectory(warehouse);
+ }
spark.stop();
spark = null;
}
@Test
- public void testNormal() throws Exception {
- testHelper.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
- testHelper.write(GenericRowData.of(3, 4L, StringData.fromString("2")));
- testHelper.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
- testHelper.write(GenericRowData.ofKind(RowKind.DELETE, 3, 4L, StringData.fromString("2")));
- testHelper.commit();
+ public void testNormal() {
+ innerTestNormal(
+ spark.read().format("tablestore").option("path", tablePath1.toString()).load());
+ }
- Dataset<Row> dataset =
- spark.read().format("tablestore").option("path", path.toString()).load();
+ @Test
+ public void testFilterPushDown() {
+ innerTestFilterPushDown(
+ spark.read().format("tablestore").option("path", tablePath2.toString()).load());
+ }
+ @Test
+ public void testCatalogNormal() {
+ innerTestNormal(spark.table("table_store.default.t1"));
+ }
+
+ @Test
+ public void testCatalogFilterPushDown() {
+ innerTestFilterPushDown(spark.table("table_store.default.t2"));
+ }
+
+ private void innerTestNormal(Dataset<Row> dataset) {
List<Row> results = dataset.collectAsList();
assertThat(results.toString()).isEqualTo("[[1,2,1], [5,6,3]]");
@@ -96,19 +133,7 @@ public class SparkReadITCase {
assertThat(results.toString()).isEqualTo("[[8]]");
}
- @Test
- public void testFilterPushDown() throws Exception {
- testHelper.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
- testHelper.write(GenericRowData.of(3, 4L, StringData.fromString("2")));
- testHelper.commit();
-
- testHelper.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
- testHelper.write(GenericRowData.of(7, 8L, StringData.fromString("4")));
- testHelper.commit();
-
- Dataset<Row> dataset =
- spark.read().format("tablestore").option("path", path.toString()).load();
-
+ private void innerTestFilterPushDown(Dataset<Row> dataset) {
List<Row> results = dataset.filter("a < 4").select("a", "c").collectAsList();
assertThat(results.toString()).isEqualTo("[[1,1], [3,2]]");
}