You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this text extraction.)
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/24 23:52:18 UTC
[44/54] [abbrv] hive git commit: HIVE-16686 : repl invocations of distcp needs additional handling (Sushanth Sowmyan, reviewed by Thejas Nair)
HIVE-16686 : repl invocations of distcp needs additional handling (Sushanth Sowmyan, reviewed by Thejas Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1c891ad4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1c891ad4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1c891ad4
Branch: refs/heads/hive-14535
Commit: 1c891ad4e228aee22aae6c4f8ab572f8867ea441
Parents: dec96ca
Author: Sushanth Sowmyan <kh...@gmail.com>
Authored: Mon May 22 13:55:20 2017 -0700
Committer: Sushanth Sowmyan <kh...@gmail.com>
Committed: Wed May 24 09:38:10 2017 -0700
----------------------------------------------------------------------
.../apache/hadoop/hive/common/FileUtils.java | 42 +++++++--
.../org/apache/hadoop/hive/conf/HiveConf.java | 5 +
.../hadoop/hive/common/TestFileUtils.java | 22 +++++
.../hadoop/hive/ql/exec/ReplCopyTask.java | 15 ++-
shims/0.23/pom.xml | 10 +-
.../apache/hadoop/hive/shims/Hadoop23Shims.java | 46 +++++++++-
.../hadoop/hive/shims/TestHadoop23Shims.java | 96 ++++++++++++++++++++
.../apache/hadoop/hive/shims/HadoopShims.java | 14 +++
8 files changed, 234 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/1c891ad4/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index 985fd8c..c0388f6 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -583,13 +583,23 @@ public final class FileUtils {
* Copies files between filesystems.
*/
public static boolean copy(FileSystem srcFS, Path src,
- FileSystem dstFS, Path dst,
- boolean deleteSource,
- boolean overwrite,
- HiveConf conf) throws IOException {
+ FileSystem dstFS, Path dst,
+ boolean deleteSource,
+ boolean overwrite,
+ HiveConf conf) throws IOException {
return copy(srcFS, src, dstFS, dst, deleteSource, overwrite, conf, ShimLoader.getHadoopShims());
}
+ /**
+ * Copies files between filesystems using distcp, running the job as the privileged
+ * user configured by hive.distcp.privileged.doAs. The source is not deleted.
+ */
+ public static boolean privilegedCopy(FileSystem srcFS, Path src, Path dst,
+ HiveConf conf) throws IOException {
+ String privilegedUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+ return distCp(srcFS, src, dst, false, privilegedUser, conf, ShimLoader.getHadoopShims());
+ }
+
@VisibleForTesting
static boolean copy(FileSystem srcFS, Path src,
FileSystem dstFS, Path dst,
@@ -612,18 +622,34 @@ public final class FileUtils {
HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES) + ")");
LOG.info("Launch distributed copy (distcp) job.");
triedDistcp = true;
- copied = shims.runDistCp(src, dst, conf);
- if (copied && deleteSource) {
- srcFS.delete(src, true);
- }
+ copied = distCp(srcFS, src, dst, deleteSource, null, conf, shims);
}
}
if (!triedDistcp) {
+ // Note : Currently, this implementation does not "fall back" to regular copy if distcp
+ // is tried and it fails. We depend upon that behaviour in cases like replication,
+ // wherein if distcp fails, there is good reason to not plod along with a trivial
+ // implementation, and fail instead.
copied = FileUtil.copy(srcFS, src, dstFS, dst, deleteSource, overwrite, conf);
}
return copied;
}
+ static boolean distCp(FileSystem srcFS, Path src, Path dst,
+ boolean deleteSource, String doAsUser,
+ HiveConf conf, HadoopShims shims) throws IOException {
+ boolean copied = false;
+ if (doAsUser == null){
+ copied = shims.runDistCp(src, dst, conf);
+ } else {
+ copied = shims.runDistCpAs(src, dst, conf, doAsUser);
+ }
+ if (copied && deleteSource) {
+ srcFS.delete(src, true);
+ }
+ return copied;
+ }
+
/**
* Move a particular file or directory to the trash.
* @param fs FileSystem to use
http://git-wip-us.apache.org/repos/asf/hive/blob/1c891ad4/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 06332ac..2dfc8b6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2611,6 +2611,10 @@ public class HiveConf extends Configuration {
HIVE_SERVER2_ENABLE_DOAS("hive.server2.enable.doAs", true,
"Setting this property to true will have HiveServer2 execute\n" +
"Hive operations as the user making the calls to it."),
+ HIVE_DISTCP_DOAS_USER("hive.distcp.privileged.doAs","hdfs",
+ "This property allows privileged distcp executions done by hive\n" +
+ "to run as this user. Typically, it should be the user you\n" +
+ "run the namenode as, such as the 'hdfs' user."),
HIVE_SERVER2_TABLE_TYPE_MAPPING("hive.server2.table.type.mapping", "CLASSIC", new StringSet("CLASSIC", "HIVE"),
"This setting reflects how HiveServer2 will report the table types for JDBC and other\n" +
"client implementations that retrieve the available tables and supported table types\n" +
@@ -3401,6 +3405,7 @@ public class HiveConf extends Configuration {
"hive.security.authenticator.manager,hive.security.authorization.manager," +
"hive.security.metastore.authorization.manager,hive.security.metastore.authenticator.manager," +
"hive.users.in.admin.role,hive.server2.xsrf.filter.enabled,hive.security.authorization.enabled," +
+ "hive.distcp.privileged.doAs," +
"hive.server2.authentication.ldap.baseDN," +
"hive.server2.authentication.ldap.url," +
"hive.server2.authentication.ldap.Domain," +
http://git-wip-us.apache.org/repos/asf/hive/blob/1c891ad4/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java b/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java
index adc9b0c..d3c8761 100644
--- a/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java
+++ b/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java
@@ -228,4 +228,26 @@ public class TestFileUtils {
Assert.assertTrue(FileUtils.copy(mockFs, copySrc, mockFs, copyDst, false, false, conf, shims));
verify(shims).runDistCp(copySrc, copyDst, conf);
}
+
+ @Test
+ public void testCopyWithDistCpAs() throws IOException {
+ Path copySrc = new Path("copySrc");
+ Path copyDst = new Path("copyDst");
+ HiveConf conf = new HiveConf(TestFileUtils.class);
+
+ FileSystem fs = copySrc.getFileSystem(conf);
+
+ String doAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+
+ HadoopShims shims = mock(HadoopShims.class);
+ when(shims.runDistCpAs(copySrc, copyDst, conf, doAsUser)).thenReturn(true);
+ when(shims.runDistCp(copySrc, copyDst, conf)).thenReturn(false);
+
+ // doAs when asked
+ Assert.assertTrue(FileUtils.distCp(fs, copySrc, copyDst, true, doAsUser, conf, shims));
+ verify(shims).runDistCpAs(copySrc, copyDst, conf, doAsUser);
+ // don't doAs when not asked
+ Assert.assertFalse(FileUtils.distCp(fs, copySrc, copyDst, true, null, conf, shims));
+ verify(shims).runDistCp(copySrc, copyDst, conf);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1c891ad4/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index d2f9e79..f277284 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -139,10 +139,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
if (!rwork.getListFilesOnOutputBehaviour(oneSrc)){
LOG.debug("ReplCopyTask :cp:" + oneSrc.getPath() + "=>" + toPath);
- if (!FileUtils.copy(actualSrcFs, oneSrc.getPath(), dstFs, toPath,
- false, // delete source
- true, // overwrite destination
- conf)) {
+ if (!doCopy(toPath, dstFs, oneSrc.getPath(), actualSrcFs)) {
console.printError("Failed to copy: '" + oneSrc.getPath().toString()
+ "to: '" + toPath.toString() + "'");
return 1;
@@ -169,6 +166,16 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
}
}
+ private boolean doCopy(Path dst, FileSystem dstFs, Path src, FileSystem srcFs) throws IOException {
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)){
+ // regular copy in test env.
+ return FileUtils.copy(srcFs, src, dstFs, dst, false, true, conf);
+ } else {
+ // distcp in actual deployment with privilege escalation
+ return FileUtils.privilegedCopy(srcFs, src, dst, conf);
+ }
+ }
+
private List<FileStatus> filesInFileListing(FileSystem fs, Path path)
throws IOException {
http://git-wip-us.apache.org/repos/asf/hive/blob/1c891ad4/shims/0.23/pom.xml
----------------------------------------------------------------------
diff --git a/shims/0.23/pom.xml b/shims/0.23/pom.xml
index 7c586fa..3ff1d38 100644
--- a/shims/0.23/pom.xml
+++ b/shims/0.23/pom.xml
@@ -205,6 +205,14 @@
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
-
+ <build>
+ <sourceDirectory>${basedir}/src/main/java</sourceDirectory>
+ <testSourceDirectory>${basedir}/src/main/test</testSourceDirectory>
+ </build>
</project>
http://git-wip-us.apache.org/repos/asf/hive/blob/1c891ad4/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
----------------------------------------------------------------------
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 0483e91..4319bed 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -27,6 +27,7 @@ import java.net.URI;
import java.nio.ByteBuffer;
import java.security.AccessControlException;
import java.security.NoSuchAlgorithmException;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -37,6 +38,8 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import javax.security.auth.Subject;
+import javax.security.auth.login.LoginException;
+
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
@@ -1081,16 +1084,53 @@ public class Hadoop23Shims extends HadoopShimsSecure {
}
}
+ private static final String DISTCP_OPTIONS_PREFIX = "distcp.options.";
+
+ List<String> constructDistCpParams(Path src, Path dst, Configuration conf) {
+ List<String> params = new ArrayList<String>();
+ for (Map.Entry<String,String> entry : conf.getPropsWithPrefix(DISTCP_OPTIONS_PREFIX).entrySet()){
+ String distCpOption = entry.getKey();
+ String distCpVal = entry.getValue();
+ params.add("-" + distCpOption);
+ if ((distCpVal != null) && (!distCpVal.isEmpty())){
+ params.add(distCpVal);
+ }
+ }
+ if (params.size() == 0){
+ // if no distcp options were supplied via conf, fall back to our defaults
+ params.add("-update");
+ params.add("-skipcrccheck");
+ }
+ params.add(src.toString());
+ params.add(dst.toString());
+ return params;
+ }
+
@Override
- public boolean runDistCp(Path src, Path dst, Configuration conf) throws IOException {
+ public boolean runDistCpAs(Path src, Path dst, Configuration conf, String doAsUser) throws IOException {
+ UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
+ doAsUser, UserGroupInformation.getLoginUser());
+ try {
+ return proxyUser.doAs(new PrivilegedExceptionAction<Boolean>() {
+ @Override
+ public Boolean run() throws Exception {
+ return runDistCp(src, dst, conf);
+ }
+ });
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+ @Override
+ public boolean runDistCp(Path src, Path dst, Configuration conf) throws IOException {
DistCpOptions options = new DistCpOptions(Collections.singletonList(src), dst);
options.setSyncFolder(true);
options.setSkipCRC(true);
options.preserve(FileAttribute.BLOCKSIZE);
// Creates the command-line parameters for distcp
- String[] params = {"-update", "-skipcrccheck", src.toString(), dst.toString()};
+ List<String> params = constructDistCpParams(src, dst, conf);
try {
conf.setBoolean("mapred.mapper.new-api", true);
@@ -1098,7 +1138,7 @@ public class Hadoop23Shims extends HadoopShimsSecure {
// HIVE-13704 states that we should use run() instead of execute() due to a hadoop known issue
// added by HADOOP-10459
- if (distcp.run(params) == 0) {
+ if (distcp.run(params.toArray(new String[0])) == 0) {
return true;
} else {
return false;
http://git-wip-us.apache.org/repos/asf/hive/blob/1c891ad4/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
----------------------------------------------------------------------
diff --git a/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
new file mode 100644
index 0000000..ba1086c
--- /dev/null
+++ b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.shims;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class TestHadoop23Shims {
+
+ @Test
+ public void testConstructDistCpParams() {
+ Path copySrc = new Path("copySrc");
+ Path copyDst = new Path("copyDst");
+ Configuration conf = new Configuration();
+
+ Hadoop23Shims shims = new Hadoop23Shims();
+ List<String> paramsDefault = shims.constructDistCpParams(copySrc, copyDst, conf);
+
+ assertEquals(4, paramsDefault.size());
+ assertTrue("Distcp -update set by default", paramsDefault.contains("-update"));
+ assertTrue("Distcp -skipcrccheck set by default", paramsDefault.contains("-skipcrccheck"));
+ assertEquals(copySrc.toString(), paramsDefault.get(2));
+ assertEquals(copyDst.toString(), paramsDefault.get(3));
+
+ conf.set("distcp.options.foo", "bar"); // should set "-foo bar"
+ conf.set("distcp.options.blah", ""); // should set "-blah"
+ conf.set("dummy", "option"); // should be ignored.
+ List<String> paramsWithCustomParamInjection =
+ shims.constructDistCpParams(copySrc, copyDst, conf);
+
+ assertEquals(5, paramsWithCustomParamInjection.size());
+
+ // check that the defaults did not remain.
+ assertTrue("Distcp -update not set if not requested",
+ !paramsWithCustomParamInjection.contains("-update"));
+ assertTrue("Distcp -skipcrccheck not set if not requested",
+ !paramsWithCustomParamInjection.contains("-skipcrccheck"));
+
+ // the "-foo bar" and "-blah" params order is not guaranteed
+ String firstParam = paramsWithCustomParamInjection.get(0);
+ if (firstParam.equals("-foo")){
+ // "-foo bar -blah" form
+ assertEquals("bar", paramsWithCustomParamInjection.get(1));
+ assertEquals("-blah", paramsWithCustomParamInjection.get(2));
+ } else {
+ // "-blah -foo bar" form
+ assertEquals("-blah", paramsWithCustomParamInjection.get(0));
+ assertEquals("-foo", paramsWithCustomParamInjection.get(1));
+ assertEquals("bar", paramsWithCustomParamInjection.get(2));
+ }
+
+ // the dummy option should not have made it either - only options
+ // beginning with distcp.options. should be honoured
+ assertTrue(!paramsWithCustomParamInjection.contains("dummy"));
+ assertTrue(!paramsWithCustomParamInjection.contains("-dummy"));
+ assertTrue(!paramsWithCustomParamInjection.contains("option"));
+ assertTrue(!paramsWithCustomParamInjection.contains("-option"));
+
+ assertEquals(copySrc.toString(), paramsWithCustomParamInjection.get(3));
+ assertEquals(copyDst.toString(), paramsWithCustomParamInjection.get(4));
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/1c891ad4/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index c280d49..d08ad04 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -479,6 +479,20 @@ public interface HadoopShims {
/**
* Copies a source dir/file to a destination by orchestrating the copy between hdfs nodes.
* This distributed process is meant to copy huge files that could take some time if a single
+ * copy is done. This is a variation which allows proxying as a different user to perform
+ * the distcp, and requires that the caller have requisite proxy user privileges.
+ *
+ * @param src Path to the source file or directory to copy
+ * @param dst Path to the destination file or directory
+ * @param conf The hadoop configuration object
+ * @param doAsUser The user to perform the distcp as
+ * @return True if it is successful; False otherwise.
+ */
+ public boolean runDistCpAs(Path src, Path dst, Configuration conf, String doAsUser) throws IOException;
+
+ /**
+ * Copies a source dir/file to a destination by orchestrating the copy between hdfs nodes.
+ * This distributed process is meant to copy huge files that could take some time if a single
* copy is done.
*
* @param src Path to the source file or directory to copy