You are viewing a plain-text version of this content. (The canonical link to it, labeled "here" in the original page, was not preserved in this copy.)
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/03/31 06:47:11 UTC
[incubator-paimon] branch master updated: [hive] Use new ReadBuilder and WriteBuilder API in tests of paimon-hive module (#718)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 224a1abfe [hive] Use new ReadBuilder and WriteBuilder API in tests of paimon-hive module (#718)
224a1abfe is described below
commit 224a1abfe26b79e3cbbf156244a9f88690a6512d
Author: Tyrantlucifer <ty...@apache.org>
AuthorDate: Fri Mar 31 14:47:07 2023 +0800
[hive] Use new ReadBuilder and WriteBuilder API in tests of paimon-hive module (#718)
---
.../org/apache/paimon/hive/FileStoreTestUtils.java | 44 ++++++++-------
.../paimon/hive/PaimonStorageHandlerITCase.java | 62 +++++++++++++---------
.../paimon/hive/mapred/PaimonRecordReaderTest.java | 55 ++++++++++---------
3 files changed, 88 insertions(+), 73 deletions(-)
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FileStoreTestUtils.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FileStoreTestUtils.java
index ea74b7b9b..12dce9978 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FileStoreTestUtils.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FileStoreTestUtils.java
@@ -18,36 +18,42 @@
package org.apache.paimon.hive;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.FileStore;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.Table;
import org.apache.paimon.types.RowType;
import java.util.List;
-import static org.apache.paimon.CoreOptions.PATH;
-
/** Test utils related to {@link FileStore}. */
public class FileStoreTestUtils {
- public static FileStoreTable createFileStoreTable(
+ private static final String TABLE_NAME = "hive_test_table";
+
+ private static final String DATABASE_NAME = "default";
+
+ private static final Identifier TABLE_IDENTIFIER = Identifier.create(DATABASE_NAME, TABLE_NAME);
+
+ public static Table createFileStoreTable(
Options conf, RowType rowType, List<String> partitionKeys, List<String> primaryKeys)
throws Exception {
- Path tablePath = CoreOptions.path(conf);
- new SchemaManager(LocalFileIO.create(), tablePath)
- .createTable(
- new Schema(
- rowType.getFields(), partitionKeys, primaryKeys, conf.toMap(), ""));
-
- // only path, other config should be read from file store.
- conf = new Options();
- conf.set(PATH, tablePath.toString());
- return FileStoreTableFactory.create(LocalFileIO.create(), conf);
+ // create CatalogContext using the options
+ CatalogContext catalogContext = CatalogContext.create(conf);
+ Catalog catalog = CatalogFactory.createCatalog(catalogContext);
+ // create database
+ catalog.createDatabase(DATABASE_NAME, false);
+ // create table
+ catalog.createTable(
+ TABLE_IDENTIFIER,
+ new Schema(rowType.getFields(), partitionKeys, primaryKeys, conf.toMap(), ""),
+ false);
+ Table table = catalog.getTable(TABLE_IDENTIFIER);
+ catalog.close();
+ return table;
}
}
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
index a7be90ebb..28f519b1b 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
@@ -26,10 +26,12 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.hive.mapred.PaimonInputFormat;
import org.apache.paimon.hive.objectinspector.PaimonObjectInspectorFactory;
+import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
@@ -601,35 +603,37 @@ public class PaimonStorageHandlerITCase {
List<InternalRow> data)
throws Exception {
String path = folder.newFolder().toURI().toString();
+ String tablePath = String.format("%s/default.db/hive_test_table", path);
Options conf = new Options();
- conf.set(CoreOptions.PATH, path);
+ conf.set(CatalogOptions.WAREHOUSE, path);
conf.set(CoreOptions.BUCKET, 2);
conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
- FileStoreTable table =
+ Table table =
FileStoreTestUtils.createFileStoreTable(conf, rowType, partitionKeys, primaryKeys);
- return writeData(table, path, data);
+ return writeData(table, tablePath, data);
}
private String createAppendOnlyExternalTable(
RowType rowType, List<String> partitionKeys, List<InternalRow> data) throws Exception {
String path = folder.newFolder().toURI().toString();
+ String tablePath = String.format("%s/default.db/hive_test_table", path);
Options conf = new Options();
- conf.set(CoreOptions.PATH, path);
+ conf.set(CatalogOptions.WAREHOUSE, path);
conf.set(CoreOptions.BUCKET, 2);
conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
conf.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
- FileStoreTable table =
+ Table table =
FileStoreTestUtils.createFileStoreTable(
conf, rowType, partitionKeys, Collections.emptyList());
- return writeData(table, path, data);
+ return writeData(table, tablePath, data);
}
- private String writeData(FileStoreTable table, String path, List<InternalRow> data)
- throws Exception {
- StreamTableWrite write = table.newWrite(commitUser);
- StreamTableCommit commit = table.newCommit(commitUser);
+ private String writeData(Table table, String path, List<InternalRow> data) throws Exception {
+ StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
+ StreamTableWrite write = streamWriteBuilder.newWrite();
+ StreamTableCommit commit = streamWriteBuilder.newCommit();
for (InternalRow rowData : data) {
write.write(rowData);
if (ThreadLocalRandom.current().nextInt(5) == 0) {
@@ -655,10 +659,11 @@ public class PaimonStorageHandlerITCase {
@Test
public void testReadAllSupportedTypes() throws Exception {
String root = folder.newFolder().toString();
+ String tablePath = String.format("%s/default.db/hive_test_table", root);
Options conf = new Options();
- conf.set(CoreOptions.PATH, root);
+ conf.set(CatalogOptions.WAREHOUSE, root);
conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
- FileStoreTable table =
+ Table table =
FileStoreTestUtils.createFileStoreTable(
conf,
RandomGenericRowDataGenerator.ROW_TYPE,
@@ -678,8 +683,9 @@ public class PaimonStorageHandlerITCase {
}
}
- StreamTableWrite write = table.newWrite(commitUser);
- StreamTableCommit commit = table.newCommit(commitUser);
+ StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
+ StreamTableWrite write = streamWriteBuilder.newWrite();
+ StreamTableCommit commit = streamWriteBuilder.newCommit();
for (GenericRow rowData : input) {
write.write(rowData);
}
@@ -692,7 +698,7 @@ public class PaimonStorageHandlerITCase {
Arrays.asList(
"CREATE EXTERNAL TABLE test_table",
"STORED BY '" + PaimonStorageHandler.class.getName() + "'",
- "LOCATION '" + root + "'")));
+ "LOCATION '" + tablePath + "'")));
List<Object[]> actual =
hiveShell.executeStatement("SELECT * FROM test_table WHERE f_int > 0");
@@ -771,10 +777,11 @@ public class PaimonStorageHandlerITCase {
@Test
public void testPredicatePushDown() throws Exception {
String path = folder.newFolder().toURI().toString();
+ String tablePath = String.format("%s/default.db/hive_test_table", path);
Options conf = new Options();
- conf.set(CoreOptions.PATH, path);
+ conf.set(CatalogOptions.WAREHOUSE, path);
conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
- FileStoreTable table =
+ Table table =
FileStoreTestUtils.createFileStoreTable(
conf,
RowType.of(new DataType[] {DataTypes.INT()}, new String[] {"a"}),
@@ -783,8 +790,9 @@ public class PaimonStorageHandlerITCase {
// TODO add NaN related tests
- StreamTableWrite write = table.newWrite(commitUser);
- StreamTableCommit commit = table.newCommit(commitUser);
+ StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
+ StreamTableWrite write = streamWriteBuilder.newWrite();
+ StreamTableCommit commit = streamWriteBuilder.newCommit();
write.write(GenericRow.of(1));
commit.commit(0, write.prepareCommit(true, 0));
write.write(GenericRow.of((Object) null));
@@ -805,7 +813,7 @@ public class PaimonStorageHandlerITCase {
Arrays.asList(
"CREATE EXTERNAL TABLE test_table",
"STORED BY '" + PaimonStorageHandler.class.getName() + "'",
- "LOCATION '" + path + "'")));
+ "LOCATION '" + tablePath + "'")));
Assert.assertEquals(
Arrays.asList("1", "5"),
hiveShell.executeQuery("SELECT * FROM test_table WHERE a = 1 OR a = 5"));
@@ -860,10 +868,11 @@ public class PaimonStorageHandlerITCase {
@Test
public void testDateAndTimestamp() throws Exception {
String path = folder.newFolder().toURI().toString();
+ String tablePath = String.format("%s/default.db/hive_test_table", path);
Options conf = new Options();
- conf.set(CoreOptions.PATH, path);
+ conf.set(CatalogOptions.WAREHOUSE, path);
conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
- FileStoreTable table =
+ Table table =
FileStoreTestUtils.createFileStoreTable(
conf,
RowType.of(
@@ -872,8 +881,9 @@ public class PaimonStorageHandlerITCase {
Collections.emptyList(),
Collections.emptyList());
- StreamTableWrite write = table.newWrite(commitUser);
- StreamTableCommit commit = table.newCommit(commitUser);
+ StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
+ StreamTableWrite write = streamWriteBuilder.newWrite();
+ StreamTableCommit commit = streamWriteBuilder.newCommit();
write.write(
GenericRow.of(
375, /* 1971-01-11 */
@@ -897,7 +907,7 @@ public class PaimonStorageHandlerITCase {
Arrays.asList(
"CREATE EXTERNAL TABLE test_table",
"STORED BY '" + PaimonStorageHandler.class.getName() + "'",
- "LOCATION '" + path + "'")));
+ "LOCATION '" + tablePath + "'")));
Assert.assertEquals(
Collections.singletonList("1971-01-11\t2022-05-17 17:29:20.1"),
hiveShell.executeQuery("SELECT * FROM test_table WHERE dt = '1971-01-11'"));
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
index 2548accff..f83a6269e 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
@@ -25,17 +25,20 @@ import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.hive.FileStoreTestUtils;
import org.apache.paimon.hive.RowDataContainer;
+import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -46,7 +49,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
@@ -54,19 +56,13 @@ import static org.assertj.core.api.Assertions.assertThat;
public class PaimonRecordReaderTest {
@TempDir java.nio.file.Path tempDir;
- private String commitUser;
-
- @BeforeEach
- public void beforeEach() {
- commitUser = UUID.randomUUID().toString();
- }
@Test
public void testPk() throws Exception {
Options conf = new Options();
- conf.set(CoreOptions.PATH, tempDir.toString());
+ conf.set(CatalogOptions.WAREHOUSE, tempDir.toString());
conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
- FileStoreTable table =
+ Table table =
FileStoreTestUtils.createFileStoreTable(
conf,
RowType.of(
@@ -75,8 +71,9 @@ public class PaimonRecordReaderTest {
Collections.emptyList(),
Collections.singletonList("a"));
- StreamTableWrite write = table.newWrite(commitUser);
- StreamTableCommit commit = table.newCommit(commitUser);
+ StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
+ StreamTableWrite write = streamWriteBuilder.newWrite();
+ StreamTableCommit commit = streamWriteBuilder.newCommit();
write.write(GenericRow.of(1L, BinaryString.fromString("Hi")));
write.write(GenericRow.of(2L, BinaryString.fromString("Hello")));
write.write(GenericRow.of(3L, BinaryString.fromString("World")));
@@ -102,9 +99,9 @@ public class PaimonRecordReaderTest {
@Test
public void testValueCount() throws Exception {
Options conf = new Options();
- conf.set(CoreOptions.PATH, tempDir.toString());
+ conf.set(CatalogOptions.WAREHOUSE, tempDir.toString());
conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
- FileStoreTable table =
+ Table table =
FileStoreTestUtils.createFileStoreTable(
conf,
RowType.of(
@@ -113,8 +110,9 @@ public class PaimonRecordReaderTest {
Collections.emptyList(),
Collections.emptyList());
- StreamTableWrite write = table.newWrite(commitUser);
- StreamTableCommit commit = table.newCommit(commitUser);
+ StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
+ StreamTableWrite write = streamWriteBuilder.newWrite();
+ StreamTableCommit commit = streamWriteBuilder.newCommit();
write.write(GenericRow.of(1, BinaryString.fromString("Hi")));
write.write(GenericRow.of(2, BinaryString.fromString("Hello")));
write.write(GenericRow.of(3, BinaryString.fromString("World")));
@@ -141,9 +139,9 @@ public class PaimonRecordReaderTest {
@Test
public void testProjectionPushdown() throws Exception {
Options conf = new Options();
- conf.set(CoreOptions.PATH, tempDir.toString());
+ conf.set(CatalogOptions.WAREHOUSE, tempDir.toString());
conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
- FileStoreTable table =
+ Table table =
FileStoreTestUtils.createFileStoreTable(
conf,
RowType.of(
@@ -154,8 +152,9 @@ public class PaimonRecordReaderTest {
Collections.emptyList(),
Collections.emptyList());
- StreamTableWrite write = table.newWrite(commitUser);
- StreamTableCommit commit = table.newCommit(commitUser);
+ StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
+ StreamTableWrite write = streamWriteBuilder.newWrite();
+ StreamTableCommit commit = streamWriteBuilder.newCommit();
write.write(GenericRow.of(1, 10L, BinaryString.fromString("Hi")));
write.write(GenericRow.of(2, 20L, BinaryString.fromString("Hello")));
write.write(GenericRow.of(1, 10L, BinaryString.fromString("Hi")));
@@ -176,20 +175,20 @@ public class PaimonRecordReaderTest {
assertThat(actual).isEqualTo(expected);
}
- private PaimonRecordReader read(FileStoreTable table, BinaryRow partition, int bucket)
- throws Exception {
- return read(table, partition, bucket, table.schema().fieldNames());
+ private PaimonRecordReader read(Table table, BinaryRow partition, int bucket) throws Exception {
+ return read(table, partition, bucket, ((FileStoreTable) table).schema().fieldNames());
}
private PaimonRecordReader read(
- FileStoreTable table, BinaryRow partition, int bucket, List<String> selectedColumns)
+ Table table, BinaryRow partition, int bucket, List<String> selectedColumns)
throws Exception {
- for (DataSplit split : table.newScan().plan().splits) {
- if (split.partition().equals(partition) && split.bucket() == bucket) {
+ for (Split split : table.newReadBuilder().newScan().plan().splits()) {
+ DataSplit dataSplit = (DataSplit) split;
+ if (dataSplit.partition().equals(partition) && dataSplit.bucket() == bucket) {
return new PaimonRecordReader(
table.newReadBuilder(),
- new PaimonInputSplit(tempDir.toString(), split),
- table.schema().fieldNames(),
+ new PaimonInputSplit(tempDir.toString(), dataSplit),
+ ((FileStoreTable) table).schema().fieldNames(),
selectedColumns);
}
}