You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sa...@apache.org on 2019/08/21 13:30:28 UTC
[hive] branch master updated: HIVE-22068: Return the last event id
dumped as repl status to avoid notification event missing error (Ashutosh
Bapat, reviewed by Sankar Hariappan)
This is an automated email from the ASF dual-hosted git repository.
sankarh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 7f7e286 HIVE-22068: Return the last event id dumped as repl status to avoid notification event missing error (Ashutosh Bapat, reviewed by Sankar Hariappan)
7f7e286 is described below
commit 7f7e28643dcd44e9143172c768b131b9ff9e76ba
Author: Ashutosh Bapat <ab...@cloudera.com>
AuthorDate: Wed Aug 21 19:00:01 2019 +0530
HIVE-22068: Return the last event id dumped as repl status to avoid notification event missing error (Ashutosh Bapat, reviewed by Sankar Hariappan)
Signed-off-by: Sankar Hariappan <sa...@apache.org>
---
...estReplicationScenariosAcidTablesBootstrap.java | 6 +-
.../TestReplicationScenariosExternalTables.java | 105 +++++++++++++++++++++
.../hadoop/hive/ql/parse/WarehouseInstance.java | 5 +
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 5 +-
.../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 39 ++++++++
.../incremental/IncrementalLoadTasksBuilder.java | 7 --
.../parse/repl/dump/log/IncrementalDumpLogger.java | 12 ++-
.../repl/dump/log/state/IncrementalDumpBegin.java | 15 ++-
.../apache/hadoop/hive/metastore/ObjectStore.java | 21 ++++-
9 files changed, 197 insertions(+), 18 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java
index f475b1e..9389f26 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java
@@ -233,11 +233,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
HiveConf replicaConf = replica.getConf();
LOG.info(testName.getMethodName() + ": loading incremental dump with ACID bootstrap.");
replica.load(replicatedDbName, incDump.dumpLocation);
- // During incremental dump with ACID bootstrap we do not dump ALLOC_WRITE_ID events. So the
- // two ALLOC_WRITE_ID events corresponding aborted transactions on t1 and t2 will not be
- // repliaced. Discount those.
- verifyIncLoad(replicatedDbName,
- (new Long(Long.valueOf(incDump.lastReplicationId) - 2)).toString());
+ verifyIncLoad(replicatedDbName, incDump.lastReplicationId);
// Verify if HWM is properly set after REPL LOAD
verifyNextId(tables, replicatedDbName, replicaConf);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index e1802ad..1815824 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -34,6 +34,9 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -47,6 +50,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
+import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME;
import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME;
import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG;
@@ -57,6 +61,7 @@ import static org.junit.Assert.assertTrue;
public class TestReplicationScenariosExternalTables extends BaseReplicationAcrossInstances {
private static final String REPLICA_EXTERNAL_BASE = "/replica_external_base";
+ String extraPrimaryDb;
@BeforeClass
public static void classLevelSetup() throws Exception {
@@ -71,6 +76,18 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
internalBeforeClassSetup(overrides, TestReplicationScenarios.class);
}
+ @Before
+ public void setup() throws Throwable { // per-test setup layered on top of the base-class setup
+ super.setup();
+ extraPrimaryDb = "extra_" + primaryDbName; // only the name is derived here; tearDown() drops the database if a test created it
+ }
+
+ @After
+ public void tearDown() throws Throwable { // per-test cleanup; runs the extra-database drop before the base-class teardown
+ primary.run("drop database if exists " + extraPrimaryDb + " cascade"); // "if exists": harmless for tests that never created it
+ super.tearDown();
+ }
+
@Test
public void replicationWithoutExternalTables() throws Throwable {
List<String> loadWithClause = externalTableBasePathWithClause();
@@ -750,6 +767,94 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.verifyResults(Arrays.asList("1", "2"));
}
+ @Test
+ public void testIncrementalDumpEmptyDumpDirectory() throws Throwable { // repl status must advance even when the incremental dump has no events for this db
+ List<String> loadWithClause = externalTableBasePathWithClause();
+ List<String> dumpWithClause = Collections.singletonList(
+ "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'"
+ );
+ WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+ .run("create external table t1 (id int)")
+ .run("insert into table t1 values (1)")
+ .run("insert into table t1 values (2)")
+ .dump(primaryDbName, null, dumpWithClause); // null starting event id: presumably a bootstrap dump
+
+ replica.load(replicatedDbName, tuple.dumpLocation)
+ .status(replicatedDbName)
+ .verifyResult(tuple.lastReplicationId);
+
+ // This looks like an empty dump but it has the ALTER TABLE event created by the previous
+ // dump. We need it here so that the next dump won't have any events.
+ WarehouseInstance.Tuple incTuple = primary.dump(primaryDbName, tuple.lastReplicationId, dumpWithClause);
+ replica.load(replicatedDbName, incTuple.dumpLocation, loadWithClause)
+ .status(replicatedDbName)
+ .verifyResult(incTuple.lastReplicationId);
+
+ // create events for some other database and then dump the primaryDbName to dump an empty directory.
+ primary.run("create database " + extraPrimaryDb + " WITH DBPROPERTIES ( '" +
+ SOURCE_OF_REPLICATION + "' = '1,2,3')");
+ WarehouseInstance.Tuple inc2Tuple = primary.run("use " + extraPrimaryDb)
+ .run("create table tbl (fld int)")
+ .run("use " + primaryDbName)
+ .dump(primaryDbName, incTuple.lastReplicationId, dumpWithClause);
+ Assert.assertEquals(primary.getCurrentNotificationEventId().getEventId(),
+ Long.valueOf(inc2Tuple.lastReplicationId).longValue()); // the dump's last repl id must equal the source's current notification event id
+
+ // Incremental load to existing database with empty dump directory should set the repl id to the last event at src.
+ replica.load(replicatedDbName, inc2Tuple.dumpLocation, loadWithClause)
+ .status(replicatedDbName)
+ .verifyResult(inc2Tuple.lastReplicationId);
+ }
+
+ @Test
+ public void testExtTableBootstrapDuringIncrementalWithoutAnyEvents() throws Throwable { // external tables are first excluded, then bootstrapped by a later incremental dump
+ List<String> loadWithClause = externalTableBasePathWithClause();
+ List<String> dumpWithClause = Collections.singletonList(
+ "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"
+ );
+
+ WarehouseInstance.Tuple bootstrapDump = primary
+ .run("use " + primaryDbName)
+ .run("create external table t1 (id int)")
+ .run("insert into table t1 values (1)")
+ .run("create table t2 (id int)")
+ .run("insert into table t2 values (1)")
+ .dump(primaryDbName, null, dumpWithClause);
+
+ replica.load(replicatedDbName, bootstrapDump.dumpLocation, loadWithClause)
+ .status(replicatedDbName)
+ .verifyResult(bootstrapDump.lastReplicationId)
+ .run("use " + replicatedDbName)
+ .run("show tables like 't1'")
+ .verifyFailure(new String[] {"t1" }) // presumably asserts t1 is absent on the replica; confirm against WarehouseInstance.verifyFailure
+ .run("show tables like 't2'")
+ .verifyResult("t2")
+ .verifyReplTargetProperty(replicatedDbName);
+
+ // This looks like an empty dump but it has the ALTER TABLE event created by the previous
+ // dump. We need it here so that the next dump won't have any events.
+ WarehouseInstance.Tuple incTuple = primary.dump(primaryDbName, bootstrapDump.lastReplicationId); // no WITH clause on this dump or on its load below
+ replica.load(replicatedDbName, incTuple.dumpLocation)
+ .status(replicatedDbName)
+ .verifyResult(incTuple.lastReplicationId);
+
+ // Take a dump with external tables bootstrapped and load it
+ dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
+ "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'");
+ WarehouseInstance.Tuple inc2Tuple = primary.run("use " + primaryDbName)
+ .dump(primaryDbName, incTuple.lastReplicationId, dumpWithClause);
+
+ replica.load(replicatedDbName, inc2Tuple.dumpLocation, loadWithClause)
+ .status(replicatedDbName)
+ .verifyResult(inc2Tuple.lastReplicationId)
+ .run("use " + replicatedDbName)
+ .run("show tables like 't1'")
+ .verifyResult("t1") // t1 is expected on the replica only after this external-table bootstrap
+ .run("show tables like 't2'")
+ .verifyResult("t2")
+ .verifyReplTargetProperty(replicatedDbName);
+ }
+
private List<String> externalTableBasePathWithClause() throws IOException, SemanticException {
return ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica);
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 2a2845d..91981e3 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -606,6 +607,10 @@ public class WarehouseInstance implements Closeable {
}
}
+ CurrentNotificationEventId getCurrentNotificationEventId() throws Exception { // thin pass-through to `client`, exposed for tests
+ return client.getCurrentNotificationEventId();
+ }
+
List<Path> copyToHDFS(List<URI> localUris) throws IOException, SemanticException {
DistributedFileSystem fs = miniDFSCluster.getFileSystem();
Path destinationBasePath = new Path("/", String.valueOf(System.nanoTime()));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 1c1bd9a..32cb38a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -296,9 +296,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
String dbName = (null != work.dbNameOrPattern && !work.dbNameOrPattern.isEmpty())
? work.dbNameOrPattern
: "?";
+ int maxEventLimit = work.maxEventLimit();
replLogger = new IncrementalDumpLogger(dbName, dumpRoot.toString(),
- evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName, work.eventTo,
- work.maxEventLimit()));
+ evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName, work.eventTo, maxEventLimit),
+ work.eventFrom, work.eventTo, maxEventLimit);
replLogger.startLog();
while (evIter.hasNext()) {
NotificationEvent ev = evIter.next();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index c1ebfd5..713b5f2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.InvalidInputException;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.ddl.DDLWork;
+import org.apache.hadoop.hive.ql.ddl.database.AlterDatabaseSetPropertiesDesc;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
@@ -66,6 +68,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -517,6 +520,42 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
childTasks.add(builder.build(driverContext, getHive(), LOG, tracker));
}
+ // If there are no more events to be applied, add a task to update the last.repl.id of the
+ // target database to the event id of the last event considered by the dump. Next
+ // incremental cycle won't consider the events in this dump again if it starts from this id.
+ if (!builder.hasMoreWork() && !work.getPathsToCopyIterator().hasNext()) {
+ // The name of the database to be loaded into is either specified directly in REPL LOAD
+ // command i.e. when dbNameToLoadIn has a valid dbname or is available through dump
+ // metadata during table level replication.
+ String dbName = work.dbNameToLoadIn;
+ if (dbName == null || StringUtils.isBlank(dbName)) {
+ if (work.currentReplScope != null) {
+ String replScopeDbName = work.currentReplScope.getDbName();
+ if (replScopeDbName != null && !"*".equals(replScopeDbName)) {
+ dbName = replScopeDbName;
+ }
+ }
+ }
+
+ // If we are replicating to multiple databases at a time, we cannot determine the
+ // full set of target databases here, and hence cannot update the repl id in each
+ // of them.
+ if (StringUtils.isNotBlank(dbName)) {
+ String lastEventid = builder.eventTo().toString();
+ Map<String, String> mapProp = new HashMap<>();
+ mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), lastEventid);
+
+ AlterDatabaseSetPropertiesDesc alterDbDesc =
+ new AlterDatabaseSetPropertiesDesc(dbName, mapProp,
+ new ReplicationSpec(lastEventid, lastEventid));
+ Task<? extends Serializable> updateReplIdTask =
+ TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc), conf);
+
+ DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(updateReplIdTask));
+ LOG.debug("Added task to set last repl id of db " + dbName + " to " + lastEventid);
+ }
+ }
+
// Either the incremental has more work or the external table file copy has more paths to process.
// Once all the incremental events are applied and external tables file copies are done, enable
// bootstrap of tables if exist.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
index e83b9f8..6efec71 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
@@ -151,13 +151,6 @@ public class IncrementalLoadTasksBuilder {
}
if (!hasMoreWork()) {
- // if no events were replayed, then add a task to update the last repl id of the database/table to last event id.
- if (taskChainTail == evTaskRoot) {
- String lastEventid = eventTo.toString();
- taskChainTail = dbUpdateReplStateTask(dbName, lastEventid, taskChainTail);
- this.log.debug("no events to replay, set last repl id of db " + dbName + " to " + lastEventid);
- }
-
ReplRemoveFirstIncLoadPendFlagDesc desc = new ReplRemoveFirstIncLoadPendFlagDesc(dbName);
Task<? extends Serializable> updateIncPendTask = TaskFactory.get(new DDLWork(inputs, outputs, desc), conf);
taskChainTail.addDependentTask(updateIncPendTask);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/IncrementalDumpLogger.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/IncrementalDumpLogger.java
index 9b148f2..f5c0837 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/IncrementalDumpLogger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/IncrementalDumpLogger.java
@@ -28,17 +28,25 @@ public class IncrementalDumpLogger extends ReplLogger {
private String dumpDir;
private long estimatedNumEvents;
private long eventSeqNo;
+ private Long fromEventId;
+ private Long toEventId;
+ private int maxEvents;
- public IncrementalDumpLogger(String dbName, String dumpDir, long estimatedNumEvents) {
+ public IncrementalDumpLogger(String dbName, String dumpDir, long estimatedNumEvents,
+ Long fromEventId, Long toEventId, int maxEvents) { // the three added values are stored and later forwarded to IncrementalDumpBegin by startLog()
this.dbName = dbName;
this.dumpDir = dumpDir;
this.estimatedNumEvents = estimatedNumEvents;
this.eventSeqNo = 0;
+ this.fromEventId = fromEventId;
+ this.toEventId = toEventId;
+ this.maxEvents = maxEvents;
}
@Override
public void startLog() {
- (new IncrementalDumpBegin(dbName, estimatedNumEvents)).log(LogTag.START);
+ (new IncrementalDumpBegin(dbName, estimatedNumEvents, fromEventId, toEventId,
+ Long.valueOf(maxEvents))).log(LogTag.START); // int maxEvents is widened to Long to match the IncrementalDumpBegin constructor
}
@Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/state/IncrementalDumpBegin.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/state/IncrementalDumpBegin.java
index 2e02527..7c0afc3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/state/IncrementalDumpBegin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/state/IncrementalDumpBegin.java
@@ -34,10 +34,23 @@ public class IncrementalDumpBegin extends ReplState {
@JsonProperty
private Long dumpStartTime;
- public IncrementalDumpBegin(String dbName, long estimatedNumEvents) {
+ @JsonProperty
+ private Long fromEventId;
+
+ @JsonProperty
+ private Long toEventId;
+
+ @JsonProperty
+ private Long maxEvents;
+
+ public IncrementalDumpBegin(String dbName, long estimatedNumEvents, Long fromEventId,
+ Long toEventId, Long maxEvents) { // the added arguments populate the new @JsonProperty fields of the same names
this.dbName = dbName;
this.dumpType = DumpType.INCREMENTAL;
this.estimatedNumEvents = estimatedNumEvents;
this.dumpStartTime = System.currentTimeMillis() / 1000;
+ this.fromEventId = fromEventId;
+ this.toEventId = toEventId;
+ this.maxEvents = maxEvents;
}
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 1151c22..f04553f 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -10277,8 +10277,27 @@ public class ObjectStore implements RawStore, Configurable {
int max_events = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.EVENT_CLEAN_MAX_EVENTS);
max_events = max_events > 0 ? max_events : Integer.MAX_VALUE;
query.setRange(0, max_events);
+ query.setOrdering("eventId ascending");
- Collection<MNotificationLog> toBeRemoved = (Collection) query.execute(tooOld);
+ List<MNotificationLog> toBeRemoved = (List) query.execute(tooOld);
+ if (toBeRemoved == null || toBeRemoved.size() == 0) {
+ LOG.info("No events found to be cleaned with eventTime < {}.", tooOld);
+ } else {
+ NotificationEvent firstEvent = translateDbToThrift(toBeRemoved.get(0));
+ long minEventId = firstEvent.getEventId();
+ long minEventTime = firstEvent.getEventTime();
+ long maxEventId = minEventId;
+ long maxEventTime = minEventTime;
+ if (toBeRemoved.size() > 1) {
+ NotificationEvent lastEvent =
+ translateDbToThrift(toBeRemoved.get(toBeRemoved.size() - 1));
+ maxEventId = lastEvent.getEventId();
+ maxEventTime = lastEvent.getEventTime();
+ }
+ LOG.info("Cleaned {} events with eventTime < {}, minimum eventId {} (with eventTime {}) " +
+ "and maximum eventId {} (with eventTime {})",
+ toBeRemoved.size(), tooOld, minEventId, minEventTime, maxEventId, maxEventTime);
+ }
if (CollectionUtils.isNotEmpty(toBeRemoved)) {
pm.deletePersistentAll(toBeRemoved);
}