You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/01/24 17:25:13 UTC
[spark] branch branch-3.0 updated: [SPARK-34213][SQL][3.0] Refresh
cached data of v1 table in `LOAD DATA`
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 92de93a [SPARK-34213][SQL][3.0] Refresh cached data of v1 table in `LOAD DATA`
92de93a is described below
commit 92de93a4e392f603e4c028263b97a1e36334c8da
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Sun Jan 24 09:24:00 2021 -0800
[SPARK-34213][SQL][3.0] Refresh cached data of v1 table in `LOAD DATA`
### What changes were proposed in this pull request?
Invoke `CatalogImpl.refreshTable()` instead of `SessionCatalog.refreshTable` in the v1 implementation of the `LOAD DATA` command. `SessionCatalog.refreshTable` only refreshes metadata, whereas `CatalogImpl.refreshTable()` refreshes cached table data as well.
### Why are the changes needed?
The example below illustrates the issue:
- Create a source table:
```sql
spark-sql> CREATE TABLE src_tbl (c0 int, part int) USING hive PARTITIONED BY (part);
spark-sql> INSERT INTO src_tbl PARTITION (part=0) SELECT 0;
spark-sql> SHOW TABLE EXTENDED LIKE 'src_tbl' PARTITION (part=0);
default src_tbl false Partition Values: [part=0]
Location: file:/Users/maximgekk/proj/load-data-refresh-cache/spark-warehouse/src_tbl/part=0
...
```
- Load data from the source table to a cached destination table:
```sql
spark-sql> CREATE TABLE dst_tbl (c0 int, part int) USING hive PARTITIONED BY (part);
spark-sql> INSERT INTO dst_tbl PARTITION (part=1) SELECT 1;
spark-sql> CACHE TABLE dst_tbl;
spark-sql> SELECT * FROM dst_tbl;
1 1
spark-sql> LOAD DATA LOCAL INPATH '/Users/maximgekk/proj/load-data-refresh-cache/spark-warehouse/src_tbl/part=0' INTO TABLE dst_tbl PARTITION (part=0);
spark-sql> SELECT * FROM dst_tbl;
1 1
```
The last query does not return the newly loaded data.
### Does this PR introduce _any_ user-facing change?
Yes. After the changes, the example above works correctly:
```sql
spark-sql> LOAD DATA LOCAL INPATH '/Users/maximgekk/proj/load-data-refresh-cache/spark-warehouse/src_tbl/part=0' INTO TABLE dst_tbl PARTITION (part=0);
spark-sql> SELECT * FROM dst_tbl;
0 0
1 1
```
### How was this patch tested?
Added new test to `org.apache.spark.sql.hive.CachedTableSuite`:
```
$ build/sbt -Phive -Phive-thriftserver "test:testOnly *CachedTableSuite"
```
Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit f8bf72ed5d1c25cb9068dc80d3996fcd5aade3ae)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
Closes #31305 from MaxGekk/load-data-refresh-cache-3.0.
Authored-by: Max Gekk <ma...@gmail.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../spark/sql/execution/command/tables.scala | 4 ++--
.../apache/spark/sql/hive/CachedTableSuite.scala | 25 ++++++++++++++++++++++
2 files changed, 27 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 42f4a72..5fb3ce3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -389,8 +389,8 @@ case class LoadDataCommand(
isSrcLocal = isLocal)
}
- // Refresh the metadata cache to ensure the data visible to the users
- catalog.refreshTable(targetTable.identifier)
+ // Refresh the data and metadata cache to ensure the data visible to the users
+ sparkSession.catalog.refreshTable(tableIdentwithDB)
CommandUtils.updateTableStats(sparkSession, targetTable)
Seq.empty[Row]
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index e3141f5..5b74b4b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -527,4 +527,29 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
checkAnswer(sql("SELECT * FROM t"), Seq(Row(1, 1)))
}
}
+
+ test("SPARK-34213: LOAD DATA refreshes cached table") {
+ withTable("src_tbl") {
+ withTable("dst_tbl") {
+ sql("CREATE TABLE src_tbl (c0 int, part int) USING hive PARTITIONED BY (part)")
+ sql("INSERT INTO src_tbl PARTITION (part=0) SELECT 0")
+ val information = sql("SHOW TABLE EXTENDED LIKE 'src_tbl' PARTITION (part=0)")
+ .select("information")
+ .first().getString(0)
+ val location = information
+ .split("\\r?\\n")
+ .filter(_.startsWith("Location:"))
+ .head
+ .replace("Location: file:", "")
+ sql("CREATE TABLE dst_tbl (c0 int, part int) USING hive PARTITIONED BY (part)")
+ sql("INSERT INTO dst_tbl PARTITION (part=1) SELECT 1")
+ sql("CACHE TABLE dst_tbl")
+ assert(spark.catalog.isCached("dst_tbl"))
+ checkAnswer(sql("SELECT * FROM dst_tbl"), Row(1, 1))
+ sql(s"LOAD DATA LOCAL INPATH '$location' INTO TABLE dst_tbl PARTITION (part=0)")
+ assert(spark.catalog.isCached("dst_tbl"))
+ checkAnswer(sql("SELECT * FROM dst_tbl"), Seq(Row(0, 0), Row(1, 1)))
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org