Posted to commits@hive.apache.org by ma...@apache.org on 2019/07/30 03:46:13 UTC

[hive] branch master updated: HIVE-22036 : HMS should identify events corresponding to replicated database for Atlas HMS hook. (Ashutosh Bapat reviewed by Mahesh Kumar Behera)

This is an automated email from the ASF dual-hosted git repository.

mahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new ba26fcf  HIVE-22036 : HMS should identify events corresponding to replicated database for Atlas HMS hook. (Ashutosh Bapat reviewed by Mahesh Kumar Behera)
ba26fcf is described below

commit ba26fcfdd6f27c148d8f4ff2e2c4001530cb039b
Author: Ashutosh Bapat <ab...@cloudera.com>
AuthorDate: Tue Jul 30 09:13:38 2019 +0530

    HIVE-22036 : HMS should identify events corresponding to replicated database for Atlas HMS hook. (Ashutosh Bapat reviewed by Mahesh Kumar Behera)
    
    Signed-off-by: Mahesh Kumar Behera <ma...@apache.org>
---
 .../parse/ReplMetaStoreEventListenerTestImpl.java  | 155 ++++++++++++++++
 .../ql/parse/TestMetaStoreEventListenerInRepl.java | 199 +++++++++++++++++++++
 .../hadoop/hive/ql/exec/repl/util/ReplUtils.java   |   2 +-
 .../apache/hadoop/hive/common/repl/ReplConst.java  |   7 +
 .../hadoop/hive/metastore/HiveAlterHandler.java    |  10 +-
 .../hadoop/hive/metastore/HiveMetaStore.java       |  51 ++++--
 .../hive/metastore/events/AlterDatabaseEvent.java  |  10 +-
 .../hive/metastore/events/AlterTableEvent.java     |   8 +-
 .../hive/metastore/events/CreateDatabaseEvent.java |   9 +-
 .../hive/metastore/events/CreateTableEvent.java    |  11 +-
 .../hive/metastore/events/DropDatabaseEvent.java   |   9 +-
 .../hive/metastore/events/DropTableEvent.java      |  10 +-
 12 files changed, 459 insertions(+), 22 deletions(-)
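
For context: with this change, the database and table listener events fired by HMS
carry an isReplicated() flag, so a hook sitting behind the metastore (such as the
Atlas HMS hook this JIRA targets) can distinguish events generated by REPL LOAD into
a target database from user-initiated DDL. Below is a minimal consumer sketch, not
part of this patch; the class and package names are hypothetical, and only the
listener API and the new isReplicated() accessors come from the code being committed:

    package org.example.hooks;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
    import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
    import org.apache.hadoop.hive.metastore.events.DropTableEvent;

    /** Hypothetical listener that ignores metadata changes caused by replication. */
    public class ReplAwareListener extends MetaStoreEventListener {

      public ReplAwareListener(Configuration conf) {
        super(conf);
      }

      @Override
      public void onCreateTable(CreateTableEvent event) {
        if (event.isReplicated()) {
          // Created by REPL LOAD on a replication target; skip or tag it instead of
          // treating it as a user action.
          return;
        }
        // ... normal handling of user-initiated CREATE TABLE ...
      }

      @Override
      public void onDropTable(DropTableEvent event) {
        if (!event.isReplicated()) {
          // ... normal handling of user-initiated DROP TABLE ...
        }
      }
    }

Such a listener is registered through the metastore event listener configuration
(MetastoreConf.ConfVars.EVENT_LISTENERS), the same way the new test below wires in
ReplMetaStoreEventListenerTestImpl.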

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplMetaStoreEventListenerTestImpl.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplMetaStoreEventListenerTestImpl.java
new file mode 100644
index 0000000..31fdb77
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplMetaStoreEventListenerTestImpl.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
+import org.apache.hadoop.hive.metastore.events.AlterDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * ReplMetaStoreEventListenerTestImpl - Implementation of MetaStoreEventListener to test
+ * isReplicated flag in some of the tests.
+ */
+public class ReplMetaStoreEventListenerTestImpl extends MetaStoreEventListener {
+  protected static final Logger LOG =
+          LoggerFactory.getLogger(ReplMetaStoreEventListenerTestImpl.class);
+  static Map<String, Set<String>> replicatedDbsForEvents = new HashMap<>();
+  static Map<String, Set<String>> nonReplicatedDbsForEvents = new HashMap<>();
+  static Map<String, Set<String>> replicatedTablesForEvents = new HashMap<>();
+
+  public ReplMetaStoreEventListenerTestImpl(Configuration conf) {
+    super(conf);
+  }
+
+  private void addNameToEventMap(Map<String, Set<String>> eventMap, String name,
+                                 ListenerEvent event) {
+    String eventType = event.getClass().getName();
+    Set<String> eventNames = eventMap.get(eventType);
+    if (eventNames == null) {
+      eventNames = new HashSet<>();
+      eventMap.put(eventType, eventNames);
+    }
+    eventNames.add(name.toLowerCase());
+  }
+
+  @Override
+  public void onCreateDatabase(CreateDatabaseEvent createDatabaseEvent) {
+    String dbName = createDatabaseEvent.getDatabase().getName().toLowerCase();
+    if (createDatabaseEvent.isReplicated()) {
+      addNameToEventMap(replicatedDbsForEvents, dbName, createDatabaseEvent);
+    } else {
+      addNameToEventMap(nonReplicatedDbsForEvents, dbName, createDatabaseEvent);
+    }
+  }
+
+  @Override
+  public void onAlterDatabase(AlterDatabaseEvent alterDatabaseEvent) {
+    // The test doesn't create any database rename events, so it's fine to just check the new
+    // database name.
+    String dbName = alterDatabaseEvent.getNewDatabase().getName().toLowerCase();
+    if (alterDatabaseEvent.isReplicated()) {
+      addNameToEventMap(replicatedDbsForEvents, dbName, alterDatabaseEvent);
+    } else {
+      addNameToEventMap(nonReplicatedDbsForEvents, dbName, alterDatabaseEvent);
+    }
+  }
+
+  @Override
+  public void onCreateTable(CreateTableEvent createTableEvent) {
+    String dbName = createTableEvent.getTable().getDbName();
+    String tblName = createTableEvent.getTable().getTableName();
+    if (createTableEvent.isReplicated()) {
+      addNameToEventMap(replicatedDbsForEvents, dbName, createTableEvent);
+      addNameToEventMap(replicatedTablesForEvents, tblName, createTableEvent);
+    } else {
+      addNameToEventMap(nonReplicatedDbsForEvents, dbName, createTableEvent);
+    }
+  }
+
+  @Override
+  public void onAlterTable(AlterTableEvent alterTableEvent) {
+    // The test doesn't create any table rename events, since we are only interested in checking
+    // the replication status. So, it's fine to get the names from the new table.
+    String dbName = alterTableEvent.getNewTable().getDbName();
+    String tblName = alterTableEvent.getNewTable().getTableName();
+    if (alterTableEvent.isReplicated()) {
+      addNameToEventMap(replicatedDbsForEvents, dbName, alterTableEvent);
+      addNameToEventMap(replicatedTablesForEvents, tblName, alterTableEvent);
+    } else {
+      addNameToEventMap(nonReplicatedDbsForEvents, dbName, alterTableEvent);
+    }
+  }
+
+  @Override
+  public void onDropTable(DropTableEvent dropTableEvent) {
+    String dbName = dropTableEvent.getTable().getDbName();
+    String tblName = dropTableEvent.getTable().getTableName();
+    if (dropTableEvent.isReplicated()) {
+      addNameToEventMap(replicatedDbsForEvents, dbName, dropTableEvent);
+      addNameToEventMap(replicatedTablesForEvents, tblName, dropTableEvent);
+    } else {
+      addNameToEventMap(nonReplicatedDbsForEvents, dbName, dropTableEvent);
+    }
+
+  }
+
+  static void checkEventSanity(Map<String, Set<String>> eventsMap, String replicaDbName) {
+    replicaDbName = replicaDbName.toLowerCase();
+    for (String event : eventsMap.keySet()) {
+      Set<String> dbsForEvent = replicatedDbsForEvents.get(event);
+      LOG.info("Examining dbs and tables for event " + event);
+      // isReplicated should be true only for the replicated database
+      Assert.assertTrue(dbsForEvent.contains(replicaDbName));
+      Assert.assertEquals(1, dbsForEvent.size());
+      if (nonReplicatedDbsForEvents.get(event) != null) {
+        Assert.assertFalse(nonReplicatedDbsForEvents.get(event).contains(replicaDbName));
+      }
+
+      Set<String> eventTables = replicatedTablesForEvents.get(event);
+      Assert.assertEquals(eventsMap.get(event), eventTables);
+    }
+  }
+
+  static void clearSanityMap(Map<String, Set<String>> map) {
+    for (Set<String> eventEntry : map.values()) {
+      if (eventEntry != null) {
+        eventEntry.clear();
+      }
+    }
+    map.clear();
+  }
+
+  static void clearSanityData() {
+    clearSanityMap(replicatedDbsForEvents);
+    clearSanityMap(nonReplicatedDbsForEvents);
+    clearSanityMap(replicatedTablesForEvents);
+  }
+}
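
(Side note, not part of the patch: the get/put-if-absent pattern in addNameToEventMap
above could equivalently be written with Map.computeIfAbsent, e.g.

    private void addNameToEventMap(Map<String, Set<String>> eventMap, String name,
                                   ListenerEvent event) {
      eventMap.computeIfAbsent(event.getClass().getName(), k -> new HashSet<>())
              .add(name.toLowerCase());
    }

which avoids the explicit null check while keeping the same behavior.)
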
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetaStoreEventListenerInRepl.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetaStoreEventListenerInRepl.java
new file mode 100644
index 0000000..7121dfb
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetaStoreEventListenerInRepl.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.events.AlterDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.shims.Utils;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.AfterClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+
+/**
+ * TestMetaStoreEventListenerInRepl - Test metastore events created by replication.
+ */
+public class TestMetaStoreEventListenerInRepl {
+
+  @Rule
+  public final TestName testName = new TestName();
+
+  protected static final Logger LOG = LoggerFactory.getLogger(TestMetaStoreEventListenerInRepl.class);
+  static WarehouseInstance primary;
+  static WarehouseInstance replica;
+  static HiveConf conf;
+  String primaryDbName, replicatedDbName;
+
+  @BeforeClass
+  public static void internalBeforeClassSetup() throws Exception {
+    TestMetaStoreEventListenerInRepl.conf = new HiveConf(TestMetaStoreEventListenerInRepl.class);
+    TestMetaStoreEventListenerInRepl.conf.set("dfs.client.use.datanode.hostname", "true");
+    TestMetaStoreEventListenerInRepl.conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
+    MiniDFSCluster miniDFSCluster =
+        new MiniDFSCluster.Builder(TestMetaStoreEventListenerInRepl.conf).numDataNodes(1).format(true).build();
+
+    Map<String, String> conf = new HashMap<String, String>() {{
+	      put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
+	      put("hive.support.concurrency", "true");
+	      put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+	      put("hive.metastore.client.capability.check", "false");
+	      put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+	      put("hive.exec.dynamic.partition.mode", "nonstrict");
+	      put("hive.strict.checks.bucketing", "false");
+	      put("hive.mapred.mode", "nonstrict");
+	      put("mapred.input.dir.recursive", "true");
+	      put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+	      put("hive.in.repl.test", "true");
+	      put(MetastoreConf.ConfVars.EVENT_LISTENERS.getVarname(),
+                ReplMetaStoreEventListenerTestImpl.class.getName());
+    }};
+
+    primary = new WarehouseInstance(LOG, miniDFSCluster, conf);
+    replica = new WarehouseInstance(LOG, miniDFSCluster, conf);
+  }
+
+  @AfterClass
+  public static void classLevelTearDown() throws IOException {
+    primary.close();
+    replica.close();
+  }
+
+  @Before
+  public void setup() throws Throwable {
+    primaryDbName = testName.getMethodName() + "_" + System.currentTimeMillis();
+    replicatedDbName = "replicated_" + primaryDbName;
+    primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
+            SOURCE_OF_REPLICATION + "' = '1,2,3')");
+  }
+
+  @After
+  public void tearDown() throws Throwable {
+    primary.run("drop database if exists " + primaryDbName + " cascade");
+    replica.run("drop database if exists " + replicatedDbName + " cascade");
+  }
+
+  private Map<String, Set<String>> prepareBootstrapData(String primaryDbName) throws Throwable {
+    primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+            "tblproperties (\"transactional\"=\"true\")")
+            .run("insert into t1 values(1)")
+            .run("create table t2 (place string) partitioned by (country string) clustered by(place) " +
+                    "into 3 buckets stored as orc tblproperties (\"transactional\"=\"true\")")
+            .run("insert into t2 partition(country='india') values ('bangalore')")
+            .run("create table t4 (id int)")
+            .run("insert into t4 values(111), (222)");
+
+    // Add expected events with associated tables, if any.
+    Map<String, Set<String>> eventsMap = new HashMap<>();
+    eventsMap.put(CreateDatabaseEvent.class.getName(), null);
+    // Replication load performs implicit alter database and alter table operations, so the
+    // metastore will see alter database and alter table events even without explicit DDL.
+    eventsMap.put(AlterDatabaseEvent.class.getName(), null);
+    eventsMap.put(CreateTableEvent.class.getName(), new HashSet<>(Arrays.asList("t1", "t2", "t4")));
+    eventsMap.put(AlterTableEvent.class.getName(), new HashSet<>(Arrays.asList("t1", "t2", "t4")));
+    return eventsMap;
+  }
+
+  Map<String, Set<String>> prepareIncData(String dbName) throws Throwable {
+    primary.run("use " + dbName)
+            .run("create table t6 stored as orc tblproperties (\"transactional\"=\"true\")" +
+                    " as select * from t1")
+            .run("alter table t2 add columns (placetype string)")
+            .run("update t2 set placetype = 'city'")
+            .run("insert into t1 values (3)")
+            .run("drop table t2")
+            .run("alter database " + dbName + " set dbproperties('some.useless.property'='1')");
+
+    // Add expected events with associated tables, if any.
+    Map<String, Set<String>> eventsMap = new HashMap<>();
+    // Replication load performs implicit alter database and alter table operations, so the
+    // metastore will see alter database and alter table events even without explicit DDL.
+    eventsMap.put(AlterDatabaseEvent.class.getName(), null);
+    eventsMap.put(CreateTableEvent.class.getName(), new HashSet<>(Arrays.asList("t6")));
+    eventsMap.put(AlterTableEvent.class.getName(), new HashSet<>(Arrays.asList("t1", "t2", "t6")));
+    eventsMap.put(DropTableEvent.class.getName(), new HashSet<>(Arrays.asList("t2")));
+
+    return eventsMap;
+  }
+
+  Map<String, Set<String>> prepareInc2Data(String dbName) throws Throwable {
+    primary.run("use " + dbName)
+            .run("insert into t4 values (333)")
+            .run("create table t7 (str string)")
+            .run("insert into t7 values ('aaa')")
+            .run("drop table t1");
+    // Add expected events with associated tables, if any.
+    Map<String, Set<String>> eventsMap = new HashMap<>();
+    // Replication load performs implicit alter database and alter table operations, so the
+    // metastore will see alter database and alter table events even without explicit DDL.
+    eventsMap.put(AlterDatabaseEvent.class.getName(), null);
+    eventsMap.put(CreateTableEvent.class.getName(), new HashSet<>(Arrays.asList("t7")));
+    eventsMap.put(AlterTableEvent.class.getName(), new HashSet<>(Arrays.asList("t4", "t7")));
+    eventsMap.put(DropTableEvent.class.getName(), new HashSet<>(Arrays.asList("t1")));
+
+    return eventsMap;
+  }
+
+  @Test
+  public void testReplEvents() throws Throwable {
+    Map<String, Set<String>> eventsMap = prepareBootstrapData(primaryDbName);
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .dump(primaryDbName, null);
+    replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+    ReplMetaStoreEventListenerTestImpl.checkEventSanity(eventsMap, replicatedDbName);
+    ReplMetaStoreEventListenerTestImpl.clearSanityData();
+
+    eventsMap = prepareIncData(primaryDbName);
+    LOG.info(testName.getMethodName() + ": first incremental dump and load.");
+    WarehouseInstance.Tuple incDump = primary.run("use " + primaryDbName)
+            .dump(primaryDbName, bootstrapDump.lastReplicationId);
+    replica.load(replicatedDbName, incDump.dumpLocation);
+    ReplMetaStoreEventListenerTestImpl.checkEventSanity(eventsMap, replicatedDbName);
+    ReplMetaStoreEventListenerTestImpl.clearSanityData();
+
+    // Second incremental, after bootstrap
+    eventsMap = prepareInc2Data(primaryDbName);
+    LOG.info(testName.getMethodName() + ": second incremental dump and load.");
+    WarehouseInstance.Tuple inc2Dump = primary.run("use " + primaryDbName)
+            .dump(primaryDbName, incDump.lastReplicationId);
+    replica.load(replicatedDbName, inc2Dump.dumpLocation);
+    ReplMetaStoreEventListenerTestImpl.checkEventSanity(eventsMap, replicatedDbName);
+    ReplMetaStoreEventListenerTestImpl.clearSanityData();
+  }
+}
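
The test wires the listener in via MetastoreConf.ConfVars.EVENT_LISTENERS inside
internalBeforeClassSetup above. Outside of WarehouseInstance, the same registration
can be done directly on a metastore Configuration; a rough sketch, assuming the
standalone-metastore MetastoreConf helpers:

    Configuration metastoreConf = MetastoreConf.newMetastoreConf();
    MetastoreConf.setVar(metastoreConf, MetastoreConf.ConfVars.EVENT_LISTENERS,
        ReplMetaStoreEventListenerTestImpl.class.getName());
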
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 1df5077..23127c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -64,7 +64,7 @@ import static org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration.TableMig
 public class ReplUtils {
 
   public static final String LAST_REPL_ID_KEY = "hive.repl.last.repl.id";
-  public static final String REPL_CHECKPOINT_KEY = "hive.repl.ckpt.key";
+  public static final String REPL_CHECKPOINT_KEY = ReplConst.REPL_TARGET_DB_PROPERTY;
   public static final String REPL_FIRST_INC_PENDING_FLAG = "hive.repl.first.inc.pending";
 
   // write id allocated in the current execution context which will be passed through config to be used by different
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java
index c2c8e4b..7c29969 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java
@@ -30,4 +30,11 @@ public class ReplConst {
   public static final String REPL_DATA_LOCATION_CHANGED = "REPL_DATA_LOCATION_CHANGED";
 
   public static final String TRUE = "true";
+
+  /**
+   * The constant string literal added as a property of a database being replicated into. This
+   * property is chosen over other properties because it is added right when the database is
+   * created as part of repl load and survives the incremental cycles.
+   */
+  public static final String REPL_TARGET_DB_PROPERTY = "hive.repl.ckpt.key";
 }
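
In practice this means a database created by REPL LOAD carries the
"hive.repl.ckpt.key" property in its parameters for its whole lifetime, and that is
what the new isDbReplicationTarget() check in HiveMetaStore.java (below) keys off.
Roughly, for a metastore Database object db, the check amounts to:

    Map<String, String> params = db.getParameters();
    boolean isReplTarget = params != null
        && params.get(ReplConst.REPL_TARGET_DB_PROPERTY) != null
        && !params.get(ReplConst.REPL_TARGET_DB_PROPERTY).trim().isEmpty();
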
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
index 9756425..6234501 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
@@ -103,6 +103,7 @@ public class HiveAlterHandler implements AlterHandler {
 
     final boolean cascade;
     final boolean replDataLocationChanged;
+    final boolean isReplicated;
     if ((environmentContext != null) && environmentContext.isSetProperties()) {
       cascade = StatsSetupConst.TRUE.equals(environmentContext.getProperties().get(StatsSetupConst.CASCADE));
       replDataLocationChanged = ReplConst.TRUE.equals(environmentContext.getProperties().get(ReplConst.REPL_DATA_LOCATION_CHANGED));
@@ -173,6 +174,9 @@ public class HiveAlterHandler implements AlterHandler {
 
       validateTableChangesOnReplSource(olddb, oldt, newt, environmentContext);
 
+      // On a replica this alter table will be executed only if both the old and new databases are
+      // available and being replicated into. Otherwise, it will be either a create or a drop of the table.
+      isReplicated = HiveMetaStore.HMSHandler.isDbReplicationTarget(olddb);
       if (oldt.getPartitionKeysSize() != 0) {
         isPartitionedTable = true;
       }
@@ -240,6 +244,7 @@ public class HiveAlterHandler implements AlterHandler {
 
             // get new location
             Database db = msdb.getDatabase(catName, newDbName);
+            assert(isReplicated == HiveMetaStore.HMSHandler.isDbReplicationTarget(db));
             Path databasePath = constructRenamedPath(wh.getDatabasePath(db), srcPath);
             destPath = new Path(databasePath, newTblName);
             destFs = wh.getFs(destPath);
@@ -346,6 +351,7 @@ public class HiveAlterHandler implements AlterHandler {
         if (MetaStoreServerUtils.requireCalStats(null, null, newt, environmentContext) &&
             !isPartitionedTable) {
           Database db = msdb.getDatabase(catName, newDbName);
+          assert(isReplicated == HiveMetaStore.HMSHandler.isDbReplicationTarget(db));
           // Update table stats. For partitioned table, we update stats in alterPartition()
           MetaStoreServerUtils.updateTableStatsSlow(db, newt, wh, false, true, environmentContext);
         }
@@ -388,7 +394,7 @@ public class HiveAlterHandler implements AlterHandler {
         txnAlterTableEventResponses = MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
                   EventMessage.EventType.ALTER_TABLE,
                   new AlterTableEvent(oldt, newt, false, true,
-                          newt.getWriteId(), handler),
+                          newt.getWriteId(), handler, isReplicated),
                   environmentContext);
       }
       // commit the changes
@@ -452,7 +458,7 @@ public class HiveAlterHandler implements AlterHandler {
       // make this call whether the event failed or succeeded. To make this behavior consistent,
       // this call is made for failed events also.
       MetaStoreListenerNotifier.notifyEvent(listeners, EventMessage.EventType.ALTER_TABLE,
-          new AlterTableEvent(oldt, newt, false, success, newt.getWriteId(), handler),
+          new AlterTableEvent(oldt, newt, false, success, newt.getWriteId(), handler, isReplicated),
           environmentContext, txnAlterTableEventResponses, msdb);
     }
   }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 7e97f8d..dcbe0c2 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
 import org.apache.hadoop.hive.common.ZKDeRegisterWatcher;
+import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.metastore.events.AddForeignKeyEvent;
 import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
@@ -1393,6 +1394,17 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       }
     }
 
+    static boolean isDbReplicationTarget(Database db) {
+      if (db.getParameters() == null) {
+        return false;
+      }
+
+      if (!db.getParameters().containsKey(ReplConst.REPL_TARGET_DB_PROPERTY)) {
+        return false;
+      }
+
+      return !db.getParameters().get(ReplConst.REPL_TARGET_DB_PROPERTY).trim().isEmpty();
+    }
 
     // Assumes that the catalog has already been set.
     private void create_database_core(RawStore ms, final Database db)
@@ -1414,6 +1426,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       db.setCreateTime((int) time);
       boolean success = false;
       boolean madeDir = false;
+      boolean isReplicated = isDbReplicationTarget(db);
       Map<String, String> transactionalListenersResponses = Collections.emptyMap();
       try {
         firePreEvent(new PreCreateDatabaseEvent(db, this));
@@ -1433,7 +1446,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           transactionalListenersResponses =
               MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
                                                     EventType.CREATE_DATABASE,
-                                                    new CreateDatabaseEvent(db, true, this));
+                                                    new CreateDatabaseEvent(db, true, this, isReplicated));
         }
 
         success = ms.commitTransaction();
@@ -1448,7 +1461,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         if (!listeners.isEmpty()) {
           MetaStoreListenerNotifier.notifyEvent(listeners,
                                                 EventType.CREATE_DATABASE,
-                                                new CreateDatabaseEvent(db, success, this),
+                                                new CreateDatabaseEvent(db, success, this, isReplicated),
                                                 null,
                                                 transactionalListenersResponses, ms);
         }
@@ -1545,6 +1558,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
       String[] parsedDbName = parseDbName(dbName, conf);
 
+      // We can replicate into an empty database, in which case newDB will indicate that it is a
+      // target of replication but oldDB will not. The replication flow will never alter a
+      // database such that oldDB indicates it is a target of replication but newDB does not. So,
+      // relying solely on newDB to check whether the database is a target of replication works.
+      boolean isReplicated = isDbReplicationTarget(newDB);
       try {
         oldDB = get_database_core(parsedDbName[CAT_NAME], parsedDbName[DB_NAME]);
         if (oldDB == null) {
@@ -1560,7 +1578,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           transactionalListenersResponses =
               MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
                   EventType.ALTER_DATABASE,
-                  new AlterDatabaseEvent(oldDB, newDB, true, this));
+                  new AlterDatabaseEvent(oldDB, newDB, true, this, isReplicated));
         }
 
         success = ms.commitTransaction();
@@ -1575,7 +1593,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         if ((null != oldDB) && (!listeners.isEmpty())) {
           MetaStoreListenerNotifier.notifyEvent(listeners,
               EventType.ALTER_DATABASE,
-              new AlterDatabaseEvent(oldDB, newDB, success, this),
+              new AlterDatabaseEvent(oldDB, newDB, success, this, isReplicated),
               null,
               transactionalListenersResponses, ms);
         }
@@ -1595,9 +1613,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       if (name == null) {
         throw new MetaException("Database name cannot be null.");
       }
+      boolean isReplicated = false;
       try {
         ms.openTransaction();
         db = ms.getDatabase(catName, name);
+        isReplicated = isDbReplicationTarget(db);
 
         if (!isInTest && ReplChangeManager.isSourceOfReplication(db)) {
           throw new InvalidOperationException("can not drop a database which is a source of replication");
@@ -1728,7 +1748,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
             transactionalListenerResponses =
                 MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
                                                       EventType.DROP_DATABASE,
-                                                      new DropDatabaseEvent(db, true, this));
+                                                      new DropDatabaseEvent(db, true, this, isReplicated));
           }
 
           success = ms.commitTransaction();
@@ -1756,7 +1776,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         if (!listeners.isEmpty()) {
           MetaStoreListenerNotifier.notifyEvent(listeners,
                                                 EventType.DROP_DATABASE,
-                                                new DropDatabaseEvent(db, success, this),
+                                                new DropDatabaseEvent(db, success, this, isReplicated),
                                                 null,
                                                 transactionalListenerResponses, ms);
         }
@@ -1985,6 +2005,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Path tblPath = null;
       boolean success = false, madeDir = false;
       Database db = null;
+      boolean isReplicated = false;
       try {
         if (!tbl.isSetCatName()) {
           tbl.setCatName(getDefaultCatalog(conf));
@@ -1994,6 +2015,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         ms.openTransaction();
 
         db = ms.getDatabase(tbl.getCatName(), tbl.getDbName());
+        isReplicated = isDbReplicationTarget(db);
 
         // get_table checks whether database exists, it should be moved here
         if (is_table_exists(ms, tbl.getCatName(), tbl.getDbName(), tbl.getTableName())) {
@@ -2128,7 +2150,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
         if (!transactionalListeners.isEmpty()) {
           transactionalListenerResponses = MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
-              EventType.CREATE_TABLE, new CreateTableEvent(tbl, true, this), envContext);
+              EventType.CREATE_TABLE, new CreateTableEvent(tbl, true, this, isReplicated), envContext);
           if (primaryKeys != null && !primaryKeys.isEmpty()) {
             MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.ADD_PRIMARYKEY,
                 new AddPrimaryKeyEvent(primaryKeys, true, this), envContext);
@@ -2158,7 +2180,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
         if (!listeners.isEmpty()) {
           MetaStoreListenerNotifier.notifyEvent(listeners, EventType.CREATE_TABLE,
-              new CreateTableEvent(tbl, success, this), envContext, transactionalListenerResponses, ms);
+              new CreateTableEvent(tbl, success, this, isReplicated), envContext,
+                  transactionalListenerResponses, ms);
           if (primaryKeys != null && !primaryKeys.isEmpty()) {
             MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_PRIMARYKEY,
                 new AddPrimaryKeyEvent(primaryKeys, success, this), envContext);
@@ -2643,9 +2666,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       boolean ifPurge = false;
       Map<String, String> transactionalListenerResponses = Collections.emptyMap();
       Database db = null;
+      boolean isReplicated = false;
       try {
         ms.openTransaction();
         db = ms.getDatabase(catName, dbname);
+        isReplicated = isDbReplicationTarget(db);
 
         // drop any partitions
         tbl = get_table_core(catName, dbname, name);
@@ -2686,7 +2711,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
             transactionalListenerResponses =
                 MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
                                                       EventType.DROP_TABLE,
-                                                      new DropTableEvent(tbl, true, deleteData, this),
+                                                      new DropTableEvent(tbl, true, deleteData,
+                                                              this, isReplicated),
                                                       envContext);
           }
           success = ms.commitTransaction();
@@ -2706,7 +2732,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         if (!listeners.isEmpty()) {
           MetaStoreListenerNotifier.notifyEvent(listeners,
                                                 EventType.DROP_TABLE,
-                                                new DropTableEvent(tbl, success, deleteData, this),
+                                                new DropTableEvent(tbl, success, deleteData, this, isReplicated),
                                                 envContext,
                                                 transactionalListenerResponses, ms);
         }
@@ -2924,18 +2950,19 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           EnvironmentContext environmentContext = new EnvironmentContext();
           updateStatsForTruncate(table.getParameters(), environmentContext);
 
+          boolean isReplicated = isDbReplicationTarget(ms.getDatabase(catName, dbName));
           if (!transactionalListeners.isEmpty()) {
             MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
                     EventType.ALTER_TABLE,
                     new AlterTableEvent(table, table, true, true,
-                            writeId, this));
+                            writeId, this, isReplicated));
           }
 
           if (!listeners.isEmpty()) {
             MetaStoreListenerNotifier.notifyEvent(listeners,
                     EventType.ALTER_TABLE,
                     new AlterTableEvent(table, table, true, true,
-                            writeId, this));
+                            writeId, this, isReplicated));
           }
 
           // TODO: this should actually pass thru and set writeId for txn stats.
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterDatabaseEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterDatabaseEvent.java
index 3f6eae9..fd14952 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterDatabaseEvent.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterDatabaseEvent.java
@@ -33,11 +33,14 @@ public class AlterDatabaseEvent extends ListenerEvent {
 
   private final Database oldDb;
   private final Database newDb;
+  private final boolean isReplicated;
 
-  public AlterDatabaseEvent(Database oldDb, Database newDb, boolean status, IHMSHandler handler) {
+  public AlterDatabaseEvent(Database oldDb, Database newDb, boolean status, IHMSHandler handler,
+                            boolean isReplicated) {
     super(status, handler);
     this.oldDb = oldDb;
     this.newDb = newDb;
+    this.isReplicated = isReplicated;
   }
 
   /**
@@ -53,4 +56,9 @@ public class AlterDatabaseEvent extends ListenerEvent {
   public Database getNewDatabase() {
     return newDb;
   }
+
+  /**
+   * @return whether this event is caused by replication
+   */
+  public boolean isReplicated() { return isReplicated; }
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java
index 541fbe4..111cb60 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java
@@ -32,14 +32,16 @@ public class AlterTableEvent extends ListenerEvent {
   private final Table oldTable;
   private final boolean isTruncateOp;
   private Long writeId;
+  private final boolean isReplicated;
 
-  public AlterTableEvent (Table oldTable, Table newTable, boolean isTruncateOp, boolean status,
-                          Long writeId, IHMSHandler handler) {
+  public AlterTableEvent(Table oldTable, Table newTable, boolean isTruncateOp, boolean status,
+                         Long writeId, IHMSHandler handler, boolean isReplicated) {
     super (status, handler);
     this.oldTable = oldTable;
     this.newTable = newTable;
     this.isTruncateOp = isTruncateOp;
     this.writeId = writeId;
+    this.isReplicated = isReplicated;
   }
 
   /**
@@ -66,4 +68,6 @@ public class AlterTableEvent extends ListenerEvent {
   public Long getWriteId() {
     return writeId;
   }
+
+  public boolean isReplicated() { return isReplicated; }
 }
\ No newline at end of file
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CreateDatabaseEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CreateDatabaseEvent.java
index e2c3ee3..e1b2202 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CreateDatabaseEvent.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CreateDatabaseEvent.java
@@ -28,10 +28,12 @@ import org.apache.hadoop.hive.metastore.api.Database;
 public class CreateDatabaseEvent extends ListenerEvent {
 
   private final Database db;
+  private final boolean isReplicated;
 
-  public CreateDatabaseEvent (Database db, boolean status, IHMSHandler handler) {
+  public CreateDatabaseEvent(Database db, boolean status, IHMSHandler handler, boolean isReplicated) {
     super (status, handler);
     this.db = db;
+    this.isReplicated = isReplicated;
   }
 
   /**
@@ -40,4 +42,9 @@ public class CreateDatabaseEvent extends ListenerEvent {
   public Database getDatabase () {
     return db;
   }
+
+  /**
+   * @return whether this event is caused by replication
+   */
+  public boolean isReplicated() { return isReplicated; }
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CreateTableEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CreateTableEvent.java
index 4f5e887..5f2c9f5 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CreateTableEvent.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CreateTableEvent.java
@@ -28,10 +28,12 @@ import org.apache.hadoop.hive.metastore.api.Table;
 public class CreateTableEvent extends ListenerEvent {
 
   private final Table table;
+  private final boolean isReplicated;
 
-  public CreateTableEvent (Table table, boolean status, IHMSHandler handler) {
+  public CreateTableEvent(Table table, boolean status, IHMSHandler handler, boolean isReplicated) {
     super (status, handler);
     this.table = table;
+    this.isReplicated = isReplicated;
   }
 
   /**
@@ -40,4 +42,11 @@ public class CreateTableEvent extends ListenerEvent {
   public Table getTable () {
     return table;
   }
+
+  /**
+   * @return whether this event was created by replication
+   */
+  public boolean isReplicated() { return isReplicated; }
 }
+
+
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DropDatabaseEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DropDatabaseEvent.java
index 94fe264..2ce1063 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DropDatabaseEvent.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DropDatabaseEvent.java
@@ -28,10 +28,12 @@ import org.apache.hadoop.hive.metastore.api.Database;
 public class DropDatabaseEvent extends ListenerEvent {
 
   private final Database db;
+  private final boolean isReplicated;
 
-  public DropDatabaseEvent(Database db, boolean status, IHMSHandler handler) {
+  public DropDatabaseEvent(Database db, boolean status, IHMSHandler handler, boolean isReplicated) {
     super (status, handler);
     this.db = db;
+    this.isReplicated = isReplicated;
   }
 
   /**
@@ -40,4 +42,9 @@ public class DropDatabaseEvent extends ListenerEvent {
   public Database getDatabase() {
     return db;
   }
+
+  /**
+   * @return whether this event is caused by replication
+   */
+  public boolean isReplicated() { return isReplicated; }
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DropTableEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DropTableEvent.java
index 9152232..35e367b 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DropTableEvent.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DropTableEvent.java
@@ -29,13 +29,16 @@ public class DropTableEvent extends ListenerEvent {
 
   private final Table table;
   private final boolean deleteData;
+  private final boolean isReplicated;
 
-  public DropTableEvent(Table table, boolean status, boolean deleteData, IHMSHandler handler) {
+  public DropTableEvent(Table table, boolean status, boolean deleteData, IHMSHandler handler,
+                        boolean isReplicated) {
     super(status, handler);
     this.table = table;
     // In HiveMetaStore, the deleteData flag indicates whether DFS data should be
     // removed on a drop.
     this.deleteData = deleteData;
+    this.isReplicated = isReplicated;
   }
 
   /**
@@ -51,4 +54,9 @@ public class DropTableEvent extends ListenerEvent {
   public boolean getDeleteData() {
     return deleteData;
   }
+
+  /**
+   * @return whether this event was created by replication
+   */
+  public boolean isReplicated() { return isReplicated; }
 }