Posted to commits@hive.apache.org by ma...@apache.org on 2019/07/16 06:43:29 UTC

[hive] branch master updated: HIVE-21956 : Add the list of tables selected by dump in the dump folder. (Mahesh Kumar Behera reviewed by Sankar Hariappan)

This is an automated email from the ASF dual-hosted git repository.

mahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new c7340c6  HIVE-21956 : Add the list of tables selected by dump in the dump folder.  (Mahesh Kumar Behera reviewed by Sankar Hariappan)
c7340c6 is described below

commit c7340c6f6e765ef6e499f7a3c399beab843cb6b0
Author: Mahesh Kumar Behera <ma...@apache.org>
AuthorDate: Tue Jul 16 12:04:08 2019 +0530

    HIVE-21956 : Add the list of tables selected by dump in the dump folder.  (Mahesh Kumar Behera reviewed by Sankar Hariappan)
---
 .../parse/TestTableLevelReplicationScenarios.java  | 107 ++++++++++++++++++++-
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     | 102 +++++++++++++++++---
 .../events/filesystem/BootstrapEventsIterator.java |   2 +-
 .../events/filesystem/DatabaseEventsIterator.java  |   6 +-
 .../hadoop/hive/ql/exec/repl/util/ReplUtils.java   |  18 +++-
 5 files changed, 218 insertions(+), 17 deletions(-)
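
For reference, this change writes the selected table list under <dumpRoot>/_tables/<dbname>, one file per dumped database, with one lowercase table name per line. A missing file means db level replication; an empty file means the table level policy matched no tables. The snippet below is a minimal sketch, not part of this commit, showing how a consumer could read that file (the class and method names are illustrative):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class DumpTableListReader {
      // Returns null when the file is absent (db level replication), an empty list when the
      // table level policy matched no tables, otherwise the lowercase table names in the dump.
      static List<String> readDumpedTableList(FileSystem fs, Path dumpRoot, String dbName) throws IOException {
        Path tableListFile = new Path(new Path(dumpRoot, "_tables"), dbName.toLowerCase());
        if (!fs.exists(tableListFile)) {
          return null;
        }
        List<String> tables = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(tableListFile), StandardCharsets.UTF_8))) {
          for (String line = reader.readLine(); line != null; line = reader.readLine()) {
            tables.add(line);
          }
        }
        return tables;
      }
    }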

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
index 09db38d..270e61a 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
@@ -36,6 +37,11 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 
 import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME;
 import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME;
@@ -153,6 +159,9 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
       verifyBootstrapDirInIncrementalDump(tuple.dumpLocation, bootstrappedTables);
     }
 
+    // If the policy contains ".'", it is a table level replication policy.
+    verifyTableListForPolicy(tuple.dumpLocation, replPolicy.contains(".'") ? expectedTables : null);
+
     replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
             .run("use " + replicatedDbName)
             .run("show tables")
@@ -194,6 +203,36 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
     }
   }
 
+  private void verifyTableListForPolicy(String dumpLocation, String[] tableList) throws Throwable {
+    FileSystem fileSystem = primary.miniDFSCluster.getFileSystem();
+    Path tableListFile = new Path(dumpLocation, ReplUtils.REPL_TABLE_LIST_DIR_NAME);
+    tableListFile = new Path(tableListFile, primaryDbName.toLowerCase());
+
+    if (tableList == null) {
+      Assert.assertFalse(fileSystem.exists(tableListFile));
+      return;
+    } else {
+      Assert.assertTrue(fileSystem.exists(tableListFile));
+    }
+
+    BufferedReader reader = null;
+    try {
+      InputStream inputStream = fileSystem.open(tableListFile);
+      reader = new BufferedReader(new InputStreamReader(inputStream));
+      Set<String> tableNames = new HashSet<>(Arrays.asList(tableList));
+      int numTable = 0;
+      for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+        numTable++;
+        Assert.assertTrue(tableNames.contains(line));
+      }
+      Assert.assertEquals(tableList.length, numTable);
+    } finally {
+      if (reader != null) {
+        reader.close();
+      }
+    }
+  }
+
   @Test
   public void testBasicBootstrapWithIncludeList() throws Throwable {
     String[] originalNonAcidTables = new String[] {"t1", "t2"};
@@ -660,7 +699,7 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
             .run("alter table out100 rename to in100") // this will add the bootstrap
             .run("drop table in100");  // table in100 is dropped, so no bootstrap should happen.
 
-    replicatedTables = new String[] {"in200", "in12", "in12", "in14"};
+    replicatedTables = new String[] {"in200", "in12", "in11", "in14"};
     bootstrapTables = new String[] {"in14", "in200"};
     replicateAndVerify(replPolicy, null, lastReplId, null,
             null, bootstrapTables, replicatedTables);
@@ -907,4 +946,70 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
     replicateAndVerify(newPolicy, replPolicy, lastReplId, null,
             null, bootstrapTables, replicatedTables, new String[] {"1", "2"});
   }
+
+  @Test
+  public void testRenameTableScenariosUpgrade() throws Throwable {
+    // Policy with no table level filter, no ACID and external table.
+    String replPolicy = primaryDbName;
+    List<String> loadWithClause = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica);
+    List<String> dumpWithClause = Arrays.asList(
+            "'" +  HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'",
+            "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='false'"
+    );
+
+    String[] originalNonAcidTables = new String[] {"in1", "out4"};
+    String[] originalExternalTables = new String[] {"in2", "out5"};
+    String[] originalAcidTables = new String[] {"in3", "out6"};
+    createTables(originalNonAcidTables, CreateTableType.NON_ACID);
+    createTables(originalExternalTables, CreateTableType.EXTERNAL);
+    createTables(originalAcidTables, CreateTableType.FULL_ACID);
+
+    // Only NON_ACID table replication is done.
+    String[] replicatedTables = new String[] {"in1", "out4"};
+    String lastReplId = replicateAndVerify(replPolicy, null, null, dumpWithClause,
+            null, new String[] {}, replicatedTables);
+
+    originalNonAcidTables = new String[] {"in7", "out10"};
+    originalExternalTables = new String[] {"in8", "out11"};
+    originalAcidTables = new String[] {"in9", "out12"};
+    createTables(originalNonAcidTables, CreateTableType.NON_ACID);
+    createTables(originalExternalTables, CreateTableType.EXTERNAL);
+    createTables(originalAcidTables, CreateTableType.MM_ACID);
+
+    primary.run("use " + primaryDbName)
+            .run("alter table out4 rename to in4")
+            .run("alter table out5 rename to in5")
+            .run("alter table out6 rename to in6");
+
+    // Table level replication with ACID and EXTERNAL table.
+    String newReplPolicy = primaryDbName + ".'in[0-9]+'.'in8'";
+    dumpWithClause = Arrays.asList(
+            "'" +  HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
+            "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'",
+            "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES.varname + "'='true'",
+            "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'"
+    );
+
+    replicatedTables = new String[] {"in1", "in2", "in3", "in4", "in5", "in6", "in7", "in9"};
+    String[] bootstrapTables = new String[] {"in2", "in3", "in4", "in5", "in6", "in9"};
+    lastReplId = replicateAndVerify(newReplPolicy, replPolicy, lastReplId, dumpWithClause,
+            loadWithClause, bootstrapTables, replicatedTables);
+
+    primary.run("use " + primaryDbName)
+            .run("alter table in4 rename to out4")
+            .run("alter table in5 rename to out5")
+            .run("alter table in6 rename to out6");
+
+    dumpWithClause = Arrays.asList(
+            "'" +  HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
+            "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'"
+    );
+
+    // Database replication with ACID and EXTERNAL table.
+    replicatedTables = new String[] {"in1", "in2", "in3", "out4", "out5", "out6", "in7", "in8",
+            "in9", "out10", "out11", "out12"};
+    bootstrapTables = new String[] {"out4", "out5", "out6", "in8", "out10", "out11", "out12"};
+    replicateAndVerify(replPolicy, newReplPolicy, lastReplId, dumpWithClause,
+            loadWithClause, bootstrapTables, replicatedTables);
+  }
 }
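
For context, the tests above build table level policies of the form dbname.'includeRegex'.'excludeRegex', and verifyTableListForPolicy expects the _tables/<dbname> file only when the policy string contains ".'". A tiny illustrative snippet, not from the patch, of that classification:

    public class ReplPolicyFormatExample {
      public static void main(String[] args) {
        String dbLevelPolicy  = "srcdb";                   // plain db name: db level replication
        String includeOnly    = "srcdb.'in[0-9]+'";        // table level: include pattern only
        String includeExclude = "srcdb.'in[0-9]+'.'in8'";  // table level: include plus exclude pattern
        System.out.println(dbLevelPolicy.contains(".'"));  // false -> no _tables/<dbname> file expected
        System.out.println(includeOnly.contains(".'"));    // true  -> table list file must exist
        System.out.println(includeExclude.contains(".'")); // true  -> table list file must exist
      }
    }
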
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 9b80408..4cd60cc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -17,8 +17,10 @@
  */
 package org.apache.hadoop.hive.ql.exec.repl;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -69,7 +71,10 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.security.auth.login.LoginException;
+import java.io.IOException;
 import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -219,6 +224,24 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     return !ReplUtils.tableIncludedInReplScope(work.oldReplScope, table.getTableName());
   }
 
+  private boolean isTableSatifiesConfig(Table table) {
+    if (table == null) {
+      return false;
+    }
+
+    if (TableType.EXTERNAL_TABLE.equals(table.getTableType())
+            && !conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)) {
+      return false;
+    }
+
+    if (AcidUtils.isTransactionalTable(table)
+            && !ReplUtils.includeAcidTableInDump(conf)) {
+      return false;
+    }
+
+    return true;
+  }
+
   private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) throws Exception {
     Long lastReplId;// get list of events matching dbPattern & tblPattern
     // go through each event, and dump out each event to a event-level dump dir inside dumproot
@@ -226,6 +249,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     long waitUntilTime = 0;
     long bootDumpBeginReplId = -1;
 
+    List<String> tableList = work.replScope.includeAllTables() ? null : new ArrayList<>();
+
     // If we are bootstrapping ACID tables, we need to perform steps similar to a regular
     // bootstrap (See bootstrapDump() for more details. Only difference here is instead of
     // waiting for the concurrent transactions to finish, we start dumping the incremental events
@@ -292,7 +317,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     dmd.write();
 
     // Examine all the tables if required.
-    if (shouldExamineTablesToDump()) {
+    if (shouldExamineTablesToDump() || (tableList != null)) {
       // If required wait more for any transactions open at the time of starting the ACID bootstrap.
       if (needBootstrapAcidTablesDuringIncrementalDump()) {
         assert (waitUntilTime > 0);
@@ -318,6 +343,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
               dumpTable(dbName, tableName, validTxnList, dbRoot, bootDumpBeginReplId, hiveDb,
                       tableTuple);
             }
+            if (tableList != null && isTableSatifiesConfig(table)) {
+              tableList.add(tableName);
+            }
           } catch (InvalidTableException te) {
             // Repl dump shouldn't fail if the table is dropped/renamed while dumping it.
             // Just log a debug message and skip it.
@@ -325,6 +353,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
           }
         }
       }
+      dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf);
     }
 
     return lastReplId;
@@ -376,13 +405,61 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     return rspec;
   }
 
-  Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) throws Exception {
+  private void dumpTableListToDumpLocation(List<String> tableList, Path dbRoot, String dbName,
+                                           HiveConf hiveConf) throws IOException, LoginException {
+    // An empty list creates an empty file, which distinguishes table level from db level replication:
+    // no file means db level replication; an empty file means no table satisfied the policy.
+    if (tableList == null) {
+      LOG.debug("Table list file is not created for db level replication.");
+      return;
+    }
+
+    // The table list is dumped to the _tables/<dbname> file.
+    Path tableListFile = new Path(dbRoot, ReplUtils.REPL_TABLE_LIST_DIR_NAME);
+    tableListFile = new Path(tableListFile, dbName.toLowerCase());
+
+    int count = 0;
+    while (count < FileUtils.MAX_IO_ERROR_RETRY) {
+      try (FSDataOutputStream writer = FileSystem.get(hiveConf).create(tableListFile)) {
+        for (String tableName : tableList) {
+          String line = tableName.toLowerCase().concat("\n");
+          writer.write(line.getBytes(StandardCharsets.UTF_8));
+        }
+        // Close is called explicitly because close also performs the actual file system write,
+        // so there is a chance of an I/O exception being thrown by close.
+        writer.close();
+        break;
+      } catch (IOException e) {
+        LOG.info("File operation failed", e);
+        if (count >= (FileUtils.MAX_IO_ERROR_RETRY - 1)) {
+          //no need to wait in the last iteration
+          LOG.error("File " + tableListFile.toUri() + " creation failed even after " +
+                  FileUtils.MAX_IO_ERROR_RETRY + " attempts.");
+          throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg());
+        }
+        int sleepTime = FileUtils.getSleepTime(count);
+        LOG.info("Sleep for " + sleepTime + " milliseconds before retry " + (count+1));
+        try {
+          Thread.sleep(sleepTime);
+        } catch (InterruptedException timerEx) {
+          LOG.info("Sleep interrupted", timerEx.getMessage());
+        }
+        FileSystem.closeAllForUGI(org.apache.hadoop.hive.shims.Utils.getUGI());
+      }
+      count++;
+    }
+    LOG.info("Table list file " + tableListFile.toUri() + " is created for table list - " + tableList);
+  }
+
+  Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
+          throws Exception {
     // bootstrap case
     // Last repl id would've been captured during compile phase in queryState configs before opening txn.
     // This is needed as we dump data on ACID/MM tables based on read snapshot or else we may lose data from
     // concurrent txns when bootstrap dump in progress. If it is not available, then get it from metastore.
     Long bootDumpBeginReplId = queryState.getConf().getLong(ReplUtils.LAST_REPL_ID_KEY, -1L);
     assert (bootDumpBeginReplId >= 0L);
+    List<String> tableList;
 
     LOG.info("Bootstrap Dump for db {}", work.dbNameOrPattern);
     long timeoutInMs = HiveConf.getTimeVar(conf,
@@ -391,7 +468,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
 
     String validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime);
     for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
-      LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
+      LOG.debug("Dumping db: " + dbName);
+
+      // TODO : Currently we don't support a separate table list for each database.
+      tableList = work.replScope.includeAllTables() ? null : new ArrayList<>();
       Database db = hiveDb.getDatabase(dbName);
       if ((db != null) && (ReplUtils.isFirstIncPending(db.getParameters()))) {
         // For replicated (target) database, until after first successful incremental load, the database will not be
@@ -413,18 +493,12 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
                       && !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY);
       try (Writer writer = new Writer(dbRoot, conf)) {
         for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) {
-          LOG.debug(
-              "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri());
+          LOG.debug("Dumping table: " + tblName + " to db root " + dbRoot.toUri());
+          Table table = null;
 
           try {
             HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(tblName, conf);
-            Table table = tableTuple != null ? tableTuple.object : null;
-            if (table != null && ReplUtils.isFirstIncPending(table.getParameters())) {
-              // For replicated (target) table, until after first successful incremental load, the table will not be
-              // in a consistent state. Avoid allowing replicating this table to a new target.
-              throw new HiveException("Replication dump not allowed for replicated table" +
-                      " with first incremental dump pending : " + tblName);
-            }
+            table = tableTuple != null ? tableTuple.object : null;
             if (shouldWriteExternalTableLocationInfo
                     && TableType.EXTERNAL_TABLE.equals(tableTuple.object.getTableType())) {
               LOG.debug("Adding table {} to external tables list", tblName);
@@ -438,7 +512,11 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
             LOG.debug(te.getMessage());
           }
           dumpConstraintMetadata(dbName, tblName, dbRoot, hiveDb);
+          if (tableList != null && isTableSatifiesConfig(table)) {
+            tableList.add(tblName);
+          }
         }
+        dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf);
       } catch (Exception e) {
         caught = e;
       } finally {
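
As an aside, the new dumpTableListToDumpLocation above retries the file write on IOException with a backoff between attempts. Below is a simplified, self-contained sketch of that retry pattern; the class and method names are illustrative, and the patch's REPL_FILE_SYSTEM_OPERATION_RETRY error message and FileSystem.closeAllForUGI call are omitted:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.FileUtils;

    public final class RetryingTableListWriter {
      static void writeWithRetry(FileSystem fs, Path file, List<String> lines) throws IOException {
        for (int attempt = 0; attempt < FileUtils.MAX_IO_ERROR_RETRY; attempt++) {
          try (FSDataOutputStream out = fs.create(file)) {
            for (String line : lines) {
              out.write((line.toLowerCase() + "\n").getBytes(StandardCharsets.UTF_8));
            }
            // Close inside the try block: close flushes to the file system, so a failure
            // during close is an IOException that should be retried as well.
            out.close();
            return;
          } catch (IOException e) {
            if (attempt == FileUtils.MAX_IO_ERROR_RETRY - 1) {
              throw e; // out of retries
            }
            try {
              Thread.sleep(FileUtils.getSleepTime(attempt)); // same backoff helper as the patch
            } catch (InterruptedException ie) {
              Thread.currentThread().interrupt();
              throw new IOException(ie);
            }
          }
        }
      }
    }
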
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
index 5735854..ab6e09f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
@@ -83,7 +83,7 @@ public class BootstrapEventsIterator implements Iterator<BootstrapEvent> {
     Path path = new Path(dumpDirectory);
     FileSystem fileSystem = path.getFileSystem(hiveConf);
     FileStatus[] fileStatuses =
-        fileSystem.listStatus(new Path(dumpDirectory), EximUtil.getDirectoryFilter(fileSystem));
+        fileSystem.listStatus(new Path(dumpDirectory), ReplUtils.getBootstrapDirectoryFilter(fileSystem));
     if ((fileStatuses == null) || (fileStatuses.length == 0)) {
       throw new IllegalArgumentException("No data to load in path " + dumpDirectory);
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
index ae2e1db..5665bda 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
 import org.slf4j.Logger;
@@ -122,8 +123,9 @@ class DatabaseEventsIterator implements Iterator<BootstrapEvent> {
         while (remoteIterator.hasNext()) {
           LocatedFileStatus next = remoteIterator.next();
           // we want to skip this file, this also means there cant be a table with name represented
-          // by constant ReplExternalTables.FILE_NAME
-          if(next.getPath().toString().endsWith(ReplExternalTables.FILE_NAME)) {
+          // by constant ReplExternalTables.FILE_NAME or ReplUtils.REPL_TABLE_LIST_DIR_NAME (_tables)
+          if(next.getPath().toString().endsWith(ReplExternalTables.FILE_NAME) ||
+                  next.getPath().toString().endsWith(ReplUtils.REPL_TABLE_LIST_DIR_NAME)) {
             continue;
           }
           if (next.getPath().toString().endsWith(EximUtil.METADATA_NAME)) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 1142595..1df5077 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.hive.common.repl.ReplScope;
@@ -80,6 +81,10 @@ public class ReplUtils {
   // Root directory for dumping bootstrapped tables along with incremental events dump.
   public static final String INC_BOOTSTRAP_ROOT_DIR_NAME = "_bootstrap";
 
+  // Name of the directory which stores the list of tables included in the policy in case of table level replication.
+  // One file per database, named after the db name. The directory is not created for db level replication.
+  public static final String REPL_TABLE_LIST_DIR_NAME = "_tables";
+
   // Migrating to transactional tables in bootstrap load phase.
   // It is enough to copy all the original files under base_1 dir and so write-id is hardcoded to 1.
   public static final Long REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID = 1L;
@@ -232,7 +237,18 @@ public class ReplUtils {
   public static PathFilter getEventsDirectoryFilter(final FileSystem fs) {
     return p -> {
       try {
-        return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME);
+        return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME)
+                && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
+  public static PathFilter getBootstrapDirectoryFilter(final FileSystem fs) {
+    return p -> {
+      try {
+        return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
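
On the load side, the new "_tables" directory (ReplUtils.REPL_TABLE_LIST_DIR_NAME) has to be ignored while scanning the dump, which is what getBootstrapDirectoryFilter above provides. A minimal usage sketch, mirroring how BootstrapEventsIterator lists database directories (the class and method names here are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;

    public final class BootstrapDirListing {
      // Lists database dump directories under the dump root, skipping the "_tables" directory
      // so the table list is never treated as a database to load.
      static FileStatus[] listDatabaseDirs(Path dumpRoot, Configuration conf) throws IOException {
        FileSystem fs = dumpRoot.getFileSystem(conf);
        return fs.listStatus(dumpRoot, ReplUtils.getBootstrapDirectoryFilter(fs));
      }
    }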