You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/07/26 02:37:53 UTC
[incubator-paimon] branch master updated: [bug] fix spark generic catalog could not load existing paimon tables (#1657)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c3033e3b1 [bug] fix spark generic catalog could not load existing paimon tables (#1657)
c3033e3b1 is described below
commit c3033e3b1f1aa7a13ab472d570238b251456f599
Author: YeJunHao <41...@users.noreply.github.com>
AuthorDate: Wed Jul 26 10:37:49 2023 +0800
[bug] fix spark generic catalog could not load existing paimon tables (#1657)
---
.../apache/paimon/spark/SparkGenericCatalog.java | 18 +++++++++--
.../paimon/spark/SparkGenericCatalogTest.java | 36 +++++++++++++++++-----
2 files changed, 43 insertions(+), 11 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
index 59355af32..087709004 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
@@ -51,6 +51,7 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.Callable;
import static org.apache.paimon.options.CatalogOptions.METASTORE;
import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
@@ -129,7 +130,7 @@ public class SparkGenericCatalog<T extends TableCatalog & SupportsNamespaces>
try {
return paimonCatalog.loadTable(ident);
} catch (NoSuchTableException e) {
- return getSessionCatalog().loadTable(ident);
+ return throwsOldIfExceptionHappens(() -> getSessionCatalog().loadTable(ident), e);
}
}
@@ -138,7 +139,8 @@ public class SparkGenericCatalog<T extends TableCatalog & SupportsNamespaces>
try {
return paimonCatalog.loadTable(ident, version);
} catch (NoSuchTableException e) {
- return getSessionCatalog().loadTable(ident, version);
+ return throwsOldIfExceptionHappens(
+ () -> getSessionCatalog().loadTable(ident, version), e);
}
}
@@ -147,7 +149,8 @@ public class SparkGenericCatalog<T extends TableCatalog & SupportsNamespaces>
try {
return paimonCatalog.loadTable(ident, timestamp);
} catch (NoSuchTableException e) {
- return getSessionCatalog().loadTable(ident, timestamp);
+ return throwsOldIfExceptionHappens(
+ () -> getSessionCatalog().loadTable(ident, timestamp), e);
}
}
@@ -283,4 +286,13 @@ public class SparkGenericCatalog<T extends TableCatalog & SupportsNamespaces>
private static boolean isSystemNamespace(String[] namespace) {
return namespace.length == 1 && namespace[0].equalsIgnoreCase("system");
}
+
+ private Table throwsOldIfExceptionHappens(Callable<Table> call, NoSuchTableException e)
+ throws NoSuchTableException {
+ try {
+ return call.call();
+ } catch (Exception exception) {
+ throw e;
+ }
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java
index aa14bf72e..b6d33c859 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java
@@ -22,24 +22,25 @@ import org.apache.paimon.fs.Path;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
/** Base tests for spark read. */
public class SparkGenericCatalogTest {
- protected static SparkSession spark = null;
+ protected SparkSession spark = null;
- protected static Path warehousePath = null;
+ protected Path warehousePath = null;
- @BeforeAll
- public static void startMetastoreAndSpark(@TempDir java.nio.file.Path tempDir) {
+ @BeforeEach
+ public void startMetastoreAndSpark(@TempDir java.nio.file.Path tempDir) {
warehousePath = new Path("file:" + tempDir.toString());
spark =
SparkSession.builder()
@@ -49,8 +50,8 @@ public class SparkGenericCatalogTest {
spark.conf().set("spark.sql.catalog.spark_catalog", SparkGenericCatalog.class.getName());
}
- @AfterAll
- public static void stopMetastoreAndSpark() {
+ @AfterEach
+ public void stopMetastoreAndSpark() {
if (spark != null) {
spark.stop();
spark = null;
@@ -74,6 +75,25 @@ public class SparkGenericCatalogTest {
.containsExactlyInAnyOrder("[default]", "[my_db]");
}
+ @Test
+ public void testSparkSessionReload() {
+ spark.sql("CREATE DATABASE my_db");
+ spark.close();
+
+ spark =
+ SparkSession.builder()
+ .config("spark.sql.warehouse.dir", warehousePath.toString())
+ .master("local[2]")
+ .getOrCreate();
+ spark.conf().set("spark.sql.catalog.spark_catalog", SparkGenericCatalog.class.getName());
+ assertThatCode(
+ () ->
+ spark.sql(
+ "CREATE TABLE my_db.DB_PT (a INT, b INT, c STRING) USING paimon TBLPROPERTIES"
+ + " ('file.format'='avro')"))
+ .doesNotThrowAnyException();
+ }
+
@Test
public void testCsvTable() {
spark.sql("CREATE TABLE CT (a INT, b INT, c STRING) USING csv");