Posted to commits@hive.apache.org by sa...@apache.org on 2019/03/09 05:45:19 UTC

[hive] branch master updated: HIVE-21286: Hive should support clean-up of previously bootstrapped tables when retry from different dump (Sankar Hariappan, reviewed by Ashutosh Bapat, Mahesh Kumar Behera)

This is an automated email from the ASF dual-hosted git repository.

sankarh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new fa62461  HIVE-21286: Hive should support clean-up of previously bootstrapped tables when retry from different dump (Sankar Hariappan, reviewed by Ashutosh Bapat, Mahesh Kumar Behera)
fa62461 is described below

commit fa62461c1ab75654be514e6b0e6aa2f0ee278319
Author: Sankar Hariappan <sa...@apache.org>
AuthorDate: Sat Mar 9 11:14:26 2019 +0530

    HIVE-21286: Hive should support clean-up of previously bootstrapped tables when retry from different dump (Sankar Hariappan, reviewed by Ashutosh Bapat, Mahesh Kumar Behera)
    
    Signed-off-by: Sankar Hariappan <sa...@apache.org>
---
 .../TestReplicationScenariosExternalTables.java    | 129 +++++++++++++++++++--
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |  95 ++++++++++++++-
 .../hadoop/hive/ql/exec/repl/ReplLoadWork.java     |  12 +-
 .../events/filesystem/BootstrapEventsIterator.java |   8 +-
 .../events/filesystem/DatabaseEventsIterator.java  |   2 +-
 .../bootstrap/events/filesystem/FSTableEvent.java  |   7 ++
 .../hadoop/hive/ql/exec/repl/util/ReplUtils.java   |   4 +
 7 files changed, 239 insertions(+), 18 deletions(-)
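
In short, this patch adds a new WITH-clause config for REPL LOAD, hive.repl.clean.tables.from.bootstrap (defined in ReplUtils below), which points at a previously failed bootstrap dump so that half-loaded tables are dropped before retrying from a different dump. A minimal sketch of the retry flow it enables (plain Java; the database name and dump paths are hypothetical):

    public class ReplRetrySketch {
      public static void main(String[] args) {
        String failedDump = "/repl/dumps/old-bootstrap"; // hypothetical: dump whose load failed midway
        String freshDump = "/repl/dumps/new-bootstrap";  // hypothetical: new dump to retry from
        // Pass the failed dump through the new config so its half-baked tables are cleaned up first.
        String sql = "REPL LOAD repl_db FROM '" + freshDump + "' WITH ("
                + "'hive.repl.clean.tables.from.bootstrap'='" + failedDump + "')";
        System.out.println(sql); // submit via a Hive client; unset the config once the load succeeds
      }
    }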

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index a5d1032..72da2f1 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -22,6 +22,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -38,6 +41,7 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -45,9 +49,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 
 import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME;
 import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME;
+import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -535,18 +541,107 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
             .verifyResults(Arrays.asList("10", "20"));
   }
 
-  private List<String> externalTableBasePathWithClause() throws IOException, SemanticException {
-    Path externalTableLocation = new Path(REPLICA_EXTERNAL_BASE);
-    DistributedFileSystem fileSystem = replica.miniDFSCluster.getFileSystem();
-    externalTableLocation = PathBuilder.fullyQualifiedHDFSUri(externalTableLocation, fileSystem);
-    fileSystem.mkdirs(externalTableLocation);
+  @Test
+  public void retryBootstrapExternalTablesFromDifferentDump() throws Throwable {
+    List<String> loadWithClause = new ArrayList<>();
+    loadWithClause.addAll(externalTableBasePathWithClause());
 
-    // this is required since the same filesystem is used in both source and target
-    return Arrays.asList(
-            "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='"
-                    + externalTableLocation.toString() + "'",
-            "'distcp.options.pugpb'=''"
+    List<String> dumpWithClause = Collections.singletonList(
+            "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"
     );
+
+    WarehouseInstance.Tuple tupleBootstrapWithoutExternal = primary
+            .run("use " + primaryDbName)
+            .run("create external table t1 (id int)")
+            .run("insert into table t1 values (1)")
+            .run("create external table t2 (place string) partitioned by (country string)")
+            .run("insert into table t2 partition(country='india') values ('bangalore')")
+            .run("insert into table t2 partition(country='us') values ('austin')")
+            .run("create table t3 as select * from t1")
+            .dump(primaryDbName, null, dumpWithClause);
+
+    replica.load(replicatedDbName, tupleBootstrapWithoutExternal.dumpLocation, loadWithClause)
+            .status(replicatedDbName)
+            .verifyResult(tupleBootstrapWithoutExternal.lastReplicationId)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResult("t3")
+            .run("select id from t3")
+            .verifyResult("1");
+
+    dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
+            "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'");
+    WarehouseInstance.Tuple tupleIncWithExternalBootstrap = primary.run("use " + primaryDbName)
+            .run("drop table t1")
+            .run("create external table t4 (id int)")
+            .run("insert into table t4 values (10)")
+            .run("create table t5 as select * from t4")
+            .dump(primaryDbName, tupleBootstrapWithoutExternal.lastReplicationId, dumpWithClause);
+
+    // Fail setting the ckpt property for table t4 but let it succeed for t2.
+    BehaviourInjection<CallerArguments, Boolean> callerVerifier
+            = new BehaviourInjection<CallerArguments, Boolean>() {
+      @Nullable
+      @Override
+      public Boolean apply(@Nullable CallerArguments args) {
+        if (args.tblName.equalsIgnoreCase("t4") && args.dbName.equalsIgnoreCase(replicatedDbName)) {
+          injectionPathCalled = true;
+          LOG.warn("Verifier - DB : " + args.dbName + " TABLE : " + args.tblName);
+          return false;
+        }
+        return true;
+      }
+    };
+
+    // Fail the repl load before the ckpt property is set for t4 but after it is set for t2.
+    // On retry, these half-baked tables should be dropped and the bootstrap should succeed.
+    InjectableBehaviourObjectStore.setAlterTableModifier(callerVerifier);
+    try {
+      replica.loadFailure(replicatedDbName, tupleIncWithExternalBootstrap.dumpLocation, loadWithClause);
+      callerVerifier.assertInjectionsPerformed(true, false);
+    } finally {
+      InjectableBehaviourObjectStore.resetAlterTableModifier();
+    }
+
+    // Insert into the existing external table, then drop it, create a managed table with the same name,
+    // and take another bootstrap dump for external tables.
+    WarehouseInstance.Tuple tupleNewIncWithExternalBootstrap = primary.run("use " + primaryDbName)
+            .run("insert into table t2 partition(country='india') values ('chennai')")
+            .run("drop table t2")
+            .run("create table t2 as select * from t4")
+            .run("insert into table t4 values (20)")
+            .dump(primaryDbName, tupleIncWithExternalBootstrap.lastReplicationId, dumpWithClause);
+
+    // Set an incorrect bootstrap dump to clean tables from. Here, the full bootstrap dump is used,
+    // which is invalid, so REPL LOAD fails.
+    loadWithClause.add("'" + REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='"
+            + tupleBootstrapWithoutExternal.dumpLocation + "'");
+    replica.loadFailure(replicatedDbName, tupleNewIncWithExternalBootstrap.dumpLocation, loadWithClause);
+    loadWithClause.remove("'" + REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='"
+            + tupleBootstrapWithoutExternal.dumpLocation + "'");
+
+    // Set the previously failed bootstrap dump for clean-up. Now, the new bootstrap should overwrite the old one.
+    loadWithClause.add("'" + REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='"
+            + tupleIncWithExternalBootstrap.dumpLocation + "'");
+
+    // Verify that bootstrapping with the same dump is idempotent and returns the same result.
+    for (int i = 0; i < 2; i++) {
+      replica.load(replicatedDbName, tupleNewIncWithExternalBootstrap.dumpLocation, loadWithClause)
+              .run("use " + replicatedDbName)
+              .run("show tables like 't1'")
+              .verifyFailure(new String[]{"t1"})
+              .run("select id from t2")
+              .verifyResult("10")
+              .run("select id from t4")
+              .verifyResults(Arrays.asList("10", "20"))
+              .run("select id from t5")
+              .verifyResult("10");
+
+      // Once the REPL LOAD is successful, this config should be unset; otherwise, subsequent REPL LOADs
+      // will also drop those tables, causing data loss.
+      loadWithClause.remove("'" + REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='"
+              + tupleIncWithExternalBootstrap.dumpLocation + "'");
+    }
   }
 
   @Test
@@ -578,6 +673,20 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
             ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.getErrorCode());
   }
 
+  private List<String> externalTableBasePathWithClause() throws IOException, SemanticException {
+    Path externalTableLocation = new Path(REPLICA_EXTERNAL_BASE);
+    DistributedFileSystem fileSystem = replica.miniDFSCluster.getFileSystem();
+    externalTableLocation = PathBuilder.fullyQualifiedHDFSUri(externalTableLocation, fileSystem);
+    fileSystem.mkdirs(externalTableLocation);
+
+    // This is required since the same filesystem is used for both source and target.
+    return Arrays.asList(
+            "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='"
+                    + externalTableLocation.toString() + "'",
+            "'distcp.options.pugpb'=''"
+    );
+  }
+
   private void assertExternalFileInfo(List<String> expected, Path externalTableInfoFile)
       throws IOException {
     DistributedFileSystem fileSystem = primary.miniDFSCluster.getFileSystem();
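
The test above injects a failure through Hive's test-only InjectableBehaviourObjectStore so that the alter-table call setting the ckpt property fails for t4 but succeeds for t2. A rough standalone sketch of that injection pattern (plain Java with hypothetical names; this is not Hive's actual hook API):

    import java.util.function.Predicate;

    public class InjectionSketch {
      // Hypothetical stand-in for InjectableBehaviourObjectStore's alter-table modifier.
      private static volatile Predicate<String> alterTableModifier = tbl -> true;

      static void alterTable(String tbl) {
        if (!alterTableModifier.test(tbl)) {
          throw new RuntimeException("injected alter-table failure for " + tbl);
        }
        System.out.println("ckpt property set for " + tbl);
      }

      public static void main(String[] args) {
        alterTableModifier = tbl -> !tbl.equalsIgnoreCase("t4"); // fail only t4, as the test does
        alterTable("t2");
        try {
          alterTable("t4");
        } catch (RuntimeException e) {
          System.out.println(e.getMessage()); // the load fails here: t2 is ckpt'ed, t4 is half-baked
        }
        alterTableModifier = tbl -> true; // reset, mirroring resetAlterTableModifier()
      }
    }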
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 7062eda..b5e39b8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -17,13 +17,17 @@
  */
 package org.apache.hadoop.hive.ql.exec.repl;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.InvalidInputException;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
@@ -32,21 +36,29 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.PartitionEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.FSTableEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadConstraint;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadFunction;
-import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
-import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadPartitions;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadTable;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -279,6 +291,77 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
     return 0;
   }
 
+  /**
+   * Clean up (drop) the tables in the given database that were bootstrapped by the input dump directory.
+   * @throws HiveException if dropping a table fails.
+   * @throws IOException if a file operation fails.
+   * @throws InvalidInputException if the input dump directory is invalid.
+   */
+  private void cleanTablesFromBootstrap() throws HiveException, IOException, InvalidInputException {
+    Path bootstrapDirectory = new PathBuilder(work.bootstrapDumpToCleanTables)
+            .addDescendant(ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME).build();
+    FileSystem fs = bootstrapDirectory.getFileSystem(conf);
+
+    if (!fs.exists(bootstrapDirectory)) {
+      throw new InvalidInputException("Input bootstrap dump directory specified to clean tables from is invalid: "
+              + bootstrapDirectory);
+    }
+
+    FileStatus[] fileStatuses = fs.listStatus(bootstrapDirectory, EximUtil.getDirectoryFilter(fs));
+    if ((fileStatuses == null) || (fileStatuses.length == 0)) {
+      throw new InvalidInputException("Input bootstrap dump directory specified to clean tables from is empty: "
+              + bootstrapDirectory);
+    }
+
+    if (StringUtils.isNotBlank(work.dbNameToLoadIn) && (fileStatuses.length > 1)) {
+      throw new InvalidInputException("Input bootstrap dump directory specified to clean tables from has multiple"
+              + " DB dirs in the dump: " + bootstrapDirectory
+              + " which is not allowed on single target DB: " + work.dbNameToLoadIn);
+    }
+
+    // Iterate over the DBs and tables listed in the input bootstrap dump directory to clean tables from.
+    BootstrapEventsIterator bootstrapEventsIterator
+            = new BootstrapEventsIterator(bootstrapDirectory.toString(), work.dbNameToLoadIn, false, conf);
+
+    // This map will have only one entry if the target database is renamed using the DB name given in REPL LOAD.
+    // In the multiple-DBs case, the map maintains the list of table names for each DB.
+    Map<String, List<String>> dbToTblsListMap = new HashMap<>();
+    while (bootstrapEventsIterator.hasNext()) {
+      BootstrapEvent event = bootstrapEventsIterator.next();
+      if (event.eventType().equals(BootstrapEvent.EventType.Table)) {
+        FSTableEvent tableEvent = (FSTableEvent) event;
+        String dbName = (StringUtils.isBlank(work.dbNameToLoadIn) ? tableEvent.getDbName() : work.dbNameToLoadIn);
+        List<String> tableNames;
+        if (dbToTblsListMap.containsKey(dbName)) {
+          tableNames = dbToTblsListMap.get(dbName);
+        } else {
+          tableNames = new ArrayList<>();
+          dbToTblsListMap.put(dbName, tableNames);
+        }
+        tableNames.add(tableEvent.getTableName());
+      }
+    }
+
+    // No tables are listed in the given bootstrap dump directory specified to clean tables from.
+    if (dbToTblsListMap.isEmpty()) {
+      LOG.info("No DB/tables are listed in the bootstrap dump: {} specified to clean tables.",
+              bootstrapDirectory);
+      return;
+    }
+
+    Hive db = getHive();
+    for (Map.Entry<String, List<String>> dbEntry : dbToTblsListMap.entrySet()) {
+      String dbName = dbEntry.getKey();
+      List<String> tableNames = dbEntry.getValue();
+
+      for (String table : tableNames) {
+        db.dropTable(dbName + "." + table, true);
+      }
+      LOG.info("Tables listed in the Database: {} in the bootstrap dump: {} are cleaned",
+              dbName, bootstrapDirectory);
+    }
+  }
+
   private void createEndReplLogTask(Context context, Scope scope,
                                     ReplLogger replLogger) throws SemanticException {
     Map<String, String> dbProps;
@@ -366,6 +449,12 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
 
   private int executeIncrementalLoad(DriverContext driverContext) {
     try {
+      // If the user has requested clean-up of tables from a previous bootstrap dump, do it before the incremental load.
+      if (work.needCleanTablesFromBootstrap) {
+        cleanTablesFromBootstrap();
+        work.needCleanTablesFromBootstrap = false;
+      }
+
       IncrementalLoadTasksBuilder builder = work.incrementalLoadTasksBuilder();
 
       // If incremental events are already applied, then check whether any tables still need to be bootstrapped and do so.
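
cleanTablesFromBootstrap() above groups table names per database with an explicit containsKey/put sequence. A minimal standalone sketch of the same grouping using Map.computeIfAbsent (plain Java with hypothetical event data) shows the shape of the map it builds:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class GroupTablesSketch {
      public static void main(String[] args) {
        // Hypothetical (dbName, tableName) pairs read from the bootstrap dump.
        List<String[]> tableEvents = Arrays.asList(
                new String[]{"db1", "t2"}, new String[]{"db1", "t4"}, new String[]{"db2", "t1"});
        Map<String, List<String>> dbToTblsListMap = new HashMap<>();
        for (String[] event : tableEvents) {
          dbToTblsListMap.computeIfAbsent(event[0], k -> new ArrayList<>()).add(event[1]);
        }
        System.out.println(dbToTblsListMap); // e.g. {db1=[t2, t4], db2=[t1]} -- each list is then dropped
      }
    }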
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index 7539281..c5e0831 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec.repl;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -43,6 +44,9 @@ public class ReplLoadWork implements Serializable {
   final String dbNameToLoadIn;
   final String tableNameToLoadIn;
   final String dumpDirectory;
+  final String bootstrapDumpToCleanTables;
+  boolean needCleanTablesFromBootstrap;
+
   private final ConstraintEventsIterator constraintsIterator;
   private int loadTaskRunCount = 0;
   private DatabaseEvent.State state = null;
@@ -65,6 +69,9 @@ public class ReplLoadWork implements Serializable {
     sessionStateLineageState = lineageState;
     this.dumpDirectory = dumpDirectory;
     this.dbNameToLoadIn = dbNameToLoadIn;
+    this.bootstrapDumpToCleanTables = hiveConf.get(ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG);
+    this.needCleanTablesFromBootstrap = StringUtils.isNotBlank(this.bootstrapDumpToCleanTables);
+
     rootTask = null;
     if (isIncrementalDump) {
       incrementalLoadTasksBuilder =
@@ -78,14 +85,15 @@ public class ReplLoadWork implements Serializable {
       Path incBootstrapDir = new Path(dumpDirectory, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME);
       FileSystem fs = incBootstrapDir.getFileSystem(hiveConf);
       if (fs.exists(incBootstrapDir)) {
-        this.bootstrapIterator = new BootstrapEventsIterator(incBootstrapDir.toString(), dbNameToLoadIn, hiveConf);
+        this.bootstrapIterator = new BootstrapEventsIterator(incBootstrapDir.toString(), dbNameToLoadIn,
+                true, hiveConf);
         this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf);
       } else {
         this.bootstrapIterator = null;
         this.constraintsIterator = null;
       }
     } else {
-      this.bootstrapIterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf);
+      this.bootstrapIterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, true, hiveConf);
       this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf);
       incrementalLoadTasksBuilder = null;
     }
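
ReplLoadWork reads the clean-up config once and turns it into a one-shot flag; executeIncrementalLoad() above clears the flag after the clean-up runs, so tables are dropped at most once per load. A minimal sketch of that one-shot pattern (plain Java; the config map and path are hypothetical):

    import java.util.HashMap;
    import java.util.Map;

    public class OneShotCleanupSketch {
      public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("hive.repl.clean.tables.from.bootstrap", "/repl/dumps/old-bootstrap"); // hypothetical
        String bootstrapDumpToCleanTables = conf.get("hive.repl.clean.tables.from.bootstrap");
        boolean needCleanTablesFromBootstrap = // mirrors StringUtils.isNotBlank
                bootstrapDumpToCleanTables != null && !bootstrapDumpToCleanTables.trim().isEmpty();
        if (needCleanTablesFromBootstrap) {
          System.out.println("dropping tables bootstrapped by " + bootstrapDumpToCleanTables);
          needCleanTablesFromBootstrap = false; // one-shot: repeating the drop on later loads would lose data
        }
      }
    }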
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
index f1d7563..5735854 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
@@ -75,9 +75,10 @@ public class BootstrapEventsIterator implements Iterator<BootstrapEvent> {
   private final String dumpDirectory;
   private final String dbNameToLoadIn;
   private final HiveConf hiveConf;
+  private final boolean needLogger;
   private ReplLogger replLogger;
 
-  public BootstrapEventsIterator(String dumpDirectory, String dbNameToLoadIn, HiveConf hiveConf)
+  public BootstrapEventsIterator(String dumpDirectory, String dbNameToLoadIn, boolean needLogger, HiveConf hiveConf)
           throws IOException {
     Path path = new Path(dumpDirectory);
     FileSystem fileSystem = path.getFileSystem(hiveConf);
@@ -107,6 +108,7 @@ public class BootstrapEventsIterator implements Iterator<BootstrapEvent> {
 
     this.dumpDirectory = dumpDirectory;
     this.dbNameToLoadIn = dbNameToLoadIn;
+    this.needLogger = needLogger;
     this.hiveConf = hiveConf;
   }
 
@@ -116,7 +118,9 @@ public class BootstrapEventsIterator implements Iterator<BootstrapEvent> {
       if (currentDatabaseIterator == null) {
         if (dbEventsIterator.hasNext()) {
           currentDatabaseIterator = dbEventsIterator.next();
-          initReplLogger();
+          if (needLogger) {
+            initReplLogger();
+          }
         } else {
           return false;
         }
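
The new needLogger flag exists because cleanTablesFromBootstrap() reuses this iterator purely to enumerate table events and must not emit bootstrap-load progress logs, while the regular load paths pass true. A minimal sketch of that constructor-flag pattern (plain Java, hypothetical names):

    public class OptionalLoggingIteratorSketch {
      private final boolean needLogger;

      OptionalLoggingIteratorSketch(boolean needLogger) {
        this.needLogger = needLogger;
      }

      void nextDatabase(String dbName) {
        if (needLogger) {
          System.out.println("REPL LOAD progress for " + dbName); // stands in for initReplLogger()
        }
        // ... iterate the database's events either way ...
      }

      public static void main(String[] args) {
        new OptionalLoggingIteratorSketch(true).nextDatabase("db1");  // normal load: logs progress
        new OptionalLoggingIteratorSketch(false).nextDatabase("db1"); // clean-up path: silent
      }
    }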
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
index 874edb9..ae2e1db 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
@@ -106,7 +106,7 @@ class DatabaseEventsIterator implements Iterator<BootstrapEvent> {
     }
   }
 
-  public Path dbLevelPath() {
+  Path dbLevelPath() {
     return this.dbLevelPath;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
index 4b382f2..22b6e98 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
@@ -63,6 +63,13 @@ public class FSTableEvent implements TableEvent {
     }
   }
 
+  public String getDbName() {
+    return metadata.getTable().getDbName();
+  }
+  public String getTableName() {
+    return metadata.getTable().getTableName();
+  }
+
   public boolean shouldNotReplicate() {
     ReplicationSpec spec = replicationSpec();
     return spec.isNoop() || !spec.isInReplicationScope();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index cb81dd2..a5ed840 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -66,6 +66,10 @@ public class ReplUtils {
   // tasks.
   public static final String REPL_CURRENT_TBL_WRITE_ID = "hive.repl.current.table.write.id";
 
+  // Configuration to be received via WITH clause of REPL LOAD to clean tables from any previously failed
+  // bootstrap load.
+  public static final String REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG = "hive.repl.clean.tables.from.bootstrap";
+
   public static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
   public static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints";
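
The test file above assembles its loadWithClause entries from this constant rather than the raw string. A minimal sketch of that usage (plain Java; the constant's value is inlined so the sketch is self-contained, and the dump path is hypothetical):

    public class CleanTablesConfigSketch {
      // Copied value of ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG.
      static final String REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG = "hive.repl.clean.tables.from.bootstrap";

      public static void main(String[] args) {
        String failedDump = "/repl/dumps/old-bootstrap"; // hypothetical
        // Same shape as the loadWithClause entries built in the test above.
        String withEntry = "'" + REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='" + failedDump + "'";
        System.out.println(withEntry); // add before REPL LOAD; remove again once the load succeeds
      }
    }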