You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/01/11 00:35:48 UTC
[spark] branch branch-3.1 updated: [SPARK-34055][SQL][3.1] Refresh
cache in `ALTER TABLE .. ADD PARTITION`
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new e0edce7 [SPARK-34055][SQL][3.1] Refresh cache in `ALTER TABLE .. ADD PARTITION`
e0edce7 is described below
commit e0edce79209a2e0d5fe17104983cd70440ad83c5
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Mon Jan 11 09:35:15 2021 +0900
[SPARK-34055][SQL][3.1] Refresh cache in `ALTER TABLE .. ADD PARTITION`
### What changes were proposed in this pull request?
Invoke `refreshTable()` from `CatalogImpl` in the v1 `ALTER TABLE .. ADD PARTITION` command so that the cache of the altered table is refreshed.
### Why are the changes needed?
This fixes the issue demonstrated by the following example:
```sql
spark-sql> create table tbl (col int, part int) using parquet partitioned by (part);
spark-sql> insert into tbl partition (part=0) select 0;
spark-sql> cache table tbl;
spark-sql> select * from tbl;
0 0
spark-sql> show table extended like 'tbl' partition(part=0);
default tbl false Partition Values: [part=0]
Location: file:/Users/maximgekk/proj/add-partition-refresh-cache-2/spark-warehouse/tbl/part=0
...
```
Create a new partition by copying the existing one:
```
$ cp -r /Users/maximgekk/proj/add-partition-refresh-cache-2/spark-warehouse/tbl/part=0 /Users/maximgekk/proj/add-partition-refresh-cache-2/spark-warehouse/tbl/part=1
```
```sql
spark-sql> alter table tbl add partition (part=1) location '/Users/maximgekk/proj/add-partition-refresh-cache-2/spark-warehouse/tbl/part=1';
spark-sql> select * from tbl;
0 0
```
The last query must also return the row `0 1`, since the partition `part=1` has been added by `ALTER TABLE .. ADD PARTITION`.
### Does this PR introduce _any_ user-facing change?
Yes. After the changes for the example above:
```sql
...
spark-sql> alter table tbl add partition (part=1) location '/Users/maximgekk/proj/add-partition-refresh-cache-2/spark-warehouse/tbl/part=1';
spark-sql> select * from tbl;
0 0
0 1
```
### How was this patch tested?
By running the affected test suite:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *CachedTableSuite"
```
Closes #31115 from MaxGekk/add-partition-refresh-cache-2-3.1.
Authored-by: Max Gekk <ma...@gmail.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../apache/spark/sql/execution/command/ddl.scala | 1 +
.../org/apache/spark/sql/CachedTableSuite.scala | 29 ++++++++++++++++++++++
.../apache/spark/sql/hive/CachedTableSuite.scala | 29 ++++++++++++++++++++++
3 files changed, 59 insertions(+)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 8195d02..4545d73 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -485,6 +485,7 @@ case class AlterTableAddPartitionCommand(
catalog.createPartitions(table.identifier, batch, ignoreIfExists = ifNotExists)
}
+ sparkSession.catalog.refreshTable(table.identifier.quotedString)
if (table.stats.nonEmpty) {
if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
val addedSize = CommandUtils.calculateMultipleLocationSizes(sparkSession, table.identifier,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index b9b5299..2b4871a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -23,6 +23,8 @@ import java.nio.file.{Files, Paths}
import scala.collection.mutable.HashSet
import scala.concurrent.duration._
+import org.apache.commons.io.FileUtils
+
import org.apache.spark.CleanerListener
import org.apache.spark.executor.DataReadMethod._
import org.apache.spark.executor.DataReadMethod.DataReadMethod
@@ -1335,4 +1337,31 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
QueryTest.checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 2), Row(1, 1)))
}
}
+
+ test("SPARK-34055: refresh cache in partition adding") {
+ withTable("t") {
+ sql("CREATE TABLE t (id int, part int) USING parquet PARTITIONED BY (part)")
+ sql("INSERT INTO t PARTITION (part=0) SELECT 0")
+ assert(!spark.catalog.isCached("t"))
+ sql("CACHE TABLE t")
+ assert(spark.catalog.isCached("t"))
+ checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 0)))
+
+ // Create new partition (part = 1) in the filesystem
+ val information = sql("SHOW TABLE EXTENDED LIKE 't' PARTITION (part = 0)")
+ .select("information")
+ .first().getString(0)
+ val part0Loc = information
+ .split("\\r?\\n")
+ .filter(_.startsWith("Location:"))
+ .head
+ .replace("Location: file:", "")
+ val part1Loc = part0Loc.replace("part=0", "part=1")
+ FileUtils.copyDirectory(new File(part0Loc), new File(part1Loc))
+
+ sql(s"ALTER TABLE t ADD PARTITION (part=1) LOCATION '$part1Loc'")
+ assert(spark.catalog.isCached("t"))
+ checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 0), Row(0, 1)))
+ }
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index c44b1cb..a7c8b61 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.hive
import java.io.File
+import org.apache.commons.io.FileUtils
+
import org.apache.spark.sql.{AnalysisException, Dataset, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
@@ -469,4 +471,31 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
QueryTest.checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 2), Row(1, 1)))
}
}
+
+ test("SPARK-34055: refresh cache in partition adding") {
+ withTable("t") {
+ sql("CREATE TABLE t (id int, part int) USING hive PARTITIONED BY (part)")
+ sql("INSERT INTO t PARTITION (part=0) SELECT 0")
+ assert(!spark.catalog.isCached("t"))
+ sql("CACHE TABLE t")
+ assert(spark.catalog.isCached("t"))
+ checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 0)))
+
+ // Create new partition (part = 1) in the filesystem
+ val information = sql("SHOW TABLE EXTENDED LIKE 't' PARTITION (part = 0)")
+ .select("information")
+ .first().getString(0)
+ val part0Loc = information
+ .split("\\r?\\n")
+ .filter(_.startsWith("Location:"))
+ .head
+ .replace("Location: file:", "")
+ val part1Loc = part0Loc.replace("part=0", "part=1")
+ FileUtils.copyDirectory(new File(part0Loc), new File(part1Loc))
+
+ sql(s"ALTER TABLE t ADD PARTITION (part=1) LOCATION '$part1Loc'")
+ assert(spark.catalog.isCached("t"))
+ checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 0), Row(0, 1)))
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org