Posted to commits@hive.apache.org by ma...@apache.org on 2019/03/11 11:45:12 UTC

[hive] branch master updated: HIVE-21325 : Hive external table replication failed with Permission denied issue. (Mahesh Kumar Behera, reviewed by Sankar Hariappan)

This is an automated email from the ASF dual-hosted git repository.

mahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 4274f48  HIVE-21325 : Hive external table replication failed with Permission denied issue. (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
4274f48 is described below

commit 4274f48a250db9624a48421147fa5700dc255e86
Author: Mahesh Kumar Behera <mb...@hortonworks.com>
AuthorDate: Mon Mar 11 17:02:28 2019 +0530

    HIVE-21325 : Hive external table replication failed with Permission denied issue. (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
---
 ...BasedMetastoreAuthorizationProviderWithACL.java |  11 ++-
 .../TestMetastoreAuthorizationProvider.java        |  18 ++++
 .../ql/exec/repl/ExternalTableCopyTaskBuilder.java | 107 +++++++++++++++++++--
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |  49 ++++------
 .../StorageBasedAuthorizationProvider.java         |  22 +++++
 .../hadoop/hive/metastore/HiveMetaStore.java       |   2 +-
 6 files changed, 165 insertions(+), 44 deletions(-)
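
For readers skimming the patch: the core of the fix is in DirCopyTask below, which now pre-creates the replica's external-table directory as the configured doAs user (HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER) and stamps it with the source directory's owner, group and permissions, so the later DDL task never has to create it as the hive user. A minimal, self-contained sketch of that pattern using only public Hadoop APIs (the class name and error handling are illustrative, not part of the patch):

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.UserGroupInformation;

    public class ProxyDirSetupSketch {
      // Create the target directory as the privileged doAs user and copy the
      // owner, group and permissions of the source directory onto it.
      public static void createTargetAsDoAsUser(Configuration conf, Path source, Path target,
          String doAsUser) throws IOException, InterruptedException {
        UserGroupInformation proxyUser =
            UserGroupInformation.createProxyUser(doAsUser, UserGroupInformation.getLoginUser());
        proxyUser.doAs((PrivilegedExceptionAction<Void>) () -> {
          FileSystem targetFs = target.getFileSystem(conf);
          if (!targetFs.exists(target) && !targetFs.mkdirs(target)) {
            throw new IOException("Unable to create directory " + target);
          }
          FileStatus srcStatus = source.getFileSystem(conf).getFileStatus(source);
          targetFs.setOwner(target, srcStatus.getOwner(), srcStatus.getGroup());
          targetFs.setPermission(target, srcStatus.getPermission());
          return null;
        });
      }
    }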

diff --git a/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/ql/security/TestStorageBasedMetastoreAuthorizationProviderWithACL.java b/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/ql/security/TestStorageBasedMetastoreAuthorizationProviderWithACL.java
index 62c109c..c9253c1 100644
--- a/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/ql/security/TestStorageBasedMetastoreAuthorizationProviderWithACL.java
+++ b/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/ql/security/TestStorageBasedMetastoreAuthorizationProviderWithACL.java
@@ -49,7 +49,12 @@ public class TestStorageBasedMetastoreAuthorizationProviderWithACL
   protected static Path warehouseDir = null;
   protected UserGroupInformation userUgi = null;
   protected String testUserName = "test_user";
+  protected String proxyUserName = null;
 
+  @Override
+  protected String getProxyUserName() {
+    return proxyUserName;
+  }
 
   @Override
   protected boolean isTestEnabled() {
@@ -74,10 +79,10 @@ public class TestStorageBasedMetastoreAuthorizationProviderWithACL
 
     // Hadoop FS ACLs do not work with LocalFileSystem, so set up MiniDFS.
     HiveConf conf = super.createHiveConf();
-    String currentUserName = Utils.getUGI().getShortUserName();
+    proxyUserName = Utils.getUGI().getShortUserName();
     conf.set("dfs.namenode.acls.enabled", "true");
-    conf.set("hadoop.proxyuser." + currentUserName + ".groups", "*");
-    conf.set("hadoop.proxyuser." + currentUserName + ".hosts", "*");
+    conf.set("hadoop.proxyuser." + proxyUserName + ".groups", "*");
+    conf.set("hadoop.proxyuser." + proxyUserName + ".hosts", "*");
     dfs = ShimLoader.getHadoopShims().getMiniDfs(conf, 4, true, null);
     FileSystem fs = dfs.getFileSystem();
 
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/TestMetastoreAuthorizationProvider.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/TestMetastoreAuthorizationProvider.java
index 0e08e81..0fc677b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/TestMetastoreAuthorizationProvider.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/TestMetastoreAuthorizationProvider.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Collections;
 
 import junit.framework.TestCase;
 
@@ -85,6 +86,10 @@ public class TestMetastoreAuthorizationProvider extends TestCase {
     return new HiveConf(this.getClass());
   }
 
+  protected String getProxyUserName() {
+    return null;
+  }
+
   @Override
   protected void setUp() throws Exception {
 
@@ -304,6 +309,19 @@ public class TestMetastoreAuthorizationProvider extends TestCase {
     ret = driver.run("alter table "+tblName+" add partition (b='2011')");
     assertEquals(0,ret.getResponseCode());
 
+    String proxyUserName = getProxyUserName();
+    if (proxyUserName != null) {
+      // For storage-based authorization, a user with proxy privilege should be allowed to perform the
+      // operation even without the required file permissions.
+      InjectableDummyAuthenticator.injectUserName(proxyUserName);
+      InjectableDummyAuthenticator.injectGroupNames(Collections.singletonList(proxyUserName));
+      InjectableDummyAuthenticator.injectMode(true);
+      disallowCreateInTbl(tbl.getTableName(), proxyUserName, tbl.getSd().getLocation());
+      ret = driver.run("alter table "+tblName+" add partition (b='2012')");
+      assertEquals(0, ret.getResponseCode());
+      InjectableDummyAuthenticator.injectMode(false);
+    }
+
     allowDropOnTable(tblName, userName, tbl.getSd().getLocation());
     allowDropOnDb(dbName,userName,db.getLocationUri());
     ret = driver.run("drop database if exists "+getTestDbName()+" cascade");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
index efecdb8..d7eed2c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
@@ -33,8 +33,13 @@ import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import java.security.PrivilegedExceptionAction;
 
 import java.io.Serializable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -68,6 +73,83 @@ public class ExternalTableCopyTaskBuilder {
     private static final Logger LOG = LoggerFactory.getLogger(DirCopyTask.class);
     private static final int MAX_COPY_RETRY = 5;
 
+    private boolean createAndSetPathOwner(Path destPath, Path sourcePath) throws IOException {
+      FileSystem targetFs = destPath.getFileSystem(conf);
+      boolean createdDir = false;
+      if (!targetFs.exists(destPath)) {
+        // The target path is created even if the source path is missing, so that the DDL task does not try to create it.
+        if (!targetFs.mkdirs(destPath)) {
+          throw new IOException(destPath + " is not a directory or unable to create one");
+        }
+        createdDir = true;
+      }
+
+      FileStatus status;
+      try {
+        status = sourcePath.getFileSystem(conf).getFileStatus(sourcePath);
+      } catch (FileNotFoundException e) {
+        // Don't delete the target path just created, else the DDL task will try to create it as the hive user and may fail.
+        LOG.warn("Source path missing " + sourcePath);
+        return false;
+      }
+      LOG.info("Setting permission for path dest {} from source {} owner {} : {} : {}",
+              destPath, sourcePath, status.getOwner(), status.getGroup(), status.getPermission());
+      destPath.getFileSystem(conf).setOwner(destPath, status.getOwner(), status.getGroup());
+      destPath.getFileSystem(conf).setPermission(destPath, status.getPermission());
+      return createdDir;
+    }
+
+    private boolean setTargetPathOwner(Path targetPath, Path sourcePath, String distCpDoAsUser)
+            throws IOException {
+      if (distCpDoAsUser == null) {
+        return createAndSetPathOwner(targetPath, sourcePath);
+      }
+      UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
+              distCpDoAsUser, UserGroupInformation.getLoginUser());
+      try {
+        Path finalTargetPath = targetPath;
+        Path finalSourcePath = sourcePath;
+        return proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () ->
+                createAndSetPathOwner(finalTargetPath, finalSourcePath));
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+
+    private int handleException(Exception e, Path sourcePath, Path targetPath, int currentRetry) {
+      try {
+        if (!sourcePath.getFileSystem(conf).exists(sourcePath)) {
+          LOG.warn("Source path missing " + sourcePath, e);
+          return 0;
+        }
+      } catch (Exception ex) {
+        LOG.warn("Source path missing check failed" + sourcePath, ex);
+      }
+
+      if (currentRetry <= MAX_COPY_RETRY) {
+        LOG.warn("unable to copy {} to {}", sourcePath, targetPath, e);
+      } else {
+        LOG.error("unable to copy {} to {}", sourcePath, targetPath, e);
+        setException(e);
+        return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+      }
+
+      int sleepTime = FileUtils.getSleepTime(currentRetry);
+      LOG.info("Sleep for " + sleepTime + " milliseconds before retry " + (currentRetry));
+      try {
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException timerEx) {
+        LOG.info("sleep interrupted", timerEx.getMessage());
+      }
+
+      try {
+        FileSystem.closeAllForUGI(Utils.getUGI());
+      } catch (Exception ex) {
+        LOG.error("unable to closeAllForUGI", ex);
+      }
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    }
+
     @Override
     protected int execute(DriverContext driverContext) {
       String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
@@ -79,13 +161,16 @@ public class ExternalTableCopyTaskBuilder {
         targetPath = reservedRawPath(work.fullyQualifiedTargetPath.toUri());
       }
       int currentRetry = 0;
-      while (currentRetry < MAX_COPY_RETRY) {
+      int error = 0;
+      while (currentRetry <= MAX_COPY_RETRY) {
         try {
           UserGroupInformation ugi = Utils.getUGI();
           String currentUser = ugi.getShortUserName();
           boolean usePrivilegedUser =
               distCpDoAsUser != null && !currentUser.equals(distCpDoAsUser);
 
+          setTargetPathOwner(targetPath, sourcePath, usePrivilegedUser ? distCpDoAsUser : null);
+
           // do we create a new conf and only here provide this additional option so that we get away from
           // differences of data in two location for the same directories ?
           // basically add distcp.options.delete to hiveconf new object ?
@@ -99,17 +184,14 @@ public class ExternalTableCopyTaskBuilder {
               ShimLoader.getHadoopShims());
           return 0;
         } catch (Exception e) {
-          if (++currentRetry < MAX_COPY_RETRY) {
-            LOG.warn("unable to copy", e);
-          } else {
-            LOG.error("unable to copy {} to {}", sourcePath, targetPath, e);
-            setException(e);
-            return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+          currentRetry++;
+          error = handleException(e, sourcePath, targetPath, currentRetry);
+          if (error == 0) {
+            return 0;
           }
         }
       }
-      LOG.error("should never come here ");
-      return -1;
+      return error;
     }
 
     private static Path reservedRawPath(URI uri) {
@@ -119,13 +201,18 @@ public class ExternalTableCopyTaskBuilder {
 
     @Override
     public StageType getType() {
-      return StageType.REPL_INCREMENTAL_LOAD;
+      return StageType.COPY;
     }
 
     @Override
     public String getName() {
       return "DIR_COPY_TASK";
     }
+
+    @Override
+    public boolean canExecuteInParallel(){
+      return true;
+    }
   }
 
   @Explain(displayName = "HDFS Copy Operator", explainLevels = { Explain.Level.USER,
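
Two behavioural notes on the hunks above: DirCopyTask now retries a failed copy up to MAX_COPY_RETRY times, sleeping between attempts (FileUtils.getSleepTime) and closing cached FileSystem instances for the UGI before retrying, and a source directory that has disappeared is treated as success rather than an error. A rough, self-contained illustration of that retry shape, where the doubling backoff is an assumption and not the patch's FileUtils implementation:

    import java.util.concurrent.Callable;

    public class RetryWithBackoffSketch {
      private static final int MAX_COPY_RETRY = 5;

      // Retry 'copy' up to MAX_COPY_RETRY times, sleeping with an increasing delay between
      // attempts; rethrow the last failure once all attempts are exhausted.
      public static void copyWithRetry(Callable<Void> copy) throws Exception {
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= MAX_COPY_RETRY; attempt++) {
          try {
            copy.call();
            return;
          } catch (Exception e) {
            lastFailure = e;
            if (attempt < MAX_COPY_RETRY) {
              long sleepMillis = (1L << attempt) * 1000L;   // assumed backoff: 2s, 4s, 8s, ...
              Thread.sleep(sleepMillis);
            }
          }
        }
        throw lastFailure;
      }
    }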
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index b5e39b8..2309fc9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -130,7 +130,16 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
       if (!iterator.hasNext() && constraintIterator.hasNext()) {
         loadingConstraint = true;
       }
-      while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext())) && loadTaskTracker.canAddMoreTasks()) {
+      while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext()) ||
+              (work.getPathsToCopyIterator().hasNext())) && loadTaskTracker.canAddMoreTasks()) {
+        // Start the distcp tasks first to copy the files of external tables, so that no DDL task tries to
+        // create the table/partition directories itself. The distcp tasks create these directories with the
+        // proper permissions and owner.
+        if (work.getPathsToCopyIterator().hasNext()) {
+          scope.rootTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(loadTaskTracker));
+          break;
+        }
+
         BootstrapEvent next;
         if (!loadingConstraint) {
           next = iterator.next();
@@ -249,10 +258,6 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
         }
       }
 
-      if (loadTaskTracker.canAddMoreTasks()) {
-        scope.rootTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(loadTaskTracker));
-      }
-
       boolean addAnotherLoadTask = iterator.hasNext()
           || loadTaskTracker.hasReplicationState()
           || constraintIterator.hasNext()
@@ -265,7 +270,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
       // Update last repl ID of the database only if the current dump is not incremental. If bootstrap
       // is combined with incremental dump, it contains only tables to bootstrap. So, needn't change
       // last repl ID of the database.
-      if (!iterator.hasNext() && !constraintIterator.hasNext() && !work.isIncrementalLoad()) {
+      if (!iterator.hasNext() && !constraintIterator.hasNext() && !work.getPathsToCopyIterator().hasNext()
+              && !work.isIncrementalLoad()) {
         loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope));
         work.updateDbEventState(null);
       }
@@ -470,34 +476,17 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
       }
 
       List<Task<? extends Serializable>> childTasks = new ArrayList<>();
-      int parallelism = conf.getIntVar(HiveConf.ConfVars.EXECPARALLETHREADNUMBER);
-      // during incremental we will have no parallelism from replication tasks since they are event based
-      // and hence are linear. To achieve parallelism we have to use copy tasks(which have no DAG) for
-      // all threads except one, in execution phase.
       int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
 
-      // If the total number of tasks that can be created are less than the parallelism we can achieve
-      // do nothing since someone is working on 1950's machine. else try to achieve max parallelism
-      int calculatedMaxNumOfTasks = 0, maxNumOfHDFSTasks = 0;
-      if (maxTasks <= parallelism) {
-        if (builder.hasMoreWork()) {
-          calculatedMaxNumOfTasks = maxTasks;
-        } else {
-          maxNumOfHDFSTasks = maxTasks;
-        }
+      // Start the distcp tasks first to copy the files of external tables, so that no DDL task tries to
+      // create the table/partition directories itself. The distcp tasks create these directories with the
+      // proper permissions and owner.
+      TaskTracker tracker = new TaskTracker(maxTasks);
+      if (work.getPathsToCopyIterator().hasNext()) {
+        childTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(tracker));
       } else {
-        calculatedMaxNumOfTasks = maxTasks - parallelism + 1;
-        maxNumOfHDFSTasks = parallelism - 1;
+        childTasks.add(builder.build(driverContext, getHive(), LOG, tracker));
       }
-      TaskTracker trackerForReplIncremental = new TaskTracker(calculatedMaxNumOfTasks);
-      Task<? extends Serializable> incrementalLoadTaskRoot =
-              builder.build(driverContext, getHive(), LOG, trackerForReplIncremental);
-      // we are adding the incremental task first so that its always processed first,
-      // followed by dir copy tasks if capacity allows.
-      childTasks.add(incrementalLoadTaskRoot);
-
-      TaskTracker trackerForCopy = new TaskTracker(maxNumOfHDFSTasks);
-      childTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(trackerForCopy));
 
       // Either the incremental has more work or the external table file copy has more paths to process.
       // Once all the incremental events are applied and external tables file copies are done, enable
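
Both the bootstrap load loop and the incremental load setup above now apply the same ordering rule: while pathsToCopyIterator still has external-table directories, only DirCopyTasks are scheduled, and the DDL/event tasks follow in later rounds. A deliberately simplified, Hive-independent illustration of that rule (task types are plain strings here rather than Hive Task objects):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    public class CopyFirstSchedulingSketch {
      // Returns the next batch of work: external-table copy tasks take precedence so that
      // directories exist (with the right owner) before any DDL task needs them.
      static List<String> nextBatch(Iterator<String> pathsToCopy, Iterator<String> replEvents, int maxTasks) {
        List<String> batch = new ArrayList<>();
        while (pathsToCopy.hasNext() && batch.size() < maxTasks) {
          batch.add("DIR_COPY_TASK:" + pathsToCopy.next());
        }
        if (batch.isEmpty()) {
          while (replEvents.hasNext() && batch.size() < maxTasks) {
            batch.add("LOAD_TASK:" + replEvents.next());
          }
        }
        return batch;
      }
    }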
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java
index de55044..2a52e83 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java
@@ -26,7 +26,10 @@ import java.util.List;
 
 import javax.security.auth.login.LoginException;
 
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
 import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -154,6 +157,20 @@ public class StorageBasedAuthorizationProvider extends HiveAuthorizationProvider
     authorize(path, readRequiredPriv, writeRequiredPriv);
   }
 
+  private static boolean userHasProxyPrivilege(String user, Configuration conf) {
+    try {
+      if (MetaStoreServerUtils.checkUserHasHostProxyPrivileges(user, conf,
+              HiveMetaStore.HMSHandler.getIPAddress())) {
+        LOG.info("user {} has host proxy privilege.", user);
+        return true;
+      }
+    } catch (Exception ex) {
+      // If the proxy privilege status cannot be determined, fall back to the regular authorization check.
+      LOG.warn("Cannot obtain username to check for host proxy privilege", ex);
+    }
+    return false;
+  }
+
   @Override
   public void authorize(Table table, Privilege[] readRequiredPriv, Privilege[] writeRequiredPriv)
       throws HiveException, AuthorizationException {
@@ -366,6 +383,11 @@ public class StorageBasedAuthorizationProvider extends HiveAuthorizationProvider
       throw new IllegalArgumentException("path is null");
     }
 
+    if (userHasProxyPrivilege(authenticator.getUserName(), conf)) {
+      LOG.info("Path authorization is skipped for path {}.", path);
+      return;
+    }
+
     final FileSystem fs = path.getFileSystem(conf);
 
     FileStatus pathStatus = FileUtils.getFileStatusOrNull(fs, path);
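
The effect of the new check: when the calling user holds host proxy privileges (as the replication doAs user does once hadoop.proxyuser.* is configured), StorageBasedAuthorizationProvider skips the per-path permission walk entirely; otherwise authorization proceeds as before. A condensed sketch of that control flow, assuming a plain FileSystem.access() check in place of the provider's own permission logic (the helper and its arguments are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsAction;

    public class PathAuthorizeSketch {
      // Skip the filesystem permission check for callers that hold host proxy privileges;
      // otherwise fall back to a normal access check against the current UGI.
      static void authorizeWrite(Configuration conf, Path path, boolean hasHostProxyPrivilege)
          throws Exception {
        if (hasHostProxyPrivilege) {
          // Proxy users (e.g. the replication doAs user) bypass the per-path check.
          return;
        }
        FileSystem fs = path.getFileSystem(conf);
        // Throws AccessControlException if the path is not writable by the current user.
        fs.access(path, FsAction.WRITE);
      }
    }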
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 41f399b..510be01 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -358,7 +358,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       auditLog.info("ugi={}	ip={}	cmd={}	", ugi.getUserName(), address, cmd);
     }
 
-    private static String getIPAddress() {
+    public static String getIPAddress() {
       if (useSasl) {
         if (saslServer != null && saslServer.getRemoteAddress() != null) {
           return saslServer.getRemoteAddress().getHostAddress();