You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2015/05/07 03:21:23 UTC

[29/52] [abbrv] hive git commit: HIVE-9736 : StorageBasedAuthProvider should batch namenode-calls where possible (Mithun Radhakrishnan, reviewed by Chris Nauroth, Sushanth Sowmyan)

HIVE-9736 : StorageBasedAuthProvider should batch namenode-calls where possible (Mithun Radhakrishnan, reviewed by Chris Nauroth, Sushanth Sowmyan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/19886150
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/19886150
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/19886150

Branch: refs/heads/llap
Commit: 19886150121b6081127bf1e581b24d8dcc12f1df
Parents: 3f72f81
Author: Sushanth Sowmyan <kh...@gmail.com>
Authored: Tue May 5 08:56:27 2015 -0700
Committer: Sushanth Sowmyan <kh...@gmail.com>
Committed: Tue May 5 08:58:35 2015 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/common/FileUtils.java    | 155 +++++++++++++------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   7 +
 .../StorageBasedAuthorizationProvider.java      | 114 +++++++++++++-
 .../apache/hadoop/hive/shims/Hadoop23Shims.java |  29 +++-
 .../org/apache/hadoop/fs/DefaultFileAccess.java |  65 +++++---
 .../apache/hadoop/hive/shims/HadoopShims.java   |  24 ++-
 .../hadoop/hive/shims/HadoopShimsSecure.java    |   8 +
 7 files changed, 318 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/19886150/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index c2c54bc..536fe11 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -25,12 +25,16 @@ import java.net.URISyntaxException;
 import java.security.AccessControlException;
 import java.security.PrivilegedExceptionAction;
 import java.util.BitSet;
+import java.util.EnumSet;
+import java.util.Iterator;
 import java.util.List;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.DefaultFileAccess;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -369,26 +373,54 @@ public final class FileUtils {
   public static void checkFileAccessWithImpersonation(final FileSystem fs,
       final FileStatus stat, final FsAction action, final String user)
           throws IOException, AccessControlException, InterruptedException, Exception {
+    checkFileAccessWithImpersonation(fs,
+                                     Iterators.singletonIterator(stat),
+                                     EnumSet.of(action),
+                                     user);
+  }
+
+  /**
+   * Perform a check to determine if the user is able to access the file passed in.
+   * If the user name passed in is different from the current user, this method will
+   * attempt to impersonate the user to do the check; the current user should be
+   * able to create proxy users in this case.
+   * @param fs   FileSystem of the path to check
+   * @param statuses FileStatus instances representing the file
+   * @param actions The FsActions that will be checked
+   * @param user User name of the user that will be checked for access.  If the user name
+   *             is null or the same as the current user, no user impersonation will be done
+   *             and the check will be done as the current user. Otherwise the file access
+   *             check will be performed within a doAs() block to use the access privileges
+   *             of this user. In this case the user must be configured to impersonate other
+   *             users, otherwise this check will fail with error.
+   * @throws IOException
+   * @throws AccessControlException
+   * @throws InterruptedException
+   * @throws Exception
+   */
+  public static void checkFileAccessWithImpersonation(final FileSystem fs,
+      final Iterator<FileStatus> statuses, final EnumSet<FsAction> actions, final String user)
+          throws IOException, AccessControlException, InterruptedException, Exception {
     UserGroupInformation ugi = Utils.getUGI();
     String currentUser = ugi.getShortUserName();
 
     if (user == null || currentUser.equals(user)) {
       // No need to impersonate user, do the checks as the currently configured user.
-      ShimLoader.getHadoopShims().checkFileAccess(fs, stat, action);
-      return;
+      ShimLoader.getHadoopShims().checkFileAccess(fs, statuses, actions);
+    }
+    else {
+      // Otherwise, try user impersonation. Current user must be configured to do user impersonation.
+      UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
+          user, UserGroupInformation.getLoginUser());
+      proxyUser.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          FileSystem fsAsUser = FileSystem.get(fs.getUri(), fs.getConf());
+          ShimLoader.getHadoopShims().checkFileAccess(fsAsUser, statuses, actions);
+          return null;
+        }
+      });
     }
-
-    // Otherwise, try user impersonation. Current user must be configured to do user impersonation.
-    UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
-        user, UserGroupInformation.getLoginUser());
-    proxyUser.doAs(new PrivilegedExceptionAction<Object>() {
-      @Override
-      public Object run() throws Exception {
-        FileSystem fsAsUser = FileSystem.get(fs.getUri(), fs.getConf());
-        ShimLoader.getHadoopShims().checkFileAccess(fsAsUser, stat, action);
-        return null;
-      }
-    });
   }
 
   /**
@@ -677,70 +709,91 @@ public final class FileUtils {
    * @param path
    * @param conf
    * @param user
-   * @throws AccessControlException
-   * @throws InterruptedException
    * @throws Exception
    */
-  public static void checkDeletePermission(Path path, Configuration conf, String user)
-      throws AccessControlException, InterruptedException, Exception {
-   // This requires ability to delete the given path.
-    // The following 2 conditions should be satisfied for this-
-    // 1. Write permissions on parent dir
-    // 2. If sticky bit is set on parent dir then one of following should be
-    // true
-    //   a. User is owner of the current dir/file
-    //   b. User is owner of the parent dir
-    //   Super users are also allowed to drop the file, but there is no good way of checking
-    //   if a user is a super user. Also super users running hive queries is not a common
-    //   use case. super users can also do a chown to be able to drop the file
+  public static void checkDeletePermission(Path path, Configuration conf, String user) throws  Exception {
 
     if(path == null) {
       // no file/dir to be deleted
       return;
     }
 
-    final FileSystem fs = path.getFileSystem(conf);
     // check user has write permissions on the parent dir
+    final FileSystem fs = path.getFileSystem(conf);
     FileStatus stat = null;
     try {
       stat = fs.getFileStatus(path);
     } catch (FileNotFoundException e) {
       // ignore
     }
+
     if (stat == null) {
       // no file/dir to be deleted
       return;
     }
-    FileUtils.checkFileAccessWithImpersonation(fs, stat, FsAction.WRITE, user);
+
+    checkDeletePermission(fs, Lists.newArrayList(stat), conf, user);
+  }
+
+  /**
+   * Checks if delete can be performed on given path by given user.
+   * If file does not exist it just returns without throwing an Exception
+   * @param fs The FileSystem instance
+   * @param fileStatuses The FileStatus instances for the paths being checked.
+   * @param conf Configuration, corresponding to the FileSystem.
+   * @param user The user, whose permission is to be checked.
+   * @throws Exception
+   */
+  public static void checkDeletePermission(FileSystem fs, Iterable<FileStatus> fileStatuses,
+                                           Configuration conf, String user) throws Exception {
+
+    // This requires ability to delete the given path.
+    // The following 2 conditions should be satisfied for this-
+    // 1. Write permissions on parent dir
+    // 2. If sticky bit is set on parent dir then one of following should be
+    // true
+    //   a. User is owner of the current dir/file
+    //   b. User is owner of the parent dir
+    FileUtils.checkFileAccessWithImpersonation(fs, fileStatuses.iterator(), EnumSet.of(FsAction.WRITE), user);
 
     HadoopShims shims = ShimLoader.getHadoopShims();
     if (!shims.supportStickyBit()) {
-      // not supports sticky bit
+      // No support for sticky-bit.
       return;
     }
 
-    // check if sticky bit is set on the parent dir
-    FileStatus parStatus = fs.getFileStatus(path.getParent());
-    if (!shims.hasStickyBit(parStatus.getPermission())) {
-      // no sticky bit, so write permission on parent dir is sufficient
-      // no further checks needed
-      return;
-    }
+    List<Path> allParentPaths =
+        Lists.newArrayList(
+            Iterators.transform(fileStatuses.iterator(), new Function<FileStatus, Path>() {
+              @Override
+              public Path apply(FileStatus input) {
+                return input.getPath().getParent();
+              }
+            })
+        );
+
+    Iterator<FileStatus> childStatusIterator = fileStatuses.iterator();
+    for (List<Path> parentPaths : Lists.partition(allParentPaths, getListStatusBatchSize(conf))) {
+      for (FileStatus parentFileStatus : fs.listStatus(parentPaths.toArray(new Path[parentPaths.size()]))) {
+        assert childStatusIterator.hasNext() : "Number of parent-file-statuses doesn't match children.";
+        FileStatus childFileStatus = childStatusIterator.next();
+        // Check sticky-bits on parent-dirs.
+        if (shims.hasStickyBit(parentFileStatus.getPermission())
+            && !parentFileStatus.getOwner().equals(user)
+            && !childFileStatus.getOwner().equals(user)) {
+          throw new IOException(String.format("Permission Denied: User %s can't delete %s because sticky bit is\""
+              + " set on the parent dir and user does not own this file or its parent\"", user, childFileStatus.getPath()));
+        }
+      } // for_each( parent_path );
+    } // for_each( batch_of_parentPaths );
 
-    // check if user is owner of parent dir
-    if (parStatus.getOwner().equals(user)) {
-      return;
-    }
+    assert !childStatusIterator.hasNext() : "Did not process all file-statuses.";
 
-    // check if user is owner of current dir/file
-    FileStatus childStatus = fs.getFileStatus(path);
-    if (childStatus.getOwner().equals(user)) {
-      return;
-    }
-    String msg = String.format("Permission Denied: User %s can't delete %s because sticky bit is"
-        + " set on the parent dir and user does not own this file or its parent", user, path);
-    throw new IOException(msg);
+  } // static void checkDeletePermission();
 
+  private static int getListStatusBatchSize(Configuration configuration) {
+    return HiveConf.getIntVar(configuration,
+        HiveConf.ConfVars.HIVE_AUTHORIZATION_HDFS_LIST_STATUS_BATCH_SIZE);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/19886150/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index d208b88..f04ce82 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1623,6 +1623,13 @@ public class HiveConf extends Configuration {
         "of updating the original list means that you can append to the defaults\n" +
         "set by SQL standard authorization instead of replacing it entirely."),
 
+    HIVE_AUTHORIZATION_HDFS_LIST_STATUS_BATCH_SIZE(
+      "hive.authprovider.hdfs.liststatus.batch.size", 1000,
+      "Number of FileStatus objects to be queried for when listing files, for HDFS-based authorization.\n" +
+          "Note: If this exceeds dfs.ls.limit (as set in hdfs-site.xml), DFSClient might use the smaller value as \n" +
+          "the batch-size, internally."
+    ),
+
     HIVE_CLI_PRINT_HEADER("hive.cli.print.header", false, "Whether to print the names of the columns in query output."),
 
     HIVE_ERROR_ON_EMPTY_PARTITION("hive.error.on.empty.partition", false,

http://git-wip-us.apache.org/repos/asf/hive/blob/19886150/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java
index 8f81ef9..6a5c510 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java
@@ -18,15 +18,20 @@
 
 package org.apache.hadoop.hive.ql.security.authorization;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.security.AccessControlException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.EnumSet;
+import java.util.Iterator;
 import java.util.List;
 
 import javax.security.auth.login.LoginException;
 
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -35,6 +40,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
@@ -63,7 +69,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
  * out to the parent directory recursively to determine its permissions till
  * it finds a parent that does exist.
  */
-public class StorageBasedAuthorizationProvider extends HiveAuthorizationProviderBase
+public class StorageBasedAuthorizationProvider extends HiveMultiPartitionAuthorizationProviderBase
     implements HiveMetastoreAuthorizationProvider {
 
   private Warehouse wh;
@@ -242,6 +248,89 @@ public class StorageBasedAuthorizationProvider extends HiveAuthorizationProvider
     }
   }
 
+  @Override
+  public void authorize(Table table, Iterable<Partition> partitions,
+                        Privilege[] requiredReadPrivileges, Privilege[] requiredWritePrivileges)
+       throws HiveException, AuthorizationException {
+
+    try {
+      class MustCheckTablePermissions { // For closure.
+        public boolean value = false;
+      }
+
+      final MustCheckTablePermissions mustCheckTablePermissions = new MustCheckTablePermissions();
+      final FileSystem fs = table.getDataLocation().getFileSystem(getConf());
+
+      // Get partition paths. Filter out null-partitions, and partitions without data-locations.
+      Iterator<Partition> nonNullPartitions
+               = Iterators.filter(partitions.iterator(), new Predicate<Partition>() {
+        @Override
+        public boolean apply(Partition partition) {
+          try {
+            boolean isValidPartitionPath = partition != null
+                                           && partition.getDataLocation() != null
+                                           && fs.exists(partition.getDataLocation());
+            mustCheckTablePermissions.value |= isValidPartitionPath;
+            return isValidPartitionPath;
+          }
+          catch (IOException exception){
+            throw new RuntimeException("Could not find location for partition: " + partition, exception);
+          }
+        }
+      });
+
+      if (mustCheckTablePermissions.value) {
+        // At least one partition was null, or had a non-existent path. So check table-permissions, once.
+        // Partition path can be null in the case of a new create partition - in this case,
+        // we try to default to checking the permissions of the parent table.
+        // Partition itself can also be null, in cases where this gets called as a generic
+        // catch-all call in cases like those with CTAS onto an unpartitioned table (see HIVE-1887)
+
+        // this should be the case only if this is a create partition.
+        // The privilege needed on the table should be ALTER_DATA, and not CREATE
+        authorize(table, new Privilege[]{}, new Privilege[]{Privilege.ALTER_DATA});
+      }
+
+
+      // authorize drops if there was a drop privilege requirement
+      // extract drop privileges
+      DropPrivilegeExtractor privExtractor = new DropPrivilegeExtractor(requiredReadPrivileges, requiredWritePrivileges);
+      requiredReadPrivileges = privExtractor.getReadReqPriv();
+      requiredWritePrivileges = privExtractor.getWriteReqPriv();
+      EnumSet<FsAction> actions = getFsActions(requiredReadPrivileges);
+      actions.addAll(getFsActions(requiredWritePrivileges));
+
+      ArrayList<Path> allPartitionPaths
+                = Lists.newArrayList(Iterators.transform(nonNullPartitions, new Function<Partition, Path>() {
+        @Override
+        public Path apply(Partition input) {
+          return input.getDataLocation();
+        }
+      }));
+
+      for (List<Path> partitionPaths : Lists.partition(allPartitionPaths, getListStatusBatchSize(getConf()))) {
+
+        List<FileStatus> fileStatuses = Arrays.asList(
+            fs.listStatus(partitionPaths.toArray(new Path[partitionPaths.size()])));
+
+        if (privExtractor.hasDropPrivilege) {
+          FileUtils.checkDeletePermission(fs, fileStatuses, getConf(), authenticator.getUserName());
+        }
+
+        checkPermissions(fs, fileStatuses.iterator(), actions, authenticator.getUserName());
+      }
+
+    }
+    catch (Exception exception) {
+      throw hiveException(exception);
+    }
+  }
+
+  private static int getListStatusBatchSize(Configuration configuration) {
+    return HiveConf.getIntVar(configuration,
+                              HiveConf.ConfVars.HIVE_AUTHORIZATION_HDFS_LIST_STATUS_BATCH_SIZE);
+  }
+
   private void checkDeletePermission(Path dataLocation, Configuration conf, String userName)
       throws HiveException {
     try {
@@ -388,17 +477,28 @@ public class StorageBasedAuthorizationProvider extends HiveAuthorizationProvider
   protected static void checkPermissions(final FileSystem fs, final FileStatus stat,
       final EnumSet<FsAction> actions, String user) throws IOException,
       AccessControlException, HiveException {
+    checkPermissions(fs, Iterators.singletonIterator(stat), actions, user);
+  }
+
+  @SuppressWarnings("deprecation")
+  protected static void checkPermissions(final FileSystem fs, Iterator<FileStatus> fileStatuses,
+                                         final EnumSet<FsAction> actions, String user)
+      throws IOException, AccessControlException, HiveException {
 
-    if (stat == null) {
-      // File named by path doesn't exist; nothing to validate.
-      return;
-    }
     FsAction checkActions = FsAction.NONE;
     for (FsAction action : actions) {
       checkActions = checkActions.or(action);
     }
+
+    Iterator<FileStatus> nonNullFileStatuses = Iterators.filter(fileStatuses, new Predicate<FileStatus>() {
+      @Override
+      public boolean apply(FileStatus fileStatus) {
+        return fileStatus != null;
+      }
+    });
+
     try {
-      FileUtils.checkFileAccessWithImpersonation(fs, stat, checkActions, user);
+      FileUtils.checkFileAccessWithImpersonation(fs, nonNullFileStatuses, EnumSet.of(checkActions), user);
     } catch (Exception err) {
       // fs.permission.AccessControlException removed by HADOOP-11356, but Hive users on older
       // Hadoop versions may still see this exception .. have to reference by name.

http://git-wip-us.apache.org/repos/asf/hive/blob/19886150/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
----------------------------------------------------------------------
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index d349068..4547baa 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -29,11 +29,11 @@ import java.security.AccessControlException;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.commons.lang.StringUtils;
@@ -986,6 +986,33 @@ public class Hadoop23Shims extends HadoopShimsSecure {
     }
   }
 
+  @Override
+  public void checkFileAccess(FileSystem fs, Iterator<FileStatus> statuses, EnumSet<FsAction> actions)
+      throws IOException, AccessControlException, Exception {
+    try {
+      if (accessMethod == null) {
+        // Have to rely on Hive implementation of filesystem permission checks.
+        DefaultFileAccess.checkFileAccess(fs, statuses, actions);
+      }
+      else {
+        while (statuses.hasNext()) {
+          accessMethod.invoke(fs, statuses.next(), combine(actions));
+        }
+      }
+
+    } catch (Exception err) {
+      throw wrapAccessException(err);
+    }
+  }
+
+  private static FsAction combine(EnumSet<FsAction> actions) {
+    FsAction resultantAction = FsAction.NONE;
+    for (FsAction action : actions) {
+      resultantAction = resultantAction.or(action);
+    }
+    return resultantAction;
+  }
+
   /**
    * If there is an AccessException buried somewhere in the chain of failures, wrap the original
    * exception in an AccessException. Othewise just return the original exception.

http://git-wip-us.apache.org/repos/asf/hive/blob/19886150/shims/common/src/main/java/org/apache/hadoop/fs/DefaultFileAccess.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/fs/DefaultFileAccess.java b/shims/common/src/main/java/org/apache/hadoop/fs/DefaultFileAccess.java
index 45ca210..c4261cb 100644
--- a/shims/common/src/main/java/org/apache/hadoop/fs/DefaultFileAccess.java
+++ b/shims/common/src/main/java/org/apache/hadoop/fs/DefaultFileAccess.java
@@ -18,23 +18,22 @@
 
 package org.apache.hadoop.fs;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.security.AccessControlException;
-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.EnumSet;
+import java.util.Iterator;
 import java.util.List;
 
 import javax.security.auth.login.LoginException;
 
+import com.google.common.collect.Iterators;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -47,7 +46,7 @@ public class DefaultFileAccess {
 
   private static Log LOG = LogFactory.getLog(DefaultFileAccess.class);
 
-  private static List<String> emptyGroups = new ArrayList<String>(0);
+  private static List<String> emptyGroups = Collections.emptyList();
 
   public static void checkFileAccess(FileSystem fs, FileStatus stat, FsAction action)
       throws IOException, AccessControlException, LoginException {
@@ -60,34 +59,62 @@ public class DefaultFileAccess {
 
   public static void checkFileAccess(FileSystem fs, FileStatus stat, FsAction action,
       String user, List<String> groups) throws IOException, AccessControlException {
+    checkFileAccess(fs, Iterators.singletonIterator(stat), EnumSet.of(action), user, groups);
+  }
+
+  public static void checkFileAccess(FileSystem fs, Iterator<FileStatus> statuses, EnumSet<FsAction> actions,
+                                     String user, List<String> groups)
+    throws IOException, AccessControlException {
 
     if (groups == null) {
       groups = emptyGroups;
     }
 
+    // Short-circuit for super-users.
     String superGroupName = getSuperGroupName(fs.getConf());
     if (userBelongsToSuperGroup(superGroupName, groups)) {
       LOG.info("User \"" + user + "\" belongs to super-group \"" + superGroupName + "\". " +
-          "Permission granted for action: " + action + ".");
+          "Permission granted for actions: " + actions + ".");
       return;
     }
 
-    final FsPermission dirPerms = stat.getPermission();
-    final String grp = stat.getGroup();
+    while (statuses.hasNext()) {
 
-    if (user.equals(stat.getOwner())) {
-      if (dirPerms.getUserAction().implies(action)) {
-        return;
-      }
-    } else if (groups.contains(grp)) {
-      if (dirPerms.getGroupAction().implies(action)) {
-        return;
+      FileStatus stat = statuses.next();
+      final FsPermission dirPerms = stat.getPermission();
+      final String grp = stat.getGroup();
+
+      FsAction combinedAction = combine(actions);
+      if (user.equals(stat.getOwner())) {
+        if (dirPerms.getUserAction().implies(combinedAction)) {
+          continue;
+        }
+      } else if (groups.contains(grp)) {
+        if (dirPerms.getGroupAction().implies(combinedAction)) {
+          continue;
+        }
+      } else if (dirPerms.getOtherAction().implies(combinedAction)) {
+        continue;
       }
-    } else if (dirPerms.getOtherAction().implies(action)) {
-      return;
+
+      throw new AccessControlException("action " + combinedAction + " not permitted on path "
+          + stat.getPath() + " for user " + user);
+
+    } // for_each(fileStatus);
+  }
+
+  private static FsAction combine(EnumSet<FsAction> actions) {
+    FsAction resultantAction = FsAction.NONE;
+    for (FsAction action : actions) {
+      resultantAction = resultantAction.or(action);
     }
-    throw new AccessControlException("action " + action + " not permitted on path "
-        + stat.getPath() + " for user " + user);
+    return resultantAction;
+  }
+
+  public static void checkFileAccess(FileSystem fs, Iterator<FileStatus> statuses, EnumSet<FsAction> actions)
+    throws IOException, AccessControlException, LoginException {
+    UserGroupInformation ugi = Utils.getUGI();
+    checkFileAccess(fs, statuses, actions, ugi.getShortUserName(), Arrays.asList(ugi.getGroupNames()));
   }
 
   private static String getSuperGroupName(Configuration configuration) {

http://git-wip-us.apache.org/repos/asf/hive/blob/19886150/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 5a6bc44..4b79d95 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -24,19 +24,17 @@ import java.net.URI;
 import java.nio.ByteBuffer;
 import java.security.AccessControlException;
 import java.security.NoSuchAlgorithmException;
-import java.security.PrivilegedExceptionAction;
 import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
 
-import javax.security.auth.login.LoginException;
 
 import com.google.common.annotations.VisibleForTesting;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -47,7 +45,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobConf;
@@ -530,6 +527,21 @@ public interface HadoopShims {
       throws IOException, AccessControlException, Exception;
 
   /**
+   * Check if the configured UGI has access to the path for the given file system action.
+   * Method will return successfully if action is permitted. AccessControlException will
+   * be thrown if user does not have access to perform the action. Other exceptions may
+   * be thrown for non-access related errors.
+   * @param fs The FileSystem instance
+   * @param statuses The FileStatuses for the paths being checked
+   * @param actions The FsActions being checked
+   * @throws IOException
+   * @throws AccessControlException
+   * @throws Exception
+   */
+  public void checkFileAccess(FileSystem fs, Iterator<FileStatus> statuses, EnumSet<FsAction> actions)
+      throws Exception;
+
+  /**
    * Use password API (if available) to fetch credentials/password
    * @param conf
    * @param name

http://git-wip-us.apache.org/repos/asf/hive/blob/19886150/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java
index 89d7798..8e51c02 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java
@@ -25,7 +25,9 @@ import java.net.URI;
 import java.security.AccessControlException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Set;
 
 import org.apache.commons.lang.ArrayUtils;
@@ -391,5 +393,11 @@ public abstract class HadoopShimsSecure implements HadoopShims {
   }
 
   @Override
+  public void checkFileAccess(FileSystem fs, Iterator<FileStatus> statuses, EnumSet<FsAction> action)
+      throws IOException, AccessControlException, Exception {
+    DefaultFileAccess.checkFileAccess(fs, statuses, action);
+  }
+
+  @Override
   abstract public void addDelegationTokens(FileSystem fs, Credentials cred, String uname) throws IOException;
 }