You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ma...@apache.org on 2020/02/07 01:57:52 UTC

[hive] branch master updated: HIVE-22736 : Support replication across multiple encryption zones. (Aasha Medhi, reviewed by Mahesh Kumar Behera)

This is an automated email from the ASF dual-hosted git repository.

mahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 7411d42  HIVE-22736 : Support replication across multiple encryption zones. (Aasha Medhi, reviewed by Mahesh Kumar Behera)
7411d42 is described below

commit 7411d42579ffa0bad96e8da731a1a35afc9ff614
Author: Aasha Medhi <aa...@gmail.com>
AuthorDate: Fri Feb 7 07:16:24 2020 +0530

    HIVE-22736 : Support replication across multiple encryption zones. (Aasha Medhi, reviewed by Mahesh Kumar Behera)
    
    Signed-off-by: Mahesh Kumar Behera <ma...@apache.org>
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |    5 +
 .../TestMetaStoreMultipleEncryptionZones.java      | 1434 ++++++++++++++++++++
 .../hive/metastore/TestReplChangeManager.java      |    1 +
 .../cache/TestCachedStoreUpdateUsingEvents.java    |    2 +
 .../hive/ql/metadata/TestAlterTableMetadata.java   |    1 +
 .../java/org/apache/hive/jdbc/TestJdbcDriver2.java |    4 +
 .../org/apache/hive/jdbc/TestJdbcWithMiniHS2.java  |    1 +
 .../ql/ddl/table/create/CreateTableOperation.java  |    1 -
 .../table/storage/AlterTableArchiveOperation.java  |    4 +-
 .../ddl/table/storage/AlterTableArchiveUtils.java  |    5 +-
 .../storage/AlterTableSetLocationOperation.java    |    1 +
 .../storage/AlterTableUnarchiveOperation.java      |    6 +-
 .../org/apache/hadoop/hive/ql/metadata/Hive.java   |    4 +-
 .../hive/ql/parse/ImportSemanticAnalyzer.java      |    4 +-
 .../hadoop/hive/ql/txn/compactor/Cleaner.java      |    4 +-
 .../apache/hadoop/hive/shims/Hadoop23Shims.java    |    2 +-
 .../org/apache/hadoop/hive/shims/HadoopShims.java  |   15 +-
 standalone-metastore/metastore-common/pom.xml      |    5 +
 .../hadoop/hive/metastore/ReplChangeManager.java   |  187 ++-
 .../hadoop/hive/metastore/conf/MetastoreConf.java  |    5 +
 .../apache/hadoop/hive/metastore/utils/Retry.java  |   52 +
 .../hadoop/hive/metastore/utils/package-info.java  |   22 +
 .../hadoop/hive/metastore/utils/RetryTest.java     |   57 +
 .../hadoop/hive/metastore/utils/package-info.java  |   22 +
 .../hadoop/hive/metastore/HiveAlterHandler.java    |    7 +-
 .../hadoop/hive/metastore/HiveMetaStore.java       |  120 +-
 26 files changed, 1886 insertions(+), 85 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 12a022c..a120b45 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -452,6 +452,11 @@ public class HiveConf extends Configuration {
     REPLCMRETIAN("hive.repl.cm.retain","24h",
         new TimeValidator(TimeUnit.HOURS),
         "Time to retain removed files in cmrootdir."),
+    REPLCMENCRYPTEDDIR("hive.repl.cm.encryptionzone.rootdir", ".cmroot",
+            "Root dir for ChangeManager if encryption zones are enabled, used for deleted files."),
+    REPLCMFALLBACKNONENCRYPTEDDIR("hive.repl.cm.nonencryptionzone.rootdir",
+            "/user/${system:user.name}/cmroot/",
+            "Root dir for ChangeManager for non encrypted paths if hive.repl.cmrootdir is encrypted."),
     REPLCMINTERVAL("hive.repl.cm.interval","3600s",
         new TimeValidator(TimeUnit.SECONDS),
         "Inteval for cmroot cleanup thread."),
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMultipleEncryptionZones.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMultipleEncryptionZones.java
new file mode 100644
index 0000000..51bb787
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMultipleEncryptionZones.java
@@ -0,0 +1,1434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ReplChangeManager.RecycleType;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.hive.metastore.client.builder.DatabaseBuilder;
+import org.apache.hadoop.hive.metastore.client.builder.TableBuilder;
+import org.apache.hadoop.hive.metastore.api.Type;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.thrift.TException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests metastore operations (such as dropping tables) on data in HDFS encryption zones with replication CM enabled.
+ */
+public class TestMetaStoreMultipleEncryptionZones {
+  private static HiveMetaStoreClient client;
+  private static HiveConf hiveConf;
+  private static Configuration conf;
+  private static Warehouse warehouse;
+  private static FileSystem warehouseFs;
+  private static MiniDFSCluster miniDFSCluster;
+  private static String cmroot;
+  private static FileSystem fs;
+  private static HadoopShims.HdfsEncryptionShim shimCm;
+  private static String cmrootEncrypted;
+  private static String jksFile = System.getProperty("java.io.tmpdir") + "/test.jks";
+  private static String cmrootFallBack;
+
+  @BeforeClass // one-time fixture: mini DFS cluster with keys, HiveConf with CM settings, metastore client
+  public static void setUp() throws Exception {
+    // Start a one-datanode MiniDFSCluster whose key provider is a JCEKS file under java.io.tmpdir
+    conf = new Configuration();
+    conf.set("hadoop.security.key.provider.path", "jceks://file" + jksFile);
+    miniDFSCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+    DFSTestUtil.createKey("test_key_cm", miniDFSCluster, conf); // the two keys the tests use for encryption zones
+    DFSTestUtil.createKey("test_key_db", miniDFSCluster, conf);
+    hiveConf = new HiveConf(TestReplChangeManager.class); // NOTE(review): names a sibling test class, looks copy-pasted - confirm
+    hiveConf.setBoolean(HiveConf.ConfVars.REPLCMENABLED.varname, true); // turn the replication change manager on
+    hiveConf.setInt(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, 60);
+    hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
+            "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort()
+                    + HiveConf.ConfVars.METASTOREWAREHOUSE.defaultStrVal);
+
+    cmroot = "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + "/cmroot";
+    cmrootFallBack = "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + "/cmrootFallback";
+    cmrootEncrypted = "cmrootEncrypted"; // a bare directory name, unlike the two absolute hdfs:// URIs above
+    hiveConf.set(HiveConf.ConfVars.REPLCMDIR.varname, cmroot);
+    hiveConf.set(HiveConf.ConfVars.REPLCMENCRYPTEDDIR.varname, cmrootEncrypted);
+    hiveConf.set(HiveConf.ConfVars.REPLCMFALLBACKNONENCRYPTEDDIR.varname, cmrootFallBack);
+    initReplChangeManager(); // sets warehouse, warehouseFs and fs, which the next statement needs
+    // Encryption shim over fs; the tests call createEncryptionZone on it (built from conf, not hiveConf)
+    shimCm = ShimLoader.getHadoopShims().createHdfsEncryptionShim(fs, conf);
+
+    try {
+      client = new HiveMetaStoreClient(hiveConf);
+    } catch (Throwable e) {
+      System.err.println("Unable to open the metastore"); // report on stderr, then rethrow unchanged
+      System.err.println(StringUtils.stringifyException(e));
+      throw e;
+    }
+  }
+
+  private static void initReplChangeManager() throws Exception{ // despite the name, only prepares warehouse/fs handles
+    warehouse = new Warehouse(hiveConf);
+    warehouseFs = warehouse.getWhRoot().getFileSystem(hiveConf); // file system of the warehouse root
+    fs = new Path(cmroot).getFileSystem(hiveConf); // file system of the configured cmroot URI
+    fs.mkdirs(warehouse.getWhRoot()); // create the warehouse root directory
+  }
+
+  @AfterClass // one-time cleanup: shut the mini DFS cluster down, then close the metastore client
+  public static void tearDown() throws Exception {
+    try {
+      miniDFSCluster.shutdown();
+      client.close(); // NOTE(review): not reached if shutdown() throws - confirm that is acceptable
+    } catch (Throwable e) {
+      System.err.println("Unable to close metastore"); // report on stderr, then rethrow unchanged
+      System.err.println(StringUtils.stringifyException(e));
+      throw e;
+    }
+  }
+
+  @Test // two databases, each directory its own encryption zone under a different key; both table drops must succeed
+  public void dropTableWithDifferentEncryptionZonesDifferentKey() throws Throwable {
+    String dbName1 = "encrdbdiffkey1";
+    String dbName2 = "encrdbdiffkey2";
+    String tblName1 = "encrtbl1";
+    String tblName2 = "encrtbl2";
+    String typeName = "Person";
+
+    silentDropDatabase(dbName1);
+    silentDropDatabase(dbName2);
+    new DatabaseBuilder()
+            .setName(dbName1)
+            .addParam("repl.source.for", "1, 2, 3") // presumably marks the db as a replication source - verify
+            .create(client, hiveConf);
+
+    new DatabaseBuilder()
+            .setName(dbName2)
+            .addParam("repl.source.for", "1, 2, 3")
+            .create(client, hiveConf);
+
+    client.dropType(typeName); // recreate the two-field "Person" type used for the table columns
+    Type typ1 = new Type();
+    typ1.setName(typeName);
+    typ1.setFields(new ArrayList<>(2));
+    typ1.getFields().add(
+            new FieldSchema("name", ColumnType.STRING_TYPE_NAME, ""));
+    typ1.getFields().add(
+            new FieldSchema("income", ColumnType.INT_TYPE_NAME, ""));
+    client.createType(typ1);
+
+    Path dirDb1 = new Path(warehouse.getWhRoot(), dbName1 +".db");
+    warehouseFs.delete(dirDb1, true); // start from a fresh directory before turning it into a zone
+    warehouseFs.mkdirs(dirDb1);
+    shimCm.createEncryptionZone(dirDb1, "test_key_db"); // zone 1: db1 directory, key test_key_db
+    Path dirTbl1 = new Path(dirDb1, tblName1);
+    warehouseFs.mkdirs(dirTbl1);
+    Path part11 = new Path(dirTbl1, "part1");
+    createFile(part11, "testClearer11");
+
+    Path dirDb2 = new Path(warehouse.getWhRoot(), dbName2 +".db");
+    warehouseFs.delete(dirDb2, true);
+    warehouseFs.mkdirs(dirDb2);
+    shimCm.createEncryptionZone(dirDb2, "test_key_cm"); // zone 2: db2 directory, key test_key_cm
+    Path dirTbl2 = new Path(dirDb2, tblName2);
+    warehouseFs.mkdirs(dirTbl2);
+    Path part12 = new Path(dirTbl2, "part1");
+    createFile(part12, "testClearer12");
+
+    new TableBuilder()
+            .setDbName(dbName1)
+            .setTableName(tblName1)
+            .setCols(typ1.getFields())
+            .setNumBuckets(1)
+            .addBucketCol("name")
+            .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+            .create(client, hiveConf);
+
+    Table tbl = client.getTable(dbName1, tblName1);
+    Assert.assertNotNull(tbl);
+
+    new TableBuilder()
+            .setDbName(dbName2)
+            .setTableName(tblName2)
+            .setCols(typ1.getFields())
+            .setNumBuckets(1)
+            .addBucketCol("name")
+            .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+            .create(client, hiveConf);
+
+    boolean exceptionThrown = false;
+    try {
+      client.dropTable(dbName1, tblName1);
+    } catch (MetaException e) {
+      exceptionThrown = true;
+      assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+    }
+    assertFalse(exceptionThrown); // the drop must not throw MetaException
+    assertFalse(warehouseFs.exists(part11)); // data file must no longer be in the table directory
+    try {
+      client.getTable(dbName1, tblName1);
+    } catch (NoSuchObjectException e) {
+      exceptionThrown = true;
+    }
+    assertTrue(exceptionThrown); // getTable must fail with NoSuchObjectException
+    exceptionThrown = false; // reset the flag for the second table
+    try {
+      client.dropTable(dbName2, tblName2);
+    } catch (MetaException e) {
+      exceptionThrown = true;
+      assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+    }
+    assertFalse(exceptionThrown);
+    assertFalse(warehouseFs.exists(part12));
+    try {
+      client.getTable(dbName2, tblName2);
+    } catch (NoSuchObjectException e) {
+      exceptionThrown = true;
+    }
+    assertTrue(exceptionThrown);
+
+
+  }
+
+  @Test // each table directory is itself an encryption-zone root (different keys); both drops must succeed
+  public void dropTableWithTableAtEncryptionZoneRoot() throws Throwable {
+    String dbName = "encrdbroot";
+    String tblName1 = "encrtbl1";
+    String tblName2 = "encrtbl2";
+    String typeName = "Person";
+
+    silentDropDatabase(dbName);
+    new DatabaseBuilder()
+            .setName(dbName)
+            .addParam("repl.source.for", "1, 2, 3") // presumably marks the db as a replication source - verify
+            .create(client, hiveConf);
+
+    client.dropType(typeName); // recreate the two-field "Person" type used for the table columns
+    Type typ1 = new Type();
+    typ1.setName(typeName);
+    typ1.setFields(new ArrayList<>(2));
+    typ1.getFields().add(
+            new FieldSchema("name", ColumnType.STRING_TYPE_NAME, ""));
+    typ1.getFields().add(
+            new FieldSchema("income", ColumnType.INT_TYPE_NAME, ""));
+    client.createType(typ1);
+
+    new TableBuilder()
+            .setDbName(dbName)
+            .setTableName(tblName1)
+            .setCols(typ1.getFields())
+            .setNumBuckets(1)
+            .addBucketCol("name")
+            .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+            .create(client, hiveConf);
+
+    Table tbl = client.getTable(dbName, tblName1);
+    Assert.assertNotNull(tbl);
+
+    new TableBuilder()
+            .setDbName(dbName)
+            .setTableName(tblName2)
+            .setCols(typ1.getFields())
+            .setNumBuckets(1)
+            .addBucketCol("name")
+            .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+            .create(client, hiveConf);
+
+    Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db");
+    warehouseFs.mkdirs(dirDb);
+    Path dirTbl1 = new Path(dirDb, tblName1);
+    warehouseFs.mkdirs(dirTbl1);
+    shimCm.createEncryptionZone(dirTbl1, "test_key_db"); // zone 1: table 1 directory, key test_key_db
+    Path part11 = new Path(dirTbl1, "part1");
+    createFile(part11, "testClearer11");
+
+    Path dirTbl2 = new Path(dirDb, tblName2);
+    warehouseFs.mkdirs(dirTbl2);
+    shimCm.createEncryptionZone(dirTbl2, "test_key_cm"); // zone 2: table 2 directory, key test_key_cm
+    Path part12 = new Path(dirTbl2, "part1");
+    createFile(part12, "testClearer12");
+
+    boolean exceptionThrown = false;
+    try {
+      client.dropTable(dbName, tblName1);
+    } catch (MetaException e) {
+      exceptionThrown = true;
+      assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+    }
+    assertFalse(exceptionThrown); // the drop must not throw MetaException
+    assertFalse(warehouseFs.exists(part11)); // data file must no longer be in the table directory
+    try {
+      client.getTable(dbName, tblName1);
+    } catch (NoSuchObjectException e) {
+      exceptionThrown = true;
+    }
+    assertTrue(exceptionThrown); // getTable must fail with NoSuchObjectException
+    exceptionThrown = false; // reset the flag for the second table
+    try {
+      client.dropTable(dbName, tblName2);
+    } catch (MetaException e) {
+      exceptionThrown = true;
+      assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+    }
+    assertFalse(exceptionThrown);
+    assertFalse(warehouseFs.exists(part12));
+    try {
+      client.getTable(dbName, tblName2);
+    } catch (NoSuchObjectException e) {
+      exceptionThrown = true;
+    }
+    assertTrue(exceptionThrown);
+    assertTrue(warehouseFs.exists(new Path(dirTbl1, cmrootEncrypted))); // a cmrootEncrypted dir exists under each zone root
+    assertTrue(warehouseFs.exists(new Path(dirTbl2, cmrootEncrypted)));
+  }
+
+  @Test // two databases, each directory its own encryption zone but sharing one key; both table drops must succeed
+  public void dropTableWithDifferentEncryptionZonesSameKey() throws Throwable {
+    String dbName1 = "encrdbsamekey1";
+    String dbName2 = "encrdbsamekey2";
+    String tblName1 = "encrtbl1";
+    String tblName2 = "encrtbl2";
+    String typeName = "Person";
+
+    silentDropDatabase(dbName1);
+    silentDropDatabase(dbName2);
+    new DatabaseBuilder()
+            .setName(dbName1)
+            .addParam("repl.source.for", "1, 2, 3") // presumably marks the db as a replication source - verify
+            .create(client, hiveConf);
+
+    new DatabaseBuilder()
+            .setName(dbName2)
+            .addParam("repl.source.for", "1, 2, 3")
+            .create(client, hiveConf);
+
+    client.dropType(typeName); // recreate the two-field "Person" type used for the table columns
+    Type typ1 = new Type();
+    typ1.setName(typeName);
+    typ1.setFields(new ArrayList<>(2));
+    typ1.getFields().add(
+            new FieldSchema("name", ColumnType.STRING_TYPE_NAME, ""));
+    typ1.getFields().add(
+            new FieldSchema("income", ColumnType.INT_TYPE_NAME, ""));
+    client.createType(typ1);
+
+    Path dirDb1 = new Path(warehouse.getWhRoot(), dbName1 +".db");
+    warehouseFs.mkdirs(dirDb1);
+    shimCm.createEncryptionZone(dirDb1, "test_key_db"); // zone 1: db1 directory, key test_key_db
+    Path dirTbl1 = new Path(dirDb1, tblName1);
+    warehouseFs.mkdirs(dirTbl1);
+    Path part11 = new Path(dirTbl1, "part1");
+    createFile(part11, "testClearer11");
+
+    Path dirDb2 = new Path(warehouse.getWhRoot(), dbName2 +".db");
+    warehouseFs.mkdirs(dirDb2);
+    shimCm.createEncryptionZone(dirDb2, "test_key_db"); // zone 2: db2 directory, same key test_key_db
+    Path dirTbl2 = new Path(dirDb2, tblName2);
+    warehouseFs.mkdirs(dirTbl2);
+    Path part12 = new Path(dirTbl2, "part1");
+    createFile(part12, "testClearer12");
+
+    new TableBuilder()
+            .setDbName(dbName1)
+            .setTableName(tblName1)
+            .setCols(typ1.getFields())
+            .setNumBuckets(1)
+            .addBucketCol("name")
+            .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+            .create(client, hiveConf);
+
+    Table tbl = client.getTable(dbName1, tblName1);
+    Assert.assertNotNull(tbl);
+
+    new TableBuilder()
+            .setDbName(dbName2)
+            .setTableName(tblName2)
+            .setCols(typ1.getFields())
+            .setNumBuckets(1)
+            .addBucketCol("name")
+            .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+            .create(client, hiveConf);
+
+    boolean exceptionThrown = false;
+    try {
+      client.dropTable(dbName1, tblName1);
+    } catch (MetaException e) {
+      exceptionThrown = true;
+      assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+    }
+    assertFalse(exceptionThrown); // the drop must not throw MetaException
+    assertFalse(warehouseFs.exists(part11)); // data file must no longer be in the table directory
+    try {
+      client.getTable(dbName1, tblName1);
+    } catch (NoSuchObjectException e) {
+      exceptionThrown = true;
+    }
+    assertTrue(exceptionThrown); // getTable must fail with NoSuchObjectException
+    exceptionThrown = false; // reset the flag for the second table
+    try {
+      client.dropTable(dbName2, tblName2);
+    } catch (MetaException e) {
+      exceptionThrown = true;
+      assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+    }
+    assertFalse(exceptionThrown);
+    assertFalse(warehouseFs.exists(part12));
+    try {
+      client.getTable(dbName2, tblName2);
+    } catch (NoSuchObjectException e) {
+      exceptionThrown = true;
+    }
+    assertTrue(exceptionThrown);
+
+  }
+
+  @Test // both tables live inside one encryption zone (the database directory); both drops must succeed
+  public void dropTableWithSameEncryptionZones() throws Throwable {
+    String dbName = "encrdb3";
+    String tblName1 = "encrtbl1";
+    String tblName2 = "encrtbl2";
+    String typeName = "Person";
+    silentDropDatabase(dbName);
+
+    new DatabaseBuilder()
+            .setName(dbName)
+            .addParam("repl.source.for", "1, 2, 3") // presumably marks the db as a replication source - verify
+            .create(client, hiveConf);
+
+    client.dropType(typeName); // recreate the two-field "Person" type used for the table columns
+    Type typ1 = new Type();
+    typ1.setName(typeName);
+    typ1.setFields(new ArrayList<>(2));
+    typ1.getFields().add(
+            new FieldSchema("name", ColumnType.STRING_TYPE_NAME, ""));
+    typ1.getFields().add(
+            new FieldSchema("income", ColumnType.INT_TYPE_NAME, ""));
+    client.createType(typ1);
+
+    new TableBuilder()
+            .setDbName(dbName)
+            .setTableName(tblName1)
+            .setCols(typ1.getFields())
+            .setNumBuckets(1)
+            .addBucketCol("name")
+            .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+            .create(client, hiveConf);
+
+    Table tbl = client.getTable(dbName, tblName1);
+    Assert.assertNotNull(tbl);
+
+    new TableBuilder()
+            .setDbName(dbName)
+            .setTableName(tblName2)
+            .setCols(typ1.getFields())
+            .setNumBuckets(1)
+            .addBucketCol("name")
+            .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+            .create(client, hiveConf);
+
+    Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db");
+    warehouseFs.delete(dirDb, true); // remove the db directory recursively, then recreate it as a zone
+    warehouseFs.mkdirs(dirDb);
+    shimCm.createEncryptionZone(dirDb, "test_key_db"); // single zone: the database directory
+    Path dirTbl1 = new Path(dirDb, tblName1);
+    warehouseFs.mkdirs(dirTbl1);
+    Path part11 = new Path(dirTbl1, "part1");
+    createFile(part11, "testClearer11");
+
+    Path dirTbl2 = new Path(dirDb, tblName2);
+    warehouseFs.mkdirs(dirTbl2);
+    Path part12 = new Path(dirTbl2, "part1");
+    createFile(part12, "testClearer12");
+
+    boolean exceptionThrown = false;
+    try {
+      client.dropTable(dbName, tblName1);
+    } catch (MetaException e) {
+      exceptionThrown = true;
+      assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+    }
+    assertFalse(exceptionThrown); // the drop must not throw MetaException
+    assertFalse(warehouseFs.exists(part11)); // data file must no longer be in the table directory
+    try {
+      client.getTable(dbName, tblName1);
+    } catch (NoSuchObjectException e) {
+      exceptionThrown = true;
+    }
+    assertTrue(exceptionThrown); // getTable must fail with NoSuchObjectException
+    exceptionThrown = false; // reset the flag for the second table
+    try {
+      client.dropTable(dbName, tblName2);
+    } catch (MetaException e) {
+      exceptionThrown = true;
+      assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+    }
+    assertFalse(exceptionThrown);
+    assertFalse(warehouseFs.exists(part12));
+    try {
+      client.getTable(dbName, tblName2);
+    } catch (NoSuchObjectException e) {
+      exceptionThrown = true;
+    }
+    assertTrue(exceptionThrown);
+  }
+
+  @Test // baseline without any encryption zone: dropping the table must succeed and remove its data file
+  public void dropTableWithoutEncryptionZonesForCm() throws Throwable {
+    String dbName = "simpdb1";
+    String tblName = "simptbl";
+    String typeName = "Person";
+    silentDropDatabase(dbName);
+    new DatabaseBuilder()
+            .setName(dbName)
+            .addParam("repl.source.for", "1, 2, 3") // presumably marks the db as a replication source - verify
+            .create(client, hiveConf);
+
+    client.dropType(typeName); // recreate the two-field "Person" type used for the table columns
+    Type typ1 = new Type();
+    typ1.setName(typeName);
+    typ1.setFields(new ArrayList<>(2));
+    typ1.getFields().add(
+            new FieldSchema("name", ColumnType.STRING_TYPE_NAME, ""));
+    typ1.getFields().add(
+            new FieldSchema("income", ColumnType.INT_TYPE_NAME, ""));
+    client.createType(typ1);
+
+    new TableBuilder()
+            .setDbName(dbName)
+            .setTableName(tblName)
+            .setCols(typ1.getFields())
+            .setNumBuckets(1)
+            .addBucketCol("name")
+            .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+            .create(client, hiveConf);
+
+    Table tbl = client.getTable(dbName, tblName);
+    Assert.assertNotNull(tbl);
+
+    Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db");
+    warehouseFs.mkdirs(dirDb); // plain directories: no createEncryptionZone call in this test
+    Path dirTbl1 = new Path(dirDb, tblName);
+    warehouseFs.mkdirs(dirTbl1);
+    Path part11 = new Path(dirTbl1, "part1");
+    createFile(part11, "testClearer11");
+
+    boolean exceptionThrown = false;
+    try {
+      client.dropTable(dbName, tblName);
+    } catch (Exception e) {
+      exceptionThrown = true;
+    }
+    assertFalse(exceptionThrown); // the drop must not throw any exception
+    assertFalse(warehouseFs.exists(part11)); // data file must no longer be in the table directory
+    try {
+      client.getTable(dbName, tblName);
+    } catch (NoSuchObjectException e) {
+      exceptionThrown = true;
+    }
+    assertTrue(exceptionThrown); // getTable must fail with NoSuchObjectException
+  }
+
+  @Test
+  public void dropExternalTableWithSameEncryptionZonesForCm() throws Throwable {
+    String dbName = "encrdb4";
+    String tblName1 = "encrtbl1";
+    String tblName2 = "encrtbl2";
+    String typeName = "Person";
+    silentDropDatabase(dbName);
+    new DatabaseBuilder()
+            .setName(dbName)
+            .addParam("repl.source.for", "1, 2, 3")
+            .create(client, hiveConf);
+
+    client.dropType(typeName);
+    Type typ1 = new Type();
+    typ1.setName(typeName);
+    typ1.setFields(new ArrayList<>(2));
+    typ1.getFields().add(
+            new FieldSchema("name", ColumnType.STRING_TYPE_NAME, ""));
+    typ1.getFields().add(
+            new FieldSchema("income", ColumnType.INT_TYPE_NAME, ""));
+    client.createType(typ1);
+
+    new TableBuilder()
+            .setDbName(dbName)
+            .setTableName(tblName1)
+            .setCols(typ1.getFields())
+            .setNumBuckets(1)
+            .addBucketCol("name")
+            .addTableParam("EXTERNAL", "true")
+            .addTableParam("external.table.purge", "true")
+            .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+            .create(client, hiveConf);
+
+    Table tbl = client.getTable(dbName, tblName1);
+    Assert.assertNotNull(tbl);
+
+    new TableBuilder()
+            .setDbName(dbName)
+            .setTableName(tblName2)
+            .setCols(typ1.getFields())
+            .setNumBuckets(1)
+            .addBucketCol("name")
+            .addTableParam("EXTERNAL", "true")
+            .addTableParam("external.table.purge", "true")
+            .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+            .create(client, hiveConf);
+
+    Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db");
+    warehouseFs.delete(dirDb, true);
+    warehouseFs.mkdirs(dirDb);
+    shimCm.createEncryptionZone(dirDb, "test_key_db");
+    Path dirTbl1 = new Path(dirDb, tblName1);
+    warehouseFs.mkdirs(dirTbl1);
+    Path part11 = new Path(dirTbl1, "part1");
+    createFile(part11, "testClearer11");
+
+    Path dirTbl2 = new Path(dirDb, tblName2);
+    warehouseFs.mkdirs(dirTbl2);
+    Path part12 = new Path(dirTbl2, "part1");
+    createFile(part12, "testClearer12");
+
+    boolean exceptionThrown = false;
+    try {
+      client.dropTable(dbName, tblName1);
+    } catch (MetaException e) {
+      exceptionThrown = true;
+    }
+    assertFalse(exceptionThrown);
+    assertFalse(warehouseFs.exists(part11));
+    try {
+      client.getTable(dbName, tblName1);
+    } catch (NoSuchObjectException e) {
+      exceptionThrown = true;
+    }
+    assertTrue(exceptionThrown);
+    exceptionThrown = false;
+    try {
+      client.dropTable(dbName, tblName2);
+    } catch (MetaException e) {
+      exceptionThrown = true;
+    }
+    assertFalse(exceptionThrown);
+    assertFalse(warehouseFs.exists(part11));
+    try {
+      client.getTable(dbName, tblName2);
+    } catch (NoSuchObjectException e) {
+      exceptionThrown = true;
+    }
+    assertTrue(exceptionThrown);
+  }
+
+  @Test // two purge-enabled external tables, each directory its own encryption zone (same key); both drops must succeed
+  public void dropExternalTableWithDifferentEncryptionZones() throws Throwable {
+    String dbName = "encrdb5";
+    String tblName1 = "encrtbl1";
+    String tblName2 = "encrtbl2";
+    String typeName = "Person";
+
+    silentDropDatabase(dbName);
+    new DatabaseBuilder()
+            .setName(dbName)
+            .addParam("repl.source.for", "1, 2, 3") // presumably marks the db as a replication source - verify
+            .create(client, hiveConf);
+
+    client.dropType(typeName); // recreate the two-field "Person" type used for the table columns
+    Type typ1 = new Type();
+    typ1.setName(typeName);
+    typ1.setFields(new ArrayList<>(2));
+    typ1.getFields().add(
+            new FieldSchema("name", ColumnType.STRING_TYPE_NAME, ""));
+    typ1.getFields().add(
+            new FieldSchema("income", ColumnType.INT_TYPE_NAME, ""));
+    client.createType(typ1);
+
+    new TableBuilder()
+            .setDbName(dbName)
+            .setTableName(tblName1)
+            .setCols(typ1.getFields())
+            .setNumBuckets(1)
+            .addBucketCol("name")
+            .addTableParam("EXTERNAL", "true")
+            .addTableParam("external.table.purge", "true")
+            .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+            .create(client, hiveConf);
+
+    Table tbl = client.getTable(dbName, tblName1);
+    Assert.assertNotNull(tbl);
+
+    new TableBuilder()
+            .setDbName(dbName)
+            .setTableName(tblName2)
+            .setCols(typ1.getFields())
+            .setNumBuckets(1)
+            .addBucketCol("name")
+            .addTableParam("EXTERNAL", "true")
+            .addTableParam("external.table.purge", "true")
+            .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+            .create(client, hiveConf);
+
+    Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db");
+    warehouseFs.mkdirs(dirDb);
+    Path dirTbl1 = new Path(dirDb, tblName1);
+    warehouseFs.mkdirs(dirTbl1);
+    shimCm.createEncryptionZone(dirTbl1, "test_key_db"); // zone 1: table 1 directory, key test_key_db
+    Path part11 = new Path(dirTbl1, "part1");
+    createFile(part11, "testClearer11");
+
+    Path dirTbl2 = new Path(dirDb, tblName2);
+    warehouseFs.mkdirs(dirTbl2);
+    shimCm.createEncryptionZone(dirTbl2, "test_key_db"); // zone 2: table 2 directory, same key test_key_db
+    Path part12 = new Path(dirTbl2, "part1");
+    createFile(part12, "testClearer12");
+
+    boolean exceptionThrown = false;
+    try {
+      client.dropTable(dbName, tblName1);
+    } catch (MetaException e) {
+      exceptionThrown = true;
+    }
+    assertFalse(exceptionThrown); // the drop must not throw MetaException
+    assertFalse(warehouseFs.exists(part11)); // data file must no longer be in the table directory
+    try {
+      client.getTable(dbName, tblName1);
+    } catch (NoSuchObjectException e) {
+      exceptionThrown = true;
+    }
+    assertTrue(exceptionThrown); // getTable must fail with NoSuchObjectException
+    exceptionThrown = false; // reset the flag for the second table
+    try {
+      client.dropTable(dbName, tblName2);
+    } catch (MetaException e) {
+      exceptionThrown = true;
+    }
+    assertFalse(exceptionThrown);
+    assertFalse(warehouseFs.exists(part12));
+    try {
+      client.getTable(dbName, tblName2);
+    } catch (NoSuchObjectException e) {
+      exceptionThrown = true;
+    }
+    assertTrue(exceptionThrown);
+  }
+
+  @Test // two purge-enabled external tables, each directory its own zone under a different key; both drops must succeed
+  public void dropExternalTableWithDifferentEncryptionZonesDifferentKey() throws Throwable {
+    String dbName = "encrdb6";
+    String tblName1 = "encrtbl1";
+    String tblName2 = "encrtbl2";
+    String typeName = "Person";
+
+    silentDropDatabase(dbName);
+    new DatabaseBuilder()
+            .setName(dbName)
+            .addParam("repl.source.for", "1, 2, 3") // presumably marks the db as a replication source - verify
+            .create(client, hiveConf);
+
+    client.dropType(typeName); // recreate the two-field "Person" type used for the table columns
+    Type typ1 = new Type();
+    typ1.setName(typeName);
+    typ1.setFields(new ArrayList<>(2));
+    typ1.getFields().add(
+            new FieldSchema("name", ColumnType.STRING_TYPE_NAME, ""));
+    typ1.getFields().add(
+            new FieldSchema("income", ColumnType.INT_TYPE_NAME, ""));
+    client.createType(typ1);
+
+    new TableBuilder()
+            .setDbName(dbName)
+            .setTableName(tblName1)
+            .setCols(typ1.getFields())
+            .setNumBuckets(1)
+            .addBucketCol("name")
+            .addTableParam("EXTERNAL", "true")
+            .addTableParam("external.table.purge", "true")
+            .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+            .create(client, hiveConf);
+
+    Table tbl = client.getTable(dbName, tblName1);
+    Assert.assertNotNull(tbl);
+
+    new TableBuilder()
+            .setDbName(dbName)
+            .setTableName(tblName2)
+            .setCols(typ1.getFields())
+            .setNumBuckets(1)
+            .addBucketCol("name")
+            .addTableParam("EXTERNAL", "true")
+            .addTableParam("external.table.purge", "true")
+            .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+            .create(client, hiveConf);
+
+    Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db");
+    warehouseFs.mkdirs(dirDb);
+    Path dirTbl1 = new Path(dirDb, tblName1);
+    warehouseFs.mkdirs(dirTbl1);
+    shimCm.createEncryptionZone(dirTbl1, "test_key_db"); // zone 1: table 1 directory, key test_key_db
+    Path part11 = new Path(dirTbl1, "part1");
+    createFile(part11, "testClearer11");
+
+    Path dirTbl2 = new Path(dirDb, tblName2);
+    warehouseFs.mkdirs(dirTbl2);
+    shimCm.createEncryptionZone(dirTbl2, "test_key_cm"); // zone 2: table 2 directory, key test_key_cm
+    Path part12 = new Path(dirTbl2, "part1");
+    createFile(part12, "testClearer12");
+
+    boolean exceptionThrown = false;
+    try {
+      client.dropTable(dbName, tblName1);
+    } catch (MetaException e) {
+      exceptionThrown = true;
+    }
+    assertFalse(exceptionThrown); // the drop must not throw MetaException
+    assertFalse(warehouseFs.exists(part11)); // data file must no longer be in the table directory
+    try {
+      client.getTable(dbName, tblName1);
+    } catch (NoSuchObjectException e) {
+      exceptionThrown = true;
+    }
+    assertTrue(exceptionThrown); // getTable must fail with NoSuchObjectException
+    exceptionThrown = false; // reset the flag for the second table
+    try {
+      client.dropTable(dbName, tblName2);
+    } catch (MetaException e) {
+      exceptionThrown = true;
+    }
+    assertFalse(exceptionThrown);
+    assertFalse(warehouseFs.exists(part12));
+    try {
+      client.getTable(dbName, tblName2);
+    } catch (NoSuchObjectException e) {
+      exceptionThrown = true;
+    }
+    assertTrue(exceptionThrown);
+  }
+
+  @Test
+  public void dropExternalTableWithoutEncryptionZonesForCm() throws Throwable {
+    String dbName = "simpdb2";
+    String tblName = "simptbl";
+    String typeName = "Person";
+    silentDropDatabase(dbName);
+    new DatabaseBuilder()
+            .setName(dbName)
+            .addParam("repl.source.for", "1, 2, 3")
+            .create(client, hiveConf);
+
+    client.dropType(typeName);
+    Type typ1 = new Type();
+    typ1.setName(typeName);
+    typ1.setFields(new ArrayList<>(2));
+    typ1.getFields().add(
+            new FieldSchema("name", ColumnType.STRING_TYPE_NAME, ""));
+    typ1.getFields().add(
+            new FieldSchema("income", ColumnType.INT_TYPE_NAME, ""));
+    client.createType(typ1);
+
+    new TableBuilder()
+            .setDbName(dbName)
+            .setTableName(tblName)
+            .setCols(typ1.getFields())
+            .setNumBuckets(1)
+            .addBucketCol("name")
+            .addTableParam("EXTERNAL", "true")
+            .addTableParam("external.table.purge", "true")
+            .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+            .create(client, hiveConf);
+
+    Table tbl = client.getTable(dbName, tblName);
+    Assert.assertNotNull(tbl);
+
+    Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db");
+    warehouseFs.mkdirs(dirDb);
+    Path dirTbl1 = new Path(dirDb, tblName);
+    warehouseFs.mkdirs(dirTbl1);
+    Path part11 = new Path(dirTbl1, "part1");
+    createFile(part11, "testClearer11");
+
+    boolean exceptionThrown = false;
+    try {
+      client.dropTable(dbName, tblName);
+    } catch (Exception e) {
+      exceptionThrown = true;
+    }
+    assertFalse(exceptionThrown);
+    assertFalse(warehouseFs.exists(part11));
+    try {
+      client.getTable(dbName, tblName);
+    } catch (NoSuchObjectException e) {
+      exceptionThrown = true;
+    }
+    assertTrue(exceptionThrown);
+  }
+
+  /**
+   * Truncates one table in each of two databases whose directories are separate encryption
+   * zones (both created with {@code test_key_db}). Each truncate must finish without a
+   * {@link MetaException}, remove the table's data file, and leave the table itself
+   * resolvable through the client.
+   */
+  @Test
+  public void truncateTableWithDifferentEncryptionZones() throws Throwable {
+    String firstDb = "encrdbtrunc1";
+    String secondDb = "encrdbtrunc2";
+    String firstTbl = "encrtbl1";
+    String secondTbl = "encrtbl2";
+    String typeName = "Person";
+
+    // Recreate both databases from scratch with the repl.source.for parameter set.
+    silentDropDatabase(firstDb);
+    silentDropDatabase(secondDb);
+    new DatabaseBuilder()
+        .setName(firstDb)
+        .addParam("repl.source.for", "1, 2, 3")
+        .create(client, hiveConf);
+
+    new DatabaseBuilder()
+        .setName(secondDb)
+        .addParam("repl.source.for", "1, 2, 3")
+        .create(client, hiveConf);
+
+    // Re-register the two-field type whose fields are used as the table columns.
+    client.dropType(typeName);
+    Type personType = new Type();
+    personType.setName(typeName);
+    personType.setFields(new ArrayList<>(2));
+    personType.getFields().add(new FieldSchema("name", ColumnType.STRING_TYPE_NAME, ""));
+    personType.getFields().add(new FieldSchema("income", ColumnType.INT_TYPE_NAME, ""));
+    client.createType(personType);
+
+    // Replace each database directory with a fresh one, make it an encryption zone and
+    // place a data file under the future table directory.
+    Path firstDbDir = new Path(warehouse.getWhRoot(), firstDb + ".db");
+    warehouseFs.delete(firstDbDir, true);
+    warehouseFs.mkdirs(firstDbDir);
+    shimCm.createEncryptionZone(firstDbDir, "test_key_db");
+    Path firstTblDir = new Path(firstDbDir, firstTbl);
+    warehouseFs.mkdirs(firstTblDir);
+    Path firstData = new Path(firstTblDir, "part1");
+    createFile(firstData, "testClearer11");
+
+    Path secondDbDir = new Path(warehouse.getWhRoot(), secondDb + ".db");
+    warehouseFs.delete(secondDbDir, true);
+    warehouseFs.mkdirs(secondDbDir);
+    shimCm.createEncryptionZone(secondDbDir, "test_key_db");
+    Path secondTblDir = new Path(secondDbDir, secondTbl);
+    warehouseFs.mkdirs(secondTblDir);
+    Path secondData = new Path(secondTblDir, "part1");
+    createFile(secondData, "testClearer12");
+
+    new TableBuilder()
+        .setDbName(firstDb)
+        .setTableName(firstTbl)
+        .setCols(personType.getFields())
+        .setNumBuckets(1)
+        .addBucketCol("name")
+        .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+        .create(client, hiveConf);
+    Assert.assertNotNull(client.getTable(firstDb, firstTbl));
+
+    new TableBuilder()
+        .setDbName(secondDb)
+        .setTableName(secondTbl)
+        .setCols(personType.getFields())
+        .setNumBuckets(1)
+        .addBucketCol("name")
+        .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+        .create(client, hiveConf);
+
+    // First table: truncate succeeds, data is gone, table metadata survives.
+    boolean threw = false;
+    try {
+      client.truncateTable(firstDb, firstTbl, null);
+    } catch (MetaException e) {
+      threw = true;
+      assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+    }
+    assertFalse(threw);
+    assertFalse(warehouseFs.exists(firstData));
+    assertNotNull(client.getTable(firstDb, firstTbl));
+
+    // Second table: same expectations, starting again from a cleared flag.
+    threw = false;
+    try {
+      client.truncateTable(secondDb, secondTbl, null);
+    } catch (MetaException e) {
+      threw = true;
+      assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+    }
+    assertFalse(threw);
+    assertFalse(warehouseFs.exists(secondData));
+    assertNotNull(client.getTable(secondDb, secondTbl));
+  }
+
+  @Test
+  public void truncateTableWithDifferentEncryptionZonesDifferentKey() throws Throwable {
+    String dbName1 = "encrdb1";
+    String dbName2 = "encrdb2";
+    String tblName1 = "encrtbl1";
+    String tblName2 = "encrtbl2";
+    String typeName = "Person";
+
+    silentDropDatabase(dbName1);
+    silentDropDatabase(dbName2);
+    new DatabaseBuilder()
+            .setName(dbName1)
+            .addParam("repl.source.for", "1, 2, 3")
+            .create(client, hiveConf);
+
+    new DatabaseBuilder()
+            .setName(dbName2)
+            .addParam("repl.source.for", "1, 2, 3")
+            .create(client, hiveConf);
+
+    client.dropType(typeName);
+    Type typ1 = new Type();
+    typ1.setName(typeName);
+    typ1.setFields(new ArrayList<>(2));
+    typ1.getFields().add(
+            new FieldSchema("name", ColumnType.STRING_TYPE_NAME, ""));
+    typ1.getFields().add(
+            new FieldSchema("income", ColumnType.INT_TYPE_NAME, ""));
+    client.createType(typ1);
+
+    Path dirDb1 = new Path(warehouse.getWhRoot(), dbName1 +".db");
+    warehouseFs.mkdirs(dirDb1);
+    shimCm.createEncryptionZone(dirDb1, "test_key_db");
+    Path dirTbl1 = new Path(dirDb1, tblName1);
+    warehouseFs.mkdirs(dirTbl1);
+    Path part11 = new Path(dirTbl1, "part1");
+    createFile(part11, "testClearer11");
+
+    Path dirDb2 = new Path(warehouse.getWhRoot(), dbName2 +".db");
+    warehouseFs.mkdirs(dirDb2);
+    shimCm.createEncryptionZone(dirDb2, "test_key_db");
+    Path dirTbl2 = new Path(dirDb2, tblName2);
+    warehouseFs.mkdirs(dirTbl2);
+    Path part12 = new Path(dirTbl2, "part1");
+    createFile(part12, "testClearer12");
+
+    new TableBuilder()
+            .setDbName(dbName1)
+            .setTableName(tblName1)
+            .setCols(typ1.getFields())
+            .setNumBuckets(1)
+            .addBucketCol("name")
+            .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+            .create(client, hiveConf);
+
+    Table tbl = client.getTable(dbName1, tblName1);
+    Assert.assertNotNull(tbl);
+
+    new TableBuilder()
+            .setDbName(dbName2)
+            .setTableName(tblName2)
+            .setCols(typ1.getFields())
+            .setNumBuckets(1)
+            .addBucketCol("name")
+            .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+            .create(client, hiveConf);
+
+    boolean exceptionThrown = false;
+    try {
+      client.truncateTable(dbName1, tblName1, null);
+    } catch (MetaException e) {
+      exceptionThrown = true;
+      assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+    }
+    assertFalse(exceptionThrown);
+    assertFalse(warehouseFs.exists(part11));
+    assertNotNull(client.getTable(dbName1, tblName1));
+    exceptionThrown = false;
+    try {
+      client.truncateTable(dbName2, tblName2, null);
+    } catch (MetaException e) {
+      exceptionThrown = true;
+      assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+    }
+    assertFalse(exceptionThrown);
+    assertFalse(warehouseFs.exists(part12));
+    assertNotNull(client.getTable(dbName2, tblName2));
+  }
+
+  /**
+   * Truncates two tables of one database whose directory is a single encryption zone
+   * (created with {@code test_key_db}) shared by both table directories. Each truncate
+   * must finish without a {@link MetaException}, remove the table's data file, and leave
+   * the table itself resolvable through the client.
+   */
+  @Test
+  public void truncateTableWithSameEncryptionZones() throws Throwable {
+    String dbName = "encrdb9";
+    String firstTbl = "encrtbl1";
+    String secondTbl = "encrtbl2";
+    String typeName = "Person";
+
+    // Remove any earlier tables and database, then recreate the database.
+    client.dropTable(dbName, firstTbl);
+    client.dropTable(dbName, secondTbl);
+    silentDropDatabase(dbName);
+    new DatabaseBuilder()
+        .setName(dbName)
+        .addParam("repl.source.for", "1, 2, 3")
+        .create(client, hiveConf);
+
+    // Re-register the two-field type whose fields are used as the table columns.
+    client.dropType(typeName);
+    Type personType = new Type();
+    personType.setName(typeName);
+    personType.setFields(new ArrayList<>(2));
+    personType.getFields().add(new FieldSchema("name", ColumnType.STRING_TYPE_NAME, ""));
+    personType.getFields().add(new FieldSchema("income", ColumnType.INT_TYPE_NAME, ""));
+    client.createType(personType);
+
+    new TableBuilder()
+        .setDbName(dbName)
+        .setTableName(firstTbl)
+        .setCols(personType.getFields())
+        .setNumBuckets(1)
+        .addBucketCol("name")
+        .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+        .create(client, hiveConf);
+    Assert.assertNotNull(client.getTable(dbName, firstTbl));
+
+    new TableBuilder()
+        .setDbName(dbName)
+        .setTableName(secondTbl)
+        .setCols(personType.getFields())
+        .setNumBuckets(1)
+        .addBucketCol("name")
+        .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+        .create(client, hiveConf);
+
+    // Replace the database directory with a fresh one that is a single encryption zone,
+    // then recreate both table directories inside it with one data file each.
+    Path dbDir = new Path(warehouse.getWhRoot(), dbName + ".db");
+    warehouseFs.delete(dbDir, true);
+    warehouseFs.mkdirs(dbDir);
+    shimCm.createEncryptionZone(dbDir, "test_key_db");
+    Path firstTblDir = new Path(dbDir, firstTbl);
+    warehouseFs.mkdirs(firstTblDir);
+    Path firstData = new Path(firstTblDir, "part1");
+    createFile(firstData, "testClearer11");
+
+    Path secondTblDir = new Path(dbDir, secondTbl);
+    warehouseFs.mkdirs(secondTblDir);
+    Path secondData = new Path(secondTblDir, "part1");
+    createFile(secondData, "testClearer12");
+
+    // The flag is never expected to flip: neither the truncates nor the lookups may throw.
+    boolean threw = false;
+    try {
+      client.truncateTable(dbName, firstTbl, null);
+    } catch (MetaException e) {
+      threw = true;
+      assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+    }
+    assertFalse(threw);
+    assertFalse(warehouseFs.exists(firstData));
+    try {
+      client.getTable(dbName, firstTbl);
+    } catch (NoSuchObjectException e) {
+      threw = true;
+    }
+    assertFalse(threw);
+
+    try {
+      client.truncateTable(dbName, secondTbl, null);
+    } catch (MetaException e) {
+      threw = true;
+      assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+    }
+    assertFalse(threw);
+    assertFalse(warehouseFs.exists(secondData));
+    try {
+      client.getTable(dbName, secondTbl);
+    } catch (NoSuchObjectException e) {
+      threw = true;
+    }
+    assertFalse(threw);
+  }
+
+  /**
+   * Truncates a table whose directory is not placed in any encryption zone by this test.
+   * The truncate must not throw, the data file must be removed, and the table must remain
+   * resolvable through the client.
+   */
+  @Test
+  public void truncateTableWithoutEncryptionZonesForCm() throws Throwable {
+    String dbName = "simpdb3";
+    String tblName = "simptbl";
+    String typeName = "Person";
+
+    // Remove any earlier table and database, then recreate the database.
+    client.dropTable(dbName, tblName);
+    silentDropDatabase(dbName);
+    new DatabaseBuilder()
+        .setName(dbName)
+        .addParam("repl.source.for", "1, 2, 3")
+        .create(client, hiveConf);
+
+    // Re-register the two-field type whose fields are used as the table columns.
+    client.dropType(typeName);
+    Type personType = new Type();
+    personType.setName(typeName);
+    personType.setFields(new ArrayList<>(2));
+    personType.getFields().add(new FieldSchema("name", ColumnType.STRING_TYPE_NAME, ""));
+    personType.getFields().add(new FieldSchema("income", ColumnType.INT_TYPE_NAME, ""));
+    client.createType(personType);
+
+    new TableBuilder()
+        .setDbName(dbName)
+        .setTableName(tblName)
+        .setCols(personType.getFields())
+        .setNumBuckets(1)
+        .addBucketCol("name")
+        .addStorageDescriptorParam("test_param_1", "Use this for comments etc")
+        .create(client, hiveConf);
+    Assert.assertNotNull(client.getTable(dbName, tblName));
+
+    // Plain directories only: no encryption zone is created here.
+    Path dbDir = new Path(warehouse.getWhRoot(), dbName + ".db");
+    warehouseFs.mkdirs(dbDir);
+    Path tblDir = new Path(dbDir, tblName);
+    warehouseFs.mkdirs(tblDir);
+    Path dataFile = new Path(tblDir, "part1");
+    createFile(dataFile, "testClearer11");
+
+    boolean threw = false;
+    try {
+      client.truncateTable(dbName, tblName, null);
+    } catch (Exception e) {
+      threw = true;
+    }
+    assertFalse(threw);
+    assertFalse(warehouseFs.exists(dataFile));
+
+    // Truncation keeps the table, so the lookup must still succeed.
+    try {
+      client.getTable(dbName, tblName);
+    } catch (NoSuchObjectException e) {
+      threw = true;
+    }
+    assertFalse(threw);
+  }
+
+  /**
+   * Recycles a table directory that is its own encryption zone (key {@code test_key_db})
+   * through the shared {@code hiveConf} change manager and expects the call to complete
+   * without a {@link RemoteException}.
+   */
+  @Test
+  public void recycleFailureWithDifferentEncryptionZonesForCm() throws Throwable {
+    // Fresh db2/tbl1 layout with tbl1 turned into an encryption zone holding one file.
+    Path dbDir = new Path(warehouse.getWhRoot(), "db2");
+    warehouseFs.delete(dbDir, true);
+    warehouseFs.mkdirs(dbDir);
+    Path zoneDir = new Path(dbDir, "tbl1");
+    warehouseFs.mkdirs(zoneDir);
+    shimCm.createEncryptionZone(zoneDir, "test_key_db");
+    Path dataFile = new Path(zoneDir, "part1");
+    createFile(dataFile, "testClearer11");
+
+    boolean threw = false;
+    try {
+      ReplChangeManager.getInstance(hiveConf).recycle(zoneDir, RecycleType.MOVE, false);
+    } catch (RemoteException e) {
+      threw = true;
+      assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+    }
+    assertFalse(threw);
+  }
+
+  @Test
+  public void testClearerEncrypted() throws Exception {
+    HiveConf hiveConfCmClearer = new HiveConf(TestReplChangeManager.class);
+    hiveConfCmClearer.setBoolean(HiveConf.ConfVars.REPLCMENABLED.varname, true);
+    hiveConfCmClearer.setInt(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, 60);
+    hiveConfCmClearer.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
+            "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort()
+                    + HiveConf.ConfVars.METASTOREWAREHOUSE.defaultStrVal);
+
+    String cmrootCmClearer = "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + "/cmrootClearer";
+    hiveConfCmClearer.set(HiveConf.ConfVars.REPLCMDIR.varname, cmrootCmClearer);
+    Warehouse warehouseCmClearer = new Warehouse(hiveConfCmClearer);
+    FileSystem cmfs = new Path(cmrootCmClearer).getFileSystem(hiveConfCmClearer);
+    cmfs.mkdirs(warehouseCmClearer.getWhRoot());
+
+    HadoopShims.HdfsEncryptionShim shimCmEncrypted = ShimLoader.getHadoopShims().createHdfsEncryptionShim(cmfs, conf);
+
+    FileSystem fsWarehouse = warehouseCmClearer.getWhRoot().getFileSystem(hiveConfCmClearer);
+    long now = System.currentTimeMillis();
+    Path dirDb = new Path(warehouseCmClearer.getWhRoot(), "db1");
+    fsWarehouse.delete(dirDb, true);
+    fsWarehouse.mkdirs(dirDb);
+    Path dirTbl1 = new Path(dirDb, "tbl1");
+    fsWarehouse.mkdirs(dirTbl1);
+    shimCmEncrypted.createEncryptionZone(dirTbl1, "test_key_db");
+    Path part11 = new Path(dirTbl1, "part1");
+    createFile(part11, "testClearer11");
+    String fileChksum11 = ReplChangeManager.checksumFor(part11, fsWarehouse);
+    Path part12 = new Path(dirTbl1, "part2");
+    createFile(part12, "testClearer12");
+    String fileChksum12 = ReplChangeManager.checksumFor(part12, fsWarehouse);
+    Path dirTbl2 = new Path(dirDb, "tbl2");
+    fsWarehouse.mkdirs(dirTbl2);
+    shimCmEncrypted.createEncryptionZone(dirTbl2, "test_key_db");
+    Path part21 = new Path(dirTbl2, "part1");
+    createFile(part21, "testClearer21");
+    String fileChksum21 = ReplChangeManager.checksumFor(part21, fsWarehouse);
+    Path part22 = new Path(dirTbl2, "part2");
+    createFile(part22, "testClearer22");
+    String fileChksum22 = ReplChangeManager.checksumFor(part22, fsWarehouse);
+    Path dirTbl3 = new Path(dirDb, "tbl3");
+    fsWarehouse.mkdirs(dirTbl3);
+    shimCmEncrypted.createEncryptionZone(dirTbl3, "test_key_cm");
+    Path part31 = new Path(dirTbl3, "part1");
+    createFile(part31, "testClearer31");
+    String fileChksum31 = ReplChangeManager.checksumFor(part31, fsWarehouse);
+    Path part32 = new Path(dirTbl3, "part2");
+    createFile(part32, "testClearer32");
+    String fileChksum32 = ReplChangeManager.checksumFor(part32, fsWarehouse);
+
+    ReplChangeManager.getInstance(hiveConfCmClearer).recycle(dirTbl1, RecycleType.MOVE, false);
+    ReplChangeManager.getInstance(hiveConfCmClearer).recycle(dirTbl2, RecycleType.MOVE, false);
+    ReplChangeManager.getInstance(hiveConfCmClearer).recycle(dirTbl3, RecycleType.MOVE, true);
+
+    assertTrue(fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part11.getName(), fileChksum11,
+            ReplChangeManager.getCmRoot(part11).toString())));
+    assertTrue(fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part12.getName(), fileChksum12,
+            ReplChangeManager.getCmRoot(part12).toString())));
+    assertTrue(fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part21.getName(), fileChksum21,
+            ReplChangeManager.getCmRoot(part21).toString())));
+    assertTrue(fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part22.getName(), fileChksum22,
+            ReplChangeManager.getCmRoot(part22).toString())));
+    assertTrue(fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part31.getName(), fileChksum31,
+            ReplChangeManager.getCmRoot(part31).toString())));
+    assertTrue(fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part32.getName(), fileChksum32,
+            ReplChangeManager.getCmRoot(part32).toString())));
+
+    fsWarehouse.setTimes(ReplChangeManager.getCMPath(hiveConfCmClearer, part11.getName(), fileChksum11,
+            ReplChangeManager.getCmRoot(part11).toString()),
+            now - 86400*1000*2, now - 86400*1000*2);
+    fsWarehouse.setTimes(ReplChangeManager.getCMPath(hiveConfCmClearer, part21.getName(), fileChksum21,
+            ReplChangeManager.getCmRoot(part21).toString()),
+            now - 86400*1000*2, now - 86400*1000*2);
+    fsWarehouse.setTimes(ReplChangeManager.getCMPath(hiveConfCmClearer, part31.getName(), fileChksum31,
+            ReplChangeManager.getCmRoot(part31).toString()),
+            now - 86400*1000*2, now - 86400*1000*2);
+    fsWarehouse.setTimes(ReplChangeManager.getCMPath(hiveConfCmClearer, part32.getName(), fileChksum32,
+            ReplChangeManager.getCmRoot(part32).toString()),
+            now - 86400*1000*2, now - 86400*1000*2);
+
+    ReplChangeManager.scheduleCMClearer(hiveConfCmClearer);
+
+    long start = System.currentTimeMillis();
+    long end;
+    boolean cleared = false;
+    do {
+      Thread.sleep(200);
+      end = System.currentTimeMillis();
+      if (end - start > 5000) {
+        Assert.fail("timeout, cmroot has not been cleared");
+      }
+      if (!fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part11.getName(), fileChksum11,
+              ReplChangeManager.getCmRoot(part11).toString())) &&
+              fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part12.getName(), fileChksum12,
+                      ReplChangeManager.getCmRoot(part12).toString())) &&
+              !fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part21.getName(), fileChksum21,
+                      ReplChangeManager.getCmRoot(part21).toString())) &&
+              fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part22.getName(), fileChksum22,
+                      ReplChangeManager.getCmRoot(part22).toString())) &&
+              !fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part31.getName(), fileChksum31,
+                      ReplChangeManager.getCmRoot(part31).toString())) &&
+              !fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part32.getName(), fileChksum32,
+                      ReplChangeManager.getCmRoot(part32).toString()))) {
+        cleared = true;
+      }
+    } while (!cleared);
+  }
+
+  @Test
+  public void testCmrootEncrypted() throws Exception {
+    HiveConf encryptedHiveConf = new HiveConf(TestReplChangeManager.class);
+    encryptedHiveConf.setBoolean(HiveConf.ConfVars.REPLCMENABLED.varname, true);
+    encryptedHiveConf.setInt(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, 60);
+    encryptedHiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
+            "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort()
+                    + HiveConf.ConfVars.METASTOREWAREHOUSE.defaultStrVal);
+
+    String cmrootdirEncrypted = "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + "/cmroot";
+    encryptedHiveConf.set(HiveConf.ConfVars.REPLCMDIR.varname, cmrootdirEncrypted);
+    encryptedHiveConf.set(HiveConf.ConfVars.REPLCMFALLBACKNONENCRYPTEDDIR.varname, cmrootFallBack);
+
+    //Create cm in encrypted zone
+    HadoopShims.HdfsEncryptionShim shimCmEncrypted = ShimLoader.getHadoopShims().createHdfsEncryptionShim(fs, conf);
+    shimCmEncrypted.createEncryptionZone(new Path(cmrootdirEncrypted), "test_key_db");
+    ReplChangeManager.resetReplChangeManagerInstance();
+    Warehouse warehouseEncrypted = new Warehouse(encryptedHiveConf);
+    FileSystem warehouseFsEncrypted = warehouseEncrypted.getWhRoot().getFileSystem(encryptedHiveConf);
+    FileSystem fsCmEncrypted = new Path(cmrootdirEncrypted).getFileSystem(encryptedHiveConf);
+    fsCmEncrypted.mkdirs(warehouseEncrypted.getWhRoot());
+
+    Path dirDb = new Path(warehouseEncrypted.getWhRoot(), "db3");
+    warehouseFsEncrypted.delete(dirDb, true);
+    warehouseFsEncrypted.mkdirs(dirDb);
+    Path dirTbl1 = new Path(dirDb, "tbl1");
+    warehouseFsEncrypted.mkdirs(dirTbl1);
+    shimCmEncrypted.createEncryptionZone(dirTbl1, "test_key_db");
+    Path part11 = new Path(dirTbl1, "part1");
+    createFile(part11, "testClearer11");
+
+    boolean exceptionThrown = false;
+    try {
+      ReplChangeManager.getInstance(encryptedHiveConf).recycle(dirTbl1, RecycleType.MOVE, false);
+    } catch (RemoteException e) {
+      exceptionThrown = true;
+      assertTrue(e.getMessage().contains("can't be moved from encryption zone"));
+    }
+    assertFalse(exceptionThrown);
+
+    Path dirDbUnEncrypted = new Path(warehouseEncrypted.getWhRoot(), "db3en");
+    warehouseFsEncrypted.delete(dirDbUnEncrypted, true);
+    warehouseFsEncrypted.mkdirs(dirDbUnEncrypted);
+    Path dirTblun1 = new Path(dirDbUnEncrypted, "tbl1");
+    warehouseFsEncrypted.mkdirs(dirTblun1);
+    Path partun11 = new Path(dirTblun1, "part1");
+    createFile(partun11, "testClearer11");
+
+    exceptionThrown = false;
+    try {
+      ReplChangeManager.getInstance(encryptedHiveConf).recycle(dirDbUnEncrypted, RecycleType.MOVE, false);
+    } catch (IOException e) {
+      exceptionThrown = true;
+    }
+    assertFalse(exceptionThrown);
+    ReplChangeManager.resetReplChangeManagerInstance();
+    initReplChangeManager();
+  }
+
+
+  /**
+   * Creates the file at {@code path} on the file system resolved from {@code hiveConf} and
+   * writes {@code content} to it with {@code writeChars}.
+   *
+   * @param path file to create
+   * @param content text written to the file
+   * @throws IOException if the file cannot be created or written
+   */
+  private void createFile(Path path, String content) throws IOException {
+    // try-with-resources closes the stream even when the write fails.
+    try (FSDataOutputStream output = path.getFileSystem(hiveConf).create(path)) {
+      output.writeChars(content);
+    }
+  }
+
+  /**
+   * Best-effort cleanup of a database: drops every table returned for the {@code "*"}
+   * pattern and then the database itself. A {@link NoSuchObjectException} or
+   * {@link InvalidOperationException} raised along the way is deliberately ignored.
+   *
+   * @param dbName database to remove
+   * @throws TException for any other metastore failure
+   */
+  private void silentDropDatabase(String dbName) throws TException {
+    try {
+      for (String tbl : client.getTables(dbName, "*")) {
+        client.dropTable(dbName, tbl);
+      }
+      client.dropDatabase(dbName);
+    } catch (NoSuchObjectException | InvalidOperationException ignored) {
+      // Intentionally ignored: callers only need the database gone if it was there.
+    }
+  }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
index 5ab4f91..d3891cf 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
@@ -271,6 +271,7 @@ public class TestReplChangeManager {
     FileSystem fs = warehouse.getWhRoot().getFileSystem(hiveConf);
     long now = System.currentTimeMillis();
     Path dirDb = new Path(warehouse.getWhRoot(), "db3");
+    fs.delete(dirDb, true);
     fs.mkdirs(dirDb);
     Path dirTbl1 = new Path(dirDb, "tbl1");
     fs.mkdirs(dirTbl1);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java
index 562b2c9..19d38d2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java
@@ -48,6 +48,8 @@ public class TestCachedStoreUpdateUsingEvents {
     MetastoreConf.setBoolVar(conf, ConfVars.METASTORE_CACHE_CAN_USE_EVENT, true);
     MetastoreConf.setBoolVar(conf, ConfVars.HIVE_TXN_STATS_ENABLED, true);
     MetastoreConf.setBoolVar(conf, ConfVars.AGGREGATE_STATS_CACHE_ENABLED, false);
+    MetastoreConf.setBoolVar(conf, ConfVars.REPLCMENABLED, true);
+    MetastoreConf.setVar(conf, ConfVars.REPLCMDIR, "cmroot");
     MetaStoreTestUtils.setConfForStandloneMode(conf);
 
     hmsHandler = new HiveMetaStore.HMSHandler("testCachedStore", conf, true);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/metadata/TestAlterTableMetadata.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/metadata/TestAlterTableMetadata.java
index f6035fa..96aeb0f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/metadata/TestAlterTableMetadata.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/metadata/TestAlterTableMetadata.java
@@ -63,5 +63,6 @@ public class TestAlterTableMetadata {
     table = Hive.get(conf).getTable("t1");
     assertEquals(PrincipalType.ROLE, table.getOwnerType());
     assertEquals("r1", table.getOwner());
+
   }
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
index 92a0bbe..7e0a7f2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
@@ -202,6 +202,8 @@ public class TestJdbcDriver2 {
     System.setProperty(ConfVars.HIVE_AUTHORIZATION_MANAGER.varname,
         "org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider");
     System.setProperty(ConfVars.HIVE_SERVER2_PARALLEL_OPS_IN_SESSION.varname, "false");
+    System.setProperty(ConfVars.REPLCMENABLED.varname, "true");
+    System.setProperty(ConfVars.REPLCMDIR.varname, "cmroot");
     con = getConnection(defaultDbName + ";create=true");
     Statement stmt = con.createStatement();
     assertNotNull("Statement is null", stmt);
@@ -2828,6 +2830,8 @@ public class TestJdbcDriver2 {
       stmt.execute("set hive.metastore.transactional.event.listeners =" +
               " org.apache.hive.hcatalog.listener.DbNotificationListener");
       stmt.execute("set hive.metastore.dml.events = true");
+      stmt.execute("set hive.repl.cm.enabled = true");
+      stmt.execute("set hive.repl.cmrootdir = cmroot");
       stmt.execute("create database " + primaryDb + " with dbproperties('repl.source.for'='1,2,3')");
       stmt.execute("create table " + primaryTblName + " (id int)");
       stmt.execute("insert into " + primaryTblName + " values (1), (2)");
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
index a3299ee..79beadd 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
@@ -106,6 +106,7 @@ public class TestJdbcWithMiniHS2 {
     HiveConf conf = new HiveConf();
     dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
     kvDataFilePath = new Path(dataFileDir, "kv1.txt");
+
     try {
       startMiniHS2(conf);
     } catch (Exception e) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java
index cf4bc81..93c0209 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java
@@ -67,7 +67,6 @@ public class CreateTableOperation extends DDLOperation<CreateTableDesc> {
         if (desc.getReplicationSpec().allowEventReplacementInto(existingTable.getParameters())) {
           desc.setReplaceMode(true); // we replace existing table.
           ReplicationSpec.copyLastReplId(existingTable.getParameters(), tbl.getParameters());
-
           // If location of an existing managed table is changed, then need to delete the old location if exists.
           // This scenario occurs when a managed table is converted into external table at source. In this case,
           // at target, the table data would be moved to different location under base directory for external tables.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveOperation.java
index 5c5dec4..248fe0f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveOperation.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.ddl.table.storage;
 
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.ql.ddl.DDLOperationContext;
 import org.apache.hadoop.hive.ql.exec.ArchiveUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -308,7 +309,8 @@ public class AlterTableArchiveOperation extends DDLOperation<AlterTableArchiveDe
 
   private void deleteIntermediateOriginalDir(Table table, Path intermediateOriginalDir) throws HiveException {
     if (HdfsUtils.pathExists(intermediateOriginalDir, context.getConf())) {
-      AlterTableArchiveUtils.deleteDir(intermediateOriginalDir, context.getDb().getDatabase(table.getDbName()),
+      AlterTableArchiveUtils.deleteDir(intermediateOriginalDir,
+              ReplChangeManager.shouldEnableCm(context.getDb().getDatabase(table.getDbName()), table.getTTable()),
           context.getConf());
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveUtils.java
index ebac6f6..c285405 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveUtils.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.ArchiveUtils;
@@ -95,10 +94,10 @@ final class AlterTableArchiveUtils {
     return new Path(dir.getParent(), dir.getName() + intermediateDirSuffix);
   }
 
-  static void deleteDir(Path dir, Database db, Configuration conf) throws HiveException {
+  static void deleteDir(Path dir, boolean shouldEnableCm, Configuration conf) throws HiveException {
     try {
       Warehouse wh = new Warehouse(conf);
-      wh.deleteDir(dir, true, db);
+      wh.deleteDir(dir, true, false, shouldEnableCm);
     } catch (MetaException e) {
       throw new HiveException(e);
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableSetLocationOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableSetLocationOperation.java
index 22a29d7..509d577 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableSetLocationOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableSetLocationOperation.java
@@ -43,6 +43,7 @@ public class AlterTableSetLocationOperation extends AbstractAlterTableOperation<
   protected void doAlteration(Table table, Partition partition) throws HiveException {
     StorageDescriptor sd = getStorageDescriptor(table, partition);
     String newLocation = desc.getLocation();
+
     try {
       URI locUri = new URI(newLocation);
       if (!new Path(locUri).isAbsolute()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableUnarchiveOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableUnarchiveOperation.java
index 4f791a3..39416ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableUnarchiveOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableUnarchiveOperation.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.ddl.table.storage;
 
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.ql.ddl.DDLOperationContext;
 import org.apache.hadoop.hive.ql.exec.ArchiveUtils;
 import org.apache.hadoop.hive.ql.exec.ArchiveUtils.PartSpecInfo;
@@ -282,8 +283,9 @@ public class AlterTableUnarchiveOperation extends DDLOperation<AlterTableUnarchi
 
   private void deleteIntermediateArchivedDir(Table table, Path intermediateArchivedDir) throws HiveException {
     if (HdfsUtils.pathExists(intermediateArchivedDir, context.getConf())) {
-      AlterTableArchiveUtils.deleteDir(intermediateArchivedDir, context.getDb().getDatabase(table.getDbName()),
-          context.getConf());
+      AlterTableArchiveUtils.deleteDir(intermediateArchivedDir,
+              ReplChangeManager.shouldEnableCm(context.getDb().getDatabase(table.getDbName()), table.getTTable()),
+              context.getConf());
     }
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 7f061d4..945eafc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2369,7 +2369,7 @@ public class Hive {
           // base_x.  (there is Insert Overwrite and Load Data Overwrite)
           boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
           boolean needRecycle = !tbl.isTemporary()
-                  && ReplChangeManager.isSourceOfReplication(Hive.get().getDatabase(tbl.getDbName()));
+                  && ReplChangeManager.shouldEnableCm(Hive.get().getDatabase(tbl.getDbName()), tbl.getTTable());
           replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(), isSrcLocal,
               isAutoPurge, newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, needRecycle, isManaged, isInsertOverwrite);
         } else {
@@ -3123,7 +3123,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
         //for fullAcid we don't want to delete any files even for OVERWRITE see HIVE-14988/HIVE-17361
         boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
         boolean needRecycle = !tbl.isTemporary()
-                && ReplChangeManager.isSourceOfReplication(Hive.get().getDatabase(tbl.getDbName()));
+                && ReplChangeManager.shouldEnableCm(Hive.get().getDatabase(tbl.getDbName()), tbl.getTTable());
         replaceFiles(tblPath, loadPath, destPath, tblPath, conf, isSrcLocal, isAutopurge,
             newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, needRecycle, isManaged, isInsertOverwrite);
       } else {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 7b6ce10..dd97f3d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -467,7 +467,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         needRecycle = false;
       } else {
         org.apache.hadoop.hive.metastore.api.Database db = x.getHive().getDatabase(table.getDbName());
-        needRecycle = db != null && ReplChangeManager.isSourceOfReplication(db);
+        needRecycle = db != null && ReplChangeManager.shouldEnableCm(db, table.getTTable());
       }
     } else {
       if (AcidUtils.isTransactionalTable(table) && !replicationSpec.isInReplicationScope()) {
@@ -613,7 +613,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
           needRecycle = false;
         } else {
           org.apache.hadoop.hive.metastore.api.Database db = x.getHive().getDatabase(table.getDbName());
-          needRecycle = db != null && ReplChangeManager.isSourceOfReplication(db);
+          needRecycle = db != null && ReplChangeManager.shouldEnableCm(db, table.getTTable());
         }
       } else {
         loadFileType = replicationSpec.isReplace() ?
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 9ba2b24..54b616e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -260,11 +260,11 @@ public class Cleaner extends MetaStoreCompactorThread {
 
     FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
     Database db = getMSForConf(conf).getDatabase(getDefaultCatalog(conf), ci.dbname);
-    boolean isSourceOfRepl = ReplChangeManager.isSourceOfReplication(db);
+    Table table = getMSForConf(conf).getTable(getDefaultCatalog(conf), ci.dbname, ci.tableName);
 
     for (Path dead : filesToDelete) {
       LOG.debug("Going to delete path " + dead.toString());
-      if (isSourceOfRepl) {
+      if (ReplChangeManager.shouldEnableCm(db, table)) {
         replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, true);
       }
       fs.delete(dead, true);
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 23e7d5e..2eafef0 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -1242,7 +1242,7 @@ public class Hadoop23Shims extends HadoopShimsSecure {
       return (getEncryptionZoneForPath(fullPath) != null);
     }
 
-    private EncryptionZone getEncryptionZoneForPath(Path path) throws IOException {
+    public EncryptionZone getEncryptionZoneForPath(Path path) throws IOException {
       if (path.getFileSystem(conf).exists(path)) {
         return hdfsAdmin.getEncryptionZoneForPath(path);
       } else if (!path.getParent().equals(path)) {
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 49a2ab3..f71f5a5 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobConf;
@@ -564,7 +565,7 @@ public interface HadoopShims {
     public int comparePathKeyStrength(Path path1, Path path2) throws IOException;
 
     /**
-     * create encryption zone by path and keyname
+     * Create encryption zone by path and keyname.
      * @param path HDFS path to create encryption zone
      * @param keyName keyname
      * @throws IOException
@@ -573,6 +574,13 @@ public interface HadoopShims {
     public void createEncryptionZone(Path path, String keyName) throws IOException;
 
     /**
+     * Get the encryption zone for a path.
+     * @param path HDFS path to look up the encryption zone for.
+     * @throws IOException
+     */
+    EncryptionZone getEncryptionZoneForPath(Path path) throws IOException;
+
+    /**
      * Creates an encryption key.
      *
      * @param keyName Name of the key
@@ -625,6 +633,11 @@ public interface HadoopShims {
     }
 
     @Override
+    public EncryptionZone getEncryptionZoneForPath(Path path) throws IOException {
+      return null;
+    }
+
+    @Override
     public void createKey(String keyName, int bitLength) {
     /* not supported */
     }
diff --git a/standalone-metastore/metastore-common/pom.xml b/standalone-metastore/metastore-common/pom.xml
index e252f12..81dc6b6 100644
--- a/standalone-metastore/metastore-common/pom.xml
+++ b/standalone-metastore/metastore-common/pom.xml
@@ -256,6 +256,11 @@
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-shims</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
 
   <profiles>
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
index c6acc57..1041d92 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -19,11 +19,13 @@
 package org.apache.hadoop.hive.metastore;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileChecksum;
@@ -36,11 +38,18 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import org.apache.hadoop.hive.metastore.utils.FileUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.utils.Retry;
 import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.hive.shims.HadoopShims.HdfsEncryptionShim;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +59,8 @@ public class ReplChangeManager {
 
   private static boolean inited = false;
   private static boolean enabled = false;
-  private static Path cmroot;
+  private static Map<String, String> encryptionZones = new HashMap<>();
+  private static HadoopShims hadoopShims = ShimLoader.getHadoopShims();
   private static Configuration conf;
   private String msUser;
   private String msGroup;
@@ -61,6 +71,10 @@ public class ReplChangeManager {
   public static final String SOURCE_OF_REPLICATION = "repl.source.for";
   private static final String TXN_WRITE_EVENT_FILE_SEPARATOR = "]";
   static final String CM_THREAD_NAME_PREFIX = "cmclearer-";
+  private static final String NO_ENCRYPTION = "noEncryption";
+  private static String cmRootDir;
+  private static String encryptedCmRootDir;
+  private static String fallbackNonEncryptedCmRootDir;
 
   public enum RecycleType {
     MOVE,
@@ -138,14 +152,27 @@ public class ReplChangeManager {
       if (!inited) {
         if (MetastoreConf.getBoolVar(conf, ConfVars.REPLCMENABLED)) {
           ReplChangeManager.enabled = true;
-          ReplChangeManager.cmroot = new Path(MetastoreConf.getVar(conf, ConfVars.REPLCMDIR));
           ReplChangeManager.conf = conf;
-
-          FileSystem cmFs = cmroot.getFileSystem(conf);
-          // Create cmroot with permission 700 if not exist
-          if (!cmFs.exists(cmroot)) {
-            cmFs.mkdirs(cmroot);
-            cmFs.setPermission(cmroot, new FsPermission("700"));
+          cmRootDir = MetastoreConf.getVar(conf, ConfVars.REPLCMDIR);
+          encryptedCmRootDir = MetastoreConf.getVar(conf, ConfVars.REPLCMENCRYPTEDDIR);
+          fallbackNonEncryptedCmRootDir = MetastoreConf.getVar(conf, ConfVars.REPLCMFALLBACKNONENCRYPTEDDIR);
+          //Create default cm root
+          Path cmroot = new Path(cmRootDir);
+          createCmRoot(cmroot);
+          FileSystem cmRootFs = cmroot.getFileSystem(conf);
+          HdfsEncryptionShim pathEncryptionShim = hadoopShims
+                  .createHdfsEncryptionShim(cmRootFs, conf);
+          Path cmRootEncrypted = new Path(encryptedCmRootDir);
+          if (cmRootEncrypted.isAbsolute()) {
+            throw new MetaException(ConfVars.REPLCMENCRYPTEDDIR.getHiveName() + " should be a relative path");
+          }
+          if (pathEncryptionShim.isPathEncrypted(cmroot)) {
+            //If cm root is encrypted we keep using it for the encryption zone
+            String encryptionZonePath = cmRootFs.getUri()
+                    + pathEncryptionShim.getEncryptionZoneForPath(cmroot).getPath();
+            encryptionZones.put(encryptionZonePath, cmRootDir);
+          } else {
+            encryptionZones.put(NO_ENCRYPTION, cmRootDir);
           }
           UserGroupInformation usergroupInfo = UserGroupInformation.getCurrentUser();
           msUser = usergroupInfo.getShortUserName();
@@ -194,7 +221,7 @@ public class ReplChangeManager {
       }
     } else {
       String fileCheckSum = checksumFor(path, fs);
-      Path cmPath = getCMPath(conf, path.getName(), fileCheckSum, cmroot.toString());
+      Path cmPath = getCMPath(conf, path.getName(), fileCheckSum, getCmRoot(path).toString());
 
       // set timestamp before moving to cmroot, so we can
       // avoid race condition CM remove the file before setting
@@ -213,9 +240,18 @@ public class ReplChangeManager {
         switch (type) {
         case MOVE: {
           LOG.info("Moving {} to {}", path.toString(), cmPath.toString());
-
           // Rename fails if the file with same name already exist.
-          success = fs.rename(path, cmPath);
+          Retry<Boolean> retriable = new Retry<Boolean>(IOException.class) {
+            @Override
+            public Boolean execute() throws IOException {
+              return fs.rename(path, cmPath);
+            }
+          };
+          try {
+            success = retriable.run();
+          } catch (Exception e) {
+            throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+          }
           break;
         }
         case COPY: {
@@ -361,9 +397,10 @@ public class ReplChangeManager {
       throw new IllegalStateException("Uninitialized ReplChangeManager instance.");
     }
     String encodedUri = fileUriStr;
-    if ((fileChecksum != null) && (cmroot != null)) {
+    Path cmRoot = getCmRoot(new Path(fileUriStr));
+    if ((fileChecksum != null) && (cmRoot != null)) {
       encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + fileChecksum
-              + URI_FRAGMENT_SEPARATOR + FileUtils.makeQualified(cmroot, conf);
+              + URI_FRAGMENT_SEPARATOR + FileUtils.makeQualified(cmRoot, conf);
     } else {
       encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + URI_FRAGMENT_SEPARATOR;
     }
@@ -404,12 +441,12 @@ public class ReplChangeManager {
    * Thread to clear old files of cmroot recursively
    */
   static class CMClearer implements Runnable {
-    private Path cmroot;
+    private Map<String, String> encryptionZones;
     private long secRetain;
     private Configuration conf;
 
-    CMClearer(String cmrootString, long secRetain, Configuration conf) {
-      this.cmroot = new Path(cmrootString);
+    CMClearer(Map<String, String> encryptionZones, long secRetain, Configuration conf) {
+      this.encryptionZones = encryptionZones;
       this.secRetain = secRetain;
       this.conf = conf;
     }
@@ -418,32 +455,34 @@ public class ReplChangeManager {
     public void run() {
       try {
         LOG.info("CMClearer started");
-
-        long now = System.currentTimeMillis();
-        FileSystem fs = cmroot.getFileSystem(conf);
-        FileStatus[] files = fs.listStatus(cmroot);
-
-        for (FileStatus file : files) {
-          long modifiedTime = file.getModificationTime();
-          if (now - modifiedTime > secRetain*1000) {
-            try {
-              if (fs.getXAttrs(file.getPath()).containsKey(REMAIN_IN_TRASH_TAG)) {
-                boolean succ = Trash.moveToAppropriateTrash(fs, file.getPath(), conf);
-                if (succ) {
-                  LOG.debug("Move " + file.toString() + " to trash");
+        for (String cmrootString : encryptionZones.values()) {
+          Path cmroot = new Path(cmrootString);
+          long now = System.currentTimeMillis();
+          FileSystem fs = cmroot.getFileSystem(conf);
+          FileStatus[] files = fs.listStatus(cmroot);
+
+          for (FileStatus file : files) {
+            long modifiedTime = file.getModificationTime();
+            if (now - modifiedTime > secRetain * 1000) {
+              try {
+                if (fs.getXAttrs(file.getPath()).containsKey(REMAIN_IN_TRASH_TAG)) {
+                  boolean succ = Trash.moveToAppropriateTrash(fs, file.getPath(), conf);
+                  if (succ) {
+                    LOG.debug("Move " + file.toString() + " to trash");
+                  } else {
+                    LOG.warn("Fail to move " + file.toString() + " to trash");
+                  }
                 } else {
-                  LOG.warn("Fail to move " + file.toString() + " to trash");
-                }
-              } else {
-                boolean succ = fs.delete(file.getPath(), false);
-                if (succ) {
-                  LOG.debug("Remove " + file.toString());
-                } else {
-                  LOG.warn("Fail to remove " + file.toString());
+                  boolean succ = fs.delete(file.getPath(), false);
+                  if (succ) {
+                    LOG.debug("Remove " + file.toString());
+                  } else {
+                    LOG.warn("Fail to remove " + file.toString());
+                  }
                 }
+              } catch (UnsupportedOperationException e) {
+                LOG.warn("Error getting xattr for " + file.getPath().toString());
               }
-            } catch (UnsupportedOperationException e) {
-              LOG.warn("Error getting xattr for " + file.getPath().toString());
             }
           }
         }
@@ -461,12 +500,17 @@ public class ReplChangeManager {
           .namingPattern(CM_THREAD_NAME_PREFIX + "%d")
           .daemon(true)
           .build());
-      executor.scheduleAtFixedRate(new CMClearer(MetastoreConf.getVar(conf, ConfVars.REPLCMDIR),
-          MetastoreConf.getTimeVar(conf, ConfVars.REPLCMRETIAN, TimeUnit.SECONDS), conf),
-          0, MetastoreConf.getTimeVar(conf, ConfVars.REPLCMINTERVAL, TimeUnit.SECONDS), TimeUnit.SECONDS);
+      executor.scheduleAtFixedRate(new CMClearer(encryptionZones,
+                      MetastoreConf.getTimeVar(conf, ConfVars.REPLCMRETIAN, TimeUnit.SECONDS), conf),
+              0, MetastoreConf.getTimeVar(conf, ConfVars.REPLCMINTERVAL, TimeUnit.SECONDS), TimeUnit.SECONDS);
     }
   }
 
+  public static boolean shouldEnableCm(Database db, Table table) {
+    assert (table != null);
+    return isSourceOfReplication(db) && !MetaStoreUtils.isExternalTable(table);
+  }
+
   public static boolean isSourceOfReplication(Database db) {
     assert (db != null);
     String replPolicyIds = getReplPolicyIdString(db);
@@ -493,4 +537,63 @@ public class ReplChangeManager {
   public static String[] getListFromSeparatedString(String commaSeparatedString) {
     return commaSeparatedString.split("\\s*" + TXN_WRITE_EVENT_FILE_SEPARATOR + "\\s*");
   }
+
+  /** Returns the CM root for {@code path}, creating it on first use; null if CM is disabled. */
+  @VisibleForTesting
+  static Path getCmRoot(Path path) throws IOException {
+    if (!enabled) {
+      return null;
+    }
+    // A non-encrypted path with no registered root falls back to the non-encrypted CM dir.
+    String rootDir = fallbackNonEncryptedCmRootDir;
+    String zoneKey = NO_ENCRYPTION;
+    HdfsEncryptionShim shim = hadoopShims.createHdfsEncryptionShim(path.getFileSystem(conf), conf);
+    if (shim.isPathEncrypted(path)) {
+      zoneKey = path.getFileSystem(conf).getUri()
+              + shim.getEncryptionZoneForPath(path).getPath();
+      // An encrypted path keeps its CM root inside its own zone, at the configured relative dir.
+      rootDir = zoneKey + Path.SEPARATOR + encryptedCmRootDir;
+    }
+    if (encryptionZones.containsKey(zoneKey)) {
+      return new Path(encryptionZones.get(zoneKey));
+    }
+    Path newRoot = new Path(rootDir);
+    synchronized (instance) {
+      // Checked again while holding the lock before creating and registering the directory.
+      if (!encryptionZones.containsKey(zoneKey)) {
+        createCmRoot(newRoot);
+        encryptionZones.put(zoneKey, rootDir);
+      }
+    }
+    return newRoot;
+  }
+
+  /** Creates the given CM root directory with permission 700 unless it already exists. */
+  private static void createCmRoot(Path cmroot) throws IOException {
+    FileSystem cmFs = cmroot.getFileSystem(conf);
+    if (!cmFs.exists(cmroot)) {
+      cmFs.mkdirs(cmroot);
+      cmFs.setPermission(cmroot, new FsPermission("700"));
+    }
+  }
+
+  @VisibleForTesting
+  static void resetReplChangeManagerInstance() {
+    inited = false;
+    enabled = false;
+    instance = null;
+    encryptionZones.clear();
+  }
+
+  public static final PathFilter CMROOT_PATH_FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path p) {
+      if (enabled) {
+        String name = p.getName();
+        return !name.contains(cmRootDir) && !name.contains(encryptedCmRootDir)
+                && !name.contains(fallbackNonEncryptedCmRootDir);
+      }
+      return true;
+    }
+  };
 }
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index bc87e8f..2aeb374 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -917,6 +917,11 @@ public class MetastoreConf {
             "This class is used to store and retrieval of raw metadata objects such as table, database"),
     REPLCMDIR("metastore.repl.cmrootdir", "hive.repl.cmrootdir", "/user/${system:user.name}/cmroot/",
         "Root dir for ChangeManager, used for deleted files."),
+    REPLCMENCRYPTEDDIR("metastore.repl.cm.encryptionzone.rootdir", "hive.repl.cm.encryptionzone.rootdir", ".cmroot",
+            "Root dir for ChangeManager if encryption zones are enabled, used for deleted files."),
+    REPLCMFALLBACKNONENCRYPTEDDIR("metastore.repl.cm.nonencryptionzone.rootdir",
+            "hive.repl.cm.nonencryptionzone.rootdir", "/user/${system:user.name}/cmroot/",
+            "Root dir for ChangeManager for non encrypted paths if hive.repl.cmrootdir is encrypted."),
     REPLCMRETIAN("metastore.repl.cm.retain", "hive.repl.cm.retain",  24, TimeUnit.HOURS,
         "Time to retain removed files in cmrootdir."),
     REPLCMINTERVAL("metastore.repl.cm.interval", "hive.repl.cm.interval", 3600, TimeUnit.SECONDS,
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/Retry.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/Retry.java
new file mode 100644
index 0000000..bdb269a
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/Retry.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.utils;
+
+/**
+ * Runs an operation up to {@link #MAX_RETRIES} times while it fails with a given exception type.
+ */
+public abstract class Retry<T> {
+  /** Maximum number of attempts made by a single call to {@link #run()}. */
+  public static final int MAX_RETRIES = 3;
+  private final Class<?> retryExceptionType;
+
+  /** @param exceptionClassType failures of this type, or of a subtype, are retried. */
+  public Retry(Class<?> exceptionClassType) {
+    this.retryExceptionType = exceptionClassType;
+  }
+
+  /** The operation to attempt; invoked up to {@link #MAX_RETRIES} times per run. */
+  public abstract T execute() throws Exception;
+
+  /**
+   * Calls {@link #execute()} until it succeeds, throws a non-retryable exception, or has
+   * failed {@link #MAX_RETRIES} times (counted per call); the last exception is rethrown.
+   */
+  public T run() throws Exception {
+    for (int attempt = 1; ; attempt++) {
+      try {
+        return execute();
+      } catch (Exception e) {
+        if (attempt >= MAX_RETRIES || !retryExceptionType.isInstance(e)) {
+          throw e;
+        }
+      }
+    }
+  }
+}
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/package-info.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/package-info.java
new file mode 100644
index 0000000..2eb51c8
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utility classes for the metastore.
+ */
+package org.apache.hadoop.hive.metastore.utils;
diff --git a/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/RetryTest.java b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/RetryTest.java
new file mode 100644
index 0000000..67bd658
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/RetryTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the {@link Retry} helper.
+ */
+public class RetryTest {
+  /**
+   * Runs a Retry on NPE whose execute() always throws {@code failure}; returns what run() threw.
+   */
+  private static Exception runFailing(final RuntimeException failure) {
+    Retry<Void> retriable = new Retry<Void>(NullPointerException.class) {
+      @Override
+      public Void execute() {
+        throw failure;
+      }
+    };
+    try {
+      retriable.run();
+      return null;
+    } catch (Exception e) {
+      return e;
+    }
+  }
+
+  @Test
+  public void testRetrySuccess() {
+    RuntimeException failure = new NullPointerException();
+    Assert.assertSame(failure, runFailing(failure));
+  }
+
+  @Test
+  public void testRetryFailure() {
+    RuntimeException failure = new RuntimeException();
+    Assert.assertSame(failure, runFailing(failure));
+  }
+}
diff --git a/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/package-info.java b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/package-info.java
new file mode 100644
index 0000000..2eb51c8
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package containing utility methods for the metastore.
+ */
+package org.apache.hadoop.hive.metastore.utils;
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
index dda407a..8d77ffe 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
@@ -272,7 +272,7 @@ public class HiveAlterHandler implements AlterHandler {
               }
               // check that src exists and also checks permissions necessary, rename src to dest
               if (srcFs.exists(srcPath) && wh.renameDir(srcPath, destPath,
-                      ReplChangeManager.isSourceOfReplication(olddb))) {
+                      ReplChangeManager.shouldEnableCm(olddb, oldt))) {
                 dataWasMoved = true;
               }
             } catch (IOException | MetaException e) {
@@ -428,7 +428,8 @@ public class HiveAlterHandler implements AlterHandler {
           Path deleteOldDataLoc = new Path(oldt.getSd().getLocation());
           boolean isAutoPurge = "true".equalsIgnoreCase(oldt.getParameters().get("auto.purge"));
           try {
-            wh.deleteDir(deleteOldDataLoc, true, isAutoPurge, olddb);
+            wh.deleteDir(deleteOldDataLoc, true, isAutoPurge,
+                    ReplChangeManager.shouldEnableCm(olddb, oldt));
             LOG.info("Deleted the old data location: {} for the table: {}",
                     deleteOldDataLoc, dbname + "." + name);
           } catch (MetaException ex) {
@@ -651,7 +652,7 @@ public class HiveAlterHandler implements AlterHandler {
               }
 
               //rename the data directory
-              wh.renameDir(srcPath, destPath, ReplChangeManager.isSourceOfReplication(db));
+              wh.renameDir(srcPath, destPath, ReplChangeManager.shouldEnableCm(db, tbl));
               LOG.info("Partition directory rename from " + srcPath + " to " + destPath + " done.");
               dataWasMoved = true;
             }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index b8418c5..ee9f988 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -2228,7 +2228,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         if (!success) {
           ms.rollbackTransaction();
           if (madeDir) {
-            wh.deleteDir(tblPath, true, db);
+            wh.deleteDir(tblPath, true, false, ReplChangeManager.shouldEnableCm(db, tbl));
           }
         }
 
@@ -2806,10 +2806,9 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         } else if (tableDataShouldBeDeleted) {
           // Data needs deletion. Check if trash may be skipped.
           // Delete the data in the partitions which have other locations
-          deletePartitionData(partPaths, ifPurge, db);
+          deletePartitionData(partPaths, ifPurge, ReplChangeManager.shouldEnableCm(db, tbl));
           // Delete the data in the table
-          deleteTableData(tblPath, ifPurge, db);
-          // ok even if the data is not deleted
+          deleteTableData(tblPath, ifPurge, ReplChangeManager.shouldEnableCm(db, tbl));
         }
 
         if (!listeners.isEmpty()) {
@@ -2837,16 +2836,49 @@ public class HiveMetaStore extends ThriftHiveMetastore {
      * @param tablePath
      * @param ifPurge completely purge the table (skipping trash) while removing
      *                data from warehouse
-     * @param db database the table belongs to
+     * @param shouldEnableCm whether change management (cm) should be enabled
      */
-    private void deleteTableData(Path tablePath, boolean ifPurge, Database db) {
+    private void deleteTableData(Path tablePath, boolean ifPurge, boolean shouldEnableCm) {
+      if (tablePath == null) {
+        return;
+      }
+      try {
+        if (!shouldEnableCm) {
+          // Without change management the whole table directory can be removed in one call.
+          wh.deleteDir(tablePath, true, ifPurge, shouldEnableCm);
+          return;
+        }
+        // Delete only the children accepted by CMROOT_PATH_FILTER, so a cmroot directory
+        // located inside the table path is left in place.
+        for (FileStatus child : tablePath.getFileSystem(conf).listStatus(tablePath,
+                ReplChangeManager.CMROOT_PATH_FILTER)) {
+          wh.deleteDir(child.getPath(), true, ifPurge, shouldEnableCm);
+        }
+        // Remove the table directory itself only if nothing is left in it.
+        if (tablePath.getFileSystem(conf).listStatus(tablePath).length == 0) {
+          wh.deleteDir(tablePath, true, ifPurge, shouldEnableCm);
+        }
+      } catch (Exception e) {
+        // Best effort: a failure is logged (with its stack trace) rather than propagated.
+        LOG.error("Failed to delete table directory: " + tablePath + " " + e.getMessage(), e);
+      }
+    }
 
+    /**
+     * Deletes the data in a table's location; if it fails, logs an error.
+     *
+     * @param tablePath
+     * @param ifPurge completely purge the table (skipping trash) while removing
+     *                data from warehouse
+     * @param db Database
+     */
+    private void deleteTableData(Path tablePath, boolean ifPurge, Database db) {
       if (tablePath != null) {
         try {
           wh.deleteDir(tablePath, true, ifPurge, db);
         } catch (Exception e) {
           LOG.error("Failed to delete table directory: " + tablePath +
-              " " + e.getMessage());
+                  " " + e.getMessage());
         }
       }
     }
@@ -2858,8 +2890,45 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     * @param partPaths
     * @param ifPurge completely purge the partition (skipping trash) while
     *                removing data from warehouse
-    * @param db database the partition belongs to
+    * @param shouldEnableCm whether change management (cm) should be enabled
     */
+    private void deletePartitionData(List<Path> partPaths, boolean ifPurge, boolean shouldEnableCm) {
+      if (partPaths == null) {
+        return;
+      }
+      for (Path partPath : partPaths) {
+        try {
+          if (!shouldEnableCm) {
+            // Without change management the whole partition directory can be removed in one call.
+            wh.deleteDir(partPath, true, ifPurge, shouldEnableCm);
+            continue;
+          }
+          // Delete only the children accepted by CMROOT_PATH_FILTER, so a cmroot directory
+          // located inside the partition path is left in place.
+          for (FileStatus child : partPath.getFileSystem(conf).listStatus(partPath,
+                  ReplChangeManager.CMROOT_PATH_FILTER)) {
+            wh.deleteDir(child.getPath(), true, ifPurge, shouldEnableCm);
+          }
+          // Remove the partition directory itself only if nothing is left in it.
+          if (partPath.getFileSystem(conf).listStatus(partPath).length == 0) {
+            wh.deleteDir(partPath, true, ifPurge, shouldEnableCm);
+          }
+        } catch (Exception e) {
+          // Best effort: log with the stack trace and move on to the next partition.
+          LOG.error("Failed to delete partition directory: " + partPath + " " + e.getMessage(), e);
+        }
+      }
+    }
+
+    /**
+     * Given a list of partitions' locations, tries to delete each one
+     * and for each that fails logs an error.
+     *
+     * @param partPaths
+     * @param ifPurge completely purge the partition (skipping trash) while
+     *                removing data from warehouse
+     * @param db Database
+     */
     private void deletePartitionData(List<Path> partPaths, boolean ifPurge, Database db) {
       if (partPaths != null && !partPaths.isEmpty()) {
         for (Path partPath : partPaths) {
@@ -2867,7 +2936,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
             wh.deleteDir(partPath, true, ifPurge, db);
           } catch (Exception e) {
             LOG.error("Failed to delete partition directory: " + partPath +
-                " " + e.getMessage());
+                    " " + e.getMessage());
           }
         }
       }
@@ -3125,7 +3194,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
             HdfsUtils.HadoopFileStatus status = new HdfsUtils.HadoopFileStatus(getConf(), fs, location);
             FileStatus targetStatus = fs.getFileStatus(location);
             String targetGroup = targetStatus == null ? null : targetStatus.getGroup();
-            wh.deleteDir(location, true, isAutopurge, db);
+            wh.deleteDir(location, true, isAutopurge, ReplChangeManager.shouldEnableCm(db, tbl));
             fs.mkdirs(location);
             HdfsUtils.setFullFileStatus(getConf(), status, targetGroup, fs, location, false);
           } else {
@@ -3134,7 +3203,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
               continue;
             }
             for (final FileStatus status : statuses) {
-              wh.deleteDir(status.getPath(), true, isAutopurge, db);
+              wh.deleteDir(status.getPath(), true, isAutopurge, ReplChangeManager.shouldEnableCm(db, tbl));
             }
           }
         }
@@ -3619,7 +3688,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         if (!success) {
           ms.rollbackTransaction();
           if (madeDir) {
-            wh.deleteDir(partLocation, true, db);
+            wh.deleteDir(partLocation, true, false, ReplChangeManager.shouldEnableCm(db, tbl));
           }
         }
 
@@ -4352,8 +4421,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           success = ms.addPartition(part);
         } finally {
           if (!success && madeDir) {
-            wh.deleteDir(new Path(part.getSd().getLocation()), true,
-                    ms.getDatabase(tbl.getCatName(), tbl.getDbName()));
+            wh.deleteDir(new Path(part.getSd().getLocation()), true, false,
+                    ReplChangeManager.shouldEnableCm(ms.getDatabase(part.getCatName(), part.getDbName()), tbl));
           }
         }
 
@@ -4623,7 +4692,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Path archiveParentDir = null;
       boolean mustPurge = false;
       boolean tableDataShouldBeDeleted = false;
-      boolean isSourceOfReplication = false;
+      boolean needsCm = false;
       Map<String, String> transactionalListenerResponses = Collections.emptyMap();
 
       if (db_name == null) {
@@ -4671,7 +4740,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
                                                       new DropPartitionEvent(tbl, part, true, deleteData, this),
                                                       envContext);
           }
-          isSourceOfReplication = ReplChangeManager.isSourceOfReplication(ms.getDatabase(catName, db_name));
+          needsCm = ReplChangeManager.shouldEnableCm(ms.getDatabase(catName, db_name), tbl);
           success = ms.commitTransaction();
         }
       } finally {
@@ -4690,11 +4759,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
             if (isArchived) {
               assert (archiveParentDir != null);
-              wh.deleteDir(archiveParentDir, true, mustPurge, isSourceOfReplication);
+              wh.deleteDir(archiveParentDir, true, mustPurge, needsCm);
             } else {
               assert (partPath != null);
-              wh.deleteDir(partPath, true, mustPurge, isSourceOfReplication);
-              deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge, isSourceOfReplication);
+              wh.deleteDir(partPath, true, mustPurge, needsCm);
+              deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge, needsCm);
             }
             // ok even if the data is not deleted
           }
@@ -4770,7 +4839,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       List<Partition> parts = null;
       boolean mustPurge = false;
       List<Map<String, String>> transactionalListenerResponses = Lists.newArrayList();
-      boolean isSourceOfReplication = ReplChangeManager.isSourceOfReplication(ms.getDatabase(catName, dbName));
+      boolean needsCm = ReplChangeManager.shouldEnableCm(ms.getDatabase(catName, dbName),
+              ms.getTable(catName, dbName, tblName));
 
       try {
         // We need Partition-s for firing events and for result; DN needs MPartition-s to drop.
@@ -4878,12 +4948,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           // Archived partitions have har:/to_har_file as their location.
           // The original directory was saved in params
           for (Path path : archToDelete) {
-            wh.deleteDir(path, true, mustPurge, isSourceOfReplication);
+            wh.deleteDir(path, true, mustPurge, needsCm);
           }
           for (PathAndPartValSize p : dirsToDelete) {
-            wh.deleteDir(p.path, true, mustPurge, isSourceOfReplication);
+            wh.deleteDir(p.path, true, mustPurge, needsCm);
             try {
-              deleteParentRecursive(p.path.getParent(), p.partValSize - 1, mustPurge, isSourceOfReplication);
+              deleteParentRecursive(p.path.getParent(), p.partValSize - 1, mustPurge, needsCm);
             } catch (IOException ex) {
               LOG.warn("Error from deleteParentRecursive", ex);
               throw new MetaException("Failed to delete parent: " + ex.getMessage());
@@ -7785,7 +7855,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         if (func == null) {
           throw new NoSuchObjectException("Function " + funcName + " does not exist");
         }
-        Boolean isSourceOfReplication =
+        Boolean needsCm =
               ReplChangeManager.isSourceOfReplication(get_database_core(parsedDbName[CAT_NAME], parsedDbName[DB_NAME]));
 
         // if copy of jar to change management fails we fail the metastore transaction, since the
@@ -7793,7 +7863,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         // a copy is required to allow incremental replication to work correctly.
         if (func.getResourceUris() != null && !func.getResourceUris().isEmpty()) {
           for (ResourceUri uri : func.getResourceUris()) {
-            if (uri.getUri().toLowerCase().startsWith("hdfs:") && isSourceOfReplication) {
+            if (uri.getUri().toLowerCase().startsWith("hdfs:") && needsCm) {
               wh.addToChangeManagement(new Path(uri.getUri()));
             }
           }