You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/01/11 18:16:08 UTC

[spark] branch branch-2.4 updated: [SPARK-30312][SQL][2.4] Preserve path permission and acl when truncate table

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 9b53480  [SPARK-30312][SQL][2.4] Preserve path permission and acl when truncate table
9b53480 is described below

commit 9b534808c490a8d138833c29c3031fd45557fcfc
Author: Liang-Chi Hsieh <li...@uber.com>
AuthorDate: Sat Jan 11 10:14:16 2020 -0800

    [SPARK-30312][SQL][2.4] Preserve path permission and acl when truncate table
    
    ### What changes were proposed in this pull request?
    
    This patch proposes to preserve the existing permissions/ACLs of paths when truncating a table/partition.
    Note that this is backport of #26956 to branch-2.4.
    
    ### Why are the changes needed?
    
    When Spark SQL truncates a table, it deletes the paths of the table/partitions and then re-creates them. If permissions/ACLs were set on those paths, they are lost.
    
    We should preserve the permissions/ACLs where possible.
    
    ### Does this PR introduce any user-facing change?
    
    Yes. When truncating a table/partition, Spark keeps the permissions/ACLs of its paths.
    
    ### How was this patch tested?
    
    Unit test and manual test as shown in #26956.
    
    Closes #27173 from viirya/truncate-table-permission-2.4.
    
    Authored-by: Liang-Chi Hsieh <li...@uber.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 11 +++
 .../spark/sql/execution/command/tables.scala       | 46 +++++++++++++
 .../spark/sql/execution/command/DDLSuite.scala     | 79 +++++++++++++++++++++-
 3 files changed, 135 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c7167f4..7a83df9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1576,6 +1576,14 @@ object SQLConf {
         "turning the flag on provides a way for these sources to see these partitionBy columns.")
       .booleanConf
       .createWithDefault(false)
+
+  val TRUNCATE_TABLE_IGNORE_PERMISSION_ACL = // opt-out switch for SPARK-30312
+    buildConf("spark.sql.truncateTable.ignorePermissionAcl")
+      .internal()
+      .doc("When set to true, TRUNCATE TABLE command will not try to set back original " +
+        "permission and ACLs when re-creating the table/partition paths.")
+      .booleanConf
+      .createWithDefault(false) // false: TRUNCATE TABLE restores the original permission/ACLs
 }
 
 /**
@@ -1983,6 +1991,9 @@ class SQLConf extends Serializable with Logging {
 
   def setOpsPrecedenceEnforced: Boolean = getConf(SQLConf.LEGACY_SETOPS_PRECEDENCE_ENABLED)
 
+  def truncateTableIgnorePermissionAcl: Boolean = // read by TruncateTableCommand
+    getConf(SQLConf.TRUNCATE_TABLE_IGNORE_PERMISSION_ACL)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 6dc8426..5323bf65 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -26,6 +26,7 @@ import scala.util.Try
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.{FileContext, FsConstants, Path}
+import org.apache.hadoop.fs.permission.{AclEntry, FsPermission}
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -457,13 +458,58 @@ case class TruncateTableCommand(
         partLocations
       }
     val hadoopConf = spark.sessionState.newHadoopConf()
+    val ignorePermissionAcl = SQLConf.get.truncateTableIgnorePermissionAcl
     locations.foreach { location =>
       if (location.isDefined) {
         val path = new Path(location.get)
         try {
           val fs = path.getFileSystem(hadoopConf)
+
+          // Best effort: the path may be missing and not all fs impls support these APIs.
+          var optPermission: Option[FsPermission] = None
+          var optAcls: Option[java.util.List[AclEntry]] = None
+          if (!ignorePermissionAcl) {
+            try {
+              // Throws if the path does not exist; delete/mkdirs below still handle that case.
+              optPermission = Some(fs.getFileStatus(path).getPermission())
+            } catch {
+              case NonFatal(_) => // do nothing
+            }
+
+            try {
+              optAcls = Some(fs.getAclStatus(path).getEntries)
+            } catch {
+              case NonFatal(_) => // do nothing
+            }
+          }
+
           fs.delete(path, true)
+          // We should keep original permission/acl of the path.
+          // For owner/group, only super-user can set it, for example on HDFS. Because
+          // current user can delete the path, we assume the user/group is correct or not an issue.
           fs.mkdirs(path)
+          if (!ignorePermissionAcl) {
+            optPermission.foreach { permission =>
+              try {
+                fs.setPermission(path, permission)
+              } catch {
+                case NonFatal(e) =>
+                  throw new SecurityException(
+                    s"Failed to set original permission $permission back to " +
+                      s"the created path: $path. Exception: ${e.getMessage}", e)
+              }
+            }
+            optAcls.foreach { acls =>
+              try {
+                fs.setAcl(path, acls)
+              } catch {
+                case NonFatal(e) =>
+                  throw new SecurityException(
+                    s"Failed to set original ACL $acls back to " +
+                      s"the created path: $path. Exception: ${e.getMessage}", e)
+              }
+            }
+          }
         } catch {
           case NonFatal(e) =>
             throw new AnalysisException(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 2149329..753fe1c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -21,7 +21,8 @@ import java.io.{File, PrintWriter}
 import java.net.URI
 import java.util.Locale
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{Path, RawLocalFileSystem}
+import org.apache.hadoop.fs.permission.{AclEntry, AclEntryScope, AclEntryType, AclStatus, FsAction, FsPermission}
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
@@ -1935,6 +1936,60 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }
 
+  test("SPARK-30312: truncate table - keep acl/permission") {
+    import testImplicits._
+    val ignorePermissionAcl = Seq(true, false)
+
+    ignorePermissionAcl.foreach { ignore =>
+      withSQLConf(
+        "fs.file.impl" -> classOf[FakeLocalFsFileSystem].getName,
+        "fs.file.impl.disable.cache" -> "true",
+        SQLConf.TRUNCATE_TABLE_IGNORE_PERMISSION_ACL.key -> ignore.toString) {
+        withTable("tab1") {
+          sql("CREATE TABLE tab1 (col INT) USING parquet")
+          sql("INSERT INTO tab1 SELECT 1")
+          checkAnswer(spark.table("tab1"), Row(1))
+
+          val tablePath = new Path(spark.sessionState.catalog
+            .getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)
+
+          val hadoopConf = spark.sessionState.newHadoopConf()
+          val fs = tablePath.getFileSystem(hadoopConf)
+          fs.setPermission(tablePath, new FsPermission("777"))
+          // Read the status after setPermission; a FileStatus fetched earlier may be stale.
+          val fileStatus = fs.getFileStatus(tablePath)
+          assert(fileStatus.getPermission().toString() == "rwxrwxrwx")
+
+          // Set ACL to table path.
+          val customAcl = new java.util.ArrayList[AclEntry]()
+          customAcl.add(new AclEntry.Builder()
+            .setType(AclEntryType.USER)
+            .setScope(AclEntryScope.ACCESS)
+            .setPermission(FsAction.READ).build())
+          fs.setAcl(tablePath, customAcl)
+          assert(fs.getAclStatus(tablePath).getEntries().get(0) == customAcl.get(0))
+
+          sql("TRUNCATE TABLE tab1")
+          assert(spark.table("tab1").collect().isEmpty)
+
+          val fileStatus2 = fs.getFileStatus(tablePath)
+          if (ignore) {
+            assert(fileStatus2.getPermission().toString() != "rwxrwxrwx") // depends on umask
+          } else {
+            assert(fileStatus2.getPermission().toString() == "rwxrwxrwx")
+          }
+          val aclEntries = fs.getAclStatus(tablePath).getEntries()
+          if (ignore) {
+            assert(aclEntries.size() == 0)
+          } else {
+            assert(aclEntries.size() == 1)
+            assert(aclEntries.get(0) == customAcl.get(0))
+          }
+        }
+      }
+    }
+  }
+
   test("create temporary view with mismatched schema") {
     withTable("tab1") {
       spark.range(10).write.saveAsTable("tab1")
@@ -2752,3 +2807,25 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }
 }
+
+object FakeLocalFsFileSystem {
+  var aclStatus = new AclStatus.Builder().build()
+}
+
+// A fake local filesystem used to test ACLs. For test purposes it keeps a single ACL
+// status shared by all paths; deleting any path resets that status to empty, simulating
+// the loss of ACLs when a directory is removed.
+class FakeLocalFsFileSystem extends RawLocalFileSystem {
+  import FakeLocalFsFileSystem._
+
+  override def delete(f: Path, recursive: Boolean): Boolean = {
+    aclStatus = new AclStatus.Builder().build()
+    super.delete(f, recursive)
+  }
+
+  override def getAclStatus(path: Path): AclStatus = aclStatus
+
+  override def setAcl(path: Path, aclSpec: java.util.List[AclEntry]): Unit = {
+    aclStatus = new AclStatus.Builder().addEntries(aclSpec).build()
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org