Posted to commits@hive.apache.org by an...@apache.org on 2018/01/15 05:14:14 UTC

hive git commit: HIVE-18341: Add repl load support for adding "raw" namespace for TDE with same encryption keys (Anishek Agarwal, reviewed by Daniel Dai)

Repository: hive
Updated Branches:
  refs/heads/master 970d8c968 -> 22df53b6c


HIVE-18341: Add repl load support for adding "raw" namespace for TDE with same encryption keys (Anishek Agarwal, reviewed by Daniel Dai)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/22df53b6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/22df53b6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/22df53b6

Branch: refs/heads/master
Commit: 22df53b6c223f03edee1a8c271a77e91d66bb2a1
Parents: 970d8c9
Author: Anishek Agarwal <an...@gmail.com>
Authored: Mon Jan 15 10:43:55 2018 +0530
Committer: Anishek Agarwal <an...@gmail.com>
Committed: Mon Jan 15 10:43:55 2018 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   4 +
 .../TestReplicationOnHDFSEncryptedZones.java    | 144 +++++++++++++++++++
 .../hadoop/hive/ql/parse/WarehouseInstance.java |  36 ++++-
 .../hadoop/hive/ql/parse/repl/CopyUtils.java    |  78 +++++++---
 4 files changed, 241 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/22df53b6/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0564b50..631c836 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -470,6 +470,10 @@ public class HiveConf extends Configuration {
             + "metadata for acid tables which do not require the corresponding transaction \n"
             + "semantics to be applied on target. This can be removed when ACID table \n"
             + "replication is supported."),
+    //https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/TransparentEncryption.html#Running_as_the_superuser
+    REPL_ADD_RAW_RESERVED_NAMESPACE("hive.repl.add.raw.reserved.namespace", false,
+        "For TDE with same encryption keys on source and target, allow Distcp super user to access \n"
+            + "the raw bytes from filesystem without decrypting on source and then encrypting on target."),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),

http://git-wip-us.apache.org/repos/asf/hive/blob/22df53b6/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
new file mode 100644
index 0000000..fd05e99
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_AGGREGATE_STATS_CACHE_ENABLED;
+
+public class TestReplicationOnHDFSEncryptedZones {
+  private static String jksFile = System.getProperty("java.io.tmpdir") + "/test.jks";
+  @Rule
+  public final TestName testName = new TestName();
+
+  protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
+  private static WarehouseInstance primary;
+  private static String primaryDbName, replicatedDbName;
+  private static Configuration conf;
+  private static MiniDFSCluster miniDFSCluster;
+
+  @BeforeClass
+  public static void beforeClassSetup() throws Exception {
+    conf = new Configuration();
+    conf.set("dfs.client.use.datanode.hostname", "true");
+    conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
+    conf.set("hadoop.security.key.provider.path", "jceks://file" + jksFile);
+    conf.setBoolean("dfs.namenode.delegation.token.always-use", true);
+
+    conf.setLong(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname, 1);
+    conf.setLong(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname, 0);
+    conf.setBoolean(METASTORE_AGGREGATE_STATS_CACHE_ENABLED.varname, false);
+
+    miniDFSCluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+
+    DFSTestUtil.createKey("test_key", miniDFSCluster, conf);
+    primary = new WarehouseInstance(LOG, miniDFSCluster, new HashMap<String, String>() {{
+      put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "false");
+      put(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false");
+    }}, "test_key");
+  }
+
+  @AfterClass
+  public static void classLevelTearDown() throws IOException {
+    primary.close();
+    FileUtils.deleteQuietly(new File(jksFile));
+  }
+
+  @Before
+  public void setup() throws Throwable {
+    primaryDbName = testName.getMethodName() + "_" + System.currentTimeMillis();
+    replicatedDbName = "replicated_" + primaryDbName;
+    primary.run("create database " + primaryDbName);
+  }
+
+  @Test
+  public void targetAndSourceHaveDifferentEncryptionZoneKeys() throws Throwable {
+    DFSTestUtil.createKey("test_key123", miniDFSCluster, conf);
+
+    WarehouseInstance replica = new WarehouseInstance(LOG, miniDFSCluster,
+        new HashMap<String, String>() {{
+          put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "false");
+          put(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false");
+        }}, "test_key123");
+
+    WarehouseInstance.Tuple tuple =
+        primary.run("use " + primaryDbName)
+            .run("create table encrypted_table (id int, value string)")
+            .run("insert into table encrypted_table values (1,'value1')")
+            .run("insert into table encrypted_table values (2,'value2')")
+            .dump(primaryDbName, null);
+
+    replica
+        .run("repl load " + replicatedDbName + " from '" + tuple.dumpLocation
+            + "' with('hive.repl.add.raw.reserved.namespace'='true')")
+        .run("use " + replicatedDbName)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("select value from encrypted_table")
+        .verifyFailure(new String[] { "value1", "value2" });
+  }
+
+  @Ignore("this is ignored as minidfs cluster as of writing this test looked like did not copy the "
+              + "files correctly")
+  @Test
+  public void targetAndSourceHaveSameEncryptionZoneKeys() throws Throwable {
+    WarehouseInstance replica = new WarehouseInstance(LOG, miniDFSCluster,
+        new HashMap<String, String>() {{
+          put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "false");
+          put(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false");
+          put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
+              UserGroupInformation.getCurrentUser().getUserName());
+        }}, "test_key");
+
+    WarehouseInstance.Tuple tuple =
+        primary.run("use " + primaryDbName)
+            .run("create table encrypted_table (id int, value string)")
+            .run("insert into table encrypted_table values (1,'value1')")
+            .run("insert into table encrypted_table values (2,'value2')")
+            .dump(primaryDbName, null);
+
+    replica
+        .run("repl load " + replicatedDbName + " from '" + tuple.dumpLocation
+            + "' with('hive.repl.add.raw.reserved.namespace'='true')")
+        .run("use " + replicatedDbName)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("select value from encrypted_table")
+        .verifyResults(new String[] { "value1", "value2" });
+  }
+}
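
The different-keys test above relies on the fact that, with hive.repl.add.raw.reserved.namespace enabled, files are copied as raw encrypted bytes: a replica whose encryption zone uses a different key cannot decrypt them, so reading the table back must not return the original rows. That expectation is asserted with the new verifyFailure helper added to WarehouseInstance below; a minimal usage sketch, reusing the table and values from the test:

    // Hedged sketch: the raw bytes were written under the source key, so the replica
    // (created with a different key) must NOT be able to return the plaintext values.
    replica.run("select value from encrypted_table")
        .verifyFailure(new String[] { "value1", "value2" });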

http://git-wip-us.apache.org/repos/asf/hive/blob/22df53b6/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 0b46c04..0918d33 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -54,6 +54,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -69,8 +70,8 @@ class WarehouseInstance implements Closeable {
 
   private final static String LISTENER_CLASS = DbNotificationListener.class.getCanonicalName();
 
-  WarehouseInstance(Logger logger, MiniDFSCluster cluster, Map<String, String> overridesForHiveConf)
-      throws Exception {
+  WarehouseInstance(Logger logger, MiniDFSCluster cluster, Map<String, String> overridesForHiveConf,
+      String keyNameForEncryptedZone) throws Exception {
     this.logger = logger;
     this.miniDFSCluster = cluster;
     assert miniDFSCluster.isClusterUp();
@@ -78,15 +79,28 @@ class WarehouseInstance implements Closeable {
     DistributedFileSystem fs = miniDFSCluster.getFileSystem();
 
     Path warehouseRoot = mkDir(fs, "/warehouse" + uniqueIdentifier);
+    if (StringUtils.isNotEmpty(keyNameForEncryptedZone)) {
+      fs.createEncryptionZone(warehouseRoot, keyNameForEncryptedZone);
+    }
     Path cmRootPath = mkDir(fs, "/cmroot" + uniqueIdentifier);
     this.functionsRoot = mkDir(fs, "/functions" + uniqueIdentifier).toString();
     initialize(cmRootPath.toString(), warehouseRoot.toString(), overridesForHiveConf);
   }
 
-  WarehouseInstance(Logger logger, MiniDFSCluster cluster) throws Exception {
+  WarehouseInstance(Logger logger, MiniDFSCluster cluster, String keyNameForEncryptedZone)
+      throws Exception {
     this(logger, cluster, new HashMap<String, String>() {{
       put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true");
-    }});
+    }}, keyNameForEncryptedZone);
+  }
+
+  WarehouseInstance(Logger logger, MiniDFSCluster cluster,
+      Map<String, String> overridesForHiveConf) throws Exception {
+    this(logger, cluster, overridesForHiveConf, null);
+  }
+
+  WarehouseInstance(Logger logger, MiniDFSCluster cluster) throws Exception {
+    this(logger, cluster, (String) null);
   }
 
   private void initialize(String cmRoot, String warehouseRoot,
@@ -236,6 +250,20 @@ class WarehouseInstance implements Closeable {
     return this;
   }
 
+  WarehouseInstance verifyFailure(String[] data) throws IOException {
+    List<String> results = getOutput();
+    logger.info("Expecting {}", StringUtils.join(data, ","));
+    logger.info("Got {}", results);
+    boolean dataMatched = (data.length == results.size());
+    if (dataMatched) {
+      for (int i = 0; i < data.length; i++) {
+        dataMatched &= data[i].toLowerCase().equals(results.get(i).toLowerCase());
+      }
+    }
+    assertFalse(dataMatched);
+    return this;
+  }
+
   /**
   * Verifies the result without regard to ordering.
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/22df53b6/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index f24d1b6..4e61280 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.parse.repl;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -31,25 +32,28 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-
 import javax.security.auth.login.LoginException;
 import java.io.IOException;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 public class CopyUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(CopyUtils.class);
+  // https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/TransparentEncryption.html#Running_as_the_superuser
+  private static final String RAW_RESERVED_VIRTUAL_PATH = "/.reserved/raw/";
+  private static final int MAX_COPY_RETRY = 3;
 
   private final HiveConf hiveConf;
   private final long maxCopyFileSize;
   private final long maxNumberOfFiles;
   private final boolean hiveInTest;
   private final String copyAsUser;
-  private final int MAX_COPY_RETRY = 3;
 
   public CopyUtils(String distCpDoAsUser, HiveConf hiveConf) {
     this.hiveConf = hiveConf;
@@ -107,8 +111,7 @@ public class CopyUtils {
                            FileSystem destinationFs, Path destination,
                            boolean useRegularCopy) throws IOException, LoginException {
     int repeat = 0;
-    List<Path> pathList = Lists.transform(fileList,
-                                          fileInfo -> { return fileInfo.getEffectivePath(); });
+    List<Path> pathList = Lists.transform(fileList, ReplChangeManager.FileInfo::getEffectivePath);
     while (!pathList.isEmpty() && (repeat < MAX_COPY_RETRY)) {
       try {
         doCopyOnce(sourceFs, pathList, destinationFs, destination, useRegularCopy);
@@ -143,21 +146,62 @@ public class CopyUtils {
                           boolean useRegularCopy) throws IOException, LoginException {
     UserGroupInformation ugi = Utils.getUGI();
     String currentUser = ugi.getShortUserName();
-    boolean usePrivilegedDistCp = copyAsUser != null && !currentUser.equals(copyAsUser);
+    boolean usePrivilegedUser = copyAsUser != null && !currentUser.equals(copyAsUser);
 
     if (useRegularCopy) {
-      Path[] paths = srcList.toArray(new Path[] {});
-      FileUtil.copy(sourceFs, paths, destinationFs, destination, false, true, hiveConf);
+      doRegularCopyOnce(sourceFs, srcList, destinationFs, destination, usePrivilegedUser);
+    } else {
+      doDistCpCopyOnce(sourceFs, srcList, destination, usePrivilegedUser);
+    }
+  }
+
+  private void doDistCpCopyOnce(FileSystem sourceFs, List<Path> srcList, Path destination,
+      boolean usePrivilegedUser) throws IOException {
+    if (hiveConf.getBoolVar(HiveConf.ConfVars.REPL_ADD_RAW_RESERVED_NAMESPACE)) {
+      srcList = srcList.stream().map(path -> {
+        URI uri = path.toUri();
+        return new Path(uri.getScheme(), uri.getAuthority(),
+            RAW_RESERVED_VIRTUAL_PATH + uri.getPath());
+      }).collect(Collectors.toList());
+      URI destinationUri = destination.toUri();
+      destination = new Path(destinationUri.getScheme(), destinationUri.getAuthority(),
+          RAW_RESERVED_VIRTUAL_PATH + destinationUri.getPath());
+      hiveConf.set("distcp.options.px","");
+    }
+
+    FileUtils.distCp(
+        sourceFs, // source file system
+        srcList,  // list of source paths
+        destination,
+        false,
+        usePrivilegedUser ? copyAsUser : null,
+        hiveConf,
+        ShimLoader.getHadoopShims()
+    );
+  }
+
+  private void doRegularCopyOnce(FileSystem sourceFs, List<Path> srcList, FileSystem destinationFs,
+      Path destination, boolean usePrivilegedUser) throws IOException {
+  /*
+    even for regular copy we have to use the same user permissions that distCp will use since
+    the hive-server user might be different from the super user required to copy the relevant files.
+   */
+    final Path[] paths = srcList.toArray(new Path[] {});
+    if (usePrivilegedUser) {
+      final Path finalDestination = destination;
+      UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
+          copyAsUser, UserGroupInformation.getLoginUser());
+      try {
+        proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () -> {
+          FileUtil
+              .copy(sourceFs, paths, destinationFs, finalDestination, false, true, hiveConf);
+          return true;
+        });
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
     } else {
-      FileUtils.distCp(
-              sourceFs, // source file system
-              srcList,  // list of source paths
-              destination,
-              false,
-              usePrivilegedDistCp ? copyAsUser : null,
-              hiveConf,
-              ShimLoader.getHadoopShims()
-      );
+      FileUtil.copy(sourceFs, paths, destinationFs, destination, false, true, hiveConf);
     }
   }
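
To make the raw-namespace rewrite in doDistCpCopyOnce concrete, a small illustration of the prefixing applied when hive.repl.add.raw.reserved.namespace is enabled. The source path is an invented example; the construction mirrors the code above, and Hadoop's Path constructor normalizes the doubled slash produced by concatenating RAW_RESERVED_VIRTUAL_PATH with uri.getPath():

    import java.net.URI;
    import org.apache.hadoop.fs.Path;

    // Example path only; any file under an HDFS encryption zone is rewritten the same way.
    Path src = new Path("hdfs://source-nn:8020/warehouse/db.db/encrypted_table/000000_0");
    URI uri = src.toUri();
    Path rawSrc = new Path(uri.getScheme(), uri.getAuthority(),
        "/.reserved/raw/" + uri.getPath());
    // rawSrc -> hdfs://source-nn:8020/.reserved/raw/warehouse/db.db/encrypted_table/000000_0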