You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by th...@apache.org on 2014/08/30 02:24:23 UTC

svn commit: r1621400 - in /hive/trunk: common/src/java/org/apache/hadoop/hive/common/ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/ ql/src/java/org/apache/hadoop/hive/ql/security/authorization/

Author: thejas
Date: Sat Aug 30 00:24:22 2014
New Revision: 1621400

URL: http://svn.apache.org/r1621400
Log:
HIVE-7895 : Storage based authorization should consider sticky bit for drop actions (Thejas Nair, reviewed by Jason Dere)

Added:
    hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/TestStorageBasedMetastoreAuthorizationDrops.java
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
    hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/TestStorageBasedMetastoreAuthorizationProvider.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java?rev=1621400&r1=1621399&r2=1621400&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java Sat Aug 30 00:24:22 2014
@@ -24,7 +24,6 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.AccessControlException;
 import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
 
@@ -34,12 +33,10 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatus;
@@ -628,4 +625,62 @@ public final class FileUtils {
     //Once equality has been added in HDFS-4321, we should make use of it
     return fs1.getUri().equals(fs2.getUri());
   }
+
+  /**
+   * Checks if delete can be performed on given path by given user.
+   * If file does not exist it just returns without throwing an Exception
+   * @param path
+   * @param conf
+   * @param user
+   * @throws AccessControlException
+   * @throws InterruptedException
+   * @throws Exception
+   */
+  public static void checkDeletePermission(Path path, Configuration conf, String user)
+      throws AccessControlException, InterruptedException, Exception {
+   // This requires ability to delete the given path.
+    // The following 2 conditions should be satisfied for this-
+    // 1. Write permissions on parent dir
+    // 2. If sticky bit is set on parent dir then one of following should be
+    // true
+    //   a. User is owner of the current dir/file
+    //   b. User is owner of the parent dir
+    //   Super users are also allowed to drop the file, but there is no good way of checking
+    //   if a user is a super user. Also super users running hive queries is not a common
+    //   use case. super users can also do a chown to be able to drop the file
+
+    final FileSystem fs = path.getFileSystem(conf);
+    if (!fs.exists(path)) {
+      // no file/dir to be deleted
+      return;
+    }
+    Path parPath = path.getParent();
+    // check user has write permissions on the parent dir
+    FileStatus stat = fs.getFileStatus(path);
+    FileUtils.checkFileAccessWithImpersonation(fs, stat, FsAction.WRITE, user);
+
+    // check if sticky bit is set on the parent dir
+    FileStatus parStatus = fs.getFileStatus(parPath);
+    if (!parStatus.getPermission().getStickyBit()) {
+      // no sticky bit, so write permission on parent dir is sufficient
+      // no further checks needed
+      return;
+    }
+
+    // check if user is owner of parent dir
+    if (parStatus.getOwner().equals(user)) {
+      return;
+    }
+
+    // check if user is owner of current dir/file
+    FileStatus childStatus = fs.getFileStatus(path);
+    if (childStatus.getOwner().equals(user)) {
+      return;
+    }
+    String msg = String.format("Permission Denied: User %s can't delete %s because sticky bit is"
+        + " set on the parent dir and user does not own this file or its parent", user, path);
+    throw new IOException(msg);
+
+  }
+
 }

Added: hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/TestStorageBasedMetastoreAuthorizationDrops.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/TestStorageBasedMetastoreAuthorizationDrops.java?rev=1621400&view=auto
==============================================================================
--- hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/TestStorageBasedMetastoreAuthorizationDrops.java (added)
+++ hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/TestStorageBasedMetastoreAuthorizationDrops.java Sat Aug 30 00:24:22 2014
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.security;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.security.authorization.AuthorizationPreEventListener;
+import org.apache.hadoop.hive.ql.security.authorization.StorageBasedAuthorizationProvider;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Test cases focusing on drop table permission checks
+ */
+public class TestStorageBasedMetastoreAuthorizationDrops extends TestCase{
+  protected HiveConf clientHiveConf;
+  protected HiveMetaStoreClient msc;
+  protected Driver driver;
+  protected UserGroupInformation ugi;
+  private static int objNum = 0;
+
+  protected String getAuthorizationProvider(){
+    return StorageBasedAuthorizationProvider.class.getName();
+  }
+
+  protected HiveConf createHiveConf() throws Exception {
+    return new HiveConf(this.getClass());
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+
+    super.setUp();
+
+    int port = MetaStoreUtils.findFreePort();
+
+    // Turn on metastore-side authorization
+    System.setProperty(HiveConf.ConfVars.METASTORE_PRE_EVENT_LISTENERS.varname,
+        AuthorizationPreEventListener.class.getName());
+    System.setProperty(HiveConf.ConfVars.HIVE_METASTORE_AUTHORIZATION_MANAGER.varname,
+        getAuthorizationProvider());
+    System.setProperty(HiveConf.ConfVars.HIVE_METASTORE_AUTHENTICATOR_MANAGER.varname,
+        InjectableDummyAuthenticator.class.getName());
+
+    MetaStoreUtils.startMetaStore(port, ShimLoader.getHadoopThriftAuthBridge());
+
+    clientHiveConf = createHiveConf();
+
+    // Turn off client-side authorization
+    clientHiveConf.setBoolVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED,false);
+
+    clientHiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + port);
+    clientHiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+    clientHiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+
+    clientHiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+    clientHiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+
+    ugi = ShimLoader.getHadoopShims().getUGIForConf(clientHiveConf);
+
+    SessionState.start(new CliSessionState(clientHiveConf));
+    msc = new HiveMetaStoreClient(clientHiveConf, null);
+    driver = new Driver(clientHiveConf);
+
+    setupFakeUser();
+  }
+
+
+  public void testDropDatabase() throws Exception {
+    dropDatabaseByOtherUser("-rwxrwxrwx", 0);
+    dropDatabaseByOtherUser("-rwxrwxrwt", 1);
+  }
+
+  /**
+   * Creates db and tries to drop as 'other' user
+   * @param perm - permission for warehouse dir
+   * @param expectedRet - expected return code for drop by other user
+   * @throws Exception
+   */
+  private void dropDatabaseByOtherUser(String perm, int expectedRet) throws Exception {
+    String dbName = getTestDbName();
+    setPermissions(clientHiveConf.getVar(ConfVars.METASTOREWAREHOUSE), perm);
+
+    CommandProcessorResponse resp = driver.run("create database " + dbName);
+    assertEquals(0, resp.getResponseCode());
+    Database db = msc.getDatabase(dbName);
+    validateCreateDb(db, dbName);
+
+    InjectableDummyAuthenticator.injectMode(true);
+
+
+    resp = driver.run("drop database " + dbName);
+    assertEquals(expectedRet, resp.getResponseCode());
+
+  }
+
+  public void testDropTable() throws Exception {
+    dropTableByOtherUser("-rwxrwxrwx", 0);
+    dropTableByOtherUser("-rwxrwxrwt", 1);
+  }
+
+  /**
+   * @param perm dir permission for database dir
+   * @param expectedRet expected return code on drop table
+   * @throws Exception
+   */
+  private void dropTableByOtherUser(String perm, int expectedRet) throws Exception {
+    String dbName = getTestDbName();
+    String tblName = getTestTableName();
+    setPermissions(clientHiveConf.getVar(ConfVars.METASTOREWAREHOUSE), "-rwxrwxrwx");
+
+    CommandProcessorResponse resp = driver.run("create database " + dbName);
+    assertEquals(0, resp.getResponseCode());
+    Database db = msc.getDatabase(dbName);
+    validateCreateDb(db, dbName);
+
+    setPermissions(db.getLocationUri(), perm);
+
+    String dbDotTable = dbName + "." + tblName;
+    resp = driver.run("create table " + dbDotTable + "(i int)");
+    assertEquals(0, resp.getResponseCode());
+
+
+    InjectableDummyAuthenticator.injectMode(true);
+    resp = driver.run("drop table " + dbDotTable);
+    assertEquals(expectedRet, resp.getResponseCode());
+  }
+
+
+  public void testDropPartition() throws Exception {
+    dropPartitionByOtherUser("-rwxrwxrwx", 0);
+    dropPartitionByOtherUser("-rwxrwxrwt", 1);
+  }
+
+  /**
+   * @param perm permissions for table dir
+   * @param expectedRet expected return code
+   * @throws Exception
+   */
+  private void dropPartitionByOtherUser(String perm, int expectedRet) throws Exception {
+    String dbName = getTestDbName();
+    String tblName = getTestTableName();
+    setPermissions(clientHiveConf.getVar(ConfVars.METASTOREWAREHOUSE), "-rwxrwxrwx");
+
+    CommandProcessorResponse resp = driver.run("create database " + dbName);
+    assertEquals(0, resp.getResponseCode());
+    Database db = msc.getDatabase(dbName);
+    validateCreateDb(db, dbName);
+    setPermissions(db.getLocationUri(), "-rwxrwxrwx");
+
+    String dbDotTable = dbName + "." + tblName;
+    resp = driver.run("create table " + dbDotTable + "(i int) partitioned by (b string)");
+    assertEquals(0, resp.getResponseCode());
+    Table tab = msc.getTable(dbName, tblName);
+    setPermissions(tab.getSd().getLocation(), perm);
+
+    resp = driver.run("alter table " + dbDotTable + " add partition (b='2011')");
+    assertEquals(0, resp.getResponseCode());
+
+    InjectableDummyAuthenticator.injectMode(true);
+    resp = driver.run("alter table " + dbDotTable + " drop partition (b='2011')");
+    assertEquals(expectedRet, resp.getResponseCode());
+  }
+
+  private void setupFakeUser() {
+    String fakeUser = "mal";
+    List<String> fakeGroupNames = new ArrayList<String>();
+    fakeGroupNames.add("groupygroup");
+
+    InjectableDummyAuthenticator.injectUserName(fakeUser);
+    InjectableDummyAuthenticator.injectGroupNames(fakeGroupNames);
+    InjectableDummyAuthenticator.injectMode(true);
+  }
+
+  private String setupUser() {
+    return ugi.getUserName();
+  }
+
+  private String getTestTableName() {
+    return this.getClass().getSimpleName() + "tab" + ++objNum;
+  }
+
+  private String getTestDbName() {
+    return this.getClass().getSimpleName() + "db" + ++objNum;
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown();
+    InjectableDummyAuthenticator.injectMode(false);
+  }
+
+  protected void setPermissions(String locn, String permissions) throws Exception {
+    FileSystem fs = FileSystem.get(new URI(locn), clientHiveConf);
+    fs.setPermission(new Path(locn), FsPermission.valueOf(permissions));
+  }
+
+  private void validateCreateDb(Database expectedDb, String dbName) {
+    assertEquals(expectedDb.getName().toLowerCase(), dbName.toLowerCase());
+  }
+
+  private void validateCreateTable(Table expectedTable, String tblName, String dbName) {
+    assertNotNull(expectedTable);
+    assertEquals(expectedTable.getTableName().toLowerCase(),tblName.toLowerCase());
+    assertEquals(expectedTable.getDbName().toLowerCase(),dbName.toLowerCase());
+  }
+}

Modified: hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/TestStorageBasedMetastoreAuthorizationProvider.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/TestStorageBasedMetastoreAuthorizationProvider.java?rev=1621400&r1=1621399&r2=1621400&view=diff
==============================================================================
--- hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/TestStorageBasedMetastoreAuthorizationProvider.java (original)
+++ hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/TestStorageBasedMetastoreAuthorizationProvider.java Sat Aug 30 00:24:22 2014
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.security;
 
 import java.net.URI;
-import java.security.AccessControlException;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -48,7 +47,7 @@ public class TestStorageBasedMetastoreAu
   @Override
   protected void allowCreateInDb(String dbName, String userName, String location)
       throws Exception {
-    setPermissions(location,"-rwxr--r--");
+    setPermissions(location,"-rwxr--r-t");
   }
 
   @Override
@@ -79,7 +78,7 @@ public class TestStorageBasedMetastoreAu
   @Override
   protected void allowDropOnDb(String dbName, String userName, String location)
       throws Exception {
-    setPermissions(location,"-rwxr--r--");
+    setPermissions(location,"-rwxr--r-t");
   }
 
   protected void setPermissions(String locn, String permissions) throws Exception {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java?rev=1621400&r1=1621399&r2=1621400&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java Sat Aug 30 00:24:22 2014
@@ -21,7 +21,7 @@ package org.apache.hadoop.hive.ql.securi
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.security.AccessControlException;
-import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 
@@ -34,12 +34,9 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -48,7 +45,6 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.shims.ShimLoader;
 
 /**
  * StorageBasedAuthorizationProvider is an implementation of
@@ -141,28 +137,77 @@ public class StorageBasedAuthorizationPr
   public void authorize(Database db, Privilege[] readRequiredPriv, Privilege[] writeRequiredPriv)
       throws HiveException, AuthorizationException {
     Path path = getDbLocation(db);
+
+    // extract drop privileges
+    DropPrivilegeExtractor privExtractor = new DropPrivilegeExtractor(readRequiredPriv,
+        writeRequiredPriv);
+    readRequiredPriv = privExtractor.getReadReqPriv();
+    writeRequiredPriv = privExtractor.getWriteReqPriv();
+
+    // authorize drops if there was a drop privilege requirement
+    if(privExtractor.hasDropPrivilege()) {
+      checkDeletePermission(path, getConf(), authenticator.getUserName());
+    }
+
     authorize(path, readRequiredPriv, writeRequiredPriv);
   }
 
   @Override
   public void authorize(Table table, Privilege[] readRequiredPriv, Privilege[] writeRequiredPriv)
       throws HiveException, AuthorizationException {
-
-    // To create/drop/alter a table, the owner should have WRITE permission on the database directory
-    authorize(hive_db.getDatabase(table.getDbName()), readRequiredPriv, writeRequiredPriv);
-
-    // If the user has specified a location - external or not, check if the user has the
     try {
       initWh();
-      String location = table.getTTable().getSd().getLocation();
-      if (location != null && !location.isEmpty()) {
-        authorize(new Path(location), readRequiredPriv, writeRequiredPriv);
-      }
     } catch (MetaException ex) {
       throw hiveException(ex);
     }
+
+    // extract any drop privileges out of required privileges
+    DropPrivilegeExtractor privExtractor = new DropPrivilegeExtractor(readRequiredPriv,
+        writeRequiredPriv);
+    readRequiredPriv = privExtractor.getReadReqPriv();
+    writeRequiredPriv = privExtractor.getWriteReqPriv();
+
+    // if CREATE or DROP priv requirement is there, the owner should have WRITE permission on
+    // the database directory
+    if (privExtractor.hasDropPrivilege || requireCreatePrivilege(readRequiredPriv)
+        || requireCreatePrivilege(writeRequiredPriv)) {
+      authorize(hive_db.getDatabase(table.getDbName()), new Privilege[] {},
+          new Privilege[] { Privilege.ALTER_DATA });
+    }
+
+    Path path = table.getDataLocation();
+    // authorize drops if there was a drop privilege requirement, and
+    // table is not external (external table data is not dropped)
+    if (privExtractor.hasDropPrivilege() && table.getTableType() != TableType.EXTERNAL_TABLE) {
+      checkDeletePermission(path, getConf(), authenticator.getUserName());
+    }
+
+    // If the user has specified a location - external or not, check if the user
+    // has the permissions on the table dir
+    if (path != null) {
+      authorize(path, readRequiredPriv, writeRequiredPriv);
+    }
   }
 
+
+  /**
+   *
+   * @param privs
+   * @return true, if set of given privileges privs contain CREATE privilege
+   */
+  private boolean requireCreatePrivilege(Privilege[] privs) {
+    if(privs == null) {
+      return false;
+    }
+    for (Privilege priv : privs) {
+      if (priv.equals(Privilege.CREATE)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+
   @Override
   public void authorize(Partition part, Privilege[] readRequiredPriv, Privilege[] writeRequiredPriv)
       throws HiveException, AuthorizationException {
@@ -173,17 +218,39 @@ public class StorageBasedAuthorizationPr
       Privilege[] writeRequiredPriv)
       throws HiveException, AuthorizationException {
 
+    // extract drop privileges
+    DropPrivilegeExtractor privExtractor = new DropPrivilegeExtractor(readRequiredPriv,
+        writeRequiredPriv);
+    readRequiredPriv = privExtractor.getReadReqPriv();
+    writeRequiredPriv = privExtractor.getWriteReqPriv();
+
+    // authorize drops if there was a drop privilege requirement
+    if(privExtractor.hasDropPrivilege()) {
+      checkDeletePermission(part.getDataLocation(), getConf(), authenticator.getUserName());
+    }
+
     // Partition path can be null in the case of a new create partition - in this case,
     // we try to default to checking the permissions of the parent table.
     // Partition itself can also be null, in cases where this gets called as a generic
     // catch-all call in cases like those with CTAS onto an unpartitioned table (see HIVE-1887)
     if ((part == null) || (part.getLocation() == null)) {
-      authorize(table, readRequiredPriv, writeRequiredPriv);
+      // this should be the case only if this is a create partition.
+      // The privilege needed on the table should be ALTER_DATA, and not CREATE
+      authorize(table, new Privilege[]{}, new Privilege[]{Privilege.ALTER_DATA});
     } else {
       authorize(part.getDataLocation(), readRequiredPriv, writeRequiredPriv);
     }
   }
 
+  private void checkDeletePermission(Path dataLocation, Configuration conf, String userName)
+      throws HiveException {
+    try {
+      FileUtils.checkDeletePermission(dataLocation, conf, userName);
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
   @Override
   public void authorize(Table table, Partition part, List<String> columns,
       Privilege[] readRequiredPriv, Privilege[] writeRequiredPriv) throws HiveException,
@@ -191,11 +258,7 @@ public class StorageBasedAuthorizationPr
     // In a simple storage-based auth, we have no information about columns
     // living in different files, so we do simple partition-auth and ignore
     // the columns parameter.
-    if ((part != null) && (part.getTable() != null)) {
-      authorize(part.getTable(), part, readRequiredPriv, writeRequiredPriv);
-    } else {
-      authorize(table, part, readRequiredPriv, writeRequiredPriv);
-    }
+    authorize(table, part, readRequiredPriv, writeRequiredPriv);
   }
 
   @Override
@@ -373,4 +436,48 @@ public class StorageBasedAuthorizationPr
     // no-op - SBA does not attempt to authorize auth api call. Allow it
   }
 
+  public class DropPrivilegeExtractor {
+
+    private boolean hasDropPrivilege = false;
+    private final Privilege[] readReqPriv;
+    private final Privilege[] writeReqPriv;
+
+    public DropPrivilegeExtractor(Privilege[] readRequiredPriv, Privilege[] writeRequiredPriv) {
+      this.readReqPriv = extractDropPriv(readRequiredPriv);
+      this.writeReqPriv = extractDropPriv(writeRequiredPriv);
+    }
+
+    private Privilege[] extractDropPriv(Privilege[] requiredPrivs) {
+      if (requiredPrivs == null) {
+        return null;
+      }
+      List<Privilege> privList = new ArrayList<Privilege>();
+      for (Privilege priv : requiredPrivs) {
+        if (priv.equals(Privilege.DROP)) {
+          hasDropPrivilege = true;
+        } else {
+          privList.add(priv);
+        }
+      }
+      return privList.toArray(new Privilege[0]);
+    }
+
+    public boolean hasDropPrivilege() {
+      return hasDropPrivilege;
+    }
+
+    public void setHasDropPrivilege(boolean hasDropPrivilege) {
+      this.hasDropPrivilege = hasDropPrivilege;
+    }
+
+    public Privilege[] getReadReqPriv() {
+      return readReqPriv;
+    }
+
+    public Privilege[] getWriteReqPriv() {
+      return writeReqPriv;
+    }
+
+  }
+
 }