Posted to commits@spark.apache.org by gu...@apache.org on 2021/02/21 08:51:04 UTC

[spark] branch master updated: [SPARK-34360][SQL] Support truncation of v2 tables

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 04c3125  [SPARK-34360][SQL] Support truncation of v2 tables
04c3125 is described below

commit 04c3125dcfb2a40b13eef443e5b543795aa31c34
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Sun Feb 21 17:50:38 2021 +0900

    [SPARK-34360][SQL] Support truncation of v2 tables
    
    ### What changes were proposed in this pull request?
    1. Add a new interface `TruncatableTable` that represents tables which support atomic truncation (see the sketch after this list).
    2. Implement the new method in `InMemoryTable` and in `InMemoryPartitionTable`.
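
    For illustration only, here is a minimal sketch (not part of this commit) of how a connector table might implement the new interface; `BufferBackedTable` and every other name below are hypothetical:
    ```scala
    import java.util
    import java.util.Collections

    import org.apache.spark.sql.connector.catalog.{TableCapability, TruncatableTable}
    import org.apache.spark.sql.types.StructType

    // Hypothetical table backed by a mutable in-memory buffer.
    class BufferBackedTable(tableName: String, tableSchema: StructType)
      extends TruncatableTable {

      private val rows = scala.collection.mutable.ArrayBuffer.empty[Seq[Any]]

      override def name(): String = tableName
      override def schema(): StructType = tableSchema
      override def capabilities(): util.Set[TableCapability] = Collections.emptySet()

      // Clearing the single buffer is all-or-nothing for this toy store,
      // which satisfies the interface's atomicity contract.
      override def truncateTable(): Boolean = {
        rows.clear()
        true
      }
    }
    ```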
    
    ### Why are the changes needed?
    To support `TRUNCATE TABLE` for v2 tables.
    
    ### Does this PR introduce _any_ user-facing change?
    No, it should not.
    
    ### How was this patch tested?
    Added new tests to `TableCatalogSuite` that check truncation of non-partitioned and partitioned tables:
    ```
    $ build/sbt "test:testOnly *TableCatalogSuite"
    ```
    
    Closes #31475 from MaxGekk/dsv2-truncate-table.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../sql/connector/catalog/SupportsDelete.java      | 13 ++++++-
 .../sql/connector/catalog/TruncatableTable.java    | 35 ++++++++++++++++++
 .../apache/spark/sql/connector/InMemoryTable.scala |  4 +-
 .../sql/connector/catalog/TableCatalogSuite.scala  | 43 +++++++++++++++++++++-
 4 files changed, 92 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java
index 8f51f4e..6a28bca 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.connector.catalog;
 
 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.AlwaysTrue;
 import org.apache.spark.sql.sources.Filter;
 
 /**
@@ -27,7 +28,7 @@ import org.apache.spark.sql.sources.Filter;
  * @since 3.0.0
  */
 @Evolving
-public interface SupportsDelete {
+public interface SupportsDelete extends TruncatableTable {
 
   /**
    * Checks whether it is possible to delete data from a data source table that matches filter
@@ -68,4 +69,14 @@ public interface SupportsDelete {
    * @throws IllegalArgumentException If the delete is rejected due to required effort
    */
   void deleteWhere(Filter[] filters);
+
+  @Override
+  default boolean truncateTable() {
+    Filter[] filters = new Filter[] { new AlwaysTrue() };
+    boolean canDelete = canDeleteWhere(filters);
+    if (canDelete) {
+      deleteWhere(filters);
+    }
+    return canDelete;
+  }
 }
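
Because `truncateTable()` is a default method, an existing `SupportsDelete` implementation picks up truncation support without code changes, provided it can evaluate the `AlwaysTrue` filter. Below is a minimal sketch under that assumption (not part of this commit; `DeletableTable` and the other names are hypothetical):

```scala
import java.util
import java.util.Collections

import org.apache.spark.sql.connector.catalog.{SupportsDelete, TableCapability}
import org.apache.spark.sql.sources.{AlwaysTrue, Filter}
import org.apache.spark.sql.types.StructType

// Hypothetical delete-capable table; truncateTable() is inherited from
// the default method in SupportsDelete shown above.
class DeletableTable(tableSchema: StructType) extends SupportsDelete {
  private var data = Seq.empty[Seq[Any]]

  override def name(): String = "deletable"
  override def schema(): StructType = tableSchema
  override def capabilities(): util.Set[TableCapability] = Collections.emptySet()

  // This toy store can apply any filter, including AlwaysTrue, so the
  // default truncateTable() calls deleteWhere and returns true.
  override def canDeleteWhere(filters: Array[Filter]): Boolean = true

  override def deleteWhere(filters: Array[Filter]): Unit = {
    // AlwaysTrue matches every row; deleting by it empties the table.
    if (filters.exists(_.isInstanceOf[AlwaysTrue])) {
      data = Seq.empty
    }
  }
}
```
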
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java
new file mode 100644
index 0000000..a69f384
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Represents a table which can be atomically truncated.
+ */
+@Evolving
+public interface TruncatableTable extends Table {
+  /**
+   * Truncates the table by atomically removing all of its rows.
+   *
+   * @return true if the table was truncated successfully, false otherwise
+   *
+   * @since 3.2.0
+   */
+  boolean truncateTable();
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
index df531b1..00de3a4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransfor
 import org.apache.spark.sql.connector.read._
 import org.apache.spark.sql.connector.write._
 import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
-import org.apache.spark.sql.sources.{And, EqualNullSafe, EqualTo, Filter, IsNotNull, IsNull}
+import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualNullSafe, EqualTo, Filter, IsNotNull, IsNull}
 import org.apache.spark.sql.types.{DataType, DateType, IntegerType, StringType, StructField, StructType, TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.unsafe.types.UTF8String
@@ -419,6 +419,7 @@ object InMemoryTable {
           null == extractValue(attr, partitionNames, partValues)
         case IsNotNull(attr) =>
           null != extractValue(attr, partitionNames, partValues)
+        case AlwaysTrue() => true
         case f =>
           throw new IllegalArgumentException(s"Unsupported filter type: $f")
       }
@@ -431,6 +432,7 @@ object InMemoryTable {
       case _: EqualNullSafe => true
       case _: IsNull => true
       case _: IsNotNull => true
+      case _: AlwaysTrue => true
       case _ => false
     }
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala
index ef342e7..485e41f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala
@@ -23,9 +23,11 @@ import java.util.Collections
 import scala.collection.JavaConverters._
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.connector.InMemoryTableCatalog
+import org.apache.spark.sql.connector.{BufferedRows, InMemoryPartitionTable, InMemoryPartitionTableCatalog, InMemoryTable, InMemoryTableCatalog}
+import org.apache.spark.sql.connector.expressions.LogicalExpressions
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -887,4 +889,43 @@ class TableCatalogSuite extends SparkFunSuite {
 
     assert(exc.getMessage.contains(testNs.quoted))
   }
+
+  test("truncate non-partitioned table") {
+    val catalog = newCatalog()
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+      .asInstanceOf[InMemoryTable]
+    table.withData(Array(
+      new BufferedRows("3").withRow(InternalRow(0, "abc", "3")),
+      new BufferedRows("4").withRow(InternalRow(1, "def", "4"))))
+    assert(table.truncateTable())
+    assert(table.rows.isEmpty)
+  }
+
+  test("truncate partitioned table") {
+    val partCatalog = new InMemoryPartitionTableCatalog
+    partCatalog.initialize("test", CaseInsensitiveStringMap.empty())
+
+    val table = partCatalog.createTable(
+      testIdent,
+      new StructType()
+        .add("col0", IntegerType)
+        .add("part0", IntegerType),
+      Array(LogicalExpressions.identity(LogicalExpressions.parseReference("part0"))),
+      util.Collections.emptyMap[String, String])
+    val partTable = table.asInstanceOf[InMemoryPartitionTable]
+    val partIdent = InternalRow.apply(0)
+    val partIdent1 = InternalRow.apply(1)
+    partTable.createPartition(partIdent, new util.HashMap[String, String]())
+    partTable.createPartition(partIdent1, new util.HashMap[String, String]())
+    partTable.withData(Array(
+      new BufferedRows("0").withRow(InternalRow(0, 0)),
+      new BufferedRows("1").withRow(InternalRow(1, 1))
+    ))
+    assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 2)
+    assert(!partTable.rows.isEmpty)
+    assert(partTable.truncateTable())
+    assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 2)
+    assert(partTable.rows.isEmpty)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org