Posted to commits@hive.apache.org by sa...@apache.org on 2019/08/21 13:30:28 UTC

[hive] branch master updated: HIVE-22068: Return the last event id dumped as repl status to avoid notification event missing error (Ashutosh Bapat, reviewed by Sankar Hariappan)

This is an automated email from the ASF dual-hosted git repository.

sankarh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f7e286  HIVE-22068: Return the last event id dumped as repl status to avoid notification event missing error (Ashutosh Bapat, reviewed by Sankar Hariappan)
7f7e286 is described below

commit 7f7e28643dcd44e9143172c768b131b9ff9e76ba
Author: Ashutosh Bapat <ab...@cloudera.com>
AuthorDate: Wed Aug 21 19:00:01 2019 +0530

    HIVE-22068: Return the last event id dumped as repl status to avoid notification event missing error (Ashutosh Bapat, reviewed by Sankar Hariappan)
    
    Signed-off-by: Sankar Hariappan <sa...@apache.org>
---
 ...estReplicationScenariosAcidTablesBootstrap.java |   6 +-
 .../TestReplicationScenariosExternalTables.java    | 105 +++++++++++++++++++++
 .../hadoop/hive/ql/parse/WarehouseInstance.java    |   5 +
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     |   5 +-
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |  39 ++++++++
 .../incremental/IncrementalLoadTasksBuilder.java   |   7 --
 .../parse/repl/dump/log/IncrementalDumpLogger.java |  12 ++-
 .../repl/dump/log/state/IncrementalDumpBegin.java  |  15 ++-
 .../apache/hadoop/hive/metastore/ObjectStore.java  |  21 ++++-
 9 files changed, 197 insertions(+), 18 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java
index f475b1e..9389f26 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java
@@ -233,11 +233,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
     HiveConf replicaConf = replica.getConf();
     LOG.info(testName.getMethodName() + ": loading incremental dump with ACID bootstrap.");
     replica.load(replicatedDbName, incDump.dumpLocation);
-    // During incremental dump with ACID bootstrap we do not dump ALLOC_WRITE_ID events. So the
-    // two ALLOC_WRITE_ID events corresponding to aborted transactions on t1 and t2 will not be
-    // replicated. Discount those.
-    verifyIncLoad(replicatedDbName,
-            (new Long(Long.valueOf(incDump.lastReplicationId) - 2)).toString());
+    verifyIncLoad(replicatedDbName, incDump.lastReplicationId);
     // Verify if HWM is properly set after REPL LOAD
     verifyNextId(tables, replicatedDbName, replicaConf);
 
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index e1802ad..1815824 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -34,6 +34,9 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -47,6 +50,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 
+import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME;
 import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME;
 import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG;
@@ -57,6 +61,7 @@ import static org.junit.Assert.assertTrue;
 public class TestReplicationScenariosExternalTables extends BaseReplicationAcrossInstances {
 
   private static final String REPLICA_EXTERNAL_BASE = "/replica_external_base";
+  String extraPrimaryDb;
 
   @BeforeClass
   public static void classLevelSetup() throws Exception {
@@ -71,6 +76,18 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     internalBeforeClassSetup(overrides, TestReplicationScenarios.class);
   }
 
+  @Before
+  public void setup() throws Throwable {
+    super.setup();
+    extraPrimaryDb = "extra_" + primaryDbName;
+  }
+
+  @After
+  public void tearDown() throws Throwable {
+    primary.run("drop database if exists " + extraPrimaryDb + " cascade");
+    super.tearDown();
+  }
+
   @Test
   public void replicationWithoutExternalTables() throws Throwable {
     List<String> loadWithClause = externalTableBasePathWithClause();
@@ -750,6 +767,94 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
             .verifyResults(Arrays.asList("1", "2"));
   }
 
+  @Test
+  public void testIncrementalDumpEmptyDumpDirectory() throws Throwable {
+    List<String> loadWithClause = externalTableBasePathWithClause();
+    List<String> dumpWithClause = Collections.singletonList(
+            "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'"
+    );
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+            .run("create external table t1 (id int)")
+            .run("insert into table t1 values (1)")
+            .run("insert into table t1 values (2)")
+            .dump(primaryDbName, null, dumpWithClause);
+
+    replica.load(replicatedDbName, tuple.dumpLocation)
+            .status(replicatedDbName)
+            .verifyResult(tuple.lastReplicationId);
+
+    // This looks like an empty dump but it has the ALTER TABLE event created by the previous
+    // dump. We need it here so that the next dump won't have any events.
+    WarehouseInstance.Tuple incTuple = primary.dump(primaryDbName, tuple.lastReplicationId, dumpWithClause);
+    replica.load(replicatedDbName, incTuple.dumpLocation, loadWithClause)
+            .status(replicatedDbName)
+            .verifyResult(incTuple.lastReplicationId);
+
+    // Create events for some other database and then dump primaryDbName, which produces an empty dump directory.
+    primary.run("create database " + extraPrimaryDb + " WITH DBPROPERTIES ( '" +
+            SOURCE_OF_REPLICATION + "' = '1,2,3')");
+    WarehouseInstance.Tuple inc2Tuple = primary.run("use " + extraPrimaryDb)
+            .run("create table tbl (fld int)")
+            .run("use " + primaryDbName)
+            .dump(primaryDbName, incTuple.lastReplicationId, dumpWithClause);
+    Assert.assertEquals(primary.getCurrentNotificationEventId().getEventId(),
+                        Long.valueOf(inc2Tuple.lastReplicationId).longValue());
+
+    // An incremental load of an empty dump directory into an existing database should set the repl id to the last event at the source.
+    replica.load(replicatedDbName, inc2Tuple.dumpLocation, loadWithClause)
+            .status(replicatedDbName)
+            .verifyResult(inc2Tuple.lastReplicationId);
+  }
+
+  @Test
+  public void testExtTableBootstrapDuringIncrementalWithoutAnyEvents() throws Throwable {
+    List<String> loadWithClause = externalTableBasePathWithClause();
+    List<String> dumpWithClause = Collections.singletonList(
+            "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"
+    );
+
+    WarehouseInstance.Tuple bootstrapDump = primary
+            .run("use " + primaryDbName)
+            .run("create external table t1 (id int)")
+            .run("insert into table t1 values (1)")
+            .run("create table t2 (id int)")
+            .run("insert into table t2 values (1)")
+            .dump(primaryDbName, null, dumpWithClause);
+
+    replica.load(replicatedDbName, bootstrapDump.dumpLocation, loadWithClause)
+            .status(replicatedDbName)
+            .verifyResult(bootstrapDump.lastReplicationId)
+            .run("use " + replicatedDbName)
+            .run("show tables like 't1'")
+            .verifyFailure(new String[] {"t1" })
+            .run("show tables like 't2'")
+            .verifyResult("t2")
+            .verifyReplTargetProperty(replicatedDbName);
+
+    // This looks like an empty dump but it has the ALTER TABLE event created by the previous
+    // dump. We need it here so that the next dump won't have any events.
+    WarehouseInstance.Tuple incTuple = primary.dump(primaryDbName, bootstrapDump.lastReplicationId);
+    replica.load(replicatedDbName, incTuple.dumpLocation)
+            .status(replicatedDbName)
+            .verifyResult(incTuple.lastReplicationId);
+
+    // Take a dump with external tables bootstrapped and load it
+    dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
+            "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'");
+    WarehouseInstance.Tuple inc2Tuple = primary.run("use " + primaryDbName)
+            .dump(primaryDbName, incTuple.lastReplicationId, dumpWithClause);
+
+    replica.load(replicatedDbName, inc2Tuple.dumpLocation, loadWithClause)
+            .status(replicatedDbName)
+            .verifyResult(inc2Tuple.lastReplicationId)
+            .run("use " + replicatedDbName)
+            .run("show tables like 't1'")
+            .verifyResult("t1")
+            .run("show tables like 't2'")
+            .verifyResult("t2")
+            .verifyReplTargetProperty(replicatedDbName);
+  }
+
   private List<String> externalTableBasePathWithClause() throws IOException, SemanticException {
     return ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica);
   }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 2a2845d..91981e3 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -606,6 +607,10 @@ public class WarehouseInstance implements Closeable {
     }
   }
 
+  CurrentNotificationEventId getCurrentNotificationEventId() throws Exception {
+    return client.getCurrentNotificationEventId();
+  }
+
   List<Path> copyToHDFS(List<URI> localUris) throws IOException, SemanticException {
     DistributedFileSystem fs = miniDFSCluster.getFileSystem();
     Path destinationBasePath = new Path("/", String.valueOf(System.nanoTime()));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 1c1bd9a..32cb38a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -296,9 +296,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     String dbName = (null != work.dbNameOrPattern && !work.dbNameOrPattern.isEmpty())
         ? work.dbNameOrPattern
         : "?";
+    int maxEventLimit = work.maxEventLimit();
     replLogger = new IncrementalDumpLogger(dbName, dumpRoot.toString(),
-            evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName, work.eventTo,
-                    work.maxEventLimit()));
+            evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName, work.eventTo, maxEventLimit),
+            work.eventFrom, work.eventTo, maxEventLimit);
     replLogger.startLog();
     while (evIter.hasNext()) {
       NotificationEvent ev = evIter.next();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index c1ebfd5..713b5f2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.InvalidInputException;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.ddl.DDLWork;
+import org.apache.hadoop.hive.ql.ddl.database.AlterDatabaseSetPropertiesDesc;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
@@ -66,6 +68,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -517,6 +520,42 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
         childTasks.add(builder.build(driverContext, getHive(), LOG, tracker));
       }
 
+      // If there are no more events to be applied, add a task to update the last.repl.id of the
+      // target database to the event id of the last event considered by the dump. The next
+      // incremental cycle won't consider the events in this dump again if it starts from this id.
+      if (!builder.hasMoreWork() && !work.getPathsToCopyIterator().hasNext()) {
+        // The name of the database to be loaded into is either specified directly in the REPL LOAD
+        // command (i.e. when dbNameToLoadIn has a valid dbname) or is available through the dump
+        // metadata during table-level replication.
+        String dbName = work.dbNameToLoadIn;
+        if (dbName == null || StringUtils.isBlank(dbName)) {
+          if (work.currentReplScope != null) {
+            String replScopeDbName = work.currentReplScope.getDbName();
+            if (replScopeDbName != null && !"*".equals(replScopeDbName)) {
+              dbName = replScopeDbName;
+            }
+          }
+        }
+
+        // If we are replicating to multiple databases at a time, it's not possible to know
+        // which databases we are replicating into, and hence we cannot update the repl id
+        // in all of them.
+        if (StringUtils.isNotBlank(dbName)) {
+          String lastEventid = builder.eventTo().toString();
+          Map<String, String> mapProp = new HashMap<>();
+          mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), lastEventid);
+
+          AlterDatabaseSetPropertiesDesc alterDbDesc =
+                  new AlterDatabaseSetPropertiesDesc(dbName, mapProp,
+                          new ReplicationSpec(lastEventid, lastEventid));
+          Task<? extends Serializable> updateReplIdTask =
+                  TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc), conf);
+
+          DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(updateReplIdTask));
+          LOG.debug("Added task to set last repl id of db " + dbName + " to " + lastEventid);
+        }
+      }
+
       // Either the incremental has more work or the external table file copy has more paths to process.
       // Once all the incremental events are applied and external table file copies are done, enable
       // bootstrap of tables if any exist.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
index e83b9f8..6efec71 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
@@ -151,13 +151,6 @@ public class IncrementalLoadTasksBuilder {
     }
 
     if (!hasMoreWork()) {
-      // if no events were replayed, then add a task to update the last repl id of the database/table to last event id.
-      if (taskChainTail == evTaskRoot) {
-        String lastEventid = eventTo.toString();
-        taskChainTail = dbUpdateReplStateTask(dbName, lastEventid, taskChainTail);
-        this.log.debug("no events to replay, set last repl id of db  " + dbName + " to " + lastEventid);
-      }
-
       ReplRemoveFirstIncLoadPendFlagDesc desc = new ReplRemoveFirstIncLoadPendFlagDesc(dbName);
       Task<? extends Serializable> updateIncPendTask = TaskFactory.get(new DDLWork(inputs, outputs, desc), conf);
       taskChainTail.addDependentTask(updateIncPendTask);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/IncrementalDumpLogger.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/IncrementalDumpLogger.java
index 9b148f2..f5c0837 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/IncrementalDumpLogger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/IncrementalDumpLogger.java
@@ -28,17 +28,25 @@ public class IncrementalDumpLogger extends ReplLogger {
   private String dumpDir;
   private long estimatedNumEvents;
   private long eventSeqNo;
+  private Long fromEventId;
+  private Long toEventId;
+  private int maxEvents;
 
-  public IncrementalDumpLogger(String dbName, String dumpDir, long estimatedNumEvents) {
+  public IncrementalDumpLogger(String dbName, String dumpDir, long estimatedNumEvents,
+                               Long fromEventId, Long toEventId, int maxEvents) {
     this.dbName = dbName;
     this.dumpDir = dumpDir;
     this.estimatedNumEvents = estimatedNumEvents;
     this.eventSeqNo = 0;
+    this.fromEventId = fromEventId;
+    this.toEventId = toEventId;
+    this.maxEvents = maxEvents;
   }
 
   @Override
   public void startLog() {
-    (new IncrementalDumpBegin(dbName, estimatedNumEvents)).log(LogTag.START);
+    (new IncrementalDumpBegin(dbName, estimatedNumEvents, fromEventId, toEventId,
+            Long.valueOf(maxEvents))).log(LogTag.START);
   }
 
   @Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/state/IncrementalDumpBegin.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/state/IncrementalDumpBegin.java
index 2e02527..7c0afc3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/state/IncrementalDumpBegin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/state/IncrementalDumpBegin.java
@@ -34,10 +34,23 @@ public class IncrementalDumpBegin extends ReplState {
   @JsonProperty
   private Long dumpStartTime;
 
-  public IncrementalDumpBegin(String dbName, long estimatedNumEvents) {
+  @JsonProperty
+  private Long fromEventId;
+
+  @JsonProperty
+  private Long toEventId;
+
+  @JsonProperty
+  private Long maxEvents;
+
+  public IncrementalDumpBegin(String dbName, long estimatedNumEvents, Long fromEventId,
+                              Long toEventId, Long maxEvents) {
     this.dbName = dbName;
     this.dumpType = DumpType.INCREMENTAL;
     this.estimatedNumEvents = estimatedNumEvents;
     this.dumpStartTime = System.currentTimeMillis() / 1000;
+    this.fromEventId = fromEventId;
+    this.toEventId = toEventId;
+    this.maxEvents = maxEvents;
   }
 }
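
With these new fields, the START line written by replLogger.startLog() for an
incremental dump should serialize roughly as below (field names follow the
@JsonProperty annotations above; the values are hypothetical):

    {"dbName":"srcdb","dumpType":"INCREMENTAL","estimatedNumEvents":10,
     "dumpStartTime":1566393000,"fromEventId":100,"toEventId":110,"maxEvents":1000}
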
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 1151c22..f04553f 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -10277,8 +10277,27 @@ public class ObjectStore implements RawStore, Configurable {
       int max_events = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.EVENT_CLEAN_MAX_EVENTS);
       max_events = max_events > 0 ? max_events : Integer.MAX_VALUE;
       query.setRange(0, max_events);
+      query.setOrdering("eventId ascending");
 
-      Collection<MNotificationLog> toBeRemoved = (Collection) query.execute(tooOld);
+      List<MNotificationLog> toBeRemoved = (List) query.execute(tooOld);
+      if (toBeRemoved == null || toBeRemoved.size() == 0) {
+        LOG.info("No events found to be cleaned with eventTime < {}.", tooOld);
+      } else {
+        NotificationEvent firstEvent = translateDbToThrift(toBeRemoved.get(0));
+        long minEventId = firstEvent.getEventId();
+        long minEventTime = firstEvent.getEventTime();
+        long maxEventId = minEventId;
+        long maxEventTime = minEventTime;
+        if (toBeRemoved.size() > 1) {
+          NotificationEvent lastEvent =
+                  translateDbToThrift(toBeRemoved.get(toBeRemoved.size() - 1));
+          maxEventId = lastEvent.getEventId();
+          maxEventTime = lastEvent.getEventTime();
+        }
+        LOG.info("Cleaned {} events with eventTime < {}, minimum eventId {} (with eventTime {}) " +
+                        "and maximum eventId {} (with eventTime {})",
+                toBeRemoved.size(), tooOld, minEventId, minEventTime, maxEventId, maxEventTime);
+      }
       if (CollectionUtils.isNotEmpty(toBeRemoved)) {
         pm.deletePersistentAll(toBeRemoved);
       }
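
Taken together, the observable contract this patch establishes is the one the
new tests assert: an incremental REPL LOAD that replays no events still
advances the target database's repl status to the last event id the dump
considered, so the next incremental cycle can start from that id without
tripping over already-cleaned notification events. A minimal sketch in the
style of the WarehouseInstance test DSL used above (the database names and the
prior dump are hypothetical):

    // Assume prevDump was already loaded into "tgtdb" and no new events were
    // generated on "srcdb" since then, so this incremental dump replays nothing.
    WarehouseInstance.Tuple incDump = primary.dump("srcdb", prevDump.lastReplicationId);

    // REPL LOAD now appends a task that sets last.repl.id on "tgtdb" to the last
    // event id covered by the dump, and REPL STATUS reports that id.
    replica.load("tgtdb", incDump.dumpLocation)
           .status("tgtdb")
           .verifyResult(incDump.lastReplicationId);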