Posted to commits@hive.apache.org by ma...@apache.org on 2019/03/11 11:45:12 UTC
[hive] branch master updated: HIVE-21325 : Hive external table
replication failed with Permission denied issue. (Mahesh Kumar Behera,
reviewed by Sankar Hariappan)
This is an automated email from the ASF dual-hosted git repository.
mahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 4274f48 HIVE-21325 : Hive external table replication failed with Permission denied issue. (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
4274f48 is described below
commit 4274f48a250db9624a48421147fa5700dc255e86
Author: Mahesh Kumar Behera <mb...@hortonworks.com>
AuthorDate: Mon Mar 11 17:02:28 2019 +0530
HIVE-21325 : Hive external table replication failed with Permission denied issue. (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
---
...BasedMetastoreAuthorizationProviderWithACL.java | 11 ++-
.../TestMetastoreAuthorizationProvider.java | 18 ++++
.../ql/exec/repl/ExternalTableCopyTaskBuilder.java | 107 +++++++++++++++++++--
.../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 49 ++++------
.../StorageBasedAuthorizationProvider.java | 22 +++++
.../hadoop/hive/metastore/HiveMetaStore.java | 2 +-
6 files changed, 165 insertions(+), 44 deletions(-)
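At a high level, the fix creates the external table's target directory as the configured distcp doAs/proxy user and mirrors the source directory's owner, group and permission onto it, so that later DDL tasks running as the hive user do not fail with "Permission denied". A minimal, self-contained sketch of that Hadoop proxy-user pattern follows (illustrative names, not code from the patch):

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

class ProxyDirCopySketch {
  // Create destPath as the given proxy user and copy owner/group/permission from srcPath.
  static void createDirAsProxyUser(Configuration conf, Path srcPath, Path destPath, String doAsUser)
      throws IOException, InterruptedException {
    UserGroupInformation proxyUgi =
        UserGroupInformation.createProxyUser(doAsUser, UserGroupInformation.getLoginUser());
    proxyUgi.doAs((PrivilegedExceptionAction<Void>) () -> {
      FileSystem destFs = destPath.getFileSystem(conf);
      if (!destFs.exists(destPath) && !destFs.mkdirs(destPath)) {
        throw new IOException("unable to create directory " + destPath);
      }
      // Mirror ownership and permission of the source directory onto the target.
      FileStatus srcStatus = srcPath.getFileSystem(conf).getFileStatus(srcPath);
      destFs.setOwner(destPath, srcStatus.getOwner(), srcStatus.getGroup());
      destFs.setPermission(destPath, srcStatus.getPermission());
      return null;
    });
  }
}

The patched task additionally tolerates a missing source path and skips the proxy user entirely when no doAs user is configured, as shown in the ExternalTableCopyTaskBuilder diff below.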
diff --git a/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/ql/security/TestStorageBasedMetastoreAuthorizationProviderWithACL.java b/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/ql/security/TestStorageBasedMetastoreAuthorizationProviderWithACL.java
index 62c109c..c9253c1 100644
--- a/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/ql/security/TestStorageBasedMetastoreAuthorizationProviderWithACL.java
+++ b/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/ql/security/TestStorageBasedMetastoreAuthorizationProviderWithACL.java
@@ -49,7 +49,12 @@ public class TestStorageBasedMetastoreAuthorizationProviderWithACL
protected static Path warehouseDir = null;
protected UserGroupInformation userUgi = null;
protected String testUserName = "test_user";
+ protected String proxyUserName = null;
+ @Override
+ protected String getProxyUserName() {
+ return proxyUserName;
+ }
@Override
protected boolean isTestEnabled() {
@@ -74,10 +79,10 @@ public class TestStorageBasedMetastoreAuthorizationProviderWithACL
// Hadoop FS ACLs do not work with LocalFileSystem, so set up MiniDFS.
HiveConf conf = super.createHiveConf();
- String currentUserName = Utils.getUGI().getShortUserName();
+ proxyUserName = Utils.getUGI().getShortUserName();
conf.set("dfs.namenode.acls.enabled", "true");
- conf.set("hadoop.proxyuser." + currentUserName + ".groups", "*");
- conf.set("hadoop.proxyuser." + currentUserName + ".hosts", "*");
+ conf.set("hadoop.proxyuser." + proxyUserName + ".groups", "*");
+ conf.set("hadoop.proxyuser." + proxyUserName + ".hosts", "*");
dfs = ShimLoader.getHadoopShims().getMiniDfs(conf, 4, true, null);
FileSystem fs = dfs.getFileSystem();
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/TestMetastoreAuthorizationProvider.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/TestMetastoreAuthorizationProvider.java
index 0e08e81..0fc677b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/TestMetastoreAuthorizationProvider.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/TestMetastoreAuthorizationProvider.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.Collections;
import junit.framework.TestCase;
@@ -85,6 +86,10 @@ public class TestMetastoreAuthorizationProvider extends TestCase {
return new HiveConf(this.getClass());
}
+ protected String getProxyUserName() {
+ return null;
+ }
+
@Override
protected void setUp() throws Exception {
@@ -304,6 +309,19 @@ public class TestMetastoreAuthorizationProvider extends TestCase {
ret = driver.run("alter table "+tblName+" add partition (b='2011')");
assertEquals(0,ret.getResponseCode());
+ String proxyUserName = getProxyUserName();
+ if (proxyUserName != null) {
+ // For storage-based authorization, a user with proxy privilege should be allowed to perform the operation
+ // even if the file permission is not there.
+ InjectableDummyAuthenticator.injectUserName(proxyUserName);
+ InjectableDummyAuthenticator.injectGroupNames(Collections.singletonList(proxyUserName));
+ InjectableDummyAuthenticator.injectMode(true);
+ disallowCreateInTbl(tbl.getTableName(), proxyUserName, tbl.getSd().getLocation());
+ ret = driver.run("alter table "+tblName+" add partition (b='2012')");
+ assertEquals(0, ret.getResponseCode());
+ InjectableDummyAuthenticator.injectMode(false);
+ }
+
allowDropOnTable(tblName, userName, tbl.getSd().getLocation());
allowDropOnDb(dbName,userName,db.getLocationUri());
ret = driver.run("drop database if exists "+getTestDbName()+" cascade");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
index efecdb8..d7eed2c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
@@ -33,8 +33,13 @@ import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import java.security.PrivilegedExceptionAction;
import java.io.Serializable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
@@ -68,6 +73,83 @@ public class ExternalTableCopyTaskBuilder {
private static final Logger LOG = LoggerFactory.getLogger(DirCopyTask.class);
private static final int MAX_COPY_RETRY = 5;
+ private boolean createAndSetPathOwner(Path destPath, Path sourcePath) throws IOException {
+ FileSystem targetFs = destPath.getFileSystem(conf);
+ boolean createdDir = false;
+ if (!targetFs.exists(destPath)) {
+ // The target path is created even if the source path is missing, so that the ddl task does not try to create it.
+ if (!targetFs.mkdirs(destPath)) {
+ throw new IOException(destPath + " is not a directory or unable to create one");
+ }
+ createdDir = true;
+ }
+
+ FileStatus status;
+ try {
+ status = sourcePath.getFileSystem(conf).getFileStatus(sourcePath);
+ } catch (FileNotFoundException e) {
+ // Don't delete the target path that was created; otherwise the ddl task will try to create it as user hive and may fail.
+ LOG.warn("source path missing " + sourcePath);
+ return false;
+ }
+ LOG.info("Setting permission for path dest {} from source {} owner {} : {} : {}",
+ destPath, sourcePath, status.getOwner(), status.getGroup(), status.getPermission());
+ destPath.getFileSystem(conf).setOwner(destPath, status.getOwner(), status.getGroup());
+ destPath.getFileSystem(conf).setPermission(destPath, status.getPermission());
+ return createdDir;
+ }
+
+ private boolean setTargetPathOwner(Path targetPath, Path sourcePath, String distCpDoAsUser)
+ throws IOException {
+ if (distCpDoAsUser == null) {
+ return createAndSetPathOwner(targetPath, sourcePath);
+ }
+ UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
+ distCpDoAsUser, UserGroupInformation.getLoginUser());
+ try {
+ Path finalTargetPath = targetPath;
+ Path finalSourcePath = sourcePath;
+ return proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () ->
+ createAndSetPathOwner(finalTargetPath, finalSourcePath));
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private int handleException(Exception e, Path sourcePath, Path targetPath, int currentRetry) {
+ try {
+ if (!sourcePath.getFileSystem(conf).exists(sourcePath)) {
+ LOG.warn("Source path missing " + sourcePath, e);
+ return 0;
+ }
+ } catch (Exception ex) {
+ LOG.warn("Source path missing check failed" + sourcePath, ex);
+ }
+
+ if (currentRetry <= MAX_COPY_RETRY) {
+ LOG.warn("unable to copy {} to {}", sourcePath, targetPath, e);
+ } else {
+ LOG.error("unable to copy {} to {}", sourcePath, targetPath, e);
+ setException(e);
+ return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+ }
+
+ int sleepTime = FileUtils.getSleepTime(currentRetry);
+ LOG.info("Sleep for " + sleepTime + " milliseconds before retry " + (currentRetry));
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException timerEx) {
+ LOG.info("sleep interrupted", timerEx.getMessage());
+ }
+
+ try {
+ FileSystem.closeAllForUGI(Utils.getUGI());
+ } catch (Exception ex) {
+ LOG.error("unable to closeAllForUGI", ex);
+ }
+ return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+ }
+
@Override
protected int execute(DriverContext driverContext) {
String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
@@ -79,13 +161,16 @@ public class ExternalTableCopyTaskBuilder {
targetPath = reservedRawPath(work.fullyQualifiedTargetPath.toUri());
}
int currentRetry = 0;
- while (currentRetry < MAX_COPY_RETRY) {
+ int error = 0;
+ while (currentRetry <= MAX_COPY_RETRY) {
try {
UserGroupInformation ugi = Utils.getUGI();
String currentUser = ugi.getShortUserName();
boolean usePrivilegedUser =
distCpDoAsUser != null && !currentUser.equals(distCpDoAsUser);
+ setTargetPathOwner(targetPath, sourcePath, usePrivilegedUser ? distCpDoAsUser : null);
+
// do we create a new conf and only here provide this additional option so that we get away from
// differences of data in two location for the same directories ?
// basically add distcp.options.delete to hiveconf new object ?
@@ -99,17 +184,14 @@ public class ExternalTableCopyTaskBuilder {
ShimLoader.getHadoopShims());
return 0;
} catch (Exception e) {
- if (++currentRetry < MAX_COPY_RETRY) {
- LOG.warn("unable to copy", e);
- } else {
- LOG.error("unable to copy {} to {}", sourcePath, targetPath, e);
- setException(e);
- return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+ currentRetry++;
+ error = handleException(e, sourcePath, targetPath, currentRetry);
+ if (error == 0) {
+ return 0;
}
}
}
- LOG.error("should never come here ");
- return -1;
+ return error;
}
private static Path reservedRawPath(URI uri) {
@@ -119,13 +201,18 @@ public class ExternalTableCopyTaskBuilder {
@Override
public StageType getType() {
- return StageType.REPL_INCREMENTAL_LOAD;
+ return StageType.COPY;
}
@Override
public String getName() {
return "DIR_COPY_TASK";
}
+
+ @Override
+ public boolean canExecuteInParallel(){
+ return true;
+ }
}
@Explain(displayName = "HDFS Copy Operator", explainLevels = { Explain.Level.USER,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index b5e39b8..2309fc9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -130,7 +130,16 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
if (!iterator.hasNext() && constraintIterator.hasNext()) {
loadingConstraint = true;
}
- while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext())) && loadTaskTracker.canAddMoreTasks()) {
+ while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext()) ||
+ (work.getPathsToCopyIterator().hasNext())) && loadTaskTracker.canAddMoreTasks()) {
+ // First start the distcp tasks to copy the files related to the external table. The distcp tasks should be
+ // started first so that the ddl task does not try to create the table/partition directory itself; the distcp
+ // task creates these directories with the proper permission and owner.
+ if (work.getPathsToCopyIterator().hasNext()) {
+ scope.rootTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(loadTaskTracker));
+ break;
+ }
+
BootstrapEvent next;
if (!loadingConstraint) {
next = iterator.next();
@@ -249,10 +258,6 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
}
}
- if (loadTaskTracker.canAddMoreTasks()) {
- scope.rootTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(loadTaskTracker));
- }
-
boolean addAnotherLoadTask = iterator.hasNext()
|| loadTaskTracker.hasReplicationState()
|| constraintIterator.hasNext()
@@ -265,7 +270,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
// Update last repl ID of the database only if the current dump is not incremental. If bootstrap
// is combined with incremental dump, it contains only tables to bootstrap. So, needn't change
// last repl ID of the database.
- if (!iterator.hasNext() && !constraintIterator.hasNext() && !work.isIncrementalLoad()) {
+ if (!iterator.hasNext() && !constraintIterator.hasNext() && !work.getPathsToCopyIterator().hasNext()
+ && !work.isIncrementalLoad()) {
loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope));
work.updateDbEventState(null);
}
@@ -470,34 +476,17 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
}
List<Task<? extends Serializable>> childTasks = new ArrayList<>();
- int parallelism = conf.getIntVar(HiveConf.ConfVars.EXECPARALLETHREADNUMBER);
- // during incremental we will have no parallelism from replication tasks since they are event based
- // and hence are linear. To achieve parallelism we have to use copy tasks(which have no DAG) for
- // all threads except one, in execution phase.
int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
- // If the total number of tasks that can be created are less than the parallelism we can achieve
- // do nothing since someone is working on 1950's machine. else try to achieve max parallelism
- int calculatedMaxNumOfTasks = 0, maxNumOfHDFSTasks = 0;
- if (maxTasks <= parallelism) {
- if (builder.hasMoreWork()) {
- calculatedMaxNumOfTasks = maxTasks;
- } else {
- maxNumOfHDFSTasks = maxTasks;
- }
+ // First start the distcp tasks to copy the files related to the external table. The distcp tasks should be
+ // started first so that the ddl task does not try to create the table/partition directory itself; the distcp
+ // task creates these directories with the proper permission and owner.
+ TaskTracker tracker = new TaskTracker(maxTasks);
+ if (work.getPathsToCopyIterator().hasNext()) {
+ childTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(tracker));
} else {
- calculatedMaxNumOfTasks = maxTasks - parallelism + 1;
- maxNumOfHDFSTasks = parallelism - 1;
+ childTasks.add(builder.build(driverContext, getHive(), LOG, tracker));
}
- TaskTracker trackerForReplIncremental = new TaskTracker(calculatedMaxNumOfTasks);
- Task<? extends Serializable> incrementalLoadTaskRoot =
- builder.build(driverContext, getHive(), LOG, trackerForReplIncremental);
- // we are adding the incremental task first so that its always processed first,
- // followed by dir copy tasks if capacity allows.
- childTasks.add(incrementalLoadTaskRoot);
-
- TaskTracker trackerForCopy = new TaskTracker(maxNumOfHDFSTasks);
- childTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(trackerForCopy));
// Either the incremental has more work or the external table file copy has more paths to process.
// Once all the incremental events are applied and external tables file copies are done, enable
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java
index de55044..2a52e83 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java
@@ -26,7 +26,10 @@ import java.util.List;
import javax.security.auth.login.LoginException;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -154,6 +157,20 @@ public class StorageBasedAuthorizationProvider extends HiveAuthorizationProvider
authorize(path, readRequiredPriv, writeRequiredPriv);
}
+ private static boolean userHasProxyPrivilege(String user, Configuration conf) {
+ try {
+ if (MetaStoreServerUtils.checkUserHasHostProxyPrivileges(user, conf,
+ HiveMetaStore.HMSHandler.getIPAddress())) {
+ LOG.info("user {} has host proxy privilege.", user);
+ return true;
+ }
+ } catch (Exception ex) {
+ // If the proxy privilege status cannot be determined, proceed with the normal authorization check.
+ LOG.warn("Cannot obtain username to check for host proxy privilege", ex);
+ }
+ return false;
+ }
+
@Override
public void authorize(Table table, Privilege[] readRequiredPriv, Privilege[] writeRequiredPriv)
throws HiveException, AuthorizationException {
@@ -366,6 +383,11 @@ public class StorageBasedAuthorizationProvider extends HiveAuthorizationProvider
throw new IllegalArgumentException("path is null");
}
+ if (userHasProxyPrivilege(authenticator.getUserName(), conf)) {
+ LOG.info("Path authorization is skipped for path {}.", path);
+ return;
+ }
+
final FileSystem fs = path.getFileSystem(conf);
FileStatus pathStatus = FileUtils.getFileStatusOrNull(fs, path);
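The proxy-privilege short-circuit above relies on the standard Hadoop proxy-user settings for the user performing the replication, the same hadoop.proxyuser.* keys the updated test sets on its MiniDFS conf. As an illustration only (the user name "hive" is an assumption; in a real cluster these values normally come from core-site.xml):

Configuration conf = new Configuration();
// Illustrative: allow the "hive" service user to proxy from any host and for any group.
conf.set("hadoop.proxyuser.hive.hosts", "*");
conf.set("hadoop.proxyuser.hive.groups", "*");

With such an entry in place, checkUserHasHostProxyPrivileges should return true for that caller, and the new userHasProxyPrivilege check skips path authorization.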
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 41f399b..510be01 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -358,7 +358,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
auditLog.info("ugi={} ip={} cmd={} ", ugi.getUserName(), address, cmd);
}
- private static String getIPAddress() {
+ public static String getIPAddress() {
if (useSasl) {
if (saslServer != null && saslServer.getRemoteAddress() != null) {
return saslServer.getRemoteAddress().getHostAddress();