You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/07/26 02:37:53 UTC

[incubator-paimon] branch master updated: [bug] fix spark generic catalog could not load existing paimon tables (#1657)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c3033e3b1 [bug] fix spark generic catalog could not load existing paimon tables (#1657)
c3033e3b1 is described below

commit c3033e3b1f1aa7a13ab472d570238b251456f599
Author: YeJunHao <41...@users.noreply.github.com>
AuthorDate: Wed Jul 26 10:37:49 2023 +0800

    [bug] fix spark generic catalog could not load existing paimon tables (#1657)
---
 .../apache/paimon/spark/SparkGenericCatalog.java   | 18 +++++++++--
 .../paimon/spark/SparkGenericCatalogTest.java      | 36 +++++++++++++++++-----
 2 files changed, 43 insertions(+), 11 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
index 59355af32..087709004 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
@@ -51,6 +51,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Callable;
 
 import static org.apache.paimon.options.CatalogOptions.METASTORE;
 import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
@@ -129,7 +130,7 @@ public class SparkGenericCatalog<T extends TableCatalog & SupportsNamespaces>
         try {
             return paimonCatalog.loadTable(ident);
         } catch (NoSuchTableException e) {
-            return getSessionCatalog().loadTable(ident);
+            return throwsOldIfExceptionHappens(() -> getSessionCatalog().loadTable(ident), e);
         }
     }
 
@@ -138,7 +139,8 @@ public class SparkGenericCatalog<T extends TableCatalog & SupportsNamespaces>
         try {
             return paimonCatalog.loadTable(ident, version);
         } catch (NoSuchTableException e) {
-            return getSessionCatalog().loadTable(ident, version);
+            return throwsOldIfExceptionHappens(
+                    () -> getSessionCatalog().loadTable(ident, version), e);
         }
     }
 
@@ -147,7 +149,8 @@ public class SparkGenericCatalog<T extends TableCatalog & SupportsNamespaces>
         try {
             return paimonCatalog.loadTable(ident, timestamp);
         } catch (NoSuchTableException e) {
-            return getSessionCatalog().loadTable(ident, timestamp);
+            return throwsOldIfExceptionHappens(
+                    () -> getSessionCatalog().loadTable(ident, timestamp), e);
         }
     }
 
@@ -283,4 +286,13 @@ public class SparkGenericCatalog<T extends TableCatalog & SupportsNamespaces>
     private static boolean isSystemNamespace(String[] namespace) {
         return namespace.length == 1 && namespace[0].equalsIgnoreCase("system");
     }
+
+    private Table throwsOldIfExceptionHappens(Callable<Table> call, NoSuchTableException e)
+            throws NoSuchTableException {
+        try {
+            return call.call();
+        } catch (Exception exception) {
+            throw e;
+        }
+    }
 }
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java
index aa14bf72e..b6d33c859 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java
@@ -22,24 +22,25 @@ import org.apache.paimon.fs.Path;
 
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 
 /** Base tests for spark read. */
 public class SparkGenericCatalogTest {
 
-    protected static SparkSession spark = null;
+    protected SparkSession spark = null;
 
-    protected static Path warehousePath = null;
+    protected Path warehousePath = null;
 
-    @BeforeAll
-    public static void startMetastoreAndSpark(@TempDir java.nio.file.Path tempDir) {
+    @BeforeEach
+    public void startMetastoreAndSpark(@TempDir java.nio.file.Path tempDir) {
         warehousePath = new Path("file:" + tempDir.toString());
         spark =
                 SparkSession.builder()
@@ -49,8 +50,8 @@ public class SparkGenericCatalogTest {
         spark.conf().set("spark.sql.catalog.spark_catalog", SparkGenericCatalog.class.getName());
     }
 
-    @AfterAll
-    public static void stopMetastoreAndSpark() {
+    @AfterEach
+    public void stopMetastoreAndSpark() {
         if (spark != null) {
             spark.stop();
             spark = null;
@@ -74,6 +75,25 @@ public class SparkGenericCatalogTest {
                 .containsExactlyInAnyOrder("[default]", "[my_db]");
     }
 
+    @Test
+    public void testSparkSessionReload() {
+        spark.sql("CREATE DATABASE my_db");
+        spark.close();
+
+        spark =
+                SparkSession.builder()
+                        .config("spark.sql.warehouse.dir", warehousePath.toString())
+                        .master("local[2]")
+                        .getOrCreate();
+        spark.conf().set("spark.sql.catalog.spark_catalog", SparkGenericCatalog.class.getName());
+        assertThatCode(
+                        () ->
+                                spark.sql(
+                                        "CREATE TABLE my_db.DB_PT (a INT, b INT, c STRING) USING paimon TBLPROPERTIES"
+                                                + " ('file.format'='avro')"))
+                .doesNotThrowAnyException();
+    }
+
     @Test
     public void testCsvTable() {
         spark.sql("CREATE TABLE CT (a INT, b INT, c STRING) USING csv");