You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@spark.apache.org by gu...@apache.org on 2021/02/21 08:51:04 UTC
[spark] branch master updated: [SPARK-34360][SQL] Support truncation of v2 tables
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 04c3125 [SPARK-34360][SQL] Support truncation of v2 tables
04c3125 is described below
commit 04c3125dcfb2a40b13eef443e5b543795aa31c34
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Sun Feb 21 17:50:38 2021 +0900
[SPARK-34360][SQL] Support truncation of v2 tables
### What changes were proposed in this pull request?
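1. Add a new interface, `TruncatableTable`, which represents tables that allow atomic truncation, and make `SupportsDelete` extend it with a default `truncateTable()` that deletes all rows using an `AlwaysTrue` filter.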
2. Implement the new method in `InMemoryTable` and in `InMemoryPartitionTable`.
### Why are the changes needed?
To support `TRUNCATE TABLE` for v2 tables.
### Does this PR introduce _any_ user-facing change?
No, it should not.
### How was this patch tested?
Added new tests to `TableCatalogSuite` that check truncation of non-partitioned and partitioned tables:
```
$ build/sbt "test:testOnly *TableCatalogSuite"
```
Closes #31475 from MaxGekk/dsv2-truncate-table.
Authored-by: Max Gekk <ma...@gmail.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../sql/connector/catalog/SupportsDelete.java | 13 ++++++-
.../sql/connector/catalog/TruncatableTable.java | 35 ++++++++++++++++++
.../apache/spark/sql/connector/InMemoryTable.scala | 4 +-
.../sql/connector/catalog/TableCatalogSuite.scala | 43 +++++++++++++++++++++-
4 files changed, 92 insertions(+), 3 deletions(-)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java
index 8f51f4e..6a28bca 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java
@@ -18,6 +18,7 @@
package org.apache.spark.sql.connector.catalog;
import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.AlwaysTrue;
import org.apache.spark.sql.sources.Filter;
/**
@@ -27,7 +28,7 @@ import org.apache.spark.sql.sources.Filter;
* @since 3.0.0
*/
@Evolving
-public interface SupportsDelete {
+public interface SupportsDelete extends TruncatableTable {
/**
* Checks whether it is possible to delete data from a data source table that matches filter
@@ -68,4 +69,14 @@ public interface SupportsDelete {
* @throws IllegalArgumentException If the delete is rejected due to required effort
*/
void deleteWhere(Filter[] filters);
+
+ @Override
+ default boolean truncateTable() { // Truncates by deleting with a single AlwaysTrue filter, i.e. every row.
+ Filter[] filters = new Filter[] { new AlwaysTrue() }; // AlwaysTrue matches every row.
+ boolean canDelete = canDeleteWhere(filters); // Ask the source first; deleteWhere runs only if it accepts.
+ if (canDelete) {
+ deleteWhere(filters); // May still throw IllegalArgumentException per the deleteWhere contract.
+ }
+ return canDelete; // false: the source rejected the filter and this method deleted nothing.
+ }
}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java
new file mode 100644
index 0000000..a69f384
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Represents a table which can be atomically truncated, i.e. have all of its rows removed at once.
+ */
+@Evolving
+public interface TruncatableTable extends Table {
+ /**
+ * Truncates the table by atomically removing all of its rows.
+ *
+ * @return true if the table was truncated successfully, otherwise false
+ *
+ * @since 3.2.0
+ */
+ boolean truncateTable();
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
index df531b1..00de3a4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransfor
import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.connector.write._
import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
-import org.apache.spark.sql.sources.{And, EqualNullSafe, EqualTo, Filter, IsNotNull, IsNull}
+import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualNullSafe, EqualTo, Filter, IsNotNull, IsNull}
import org.apache.spark.sql.types.{DataType, DateType, IntegerType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.unsafe.types.UTF8String
@@ -419,6 +419,7 @@ object InMemoryTable {
null == extractValue(attr, partitionNames, partValues)
case IsNotNull(attr) =>
null != extractValue(attr, partitionNames, partValues)
+ case AlwaysTrue() => true
case f =>
throw new IllegalArgumentException(s"Unsupported filter type: $f")
}
@@ -431,6 +432,7 @@ object InMemoryTable {
case _: EqualNullSafe => true
case _: IsNull => true
case _: IsNotNull => true
+ case _: AlwaysTrue => true
case _ => false
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala
index ef342e7..485e41f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala
@@ -23,9 +23,11 @@ import java.util.Collections
import scala.collection.JavaConverters._
import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.connector.InMemoryTableCatalog
+import org.apache.spark.sql.connector.{BufferedRows, InMemoryPartitionTable, InMemoryPartitionTableCatalog, InMemoryTable, InMemoryTableCatalog}
+import org.apache.spark.sql.connector.expressions.LogicalExpressions
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -887,4 +889,43 @@ class TableCatalogSuite extends SparkFunSuite {
assert(exc.getMessage.contains(testNs.quoted))
}
+
+ test("truncate non-partitioned table") {
+ val catalog = newCatalog()
+ // Unpartitioned table (empty partitioning array) seeded with two rows.
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ .asInstanceOf[InMemoryTable]
+ table.withData(Array(
+ new BufferedRows("3").withRow(InternalRow(0, "abc", "3")),
+ new BufferedRows("4").withRow(InternalRow(1, "def", "4"))))
+ assert(table.truncateTable()) // truncation must be accepted
+ assert(table.rows.isEmpty) // and must leave no rows behind
+ }
+
+ test("truncate partitioned table") {
+ val partCatalog = new InMemoryPartitionTableCatalog
+ partCatalog.initialize("test", CaseInsensitiveStringMap.empty())
+ // Table partitioned by identity(part0), with partitions 0 and 1 holding one row each.
+ val table = partCatalog.createTable(
+ testIdent,
+ new StructType()
+ .add("col0", IntegerType)
+ .add("part0", IntegerType),
+ Array(LogicalExpressions.identity(LogicalExpressions.parseReference("part0"))),
+ util.Collections.emptyMap[String, String])
+ val partTable = table.asInstanceOf[InMemoryPartitionTable]
+ val partIdent = InternalRow.apply(0)
+ val partIdent1 = InternalRow.apply(1)
+ partTable.createPartition(partIdent, new util.HashMap[String, String]())
+ partTable.createPartition(partIdent1, new util.HashMap[String, String]())
+ partTable.withData(Array(
+ new BufferedRows("0").withRow(InternalRow(0, 0)),
+ new BufferedRows("1").withRow(InternalRow(1, 1))
+ ))
+ assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 2) // both partitions exist
+ assert(!partTable.rows.isEmpty) // and hold data before truncation
+ assert(partTable.truncateTable()) // truncation must be accepted
+ assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 2) // partitions survive truncation
+ assert(partTable.rows.isEmpty) // but all rows are removed
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org