You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ma...@apache.org on 2019/07/30 03:46:13 UTC
[hive] branch master updated: HIVE-22036 : HMS should identify
events corresponding to replicated database for Atlas HMS hook. (Ashutosh
Bapat reviewed by Mahesh Kumar Behera)
This is an automated email from the ASF dual-hosted git repository.
mahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new ba26fcf HIVE-22036 : HMS should identify events corresponding to replicated database for Atlas HMS hook. (Ashutosh Bapat reviewed by Mahesh Kumar Behera)
ba26fcf is described below
commit ba26fcfdd6f27c148d8f4ff2e2c4001530cb039b
Author: Ashutosh Bapat <ab...@cloudera.com>
AuthorDate: Tue Jul 30 09:13:38 2019 +0530
HIVE-22036 : HMS should identify events corresponding to replicated database for Atlas HMS hook. (Ashutosh Bapat reviewed by Mahesh Kumar Behera)
Signed-off-by: Mahesh Kumar Behera <ma...@apache.org>
---
.../parse/ReplMetaStoreEventListenerTestImpl.java | 155 ++++++++++++++++
.../ql/parse/TestMetaStoreEventListenerInRepl.java | 199 +++++++++++++++++++++
.../hadoop/hive/ql/exec/repl/util/ReplUtils.java | 2 +-
.../apache/hadoop/hive/common/repl/ReplConst.java | 7 +
.../hadoop/hive/metastore/HiveAlterHandler.java | 10 +-
.../hadoop/hive/metastore/HiveMetaStore.java | 51 ++++--
.../hive/metastore/events/AlterDatabaseEvent.java | 10 +-
.../hive/metastore/events/AlterTableEvent.java | 8 +-
.../hive/metastore/events/CreateDatabaseEvent.java | 9 +-
.../hive/metastore/events/CreateTableEvent.java | 11 +-
.../hive/metastore/events/DropDatabaseEvent.java | 9 +-
.../hive/metastore/events/DropTableEvent.java | 10 +-
12 files changed, 459 insertions(+), 22 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplMetaStoreEventListenerTestImpl.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplMetaStoreEventListenerTestImpl.java
new file mode 100644
index 0000000..31fdb77
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplMetaStoreEventListenerTestImpl.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
+import org.apache.hadoop.hive.metastore.events.AlterDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * ReplMetaStoreEventListenerTestImpl - Implementation of MetaStoreEventListener to test
+ * isReplicated flag in some of the tests.
+ */
+public class ReplMetaStoreEventListenerTestImpl extends MetaStoreEventListener {
+ protected static final Logger LOG =
+ LoggerFactory.getLogger(ReplMetaStoreEventListenerTestImpl.class);
+ static Map<String, Set<String>> replicatedDbsForEvents = new HashMap<>();
+ static Map<String, Set<String>> nonReplicatedDbsForEvents = new HashMap<>();
+ static Map<String, Set<String>> replicatedTablesForEvents = new HashMap<>();
+
+ public ReplMetaStoreEventListenerTestImpl(Configuration conf) {
+ super(conf);
+ }
+
+ private void addNameToEventMap(Map<String, Set<String>> eventMap, String name,
+ ListenerEvent event) {
+ String eventType = event.getClass().getName();
+ Set<String> eventNames = eventMap.get(eventType);
+ if (eventNames == null) {
+ eventNames = new HashSet<>();
+ eventMap.put(eventType, eventNames);
+ }
+ eventNames.add(name.toLowerCase());
+ }
+
+ @Override
+ public void onCreateDatabase(CreateDatabaseEvent createDatabaseEvent) {
+ String dbName = createDatabaseEvent.getDatabase().getName().toLowerCase();
+ if (createDatabaseEvent.isReplicated()) {
+ addNameToEventMap(replicatedDbsForEvents, dbName, createDatabaseEvent);
+ } else {
+ addNameToEventMap(nonReplicatedDbsForEvents, dbName, createDatabaseEvent);
+ }
+ }
+
+ @Override
+ public void onAlterDatabase(AlterDatabaseEvent alterDatabaseEvent) {
+ // The test doesn't create any database rename events, so it's fine to just check the new
+ // database name.
+ String dbName = alterDatabaseEvent.getNewDatabase().getName().toLowerCase();
+ if (alterDatabaseEvent.isReplicated()) {
+ addNameToEventMap(replicatedDbsForEvents, dbName, alterDatabaseEvent);
+ } else {
+ addNameToEventMap(nonReplicatedDbsForEvents, dbName, alterDatabaseEvent);
+ }
+ }
+
+ @Override
+ public void onCreateTable(CreateTableEvent createTableEvent) {
+ String dbName = createTableEvent.getTable().getDbName();
+ String tblName = createTableEvent.getTable().getTableName();
+ if (createTableEvent.isReplicated()) {
+ addNameToEventMap(replicatedDbsForEvents, dbName, createTableEvent);
+ addNameToEventMap(replicatedTablesForEvents, tblName, createTableEvent);
+ } else {
+ addNameToEventMap(nonReplicatedDbsForEvents, dbName, createTableEvent);
+ }
+ }
+
+ @Override
+ public void onAlterTable(AlterTableEvent alterTableEvent) {
+ // Test doesn't have table rename events, since we are only interested in checking replication
+ // status. So, it's fine to get the names from the new table.
+ String dbName = alterTableEvent.getNewTable().getDbName();
+ String tblName = alterTableEvent.getNewTable().getTableName();
+ if (alterTableEvent.isReplicated()) {
+ addNameToEventMap(replicatedDbsForEvents, dbName, alterTableEvent);
+ addNameToEventMap(replicatedTablesForEvents, tblName, alterTableEvent);
+ } else {
+ addNameToEventMap(nonReplicatedDbsForEvents, dbName, alterTableEvent);
+ }
+ }
+
+ @Override
+ public void onDropTable(DropTableEvent dropTableEvent) {
+ String dbName = dropTableEvent.getTable().getDbName();
+ String tblName = dropTableEvent.getTable().getTableName();
+ if (dropTableEvent.isReplicated()) {
+ addNameToEventMap(replicatedDbsForEvents, dbName, dropTableEvent);
+ addNameToEventMap(replicatedTablesForEvents, tblName, dropTableEvent);
+ } else {
+ addNameToEventMap(nonReplicatedDbsForEvents, dbName, dropTableEvent);
+ }
+
+ }
+
+ static void checkEventSanity(Map<String, Set<String>> eventsMap, String replicaDbName) {
+ replicaDbName = replicaDbName.toLowerCase();
+ for (String event : eventsMap.keySet()) {
+ Set<String> dbsForEvent = replicatedDbsForEvents.get(event);
+ LOG.info("Examining dbs and tables for event " + event);
+ // isReplicated should be true only for the replicated database
+ Assert.assertTrue(dbsForEvent.contains(replicaDbName));
+ Assert.assertEquals(1, dbsForEvent.size());
+ if (nonReplicatedDbsForEvents.get(event) != null) {
+ Assert.assertFalse(nonReplicatedDbsForEvents.get(event).contains(replicaDbName));
+ }
+
+ Set<String> eventTables = replicatedTablesForEvents.get(event);
+ Assert.assertEquals(eventsMap.get(event), eventTables);
+ }
+ }
+
+ static void clearSanityMap(Map<String, Set<String>> map) {
+ for (Set<String> eventEntry : map.values()) {
+ if (eventEntry != null) {
+ eventEntry.clear();
+ }
+ }
+ map.clear();
+ }
+
+ static void clearSanityData() {
+ clearSanityMap(replicatedDbsForEvents);
+ clearSanityMap(nonReplicatedDbsForEvents);
+ clearSanityMap(replicatedTablesForEvents);
+ }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetaStoreEventListenerInRepl.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetaStoreEventListenerInRepl.java
new file mode 100644
index 0000000..7121dfb
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetaStoreEventListenerInRepl.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.events.AlterDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.shims.Utils;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.AfterClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+
+/**
+ * TestMetaStoreEventListenerInRepl - Test metastore events created by replication.
+ */
+public class TestMetaStoreEventListenerInRepl {
+
+ @Rule
+ public final TestName testName = new TestName();
+
+ protected static final Logger LOG = LoggerFactory.getLogger(TestMetaStoreEventListenerInRepl.class);
+ static WarehouseInstance primary;
+ static WarehouseInstance replica;
+ static HiveConf conf;
+ String primaryDbName, replicatedDbName;
+
+ @BeforeClass
+ public static void internalBeforeClassSetup() throws Exception {
+ TestMetaStoreEventListenerInRepl.conf = new HiveConf(TestMetaStoreEventListenerInRepl.class);
+ TestMetaStoreEventListenerInRepl.conf.set("dfs.client.use.datanode.hostname", "true");
+ TestMetaStoreEventListenerInRepl.conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
+ MiniDFSCluster miniDFSCluster =
+ new MiniDFSCluster.Builder(TestMetaStoreEventListenerInRepl.conf).numDataNodes(1).format(true).build();
+
+ Map<String, String> conf = new HashMap<String, String>() {{
+ put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
+ put("hive.support.concurrency", "true");
+ put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+ put("hive.metastore.client.capability.check", "false");
+ put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+ put("hive.exec.dynamic.partition.mode", "nonstrict");
+ put("hive.strict.checks.bucketing", "false");
+ put("hive.mapred.mode", "nonstrict");
+ put("mapred.input.dir.recursive", "true");
+ put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+ put("hive.in.repl.test", "true");
+ put(MetastoreConf.ConfVars.EVENT_LISTENERS.getVarname(),
+ ReplMetaStoreEventListenerTestImpl.class.getName());
+ }};
+
+ primary = new WarehouseInstance(LOG, miniDFSCluster, conf);
+ replica = new WarehouseInstance(LOG, miniDFSCluster, conf);
+ }
+
+ @AfterClass
+ public static void classLevelTearDown() throws IOException {
+ primary.close();
+ replica.close();
+ }
+
+ @Before
+ public void setup() throws Throwable {
+ primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis();
+ replicatedDbName = "replicated_" + primaryDbName;
+ primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
+ SOURCE_OF_REPLICATION + "' = '1,2,3')");
+ }
+
+ @After
+ public void tearDown() throws Throwable {
+ primary.run("drop database if exists " + primaryDbName + " cascade");
+ replica.run("drop database if exists " + replicatedDbName + " cascade");
+ }
+
+ private Map<String, Set<String>> prepareBootstrapData(String primaryDbName) throws Throwable {
+ primary.run("use " + primaryDbName)
+ .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+ "tblproperties (\"transactional\"=\"true\")")
+ .run("insert into t1 values(1)")
+ .run("create table t2 (place string) partitioned by (country string) clustered by(place) " +
+ "into 3 buckets stored as orc tblproperties (\"transactional\"=\"true\")")
+ .run("insert into t2 partition(country='india') values ('bangalore')")
+ .run("create table t4 (id int)")
+ .run("insert into t4 values(111), (222)");
+
+ // Add expected events with associated tables, if any.
+ Map<String, Set<String>> eventsMap = new HashMap<>();
+ eventsMap.put(CreateDatabaseEvent.class.getName(), null);
+ // Replication causes many implicit alter database operations, so metastore will see some
+ // alter database events as well.
+ eventsMap.put(AlterDatabaseEvent.class.getName(), null);
+ eventsMap.put(CreateTableEvent.class.getName(), new HashSet<>(Arrays.asList("t1", "t2", "t4")));
+ eventsMap.put(AlterTableEvent.class.getName(), new HashSet<>(Arrays.asList("t1", "t2", "t4")));
+ return eventsMap;
+ }
+
+ Map<String, Set<String>> prepareIncData(String dbName) throws Throwable {
+ primary.run("use " + dbName)
+ .run("create table t6 stored as orc tblproperties (\"transactional\"=\"true\")" +
+ " as select * from t1")
+ .run("alter table t2 add columns (placetype string)")
+ .run("update t2 set placetype = 'city'")
+ .run("insert into t1 values (3)")
+ .run("drop table t2")
+ .run("alter database " + dbName + " set dbproperties('some.useless.property'='1')");
+
+ // Add expected events with associated tables, if any.
+ Map<String, Set<String>> eventsMap = new HashMap<>();
+ // Replication causes many implicit alter database operations, so metastore will see some
+ // alter database events as well.
+ eventsMap.put(AlterDatabaseEvent.class.getName(), null);
+ eventsMap.put(CreateTableEvent.class.getName(), new HashSet<>(Arrays.asList("t6")));
+ eventsMap.put(AlterTableEvent.class.getName(), new HashSet<>(Arrays.asList("t1", "t2", "t6")));
+ eventsMap.put(DropTableEvent.class.getName(), new HashSet<>(Arrays.asList("t2")));
+
+ return eventsMap;
+ }
+
+ Map<String, Set<String>> prepareInc2Data(String dbName) throws Throwable {
+ primary.run("use " + dbName)
+ .run("insert into t4 values (333)")
+ .run("create table t7 (str string)")
+ .run("insert into t7 values ('aaa')")
+ .run("drop table t1");
+ // Add expected events with associated tables, if any.
+ Map<String, Set<String>> eventsMap = new HashMap<>();
+ // Replication causes many implicit alter database operations, so metastore will see some
+ // alter database events as well.
+ eventsMap.put(AlterDatabaseEvent.class.getName(), null);
+ eventsMap.put(CreateTableEvent.class.getName(), new HashSet<>(Arrays.asList("t7")));
+ eventsMap.put(AlterTableEvent.class.getName(), new HashSet<>(Arrays.asList("t4", "t7")));
+ eventsMap.put(DropTableEvent.class.getName(), new HashSet<>(Arrays.asList("t1")));
+
+ return eventsMap;
+ }
+
+ @Test
+ public void testReplEvents() throws Throwable {
+ Map<String, Set<String>> eventsMap = prepareBootstrapData(primaryDbName);
+ WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+ .dump(primaryDbName, null);
+ replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+ ReplMetaStoreEventListenerTestImpl.checkEventSanity(eventsMap, replicatedDbName);
+ ReplMetaStoreEventListenerTestImpl.clearSanityData();
+
+ eventsMap = prepareIncData(primaryDbName);
+ LOG.info(testName.getMethodName() + ": first incremental dump and load.");
+ WarehouseInstance.Tuple incDump = primary.run("use " + primaryDbName)
+ .dump(primaryDbName, bootstrapDump.lastReplicationId);
+ replica.load(replicatedDbName, incDump.dumpLocation);
+ ReplMetaStoreEventListenerTestImpl.checkEventSanity(eventsMap, replicatedDbName);
+ ReplMetaStoreEventListenerTestImpl.clearSanityData();
+
+ // Second incremental, after bootstrap
+ eventsMap = prepareInc2Data(primaryDbName);
+ LOG.info(testName.getMethodName() + ": second incremental dump and load.");
+ WarehouseInstance.Tuple inc2Dump = primary.run("use " + primaryDbName)
+ .dump(primaryDbName, incDump.lastReplicationId);
+ replica.load(replicatedDbName, inc2Dump.dumpLocation);
+ ReplMetaStoreEventListenerTestImpl.checkEventSanity(eventsMap, replicatedDbName);
+ ReplMetaStoreEventListenerTestImpl.clearSanityData();
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 1df5077..23127c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -64,7 +64,7 @@ import static org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration.TableMig
public class ReplUtils {
public static final String LAST_REPL_ID_KEY = "hive.repl.last.repl.id";
- public static final String REPL_CHECKPOINT_KEY = "hive.repl.ckpt.key";
+ public static final String REPL_CHECKPOINT_KEY = ReplConst.REPL_TARGET_DB_PROPERTY;
public static final String REPL_FIRST_INC_PENDING_FLAG = "hive.repl.first.inc.pending";
// write id allocated in the current execution context which will be passed through config to be used by different
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java
index c2c8e4b..7c29969 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java
@@ -30,4 +30,11 @@ public class ReplConst {
public static final String REPL_DATA_LOCATION_CHANGED = "REPL_DATA_LOCATION_CHANGED";
public static final String TRUE = "true";
+
+ /**
+ * The constant string literal added as a property of the database being replicated into. We
+ * choose this property over other properties because it is added right when the
+ * database is created as part of repl load and survives the incremental cycles.
+ */
+ public static final String REPL_TARGET_DB_PROPERTY = "hive.repl.ckpt.key";
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
index 9756425..6234501 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
@@ -103,6 +103,7 @@ public class HiveAlterHandler implements AlterHandler {
final boolean cascade;
final boolean replDataLocationChanged;
+ final boolean isReplicated;
if ((environmentContext != null) && environmentContext.isSetProperties()) {
cascade = StatsSetupConst.TRUE.equals(environmentContext.getProperties().get(StatsSetupConst.CASCADE));
replDataLocationChanged = ReplConst.TRUE.equals(environmentContext.getProperties().get(ReplConst.REPL_DATA_LOCATION_CHANGED));
@@ -173,6 +174,9 @@ public class HiveAlterHandler implements AlterHandler {
validateTableChangesOnReplSource(olddb, oldt, newt, environmentContext);
+ // On a replica, this alter table is executed only if both the old and the new databases are
+ // available and being replicated into. Otherwise, it is either a create or a drop of a table.
+ isReplicated = HiveMetaStore.HMSHandler.isDbReplicationTarget(olddb);
if (oldt.getPartitionKeysSize() != 0) {
isPartitionedTable = true;
}
@@ -240,6 +244,7 @@ public class HiveAlterHandler implements AlterHandler {
// get new location
Database db = msdb.getDatabase(catName, newDbName);
+ assert(isReplicated == HiveMetaStore.HMSHandler.isDbReplicationTarget(db));
Path databasePath = constructRenamedPath(wh.getDatabasePath(db), srcPath);
destPath = new Path(databasePath, newTblName);
destFs = wh.getFs(destPath);
@@ -346,6 +351,7 @@ public class HiveAlterHandler implements AlterHandler {
if (MetaStoreServerUtils.requireCalStats(null, null, newt, environmentContext) &&
!isPartitionedTable) {
Database db = msdb.getDatabase(catName, newDbName);
+ assert(isReplicated == HiveMetaStore.HMSHandler.isDbReplicationTarget(db));
// Update table stats. For partitioned table, we update stats in alterPartition()
MetaStoreServerUtils.updateTableStatsSlow(db, newt, wh, false, true, environmentContext);
}
@@ -388,7 +394,7 @@ public class HiveAlterHandler implements AlterHandler {
txnAlterTableEventResponses = MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
EventMessage.EventType.ALTER_TABLE,
new AlterTableEvent(oldt, newt, false, true,
- newt.getWriteId(), handler),
+ newt.getWriteId(), handler, isReplicated),
environmentContext);
}
// commit the changes
@@ -452,7 +458,7 @@ public class HiveAlterHandler implements AlterHandler {
// make this call whether the event failed or succeeded. To make this behavior consistent,
// this call is made for failed events also.
MetaStoreListenerNotifier.notifyEvent(listeners, EventMessage.EventType.ALTER_TABLE,
- new AlterTableEvent(oldt, newt, false, success, newt.getWriteId(), handler),
+ new AlterTableEvent(oldt, newt, false, success, newt.getWriteId(), handler, isReplicated),
environmentContext, txnAlterTableEventResponses, msdb);
}
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 7e97f8d..dcbe0c2 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
import org.apache.hadoop.hive.common.ZKDeRegisterWatcher;
+import org.apache.hadoop.hive.common.repl.ReplConst;
import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.hive.metastore.events.AddForeignKeyEvent;
import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
@@ -1393,6 +1394,17 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
}
+ static boolean isDbReplicationTarget(Database db) {
+ if (db.getParameters() == null) {
+ return false;
+ }
+
+ if (!db.getParameters().containsKey(ReplConst.REPL_TARGET_DB_PROPERTY)) {
+ return false;
+ }
+
+ return !db.getParameters().get(ReplConst.REPL_TARGET_DB_PROPERTY).trim().isEmpty();
+ }
// Assumes that the catalog has already been set.
private void create_database_core(RawStore ms, final Database db)
@@ -1414,6 +1426,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
db.setCreateTime((int) time);
boolean success = false;
boolean madeDir = false;
+ boolean isReplicated = isDbReplicationTarget(db);
Map<String, String> transactionalListenersResponses = Collections.emptyMap();
try {
firePreEvent(new PreCreateDatabaseEvent(db, this));
@@ -1433,7 +1446,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
transactionalListenersResponses =
MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
EventType.CREATE_DATABASE,
- new CreateDatabaseEvent(db, true, this));
+ new CreateDatabaseEvent(db, true, this, isReplicated));
}
success = ms.commitTransaction();
@@ -1448,7 +1461,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
if (!listeners.isEmpty()) {
MetaStoreListenerNotifier.notifyEvent(listeners,
EventType.CREATE_DATABASE,
- new CreateDatabaseEvent(db, success, this),
+ new CreateDatabaseEvent(db, success, this, isReplicated),
null,
transactionalListenersResponses, ms);
}
@@ -1545,6 +1558,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
String[] parsedDbName = parseDbName(dbName, conf);
+ // We can replicate into an empty database, in which case newDB will indicate that it is a
+ // target of replication but oldDB will not. But the replication flow will never alter a
+ // database so that oldDB indicates that it is a target of replication but newDB does not. So,
+ // relying solely on newDB to check whether the database is a target of replication works.
+ boolean isReplicated = isDbReplicationTarget(newDB);
try {
oldDB = get_database_core(parsedDbName[CAT_NAME], parsedDbName[DB_NAME]);
if (oldDB == null) {
@@ -1560,7 +1578,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
transactionalListenersResponses =
MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
EventType.ALTER_DATABASE,
- new AlterDatabaseEvent(oldDB, newDB, true, this));
+ new AlterDatabaseEvent(oldDB, newDB, true, this, isReplicated));
}
success = ms.commitTransaction();
@@ -1575,7 +1593,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
if ((null != oldDB) && (!listeners.isEmpty())) {
MetaStoreListenerNotifier.notifyEvent(listeners,
EventType.ALTER_DATABASE,
- new AlterDatabaseEvent(oldDB, newDB, success, this),
+ new AlterDatabaseEvent(oldDB, newDB, success, this, isReplicated),
null,
transactionalListenersResponses, ms);
}
@@ -1595,9 +1613,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
if (name == null) {
throw new MetaException("Database name cannot be null.");
}
+ boolean isReplicated = false;
try {
ms.openTransaction();
db = ms.getDatabase(catName, name);
+ isReplicated = isDbReplicationTarget(db);
if (!isInTest && ReplChangeManager.isSourceOfReplication(db)) {
throw new InvalidOperationException("can not drop a database which is a source of replication");
@@ -1728,7 +1748,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
transactionalListenerResponses =
MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
EventType.DROP_DATABASE,
- new DropDatabaseEvent(db, true, this));
+ new DropDatabaseEvent(db, true, this, isReplicated));
}
success = ms.commitTransaction();
@@ -1756,7 +1776,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
if (!listeners.isEmpty()) {
MetaStoreListenerNotifier.notifyEvent(listeners,
EventType.DROP_DATABASE,
- new DropDatabaseEvent(db, success, this),
+ new DropDatabaseEvent(db, success, this, isReplicated),
null,
transactionalListenerResponses, ms);
}
@@ -1985,6 +2005,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
Path tblPath = null;
boolean success = false, madeDir = false;
Database db = null;
+ boolean isReplicated = false;
try {
if (!tbl.isSetCatName()) {
tbl.setCatName(getDefaultCatalog(conf));
@@ -1994,6 +2015,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
ms.openTransaction();
db = ms.getDatabase(tbl.getCatName(), tbl.getDbName());
+ isReplicated = isDbReplicationTarget(db);
// get_table checks whether database exists, it should be moved here
if (is_table_exists(ms, tbl.getCatName(), tbl.getDbName(), tbl.getTableName())) {
@@ -2128,7 +2150,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
if (!transactionalListeners.isEmpty()) {
transactionalListenerResponses = MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
- EventType.CREATE_TABLE, new CreateTableEvent(tbl, true, this), envContext);
+ EventType.CREATE_TABLE, new CreateTableEvent(tbl, true, this, isReplicated), envContext);
if (primaryKeys != null && !primaryKeys.isEmpty()) {
MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.ADD_PRIMARYKEY,
new AddPrimaryKeyEvent(primaryKeys, true, this), envContext);
@@ -2158,7 +2180,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
if (!listeners.isEmpty()) {
MetaStoreListenerNotifier.notifyEvent(listeners, EventType.CREATE_TABLE,
- new CreateTableEvent(tbl, success, this), envContext, transactionalListenerResponses, ms);
+ new CreateTableEvent(tbl, success, this, isReplicated), envContext,
+ transactionalListenerResponses, ms);
if (primaryKeys != null && !primaryKeys.isEmpty()) {
MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_PRIMARYKEY,
new AddPrimaryKeyEvent(primaryKeys, success, this), envContext);
@@ -2643,9 +2666,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
boolean ifPurge = false;
Map<String, String> transactionalListenerResponses = Collections.emptyMap();
Database db = null;
+ boolean isReplicated = false;
try {
ms.openTransaction();
db = ms.getDatabase(catName, dbname);
+ isReplicated = isDbReplicationTarget(db);
// drop any partitions
tbl = get_table_core(catName, dbname, name);
@@ -2686,7 +2711,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
transactionalListenerResponses =
MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
EventType.DROP_TABLE,
- new DropTableEvent(tbl, true, deleteData, this),
+ new DropTableEvent(tbl, true, deleteData,
+ this, isReplicated),
envContext);
}
success = ms.commitTransaction();
@@ -2706,7 +2732,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
if (!listeners.isEmpty()) {
MetaStoreListenerNotifier.notifyEvent(listeners,
EventType.DROP_TABLE,
- new DropTableEvent(tbl, success, deleteData, this),
+ new DropTableEvent(tbl, success, deleteData, this, isReplicated),
envContext,
transactionalListenerResponses, ms);
}
@@ -2924,18 +2950,19 @@ public class HiveMetaStore extends ThriftHiveMetastore {
EnvironmentContext environmentContext = new EnvironmentContext();
updateStatsForTruncate(table.getParameters(), environmentContext);
+ boolean isReplicated = isDbReplicationTarget(ms.getDatabase(catName, dbName));
if (!transactionalListeners.isEmpty()) {
MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
EventType.ALTER_TABLE,
new AlterTableEvent(table, table, true, true,
- writeId, this));
+ writeId, this, isReplicated));
}
if (!listeners.isEmpty()) {
MetaStoreListenerNotifier.notifyEvent(listeners,
EventType.ALTER_TABLE,
new AlterTableEvent(table, table, true, true,
- writeId, this));
+ writeId, this, isReplicated));
}
// TODO: this should actually pass thru and set writeId for txn stats.
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterDatabaseEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterDatabaseEvent.java
index 3f6eae9..fd14952 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterDatabaseEvent.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterDatabaseEvent.java
@@ -33,11 +33,14 @@ public class AlterDatabaseEvent extends ListenerEvent {
private final Database oldDb;
private final Database newDb;
+ private final boolean isReplicated;
- public AlterDatabaseEvent(Database oldDb, Database newDb, boolean status, IHMSHandler handler) {
+ public AlterDatabaseEvent(Database oldDb, Database newDb, boolean status, IHMSHandler handler,
+ boolean isReplicated) {
super(status, handler);
this.oldDb = oldDb;
this.newDb = newDb;
+ this.isReplicated = isReplicated;
}
/**
@@ -53,4 +56,9 @@ public class AlterDatabaseEvent extends ListenerEvent {
public Database getNewDatabase() {
return newDb;
}
+
+ /**
+ * @return whether this event is caused by replication
+ */
+ public boolean isReplicated() { return isReplicated; }
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java
index 541fbe4..111cb60 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java
@@ -32,14 +32,16 @@ public class AlterTableEvent extends ListenerEvent {
private final Table oldTable;
private final boolean isTruncateOp;
private Long writeId;
+ private final boolean isReplicated;
- public AlterTableEvent (Table oldTable, Table newTable, boolean isTruncateOp, boolean status,
- Long writeId, IHMSHandler handler) {
+ public AlterTableEvent(Table oldTable, Table newTable, boolean isTruncateOp, boolean status,
+ Long writeId, IHMSHandler handler, boolean isReplicated) {
super (status, handler);
this.oldTable = oldTable;
this.newTable = newTable;
this.isTruncateOp = isTruncateOp;
this.writeId = writeId;
+ this.isReplicated = isReplicated;
}
/**
@@ -66,4 +68,6 @@ public class AlterTableEvent extends ListenerEvent {
public Long getWriteId() {
return writeId;
}
+
+ public boolean isReplicated() { return isReplicated; }
}
\ No newline at end of file
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CreateDatabaseEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CreateDatabaseEvent.java
index e2c3ee3..e1b2202 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CreateDatabaseEvent.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CreateDatabaseEvent.java
@@ -28,10 +28,12 @@ import org.apache.hadoop.hive.metastore.api.Database;
public class CreateDatabaseEvent extends ListenerEvent {
private final Database db;
+ private final boolean isReplicated;
- public CreateDatabaseEvent (Database db, boolean status, IHMSHandler handler) {
+ public CreateDatabaseEvent(Database db, boolean status, IHMSHandler handler, boolean isReplicated) {
super (status, handler);
this.db = db;
+ this.isReplicated = isReplicated;
}
/**
@@ -40,4 +42,9 @@ public class CreateDatabaseEvent extends ListenerEvent {
public Database getDatabase () {
return db;
}
+
+ /**
+ * @return whether this event is caused by replication
+ */
+ public boolean isReplicated() { return isReplicated; }
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CreateTableEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CreateTableEvent.java
index 4f5e887..5f2c9f5 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CreateTableEvent.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CreateTableEvent.java
@@ -28,10 +28,12 @@ import org.apache.hadoop.hive.metastore.api.Table;
public class CreateTableEvent extends ListenerEvent {
private final Table table;
+ private final boolean isReplicated;
- public CreateTableEvent (Table table, boolean status, IHMSHandler handler) {
+ public CreateTableEvent(Table table, boolean status, IHMSHandler handler, boolean isReplicated) {
super (status, handler);
this.table = table;
+ this.isReplicated = isReplicated;
}
/**
@@ -40,4 +42,11 @@ public class CreateTableEvent extends ListenerEvent {
public Table getTable () {
return table;
}
+
+ /**
+ * @return whether this event was created by replication
+ */
+ public boolean isReplicated() { return isReplicated; }
}
+
+
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DropDatabaseEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DropDatabaseEvent.java
index 94fe264..2ce1063 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DropDatabaseEvent.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DropDatabaseEvent.java
@@ -28,10 +28,12 @@ import org.apache.hadoop.hive.metastore.api.Database;
public class DropDatabaseEvent extends ListenerEvent {
private final Database db;
+ private final boolean isReplicated;
- public DropDatabaseEvent(Database db, boolean status, IHMSHandler handler) {
+ public DropDatabaseEvent(Database db, boolean status, IHMSHandler handler, boolean isReplicated) {
super (status, handler);
this.db = db;
+ this.isReplicated = isReplicated;
}
/**
@@ -40,4 +42,9 @@ public class DropDatabaseEvent extends ListenerEvent {
public Database getDatabase() {
return db;
}
+
+ /**
+ * @return whether this event is caused by replication
+ */
+ public boolean isReplicated() { return isReplicated; }
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DropTableEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DropTableEvent.java
index 9152232..35e367b 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DropTableEvent.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DropTableEvent.java
@@ -29,13 +29,16 @@ public class DropTableEvent extends ListenerEvent {
private final Table table;
private final boolean deleteData;
+ private final boolean isReplicated;
- public DropTableEvent(Table table, boolean status, boolean deleteData, IHMSHandler handler) {
+ public DropTableEvent(Table table, boolean status, boolean deleteData, IHMSHandler handler,
+ boolean isReplicated) {
super(status, handler);
this.table = table;
// In HiveMetaStore, the deleteData flag indicates whether DFS data should be
// removed on a drop.
this.deleteData = deleteData;
+ this.isReplicated = isReplicated;
}
/**
@@ -51,4 +54,9 @@ public class DropTableEvent extends ListenerEvent {
public boolean getDeleteData() {
return deleteData;
}
+
+ /**
+ * @return whether this event was created by replication
+ */
+ public boolean isReplicated() { return isReplicated; }
}