Posted to commits@hive.apache.org by ma...@apache.org on 2020/02/07 01:57:52 UTC
[hive] branch master updated: HIVE-22736 : Support replication across multiple encryption zones. (Aasha Medhi, reviewed by Mahesh Kumar Behera)
This is an automated email from the ASF dual-hosted git repository.
mahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 7411d42 HIVE-22736 : Support replication across multiple encryption zones. (Aasha Medhi, reviewed by Mahesh Kumar Behera)
7411d42 is described below
commit 7411d42579ffa0bad96e8da731a1a35afc9ff614
Author: Aasha Medhi <aa...@gmail.com>
AuthorDate: Fri Feb 7 07:16:24 2020 +0530
HIVE-22736 : Support replication across multiple encryption zones. (Aasha Medhi, reviewed by Mahesh Kumar Behera)
Signed-off-by: Mahesh Kumar Behera <ma...@apache.org>
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 5 +
.../TestMetaStoreMultipleEncryptionZones.java | 1434 ++++++++++++++++++++
.../hive/metastore/TestReplChangeManager.java | 1 +
.../cache/TestCachedStoreUpdateUsingEvents.java | 2 +
.../hive/ql/metadata/TestAlterTableMetadata.java | 1 +
.../java/org/apache/hive/jdbc/TestJdbcDriver2.java | 4 +
.../org/apache/hive/jdbc/TestJdbcWithMiniHS2.java | 1 +
.../ql/ddl/table/create/CreateTableOperation.java | 1 -
.../table/storage/AlterTableArchiveOperation.java | 4 +-
.../ddl/table/storage/AlterTableArchiveUtils.java | 5 +-
.../storage/AlterTableSetLocationOperation.java | 1 +
.../storage/AlterTableUnarchiveOperation.java | 6 +-
.../org/apache/hadoop/hive/ql/metadata/Hive.java | 4 +-
.../hive/ql/parse/ImportSemanticAnalyzer.java | 4 +-
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 4 +-
.../apache/hadoop/hive/shims/Hadoop23Shims.java | 2 +-
.../org/apache/hadoop/hive/shims/HadoopShims.java | 15 +-
standalone-metastore/metastore-common/pom.xml | 5 +
.../hadoop/hive/metastore/ReplChangeManager.java | 187 ++-
.../hadoop/hive/metastore/conf/MetastoreConf.java | 5 +
.../apache/hadoop/hive/metastore/utils/Retry.java | 52 +
.../hadoop/hive/metastore/utils/package-info.java | 22 +
.../hadoop/hive/metastore/utils/RetryTest.java | 57 +
.../hadoop/hive/metastore/utils/package-info.java | 22 +
.../hadoop/hive/metastore/HiveAlterHandler.java | 7 +-
.../hadoop/hive/metastore/HiveMetaStore.java | 120 +-
26 files changed, 1886 insertions(+), 85 deletions(-)
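The intent, as reflected in the new configuration options and tests below, is for the ReplChangeManager to keep recycled files inside the encryption zone they came from rather than moving them into a single global cmroot, since HDFS does not allow files to be moved out of an encryption zone. The configuration wiring is sketched below, modelled on the new test's setUp(); the HiveConf constants are from this commit, while the host, port and paths are placeholders:

    import org.apache.hadoop.hive.conf.HiveConf;

    // Sketch only: enable the change manager and point it at per-zone and fallback roots.
    public class CmEncryptionZoneConfigSketch {
      public static HiveConf build() {
        HiveConf conf = new HiveConf();
        conf.setBoolean(HiveConf.ConfVars.REPLCMENABLED.varname, true);
        // Global cmroot used when no encryption zone is involved.
        conf.set(HiveConf.ConfVars.REPLCMDIR.varname, "hdfs://nn:8020/cmroot");
        // Directory name created inside each encryption zone for recycled files.
        conf.set(HiveConf.ConfVars.REPLCMENCRYPTEDDIR.varname, ".cmroot");
        // Root used for non-encrypted paths when hive.repl.cmrootdir itself is encrypted.
        conf.set(HiveConf.ConfVars.REPLCMFALLBACKNONENCRYPTEDDIR.varname, "hdfs://nn:8020/cmrootFallback");
        return conf;
      }
    }
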
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 12a022c..a120b45 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -452,6 +452,11 @@ public class HiveConf extends Configuration {
REPLCMRETIAN("hive.repl.cm.retain","24h",
new TimeValidator(TimeUnit.HOURS),
"Time to retain removed files in cmrootdir."),
+ REPLCMENCRYPTEDDIR("hive.repl.cm.encryptionzone.rootdir", ".cmroot",
+ "Root dir for ChangeManager if encryption zones are enabled, used for deleted files."),
+ REPLCMFALLBACKNONENCRYPTEDDIR("hive.repl.cm.nonencryptionzone.rootdir",
+ "/user/${system:user.name}/cmroot/",
+ "Root dir for ChangeManager for non encrypted paths if hive.repl.cmrootdir is encrypted."),
REPLCMINTERVAL("hive.repl.cm.interval","3600s",
new TimeValidator(TimeUnit.SECONDS),
"Inteval for cmroot cleanup thread."),
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMultipleEncryptionZones.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMultipleEncryptionZones.java
new file mode 100644
index 0000000..51bb787
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMultipleEncryptionZones.java
@@ -0,0 +1,1434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ReplChangeManager.RecycleType;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.hive.metastore.client.builder.DatabaseBuilder;
+import org.apache.hadoop.hive.metastore.client.builder.TableBuilder;
+import org.apache.hadoop.hive.metastore.api.Type;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.thrift.TException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * TestMetaStoreMultipleEncryptionZones.
+ */
+public class TestMetaStoreMultipleEncryptionZones {
+ private static HiveMetaStoreClient client;
+ private static HiveConf hiveConf;
+ private static Configuration conf;
+ private static Warehouse warehouse;
+ private static FileSystem warehouseFs;
+ private static MiniDFSCluster miniDFSCluster;
+ private static String cmroot;
+ private static FileSystem fs;
+ private static HadoopShims.HdfsEncryptionShim shimCm;
+ private static String cmrootEncrypted;
+ private static String jksFile = System.getProperty("java.io.tmpdir") + "/test.jks";
+ private static String cmrootFallBack;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ //Create secure cluster
+ conf = new Configuration();
+ conf.set("hadoop.security.key.provider.path", "jceks://file" + jksFile);
+ miniDFSCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+ DFSTestUtil.createKey("test_key_cm", miniDFSCluster, conf);
+ DFSTestUtil.createKey("test_key_db", miniDFSCluster, conf);
+ hiveConf = new HiveConf(TestReplChangeManager.class);
+ hiveConf.setBoolean(HiveConf.ConfVars.REPLCMENABLED.varname, true);
+ hiveConf.setInt(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, 60);
+ hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
+ "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort()
+ + HiveConf.ConfVars.METASTOREWAREHOUSE.defaultStrVal);
+
+ cmroot = "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + "/cmroot";
+ cmrootFallBack = "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + "/cmrootFallback";
+ cmrootEncrypted = "cmrootEncrypted";
+ hiveConf.set(HiveConf.ConfVars.REPLCMDIR.varname, cmroot);
+ hiveConf.set(HiveConf.ConfVars.REPLCMENCRYPTEDDIR.varname, cmrootEncrypted);
+ hiveConf.set(HiveConf.ConfVars.REPLCMFALLBACKNONENCRYPTEDDIR.varname, cmrootFallBack);
+ initReplChangeManager();
+ //Create cm in encrypted zone
+ shimCm = ShimLoader.getHadoopShims().createHdfsEncryptionShim(fs, conf);
+
+ try {
+ client = new HiveMetaStoreClient(hiveConf);
+ } catch (Throwable e) {
+ System.err.println("Unable to open the metastore");
+ System.err.println(StringUtils.stringifyException(e));
+ throw e;
+ }
+ }
+
+ private static void initReplChangeManager() throws Exception{
+ warehouse = new Warehouse(hiveConf);
+ warehouseFs = warehouse.getWhRoot().getFileSystem(hiveConf);
+ fs = new Path(cmroot).getFileSystem(hiveConf);
+ fs.mkdirs(warehouse.getWhRoot());
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ try {
+ miniDFSCluster.shutdown();
+ client.close();
+ } catch (Throwable e) {
+ System.err.println("Unable to close metastore");
+ System.err.println(StringUtils.stringifyException(e));
+ throw e;
+ }
+ }
+
+ @Test
+ public void dropTableWithDifferentEncryptionZonesDifferentKey() throws Throwable {
+ String dbName1 = "encrdbdiffkey1";
+ String dbName2 = "encrdbdiffkey2";
+ String tblName1 = "encrtbl1";
+ String tblName2 = "encrtbl2";
+ String typeName = "Person";
+
+ silentDropDatabase(dbName1);
+ silentDropDatabase(dbName2);
+ new DatabaseBuilder()
+ .setName(dbName1)
+ .addParam("repl.source.for", "1, 2, 3")
+ .create(client, hiveConf);
+
+ new DatabaseBuilder()
+ .setName(dbName2)
+ .addParam("repl.source.for", "1, 2, 3")
+ .create(client, hiveConf);
+
+ client.dropType(typeName);
+ Type typ1 = new Type();
+ typ1.setName(typeName);
+ typ1.setFields(new ArrayList<>(2));
+ typ1.getFields().add(
+ new FieldSchema("name", ColumnType.STRING_TYPE_NAME, ""));
+ typ1.getFields().add(
+ new FieldSchema("income", ColumnType.INT_TYPE_NAME, ""));
+ client.createType(typ1);
+
+ Path dirDb1 = new Path(warehouse.getWhRoot(), dbName1 +".db");
+ warehouseFs.delete(dirDb1, true);
+ warehouseFs.mkdirs(dirDb1);
+ shimCm.createEncryptionZone(dirDb1, "test_key_db");
+ Path dirTbl1 = new Path(dirDb1, tblName1);
+ warehouseFs.mkdirs(dirTbl1);
+ Path part11 = new Path(dirTbl1, "part1");
+ createFile(part11, "testClearer11");
+
+ Path dirDb2 = new Path(warehouse.getWhRoot(), dbName2 +".db");
+ warehouseFs.delete(dirDb2, true);
+ warehouseFs.mkdirs(dirDb2);
+ shimCm.createEncryptionZone(dirDb2, "test_key_cm");
+ Path dirTbl2 = new Path(dirDb2, tblName2);
+ warehouseFs.mkdirs(dirTbl2);
+ Path part12 = new Path(dirTbl2, "part1");
+ createFile(part12, "testClearer12");
+
+ new TableBuilder()
+ .setDbName(dbName1)
+ .setTableName(tblName1)
+ .setCols(typ1.getFields())
+ .setNumBuckets(1)
+ .addBucketCol("name")
+ .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+ .create(client, hiveConf);
+
+ Table tbl = client.getTable(dbName1, tblName1);
+ Assert.assertNotNull(tbl);
+
+ new TableBuilder()
+ .setDbName(dbName2)
+ .setTableName(tblName2)
+ .setCols(typ1.getFields())
+ .setNumBuckets(1)
+ .addBucketCol("name")
+ .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+ .create(client, hiveConf);
+
+ boolean exceptionThrown = false;
+ try {
+ client.dropTable(dbName1, tblName1);
+ } catch (MetaException e) {
+ exceptionThrown = true;
+ assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+ }
+ assertFalse(exceptionThrown);
+ assertFalse(warehouseFs.exists(part11));
+ try {
+ client.getTable(dbName1, tblName1);
+ } catch (NoSuchObjectException e) {
+ exceptionThrown = true;
+ }
+ assertTrue(exceptionThrown);
+ exceptionThrown = false;
+ try {
+ client.dropTable(dbName2, tblName2);
+ } catch (MetaException e) {
+ exceptionThrown = true;
+ assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+ }
+ assertFalse(exceptionThrown);
+ assertFalse(warehouseFs.exists(part12));
+ try {
+ client.getTable(dbName2, tblName2);
+ } catch (NoSuchObjectException e) {
+ exceptionThrown = true;
+ }
+ assertTrue(exceptionThrown);
+
+
+ }
+
+ @Test
+ public void dropTableWithTableAtEncryptionZoneRoot() throws Throwable {
+ String dbName = "encrdbroot";
+ String tblName1 = "encrtbl1";
+ String tblName2 = "encrtbl2";
+ String typeName = "Person";
+
+ silentDropDatabase(dbName);
+ new DatabaseBuilder()
+ .setName(dbName)
+ .addParam("repl.source.for", "1, 2, 3")
+ .create(client, hiveConf);
+
+ client.dropType(typeName);
+ Type typ1 = new Type();
+ typ1.setName(typeName);
+ typ1.setFields(new ArrayList<>(2));
+ typ1.getFields().add(
+ new FieldSchema("name", ColumnType.STRING_TYPE_NAME, ""));
+ typ1.getFields().add(
+ new FieldSchema("income", ColumnType.INT_TYPE_NAME, ""));
+ client.createType(typ1);
+
+ new TableBuilder()
+ .setDbName(dbName)
+ .setTableName(tblName1)
+ .setCols(typ1.getFields())
+ .setNumBuckets(1)
+ .addBucketCol("name")
+ .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+ .create(client, hiveConf);
+
+ Table tbl = client.getTable(dbName, tblName1);
+ Assert.assertNotNull(tbl);
+
+ new TableBuilder()
+ .setDbName(dbName)
+ .setTableName(tblName2)
+ .setCols(typ1.getFields())
+ .setNumBuckets(1)
+ .addBucketCol("name")
+ .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+ .create(client, hiveConf);
+
+ Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db");
+ warehouseFs.mkdirs(dirDb);
+ Path dirTbl1 = new Path(dirDb, tblName1);
+ warehouseFs.mkdirs(dirTbl1);
+ shimCm.createEncryptionZone(dirTbl1, "test_key_db");
+ Path part11 = new Path(dirTbl1, "part1");
+ createFile(part11, "testClearer11");
+
+ Path dirTbl2 = new Path(dirDb, tblName2);
+ warehouseFs.mkdirs(dirTbl2);
+ shimCm.createEncryptionZone(dirTbl2, "test_key_cm");
+ Path part12 = new Path(dirTbl2, "part1");
+ createFile(part12, "testClearer12");
+
+ boolean exceptionThrown = false;
+ try {
+ client.dropTable(dbName, tblName1);
+ } catch (MetaException e) {
+ exceptionThrown = true;
+ assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+ }
+ assertFalse(exceptionThrown);
+ assertFalse(warehouseFs.exists(part11));
+ try {
+ client.getTable(dbName, tblName1);
+ } catch (NoSuchObjectException e) {
+ exceptionThrown = true;
+ }
+ assertTrue(exceptionThrown);
+ exceptionThrown = false;
+ try {
+ client.dropTable(dbName, tblName2);
+ } catch (MetaException e) {
+ exceptionThrown = true;
+ assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+ }
+ assertFalse(exceptionThrown);
+ assertFalse(warehouseFs.exists(part12));
+ try {
+ client.getTable(dbName, tblName2);
+ } catch (NoSuchObjectException e) {
+ exceptionThrown = true;
+ }
+ assertTrue(exceptionThrown);
+ assertTrue(warehouseFs.exists(new Path(dirTbl1, cmrootEncrypted)));
+ assertTrue(warehouseFs.exists(new Path(dirTbl2, cmrootEncrypted)));
+ }
+
+ @Test
+ public void dropTableWithDifferentEncryptionZonesSameKey() throws Throwable {
+ String dbName1 = "encrdbsamekey1";
+ String dbName2 = "encrdbsamekey2";
+ String tblName1 = "encrtbl1";
+ String tblName2 = "encrtbl2";
+ String typeName = "Person";
+
+ silentDropDatabase(dbName1);
+ silentDropDatabase(dbName2);
+ new DatabaseBuilder()
+ .setName(dbName1)
+ .addParam("repl.source.for", "1, 2, 3")
+ .create(client, hiveConf);
+
+ new DatabaseBuilder()
+ .setName(dbName2)
+ .addParam("repl.source.for", "1, 2, 3")
+ .create(client, hiveConf);
+
+ client.dropType(typeName);
+ Type typ1 = new Type();
+ typ1.setName(typeName);
+ typ1.setFields(new ArrayList<>(2));
+ typ1.getFields().add(
+ new FieldSchema("name", ColumnType.STRING_TYPE_NAME, ""));
+ typ1.getFields().add(
+ new FieldSchema("income", ColumnType.INT_TYPE_NAME, ""));
+ client.createType(typ1);
+
+ Path dirDb1 = new Path(warehouse.getWhRoot(), dbName1 +".db");
+ warehouseFs.mkdirs(dirDb1);
+ shimCm.createEncryptionZone(dirDb1, "test_key_db");
+ Path dirTbl1 = new Path(dirDb1, tblName1);
+ warehouseFs.mkdirs(dirTbl1);
+ Path part11 = new Path(dirTbl1, "part1");
+ createFile(part11, "testClearer11");
+
+ Path dirDb2 = new Path(warehouse.getWhRoot(), dbName2 +".db");
+ warehouseFs.mkdirs(dirDb2);
+ shimCm.createEncryptionZone(dirDb2, "test_key_db");
+ Path dirTbl2 = new Path(dirDb2, tblName2);
+ warehouseFs.mkdirs(dirTbl2);
+ Path part12 = new Path(dirTbl2, "part1");
+ createFile(part12, "testClearer12");
+
+ new TableBuilder()
+ .setDbName(dbName1)
+ .setTableName(tblName1)
+ .setCols(typ1.getFields())
+ .setNumBuckets(1)
+ .addBucketCol("name")
+ .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+ .create(client, hiveConf);
+
+ Table tbl = client.getTable(dbName1, tblName1);
+ Assert.assertNotNull(tbl);
+
+ new TableBuilder()
+ .setDbName(dbName2)
+ .setTableName(tblName2)
+ .setCols(typ1.getFields())
+ .setNumBuckets(1)
+ .addBucketCol("name")
+ .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+ .create(client, hiveConf);
+
+ boolean exceptionThrown = false;
+ try {
+ client.dropTable(dbName1, tblName1);
+ } catch (MetaException e) {
+ exceptionThrown = true;
+ assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+ }
+ assertFalse(exceptionThrown);
+ assertFalse(warehouseFs.exists(part11));
+ try {
+ client.getTable(dbName1, tblName1);
+ } catch (NoSuchObjectException e) {
+ exceptionThrown = true;
+ }
+ assertTrue(exceptionThrown);
+ exceptionThrown = false;
+ try {
+ client.dropTable(dbName2, tblName2);
+ } catch (MetaException e) {
+ exceptionThrown = true;
+ assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+ }
+ assertFalse(exceptionThrown);
+ assertFalse(warehouseFs.exists(part12));
+ try {
+ client.getTable(dbName2, tblName2);
+ } catch (NoSuchObjectException e) {
+ exceptionThrown = true;
+ }
+ assertTrue(exceptionThrown);
+
+ }
+
+ @Test
+ public void dropTableWithSameEncryptionZones() throws Throwable {
+ String dbName = "encrdb3";
+ String tblName1 = "encrtbl1";
+ String tblName2 = "encrtbl2";
+ String typeName = "Person";
+ silentDropDatabase(dbName);
+
+ new DatabaseBuilder()
+ .setName(dbName)
+ .addParam("repl.source.for", "1, 2, 3")
+ .create(client, hiveConf);
+
+ client.dropType(typeName);
+ Type typ1 = new Type();
+ typ1.setName(typeName);
+ typ1.setFields(new ArrayList<>(2));
+ typ1.getFields().add(
+ new FieldSchema("name", ColumnType.STRING_TYPE_NAME, ""));
+ typ1.getFields().add(
+ new FieldSchema("income", ColumnType.INT_TYPE_NAME, ""));
+ client.createType(typ1);
+
+ new TableBuilder()
+ .setDbName(dbName)
+ .setTableName(tblName1)
+ .setCols(typ1.getFields())
+ .setNumBuckets(1)
+ .addBucketCol("name")
+ .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+ .create(client, hiveConf);
+
+ Table tbl = client.getTable(dbName, tblName1);
+ Assert.assertNotNull(tbl);
+
+ new TableBuilder()
+ .setDbName(dbName)
+ .setTableName(tblName2)
+ .setCols(typ1.getFields())
+ .setNumBuckets(1)
+ .addBucketCol("name")
+ .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+ .create(client, hiveConf);
+
+ Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db");
+ warehouseFs.delete(dirDb, true);
+ warehouseFs.mkdirs(dirDb);
+ shimCm.createEncryptionZone(dirDb, "test_key_db");
+ Path dirTbl1 = new Path(dirDb, tblName1);
+ warehouseFs.mkdirs(dirTbl1);
+ Path part11 = new Path(dirTbl1, "part1");
+ createFile(part11, "testClearer11");
+
+ Path dirTbl2 = new Path(dirDb, tblName2);
+ warehouseFs.mkdirs(dirTbl2);
+ Path part12 = new Path(dirTbl2, "part1");
+ createFile(part12, "testClearer12");
+
+ boolean exceptionThrown = false;
+ try {
+ client.dropTable(dbName, tblName1);
+ } catch (MetaException e) {
+ exceptionThrown = true;
+ assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+ }
+ assertFalse(exceptionThrown);
+ assertFalse(warehouseFs.exists(part11));
+ try {
+ client.getTable(dbName, tblName1);
+ } catch (NoSuchObjectException e) {
+ exceptionThrown = true;
+ }
+ assertTrue(exceptionThrown);
+ exceptionThrown = false;
+ try {
+ client.dropTable(dbName, tblName2);
+ } catch (MetaException e) {
+ exceptionThrown = true;
+ assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+ }
+ assertFalse(exceptionThrown);
+ assertFalse(warehouseFs.exists(part12));
+ try {
+ client.getTable(dbName, tblName2);
+ } catch (NoSuchObjectException e) {
+ exceptionThrown = true;
+ }
+ assertTrue(exceptionThrown);
+ }
+
+ @Test
+ public void dropTableWithoutEncryptionZonesForCm() throws Throwable {
+ String dbName = "simpdb1";
+ String tblName = "simptbl";
+ String typeName = "Person";
+ silentDropDatabase(dbName);
+ new DatabaseBuilder()
+ .setName(dbName)
+ .addParam("repl.source.for", "1, 2, 3")
+ .create(client, hiveConf);
+
+ client.dropType(typeName);
+ Type typ1 = new Type();
+ typ1.setName(typeName);
+ typ1.setFields(new ArrayList<>(2));
+ typ1.getFields().add(
+ new FieldSchema("name", ColumnType.STRING_TYPE_NAME, ""));
+ typ1.getFields().add(
+ new FieldSchema("income", ColumnType.INT_TYPE_NAME, ""));
+ client.createType(typ1);
+
+ new TableBuilder()
+ .setDbName(dbName)
+ .setTableName(tblName)
+ .setCols(typ1.getFields())
+ .setNumBuckets(1)
+ .addBucketCol("name")
+ .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+ .create(client, hiveConf);
+
+ Table tbl = client.getTable(dbName, tblName);
+ Assert.assertNotNull(tbl);
+
+ Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db");
+ warehouseFs.mkdirs(dirDb);
+ Path dirTbl1 = new Path(dirDb, tblName);
+ warehouseFs.mkdirs(dirTbl1);
+ Path part11 = new Path(dirTbl1, "part1");
+ createFile(part11, "testClearer11");
+
+ boolean exceptionThrown = false;
+ try {
+ client.dropTable(dbName, tblName);
+ } catch (Exception e) {
+ exceptionThrown = true;
+ }
+ assertFalse(exceptionThrown);
+ assertFalse(warehouseFs.exists(part11));
+ try {
+ client.getTable(dbName, tblName);
+ } catch (NoSuchObjectException e) {
+ exceptionThrown = true;
+ }
+ assertTrue(exceptionThrown);
+ }
+
+ @Test
+ public void dropExternalTableWithSameEncryptionZonesForCm() throws Throwable {
+ String dbName = "encrdb4";
+ String tblName1 = "encrtbl1";
+ String tblName2 = "encrtbl2";
+ String typeName = "Person";
+ silentDropDatabase(dbName);
+ new DatabaseBuilder()
+ .setName(dbName)
+ .addParam("repl.source.for", "1, 2, 3")
+ .create(client, hiveConf);
+
+ client.dropType(typeName);
+ Type typ1 = new Type();
+ typ1.setName(typeName);
+ typ1.setFields(new ArrayList<>(2));
+ typ1.getFields().add(
+ new FieldSchema("name", ColumnType.STRING_TYPE_NAME, ""));
+ typ1.getFields().add(
+ new FieldSchema("income", ColumnType.INT_TYPE_NAME, ""));
+ client.createType(typ1);
+
+ new TableBuilder()
+ .setDbName(dbName)
+ .setTableName(tblName1)
+ .setCols(typ1.getFields())
+ .setNumBuckets(1)
+ .addBucketCol("name")
+ .addTableParam("EXTERNAL", "true")
+ .addTableParam("external.table.purge", "true")
+ .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+ .create(client, hiveConf);
+
+ Table tbl = client.getTable(dbName, tblName1);
+ Assert.assertNotNull(tbl);
+
+ new TableBuilder()
+ .setDbName(dbName)
+ .setTableName(tblName2)
+ .setCols(typ1.getFields())
+ .setNumBuckets(1)
+ .addBucketCol("name")
+ .addTableParam("EXTERNAL", "true")
+ .addTableParam("external.table.purge", "true")
+ .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+ .create(client, hiveConf);
+
+ Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db");
+ warehouseFs.delete(dirDb, true);
+ warehouseFs.mkdirs(dirDb);
+ shimCm.createEncryptionZone(dirDb, "test_key_db");
+ Path dirTbl1 = new Path(dirDb, tblName1);
+ warehouseFs.mkdirs(dirTbl1);
+ Path part11 = new Path(dirTbl1, "part1");
+ createFile(part11, "testClearer11");
+
+ Path dirTbl2 = new Path(dirDb, tblName2);
+ warehouseFs.mkdirs(dirTbl2);
+ Path part12 = new Path(dirTbl2, "part1");
+ createFile(part12, "testClearer12");
+
+ boolean exceptionThrown = false;
+ try {
+ client.dropTable(dbName, tblName1);
+ } catch (MetaException e) {
+ exceptionThrown = true;
+ }
+ assertFalse(exceptionThrown);
+ assertFalse(warehouseFs.exists(part11));
+ try {
+ client.getTable(dbName, tblName1);
+ } catch (NoSuchObjectException e) {
+ exceptionThrown = true;
+ }
+ assertTrue(exceptionThrown);
+ exceptionThrown = false;
+ try {
+ client.dropTable(dbName, tblName2);
+ } catch (MetaException e) {
+ exceptionThrown = true;
+ }
+ assertFalse(exceptionThrown);
+ assertFalse(warehouseFs.exists(part11));
+ try {
+ client.getTable(dbName, tblName2);
+ } catch (NoSuchObjectException e) {
+ exceptionThrown = true;
+ }
+ assertTrue(exceptionThrown);
+ }
+
+ @Test
+ public void dropExternalTableWithDifferentEncryptionZones() throws Throwable {
+ String dbName = "encrdb5";
+ String tblName1 = "encrtbl1";
+ String tblName2 = "encrtbl2";
+ String typeName = "Person";
+
+ silentDropDatabase(dbName);
+ new DatabaseBuilder()
+ .setName(dbName)
+ .addParam("repl.source.for", "1, 2, 3")
+ .create(client, hiveConf);
+
+ client.dropType(typeName);
+ Type typ1 = new Type();
+ typ1.setName(typeName);
+ typ1.setFields(new ArrayList<>(2));
+ typ1.getFields().add(
+ new FieldSchema("name", ColumnType.STRING_TYPE_NAME, ""));
+ typ1.getFields().add(
+ new FieldSchema("income", ColumnType.INT_TYPE_NAME, ""));
+ client.createType(typ1);
+
+ new TableBuilder()
+ .setDbName(dbName)
+ .setTableName(tblName1)
+ .setCols(typ1.getFields())
+ .setNumBuckets(1)
+ .addBucketCol("name")
+ .addTableParam("EXTERNAL", "true")
+ .addTableParam("external.table.purge", "true")
+ .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+ .create(client, hiveConf);
+
+ Table tbl = client.getTable(dbName, tblName1);
+ Assert.assertNotNull(tbl);
+
+ new TableBuilder()
+ .setDbName(dbName)
+ .setTableName(tblName2)
+ .setCols(typ1.getFields())
+ .setNumBuckets(1)
+ .addBucketCol("name")
+ .addTableParam("EXTERNAL", "true")
+ .addTableParam("external.table.purge", "true")
+ .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+ .create(client, hiveConf);
+
+ Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db");
+ warehouseFs.mkdirs(dirDb);
+ Path dirTbl1 = new Path(dirDb, tblName1);
+ warehouseFs.mkdirs(dirTbl1);
+ shimCm.createEncryptionZone(dirTbl1, "test_key_db");
+ Path part11 = new Path(dirTbl1, "part1");
+ createFile(part11, "testClearer11");
+
+ Path dirTbl2 = new Path(dirDb, tblName2);
+ warehouseFs.mkdirs(dirTbl2);
+ shimCm.createEncryptionZone(dirTbl2, "test_key_db");
+ Path part12 = new Path(dirTbl2, "part1");
+ createFile(part12, "testClearer12");
+
+ boolean exceptionThrown = false;
+ try {
+ client.dropTable(dbName, tblName1);
+ } catch (MetaException e) {
+ exceptionThrown = true;
+ }
+ assertFalse(exceptionThrown);
+ assertFalse(warehouseFs.exists(part11));
+ try {
+ client.getTable(dbName, tblName1);
+ } catch (NoSuchObjectException e) {
+ exceptionThrown = true;
+ }
+ assertTrue(exceptionThrown);
+ exceptionThrown = false;
+ try {
+ client.dropTable(dbName, tblName2);
+ } catch (MetaException e) {
+ exceptionThrown = true;
+ }
+ assertFalse(exceptionThrown);
+ assertFalse(warehouseFs.exists(part12));
+ try {
+ client.getTable(dbName, tblName2);
+ } catch (NoSuchObjectException e) {
+ exceptionThrown = true;
+ }
+ assertTrue(exceptionThrown);
+ }
+
+ @Test
+ public void dropExternalTableWithDifferentEncryptionZonesDifferentKey() throws Throwable {
+ String dbName = "encrdb6";
+ String tblName1 = "encrtbl1";
+ String tblName2 = "encrtbl2";
+ String typeName = "Person";
+
+ silentDropDatabase(dbName);
+ new DatabaseBuilder()
+ .setName(dbName)
+ .addParam("repl.source.for", "1, 2, 3")
+ .create(client, hiveConf);
+
+ client.dropType(typeName);
+ Type typ1 = new Type();
+ typ1.setName(typeName);
+ typ1.setFields(new ArrayList<>(2));
+ typ1.getFields().add(
+ new FieldSchema("name", ColumnType.STRING_TYPE_NAME, ""));
+ typ1.getFields().add(
+ new FieldSchema("income", ColumnType.INT_TYPE_NAME, ""));
+ client.createType(typ1);
+
+ new TableBuilder()
+ .setDbName(dbName)
+ .setTableName(tblName1)
+ .setCols(typ1.getFields())
+ .setNumBuckets(1)
+ .addBucketCol("name")
+ .addTableParam("EXTERNAL", "true")
+ .addTableParam("external.table.purge", "true")
+ .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+ .create(client, hiveConf);
+
+ Table tbl = client.getTable(dbName, tblName1);
+ Assert.assertNotNull(tbl);
+
+ new TableBuilder()
+ .setDbName(dbName)
+ .setTableName(tblName2)
+ .setCols(typ1.getFields())
+ .setNumBuckets(1)
+ .addBucketCol("name")
+ .addTableParam("EXTERNAL", "true")
+ .addTableParam("external.table.purge", "true")
+ .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+ .create(client, hiveConf);
+
+ Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db");
+ warehouseFs.mkdirs(dirDb);
+ Path dirTbl1 = new Path(dirDb, tblName1);
+ warehouseFs.mkdirs(dirTbl1);
+ shimCm.createEncryptionZone(dirTbl1, "test_key_db");
+ Path part11 = new Path(dirTbl1, "part1");
+ createFile(part11, "testClearer11");
+
+ Path dirTbl2 = new Path(dirDb, tblName2);
+ warehouseFs.mkdirs(dirTbl2);
+ shimCm.createEncryptionZone(dirTbl2, "test_key_cm");
+ Path part12 = new Path(dirTbl2, "part1");
+ createFile(part12, "testClearer12");
+
+ boolean exceptionThrown = false;
+ try {
+ client.dropTable(dbName, tblName1);
+ } catch (MetaException e) {
+ exceptionThrown = true;
+ }
+ assertFalse(exceptionThrown);
+ assertFalse(warehouseFs.exists(part11));
+ try {
+ client.getTable(dbName, tblName1);
+ } catch (NoSuchObjectException e) {
+ exceptionThrown = true;
+ }
+ assertTrue(exceptionThrown);
+ exceptionThrown = false;
+ try {
+ client.dropTable(dbName, tblName2);
+ } catch (MetaException e) {
+ exceptionThrown = true;
+ }
+ assertFalse(exceptionThrown);
+ assertFalse(warehouseFs.exists(part12));
+ try {
+ client.getTable(dbName, tblName2);
+ } catch (NoSuchObjectException e) {
+ exceptionThrown = true;
+ }
+ assertTrue(exceptionThrown);
+ }
+
+ @Test
+ public void dropExternalTableWithoutEncryptionZonesForCm() throws Throwable {
+ String dbName = "simpdb2";
+ String tblName = "simptbl";
+ String typeName = "Person";
+ silentDropDatabase(dbName);
+ new DatabaseBuilder()
+ .setName(dbName)
+ .addParam("repl.source.for", "1, 2, 3")
+ .create(client, hiveConf);
+
+ client.dropType(typeName);
+ Type typ1 = new Type();
+ typ1.setName(typeName);
+ typ1.setFields(new ArrayList<>(2));
+ typ1.getFields().add(
+ new FieldSchema("name", ColumnType.STRING_TYPE_NAME, ""));
+ typ1.getFields().add(
+ new FieldSchema("income", ColumnType.INT_TYPE_NAME, ""));
+ client.createType(typ1);
+
+ new TableBuilder()
+ .setDbName(dbName)
+ .setTableName(tblName)
+ .setCols(typ1.getFields())
+ .setNumBuckets(1)
+ .addBucketCol("name")
+ .addTableParam("EXTERNAL", "true")
+ .addTableParam("external.table.purge", "true")
+ .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+ .create(client, hiveConf);
+
+ Table tbl = client.getTable(dbName, tblName);
+ Assert.assertNotNull(tbl);
+
+ Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db");
+ warehouseFs.mkdirs(dirDb);
+ Path dirTbl1 = new Path(dirDb, tblName);
+ warehouseFs.mkdirs(dirTbl1);
+ Path part11 = new Path(dirTbl1, "part1");
+ createFile(part11, "testClearer11");
+
+ boolean exceptionThrown = false;
+ try {
+ client.dropTable(dbName, tblName);
+ } catch (Exception e) {
+ exceptionThrown = true;
+ }
+ assertFalse(exceptionThrown);
+ assertFalse(warehouseFs.exists(part11));
+ try {
+ client.getTable(dbName, tblName);
+ } catch (NoSuchObjectException e) {
+ exceptionThrown = true;
+ }
+ assertTrue(exceptionThrown);
+ }
+
+ @Test
+ public void truncateTableWithDifferentEncryptionZones() throws Throwable {
+ String dbName1 = "encrdbtrunc1";
+ String dbName2 = "encrdbtrunc2";
+ String tblName1 = "encrtbl1";
+ String tblName2 = "encrtbl2";
+ String typeName = "Person";
+
+ silentDropDatabase(dbName1);
+ silentDropDatabase(dbName2);
+ new DatabaseBuilder()
+ .setName(dbName1)
+ .addParam("repl.source.for", "1, 2, 3")
+ .create(client, hiveConf);
+
+ new DatabaseBuilder()
+ .setName(dbName2)
+ .addParam("repl.source.for", "1, 2, 3")
+ .create(client, hiveConf);
+
+ client.dropType(typeName);
+ Type typ1 = new Type();
+ typ1.setName(typeName);
+ typ1.setFields(new ArrayList<>(2));
+ typ1.getFields().add(
+ new FieldSchema("name", ColumnType.STRING_TYPE_NAME, ""));
+ typ1.getFields().add(
+ new FieldSchema("income", ColumnType.INT_TYPE_NAME, ""));
+ client.createType(typ1);
+
+ Path dirDb1 = new Path(warehouse.getWhRoot(), dbName1 +".db");
+ warehouseFs.delete(dirDb1, true);
+ warehouseFs.mkdirs(dirDb1);
+ shimCm.createEncryptionZone(dirDb1, "test_key_db");
+ Path dirTbl1 = new Path(dirDb1, tblName1);
+ warehouseFs.mkdirs(dirTbl1);
+ Path part11 = new Path(dirTbl1, "part1");
+ createFile(part11, "testClearer11");
+
+ Path dirDb2 = new Path(warehouse.getWhRoot(), dbName2 +".db");
+ warehouseFs.delete(dirDb2, true);
+ warehouseFs.mkdirs(dirDb2);
+ shimCm.createEncryptionZone(dirDb2, "test_key_db");
+ Path dirTbl2 = new Path(dirDb2, tblName2);
+ warehouseFs.mkdirs(dirTbl2);
+ Path part12 = new Path(dirTbl2, "part1");
+ createFile(part12, "testClearer12");
+
+ new TableBuilder()
+ .setDbName(dbName1)
+ .setTableName(tblName1)
+ .setCols(typ1.getFields())
+ .setNumBuckets(1)
+ .addBucketCol("name")
+ .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+ .create(client, hiveConf);
+
+ Table tbl = client.getTable(dbName1, tblName1);
+ Assert.assertNotNull(tbl);
+
+ new TableBuilder()
+ .setDbName(dbName2)
+ .setTableName(tblName2)
+ .setCols(typ1.getFields())
+ .setNumBuckets(1)
+ .addBucketCol("name")
+ .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+ .create(client, hiveConf);
+
+ boolean exceptionThrown = false;
+ try {
+ client.truncateTable(dbName1, tblName1, null);
+ } catch (MetaException e) {
+ exceptionThrown = true;
+ assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+ }
+ assertFalse(exceptionThrown);
+ assertFalse(warehouseFs.exists(part11));
+ assertNotNull(client.getTable(dbName1, tblName1));
+ exceptionThrown = false;
+ try {
+ client.truncateTable(dbName2, tblName2, null);
+ } catch (MetaException e) {
+ exceptionThrown = true;
+ assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+ }
+ assertFalse(exceptionThrown);
+ assertFalse(warehouseFs.exists(part12));
+ assertNotNull(client.getTable(dbName2, tblName2));
+ }
+
+ @Test
+ public void truncateTableWithDifferentEncryptionZonesDifferentKey() throws Throwable {
+ String dbName1 = "encrdb1";
+ String dbName2 = "encrdb2";
+ String tblName1 = "encrtbl1";
+ String tblName2 = "encrtbl2";
+ String typeName = "Person";
+
+ silentDropDatabase(dbName1);
+ silentDropDatabase(dbName2);
+ new DatabaseBuilder()
+ .setName(dbName1)
+ .addParam("repl.source.for", "1, 2, 3")
+ .create(client, hiveConf);
+
+ new DatabaseBuilder()
+ .setName(dbName2)
+ .addParam("repl.source.for", "1, 2, 3")
+ .create(client, hiveConf);
+
+ client.dropType(typeName);
+ Type typ1 = new Type();
+ typ1.setName(typeName);
+ typ1.setFields(new ArrayList<>(2));
+ typ1.getFields().add(
+ new FieldSchema("name", ColumnType.STRING_TYPE_NAME, ""));
+ typ1.getFields().add(
+ new FieldSchema("income", ColumnType.INT_TYPE_NAME, ""));
+ client.createType(typ1);
+
+ Path dirDb1 = new Path(warehouse.getWhRoot(), dbName1 +".db");
+ warehouseFs.mkdirs(dirDb1);
+ shimCm.createEncryptionZone(dirDb1, "test_key_db");
+ Path dirTbl1 = new Path(dirDb1, tblName1);
+ warehouseFs.mkdirs(dirTbl1);
+ Path part11 = new Path(dirTbl1, "part1");
+ createFile(part11, "testClearer11");
+
+ Path dirDb2 = new Path(warehouse.getWhRoot(), dbName2 +".db");
+ warehouseFs.mkdirs(dirDb2);
+ shimCm.createEncryptionZone(dirDb2, "test_key_db");
+ Path dirTbl2 = new Path(dirDb2, tblName2);
+ warehouseFs.mkdirs(dirTbl2);
+ Path part12 = new Path(dirTbl2, "part1");
+ createFile(part12, "testClearer12");
+
+ new TableBuilder()
+ .setDbName(dbName1)
+ .setTableName(tblName1)
+ .setCols(typ1.getFields())
+ .setNumBuckets(1)
+ .addBucketCol("name")
+ .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+ .create(client, hiveConf);
+
+ Table tbl = client.getTable(dbName1, tblName1);
+ Assert.assertNotNull(tbl);
+
+ new TableBuilder()
+ .setDbName(dbName2)
+ .setTableName(tblName2)
+ .setCols(typ1.getFields())
+ .setNumBuckets(1)
+ .addBucketCol("name")
+ .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+ .create(client, hiveConf);
+
+ boolean exceptionThrown = false;
+ try {
+ client.truncateTable(dbName1, tblName1, null);
+ } catch (MetaException e) {
+ exceptionThrown = true;
+ assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+ }
+ assertFalse(exceptionThrown);
+ assertFalse(warehouseFs.exists(part11));
+ assertNotNull(client.getTable(dbName1, tblName1));
+ exceptionThrown = false;
+ try {
+ client.truncateTable(dbName2, tblName2, null);
+ } catch (MetaException e) {
+ exceptionThrown = true;
+ assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+ }
+ assertFalse(exceptionThrown);
+ assertFalse(warehouseFs.exists(part12));
+ assertNotNull(client.getTable(dbName2, tblName2));
+ }
+
+ @Test
+ public void truncateTableWithSameEncryptionZones() throws Throwable {
+ String dbName = "encrdb9";
+ String tblName1 = "encrtbl1";
+ String tblName2 = "encrtbl2";
+ String typeName = "Person";
+ client.dropTable(dbName, tblName1);
+ client.dropTable(dbName, tblName2);
+ silentDropDatabase(dbName);
+ new DatabaseBuilder()
+ .setName(dbName)
+ .addParam("repl.source.for", "1, 2, 3")
+ .create(client, hiveConf);
+
+ client.dropType(typeName);
+ Type typ1 = new Type();
+ typ1.setName(typeName);
+ typ1.setFields(new ArrayList<>(2));
+ typ1.getFields().add(
+ new FieldSchema("name", ColumnType.STRING_TYPE_NAME, ""));
+ typ1.getFields().add(
+ new FieldSchema("income", ColumnType.INT_TYPE_NAME, ""));
+ client.createType(typ1);
+
+ new TableBuilder()
+ .setDbName(dbName)
+ .setTableName(tblName1)
+ .setCols(typ1.getFields())
+ .setNumBuckets(1)
+ .addBucketCol("name")
+ .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+ .create(client, hiveConf);
+
+ Table tbl = client.getTable(dbName, tblName1);
+ Assert.assertNotNull(tbl);
+
+ new TableBuilder()
+ .setDbName(dbName)
+ .setTableName(tblName2)
+ .setCols(typ1.getFields())
+ .setNumBuckets(1)
+ .addBucketCol("name")
+ .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+ .create(client, hiveConf);
+
+ Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db");
+ warehouseFs.delete(dirDb, true);
+ warehouseFs.mkdirs(dirDb);
+ shimCm.createEncryptionZone(dirDb, "test_key_db");
+ Path dirTbl1 = new Path(dirDb, tblName1);
+ warehouseFs.mkdirs(dirTbl1);
+ Path part11 = new Path(dirTbl1, "part1");
+ createFile(part11, "testClearer11");
+
+ Path dirTbl2 = new Path(dirDb, tblName2);
+ warehouseFs.mkdirs(dirTbl2);
+ Path part12 = new Path(dirTbl2, "part1");
+ createFile(part12, "testClearer12");
+
+ boolean exceptionThrown = false;
+ try {
+ client.truncateTable(dbName, tblName1, null);
+ } catch (MetaException e) {
+ exceptionThrown = true;
+ assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+ }
+ assertFalse(exceptionThrown);
+ assertFalse(warehouseFs.exists(part11));
+ try {
+ client.getTable(dbName, tblName1);
+ } catch (NoSuchObjectException e) {
+ exceptionThrown = true;
+ }
+ assertFalse(exceptionThrown);
+
+ try {
+ client.truncateTable(dbName, tblName2, null);
+ } catch (MetaException e) {
+ exceptionThrown = true;
+ assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+ }
+ assertFalse(exceptionThrown);
+ assertFalse(warehouseFs.exists(part12));
+ try {
+ client.getTable(dbName, tblName2);
+ } catch (NoSuchObjectException e) {
+ exceptionThrown = true;
+ }
+ assertFalse(exceptionThrown);
+ }
+
+ @Test
+ public void truncateTableWithoutEncryptionZonesForCm() throws Throwable {
+ String dbName = "simpdb3";
+ String tblName = "simptbl";
+ String typeName = "Person";
+ client.dropTable(dbName, tblName);
+ silentDropDatabase(dbName);
+
+ new DatabaseBuilder()
+ .setName(dbName)
+ .addParam("repl.source.for", "1, 2, 3")
+ .create(client, hiveConf);
+
+ client.dropType(typeName);
+ Type typ1 = new Type();
+ typ1.setName(typeName);
+ typ1.setFields(new ArrayList<>(2));
+ typ1.getFields().add(
+ new FieldSchema("name", ColumnType.STRING_TYPE_NAME, ""));
+ typ1.getFields().add(
+ new FieldSchema("income", ColumnType.INT_TYPE_NAME, ""));
+ client.createType(typ1);
+
+ new TableBuilder()
+ .setDbName(dbName)
+ .setTableName(tblName)
+ .setCols(typ1.getFields())
+ .setNumBuckets(1)
+ .addBucketCol("name")
+ .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+ .create(client, hiveConf);
+
+ Table tbl2 = client.getTable(dbName, tblName);
+ Assert.assertNotNull(tbl2);
+
+ Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db");
+ warehouseFs.mkdirs(dirDb);
+ Path dirTbl1 = new Path(dirDb, tblName);
+ warehouseFs.mkdirs(dirTbl1);
+ Path part11 = new Path(dirTbl1, "part1");
+ createFile(part11, "testClearer11");
+
+ boolean exceptionThrown = false;
+ try {
+ client.truncateTable(dbName, tblName, null);
+ } catch (Exception e) {
+ exceptionThrown = true;
+ }
+ assertFalse(exceptionThrown);
+ assertFalse(warehouseFs.exists(part11));
+ try {
+ client.getTable(dbName, tblName);
+ } catch (NoSuchObjectException e) {
+ exceptionThrown = true;
+ }
+ assertFalse(exceptionThrown);
+ }
+
+ @Test
+ public void recycleFailureWithDifferentEncryptionZonesForCm() throws Throwable {
+ Path dirDb = new Path(warehouse.getWhRoot(), "db2");
+ warehouseFs.delete(dirDb, true);
+ warehouseFs.mkdirs(dirDb);
+ Path dirTbl1 = new Path(dirDb, "tbl1");
+ warehouseFs.mkdirs(dirTbl1);
+ shimCm.createEncryptionZone(dirTbl1, "test_key_db");
+ Path part11 = new Path(dirTbl1, "part1");
+ createFile(part11, "testClearer11");
+
+ boolean exceptionThrown = false;
+ try {
+ ReplChangeManager.getInstance(hiveConf).recycle(dirTbl1, RecycleType.MOVE, false);
+ } catch (RemoteException e) {
+ exceptionThrown = true;
+ assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+ }
+ assertFalse(exceptionThrown);
+ }
+
+ @Test
+ public void testClearerEncrypted() throws Exception {
+ HiveConf hiveConfCmClearer = new HiveConf(TestReplChangeManager.class);
+ hiveConfCmClearer.setBoolean(HiveConf.ConfVars.REPLCMENABLED.varname, true);
+ hiveConfCmClearer.setInt(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, 60);
+ hiveConfCmClearer.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
+ "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort()
+ + HiveConf.ConfVars.METASTOREWAREHOUSE.defaultStrVal);
+
+ String cmrootCmClearer = "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + "/cmrootClearer";
+ hiveConfCmClearer.set(HiveConf.ConfVars.REPLCMDIR.varname, cmrootCmClearer);
+ Warehouse warehouseCmClearer = new Warehouse(hiveConfCmClearer);
+ FileSystem cmfs = new Path(cmrootCmClearer).getFileSystem(hiveConfCmClearer);
+ cmfs.mkdirs(warehouseCmClearer.getWhRoot());
+
+ HadoopShims.HdfsEncryptionShim shimCmEncrypted = ShimLoader.getHadoopShims().createHdfsEncryptionShim(cmfs, conf);
+
+ FileSystem fsWarehouse = warehouseCmClearer.getWhRoot().getFileSystem(hiveConfCmClearer);
+ long now = System.currentTimeMillis();
+ Path dirDb = new Path(warehouseCmClearer.getWhRoot(), "db1");
+ fsWarehouse.delete(dirDb, true);
+ fsWarehouse.mkdirs(dirDb);
+ Path dirTbl1 = new Path(dirDb, "tbl1");
+ fsWarehouse.mkdirs(dirTbl1);
+ shimCmEncrypted.createEncryptionZone(dirTbl1, "test_key_db");
+ Path part11 = new Path(dirTbl1, "part1");
+ createFile(part11, "testClearer11");
+ String fileChksum11 = ReplChangeManager.checksumFor(part11, fsWarehouse);
+ Path part12 = new Path(dirTbl1, "part2");
+ createFile(part12, "testClearer12");
+ String fileChksum12 = ReplChangeManager.checksumFor(part12, fsWarehouse);
+ Path dirTbl2 = new Path(dirDb, "tbl2");
+ fsWarehouse.mkdirs(dirTbl2);
+ shimCmEncrypted.createEncryptionZone(dirTbl2, "test_key_db");
+ Path part21 = new Path(dirTbl2, "part1");
+ createFile(part21, "testClearer21");
+ String fileChksum21 = ReplChangeManager.checksumFor(part21, fsWarehouse);
+ Path part22 = new Path(dirTbl2, "part2");
+ createFile(part22, "testClearer22");
+ String fileChksum22 = ReplChangeManager.checksumFor(part22, fsWarehouse);
+ Path dirTbl3 = new Path(dirDb, "tbl3");
+ fsWarehouse.mkdirs(dirTbl3);
+ shimCmEncrypted.createEncryptionZone(dirTbl3, "test_key_cm");
+ Path part31 = new Path(dirTbl3, "part1");
+ createFile(part31, "testClearer31");
+ String fileChksum31 = ReplChangeManager.checksumFor(part31, fsWarehouse);
+ Path part32 = new Path(dirTbl3, "part2");
+ createFile(part32, "testClearer32");
+ String fileChksum32 = ReplChangeManager.checksumFor(part32, fsWarehouse);
+
+ ReplChangeManager.getInstance(hiveConfCmClearer).recycle(dirTbl1, RecycleType.MOVE, false);
+ ReplChangeManager.getInstance(hiveConfCmClearer).recycle(dirTbl2, RecycleType.MOVE, false);
+ ReplChangeManager.getInstance(hiveConfCmClearer).recycle(dirTbl3, RecycleType.MOVE, true);
+
+ assertTrue(fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part11.getName(), fileChksum11,
+ ReplChangeManager.getCmRoot(part11).toString())));
+ assertTrue(fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part12.getName(), fileChksum12,
+ ReplChangeManager.getCmRoot(part12).toString())));
+ assertTrue(fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part21.getName(), fileChksum21,
+ ReplChangeManager.getCmRoot(part21).toString())));
+ assertTrue(fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part22.getName(), fileChksum22,
+ ReplChangeManager.getCmRoot(part22).toString())));
+ assertTrue(fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part31.getName(), fileChksum31,
+ ReplChangeManager.getCmRoot(part31).toString())));
+ assertTrue(fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part32.getName(), fileChksum32,
+ ReplChangeManager.getCmRoot(part32).toString())));
+
+ fsWarehouse.setTimes(ReplChangeManager.getCMPath(hiveConfCmClearer, part11.getName(), fileChksum11,
+ ReplChangeManager.getCmRoot(part11).toString()),
+ now - 86400*1000*2, now - 86400*1000*2);
+ fsWarehouse.setTimes(ReplChangeManager.getCMPath(hiveConfCmClearer, part21.getName(), fileChksum21,
+ ReplChangeManager.getCmRoot(part21).toString()),
+ now - 86400*1000*2, now - 86400*1000*2);
+ fsWarehouse.setTimes(ReplChangeManager.getCMPath(hiveConfCmClearer, part31.getName(), fileChksum31,
+ ReplChangeManager.getCmRoot(part31).toString()),
+ now - 86400*1000*2, now - 86400*1000*2);
+ fsWarehouse.setTimes(ReplChangeManager.getCMPath(hiveConfCmClearer, part32.getName(), fileChksum32,
+ ReplChangeManager.getCmRoot(part32).toString()),
+ now - 86400*1000*2, now - 86400*1000*2);
+
+ ReplChangeManager.scheduleCMClearer(hiveConfCmClearer);
+
+ long start = System.currentTimeMillis();
+ long end;
+ boolean cleared = false;
+ do {
+ Thread.sleep(200);
+ end = System.currentTimeMillis();
+ if (end - start > 5000) {
+ Assert.fail("timeout, cmroot has not been cleared");
+ }
+ if (!fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part11.getName(), fileChksum11,
+ ReplChangeManager.getCmRoot(part11).toString())) &&
+ fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part12.getName(), fileChksum12,
+ ReplChangeManager.getCmRoot(part12).toString())) &&
+ !fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part21.getName(), fileChksum21,
+ ReplChangeManager.getCmRoot(part21).toString())) &&
+ fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part22.getName(), fileChksum22,
+ ReplChangeManager.getCmRoot(part22).toString())) &&
+ !fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part31.getName(), fileChksum31,
+ ReplChangeManager.getCmRoot(part31).toString())) &&
+ !fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part32.getName(), fileChksum32,
+ ReplChangeManager.getCmRoot(part32).toString()))) {
+ cleared = true;
+ }
+ } while (!cleared);
+ }
+
+ @Test
+ public void testCmrootEncrypted() throws Exception {
+ HiveConf encryptedHiveConf = new HiveConf(TestReplChangeManager.class);
+ encryptedHiveConf.setBoolean(HiveConf.ConfVars.REPLCMENABLED.varname, true);
+ encryptedHiveConf.setInt(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, 60);
+ encryptedHiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
+ "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort()
+ + HiveConf.ConfVars.METASTOREWAREHOUSE.defaultStrVal);
+
+ String cmrootdirEncrypted = "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + "/cmroot";
+ encryptedHiveConf.set(HiveConf.ConfVars.REPLCMDIR.varname, cmrootdirEncrypted);
+ encryptedHiveConf.set(HiveConf.ConfVars.REPLCMFALLBACKNONENCRYPTEDDIR.varname, cmrootFallBack);
+
+ //Create cm in encrypted zone
+ HadoopShims.HdfsEncryptionShim shimCmEncrypted = ShimLoader.getHadoopShims().createHdfsEncryptionShim(fs, conf);
+ shimCmEncrypted.createEncryptionZone(new Path(cmrootdirEncrypted), "test_key_db");
+ ReplChangeManager.resetReplChangeManagerInstance();
+ Warehouse warehouseEncrypted = new Warehouse(encryptedHiveConf);
+ FileSystem warehouseFsEncrypted = warehouseEncrypted.getWhRoot().getFileSystem(encryptedHiveConf);
+ FileSystem fsCmEncrypted = new Path(cmrootdirEncrypted).getFileSystem(encryptedHiveConf);
+ fsCmEncrypted.mkdirs(warehouseEncrypted.getWhRoot());
+
+ Path dirDb = new Path(warehouseEncrypted.getWhRoot(), "db3");
+ warehouseFsEncrypted.delete(dirDb, true);
+ warehouseFsEncrypted.mkdirs(dirDb);
+ Path dirTbl1 = new Path(dirDb, "tbl1");
+ warehouseFsEncrypted.mkdirs(dirTbl1);
+ shimCmEncrypted.createEncryptionZone(dirTbl1, "test_key_db");
+ Path part11 = new Path(dirTbl1, "part1");
+ createFile(part11, "testClearer11");
+
+ boolean exceptionThrown = false;
+ try {
+ ReplChangeManager.getInstance(encryptedHiveConf).recycle(dirTbl1, RecycleType.MOVE, false);
+ } catch (RemoteException e) {
+ exceptionThrown = true;
+ assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+ }
+ assertFalse(exceptionThrown);
+
+ Path dirDbUnEncrypted = new Path(warehouseEncrypted.getWhRoot(), "db3en");
+ warehouseFsEncrypted.delete(dirDbUnEncrypted, true);
+ warehouseFsEncrypted.mkdirs(dirDbUnEncrypted);
+ Path dirTblun1 = new Path(dirDbUnEncrypted, "tbl1");
+ warehouseFsEncrypted.mkdirs(dirTblun1);
+ Path partun11 = new Path(dirTblun1, "part1");
+ createFile(partun11, "testClearer11");
+
+ exceptionThrown = false;
+ try {
+ ReplChangeManager.getInstance(encryptedHiveConf).recycle(dirDbUnEncrypted, RecycleType.MOVE, false);
+ } catch (IOException e) {
+ exceptionThrown = true;
+ }
+ assertFalse(exceptionThrown);
+ ReplChangeManager.resetReplChangeManagerInstance();
+ initReplChangeManager();
+ }
+
+
+ private void createFile(Path path, String content) throws IOException {
+ FSDataOutputStream output = path.getFileSystem(hiveConf).create(path);
+ output.writeChars(content);
+ output.close();
+ }
+
+ private void silentDropDatabase(String dbName) throws TException {
+ try {
+ for (String tableName : client.getTables(dbName, "*")) {
+ client.dropTable(dbName, tableName);
+ }
+ client.dropDatabase(dbName);
+ } catch (NoSuchObjectException|InvalidOperationException e) {
+ // NOP
+ }
+ }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
index 5ab4f91..d3891cf 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
@@ -271,6 +271,7 @@ public class TestReplChangeManager {
FileSystem fs = warehouse.getWhRoot().getFileSystem(hiveConf);
long now = System.currentTimeMillis();
Path dirDb = new Path(warehouse.getWhRoot(), "db3");
+ fs.delete(dirDb, true);
fs.mkdirs(dirDb);
Path dirTbl1 = new Path(dirDb, "tbl1");
fs.mkdirs(dirTbl1);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java
index 562b2c9..19d38d2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java
@@ -48,6 +48,8 @@ public class TestCachedStoreUpdateUsingEvents {
MetastoreConf.setBoolVar(conf, ConfVars.METASTORE_CACHE_CAN_USE_EVENT, true);
MetastoreConf.setBoolVar(conf, ConfVars.HIVE_TXN_STATS_ENABLED, true);
MetastoreConf.setBoolVar(conf, ConfVars.AGGREGATE_STATS_CACHE_ENABLED, false);
+ MetastoreConf.setBoolVar(conf, ConfVars.REPLCMENABLED, true);
+ MetastoreConf.setVar(conf, ConfVars.REPLCMDIR, "cmroot");
MetaStoreTestUtils.setConfForStandloneMode(conf);
hmsHandler = new HiveMetaStore.HMSHandler("testCachedStore", conf, true);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/metadata/TestAlterTableMetadata.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/metadata/TestAlterTableMetadata.java
index f6035fa..96aeb0f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/metadata/TestAlterTableMetadata.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/metadata/TestAlterTableMetadata.java
@@ -63,5 +63,6 @@ public class TestAlterTableMetadata {
table = Hive.get(conf).getTable("t1");
assertEquals(PrincipalType.ROLE, table.getOwnerType());
assertEquals("r1", table.getOwner());
+
}
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
index 92a0bbe..7e0a7f2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
@@ -202,6 +202,8 @@ public class TestJdbcDriver2 {
System.setProperty(ConfVars.HIVE_AUTHORIZATION_MANAGER.varname,
"org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider");
System.setProperty(ConfVars.HIVE_SERVER2_PARALLEL_OPS_IN_SESSION.varname, "false");
+ System.setProperty(ConfVars.REPLCMENABLED.varname, "true");
+ System.setProperty(ConfVars.REPLCMDIR.varname, "cmroot");
con = getConnection(defaultDbName + ";create=true");
Statement stmt = con.createStatement();
assertNotNull("Statement is null", stmt);
@@ -2828,6 +2830,8 @@ public class TestJdbcDriver2 {
stmt.execute("set hive.metastore.transactional.event.listeners =" +
" org.apache.hive.hcatalog.listener.DbNotificationListener");
stmt.execute("set hive.metastore.dml.events = true");
+ stmt.execute("set hive.repl.cm.enabled = true");
+ stmt.execute("set hive.repl.cmrootdir = cmroot");
stmt.execute("create database " + primaryDb + " with dbproperties('repl.source.for'='1,2,3')");
stmt.execute("create table " + primaryTblName + " (id int)");
stmt.execute("insert into " + primaryTblName + " values (1), (2)");
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
index a3299ee..79beadd 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
@@ -106,6 +106,7 @@ public class TestJdbcWithMiniHS2 {
HiveConf conf = new HiveConf();
dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
kvDataFilePath = new Path(dataFileDir, "kv1.txt");
+
try {
startMiniHS2(conf);
} catch (Exception e) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java
index cf4bc81..93c0209 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java
@@ -67,7 +67,6 @@ public class CreateTableOperation extends DDLOperation<CreateTableDesc> {
if (desc.getReplicationSpec().allowEventReplacementInto(existingTable.getParameters())) {
desc.setReplaceMode(true); // we replace existing table.
ReplicationSpec.copyLastReplId(existingTable.getParameters(), tbl.getParameters());
-
// If location of an existing managed table is changed, then need to delete the old location if exists.
// This scenario occurs when a managed table is converted into external table at source. In this case,
// at target, the table data would be moved to different location under base directory for external tables.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveOperation.java
index 5c5dec4..248fe0f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveOperation.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.ddl.table.storage;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.ql.ddl.DDLOperationContext;
import org.apache.hadoop.hive.ql.exec.ArchiveUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -308,7 +309,8 @@ public class AlterTableArchiveOperation extends DDLOperation<AlterTableArchiveDe
private void deleteIntermediateOriginalDir(Table table, Path intermediateOriginalDir) throws HiveException {
if (HdfsUtils.pathExists(intermediateOriginalDir, context.getConf())) {
- AlterTableArchiveUtils.deleteDir(intermediateOriginalDir, context.getDb().getDatabase(table.getDbName()),
+ AlterTableArchiveUtils.deleteDir(intermediateOriginalDir,
+ ReplChangeManager.shouldEnableCm(context.getDb().getDatabase(table.getDbName()), table.getTTable()),
context.getConf());
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveUtils.java
index ebac6f6..c285405 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveUtils.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.ArchiveUtils;
@@ -95,10 +94,10 @@ final class AlterTableArchiveUtils {
return new Path(dir.getParent(), dir.getName() + intermediateDirSuffix);
}
- static void deleteDir(Path dir, Database db, Configuration conf) throws HiveException {
+ static void deleteDir(Path dir, boolean shouldEnableCm, Configuration conf) throws HiveException {
try {
Warehouse wh = new Warehouse(conf);
- wh.deleteDir(dir, true, db);
+ wh.deleteDir(dir, true, false, shouldEnableCm);
} catch (MetaException e) {
throw new HiveException(e);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableSetLocationOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableSetLocationOperation.java
index 22a29d7..509d577 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableSetLocationOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableSetLocationOperation.java
@@ -43,6 +43,7 @@ public class AlterTableSetLocationOperation extends AbstractAlterTableOperation<
protected void doAlteration(Table table, Partition partition) throws HiveException {
StorageDescriptor sd = getStorageDescriptor(table, partition);
String newLocation = desc.getLocation();
+
try {
URI locUri = new URI(newLocation);
if (!new Path(locUri).isAbsolute()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableUnarchiveOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableUnarchiveOperation.java
index 4f791a3..39416ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableUnarchiveOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableUnarchiveOperation.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.ddl.table.storage;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.ql.ddl.DDLOperationContext;
import org.apache.hadoop.hive.ql.exec.ArchiveUtils;
import org.apache.hadoop.hive.ql.exec.ArchiveUtils.PartSpecInfo;
@@ -282,8 +283,9 @@ public class AlterTableUnarchiveOperation extends DDLOperation<AlterTableUnarchi
private void deleteIntermediateArchivedDir(Table table, Path intermediateArchivedDir) throws HiveException {
if (HdfsUtils.pathExists(intermediateArchivedDir, context.getConf())) {
- AlterTableArchiveUtils.deleteDir(intermediateArchivedDir, context.getDb().getDatabase(table.getDbName()),
- context.getConf());
+ AlterTableArchiveUtils.deleteDir(intermediateArchivedDir,
+ ReplChangeManager.shouldEnableCm(context.getDb().getDatabase(table.getDbName()), table.getTTable()),
+ context.getConf());
}
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 7f061d4..945eafc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2369,7 +2369,7 @@ public class Hive {
// base_x. (there is Insert Overwrite and Load Data Overwrite)
boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
boolean needRecycle = !tbl.isTemporary()
- && ReplChangeManager.isSourceOfReplication(Hive.get().getDatabase(tbl.getDbName()));
+ && ReplChangeManager.shouldEnableCm(Hive.get().getDatabase(tbl.getDbName()), tbl.getTTable());
replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(), isSrcLocal,
isAutoPurge, newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, needRecycle, isManaged, isInsertOverwrite);
} else {
@@ -3123,7 +3123,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
//for fullAcid we don't want to delete any files even for OVERWRITE see HIVE-14988/HIVE-17361
boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
boolean needRecycle = !tbl.isTemporary()
- && ReplChangeManager.isSourceOfReplication(Hive.get().getDatabase(tbl.getDbName()));
+ && ReplChangeManager.shouldEnableCm(Hive.get().getDatabase(tbl.getDbName()), tbl.getTTable());
replaceFiles(tblPath, loadPath, destPath, tblPath, conf, isSrcLocal, isAutopurge,
newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, needRecycle, isManaged, isInsertOverwrite);
} else {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 7b6ce10..dd97f3d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -467,7 +467,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
needRecycle = false;
} else {
org.apache.hadoop.hive.metastore.api.Database db = x.getHive().getDatabase(table.getDbName());
- needRecycle = db != null && ReplChangeManager.isSourceOfReplication(db);
+ needRecycle = db != null && ReplChangeManager.shouldEnableCm(db, table.getTTable());
}
} else {
if (AcidUtils.isTransactionalTable(table) && !replicationSpec.isInReplicationScope()) {
@@ -613,7 +613,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
needRecycle = false;
} else {
org.apache.hadoop.hive.metastore.api.Database db = x.getHive().getDatabase(table.getDbName());
- needRecycle = db != null && ReplChangeManager.isSourceOfReplication(db);
+ needRecycle = db != null && ReplChangeManager.shouldEnableCm(db, table.getTTable());
}
} else {
loadFileType = replicationSpec.isReplace() ?
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 9ba2b24..54b616e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -260,11 +260,11 @@ public class Cleaner extends MetaStoreCompactorThread {
FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
Database db = getMSForConf(conf).getDatabase(getDefaultCatalog(conf), ci.dbname);
- boolean isSourceOfRepl = ReplChangeManager.isSourceOfReplication(db);
+ Table table = getMSForConf(conf).getTable(getDefaultCatalog(conf), ci.dbname, ci.tableName);
for (Path dead : filesToDelete) {
LOG.debug("Going to delete path " + dead.toString());
- if (isSourceOfRepl) {
+ if (ReplChangeManager.shouldEnableCm(db, table)) {
replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, true);
}
fs.delete(dead, true);
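The compactor (like the other call sites in this patch) now keys the recycle decision on ReplChangeManager.shouldEnableCm, which is introduced later in this diff and requires the database to be a replication source and the table to be non-external. A minimal standalone sketch of that check, assuming the thrift Database/Table setters behave as on current master; the class name and parameter values are illustrative, not part of the patch:

import java.util.Collections;
import java.util.HashMap;

import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Table;

public class ShouldEnableCmSketch {
  public static void main(String[] args) {
    Database db = new Database();
    // Mark the database as a replication source, as "repl.source.for" does in the tests above.
    db.setParameters(Collections.singletonMap(ReplChangeManager.SOURCE_OF_REPLICATION, "1"));

    Table managed = new Table();
    managed.setParameters(new HashMap<String, String>());

    Table external = new Table();
    HashMap<String, String> params = new HashMap<>();
    params.put("EXTERNAL", "TRUE");
    external.setParameters(params);
    external.setTableType("EXTERNAL_TABLE");

    // Managed table in a replication source: recycle into the ChangeManager before deleting.
    System.out.println(ReplChangeManager.shouldEnableCm(db, managed));   // expected: true
    // External table: no recycling, even though the database is a replication source.
    System.out.println(ReplChangeManager.shouldEnableCm(db, external));  // expected: false
  }
}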
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 23e7d5e..2eafef0 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -1242,7 +1242,7 @@ public class Hadoop23Shims extends HadoopShimsSecure {
return (getEncryptionZoneForPath(fullPath) != null);
}
- private EncryptionZone getEncryptionZoneForPath(Path path) throws IOException {
+ public EncryptionZone getEncryptionZoneForPath(Path path) throws IOException {
if (path.getFileSystem(conf).exists(path)) {
return hdfsAdmin.getEncryptionZoneForPath(path);
} else if (!path.getParent().equals(path)) {
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 49a2ab3..f71f5a5 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobConf;
@@ -564,7 +565,7 @@ public interface HadoopShims {
public int comparePathKeyStrength(Path path1, Path path2) throws IOException;
/**
- * create encryption zone by path and keyname
+ * Create encryption zone by path and keyname.
* @param path HDFS path to create encryption zone
* @param keyName keyname
* @throws IOException
@@ -573,6 +574,13 @@ public interface HadoopShims {
public void createEncryptionZone(Path path, String keyName) throws IOException;
/**
+ * Get encryption zone by path.
+ * @param path HDFS path for which to look up the encryption zone.
+ * @throws IOException
+ */
+ EncryptionZone getEncryptionZoneForPath(Path path) throws IOException;
+
+ /**
* Creates an encryption key.
*
* @param keyName Name of the key
@@ -625,6 +633,11 @@ public interface HadoopShims {
}
@Override
+ public EncryptionZone getEncryptionZoneForPath(Path path) throws IOException {
+ return null;
+ }
+
+ @Override
public void createKey(String keyName, int bitLength) {
/* not supported */
}
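Outside ReplChangeManager, the new lookup is reached through the encryption shim created per FileSystem. A minimal sketch, assuming HDFS and the 0.23 shims on the classpath; the class name and example path are illustrative, not part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsEncryptionShim;
import org.apache.hadoop.hive.shims.ShimLoader;

public class EncryptionZoneLookupSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path(args[0]);                        // e.g. /data/ez1/table1
    FileSystem fs = path.getFileSystem(conf);
    HdfsEncryptionShim shim = ShimLoader.getHadoopShims().createHdfsEncryptionShim(fs, conf);
    if (shim.isPathEncrypted(path)) {
      // The zone root is what ReplChangeManager keys its per-zone cmroot on.
      EncryptionZone zone = shim.getEncryptionZoneForPath(path);
      System.out.println("encryption zone root: " + zone.getPath());
    } else {
      System.out.println(path + " is not in an encryption zone");
    }
  }
}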
diff --git a/standalone-metastore/metastore-common/pom.xml b/standalone-metastore/metastore-common/pom.xml
index e252f12..81dc6b6 100644
--- a/standalone-metastore/metastore-common/pom.xml
+++ b/standalone-metastore/metastore-common/pom.xml
@@ -256,6 +256,11 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-shims</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<profiles>
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
index c6acc57..1041d92 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -19,11 +19,13 @@
package org.apache.hadoop.hive.metastore;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
@@ -36,11 +38,18 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.utils.FileUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.utils.Retry;
import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.hive.shims.HadoopShims.HdfsEncryptionShim;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +59,8 @@ public class ReplChangeManager {
private static boolean inited = false;
private static boolean enabled = false;
- private static Path cmroot;
+ private static Map<String, String> encryptionZones = new HashMap<>();
+ private static HadoopShims hadoopShims = ShimLoader.getHadoopShims();
private static Configuration conf;
private String msUser;
private String msGroup;
@@ -61,6 +71,10 @@ public class ReplChangeManager {
public static final String SOURCE_OF_REPLICATION = "repl.source.for";
private static final String TXN_WRITE_EVENT_FILE_SEPARATOR = "]";
static final String CM_THREAD_NAME_PREFIX = "cmclearer-";
+ private static final String NO_ENCRYPTION = "noEncryption";
+ private static String cmRootDir;
+ private static String encryptedCmRootDir;
+ private static String fallbackNonEncryptedCmRootDir;
public enum RecycleType {
MOVE,
@@ -138,14 +152,27 @@ public class ReplChangeManager {
if (!inited) {
if (MetastoreConf.getBoolVar(conf, ConfVars.REPLCMENABLED)) {
ReplChangeManager.enabled = true;
- ReplChangeManager.cmroot = new Path(MetastoreConf.getVar(conf, ConfVars.REPLCMDIR));
ReplChangeManager.conf = conf;
-
- FileSystem cmFs = cmroot.getFileSystem(conf);
- // Create cmroot with permission 700 if not exist
- if (!cmFs.exists(cmroot)) {
- cmFs.mkdirs(cmroot);
- cmFs.setPermission(cmroot, new FsPermission("700"));
+ cmRootDir = MetastoreConf.getVar(conf, ConfVars.REPLCMDIR);
+ encryptedCmRootDir = MetastoreConf.getVar(conf, ConfVars.REPLCMENCRYPTEDDIR);
+ fallbackNonEncryptedCmRootDir = MetastoreConf.getVar(conf, ConfVars.REPLCMFALLBACKNONENCRYPTEDDIR);
+ //Create default cm root
+ Path cmroot = new Path(cmRootDir);
+ createCmRoot(cmroot);
+ FileSystem cmRootFs = cmroot.getFileSystem(conf);
+ HdfsEncryptionShim pathEncryptionShim = hadoopShims
+ .createHdfsEncryptionShim(cmRootFs, conf);
+ Path cmRootEncrypted = new Path(encryptedCmRootDir);
+ if (cmRootEncrypted.isAbsolute()) {
+ throw new MetaException(ConfVars.REPLCMENCRYPTEDDIR.getHiveName() + " should be a relative path");
+ }
+ if (pathEncryptionShim.isPathEncrypted(cmroot)) {
+ //If cm root is encrypted we keep using it for the encryption zone
+ String encryptionZonePath = cmRootFs.getUri()
+ + pathEncryptionShim.getEncryptionZoneForPath(cmroot).getPath();
+ encryptionZones.put(encryptionZonePath, cmRootDir);
+ } else {
+ encryptionZones.put(NO_ENCRYPTION, cmRootDir);
}
UserGroupInformation usergroupInfo = UserGroupInformation.getCurrentUser();
msUser = usergroupInfo.getShortUserName();
@@ -194,7 +221,7 @@ public class ReplChangeManager {
}
} else {
String fileCheckSum = checksumFor(path, fs);
- Path cmPath = getCMPath(conf, path.getName(), fileCheckSum, cmroot.toString());
+ Path cmPath = getCMPath(conf, path.getName(), fileCheckSum, getCmRoot(path).toString());
// set timestamp before moving to cmroot, so we can
// avoid race condition CM remove the file before setting
@@ -213,9 +240,18 @@ public class ReplChangeManager {
switch (type) {
case MOVE: {
LOG.info("Moving {} to {}", path.toString(), cmPath.toString());
-
// Rename fails if the file with same name already exist.
- success = fs.rename(path, cmPath);
+ Retry<Boolean> retriable = new Retry<Boolean>(IOException.class) {
+ @Override
+ public Boolean execute() throws IOException {
+ return fs.rename(path, cmPath);
+ }
+ };
+ try {
+ success = retriable.run();
+ } catch (Exception e) {
+ throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ }
break;
}
case COPY: {
@@ -361,9 +397,10 @@ public class ReplChangeManager {
throw new IllegalStateException("Uninitialized ReplChangeManager instance.");
}
String encodedUri = fileUriStr;
- if ((fileChecksum != null) && (cmroot != null)) {
+ Path cmRoot = getCmRoot(new Path(fileUriStr));
+ if ((fileChecksum != null) && (cmRoot != null)) {
encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + fileChecksum
- + URI_FRAGMENT_SEPARATOR + FileUtils.makeQualified(cmroot, conf);
+ + URI_FRAGMENT_SEPARATOR + FileUtils.makeQualified(cmRoot, conf);
} else {
encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + URI_FRAGMENT_SEPARATOR;
}
@@ -404,12 +441,12 @@ public class ReplChangeManager {
* Thread to clear old files of cmroot recursively
*/
static class CMClearer implements Runnable {
- private Path cmroot;
+ private Map<String, String> encryptionZones;
private long secRetain;
private Configuration conf;
- CMClearer(String cmrootString, long secRetain, Configuration conf) {
- this.cmroot = new Path(cmrootString);
+ CMClearer(Map<String, String> encryptionZones, long secRetain, Configuration conf) {
+ this.encryptionZones = encryptionZones;
this.secRetain = secRetain;
this.conf = conf;
}
@@ -418,32 +455,34 @@ public class ReplChangeManager {
public void run() {
try {
LOG.info("CMClearer started");
-
- long now = System.currentTimeMillis();
- FileSystem fs = cmroot.getFileSystem(conf);
- FileStatus[] files = fs.listStatus(cmroot);
-
- for (FileStatus file : files) {
- long modifiedTime = file.getModificationTime();
- if (now - modifiedTime > secRetain*1000) {
- try {
- if (fs.getXAttrs(file.getPath()).containsKey(REMAIN_IN_TRASH_TAG)) {
- boolean succ = Trash.moveToAppropriateTrash(fs, file.getPath(), conf);
- if (succ) {
- LOG.debug("Move " + file.toString() + " to trash");
+ for (String cmrootString : encryptionZones.values()) {
+ Path cmroot = new Path(cmrootString);
+ long now = System.currentTimeMillis();
+ FileSystem fs = cmroot.getFileSystem(conf);
+ FileStatus[] files = fs.listStatus(cmroot);
+
+ for (FileStatus file : files) {
+ long modifiedTime = file.getModificationTime();
+ if (now - modifiedTime > secRetain * 1000) {
+ try {
+ if (fs.getXAttrs(file.getPath()).containsKey(REMAIN_IN_TRASH_TAG)) {
+ boolean succ = Trash.moveToAppropriateTrash(fs, file.getPath(), conf);
+ if (succ) {
+ LOG.debug("Move " + file.toString() + " to trash");
+ } else {
+ LOG.warn("Fail to move " + file.toString() + " to trash");
+ }
} else {
- LOG.warn("Fail to move " + file.toString() + " to trash");
- }
- } else {
- boolean succ = fs.delete(file.getPath(), false);
- if (succ) {
- LOG.debug("Remove " + file.toString());
- } else {
- LOG.warn("Fail to remove " + file.toString());
+ boolean succ = fs.delete(file.getPath(), false);
+ if (succ) {
+ LOG.debug("Remove " + file.toString());
+ } else {
+ LOG.warn("Fail to remove " + file.toString());
+ }
}
+ } catch (UnsupportedOperationException e) {
+ LOG.warn("Error getting xattr for " + file.getPath().toString());
}
- } catch (UnsupportedOperationException e) {
- LOG.warn("Error getting xattr for " + file.getPath().toString());
}
}
}
@@ -461,12 +500,17 @@ public class ReplChangeManager {
.namingPattern(CM_THREAD_NAME_PREFIX + "%d")
.daemon(true)
.build());
- executor.scheduleAtFixedRate(new CMClearer(MetastoreConf.getVar(conf, ConfVars.REPLCMDIR),
- MetastoreConf.getTimeVar(conf, ConfVars.REPLCMRETIAN, TimeUnit.SECONDS), conf),
- 0, MetastoreConf.getTimeVar(conf, ConfVars.REPLCMINTERVAL, TimeUnit.SECONDS), TimeUnit.SECONDS);
+ executor.scheduleAtFixedRate(new CMClearer(encryptionZones,
+ MetastoreConf.getTimeVar(conf, ConfVars.REPLCMRETIAN, TimeUnit.SECONDS), conf),
+ 0, MetastoreConf.getTimeVar(conf, ConfVars.REPLCMINTERVAL, TimeUnit.SECONDS), TimeUnit.SECONDS);
}
}
+ public static boolean shouldEnableCm(Database db, Table table) {
+ assert (table != null);
+ return isSourceOfReplication(db) && !MetaStoreUtils.isExternalTable(table);
+ }
+
public static boolean isSourceOfReplication(Database db) {
assert (db != null);
String replPolicyIds = getReplPolicyIdString(db);
@@ -493,4 +537,63 @@ public class ReplChangeManager {
public static String[] getListFromSeparatedString(String commaSeparatedString) {
return commaSeparatedString.split("\\s*" + TXN_WRITE_EVENT_FILE_SEPARATOR + "\\s*");
}
+
+ @VisibleForTesting
+ static Path getCmRoot(Path path) throws IOException {
+ Path cmroot = null;
+ //Default cm root, used for non-encrypted paths when hive.repl.cmrootdir is encrypted
+ String cmrootDir = fallbackNonEncryptedCmRootDir;
+ String encryptionZonePath = NO_ENCRYPTION;
+ if (enabled) {
+ HdfsEncryptionShim pathEncryptionShim = hadoopShims.createHdfsEncryptionShim(path.getFileSystem(conf), conf);
+ if (pathEncryptionShim.isPathEncrypted(path)) {
+ encryptionZonePath = path.getFileSystem(conf).getUri()
+ + pathEncryptionShim.getEncryptionZoneForPath(path).getPath();
+ //For encryption zone, create cm at the relative path specified by hive.repl.cm.encryptionzone.rootdir
+ //at the root of the encryption zone
+ cmrootDir = encryptionZonePath + Path.SEPARATOR + encryptedCmRootDir;
+ }
+ if (encryptionZones.containsKey(encryptionZonePath)) {
+ cmroot = new Path(encryptionZones.get(encryptionZonePath));
+ } else {
+ cmroot = new Path(cmrootDir);
+ synchronized (instance) {
+ if (!encryptionZones.containsKey(encryptionZonePath)) {
+ createCmRoot(cmroot);
+ encryptionZones.put(encryptionZonePath, cmrootDir);
+ }
+ }
+ }
+ }
+ return cmroot;
+ }
+
+ private static void createCmRoot(Path cmroot) throws IOException {
+ FileSystem cmFs = cmroot.getFileSystem(conf);
+ // Create cmroot with permission 700 if not exist
+ if (!cmFs.exists(cmroot)) {
+ cmFs.mkdirs(cmroot);
+ cmFs.setPermission(cmroot, new FsPermission("700"));
+ }
+ }
+
+ @VisibleForTesting
+ static void resetReplChangeManagerInstance() {
+ inited = false;
+ enabled = false;
+ instance = null;
+ encryptionZones.clear();
+ }
+
+ public static final PathFilter CMROOT_PATH_FILTER = new PathFilter() {
+ @Override
+ public boolean accept(Path p) {
+ if (enabled) {
+ String name = p.getName();
+ return !name.contains(cmRootDir) && !name.contains(encryptedCmRootDir)
+ && !name.contains(fallbackNonEncryptedCmRootDir);
+ }
+ return true;
+ }
+ };
}
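Taken together, getCmRoot picks a per-encryption-zone ChangeManager root: a file inside an encryption zone is recycled under <zone root>/<hive.repl.cm.encryptionzone.rootdir>, a non-encrypted file uses the configured cmroot, and if the configured cmroot is itself encrypted, non-encrypted files fall back to hive.repl.cm.nonencryptionzone.rootdir (that zone keeps using the cmroot itself, as in the init code above). A simplified standalone sketch of that decision, with illustrative names and paths that are not part of the patch:

public class CmRootSelectionSketch {

  /**
   * @param encryptionZoneRoot     root of the encryption zone containing the file, or null if none
   * @param defaultCmRoot          value of hive.repl.cmrootdir
   * @param encryptedCmRootDir     value of hive.repl.cm.encryptionzone.rootdir (relative path)
   * @param fallbackCmRoot         value of hive.repl.cm.nonencryptionzone.rootdir
   * @param defaultCmRootEncrypted whether hive.repl.cmrootdir itself lies in an encryption zone
   */
  static String cmRootFor(String encryptionZoneRoot, String defaultCmRoot, String encryptedCmRootDir,
      String fallbackCmRoot, boolean defaultCmRootEncrypted) {
    if (encryptionZoneRoot != null) {
      // Encrypted data never leaves its zone: recycle into <zone root>/<relative cm dir>.
      return encryptionZoneRoot + "/" + encryptedCmRootDir;
    }
    // Non-encrypted data uses the normal cmroot unless that directory is itself encrypted.
    return defaultCmRootEncrypted ? fallbackCmRoot : defaultCmRoot;
  }

  public static void main(String[] args) {
    // An encrypted file: prints /data/ez1/.cmroot
    System.out.println(cmRootFor("/data/ez1", "/user/hive/cmroot", ".cmroot", "/user/hive/cmroot", false));
    // A non-encrypted file while the configured cmroot is encrypted: prints /user/hive/cmroot
    System.out.println(cmRootFor(null, "/data/ez2/cmroot", ".cmroot", "/user/hive/cmroot", true));
  }
}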
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index bc87e8f..2aeb374 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -917,6 +917,11 @@ public class MetastoreConf {
"This class is used to store and retrieval of raw metadata objects such as table, database"),
REPLCMDIR("metastore.repl.cmrootdir", "hive.repl.cmrootdir", "/user/${system:user.name}/cmroot/",
"Root dir for ChangeManager, used for deleted files."),
+ REPLCMENCRYPTEDDIR("metastore.repl.cm.encryptionzone.rootdir", "hive.repl.cm.encryptionzone.rootdir", ".cmroot",
+ "Root dir for ChangeManager if encryption zones are enabled, used for deleted files."),
+ REPLCMFALLBACKNONENCRYPTEDDIR("metastore.repl.cm.nonencryptionzone.rootdir",
+ "hive.repl.cm.nonencryptionzone.rootdir", "/user/${system:user.name}/cmroot/",
+ "Root dir for ChangeManager for non encrypted paths if hive.repl.cmrootdir is encrypted."),
REPLCMRETIAN("metastore.repl.cm.retain", "hive.repl.cm.retain", 24, TimeUnit.HOURS,
"Time to retain removed files in cmrootdir."),
REPLCMINTERVAL("metastore.repl.cm.interval", "hive.repl.cm.interval", 3600, TimeUnit.SECONDS,
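For illustration only (not part of the patch), the new keys can be supplied like any other metastore setting; the hive.* names are the aliases declared above, and the paths below are placeholders:

import org.apache.hadoop.conf.Configuration;

public class CmConfSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set("hive.repl.cm.enabled", "true");
    // Default CM root, used when it is not inside an encryption zone.
    conf.set("hive.repl.cmrootdir", "/user/hive/cmroot/");
    // Relative directory created at the root of each encryption zone.
    conf.set("hive.repl.cm.encryptionzone.rootdir", ".cmroot");
    // Fallback for non-encrypted paths when hive.repl.cmrootdir itself is encrypted.
    conf.set("hive.repl.cm.nonencryptionzone.rootdir", "/user/hive/cmroot/");
  }
}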
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/Retry.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/Retry.java
new file mode 100644
index 0000000..bdb269a
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/Retry.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.utils;
+
+/**
+ * Utility to retry an operation when it fails with a given exception type.
+ */
+public abstract class Retry<T> {
+
+ public static final int MAX_RETRIES = 3;
+ private int tries = 0;
+ private Class retryExceptionType;
+
+ public Retry(Class exceptionClassType) {
+ this.retryExceptionType = exceptionClassType;
+ }
+
+ public abstract T execute() throws Exception;
+
+ public T run() throws Exception {
+ try {
+ return execute();
+ } catch(Exception e) {
+ if (e.getClass().equals(retryExceptionType)){
+ tries++;
+ if (MAX_RETRIES == tries) {
+ throw e;
+ } else {
+ return run();
+ }
+ } else {
+ throw e;
+ }
+ }
+ }
+}
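A minimal usage sketch of the new helper, mirroring how recycle() above wraps fs.rename; the class name and the simulated transient failure are illustrative, not part of the patch:

import java.io.IOException;

import org.apache.hadoop.hive.metastore.utils.Retry;

public class RetryUsageSketch {
  public static void main(String[] args) throws Exception {
    // Re-runs execute() on IOException until it succeeds or MAX_RETRIES (3) failures occur, then rethrows.
    Retry<Boolean> retriable = new Retry<Boolean>(IOException.class) {
      private int attempts = 0;

      @Override
      public Boolean execute() throws IOException {
        attempts++;
        if (attempts < 3) {
          throw new IOException("simulated transient failure #" + attempts);
        }
        return Boolean.TRUE;
      }
    };
    System.out.println("succeeded: " + retriable.run());   // prints: succeeded: true
  }
}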
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/package-info.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/package-info.java
new file mode 100644
index 0000000..2eb51c8
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package containing the utility methods for the metastore.
+ */
+package org.apache.hadoop.hive.metastore.utils;
diff --git a/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/RetryTest.java b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/RetryTest.java
new file mode 100644
index 0000000..67bd658
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/RetryTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the Retry utility.
+ */
+public class RetryTest {
+ @Test
+ public void testRetrySuccess() {
+ Retry<Void> retriable = new Retry<Void>(NullPointerException.class) {
+ @Override
+ public Void execute() {
+ throw new NullPointerException();
+ }
+ };
+ try {
+ retriable.run();
+ } catch (Exception e) {
+ Assert.assertEquals(NullPointerException.class, e.getClass());
+ }
+ }
+
+ @Test
+ public void testRetryFailure() {
+ Retry<Void> retriable = new Retry<Void>(NullPointerException.class) {
+ @Override
+ public Void execute() {
+ throw new RuntimeException();
+ }
+ };
+ try {
+ retriable.run();
+ } catch (Exception e) {
+ Assert.assertEquals(RuntimeException.class, e.getClass());
+ }
+ }
+}
diff --git a/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/package-info.java b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/package-info.java
new file mode 100644
index 0000000..2eb51c8
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package containing the utility methods for the metastore.
+ */
+package org.apache.hadoop.hive.metastore.utils;
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
index dda407a..8d77ffe 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
@@ -272,7 +272,7 @@ public class HiveAlterHandler implements AlterHandler {
}
// check that src exists and also checks permissions necessary, rename src to dest
if (srcFs.exists(srcPath) && wh.renameDir(srcPath, destPath,
- ReplChangeManager.isSourceOfReplication(olddb))) {
+ ReplChangeManager.shouldEnableCm(olddb, oldt))) {
dataWasMoved = true;
}
} catch (IOException | MetaException e) {
@@ -428,7 +428,8 @@ public class HiveAlterHandler implements AlterHandler {
Path deleteOldDataLoc = new Path(oldt.getSd().getLocation());
boolean isAutoPurge = "true".equalsIgnoreCase(oldt.getParameters().get("auto.purge"));
try {
- wh.deleteDir(deleteOldDataLoc, true, isAutoPurge, olddb);
+ wh.deleteDir(deleteOldDataLoc, true, isAutoPurge,
+ ReplChangeManager.shouldEnableCm(olddb, oldt));
LOG.info("Deleted the old data location: {} for the table: {}",
deleteOldDataLoc, dbname + "." + name);
} catch (MetaException ex) {
@@ -651,7 +652,7 @@ public class HiveAlterHandler implements AlterHandler {
}
//rename the data directory
- wh.renameDir(srcPath, destPath, ReplChangeManager.isSourceOfReplication(db));
+ wh.renameDir(srcPath, destPath, ReplChangeManager.shouldEnableCm(db, tbl));
LOG.info("Partition directory rename from " + srcPath + " to " + destPath + " done.");
dataWasMoved = true;
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index b8418c5..ee9f988 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -2228,7 +2228,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
if (!success) {
ms.rollbackTransaction();
if (madeDir) {
- wh.deleteDir(tblPath, true, db);
+ wh.deleteDir(tblPath, true, false, ReplChangeManager.shouldEnableCm(db, tbl));
}
}
@@ -2806,10 +2806,9 @@ public class HiveMetaStore extends ThriftHiveMetastore {
} else if (tableDataShouldBeDeleted) {
// Data needs deletion. Check if trash may be skipped.
// Delete the data in the partitions which have other locations
- deletePartitionData(partPaths, ifPurge, db);
+ deletePartitionData(partPaths, ifPurge, ReplChangeManager.shouldEnableCm(db, tbl));
// Delete the data in the table
- deleteTableData(tblPath, ifPurge, db);
- // ok even if the data is not deleted
+ deleteTableData(tblPath, ifPurge, ReplChangeManager.shouldEnableCm(db, tbl));
}
if (!listeners.isEmpty()) {
@@ -2837,16 +2836,49 @@ public class HiveMetaStore extends ThriftHiveMetastore {
* @param tablePath
* @param ifPurge completely purge the table (skipping trash) while removing
* data from warehouse
- * @param db database the table belongs to
+ * @param shouldEnableCm whether deleted files should be recycled into the ChangeManager
*/
- private void deleteTableData(Path tablePath, boolean ifPurge, Database db) {
+ private void deleteTableData(Path tablePath, boolean ifPurge, boolean shouldEnableCm) {
+ if (tablePath != null) {
+ try {
+ if (shouldEnableCm) {
+ //Don't delete the cm dir if it is inside the table path
+ FileStatus[] statuses = tablePath.getFileSystem(conf).listStatus(tablePath,
+ ReplChangeManager.CMROOT_PATH_FILTER);
+ for (final FileStatus status : statuses) {
+ wh.deleteDir(status.getPath(), true, ifPurge, shouldEnableCm);
+ }
+ //If the table directory is now empty, delete it
+ FileStatus[] statusWithoutFilter = tablePath.getFileSystem(conf).listStatus(tablePath);
+ if (statusWithoutFilter.length == 0) {
+ wh.deleteDir(tablePath, true, ifPurge, shouldEnableCm);
+ }
+ } else {
+ //If no cm is needed, delete the complete table directory
+ wh.deleteDir(tablePath, true, ifPurge, shouldEnableCm);
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to delete table directory: " + tablePath +
+ " " + e.getMessage());
+ }
+ }
+ }
+ /**
+ * Deletes the data in a table's location; logs an error if it fails.
+ *
+ * @param tablePath
+ * @param ifPurge completely purge the table (skipping trash) while removing
+ * data from warehouse
+ * @param db Database
+ */
+ private void deleteTableData(Path tablePath, boolean ifPurge, Database db) {
if (tablePath != null) {
try {
wh.deleteDir(tablePath, true, ifPurge, db);
} catch (Exception e) {
LOG.error("Failed to delete table directory: " + tablePath +
- " " + e.getMessage());
+ " " + e.getMessage());
}
}
}
@@ -2858,8 +2890,45 @@ public class HiveMetaStore extends ThriftHiveMetastore {
* @param partPaths
* @param ifPurge completely purge the partition (skipping trash) while
* removing data from warehouse
- * @param db database the partition belongs to
+ * @param shouldEnableCm whether deleted files should be recycled into the ChangeManager
*/
+ private void deletePartitionData(List<Path> partPaths, boolean ifPurge, boolean shouldEnableCm) {
+ if (partPaths != null && !partPaths.isEmpty()) {
+ for (Path partPath : partPaths) {
+ try {
+ if (shouldEnableCm) {
+ //Don't delete the cm dir if it is inside the partition path
+ FileStatus[] statuses = partPath.getFileSystem(conf).listStatus(partPath,
+ ReplChangeManager.CMROOT_PATH_FILTER);
+ for (final FileStatus status : statuses) {
+ wh.deleteDir(status.getPath(), true, ifPurge, shouldEnableCm);
+ }
+ //If the partition directory is now empty, delete it
+ FileStatus[] statusWithoutFilter = partPath.getFileSystem(conf).listStatus(partPath);
+ if (statusWithoutFilter.length == 0) {
+ wh.deleteDir(partPath, true, ifPurge, shouldEnableCm);
+ }
+ } else {
+ //If no cm is needed, delete the complete partition directory
+ wh.deleteDir(partPath, true, ifPurge, shouldEnableCm);
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to delete partition directory: " + partPath +
+ " " + e.getMessage());
+ }
+ }
+ }
+ }
+
+ /**
+ * Given a list of partitions' locations, tries to delete each one
+ * and for each that fails logs an error.
+ *
+ * @param partPaths
+ * @param ifPurge completely purge the partition (skipping trash) while
+ * removing data from warehouse
+ * @param db Database
+ */
private void deletePartitionData(List<Path> partPaths, boolean ifPurge, Database db) {
if (partPaths != null && !partPaths.isEmpty()) {
for (Path partPath : partPaths) {
@@ -2867,7 +2936,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
wh.deleteDir(partPath, true, ifPurge, db);
} catch (Exception e) {
LOG.error("Failed to delete partition directory: " + partPath +
- " " + e.getMessage());
+ " " + e.getMessage());
}
}
}
@@ -3125,7 +3194,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
HdfsUtils.HadoopFileStatus status = new HdfsUtils.HadoopFileStatus(getConf(), fs, location);
FileStatus targetStatus = fs.getFileStatus(location);
String targetGroup = targetStatus == null ? null : targetStatus.getGroup();
- wh.deleteDir(location, true, isAutopurge, db);
+ wh.deleteDir(location, true, isAutopurge, ReplChangeManager.shouldEnableCm(db, tbl));
fs.mkdirs(location);
HdfsUtils.setFullFileStatus(getConf(), status, targetGroup, fs, location, false);
} else {
@@ -3134,7 +3203,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
continue;
}
for (final FileStatus status : statuses) {
- wh.deleteDir(status.getPath(), true, isAutopurge, db);
+ wh.deleteDir(status.getPath(), true, isAutopurge, ReplChangeManager.shouldEnableCm(db, tbl));
}
}
}
@@ -3619,7 +3688,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
if (!success) {
ms.rollbackTransaction();
if (madeDir) {
- wh.deleteDir(partLocation, true, db);
+ wh.deleteDir(partLocation, true, false, ReplChangeManager.shouldEnableCm(db, tbl));
}
}
@@ -4352,8 +4421,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
success = ms.addPartition(part);
} finally {
if (!success && madeDir) {
- wh.deleteDir(new Path(part.getSd().getLocation()), true,
- ms.getDatabase(tbl.getCatName(), tbl.getDbName()));
+ wh.deleteDir(new Path(part.getSd().getLocation()), true, false,
+ ReplChangeManager.shouldEnableCm(ms.getDatabase(part.getCatName(), part.getDbName()), tbl));
}
}
@@ -4623,7 +4692,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
Path archiveParentDir = null;
boolean mustPurge = false;
boolean tableDataShouldBeDeleted = false;
- boolean isSourceOfReplication = false;
+ boolean needsCm = false;
Map<String, String> transactionalListenerResponses = Collections.emptyMap();
if (db_name == null) {
@@ -4671,7 +4740,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
new DropPartitionEvent(tbl, part, true, deleteData, this),
envContext);
}
- isSourceOfReplication = ReplChangeManager.isSourceOfReplication(ms.getDatabase(catName, db_name));
+ needsCm = ReplChangeManager.shouldEnableCm(ms.getDatabase(catName, db_name), tbl);
success = ms.commitTransaction();
}
} finally {
@@ -4690,11 +4759,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
if (isArchived) {
assert (archiveParentDir != null);
- wh.deleteDir(archiveParentDir, true, mustPurge, isSourceOfReplication);
+ wh.deleteDir(archiveParentDir, true, mustPurge, needsCm);
} else {
assert (partPath != null);
- wh.deleteDir(partPath, true, mustPurge, isSourceOfReplication);
- deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge, isSourceOfReplication);
+ wh.deleteDir(partPath, true, mustPurge, needsCm);
+ deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge, needsCm);
}
// ok even if the data is not deleted
}
@@ -4770,7 +4839,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
List<Partition> parts = null;
boolean mustPurge = false;
List<Map<String, String>> transactionalListenerResponses = Lists.newArrayList();
- boolean isSourceOfReplication = ReplChangeManager.isSourceOfReplication(ms.getDatabase(catName, dbName));
+ boolean needsCm = ReplChangeManager.shouldEnableCm(ms.getDatabase(catName, dbName),
+ ms.getTable(catName, dbName, tblName));
try {
// We need Partition-s for firing events and for result; DN needs MPartition-s to drop.
@@ -4878,12 +4948,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
// Archived partitions have har:/to_har_file as their location.
// The original directory was saved in params
for (Path path : archToDelete) {
- wh.deleteDir(path, true, mustPurge, isSourceOfReplication);
+ wh.deleteDir(path, true, mustPurge, needsCm);
}
for (PathAndPartValSize p : dirsToDelete) {
- wh.deleteDir(p.path, true, mustPurge, isSourceOfReplication);
+ wh.deleteDir(p.path, true, mustPurge, needsCm);
try {
- deleteParentRecursive(p.path.getParent(), p.partValSize - 1, mustPurge, isSourceOfReplication);
+ deleteParentRecursive(p.path.getParent(), p.partValSize - 1, mustPurge, needsCm);
} catch (IOException ex) {
LOG.warn("Error from deleteParentRecursive", ex);
throw new MetaException("Failed to delete parent: " + ex.getMessage());
@@ -7785,7 +7855,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
if (func == null) {
throw new NoSuchObjectException("Function " + funcName + " does not exist");
}
- Boolean isSourceOfReplication =
+ Boolean needsCm =
ReplChangeManager.isSourceOfReplication(get_database_core(parsedDbName[CAT_NAME], parsedDbName[DB_NAME]));
// if copy of jar to change management fails we fail the metastore transaction, since the
@@ -7793,7 +7863,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
// a copy is required to allow incremental replication to work correctly.
if (func.getResourceUris() != null && !func.getResourceUris().isEmpty()) {
for (ResourceUri uri : func.getResourceUris()) {
- if (uri.getUri().toLowerCase().startsWith("hdfs:") && isSourceOfReplication) {
+ if (uri.getUri().toLowerCase().startsWith("hdfs:") && needsCm) {
wh.addToChangeManagement(new Path(uri.getUri()));
}
}