Posted to commits@paimon.apache.org by lz...@apache.org on 2023/03/31 06:47:11 UTC

[incubator-paimon] branch master updated: [hive] Use new ReadBuilder and WriteBuilder API in tests of paimon-hive module (#718)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 224a1abfe [hive] Use new ReadBuilder and WriteBuilder API in tests of paimon-hive module (#718)
224a1abfe is described below

commit 224a1abfe26b79e3cbbf156244a9f88690a6512d
Author: Tyrantlucifer <ty...@apache.org>
AuthorDate: Fri Mar 31 14:47:07 2023 +0800

    [hive] Use new ReadBuilder and WriteBuilder API in tests of paimon-hive module (#718)
---
 .../org/apache/paimon/hive/FileStoreTestUtils.java | 44 ++++++++-------
 .../paimon/hive/PaimonStorageHandlerITCase.java    | 62 +++++++++++++---------
 .../paimon/hive/mapred/PaimonRecordReaderTest.java | 55 ++++++++++---------
 3 files changed, 88 insertions(+), 73 deletions(-)
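
For context on the pattern applied throughout this commit: the old test code created writers and committers directly from a FileStoreTable with an explicit commit user (table.newWrite(commitUser) / table.newCommit(commitUser)); the new API obtains both from a single StreamWriteBuilder on the generic Table interface, so the tests no longer generate a commit user themselves. A minimal sketch of the new write path, using only calls that appear in the diffs below (the table and the row values are illustrative):

    import org.apache.paimon.data.GenericRow;
    import org.apache.paimon.table.Table;
    import org.apache.paimon.table.sink.StreamTableCommit;
    import org.apache.paimon.table.sink.StreamTableWrite;
    import org.apache.paimon.table.sink.StreamWriteBuilder;

    // Given an existing Table (see FileStoreTestUtils below):
    StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
    StreamTableWrite write = writeBuilder.newWrite();     // no explicit commit user
    StreamTableCommit commit = writeBuilder.newCommit();  // pairs with the write above
    write.write(GenericRow.of(1, 10L));                   // row shape must match the table schema
    commit.commit(0, write.prepareCommit(true, 0));       // (commit identifier, commit messages)
    write.close();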

diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FileStoreTestUtils.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FileStoreTestUtils.java
index ea74b7b9b..12dce9978 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FileStoreTestUtils.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FileStoreTestUtils.java
@@ -18,36 +18,42 @@
 
 package org.apache.paimon.hive;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.FileStore;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.types.RowType;
 
 import java.util.List;
 
-import static org.apache.paimon.CoreOptions.PATH;
-
 /** Test utils related to {@link FileStore}. */
 public class FileStoreTestUtils {
 
-    public static FileStoreTable createFileStoreTable(
+    private static final String TABLE_NAME = "hive_test_table";
+
+    private static final String DATABASE_NAME = "default";
+
+    private static final Identifier TABLE_IDENTIFIER = Identifier.create(DATABASE_NAME, TABLE_NAME);
+
+    public static Table createFileStoreTable(
             Options conf, RowType rowType, List<String> partitionKeys, List<String> primaryKeys)
             throws Exception {
-        Path tablePath = CoreOptions.path(conf);
-        new SchemaManager(LocalFileIO.create(), tablePath)
-                .createTable(
-                        new Schema(
-                                rowType.getFields(), partitionKeys, primaryKeys, conf.toMap(), ""));
-
-        // only path, other config should be read from file store.
-        conf = new Options();
-        conf.set(PATH, tablePath.toString());
-        return FileStoreTableFactory.create(LocalFileIO.create(), conf);
+        // create CatalogContext using the options
+        CatalogContext catalogContext = CatalogContext.create(conf);
+        Catalog catalog = CatalogFactory.createCatalog(catalogContext);
+        // create database
+        catalog.createDatabase(DATABASE_NAME, false);
+        // create table
+        catalog.createTable(
+                TABLE_IDENTIFIER,
+                new Schema(rowType.getFields(), partitionKeys, primaryKeys, conf.toMap(), ""),
+                false);
+        Table table = catalog.getTable(TABLE_IDENTIFIER);
+        catalog.close();
+        return table;
     }
 }
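
The utility above replaces path-based table creation (CoreOptions.PATH plus SchemaManager and FileStoreTableFactory) with catalog-based creation keyed by a warehouse directory; the catalog then derives the table location as <warehouse>/default.db/hive_test_table, which is exactly the tablePath string the test cases below construct. A minimal usage sketch, with illustrative option values:

    import org.apache.paimon.CoreOptions;
    import org.apache.paimon.options.CatalogOptions;
    import org.apache.paimon.options.Options;
    import org.apache.paimon.table.Table;
    import org.apache.paimon.types.DataType;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;

    import java.util.Collections;

    Options conf = new Options();
    conf.set(CatalogOptions.WAREHOUSE, "/tmp/warehouse");  // warehouse root, not a table path
    conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
    Table table =
            FileStoreTestUtils.createFileStoreTable(
                    conf,
                    RowType.of(new DataType[] {DataTypes.INT()}, new String[] {"a"}),
                    Collections.emptyList(),   // partition keys
                    Collections.emptyList());  // primary keys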
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
index a7be90ebb..28f519b1b 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
@@ -26,10 +26,12 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.hive.mapred.PaimonInputFormat;
 import org.apache.paimon.hive.objectinspector.PaimonObjectInspectorFactory;
+import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
@@ -601,35 +603,37 @@ public class PaimonStorageHandlerITCase {
             List<InternalRow> data)
             throws Exception {
         String path = folder.newFolder().toURI().toString();
+        String tablePath = String.format("%s/default.db/hive_test_table", path);
         Options conf = new Options();
-        conf.set(CoreOptions.PATH, path);
+        conf.set(CatalogOptions.WAREHOUSE, path);
         conf.set(CoreOptions.BUCKET, 2);
         conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
-        FileStoreTable table =
+        Table table =
                 FileStoreTestUtils.createFileStoreTable(conf, rowType, partitionKeys, primaryKeys);
 
-        return writeData(table, path, data);
+        return writeData(table, tablePath, data);
     }
 
     private String createAppendOnlyExternalTable(
             RowType rowType, List<String> partitionKeys, List<InternalRow> data) throws Exception {
         String path = folder.newFolder().toURI().toString();
+        String tablePath = String.format("%s/default.db/hive_test_table", path);
         Options conf = new Options();
-        conf.set(CoreOptions.PATH, path);
+        conf.set(CatalogOptions.WAREHOUSE, path);
         conf.set(CoreOptions.BUCKET, 2);
         conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
         conf.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
-        FileStoreTable table =
+        Table table =
                 FileStoreTestUtils.createFileStoreTable(
                         conf, rowType, partitionKeys, Collections.emptyList());
 
-        return writeData(table, path, data);
+        return writeData(table, tablePath, data);
     }
 
-    private String writeData(FileStoreTable table, String path, List<InternalRow> data)
-            throws Exception {
-        StreamTableWrite write = table.newWrite(commitUser);
-        StreamTableCommit commit = table.newCommit(commitUser);
+    private String writeData(Table table, String path, List<InternalRow> data) throws Exception {
+        StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
+        StreamTableWrite write = streamWriteBuilder.newWrite();
+        StreamTableCommit commit = streamWriteBuilder.newCommit();
         for (InternalRow rowData : data) {
             write.write(rowData);
             if (ThreadLocalRandom.current().nextInt(5) == 0) {
@@ -655,10 +659,11 @@ public class PaimonStorageHandlerITCase {
     @Test
     public void testReadAllSupportedTypes() throws Exception {
         String root = folder.newFolder().toString();
+        String tablePath = String.format("%s/default.db/hive_test_table", root);
         Options conf = new Options();
-        conf.set(CoreOptions.PATH, root);
+        conf.set(CatalogOptions.WAREHOUSE, root);
         conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
-        FileStoreTable table =
+        Table table =
                 FileStoreTestUtils.createFileStoreTable(
                         conf,
                         RandomGenericRowDataGenerator.ROW_TYPE,
@@ -678,8 +683,9 @@ public class PaimonStorageHandlerITCase {
             }
         }
 
-        StreamTableWrite write = table.newWrite(commitUser);
-        StreamTableCommit commit = table.newCommit(commitUser);
+        StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
+        StreamTableWrite write = streamWriteBuilder.newWrite();
+        StreamTableCommit commit = streamWriteBuilder.newCommit();
         for (GenericRow rowData : input) {
             write.write(rowData);
         }
@@ -692,7 +698,7 @@ public class PaimonStorageHandlerITCase {
                         Arrays.asList(
                                 "CREATE EXTERNAL TABLE test_table",
                                 "STORED BY '" + PaimonStorageHandler.class.getName() + "'",
-                                "LOCATION '" + root + "'")));
+                                "LOCATION '" + tablePath + "'")));
         List<Object[]> actual =
                 hiveShell.executeStatement("SELECT * FROM test_table WHERE f_int > 0");
 
@@ -771,10 +777,11 @@ public class PaimonStorageHandlerITCase {
     @Test
     public void testPredicatePushDown() throws Exception {
         String path = folder.newFolder().toURI().toString();
+        String tablePath = String.format("%s/default.db/hive_test_table", path);
         Options conf = new Options();
-        conf.set(CoreOptions.PATH, path);
+        conf.set(CatalogOptions.WAREHOUSE, path);
         conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
-        FileStoreTable table =
+        Table table =
                 FileStoreTestUtils.createFileStoreTable(
                         conf,
                         RowType.of(new DataType[] {DataTypes.INT()}, new String[] {"a"}),
@@ -783,8 +790,9 @@ public class PaimonStorageHandlerITCase {
 
         // TODO add NaN related tests
 
-        StreamTableWrite write = table.newWrite(commitUser);
-        StreamTableCommit commit = table.newCommit(commitUser);
+        StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
+        StreamTableWrite write = streamWriteBuilder.newWrite();
+        StreamTableCommit commit = streamWriteBuilder.newCommit();
         write.write(GenericRow.of(1));
         commit.commit(0, write.prepareCommit(true, 0));
         write.write(GenericRow.of((Object) null));
@@ -805,7 +813,7 @@ public class PaimonStorageHandlerITCase {
                         Arrays.asList(
                                 "CREATE EXTERNAL TABLE test_table",
                                 "STORED BY '" + PaimonStorageHandler.class.getName() + "'",
-                                "LOCATION '" + path + "'")));
+                                "LOCATION '" + tablePath + "'")));
         Assert.assertEquals(
                 Arrays.asList("1", "5"),
                 hiveShell.executeQuery("SELECT * FROM test_table WHERE a = 1 OR a = 5"));
@@ -860,10 +868,11 @@ public class PaimonStorageHandlerITCase {
     @Test
     public void testDateAndTimestamp() throws Exception {
         String path = folder.newFolder().toURI().toString();
+        String tablePath = String.format("%s/default.db/hive_test_table", path);
         Options conf = new Options();
-        conf.set(CoreOptions.PATH, path);
+        conf.set(CatalogOptions.WAREHOUSE, path);
         conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
-        FileStoreTable table =
+        Table table =
                 FileStoreTestUtils.createFileStoreTable(
                         conf,
                         RowType.of(
@@ -872,8 +881,9 @@ public class PaimonStorageHandlerITCase {
                         Collections.emptyList(),
                         Collections.emptyList());
 
-        StreamTableWrite write = table.newWrite(commitUser);
-        StreamTableCommit commit = table.newCommit(commitUser);
+        StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
+        StreamTableWrite write = streamWriteBuilder.newWrite();
+        StreamTableCommit commit = streamWriteBuilder.newCommit();
         write.write(
                 GenericRow.of(
                         375, /* 1971-01-11 */
@@ -897,7 +907,7 @@ public class PaimonStorageHandlerITCase {
                         Arrays.asList(
                                 "CREATE EXTERNAL TABLE test_table",
                                 "STORED BY '" + PaimonStorageHandler.class.getName() + "'",
-                                "LOCATION '" + path + "'")));
+                                "LOCATION '" + tablePath + "'")));
         Assert.assertEquals(
                 Collections.singletonList("1971-01-11\t2022-05-17 17:29:20.1"),
                 hiveShell.executeQuery("SELECT * FROM test_table WHERE dt = '1971-01-11'"));
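
Note the knock-on change to the Hive DDL in these tests: since the Paimon table now lives under the warehouse, the external table's LOCATION must point at the catalog-derived table directory instead of the folder root. Sketched with an illustrative warehouse path (hiveShell is the test class's existing Hive fixture):

    // The LOCATION is <warehouse>/default.db/hive_test_table, not the warehouse itself.
    String tablePath = "/tmp/warehouse/default.db/hive_test_table";
    hiveShell.execute(
            String.join(
                    "\n",
                    Arrays.asList(
                            "CREATE EXTERNAL TABLE test_table",
                            "STORED BY '" + PaimonStorageHandler.class.getName() + "'",
                            "LOCATION '" + tablePath + "'")));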
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
index 2548accff..f83a6269e 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
@@ -25,17 +25,20 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.hive.FileStoreTestUtils;
 import org.apache.paimon.hive.RowDataContainer;
+import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -46,7 +49,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -54,19 +56,13 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class PaimonRecordReaderTest {
 
     @TempDir java.nio.file.Path tempDir;
-    private String commitUser;
-
-    @BeforeEach
-    public void beforeEach() {
-        commitUser = UUID.randomUUID().toString();
-    }
 
     @Test
     public void testPk() throws Exception {
         Options conf = new Options();
-        conf.set(CoreOptions.PATH, tempDir.toString());
+        conf.set(CatalogOptions.WAREHOUSE, tempDir.toString());
         conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
-        FileStoreTable table =
+        Table table =
                 FileStoreTestUtils.createFileStoreTable(
                         conf,
                         RowType.of(
@@ -75,8 +71,9 @@ public class PaimonRecordReaderTest {
                         Collections.emptyList(),
                         Collections.singletonList("a"));
 
-        StreamTableWrite write = table.newWrite(commitUser);
-        StreamTableCommit commit = table.newCommit(commitUser);
+        StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
+        StreamTableWrite write = streamWriteBuilder.newWrite();
+        StreamTableCommit commit = streamWriteBuilder.newCommit();
         write.write(GenericRow.of(1L, BinaryString.fromString("Hi")));
         write.write(GenericRow.of(2L, BinaryString.fromString("Hello")));
         write.write(GenericRow.of(3L, BinaryString.fromString("World")));
@@ -102,9 +99,9 @@ public class PaimonRecordReaderTest {
     @Test
     public void testValueCount() throws Exception {
         Options conf = new Options();
-        conf.set(CoreOptions.PATH, tempDir.toString());
+        conf.set(CatalogOptions.WAREHOUSE, tempDir.toString());
         conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
-        FileStoreTable table =
+        Table table =
                 FileStoreTestUtils.createFileStoreTable(
                         conf,
                         RowType.of(
@@ -113,8 +110,9 @@ public class PaimonRecordReaderTest {
                         Collections.emptyList(),
                         Collections.emptyList());
 
-        StreamTableWrite write = table.newWrite(commitUser);
-        StreamTableCommit commit = table.newCommit(commitUser);
+        StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
+        StreamTableWrite write = streamWriteBuilder.newWrite();
+        StreamTableCommit commit = streamWriteBuilder.newCommit();
         write.write(GenericRow.of(1, BinaryString.fromString("Hi")));
         write.write(GenericRow.of(2, BinaryString.fromString("Hello")));
         write.write(GenericRow.of(3, BinaryString.fromString("World")));
@@ -141,9 +139,9 @@ public class PaimonRecordReaderTest {
     @Test
     public void testProjectionPushdown() throws Exception {
         Options conf = new Options();
-        conf.set(CoreOptions.PATH, tempDir.toString());
+        conf.set(CatalogOptions.WAREHOUSE, tempDir.toString());
         conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
-        FileStoreTable table =
+        Table table =
                 FileStoreTestUtils.createFileStoreTable(
                         conf,
                         RowType.of(
@@ -154,8 +152,9 @@ public class PaimonRecordReaderTest {
                         Collections.emptyList(),
                         Collections.emptyList());
 
-        StreamTableWrite write = table.newWrite(commitUser);
-        StreamTableCommit commit = table.newCommit(commitUser);
+        StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
+        StreamTableWrite write = streamWriteBuilder.newWrite();
+        StreamTableCommit commit = streamWriteBuilder.newCommit();
         write.write(GenericRow.of(1, 10L, BinaryString.fromString("Hi")));
         write.write(GenericRow.of(2, 20L, BinaryString.fromString("Hello")));
         write.write(GenericRow.of(1, 10L, BinaryString.fromString("Hi")));
@@ -176,20 +175,20 @@ public class PaimonRecordReaderTest {
         assertThat(actual).isEqualTo(expected);
     }
 
-    private PaimonRecordReader read(FileStoreTable table, BinaryRow partition, int bucket)
-            throws Exception {
-        return read(table, partition, bucket, table.schema().fieldNames());
+    private PaimonRecordReader read(Table table, BinaryRow partition, int bucket) throws Exception {
+        return read(table, partition, bucket, ((FileStoreTable) table).schema().fieldNames());
     }
 
     private PaimonRecordReader read(
-            FileStoreTable table, BinaryRow partition, int bucket, List<String> selectedColumns)
+            Table table, BinaryRow partition, int bucket, List<String> selectedColumns)
             throws Exception {
-        for (DataSplit split : table.newScan().plan().splits) {
-            if (split.partition().equals(partition) && split.bucket() == bucket) {
+        for (Split split : table.newReadBuilder().newScan().plan().splits()) {
+            DataSplit dataSplit = (DataSplit) split;
+            if (dataSplit.partition().equals(partition) && dataSplit.bucket() == bucket) {
                 return new PaimonRecordReader(
                         table.newReadBuilder(),
-                        new PaimonInputSplit(tempDir.toString(), split),
-                        table.schema().fieldNames(),
+                        new PaimonInputSplit(tempDir.toString(), dataSplit),
+                        ((FileStoreTable) table).schema().fieldNames(),
                         selectedColumns);
             }
         }
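
On the read side, planning moves from FileStoreTable.newScan() to the ReadBuilder on the generic Table interface, and the planned splits come back as the Split interface; the test narrows them to DataSplit where partition and bucket metadata is needed. A minimal sketch of the new scan pattern, assuming a Table created as in FileStoreTestUtils above:

    import org.apache.paimon.table.Table;
    import org.apache.paimon.table.source.DataSplit;
    import org.apache.paimon.table.source.ReadBuilder;
    import org.apache.paimon.table.source.Split;

    import java.util.List;

    ReadBuilder readBuilder = table.newReadBuilder();
    List<Split> splits = readBuilder.newScan().plan().splits();
    for (Split split : splits) {
        // File-store splits carry partition/bucket info; narrow as the test does.
        DataSplit dataSplit = (DataSplit) split;
        // ... match dataSplit.partition() and dataSplit.bucket() here ...
    }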