Posted to commits@hive.apache.org by sa...@apache.org on 2018/06/05 05:40:16 UTC

hive git commit: HIVE-19488 : Enable CM root based on db parameter, identifying db as a source of replication (Mahesh Kumar Behera, reviewed by Sankar Hariappan)

Repository: hive
Updated Branches:
  refs/heads/branch-3 bda5d51dd -> 752ba6824


HIVE-19488 : Enable CM root based on db parameter, identifying db as a source of replication (Mahesh Kumar Behera, reviewed by Sankar Hariappan)

Signed-off-by: Sankar Hariappan <sa...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/752ba682
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/752ba682
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/752ba682

Branch: refs/heads/branch-3
Commit: 752ba68249841b2949ba34574184cdf0ad7c1911
Parents: bda5d51
Author: Mahesh Kumar Behera <mb...@hortonworks.com>
Authored: Tue May 29 22:12:07 2018 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Tue Jun 5 11:09:02 2018 +0530
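
In short, the patch keys replication behaviour off a database parameter: a database is marked as a source of replication by setting the 'repl.source.for' property (exposed as ReplChangeManager.SOURCE_OF_REPLICATION), e.g. CREATE DATABASE ... WITH DBPROPERTIES ('repl.source.for' = '1,2,3') or ALTER DATABASE ... SET DBPROPERTIES ('repl.source.for' = '1, 2, 3'), as the updated tests and .q files below do. REPL DUMP is rejected for databases without the flag, dropping a flagged database is refused by the metastore (outside tests), and the CM-root recycle and rename paths in HiveMetaStore, HiveAlterHandler, Hive.java and the compactor Cleaner only archive files for flagged databases. As a rough illustration only (not the committed ReplChangeManager code, which appears here only in the diffstat), the parameter check could look like the sketch below; the case-insensitive key match mirrors the 'repl.SOURCE.for' case exercised in testDumpNonReplDatabase:

    import java.util.Map;
    import org.apache.hadoop.hive.metastore.api.Database;

    // Illustrative sketch: gate CM recycling / REPL DUMP on the db parameter.
    public final class ReplSourceCheckSketch {
      // Key used by the tests and .q files in this commit; the real constant is
      // ReplChangeManager.SOURCE_OF_REPLICATION.
      private static final String SOURCE_OF_REPLICATION = "repl.source.for";

      // True when the database carries a non-empty repl.source.for parameter.
      public static boolean isSourceOfReplication(Database db) {
        if (db == null || db.getParameters() == null) {
          return false;
        }
        for (Map.Entry<String, String> e : db.getParameters().entrySet()) {
          if (SOURCE_OF_REPLICATION.equalsIgnoreCase(e.getKey())
              && e.getValue() != null && !e.getValue().trim().isEmpty()) {
            return true;
          }
        }
        return false;
      }
    }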

----------------------------------------------------------------------
 .../hive/metastore/TestReplChangeManager.java   |   3 +
 .../hadoop/hive/ql/parse/TestCopyUtils.java     |   4 +-
 .../TestReplicationOnHDFSEncryptedZones.java    |   4 +-
 .../hive/ql/parse/TestReplicationScenarios.java |  90 ++++++++++--
 .../TestReplicationScenariosAcidTables.java     |   4 +-
 ...TestReplicationScenariosAcrossInstances.java |  18 ++-
 .../TestHiveAuthorizerCheckInvocation.java      |   4 +-
 .../compactor/TestCleanerWithReplication.java   |  34 +++--
 .../apache/hive/jdbc/TestJdbcWithMiniHS2.java   |   4 +-
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |   8 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    |   9 +-
 .../hive/ql/parse/MetaDataExportListener.java   |   6 +-
 .../ql/parse/ReplicationSemanticAnalyzer.java   |  23 ++-
 .../hadoop/hive/ql/txn/compactor/Cleaner.java   |  10 +-
 .../clientnegative/repl_dump_requires_admin.q   |   2 +-
 .../clientnegative/repl_load_requires_admin.q   |   2 +-
 .../repl_dump_requires_admin.q.out              |   4 +-
 .../repl_load_requires_admin.q.out              |   4 +-
 .../hadoop/hive/metastore/HiveAlterHandler.java |  11 +-
 .../hadoop/hive/metastore/HiveMetaStore.java    | 144 +++++++++++--------
 .../hive/metastore/ReplChangeManager.java       |  23 +++
 .../apache/hadoop/hive/metastore/Warehouse.java |  10 +-
 .../hadoop/hive/metastore/model/MDatabase.java  |  21 ++-
 23 files changed, 317 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/752ba682/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
index e63250c..235bd11 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ReplChangeManager.RecycleType;
+import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -119,6 +120,7 @@ public class TestReplChangeManager {
     client.dropDatabase(dbName, true, true);
 
     Database db = new Database();
+    db.putToParameters(SOURCE_OF_REPLICATION, "1,2,3");
     db.setName(dbName);
     client.createDatabase(db);
 
@@ -204,6 +206,7 @@ public class TestReplChangeManager {
     client.dropDatabase(dbName, true, true);
 
     Database db = new Database();
+    db.putToParameters(SOURCE_OF_REPLICATION, "1, 2, 3");
     db.setName(dbName);
     client.createDatabase(db);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/752ba682/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java
index f14b430..0e0a5cc 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java
@@ -42,6 +42,7 @@ import java.util.Map;
 
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.shims.HadoopShims.MiniMrShim;
+import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 
 public class TestCopyUtils {
   @Rule
@@ -110,7 +111,8 @@ public class TestCopyUtils {
     replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>());
     primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis();
     replicatedDbName = "replicated_" + primaryDbName;
-    primary.run("create database " + primaryDbName);
+    primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
+            SOURCE_OF_REPLICATION + "' = '1,2,3')");
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/752ba682/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
index d8d210b..1846c21 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
@@ -39,6 +39,7 @@ import java.io.IOException;
 import java.util.HashMap;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_AGGREGATE_STATS_CACHE_ENABLED;
+import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 
 public class TestReplicationOnHDFSEncryptedZones {
   private static String jksFile = System.getProperty("java.io.tmpdir") + "/test.jks";
@@ -83,7 +84,8 @@ public class TestReplicationOnHDFSEncryptedZones {
   public void setup() throws Throwable {
     primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis();
     replicatedDbName = "replicated_" + primaryDbName;
-    primary.run("create database " + primaryDbName);
+    primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
+            SOURCE_OF_REPLICATION + "' = '1,2,3')");
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hive/blob/752ba682/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 8b33b78..f4cdf02 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -89,6 +89,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 
 public class TestReplicationScenarios {
 
@@ -379,7 +380,7 @@ public class TestReplicationScenarios {
     verifySetup("SELECT * from " + dbName + ".unptned ORDER BY a", unptn_data, driver);
 
     // Create an empty database to load
-    run("CREATE DATABASE " + dbName + "_empty", driverMirror);
+    createDB(dbName + "_empty", driverMirror);
 
     // Load to an empty database
     Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, dbName + "_empty");
@@ -389,7 +390,7 @@ public class TestReplicationScenarios {
     String[] nullReplId = new String[]{ "NULL" };
 
     // Create a database with a table
-    run("CREATE DATABASE " + dbName + "_withtable", driverMirror);
+    createDB(dbName + "_withtable", driverMirror);
     run("CREATE TABLE " + dbName + "_withtable.unptned(a string) STORED AS TEXTFILE", driverMirror);
     // Load using same dump to a DB with table. It should fail as DB is not empty.
     verifyFail("REPL LOAD " + dbName + "_withtable FROM '" + replDumpLocn + "'", driverMirror);
@@ -398,7 +399,7 @@ public class TestReplicationScenarios {
     verifyRun("REPL STATUS " + dbName + "_withtable", nullReplId, driverMirror);
 
     // Create a database with a view
-    run("CREATE DATABASE " + dbName + "_withview", driverMirror);
+    createDB(dbName + "_withview", driverMirror);
     run("CREATE TABLE " + dbName + "_withview.unptned(a string) STORED AS TEXTFILE", driverMirror);
     run("CREATE VIEW " + dbName + "_withview.view AS SELECT * FROM " + dbName + "_withview.unptned", driverMirror);
     // Load using same dump to a DB with view. It should fail as DB is not empty.
@@ -1893,8 +1894,8 @@ public class TestReplicationScenarios {
     String replDbName1 = dbName1 + "_dupe";
     String replDbName2 = dbName2 + "_dupe";
 
-    run("CREATE DATABASE " + dbName1, driver);
-    run("CREATE DATABASE " + dbName2, driver);
+    createDB(dbName1, driver);
+    createDB(dbName2, driver);
     run("CREATE TABLE " + dbName1 + ".unptned(a string) STORED AS TEXTFILE", driver);
 
     String[] unptn_data = new String[] { "ten", "twenty" };
@@ -1909,14 +1910,14 @@ public class TestReplicationScenarios {
     verifyRun("SELECT a from " + replDbName1 + ".unptned ORDER BY a", unptn_data, driverMirror);
     verifyIfTableNotExist(replDbName2, "unptned", metaStoreClientMirror);
 
-    run("ALTER TABLE " + dbName1 + ".unptned RENAME TO " + dbName2 + ".unptned_renamed", driver);
+    verifyFail("ALTER TABLE " + dbName1 + ".unptned RENAME TO " + dbName2 + ".unptned_renamed", driver);
 
     incrementalLoadAndVerify(dbName1, bootstrap1.lastReplId, replDbName1);
     incrementalLoadAndVerify(dbName2, bootstrap2.lastReplId, replDbName2);
 
-    verifyIfTableNotExist(replDbName1, "unptned", metaStoreClientMirror);
     verifyIfTableNotExist(replDbName1, "unptned_renamed", metaStoreClientMirror);
-    verifyRun("SELECT a from " + replDbName2 + ".unptned_renamed ORDER BY a", unptn_data, driverMirror);
+    verifyIfTableNotExist(replDbName2, "unptned_renamed", metaStoreClientMirror);
+    verifyRun("SELECT a from " + replDbName1 + ".unptned ORDER BY a", unptn_data, driverMirror);
   }
 
   @Test
@@ -1928,8 +1929,8 @@ public class TestReplicationScenarios {
     String replDbName1 = dbName1 + "_dupe";
     String replDbName2 = dbName2 + "_dupe";
 
-    run("CREATE DATABASE " + dbName1, driver);
-    run("CREATE DATABASE " + dbName2, driver);
+    createDB(dbName1, driver);
+    createDB(dbName2, driver);
     run("CREATE TABLE " + dbName1 + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver);
 
     String[] ptn_data = new String[] { "fifteen", "fourteen" };
@@ -1944,14 +1945,14 @@ public class TestReplicationScenarios {
     verifyRun("SELECT a from " + replDbName1 + ".ptned where (b=1) ORDER BY a", ptn_data, driverMirror);
     verifyIfTableNotExist(replDbName2, "ptned", metaStoreClientMirror);
 
-    run("ALTER TABLE " + dbName1 + ".ptned RENAME TO " + dbName2 + ".ptned_renamed", driver);
+    verifyFail("ALTER TABLE " + dbName1 + ".ptned RENAME TO " + dbName2 + ".ptned_renamed", driver);
 
     incrementalLoadAndVerify(dbName1, bootstrap1.lastReplId, replDbName1);
     incrementalLoadAndVerify(dbName2, bootstrap2.lastReplId, replDbName2);
 
-    verifyIfTableNotExist(replDbName1, "ptned", metaStoreClientMirror);
     verifyIfTableNotExist(replDbName1, "ptned_renamed", metaStoreClientMirror);
-    verifyRun("SELECT a from " + replDbName2 + ".ptned_renamed where (b=1) ORDER BY a", ptn_data, driverMirror);
+    verifyIfTableNotExist(replDbName2, "ptned_renamed", metaStoreClientMirror);
+    verifyRun("SELECT a from " + replDbName1 + ".ptned where (b=1) ORDER BY a", ptn_data, driverMirror);
   }
 
   @Test
@@ -3076,7 +3077,7 @@ public class TestReplicationScenarios {
     // Setup
     long firstEventId = metaStoreClient.getCurrentNotificationEventId().getEventId();
     String dbName = "testAuthForNotificationAPIs";
-    driver.run("create database " + dbName);
+    createDB(dbName, driver);
     NotificationEventResponse rsp = metaStoreClient.getNextNotification(firstEventId, 0, null);
     assertEquals(1, rsp.getEventsSize());
     // Test various scenarios
@@ -3156,8 +3157,69 @@ public class TestReplicationScenarios {
     assertTrue(fileCount == fileCountAfter);
   }
 
+  @Test
+  public void testDumpNonReplDatabase() throws IOException {
+    String dbName = createDBNonRepl(testName.getMethodName(), driver);
+    verifyFail("REPL DUMP " + dbName, driver);
+    verifyFail("REPL DUMP " + dbName + " from 1 ", driver);
+    run("alter database " + dbName + " set dbproperties ('repl.source.for' = '1, 2, 3')", driver);
+    assertTrue(run("REPL DUMP " + dbName, true, driver));
+    assertTrue(run("REPL DUMP " + dbName + " from 1 ", true, driver));
+    dbName = createDBNonRepl(testName.getMethodName() + "_case", driver);
+    run("alter database " + dbName + " set dbproperties ('repl.SOURCE.for' = '1, 2, 3')", driver);
+    assertTrue(run("REPL DUMP " + dbName, true, driver));
+    assertTrue(run("REPL DUMP " + dbName + " from 1 ", true, driver));
+  }
+
+  @Test
+  public void testRecycleFileNonReplDatabase() throws IOException {
+    String dbName = createDBNonRepl(testName.getMethodName(), driver);
+
+    String cmDir = hconf.getVar(HiveConf.ConfVars.REPLCMDIR);
+    Path path = new Path(cmDir);
+    FileSystem fs = path.getFileSystem(hconf);
+    ContentSummary cs = fs.getContentSummary(path);
+    long fileCount = cs.getFileCount();
+
+    run("CREATE TABLE " + dbName + ".normal(a int)", driver);
+    run("INSERT INTO " + dbName + ".normal values (1)", driver);
+
+    cs = fs.getContentSummary(path);
+    long fileCountAfter = cs.getFileCount();
+    assertTrue(fileCount == fileCountAfter);
+
+    run("INSERT INTO " + dbName + ".normal values (3)", driver);
+    run("TRUNCATE TABLE " + dbName + ".normal", driver);
+
+    cs = fs.getContentSummary(path);
+    fileCountAfter = cs.getFileCount();
+    assertTrue(fileCount == fileCountAfter);
+
+    run("INSERT INTO " + dbName + ".normal values (4)", driver);
+    run("ALTER TABLE " + dbName + ".normal RENAME to " + dbName + ".normal1", driver);
+    verifyRun("SELECT count(*) from " + dbName + ".normal1", new String[]{"1"}, driver);
+
+    cs = fs.getContentSummary(path);
+    fileCountAfter = cs.getFileCount();
+    assertTrue(fileCount == fileCountAfter);
+
+    run("INSERT INTO " + dbName + ".normal1 values (5)", driver);
+    run("DROP TABLE " + dbName + ".normal1", driver);
+
+    cs = fs.getContentSummary(path);
+    fileCountAfter = cs.getFileCount();
+    assertTrue(fileCount == fileCountAfter);
+  }
+
   private static String createDB(String name, IDriver myDriver) {
     LOG.info("Testing " + name);
+    run("CREATE DATABASE " + name + " WITH DBPROPERTIES ( '" +
+            SOURCE_OF_REPLICATION + "' = '1,2,3')", myDriver);
+    return name;
+  }
+
+  private static String createDBNonRepl(String name, IDriver myDriver) {
+    LOG.info("Testing " + name);
     String dbName = name + "_" + tid;
     run("CREATE DATABASE " + dbName, myDriver);
     return dbName;

http://git-wip-us.apache.org/repos/asf/hive/blob/752ba682/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 8ad507f..9a2d296 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.shims.Utils;
+import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 import org.junit.rules.TestName;
 import org.junit.rules.TestRule;
 import org.slf4j.Logger;
@@ -97,7 +98,8 @@ public class TestReplicationScenariosAcidTables {
     replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>());
     primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis();
     replicatedDbName = "replicated_" + primaryDbName;
-    primary.run("create database " + primaryDbName);
+    primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
+            SOURCE_OF_REPLICATION + "' = '1,2,3')");
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/hive/blob/752ba682/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index ebcb0d6..6167459 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -60,6 +60,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 
 public class TestReplicationScenariosAcrossInstances {
   @Rule
@@ -98,7 +99,8 @@ public class TestReplicationScenariosAcrossInstances {
     replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>());
     primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis();
     replicatedDbName = "replicated_" + primaryDbName;
-    primary.run("create database " + primaryDbName);
+    primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
+            SOURCE_OF_REPLICATION + "' = '1,2,3')");
   }
 
   @After
@@ -402,17 +404,20 @@ public class TestReplicationScenariosAcrossInstances {
     String randomTwo = RandomStringUtils.random(10, true, false);
     String dbOne = primaryDbName + randomOne;
     String dbTwo = primaryDbName + randomTwo;
+    primary.run("alter database default set dbproperties ('repl.source.for' = '1, 2, 3')");
     WarehouseInstance.Tuple tuple = primary
         .run("use " + primaryDbName)
         .run("create table t1 (i int, j int)")
-        .run("create database " + dbOne)
+        .run("create database " + dbOne + " WITH DBPROPERTIES ( '" +
+                SOURCE_OF_REPLICATION + "' = '1,2,3')")
         .run("use " + dbOne)
     // TODO: this is wrong; this test sets up dummy txn manager and so it cannot create ACID tables.
     //       This used to work by accident, now this works due a test flag. The test needs to be fixed.
     //       Also applies for a couple more tests.
         .run("create table t1 (i int, j int) partitioned by (load_date date) "
             + "clustered by(i) into 2 buckets stored as orc tblproperties ('transactional'='true') ")
-        .run("create database " + dbTwo)
+        .run("create database " + dbTwo + " WITH DBPROPERTIES ( '" +
+                SOURCE_OF_REPLICATION + "' = '1,2,3')")
         .run("use " + dbTwo)
         .run("create table t1 (i int, j int)")
         .dump("`*`", null, Arrays.asList("'hive.repl.dump.metadata.only'='true'",
@@ -463,10 +468,12 @@ public class TestReplicationScenariosAcrossInstances {
     String randomOne = RandomStringUtils.random(10, true, false);
     String randomTwo = RandomStringUtils.random(10, true, false);
     String dbOne = primaryDbName + randomOne;
+    primary.run("alter database default set dbproperties ('repl.source.for' = '1, 2, 3')");
     WarehouseInstance.Tuple bootstrapTuple = primary
         .run("use " + primaryDbName)
         .run("create table t1 (i int, j int)")
-        .run("create database " + dbOne)
+        .run("create database " + dbOne + " WITH DBPROPERTIES ( '" +
+                SOURCE_OF_REPLICATION + "' = '1,2,3')")
         .run("use " + dbOne)
         .run("create table t1 (i int, j int) partitioned by (load_date date) "
             + "clustered by(i) into 2 buckets stored as orc tblproperties ('transactional'='true') ")
@@ -475,7 +482,8 @@ public class TestReplicationScenariosAcrossInstances {
 
     String dbTwo = primaryDbName + randomTwo;
     WarehouseInstance.Tuple incrementalTuple = primary
-        .run("create database " + dbTwo)
+        .run("create database " + dbTwo + " WITH DBPROPERTIES ( '" +
+                SOURCE_OF_REPLICATION + "' = '1,2,3')")
         .run("use " + dbTwo)
         .run("create table t1 (i int, j int)")
         .run("use " + dbOne)

http://git-wip-us.apache.org/repos/asf/hive/blob/752ba682/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/authorization/plugin/TestHiveAuthorizerCheckInvocation.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/authorization/plugin/TestHiveAuthorizerCheckInvocation.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/authorization/plugin/TestHiveAuthorizerCheckInvocation.java
index a3cdd6e..e3c83d2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/authorization/plugin/TestHiveAuthorizerCheckInvocation.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/authorization/plugin/TestHiveAuthorizerCheckInvocation.java
@@ -53,6 +53,7 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 
 /**
  * Test HiveAuthorizer api invocation
@@ -104,7 +105,8 @@ public class TestHiveAuthorizerCheckInvocation {
     runCmd("create table " + tableName
         + " (i int, j int, k string) partitioned by (city string, `date` string) ");
     runCmd("create view " + viewName + " as select * from " + tableName);
-    runCmd("create database " + dbName);
+    runCmd("create database " + dbName + " WITH DBPROPERTIES ( '" +
+            SOURCE_OF_REPLICATION + "' = '1,2,3')");
     runCmd("create table " + fullInTableName + "(i int)");
     // Need a separate table for ACID testing since it has to be bucketed and it has to be Acid
     runCmd("create table " + acidTableName + " (i int, j int, k int) clustered by (k) into 2 buckets " +

http://git-wip-us.apache.org/repos/asf/hive/blob/752ba682/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
index 597544f..14d3894 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.Table;
+import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -52,6 +54,7 @@ public class TestCleanerWithReplication extends CompactorTest {
   private Path cmRootDirectory;
   private static FileSystem fs;
   private static MiniDFSCluster miniDFSCluster;
+  private final String dbName = "TestCleanerWithReplication";
 
   @Before
   public void setup() throws Exception {
@@ -68,6 +71,10 @@ public class TestCleanerWithReplication extends CompactorTest {
       fs.mkdirs(cmRootDirectory);
     }
     tmpdir = new File(Files.createTempDirectory("compactor_test_table_").toString());
+    Database db = new Database();
+    db.putToParameters(SOURCE_OF_REPLICATION, "1,2,3");
+    db.setName(dbName);
+    ms.createDatabase(db);
   }
 
   @BeforeClass
@@ -81,9 +88,10 @@ public class TestCleanerWithReplication extends CompactorTest {
   }
 
   @After
-  public void tearDown() throws IOException {
+  public void tearDown() throws Exception {
     fs.delete(cmRootDirectory, true);
     compactorTestCleanup();
+    ms.dropDatabase(dbName, true, true, true);
   }
 
   @AfterClass
@@ -93,16 +101,16 @@ public class TestCleanerWithReplication extends CompactorTest {
 
   @Test
   public void cleanupAfterMajorTableCompaction() throws Exception {
-    Table t = newTable("default", "camtc", false);
+    Table t = newTable(dbName, "camtc", false);
 
     addBaseFile(t, null, 20L, 20);
     addDeltaFile(t, null, 21L, 22L, 2);
     addDeltaFile(t, null, 23L, 24L, 2);
     addBaseFile(t, null, 25L, 25);
 
-    burnThroughTransactions("default", "camtc", 25);
+    burnThroughTransactions(dbName, "camtc", 25);
 
-    CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR);
+    CompactionRequest rqst = new CompactionRequest(dbName, "camtc", CompactionType.MAJOR);
     txnHandler.compact(rqst);
     CompactionInfo ci = txnHandler.findNextToCompact("fred");
     txnHandler.markCompacted(ci);
@@ -113,7 +121,7 @@ public class TestCleanerWithReplication extends CompactorTest {
 
   @Test
   public void cleanupAfterMajorPartitionCompaction() throws Exception {
-    Table t = newTable("default", "campc", true);
+    Table t = newTable(dbName, "campc", true);
     Partition p = newPartition(t, "today");
 
     addBaseFile(t, p, 20L, 20);
@@ -121,9 +129,9 @@ public class TestCleanerWithReplication extends CompactorTest {
     addDeltaFile(t, p, 23L, 24L, 2);
     addBaseFile(t, p, 25L, 25);
 
-    burnThroughTransactions("default", "campc", 25);
+    burnThroughTransactions(dbName, "campc", 25);
 
-    CompactionRequest rqst = new CompactionRequest("default", "campc", CompactionType.MAJOR);
+    CompactionRequest rqst = new CompactionRequest(dbName, "campc", CompactionType.MAJOR);
     rqst.setPartitionname("ds=today");
     txnHandler.compact(rqst);
     CompactionInfo ci = txnHandler.findNextToCompact("fred");
@@ -135,16 +143,16 @@ public class TestCleanerWithReplication extends CompactorTest {
 
   @Test
   public void cleanupAfterMinorTableCompaction() throws Exception {
-    Table t = newTable("default", "camitc", false);
+    Table t = newTable(dbName, "camitc", false);
 
     addBaseFile(t, null, 20L, 20);
     addDeltaFile(t, null, 21L, 22L, 2);
     addDeltaFile(t, null, 23L, 24L, 2);
     addDeltaFile(t, null, 21L, 24L, 4);
 
-    burnThroughTransactions("default", "camitc", 25);
+    burnThroughTransactions(dbName, "camitc", 25);
 
-    CompactionRequest rqst = new CompactionRequest("default", "camitc", CompactionType.MINOR);
+    CompactionRequest rqst = new CompactionRequest(dbName, "camitc", CompactionType.MINOR);
     txnHandler.compact(rqst);
     CompactionInfo ci = txnHandler.findNextToCompact("fred");
     txnHandler.markCompacted(ci);
@@ -155,7 +163,7 @@ public class TestCleanerWithReplication extends CompactorTest {
 
   @Test
   public void cleanupAfterMinorPartitionCompaction() throws Exception {
-    Table t = newTable("default", "camipc", true);
+    Table t = newTable(dbName, "camipc", true);
     Partition p = newPartition(t, "today");
 
     addBaseFile(t, p, 20L, 20);
@@ -163,9 +171,9 @@ public class TestCleanerWithReplication extends CompactorTest {
     addDeltaFile(t, p, 23L, 24L, 2);
     addDeltaFile(t, p, 21L, 24L, 4);
 
-    burnThroughTransactions("default", "camipc", 25);
+    burnThroughTransactions(dbName, "camipc", 25);
 
-    CompactionRequest rqst = new CompactionRequest("default", "camipc", CompactionType.MINOR);
+    CompactionRequest rqst = new CompactionRequest(dbName, "camipc", CompactionType.MINOR);
     rqst.setPartitionname("ds=today");
     txnHandler.compact(rqst);
     CompactionInfo ci = txnHandler.findNextToCompact("fred");

http://git-wip-us.apache.org/repos/asf/hive/blob/752ba682/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
index 0919390..1795f15 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
@@ -84,6 +84,7 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
+import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 
 public class TestJdbcWithMiniHS2 {
   private static MiniHS2 miniHS2 = null;
@@ -120,7 +121,8 @@ public class TestJdbcWithMiniHS2 {
     }
     Statement stmt = conDefault.createStatement();
     stmt.execute("drop database if exists " + testDbName + " cascade");
-    stmt.execute("create database " + testDbName);
+    stmt.execute("create database " + testDbName + " WITH DBPROPERTIES ( '" +
+            SOURCE_OF_REPLICATION + "' = '1,2,3')");
     stmt.close();
 
     try {

http://git-wip-us.apache.org/repos/asf/hive/blob/752ba682/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index ab758c4..d4361d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -1587,10 +1587,10 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     }
   }
 
-  private void deleteDir(Path dir) throws HiveException {
+  private void deleteDir(Path dir, Database db) throws HiveException {
     try {
       Warehouse wh = new Warehouse(conf);
-      wh.deleteDir(dir, true);
+      wh.deleteDir(dir, true, db);
     } catch (MetaException e) {
       throw new HiveException(e);
     }
@@ -1845,7 +1845,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     // If a failure occurs here, the directory containing the original files
     // will not be deleted. The user will run ARCHIVE again to clear this up
     if(pathExists(intermediateOriginalDir)) {
-      deleteDir(intermediateOriginalDir);
+      deleteDir(intermediateOriginalDir, db.getDatabase(tbl.getDbName()));
     }
 
     if(recovery) {
@@ -2051,7 +2051,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     // If a failure happens here, the intermediate archive files won't be
     // deleted. The user will need to call unarchive again to clear those up.
     if(pathExists(intermediateArchivedDir)) {
-      deleteDir(intermediateArchivedDir);
+      deleteDir(intermediateArchivedDir, db.getDatabase(tbl.getDbName()));
     }
 
     if(recovery) {

http://git-wip-us.apache.org/repos/asf/hive/blob/752ba682/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 64b3f83..5991098 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -112,6 +112,7 @@ import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
 import org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.CheckConstraintsRequest;
@@ -1737,8 +1738,10 @@ public class Hive {
           //for fullAcid tables we don't delete files for commands with OVERWRITE - we create a new
           // base_x.  (there is Insert Overwrite and Load Data Overwrite)
           boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
+          boolean needRecycle = !tbl.isTemporary()
+                  && ReplChangeManager.isSourceOfReplication(Hive.get().getDatabase(tbl.getDbName()));
           replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(), isSrcLocal,
-              isAutoPurge, newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, !tbl.isTemporary());
+              isAutoPurge, newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, needRecycle);
         } else {
           FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
           copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation,
@@ -2304,8 +2307,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
       if (loadFileType == LoadFileType.REPLACE_ALL && !isTxnTable) {
         //for fullAcid we don't want to delete any files even for OVERWRITE see HIVE-14988/HIVE-17361
         boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
+        boolean needRecycle = !tbl.isTemporary()
+                && ReplChangeManager.isSourceOfReplication(Hive.get().getDatabase(tbl.getDbName()));
         replaceFiles(tblPath, loadPath, destPath, tblPath, conf, isSrcLocal, isAutopurge,
-            newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, !tbl.isTemporary());
+            newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, needRecycle);
       } else {
         try {
           FileSystem fs = tbl.getDataLocation().getFileSystem(conf);

http://git-wip-us.apache.org/repos/asf/hive/blob/752ba682/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java
index 8fccf36..b6d8a28 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java
@@ -90,11 +90,9 @@ public class MetaDataExportListener extends MetaStorePreEventListener {
       EximUtil.createExportDump(fs, outFile, mTbl, null, null,
           new HiveConf(conf, MetaDataExportListener.class));
       if (moveMetadataToTrash == true) {
-        wh.deleteDir(metaPath, true);
+        wh.deleteDir(metaPath, true, false, false);
       }
-    } catch (IOException e) {
-      throw new MetaException(e.getMessage());
-    } catch (SemanticException e) {
+    } catch (IOException | SemanticException e) {
       throw new MetaException(e.getMessage());
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/752ba682/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index c17dfbd..b913f69 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
@@ -106,7 +108,11 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     switch (ast.getToken().getType()) {
       case TOK_REPL_DUMP: {
         LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: dump");
-        initReplDump(ast);
+        try {
+          initReplDump(ast);
+        } catch (HiveException e) {
+          throw new SemanticException("repl dump failed " + e.getMessage());
+        }
         analyzeReplDump(ast);
         break;
       }
@@ -128,9 +134,22 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
   }
 
-  private void initReplDump(ASTNode ast) {
+  private void initReplDump(ASTNode ast) throws HiveException {
     int numChildren = ast.getChildCount();
     dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
+
+    for (String dbName : Utils.matchesDb(db, dbNameOrPattern)) {
+      Database database = db.getDatabase(dbName);
+      if (database != null) {
+        if (!ReplChangeManager.isSourceOfReplication(database)) {
+          throw new SemanticException("Cannot dump database " + dbNameOrPattern +
+                  " as it is not a source of replication");
+        }
+      } else {
+        throw new SemanticException("Cannot dump database " + dbNameOrPattern + " as it does not exist");
+      }
+    }
+
     // skip the first node, which is always required
     int currNode = 1;
     while (currNode < numChildren) {

http://git-wip-us.apache.org/repos/asf/hive/blob/752ba682/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index fe6d2d6..3565616 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.ql.txn.compactor;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileStatus;
@@ -35,6 +37,7 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -320,7 +323,7 @@ public class Cleaner extends CompactorThread {
     return " id=" + ci.id;
   }
   private void removeFiles(String location, ValidWriteIdList writeIdList, CompactionInfo ci)
-      throws IOException {
+          throws IOException, HiveException {
     Path locPath = new Path(location);
     AcidUtils.Directory dir = AcidUtils.getAcidState(locPath, conf, writeIdList);
     List<FileStatus> obsoleteDirs = dir.getObsolete();
@@ -346,10 +349,13 @@ public class Cleaner extends CompactorThread {
     }
 
     FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
+    Database db = Hive.get().getDatabase(ci.dbname);
 
     for (Path dead : filesToDelete) {
       LOG.debug("Going to delete path " + dead.toString());
-      replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, true);
+      if (ReplChangeManager.isSourceOfReplication(db)) {
+        replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, true);
+      }
       fs.delete(dead, true);
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/752ba682/ql/src/test/queries/clientnegative/repl_dump_requires_admin.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientnegative/repl_dump_requires_admin.q b/ql/src/test/queries/clientnegative/repl_dump_requires_admin.q
index cd9080c..9d712ca 100644
--- a/ql/src/test/queries/clientnegative/repl_dump_requires_admin.q
+++ b/ql/src/test/queries/clientnegative/repl_dump_requires_admin.q
@@ -16,7 +16,7 @@ drop database if exists test_repldump_adminpriv cascade;
 set user.name=ruser1;
 show role grant user ruser1;
 
-create database test_repldump_adminpriv;
+create database test_repldump_adminpriv with DBPROPERTIES ('repl.source.for' = '1,2,3');
 create table test_repldump_adminpriv.dummy_tbl(a int) partitioned by (b string);
 show tables test_repldump_adminpriv;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/752ba682/ql/src/test/queries/clientnegative/repl_load_requires_admin.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientnegative/repl_load_requires_admin.q b/ql/src/test/queries/clientnegative/repl_load_requires_admin.q
index 68a132d..0b1b12b 100644
--- a/ql/src/test/queries/clientnegative/repl_load_requires_admin.q
+++ b/ql/src/test/queries/clientnegative/repl_load_requires_admin.q
@@ -18,7 +18,7 @@ drop database if exists test_replload_adminpriv_tgt2 cascade;
 set user.name=ruser1;
 show role grant user ruser1;
 
-create database test_replload_adminpriv_src;
+create database test_replload_adminpriv_src with DBPROPERTIES ('repl.source.for' = '1,2,3');
 create table test_replload_adminpriv_src.dummy_tbl(a int) partitioned by (b string);
 show tables test_replload_adminpriv_src;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/752ba682/ql/src/test/results/clientnegative/repl_dump_requires_admin.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/repl_dump_requires_admin.q.out b/ql/src/test/results/clientnegative/repl_dump_requires_admin.q.out
index ac5103e..272b8b8 100644
--- a/ql/src/test/results/clientnegative/repl_dump_requires_admin.q.out
+++ b/ql/src/test/results/clientnegative/repl_dump_requires_admin.q.out
@@ -12,10 +12,10 @@ PREHOOK: type: SHOW_ROLE_GRANT
 POSTHOOK: query: show role grant user ruser1
 POSTHOOK: type: SHOW_ROLE_GRANT
 public	false	-1	
-PREHOOK: query: create database test_repldump_adminpriv
+PREHOOK: query: create database test_repldump_adminpriv with DBPROPERTIES ('repl.source.for' = '1,2,3')
 PREHOOK: type: CREATEDATABASE
 PREHOOK: Output: database:test_repldump_adminpriv
-POSTHOOK: query: create database test_repldump_adminpriv
+POSTHOOK: query: create database test_repldump_adminpriv with DBPROPERTIES ('repl.source.for' = '1,2,3')
 POSTHOOK: type: CREATEDATABASE
 POSTHOOK: Output: database:test_repldump_adminpriv
 PREHOOK: query: create table test_repldump_adminpriv.dummy_tbl(a int) partitioned by (b string)

http://git-wip-us.apache.org/repos/asf/hive/blob/752ba682/ql/src/test/results/clientnegative/repl_load_requires_admin.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/repl_load_requires_admin.q.out b/ql/src/test/results/clientnegative/repl_load_requires_admin.q.out
index 01b57a7..1499c39 100644
--- a/ql/src/test/results/clientnegative/repl_load_requires_admin.q.out
+++ b/ql/src/test/results/clientnegative/repl_load_requires_admin.q.out
@@ -20,10 +20,10 @@ PREHOOK: type: SHOW_ROLE_GRANT
 POSTHOOK: query: show role grant user ruser1
 POSTHOOK: type: SHOW_ROLE_GRANT
 public	false	-1	
-PREHOOK: query: create database test_replload_adminpriv_src
+PREHOOK: query: create database test_replload_adminpriv_src with DBPROPERTIES ('repl.source.for' = '1,2,3')
 PREHOOK: type: CREATEDATABASE
 PREHOOK: Output: database:test_replload_adminpriv_src
-POSTHOOK: query: create database test_replload_adminpriv_src
+POSTHOOK: query: create database test_replload_adminpriv_src with DBPROPERTIES ('repl.source.for' = '1,2,3')
 POSTHOOK: type: CREATEDATABASE
 POSTHOOK: Output: database:test_replload_adminpriv_src
 PREHOOK: query: create table test_replload_adminpriv_src.dummy_tbl(a int) partitioned by (b string)

http://git-wip-us.apache.org/repos/asf/hive/blob/752ba682/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
index 9ab9e85..be05838 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
@@ -242,7 +242,8 @@ public class HiveAlterHandler implements AlterHandler {
                       " already exists : " + destPath);
             }
             // check that src exists and also checks permissions necessary, rename src to dest
-            if (srcFs.exists(srcPath) && wh.renameDir(srcPath, destPath, true)) {
+            if (srcFs.exists(srcPath) && wh.renameDir(srcPath, destPath,
+                    ReplChangeManager.isSourceOfReplication(olddb))) {
               dataWasMoved = true;
             }
           } catch (IOException | MetaException e) {
@@ -559,6 +560,7 @@ public class HiveAlterHandler implements AlterHandler {
     FileSystem srcFs;
     FileSystem destFs = null;
     boolean dataWasMoved = false;
+    Database db;
     try {
       msdb.openTransaction();
       Table tbl = msdb.getTable(DEFAULT_CATALOG_NAME, dbname, name);
@@ -593,9 +595,11 @@ public class HiveAlterHandler implements AlterHandler {
       // 3) rename the partition directory if it is not an external table
       if (!tbl.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) {
         try {
+          db = msdb.getDatabase(catName, dbname);
+
           // if tbl location is available use it
           // else derive the tbl location from database location
-          destPath = wh.getPartitionPath(msdb.getDatabase(catName, dbname), tbl, new_part.getValues());
+          destPath = wh.getPartitionPath(db, tbl, new_part.getValues());
           destPath = constructRenamedPath(destPath, new Path(new_part.getSd().getLocation()));
         } catch (NoSuchObjectException e) {
           LOG.debug("Didn't find object in metastore ", e);
@@ -633,7 +637,7 @@ public class HiveAlterHandler implements AlterHandler {
               }
 
               //rename the data directory
-              wh.renameDir(srcPath, destPath, true);
+              wh.renameDir(srcPath, destPath, ReplChangeManager.isSourceOfReplication(db));
               LOG.info("Partition directory rename from " + srcPath + " to " + destPath + " done.");
               dataWasMoved = true;
             }
@@ -645,7 +649,6 @@ public class HiveAlterHandler implements AlterHandler {
             LOG.error("Cannot rename partition directory from " + srcPath + " to " + destPath, me);
             throw me;
           }
-
           new_part.getSd().setLocation(newPartLoc);
         }
       } else {

http://git-wip-us.apache.org/repos/asf/hive/blob/752ba682/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 8d216f7..d37e705 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.metastore;
 
 import static org.apache.commons.lang.StringUtils.join;
+import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_COMMENT;
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME;
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
@@ -1018,7 +1019,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
           // Create a default database inside the catalog
           Database db = new Database(DEFAULT_DATABASE_NAME, "Default database for catalog " +
-              catalog.getName(), catalog.getLocationUri(), Collections.emptyMap());
+                           catalog.getName(), catalog.getLocationUri(), Collections.emptyMap());
           db.setCatalogName(catalog.getName());
           create_database_core(ms, db);
 
@@ -1034,7 +1035,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           if (!success) {
             ms.rollbackTransaction();
             if (madeDir) {
-              wh.deleteDir(catPath, true);
+              wh.deleteDir(catPath, true, false, false);
             }
           }
 
@@ -1209,7 +1210,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         success = ms.commitTransaction();
       } finally {
         if (success) {
-          wh.deleteDir(wh.getDnsPath(new Path(cat.getLocationUri())), false);
+          wh.deleteDir(wh.getDnsPath(new Path(cat.getLocationUri())), false, false, false);
         } else {
           ms.rollbackTransaction();
         }
@@ -1271,7 +1272,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         if (!success) {
           ms.rollbackTransaction();
           if (madeDir) {
-            wh.deleteDir(dbPath, true);
+            wh.deleteDir(dbPath, true, db);
           }
         }
 
@@ -1434,6 +1435,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         ms.openTransaction();
         db = ms.getDatabase(catName, name);
 
+        if (!isInTest && ReplChangeManager.isSourceOfReplication(db)) {
+          throw new InvalidOperationException("can not drop a database which is a source of replication");
+        }
+
         firePreEvent(new PreDropDatabaseEvent(db, this));
         String catPrependedName = MetaStoreUtils.prependCatalogToDbName(catName, name, conf);
 
@@ -1565,14 +1570,14 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           ms.rollbackTransaction();
         } else if (deleteData) {
           // Delete the data in the partitions which have other locations
-          deletePartitionData(partitionPaths);
+          deletePartitionData(partitionPaths, false, db);
           // Delete the data in the tables which have other locations
           for (Path tablePath : tablePaths) {
-            deleteTableData(tablePath);
+            deleteTableData(tablePath, false, db);
           }
           // Delete the data in the database
           try {
-            wh.deleteDir(new Path(db.getLocationUri()), true);
+            wh.deleteDir(new Path(db.getLocationUri()), true, db);
           } catch (Exception e) {
             LOG.error("Failed to delete database directory: " + db.getLocationUri() +
                 " " + e.getMessage());
@@ -1808,6 +1813,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Map<String, String> transactionalListenerResponses = Collections.emptyMap();
       Path tblPath = null;
       boolean success = false, madeDir = false;
+      Database db = null;
       try {
         if (!tbl.isSetCatName()) {
           tbl.setCatName(getDefaultCatalog(conf));
@@ -1816,11 +1822,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
         ms.openTransaction();
 
-        Database db = ms.getDatabase(tbl.getCatName(), tbl.getDbName());
-        if (db == null) {
-          throw new NoSuchObjectException("The database " +
-              Warehouse.getCatalogQualifiedDbName(tbl.getCatName(), tbl.getDbName()) + " does not exist");
-        }
+        db = ms.getDatabase(tbl.getCatName(), tbl.getDbName());
 
         // get_table checks whether database exists, it should be moved here
         if (is_table_exists(ms, tbl.getCatName(), tbl.getDbName(), tbl.getTableName())) {
@@ -1831,8 +1833,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) {
           if (tbl.getSd().getLocation() == null
               || tbl.getSd().getLocation().isEmpty()) {
-            tblPath = wh.getDefaultTablePath(
-                ms.getDatabase(tbl.getCatName(), tbl.getDbName()), tbl.getTableName());
+            tblPath = wh.getDefaultTablePath(db, tbl.getTableName());
           } else {
             if (!isExternal(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) {
               LOG.warn("Location: " + tbl.getSd().getLocation()
@@ -1955,7 +1956,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         if (!success) {
           ms.rollbackTransaction();
           if (madeDir) {
-            wh.deleteDir(tblPath, true);
+            wh.deleteDir(tblPath, true, db);
           }
         }
 
@@ -2428,8 +2429,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Table tbl = null;
       boolean ifPurge = false;
       Map<String, String> transactionalListenerResponses = Collections.emptyMap();
+      Database db = null;
       try {
         ms.openTransaction();
+        db = ms.getDatabase(catName, dbname);
+
         // drop any partitions
         tbl = get_table_core(catName, dbname, name);
         if (tbl == null) {
@@ -2480,9 +2484,9 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         } else if (deleteData && !isExternal) {
           // Data needs deletion. Check if trash may be skipped.
           // Delete the data in the partitions which have other locations
-          deletePartitionData(partPaths, ifPurge);
+          deletePartitionData(partPaths, ifPurge, db);
           // Delete the data in the table
-          deleteTableData(tblPath, ifPurge);
+          deleteTableData(tblPath, ifPurge, db);
           // ok even if the data is not deleted
         }
 
@@ -2501,23 +2505,15 @@ public class HiveMetaStore extends ThriftHiveMetastore {
      * Deletes the data in a table's location, if it fails logs an error
      *
      * @param tablePath
-     */
-    private void deleteTableData(Path tablePath) {
-      deleteTableData(tablePath, false);
-    }
-
-    /**
-     * Deletes the data in a table's location, if it fails logs an error
-     *
-     * @param tablePath
      * @param ifPurge completely purge the table (skipping trash) while removing
      *                data from warehouse
+     * @param db database the table belongs to
      */
-    private void deleteTableData(Path tablePath, boolean ifPurge) {
+    private void deleteTableData(Path tablePath, boolean ifPurge, Database db) {
 
       if (tablePath != null) {
         try {
-          wh.deleteDir(tablePath, true, ifPurge);
+          wh.deleteDir(tablePath, true, ifPurge, db);
         } catch (Exception e) {
           LOG.error("Failed to delete table directory: " + tablePath +
               " " + e.getMessage());
@@ -2526,28 +2522,19 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     }
 
     /**
-     * Give a list of partitions' locations, tries to delete each one
-     * and for each that fails logs an error.
-     *
-     * @param partPaths
-     */
-    private void deletePartitionData(List<Path> partPaths) {
-      deletePartitionData(partPaths, false);
-    }
-
-    /**
     * Give a list of partitions' locations, tries to delete each one
     * and for each that fails logs an error.
     *
     * @param partPaths
     * @param ifPurge completely purge the partition (skipping trash) while
     *                removing data from warehouse
+    * @param db database the partition belongs to
     */
-    private void deletePartitionData(List<Path> partPaths, boolean ifPurge) {
+    private void deletePartitionData(List<Path> partPaths, boolean ifPurge, Database db) {
       if (partPaths != null && !partPaths.isEmpty()) {
         for (Path partPath : partPaths) {
           try {
-            wh.deleteDir(partPath, true, ifPurge);
+            wh.deleteDir(partPath, true, ifPurge, db);
           } catch (Exception e) {
             LOG.error("Failed to delete partition directory: " + partPath +
                 " " + e.getMessage());
@@ -2779,6 +2766,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         String[] parsedDbName = parseDbName(dbName, conf);
         Table tbl = get_table_core(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tableName);
         boolean isAutopurge = (tbl.isSetParameters() && "true".equalsIgnoreCase(tbl.getParameters().get("auto.purge")));
+        Database db = get_database_core(parsedDbName[CAT_NAME], parsedDbName[DB_NAME]);
 
         // This is not transactional
         for (Path location : getLocationsForTruncate(getMS(), parsedDbName[CAT_NAME],
@@ -2789,7 +2777,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
             HdfsUtils.HadoopFileStatus status = new HdfsUtils.HadoopFileStatus(getConf(), fs, location);
             FileStatus targetStatus = fs.getFileStatus(location);
             String targetGroup = targetStatus == null ? null : targetStatus.getGroup();
-            wh.deleteDir(location, true, isAutopurge);
+            wh.deleteDir(location, true, isAutopurge, db);
             fs.mkdirs(location);
             HdfsUtils.setFullFileStatus(getConf(), status, targetGroup, fs, location, false);
           } else {
@@ -2798,7 +2786,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
               continue;
             }
             for (final FileStatus status : statuses) {
-              wh.deleteDir(status.getPath(), true, isAutopurge);
+              wh.deleteDir(status.getPath(), true, isAutopurge, db);
             }
           }
         }
@@ -3071,13 +3059,14 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     private Partition append_partition_common(RawStore ms, String catName, String dbName,
                                               String tableName, List<String> part_vals,
                                               EnvironmentContext envContext)
-        throws InvalidObjectException, AlreadyExistsException, MetaException {
+            throws InvalidObjectException, AlreadyExistsException, MetaException, NoSuchObjectException {
 
       Partition part = new Partition();
       boolean success = false, madeDir = false;
       Path partLocation = null;
       Table tbl = null;
       Map<String, String> transactionalListenerResponses = Collections.emptyMap();
+      Database db = null;
       try {
         ms.openTransaction();
         part.setCatName(catName);
@@ -3097,6 +3086,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
               "Cannot append a partition to a view");
         }
 
+        db = get_database_core(catName, dbName);
+
         firePreEvent(new PreAddPartitionEvent(tbl, part, this));
 
         part.setSd(tbl.getSd().deepCopy());
@@ -3149,7 +3140,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         if (!success) {
           ms.rollbackTransaction();
           if (madeDir) {
-            wh.deleteDir(partLocation, true);
+            wh.deleteDir(partLocation, true, db);
           }
         }
 
@@ -3319,6 +3310,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       final List<Partition> existingParts = new ArrayList<>();
       Table tbl = null;
       Map<String, String> transactionalListenerResponses = Collections.emptyMap();
+      Database db = null;
 
       try {
         ms.openTransaction();
@@ -3329,6 +3321,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
               " does not exist");
         }
 
+        db = ms.getDatabase(catName, dbName);
+
         if (!parts.isEmpty()) {
           firePreEvent(new PreAddPartitionEvent(tbl, parts, this));
         }
@@ -3421,7 +3415,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           for (Map.Entry<PartValEqWrapper, Boolean> e : addedPartitions.entrySet()) {
             if (e.getValue()) {
               // we just created this directory - it's not a case of pre-creation, so we nuke.
-              wh.deleteDir(new Path(e.getKey().partition.getSd().getLocation()), true);
+              wh.deleteDir(new Path(e.getKey().partition.getSd().getLocation()), true, db);
             }
           }
 
@@ -3556,6 +3550,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           .getPartitionIterator();
       Table tbl = null;
       Map<String, String> transactionalListenerResponses = Collections.emptyMap();
+      Database db = null;
       try {
         ms.openTransaction();
         tbl = ms.getTable(catName, dbName, tblName);
@@ -3564,6 +3559,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
               + "database or table " + dbName + "." + tblName + " does not exist");
         }
 
+        db = ms.getDatabase(catName, dbName);
         firePreEvent(new PreAddPartitionEvent(tbl, partitionSpecProxy, this));
         List<Future<Partition>> partFutures = Lists.newArrayList();
         final Table table = tbl;
@@ -3645,7 +3641,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           for (Map.Entry<PartValEqWrapperLite, Boolean> e : addedPartitions.entrySet()) {
             if (e.getValue()) {
               // we just created this directory - it's not a case of pre-creation, so we nuke.
-              wh.deleteDir(new Path(e.getKey().location), true);
+              wh.deleteDir(new Path(e.getKey().location), true, db);
             }
           }
         }
@@ -3783,7 +3779,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           success = ms.addPartition(part);
         } finally {
           if (!success && madeDir) {
-            wh.deleteDir(new Path(part.getSd().getLocation()), true);
+            wh.deleteDir(new Path(part.getSd().getLocation()), true,
+                    ms.getDatabase(tbl.getCatName(), tbl.getDbName()));
           }
         }
 
@@ -3863,6 +3860,19 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       return new Partition();
     }
 
+    private boolean isRenameAllowed(String catalogName, String srcDBName, String destDBName)
+            throws MetaException, NoSuchObjectException {
+      RawStore ms = getMS();
+      if (!srcDBName.equalsIgnoreCase(destDBName)) {
+        Database destDB = ms.getDatabase(catalogName, destDBName);
+        Database srcDB = ms.getDatabase(catalogName, srcDBName);
+        if (ReplChangeManager.isSourceOfReplication(srcDB) || ReplChangeManager.isSourceOfReplication(destDB)) {
+          return false;
+        }
+      }
+      return true;
+    }
+
     @Override
     public List<Partition> exchange_partitions(Map<String, String> partitionSpecs,
         String sourceDbName, String sourceTableName, String destDbName,
@@ -3898,6 +3908,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
             getCatalogQualifiedTableName(parsedSourceDbName[CAT_NAME],
                 parsedSourceDbName[DB_NAME], sourceTableName) + " not found");
       }
+
       List<String> partVals = MetaStoreUtils.getPvals(sourceTable.getPartitionKeys(),
           partitionSpecs);
       List<String> partValsPresent = new ArrayList<> ();
@@ -3950,6 +3961,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
       }
 
+      if (!isRenameAllowed(parsedDestDbName[CAT_NAME], parsedSourceDbName[DB_NAME], parsedDestDbName[DB_NAME])) {
+        throw new MetaException("Exchange partition not allowed for " +
+                getCatalogQualifiedTableName(parsedSourceDbName[CAT_NAME],
+                parsedSourceDbName[DB_NAME], sourceTableName) + "; destination database: " + destDbName);
+      }
       try {
         for (Partition partition: partitionsToExchange) {
           Partition destPartition = new Partition(partition);
@@ -4045,6 +4061,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Path archiveParentDir = null;
       boolean mustPurge = false;
       boolean isExternalTbl = false;
+      boolean isSourceOfReplication = false;
       Map<String, String> transactionalListenerResponses = Collections.emptyMap();
 
       if (db_name == null) {
@@ -4092,6 +4109,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
                                                       new DropPartitionEvent(tbl, part, true, deleteData, this),
                                                       envContext);
           }
+          isSourceOfReplication = ReplChangeManager.isSourceOfReplication(ms.getDatabase(catName, db_name));
           success = ms.commitTransaction();
         }
       } finally {
@@ -4107,13 +4125,14 @@ public class HiveMetaStore extends ThriftHiveMetastore {
             }
             // Archived partitions have har:/to_har_file as their location.
             // The original directory was saved in params
+
             if (isArchived) {
               assert (archiveParentDir != null);
-              wh.deleteDir(archiveParentDir, true, mustPurge);
+              wh.deleteDir(archiveParentDir, true, mustPurge, isSourceOfReplication);
             } else {
               assert (partPath != null);
-              wh.deleteDir(partPath, true, mustPurge);
-              deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge);
+              wh.deleteDir(partPath, true, mustPurge, isSourceOfReplication);
+              deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge, isSourceOfReplication);
             }
             // ok even if the data is not deleted
           }
@@ -4141,12 +4160,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         || (tbl.isSetParameters() && "true".equalsIgnoreCase(tbl.getParameters().get("auto.purge")));
 
     }
-    private void deleteParentRecursive(Path parent, int depth, boolean mustPurge) throws IOException, MetaException {
+    private void deleteParentRecursive(Path parent, int depth, boolean mustPurge, boolean needRecycle)
+            throws IOException, MetaException {
       if (depth > 0 && parent != null && wh.isWritable(parent)) {
         if (wh.isDir(parent) && wh.isEmpty(parent)) {
-          wh.deleteDir(parent, true, mustPurge);
+          wh.deleteDir(parent, true, mustPurge, needRecycle);
         }
-        deleteParentRecursive(parent.getParent(), depth - 1, mustPurge);
+        deleteParentRecursive(parent.getParent(), depth - 1, mustPurge, needRecycle);
       }
     }
 
@@ -4188,6 +4208,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       List<Partition> parts = null;
       boolean mustPurge = false;
       List<Map<String, String>> transactionalListenerResponses = Lists.newArrayList();
+      boolean isSourceOfReplication = ReplChangeManager.isSourceOfReplication(ms.getDatabase(catName, dbName));
 
       try {
         // We need Partition-s for firing events and for result; DN needs MPartition-s to drop.
@@ -4295,12 +4316,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           // Archived partitions have har:/to_har_file as their location.
           // The original directory was saved in params
           for (Path path : archToDelete) {
-            wh.deleteDir(path, true, mustPurge);
+            wh.deleteDir(path, true, mustPurge, isSourceOfReplication);
           }
           for (PathAndPartValSize p : dirsToDelete) {
-            wh.deleteDir(p.path, true, mustPurge);
+            wh.deleteDir(p.path, true, mustPurge, isSourceOfReplication);
             try {
-              deleteParentRecursive(p.path.getParent(), p.partValSize - 1, mustPurge);
+              deleteParentRecursive(p.path.getParent(), p.partValSize - 1, mustPurge, isSourceOfReplication);
             } catch (IOException ex) {
               LOG.warn("Error from deleteParentRecursive", ex);
               throw new MetaException("Failed to delete parent: " + ex.getMessage());
@@ -4931,6 +4952,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         throws InvalidOperationException, MetaException {
       startFunction("alter_table", ": " + getCatalogQualifiedTableName(catName, dbname, name)
           + " newtbl=" + newTable.getTableName());
+
       // Update the time if it hasn't been specified.
       if (newTable.getParameters() == null ||
           newTable.getParameters().get(hive_metastoreConstants.DDL_TIME) == null) {
@@ -4955,6 +4977,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Exception ex = null;
       try {
         Table oldt = get_table_core(catName, dbname, name);
+        if (!isRenameAllowed(catName, dbname, newTable.getDbName())) {
+          throw new MetaException("Alter table not allowed for table " +
+                  getCatalogQualifiedTableName(catName, dbname, name) +
+                  " new table = " + getCatalogQualifiedTableName(newTable));
+        }
         firePreEvent(new PreAlterTableEvent(oldt, newTable, this));
         alterHandler.alterTable(getMS(), wh, catName, dbname, name, newTable,
                 envContext, this);
@@ -6938,12 +6965,15 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         if (func == null) {
           throw new NoSuchObjectException("Function " + funcName + " does not exist");
         }
+        boolean isSourceOfReplication =
+              ReplChangeManager.isSourceOfReplication(get_database_core(parsedDbName[CAT_NAME], parsedDbName[DB_NAME]));
+
         // if copy of jar to change management fails we fail the metastore transaction, since the
         // user might delete the jars on HDFS externally after dropping the function, hence having
         // a copy is required to allow incremental replication to work correctly.
         if (func.getResourceUris() != null && !func.getResourceUris().isEmpty()) {
           for (ResourceUri uri : func.getResourceUris()) {
-            if (uri.getUri().toLowerCase().startsWith("hdfs:")) {
+            if (uri.getUri().toLowerCase().startsWith("hdfs:") && isSourceOfReplication) {
               wh.addToChangeManagement(new Path(uri.getUri()));
             }
           }
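
For context, the HiveMetaStore changes above follow one pattern: resolve the owning Database up front, then let its repl.source.for parameter drive two decisions, namely whether deleted directories must first be recycled into the CM root, and whether a cross-database rename or exchange is allowed at all. Below is a minimal standalone sketch of the rename guard; the class and method names are made up for illustration, while Database and the ReplChangeManager helpers are the APIs touched by this patch.

    import java.util.Collections;

    import org.apache.hadoop.hive.metastore.ReplChangeManager;
    import org.apache.hadoop.hive.metastore.api.Database;

    public class RenameGuardSketch {
      // Same decision as isRenameAllowed(): a move between two different databases
      // is refused when either side is a source of replication, because the moved
      // data would escape the replication policy of the source database.
      static boolean renameAllowed(Database src, Database dest) {
        if (src.getName().equalsIgnoreCase(dest.getName())) {
          return true;                                        // same database, nothing to guard
        }
        return !ReplChangeManager.isSourceOfReplication(src)
            && !ReplChangeManager.isSourceOfReplication(dest);
      }

      public static void main(String[] args) {
        Database src = new Database();
        src.setName("sales");
        src.setParameters(Collections.singletonMap(
            ReplChangeManager.SOURCE_OF_REPLICATION, "1"));   // replicated database
        Database dest = new Database();
        dest.setName("scratch");                              // plain database
        System.out.println(renameAllowed(src, dest));         // prints: false
      }
    }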

http://git-wip-us.apache.org/repos/asf/hive/blob/752ba682/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
index 79ba7ff..f7018c2 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.metastore;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -33,6 +34,7 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
@@ -56,6 +58,7 @@ public class ReplChangeManager {
   private static final String ORIG_LOC_TAG = "user.original-loc";
   static final String REMAIN_IN_TRASH_TAG = "user.remain-in-trash";
   private static final String URI_FRAGMENT_SEPARATOR = "#";
+  public static final String SOURCE_OF_REPLICATION = "repl.source.for";
 
   public enum RecycleType {
     MOVE,
@@ -467,4 +470,24 @@ public class ReplChangeManager {
           0, MetastoreConf.getTimeVar(conf, ConfVars.REPLCMINTERVAL, TimeUnit.SECONDS), TimeUnit.SECONDS);
     }
   }
+
+  public static boolean isSourceOfReplication(Database db) {
+    // A database is a replication source iff its repl.source.for parameter is non-empty.
+    assert (db != null);
+    String replPolicyIds = getReplPolicyIdString(db);
+    return !StringUtils.isEmpty(replPolicyIds);
+  }
+
+  public static String getReplPolicyIdString(Database db) {
+    if (db != null) {
+      Map<String, String> m = db.getParameters();
+      if ((m != null) && (m.containsKey(SOURCE_OF_REPLICATION))) {
+        String replPolicyId = m.get(SOURCE_OF_REPLICATION);
+        LOG.debug("repl policy for database {} is {}", db.getName(), replPolicyId);
+        return replPolicyId;
+      }
+      LOG.debug("Repl policy is not set for database ", db.getName());
+    }
+    return null;
+  }
 }
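
The repl.source.for database parameter introduced here is the switch the rest of the patch keys off: a database carrying it is treated as a source of replication, and only then are dropped files preserved under the CM root. It would typically be set by an admin through DBPROPERTIES, e.g. something like ALTER DATABASE sales SET DBPROPERTIES ('repl.source.for'='policy1,policy2'). A small sketch of the two new helpers under that assumption; the class name and sample values are illustrative only.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.metastore.ReplChangeManager;
    import org.apache.hadoop.hive.metastore.api.Database;

    public class ReplSourceTagSketch {
      public static void main(String[] args) {
        // Parameters as the metastore would store them for a replication source.
        Map<String, String> params = new HashMap<>();
        params.put(ReplChangeManager.SOURCE_OF_REPLICATION, "policy1,policy2");

        Database sales = new Database();
        sales.setName("sales");
        sales.setParameters(params);

        Database scratch = new Database();          // no repl.source.for parameter
        scratch.setName("scratch");

        System.out.println(ReplChangeManager.isSourceOfReplication(sales));   // true
        System.out.println(ReplChangeManager.getReplPolicyIdString(sales));   // policy1,policy2
        System.out.println(ReplChangeManager.isSourceOfReplication(scratch)); // false
      }
    }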

http://git-wip-us.apache.org/repos/asf/hive/blob/752ba682/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
index 88cbfcd..e31935e 100755
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
@@ -301,18 +301,16 @@ public class Warehouse {
     }
   }
 
-  public boolean deleteDir(Path f, boolean recursive) throws MetaException {
-    return deleteDir(f, recursive, false);
+  public boolean deleteDir(Path f, boolean recursive, Database db) throws MetaException {
+    return deleteDir(f, recursive, false, db);
   }
 
-  public boolean deleteDir(Path f, boolean recursive, boolean ifPurge) throws MetaException {
-    return deleteDir(f, recursive, ifPurge, true);
+  public boolean deleteDir(Path f, boolean recursive, boolean ifPurge, Database db) throws MetaException {
+    return deleteDir(f, recursive, ifPurge, ReplChangeManager.isSourceOfReplication(db));
   }
 
   public boolean deleteDir(Path f, boolean recursive, boolean ifPurge, boolean needCmRecycle) throws MetaException {
-    // no need to create the CM recycle file for temporary tables
     if (needCmRecycle) {
-
       try {
         cm.recycle(f, RecycleType.MOVE, ifPurge);
       } catch (IOException e) {

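The Warehouse overloads that previously defaulted CM recycling now take the owning Database and derive the decision from its repl.source.for parameter; the explicit boolean needCmRecycle form remains for callers, such as drop_partition above, that have already resolved the flag. A caller-side sketch, assuming a properly configured metastore Configuration; the class and method names below are illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.metastore.Warehouse;
    import org.apache.hadoop.hive.metastore.api.Database;
    import org.apache.hadoop.hive.metastore.api.MetaException;

    public class DeleteDirSketch {
      // Whether tblPath is first recycled into the CM root is no longer hard-coded;
      // it now follows from db's repl.source.for parameter.
      static void dropTableDir(Configuration conf, Database db, Path tblPath)
          throws MetaException {
        Warehouse wh = new Warehouse(conf);
        wh.deleteDir(tblPath, true /* recursive */, db);   // ifPurge defaults to false
      }
    }
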
http://git-wip-us.apache.org/repos/asf/hive/blob/752ba682/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MDatabase.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MDatabase.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MDatabase.java
index fa30330..815b39c 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MDatabase.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MDatabase.java
@@ -21,7 +21,12 @@
  */
 package org.apache.hadoop.hive.metastore.model;
 
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
+
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Storage Class representing the Hive MDatabase in a rdbms
@@ -109,7 +114,21 @@ public class MDatabase {
    * @param parameters the parameters mapping.
    */
   public void setParameters(Map<String, String> parameters) {
-    this.parameters = parameters;
+    if (parameters == null) {
+      this.parameters = null;
+      return;
+    }
+    this.parameters = new HashMap<>();
+    Set<String> keys = new HashSet<>(parameters.keySet());
+    for (String key : keys) {
+      // Normalize the case for source of replication parameter
+      if (ReplChangeManager.SOURCE_OF_REPLICATION.equalsIgnoreCase(key)) {
+        // TODO: some extra validation could be added here, since this is a user-provided parameter.
+        this.parameters.put(ReplChangeManager.SOURCE_OF_REPLICATION, parameters.get(key));
+      } else {
+        this.parameters.put(key, parameters.get(key));
+      }
+    }
   }
 
   public String getOwnerName() {