Posted to commits@hive.apache.org by an...@apache.org on 2018/01/15 05:14:14 UTC
hive git commit: HIVE-18341: Add repl load support for adding "raw" namespace for TDE with same encryption keys (Anishek Agarwal, reviewed by Daniel Dai)
Repository: hive
Updated Branches:
refs/heads/master 970d8c968 -> 22df53b6c
HIVE-18341: Add repl load support for adding "raw" namespace for TDE with same encryption keys (Anishek Agarwal, reviewed by Daniel Dai)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/22df53b6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/22df53b6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/22df53b6
Branch: refs/heads/master
Commit: 22df53b6c223f03edee1a8c271a77e91d66bb2a1
Parents: 970d8c9
Author: Anishek Agarwal <an...@gmail.com>
Authored: Mon Jan 15 10:43:55 2018 +0530
Committer: Anishek Agarwal <an...@gmail.com>
Committed: Mon Jan 15 10:43:55 2018 +0530
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 4 +
.../TestReplicationOnHDFSEncryptedZones.java | 144 +++++++++++++++++++
.../hadoop/hive/ql/parse/WarehouseInstance.java | 36 ++++-
.../hadoop/hive/ql/parse/repl/CopyUtils.java | 78 +++++++---
4 files changed, 241 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/22df53b6/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0564b50..631c836 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -470,6 +470,10 @@ public class HiveConf extends Configuration {
+ "metadata for acid tables which do not require the corresponding transaction \n"
+ "semantics to be applied on target. This can be removed when ACID table \n"
+ "replication is supported."),
+ //https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/TransparentEncryption.html#Running_as_the_superuser
+ REPL_ADD_RAW_RESERVED_NAMESPACE("hive.repl.add.raw.reserved.namespace", false,
+ "For TDE with same encryption keys on source and target, allow Distcp super user to access \n"
+ + "the raw bytes from filesystem without decrypting on source and then encrypting on target."),
LOCALSCRATCHDIR("hive.exec.local.scratchdir",
"${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
"Local scratch space for Hive jobs"),
http://git-wip-us.apache.org/repos/asf/hive/blob/22df53b6/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
new file mode 100644
index 0000000..fd05e99
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_AGGREGATE_STATS_CACHE_ENABLED;
+
+public class TestReplicationOnHDFSEncryptedZones {
+ private static String jksFile = System.getProperty("java.io.tmpdir") + "/test.jks";
+ @Rule
+ public final TestName testName = new TestName();
+
+ protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationOnHDFSEncryptedZones.class);
+ private static WarehouseInstance primary;
+ private static String primaryDbName, replicatedDbName;
+ private static Configuration conf;
+ private static MiniDFSCluster miniDFSCluster;
+
+ @BeforeClass
+ public static void beforeClassSetup() throws Exception {
+ conf = new Configuration();
+ conf.set("dfs.client.use.datanode.hostname", "true");
+ conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
+ conf.set("hadoop.security.key.provider.path", "jceks://file" + jksFile);
+ conf.setBoolean("dfs.namenode.delegation.token.always-use", true);
+
+ conf.setLong(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname, 1);
+ conf.setLong(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname, 0);
+ conf.setBoolean(METASTORE_AGGREGATE_STATS_CACHE_ENABLED.varname, false);
+
+ miniDFSCluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+
+ DFSTestUtil.createKey("test_key", miniDFSCluster, conf);
+ primary = new WarehouseInstance(LOG, miniDFSCluster, new HashMap<String, String>() {{
+ put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "false");
+ put(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false");
+ }}, "test_key");
+ }
+
+ @AfterClass
+ public static void classLevelTearDown() throws IOException {
+ primary.close();
+ FileUtils.deleteQuietly(new File(jksFile));
+ }
+
+ @Before
+ public void setup() throws Throwable {
+ primaryDbName = testName.getMethodName() + "_" + System.currentTimeMillis();
+ replicatedDbName = "replicated_" + primaryDbName;
+ primary.run("create database " + primaryDbName);
+ }
+
+ @Test
+ public void targetAndSourceHaveDifferentEncryptionZoneKeys() throws Throwable {
+ DFSTestUtil.createKey("test_key123", miniDFSCluster, conf);
+
+ WarehouseInstance replica = new WarehouseInstance(LOG, miniDFSCluster,
+ new HashMap<String, String>() {{
+ put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "false");
+ put(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false");
+ }}, "test_key123");
+
+ WarehouseInstance.Tuple tuple =
+ primary.run("use " + primaryDbName)
+ .run("create table encrypted_table (id int, value string)")
+ .run("insert into table encrypted_table values (1,'value1')")
+ .run("insert into table encrypted_table values (2,'value2')")
+ .dump(primaryDbName, null);
+
+ replica
+ .run("repl load " + replicatedDbName + " from '" + tuple.dumpLocation
+ + "' with('hive.repl.add.raw.reserved.namespace'='true')")
+ .run("use " + replicatedDbName)
+ .run("repl status " + replicatedDbName)
+ .verifyResult(tuple.lastReplicationId)
+ .run("select value from encrypted_table")
+ .verifyFailure(new String[] { "value1", "value2" });
+ }
+
+ @Ignore("this is ignored as minidfs cluster as of writing this test looked like did not copy the "
+ + "files correctly")
+ @Test
+ public void targetAndSourceHaveSameEncryptionZoneKeys() throws Throwable {
+ WarehouseInstance replica = new WarehouseInstance(LOG, miniDFSCluster,
+ new HashMap<String, String>() {{
+ put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "false");
+ put(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false");
+ put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
+ UserGroupInformation.getCurrentUser().getUserName());
+ }}, "test_key");
+
+ WarehouseInstance.Tuple tuple =
+ primary.run("use " + primaryDbName)
+ .run("create table encrypted_table (id int, value string)")
+ .run("insert into table encrypted_table values (1,'value1')")
+ .run("insert into table encrypted_table values (2,'value2')")
+ .dump(primaryDbName, null);
+
+ replica
+ .run("repl load " + replicatedDbName + " from '" + tuple.dumpLocation
+ + "' with('hive.repl.add.raw.reserved.namespace'='true')")
+ .run("use " + replicatedDbName)
+ .run("repl status " + replicatedDbName)
+ .verifyResult(tuple.lastReplicationId)
+ .run("select value from encrypted_table")
+ .verifyResults(new String[] { "value1", "value2" });
+ }
+}
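The key ingredient of the fixture above is that each WarehouseInstance roots its warehouse in an HDFS encryption zone (see the WarehouseInstance diff below). A standalone sketch of that setup, with hypothetical key name and paths:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class EncryptionZoneFixtureSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // A key provider must be configured before keys can be created (hypothetical path):
    conf.set("hadoop.security.key.provider.path", "jceks://file/tmp/example.jks");
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    try {
      DFSTestUtil.createKey("example_key", cluster, conf); // the key must exist first
      DistributedFileSystem fs = cluster.getFileSystem();
      Path warehouseRoot = new Path("/warehouse-example");
      fs.mkdirs(warehouseRoot);
      fs.createEncryptionZone(warehouseRoot, "example_key");
      // Files written under warehouseRoot are now transparently encrypted with example_key.
    } finally {
      cluster.shutdown();
    }
  }
}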
http://git-wip-us.apache.org/repos/asf/hive/blob/22df53b6/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 0b46c04..0918d33 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -54,6 +54,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -69,8 +70,8 @@ class WarehouseInstance implements Closeable {
private final static String LISTENER_CLASS = DbNotificationListener.class.getCanonicalName();
- WarehouseInstance(Logger logger, MiniDFSCluster cluster, Map<String, String> overridesForHiveConf)
- throws Exception {
+ WarehouseInstance(Logger logger, MiniDFSCluster cluster, Map<String, String> overridesForHiveConf,
+ String keyNameForEncryptedZone) throws Exception {
this.logger = logger;
this.miniDFSCluster = cluster;
assert miniDFSCluster.isClusterUp();
@@ -78,15 +79,28 @@ class WarehouseInstance implements Closeable {
DistributedFileSystem fs = miniDFSCluster.getFileSystem();
Path warehouseRoot = mkDir(fs, "/warehouse" + uniqueIdentifier);
+ if (StringUtils.isNotEmpty(keyNameForEncryptedZone)) {
+ fs.createEncryptionZone(warehouseRoot, keyNameForEncryptedZone);
+ }
Path cmRootPath = mkDir(fs, "/cmroot" + uniqueIdentifier);
this.functionsRoot = mkDir(fs, "/functions" + uniqueIdentifier).toString();
initialize(cmRootPath.toString(), warehouseRoot.toString(), overridesForHiveConf);
}
- WarehouseInstance(Logger logger, MiniDFSCluster cluster) throws Exception {
+ WarehouseInstance(Logger logger, MiniDFSCluster cluster, String keyNameForEncryptedZone)
+ throws Exception {
this(logger, cluster, new HashMap<String, String>() {{
put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true");
- }});
+ }}, keyNameForEncryptedZone);
+ }
+
+ WarehouseInstance(Logger logger, MiniDFSCluster cluster,
+ Map<String, String> overridesForHiveConf) throws Exception {
+ this(logger, cluster, overridesForHiveConf, null);
+ }
+
+ WarehouseInstance(Logger logger, MiniDFSCluster cluster) throws Exception {
+ this(logger, cluster, (String) null);
}
private void initialize(String cmRoot, String warehouseRoot,
@@ -236,6 +250,20 @@ class WarehouseInstance implements Closeable {
return this;
}
+ WarehouseInstance verifyFailure(String[] data) throws IOException {
+ List<String> results = getOutput();
+ logger.info("Expecting {}", StringUtils.join(data, ","));
+ logger.info("Got {}", results);
+ boolean dataMatched = (data.length == results.size());
+ if (dataMatched) {
+ for (int i = 0; i < data.length; i++) {
+ dataMatched &= data[i].toLowerCase().equals(results.get(i).toLowerCase());
+ }
+ }
+ assertFalse(dataMatched);
+ return this;
+ }
+
/**
* Verifies results without regard for ordering.
*/
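The new verifyFailure helper is the negative counterpart of verifyResults: it asserts that the fetched rows do NOT match the expected data, case-insensitively. A usage sketch mirroring the first test above (assumes a test body with a WarehouseInstance named replica):

// With mismatched encryption-zone keys the replica cannot decrypt the copied
// bytes, so the source rows must not come back from the select.
replica.run("use " + replicatedDbName)
    .run("select value from encrypted_table")
    .verifyFailure(new String[] { "value1", "value2" });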
http://git-wip-us.apache.org/repos/asf/hive/blob/22df53b6/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index f24d1b6..4e61280 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.parse.repl;
+import com.google.common.collect.Lists;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
@@ -31,25 +32,28 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-
import javax.security.auth.login.LoginException;
import java.io.IOException;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
public class CopyUtils {
private static final Logger LOG = LoggerFactory.getLogger(CopyUtils.class);
+ // https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/TransparentEncryption.html#Running_as_the_superuser
+ private static final String RAW_RESERVED_VIRTUAL_PATH = "/.reserved/raw/";
+ private static final int MAX_COPY_RETRY = 3;
private final HiveConf hiveConf;
private final long maxCopyFileSize;
private final long maxNumberOfFiles;
private final boolean hiveInTest;
private final String copyAsUser;
- private final int MAX_COPY_RETRY = 3;
public CopyUtils(String distCpDoAsUser, HiveConf hiveConf) {
this.hiveConf = hiveConf;
@@ -107,8 +111,7 @@ public class CopyUtils {
FileSystem destinationFs, Path destination,
boolean useRegularCopy) throws IOException, LoginException {
int repeat = 0;
- List<Path> pathList = Lists.transform(fileList,
- fileInfo -> { return fileInfo.getEffectivePath(); });
+ List<Path> pathList = Lists.transform(fileList, ReplChangeManager.FileInfo::getEffectivePath);
while (!pathList.isEmpty() && (repeat < MAX_COPY_RETRY)) {
try {
doCopyOnce(sourceFs, pathList, destinationFs, destination, useRegularCopy);
@@ -143,21 +146,62 @@ public class CopyUtils {
boolean useRegularCopy) throws IOException, LoginException {
UserGroupInformation ugi = Utils.getUGI();
String currentUser = ugi.getShortUserName();
- boolean usePrivilegedDistCp = copyAsUser != null && !currentUser.equals(copyAsUser);
+ boolean usePrivilegedUser = copyAsUser != null && !currentUser.equals(copyAsUser);
if (useRegularCopy) {
- Path[] paths = srcList.toArray(new Path[] {});
- FileUtil.copy(sourceFs, paths, destinationFs, destination, false, true, hiveConf);
+ doRegularCopyOnce(sourceFs, srcList, destinationFs, destination, usePrivilegedUser);
+ } else {
+ doDistCpCopyOnce(sourceFs, srcList, destination, usePrivilegedUser);
+ }
+ }
+
+ private void doDistCpCopyOnce(FileSystem sourceFs, List<Path> srcList, Path destination,
+ boolean usePrivilegedUser) throws IOException {
+ if (hiveConf.getBoolVar(HiveConf.ConfVars.REPL_ADD_RAW_RESERVED_NAMESPACE)) {
+ srcList = srcList.stream().map(path -> {
+ URI uri = path.toUri();
+ return new Path(uri.getScheme(), uri.getAuthority(),
+ RAW_RESERVED_VIRTUAL_PATH + uri.getPath());
+ }).collect(Collectors.toList());
+ URI destinationUri = destination.toUri();
+ destination = new Path(destinationUri.getScheme(), destinationUri.getAuthority(),
+ RAW_RESERVED_VIRTUAL_PATH + destinationUri.getPath());
+ hiveConf.set("distcp.options.px", "");
+ }
+
+ FileUtils.distCp(
+ sourceFs, // source file system
+ srcList, // list of source paths
+ destination,
+ false,
+ usePrivilegedUser ? copyAsUser : null,
+ hiveConf,
+ ShimLoader.getHadoopShims()
+ );
+ }
+
+ private void doRegularCopyOnce(FileSystem sourceFs, List<Path> srcList, FileSystem destinationFs,
+ Path destination, boolean usePrivilegedUser) throws IOException {
+ /*
+ Even for a regular copy we have to impersonate the same user that distCp would use, since
+ the hive-server user might be different from the super user required to copy the relevant files.
+ */
+ final Path[] paths = srcList.toArray(new Path[] {});
+ if (usePrivilegedUser) {
+ final Path finalDestination = destination;
+ UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
+ copyAsUser, UserGroupInformation.getLoginUser());
+ try {
+ proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () -> {
+ FileUtil
+ .copy(sourceFs, paths, destinationFs, finalDestination, false, true, hiveConf);
+ return true;
+ });
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
} else {
- FileUtils.distCp(
- sourceFs, // source file system
- srcList, // list of source paths
- destination,
- false,
- usePrivilegedDistCp ? copyAsUser : null,
- hiveConf,
- ShimLoader.getHadoopShims()
- );
+ FileUtil.copy(sourceFs, paths, destinationFs, destination, false, true, hiveConf);
}
}
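The core of the change is the rewrite in doDistCpCopyOnce: when hive.repl.add.raw.reserved.namespace is set, both source and destination paths are moved into HDFS's /.reserved/raw virtual namespace, so a DistCp run by a superuser copies the still-encrypted bytes, and distcp.options.px (DistCp -px) preserves permissions and the extended attributes that carry the encryption metadata (presumably why the option is forced there; treat that reading as an assumption). A standalone sketch of the path mapping, with a hypothetical NameNode address and file path:

import java.net.URI;
import org.apache.hadoop.fs.Path;

public class RawReservedPathSketch {
  private static final String RAW_RESERVED_VIRTUAL_PATH = "/.reserved/raw/";

  // The same rewrite doDistCpCopyOnce applies: keep scheme and authority, prefix
  // the path with the raw virtual namespace (Hadoop's Path normalizes the
  // resulting double slash).
  static Path toRawNamespace(Path path) {
    URI uri = path.toUri();
    return new Path(uri.getScheme(), uri.getAuthority(),
        RAW_RESERVED_VIRTUAL_PATH + uri.getPath());
  }

  public static void main(String[] args) {
    Path src = new Path("hdfs://nn1:8020/warehouse/db.db/encrypted_table/000000_0");
    System.out.println(toRawNamespace(src));
    // prints: hdfs://nn1:8020/.reserved/raw/warehouse/db.db/encrypted_table/000000_0
  }
}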