Posted to commits@hive.apache.org by sa...@apache.org on 2019/01/31 17:09:34 UTC
[hive] branch master updated: HIVE-21029: External table
replication for existing deployments running incremental replication
(Sankar Hariappan, reviewed by Mahesh Kumar Behera)
This is an automated email from the ASF dual-hosted git repository.
sankarh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new f2e10f2 HIVE-21029: External table replication for existing deployments running incremental replication (Sankar Hariappan, reviewed by Mahesh Kumar Behera)
f2e10f2 is described below
commit f2e10f26e89eb147d93944762dc442dacb957c60
Author: Sankar Hariappan <sa...@apache.org>
AuthorDate: Thu Jan 31 22:37:22 2019 +0530
HIVE-21029: External table replication for existing deployments running incremental replication (Sankar Hariappan, reviewed by Mahesh Kumar Behera)
Signed-off-by: Sankar Hariappan <sa...@apache.org>
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 11 ++-
.../TestReplicationScenariosExternalTables.java | 92 ++++++++++++++++++++++
...licationScenariosIncrementalLoadAcidTables.java | 2 +
ql/src/java/org/apache/hadoop/hive/ql/Driver.java | 6 +-
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 49 ++++++++----
.../hive/ql/exec/repl/ReplExternalTables.java | 7 +-
.../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 62 ++++++++++-----
.../hadoop/hive/ql/exec/repl/ReplLoadWork.java | 47 +++++++----
.../events/filesystem/BootstrapEventsIterator.java | 15 ++--
.../filesystem/ConstraintEventsIterator.java | 4 +-
.../events/filesystem/DatabaseEventsIterator.java | 4 +-
.../bootstrap/events/filesystem/FSTableEvent.java | 3 -
.../repl/bootstrap/load/table/LoadPartitions.java | 7 --
.../incremental/IncrementalLoadEventsIterator.java | 4 +-
.../incremental/IncrementalLoadTasksBuilder.java | 10 +--
.../hadoop/hive/ql/exec/repl/util/ReplUtils.java | 20 +++++
.../hive/ql/parse/ReplicationSemanticAnalyzer.java | 4 -
.../hive/ql/parse/repl/dump/HiveWrapper.java | 4 +
.../hive/ql/parse/repl/dump/TableExport.java | 2 +-
.../hadoop/hive/ql/parse/repl/dump/Utils.java | 17 ++--
.../events/AbstractConstraintEventHandler.java | 1 +
.../repl/dump/events/AddPartitionHandler.java | 2 +-
.../repl/dump/events/AlterPartitionHandler.java | 2 +-
.../parse/repl/dump/events/AlterTableHandler.java | 2 +-
.../parse/repl/dump/events/CreateTableHandler.java | 2 +-
.../ql/parse/repl/dump/events/InsertHandler.java | 2 +-
.../repl/dump/events/UpdatePartColStatHandler.java | 2 +-
.../dump/events/UpdateTableColStatHandler.java | 2 +-
.../ql/parse/repl/dump/io/TableSerializer.java | 2 +-
29 files changed, 279 insertions(+), 108 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b3a4754..085bed9 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -489,7 +489,16 @@ public class HiveConf extends Configuration {
"Indicates if repl dump should include information about external tables. It should be \n"
+ "used in conjunction with 'hive.repl.dump.metadata.only' set to false. if 'hive.repl.dump.metadata.only' \n"
+ " is set to true then this config parameter has no effect as external table meta data is flushed \n"
- + " always by default."),
+ + " always by default. If this config parameter is enabled on an on-going replication policy which is in\n"
+ + " incremental phase, then need to set 'hive.repl.bootstrap.external.tables' to true for the first \n"
+ + " repl dump to bootstrap all external tables."),
+ REPL_BOOTSTRAP_EXTERNAL_TABLES("hive.repl.bootstrap.external.tables", false,
+ "Indicates if repl dump should bootstrap the information about external tables along with incremental \n"
+ + "dump for replication. It is recommended to keep this config parameter as false always and should be \n"
+ + "set to true only via WITH clause of REPL DUMP command. It should be used in conjunction with \n"
+ + "'hive.repl.include.external.tables' when sets to true. If 'hive.repl.include.external.tables' is \n"
+ + "set to false, then this config parameter has no effect. It should be set to true only once for \n"
+ + "incremental repl dump on each existing replication policy after enabling external tables replication."),
REPL_ENABLE_MOVE_OPTIMIZATION("hive.repl.enable.move.optimization", false,
"If its set to true, REPL LOAD copies data files directly to the target table/partition location \n"
+ "instead of copying to staging directory first and then move to target location. This optimizes \n"
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index 40ce4b4..c25e6e2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -46,6 +46,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME;
+import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -393,6 +394,97 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
}
+ @Test
+ public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable {
+ List<String> loadWithClause = externalTableBasePathWithClause();
+ List<String> dumpWithClause = Collections.singletonList(
+ "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"
+ );
+
+ WarehouseInstance.Tuple tuple = primary
+ .run("use " + primaryDbName)
+ .run("create external table t1 (id int)")
+ .run("insert into table t1 values (1)")
+ .run("insert into table t1 values (2)")
+ .run("create external table t2 (place string) partitioned by (country string)")
+ .run("insert into table t2 partition(country='india') values ('bangalore')")
+ .run("insert into table t2 partition(country='us') values ('austin')")
+ .run("insert into table t2 partition(country='france') values ('paris')")
+ .dump(primaryDbName, null, dumpWithClause);
+
+ // the _external_tables_info file should be created only if external tables are to be replicated, not otherwise
+ assertFalse(primary.miniDFSCluster.getFileSystem()
+ .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)));
+
+ replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ .status(replicatedDbName)
+ .verifyResult(tuple.lastReplicationId)
+ .run("use " + replicatedDbName)
+ .run("show tables like 't1'")
+ .verifyFailure(new String[] {"t1" })
+ .run("show tables like 't2'")
+ .verifyFailure(new String[] {"t2" });
+
+ dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
+ "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'");
+ tuple = primary.run("use " + primaryDbName)
+ .run("drop table t1")
+ .run("create external table t3 (id int)")
+ .run("insert into table t3 values (10)")
+ .run("insert into table t3 values (20)")
+ .run("create table t4 as select * from t3")
+ .dump(primaryDbName, tuple.lastReplicationId, dumpWithClause);
+
+ // the _external_tables_info file should be created as external tables are to be replicated.
+ assertTrue(primary.miniDFSCluster.getFileSystem()
+ .exists(new Path(tuple.dumpLocation, FILE_NAME)));
+
+ // verify that the external table info is written correctly for incremental
+ assertExternalFileInfo(Arrays.asList("t2", "t3"),
+ new Path(tuple.dumpLocation, FILE_NAME));
+
+ // _bootstrap directory should be created as bootstrap is enabled on external tables.
+ Path dumpPath = new Path(tuple.dumpLocation, INC_BOOTSTRAP_ROOT_DIR_NAME);
+ assertTrue(primary.miniDFSCluster.getFileSystem().exists(dumpPath));
+
+ // _bootstrap/<db_name>/t2
+ // _bootstrap/<db_name>/t3
+ Path dbPath = new Path(dumpPath, primaryDbName);
+ Path tblPath = new Path(dbPath, "t2");
+ assertTrue(primary.miniDFSCluster.getFileSystem().exists(tblPath));
+ tblPath = new Path(dbPath, "t3");
+ assertTrue(primary.miniDFSCluster.getFileSystem().exists(tblPath));
+
+ replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ .status(replicatedDbName)
+ .verifyResult(tuple.lastReplicationId)
+ .run("use " + replicatedDbName)
+ .run("show tables like 't1'")
+ .verifyFailure(new String[] {"t1" })
+ .run("show tables like 't2'")
+ .verifyResult("t2")
+ .run("show tables like 't3'")
+ .verifyResult("t3")
+ .run("show tables like 't4'")
+ .verifyResult("t4");
+
+ // Drop source tables to see if target points to correct data or not after bootstrap load.
+ primary.run("use " + primaryDbName)
+ .run("drop table t2")
+ .run("drop table t3");
+
+ // The create table event for t4 should be applied along with the bootstrap of t2 and t3.
+ replica.run("use " + replicatedDbName)
+ .run("select place from t2 where country = 'us'")
+ .verifyResult("austin")
+ .run("select place from t2 where country = 'france'")
+ .verifyResult("paris")
+ .run("select id from t3 order by id")
+ .verifyResults(Arrays.asList("10", "20"))
+ .run("select id from t4 order by id")
+ .verifyResults(Arrays.asList("10", "20"));
+ }
+
private List<String> externalTableBasePathWithClause() throws IOException, SemanticException {
Path externalTableLocation = new Path(REPLICA_EXTERNAL_BASE);
DistributedFileSystem fileSystem = replica.miniDFSCluster.getFileSystem();
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
index 5529d9e..a4b044d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
@@ -81,6 +81,7 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
put("hive.mapred.mode", "nonstrict");
put("mapred.input.dir.recursive", "true");
put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+ put("hive.stats.autogather", "false");
}};
acidConfs.putAll(overrides);
@@ -91,6 +92,7 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
put("hive.support.concurrency", "false");
put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
put("hive.metastore.client.capability.check", "false");
+ put("hive.stats.autogather", "false");
}};
replicaNonAcid = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf1);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 01ecf0a..0dd50de 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.TaskResult;
import org.apache.hadoop.hive.ql.exec.TaskRunner;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
import org.apache.hadoop.hive.ql.hooks.Entity;
@@ -120,7 +121,6 @@ import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.ParseException;
import org.apache.hadoop.hive.ql.parse.ParseUtils;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
-import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
import org.apache.hadoop.hive.ql.plan.DDLDesc.DDLDescWithWriteId;
@@ -927,8 +927,8 @@ public class Driver implements IDriver {
// Last logged notification event id would be the last repl Id for the current REPl DUMP.
Hive hiveDb = Hive.get();
Long lastReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
- conf.setLong(ReplicationSemanticAnalyzer.LAST_REPL_ID_KEY, lastReplId);
- LOG.debug("Setting " + ReplicationSemanticAnalyzer.LAST_REPL_ID_KEY + " = " + lastReplId);
+ conf.setLong(ReplUtils.LAST_REPL_ID_KEY, lastReplId);
+ LOG.debug("Setting " + ReplUtils.LAST_REPL_ID_KEY + " = " + lastReplId);
}
private void openTransaction() throws LockException, CommandProcessorResponse {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 947bfcf..acfa354 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFil
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -47,7 +48,6 @@ import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.events.EventUtils;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
import org.apache.hadoop.hive.ql.parse.EximUtil;
-import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
@@ -80,15 +80,13 @@ import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer;
public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
- private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
- private static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints";
- private static final String FUNCTION_METADATA_FILE_NAME = "_metadata";
+ private static final String FUNCTION_METADATA_FILE_NAME = EximUtil.METADATA_NAME;
private static final long SLEEP_TIME = 60000;
public enum ConstraintFileType {COMMON("common", "c_"), FOREIGNKEY("fk", "f_");
private final String name;
private final String prefix;
- private ConstraintFileType(String name, String prefix) {
+ ConstraintFileType(String name, String prefix) {
this.name = name;
this.prefix = prefix;
}
@@ -195,13 +193,23 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot);
dmd.write();
- if (conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) &&
- !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) {
+ // If external tables are enabled for replication:
+ // - If bootstrap is enabled, combine the bootstrap dump of external tables with this dump.
+ // - If metadata-only dump is enabled, skip dumping the external tables' data locations to the
+ // _external_tables_info file. If not metadata-only, dump the data locations.
+ if (conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)
+ && (!conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)
+ || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES))) {
+ Path dbRoot = getBootstrapDbRoot(dumpRoot, dbName, true);
try (Writer writer = new Writer(dumpRoot, conf)) {
for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
Table table = hiveDb.getTable(dbName, tableName);
if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
writer.dataLocationDump(table);
+ if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES)) {
+ HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(table);
+ dumpTable(dbName, tableName, null, dbRoot, 0, hiveDb, tableTuple);
+ }
}
}
}
@@ -209,6 +217,13 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
return lastReplId;
}
+ private Path getBootstrapDbRoot(Path dumpRoot, String dbName, boolean isIncrementalPhase) {
+ if (isIncrementalPhase) {
+ dumpRoot = new Path(dumpRoot, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME);
+ }
+ return new Path(dumpRoot, dbName);
+ }
+
private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot, Hive db) throws Exception {
EventHandler.Context context = new EventHandler.Context(
evRoot,
@@ -237,7 +252,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
// Last repl id would've been captured during compile phase in queryState configs before opening txn.
// This is needed as we dump data on ACID/MM tables based on read snapshot or else we may lose data from
// concurrent txns when bootstrap dump in progress. If it is not available, then get it from metastore.
- Long bootDumpBeginReplId = queryState.getConf().getLong(ReplicationSemanticAnalyzer.LAST_REPL_ID_KEY, -1L);
+ Long bootDumpBeginReplId = queryState.getConf().getLong(ReplUtils.LAST_REPL_ID_KEY, -1L);
assert (bootDumpBeginReplId >= 0L);
String validTxnList = getValidTxnListForReplDump(hiveDb);
@@ -252,6 +267,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName);
Exception caught = null;
+ boolean shouldWriteExternalTableLocationInfo =
+ conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)
+ && !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY);
try (Writer writer = new Writer(dbRoot, conf)) {
for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
LOG.debug(
@@ -259,12 +277,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
try {
HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(tblName,
conf);
- boolean shouldWriteExternalTableLocationInfo =
- conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)
- && TableType.EXTERNAL_TABLE.equals(tableTuple.object.getTableType())
- && !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY);
- if (shouldWriteExternalTableLocationInfo) {
- LOG.debug("adding table {} to external tables list", tblName);
+ if (shouldWriteExternalTableLocationInfo
+ && TableType.EXTERNAL_TABLE.equals(tableTuple.object.getTableType())) {
+ LOG.debug("Adding table {} to external tables list", tblName);
writer.dataLocationDump(tableTuple.object);
}
dumpTable(dbName, tblName, validTxnList, dbRoot, bootDumpBeginReplId, hiveDb,
@@ -312,7 +327,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
}
Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId, Hive hiveDb) throws Exception {
- Path dbRoot = new Path(dumpRoot, dbName);
+ Path dbRoot = getBootstrapDbRoot(dumpRoot, dbName, false);
// TODO : instantiating FS objects are generally costly. Refactor
FileSystem fs = dbRoot.getFileSystem(conf);
Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME);
@@ -435,7 +450,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
}
void dumpFunctionMetadata(String dbName, Path dumpRoot, Hive hiveDb) throws Exception {
- Path functionsRoot = new Path(new Path(dumpRoot, dbName), FUNCTIONS_ROOT_DIR_NAME);
+ Path functionsRoot = new Path(new Path(dumpRoot, dbName), ReplUtils.FUNCTIONS_ROOT_DIR_NAME);
List<String> functionNames = hiveDb.getFunctions(dbName, "*");
for (String functionName : functionNames) {
HiveWrapper.Tuple<Function> tuple = functionTuple(functionName, dbName, hiveDb);
@@ -455,7 +470,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot, Hive hiveDb) throws Exception {
try {
- Path constraintsRoot = new Path(dbRoot, CONSTRAINTS_ROOT_DIR_NAME);
+ Path constraintsRoot = new Path(dbRoot, ReplUtils.CONSTRAINTS_ROOT_DIR_NAME);
Path commonConstraintsFile = new Path(constraintsRoot, ConstraintFileType.COMMON.getPrefix() + tblName);
Path fkConstraintsFile = new Path(constraintsRoot, ConstraintFileType.FOREIGNKEY.getPrefix() + tblName);
List<SQLPrimaryKey> pks = hiveDb.getPrimaryKeyList(dbName, tblName);
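Taken together, a combined incremental dump produced by the code above lands on disk roughly as follows (a sketch assembled from the paths in this patch; <db>, <event_id>, and <table> are placeholders):

    <dump_root>/<event_id_1> ... <event_id_N>         incremental event dumps
    <dump_root>/_external_tables_info                 data locations of external tables (FILE_NAME)
    <dump_root>/_bootstrap/<db>/<table>/_metadata     bootstrap metadata per external table (INC_BOOTSTRAP_ROOT_DIR_NAME)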
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
index 59b7c1c..015bc26 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
@@ -84,13 +84,14 @@ public final class ReplExternalTables {
private static Logger LOG = LoggerFactory.getLogger(Writer.class);
private final HiveConf hiveConf;
private final Path writePath;
- private final Boolean excludeExternalTables, dumpMetadataOnly;
+ private final boolean includeExternalTables;
+ private final boolean dumpMetadataOnly;
private OutputStream writer;
Writer(Path dbRoot, HiveConf hiveConf) throws IOException {
this.hiveConf = hiveConf;
writePath = new Path(dbRoot, FILE_NAME);
- excludeExternalTables = !hiveConf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES);
+ includeExternalTables = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES);
dumpMetadataOnly = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY);
if (shouldWrite()) {
this.writer = FileSystem.get(hiveConf).create(writePath);
@@ -98,7 +99,7 @@ public final class ReplExternalTables {
}
private boolean shouldWrite() {
- return !dumpMetadataOnly && !excludeExternalTables;
+ return !dumpMetadataOnly && includeExternalTables;
}
/**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 2126aab..4dc14f4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadTable;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -49,7 +50,9 @@ import org.apache.hadoop.hive.ql.plan.api.StageType;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase.AlterDatabase;
@@ -61,6 +64,11 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
return (work.isIncrementalLoad() ? "REPL_INCREMENTAL_LOAD" : "REPL_BOOTSTRAP_LOAD");
}
+ @Override
+ public StageType getType() {
+ return work.isIncrementalLoad() ? StageType.REPL_INCREMENTAL_LOAD : StageType.REPL_BOOTSTRAP_LOAD;
+ }
+
/**
* Provides the root Tasks created as a result of this loadTask run which will be executed
* by the driver. It does not track details across multiple runs of LoadTask.
@@ -96,8 +104,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
of multiple databases once we have the basic flow to chain creating of tasks in place for
a database ( directory )
*/
- BootstrapEventsIterator iterator = work.iterator();
- ConstraintEventsIterator constraintIterator = work.constraintIterator();
+ BootstrapEventsIterator iterator = work.bootstrapIterator();
+ ConstraintEventsIterator constraintIterator = work.constraintsIterator();
/*
This is used to get hold of a reference during the current creation of tasks and is initialized
with "0" tasks such that it will be non consequential in any operations done with task tracker
@@ -241,7 +249,11 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
if (addAnotherLoadTask) {
createBuilderTask(scope.rootTasks);
}
- if (!iterator.hasNext() && !constraintIterator.hasNext()) {
+
+ // Update the last repl ID of the database only if the current dump is not incremental. If bootstrap
+ // is combined with an incremental dump, it contains only tables to bootstrap, so there is no need
+ // to change the last repl ID of the database.
+ if (!iterator.hasNext() && !constraintIterator.hasNext() && !work.isIncrementalLoad()) {
loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope));
work.updateDbEventState(null);
}
@@ -268,9 +280,17 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
}
private void createEndReplLogTask(Context context, Scope scope,
- ReplLogger replLogger) throws SemanticException {
- Database dbInMetadata = work.databaseEvent(context.hiveConf).dbInMetadata(work.dbNameToLoadIn);
- ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, dbInMetadata.getParameters());
+ ReplLogger replLogger) throws SemanticException {
+ Map<String, String> dbProps;
+ if (work.isIncrementalLoad()) {
+ dbProps = new HashMap<>();
+ dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+ work.incrementalLoadTasksBuilder().eventTo().toString());
+ } else {
+ Database dbInMetadata = work.databaseEvent(context.hiveConf).dbInMetadata(work.dbNameToLoadIn);
+ dbProps = dbInMetadata.getParameters();
+ }
+ ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, dbProps);
Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork);
if (scope.rootTasks.isEmpty()) {
scope.rootTasks.add(replLogTask);
@@ -344,20 +364,25 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
DAGTraversal.traverse(rootTasks, new AddDependencyToLeaves(loadTask));
}
- @Override
- public StageType getType() {
- return work.isIncrementalLoad() ? StageType.REPL_INCREMENTAL_LOAD : StageType.REPL_BOOTSTRAP_LOAD;
- }
-
private int executeIncrementalLoad(DriverContext driverContext) {
try {
+ IncrementalLoadTasksBuilder builder = work.incrementalLoadTasksBuilder();
+
+ // If all incremental events are already applied, check whether any tables need to be bootstrapped and, if so, perform the bootstrap load.
+ if (!builder.hasMoreWork() && !work.getPathsToCopyIterator().hasNext()) {
+ if (work.hasBootstrapLoadTasks()) {
+ LOG.debug("Current incremental dump have tables to be bootstrapped. Switching to bootstrap "
+ + "mode after applying all events.");
+ return executeBootStrapLoad(driverContext);
+ }
+ }
+
List<Task<? extends Serializable>> childTasks = new ArrayList<>();
int parallelism = conf.getIntVar(HiveConf.ConfVars.EXECPARALLETHREADNUMBER);
// during incremental we will have no parallelism from replication tasks since they are event based
- // and hence are linear. To achieve prallelism we have to use copy tasks(which have no DAG) for
+ // and hence are linear. To achieve parallelism we have to use copy tasks (which have no DAG) for
// all threads except one, in execution phase.
int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
- IncrementalLoadTasksBuilder builder = work.getIncrementalLoadTaskBuilder();
// If the total number of tasks that can be created are less than the parallelism we can achieve
// do nothing since someone is working on 1950's machine. else try to achieve max parallelism
@@ -374,17 +399,18 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
}
TaskTracker trackerForReplIncremental = new TaskTracker(calculatedMaxNumOfTasks);
Task<? extends Serializable> incrementalLoadTaskRoot =
- builder.build(driverContext, getHive(), LOG, work, trackerForReplIncremental);
+ builder.build(driverContext, getHive(), LOG, trackerForReplIncremental);
// we are adding the incremental task first so that its always processed first,
// followed by dir copy tasks if capacity allows.
childTasks.add(incrementalLoadTaskRoot);
TaskTracker trackerForCopy = new TaskTracker(maxNumOfHDFSTasks);
- childTasks
- .addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(trackerForCopy));
+ childTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(trackerForCopy));
- // either the incremental has more work or the external table file copy has more paths to process
- if (builder.hasMoreWork() || work.getPathsToCopyIterator().hasNext()) {
+ // Either the incremental load has more work or the external table file copy has more paths to process.
+ // Once all the incremental events are applied and external table file copies are done, enable
+ // bootstrap of tables if any exist.
+ if (builder.hasMoreWork() || work.getPathsToCopyIterator().hasNext() || work.hasBootstrapLoadTasks()) {
DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(TaskFactory.get(work, conf)));
}
this.childTasks = childTasks;
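The load-side flow introduced above condenses to the following sketch (identifiers are from this patch; error handling and task-tracker bookkeeping omitted):

    // ReplLoadTask.executeIncrementalLoad(), condensed:
    IncrementalLoadTasksBuilder builder = work.incrementalLoadTasksBuilder();
    // Once every event is applied and external-table file copies are done,
    // switch to bootstrap mode for the tables carried under _bootstrap.
    if (!builder.hasMoreWork() && !work.getPathsToCopyIterator().hasNext()
        && work.hasBootstrapLoadTasks()) {
      return executeBootStrapLoad(driverContext);
    }
    // Otherwise chain another ReplLoadTask while any of the three kinds of work remain.
    if (builder.hasMoreWork() || work.getPathsToCopyIterator().hasNext()
        || work.hasBootstrapLoadTasks()) {
      DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(TaskFactory.get(work, conf)));
    }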
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index e86a5fa..7539281 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -17,12 +17,15 @@
*/
package org.apache.hadoop.hive.ql.exec.repl;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator;
import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadEventsIterator;
import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.plan.Explain;
import org.apache.hadoop.hive.ql.session.LineageState;
import org.apache.hadoop.hive.ql.exec.Task;
@@ -44,8 +47,7 @@ public class ReplLoadWork implements Serializable {
private int loadTaskRunCount = 0;
private DatabaseEvent.State state = null;
private final transient BootstrapEventsIterator bootstrapIterator;
- private final transient IncrementalLoadEventsIterator incrementalIterator;
- private final transient IncrementalLoadTasksBuilder incrementalLoad;
+ private transient IncrementalLoadTasksBuilder incrementalLoadTasksBuilder;
private transient Task<? extends Serializable> rootTask;
private final transient Iterator<DirCopyWork> pathsToCopyIterator;
@@ -65,26 +67,36 @@ public class ReplLoadWork implements Serializable {
this.dbNameToLoadIn = dbNameToLoadIn;
rootTask = null;
if (isIncrementalDump) {
- incrementalIterator = new IncrementalLoadEventsIterator(dumpDirectory, hiveConf);
- this.bootstrapIterator = null;
- this.constraintsIterator = null;
- incrementalLoad =
+ incrementalLoadTasksBuilder =
new IncrementalLoadTasksBuilder(dbNameToLoadIn, tableNameToLoadIn, dumpDirectory,
- incrementalIterator, hiveConf, eventTo);
+ new IncrementalLoadEventsIterator(dumpDirectory, hiveConf), hiveConf, eventTo);
+
+ /*
+ * If the current incremental dump also includes a bootstrap for some tables, then create an
+ * iterator for them as well.
+ */
+ Path incBootstrapDir = new Path(dumpDirectory, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME);
+ FileSystem fs = incBootstrapDir.getFileSystem(hiveConf);
+ if (fs.exists(incBootstrapDir)) {
+ this.bootstrapIterator = new BootstrapEventsIterator(incBootstrapDir.toString(), dbNameToLoadIn, hiveConf);
+ this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf);
+ } else {
+ this.bootstrapIterator = null;
+ this.constraintsIterator = null;
+ }
} else {
this.bootstrapIterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf);
this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf);
- incrementalIterator = null;
- incrementalLoad = null;
+ incrementalLoadTasksBuilder = null;
}
this.pathsToCopyIterator = pathsToCopyIterator.iterator();
}
- public BootstrapEventsIterator iterator() {
+ BootstrapEventsIterator bootstrapIterator() {
return bootstrapIterator;
}
- public ConstraintEventsIterator constraintIterator() {
+ ConstraintEventsIterator constraintsIterator() {
return constraintsIterator;
}
@@ -104,16 +116,17 @@ public class ReplLoadWork implements Serializable {
return state != null;
}
- public boolean isIncrementalLoad() {
- return incrementalIterator != null;
+ boolean isIncrementalLoad() {
+ return incrementalLoadTasksBuilder != null;
}
- public IncrementalLoadEventsIterator getIncrementalIterator() {
- return incrementalIterator;
+ boolean hasBootstrapLoadTasks() {
+ return (((bootstrapIterator != null) && bootstrapIterator.hasNext())
+ || ((constraintsIterator != null) && constraintsIterator.hasNext()));
}
- public IncrementalLoadTasksBuilder getIncrementalLoadTaskBuilder() {
- return incrementalLoad;
+ IncrementalLoadTasksBuilder incrementalLoadTasksBuilder() {
+ return incrementalLoadTasksBuilder;
}
public Task<? extends Serializable> getRootTask() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
index 60ad6d3..ef6e31f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
@@ -22,8 +22,8 @@ import org.apache.hadoop.fs.*;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.EximUtil;
-import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.repl.load.log.BootstrapLoadLogger;
import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
@@ -93,14 +93,9 @@ public class BootstrapEventsIterator implements Iterator<BootstrapEvent> {
+ " does not correspond to REPL LOAD expecting to load to a singular destination point.");
}
- List<FileStatus> dbsToCreate = Arrays.stream(fileStatuses).filter(f -> {
- Path metadataPath = new Path(f.getPath() + Path.SEPARATOR + EximUtil.METADATA_NAME);
- try {
- return fileSystem.exists(metadataPath);
- } catch (IOException e) {
- throw new RuntimeException("could not determine if exists : " + metadataPath.toString(), e);
- }
- }).collect(Collectors.toList());
+ List<FileStatus> dbsToCreate = Arrays.stream(fileStatuses).filter(
+ f -> !f.getPath().getName().equals(ReplUtils.CONSTRAINTS_ROOT_DIR_NAME)
+ ).collect(Collectors.toList());
dbEventsIterator = dbsToCreate.stream().map(f -> {
try {
return new DatabaseEventsIterator(f.getPath(), hiveConf);
@@ -167,7 +162,7 @@ public class BootstrapEventsIterator implements Iterator<BootstrapEvent> {
long numTables = getSubDirs(fs, dbDumpPath).length;
long numFunctions = 0;
- Path funcPath = new Path(dbDumpPath, ReplicationSemanticAnalyzer.FUNCTIONS_ROOT_DIR_NAME);
+ Path funcPath = new Path(dbDumpPath, ReplUtils.FUNCTIONS_ROOT_DIR_NAME);
if (fs.exists(funcPath)) {
numFunctions = getSubDirs(fs, funcPath).length;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java
index 32518e0..73c3a7a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java
@@ -28,8 +28,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask.ConstraintFileType;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.EximUtil;
-import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
public class ConstraintEventsIterator implements Iterator<FSConstraintEvent> {
private FileStatus[] dbDirs;
@@ -47,7 +47,7 @@ public class ConstraintEventsIterator implements Iterator<FSConstraintEvent> {
private FileStatus[] listConstraintFilesInDBDir(FileSystem fs, Path dbDir, String prefix) {
try {
- return fs.listStatus(new Path(dbDir, ReplicationSemanticAnalyzer.CONSTRAINTS_ROOT_DIR_NAME), new PathFilter() {
+ return fs.listStatus(new Path(dbDir, ReplUtils.CONSTRAINTS_ROOT_DIR_NAME), new PathFilter() {
public boolean accept(Path p) {
return p.getName().startsWith(prefix);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
index e0f8f72..874edb9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
@@ -39,7 +39,7 @@ import java.util.List;
import java.util.stream.Collectors;
import java.util.ArrayList;
-import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.FUNCTIONS_ROOT_DIR_NAME;
+import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.FUNCTIONS_ROOT_DIR_NAME;
class DatabaseEventsIterator implements Iterator<BootstrapEvent> {
private static Logger LOG = LoggerFactory.getLogger(DatabaseEventsIterator.class);
@@ -122,7 +122,7 @@ class DatabaseEventsIterator implements Iterator<BootstrapEvent> {
while (remoteIterator.hasNext()) {
LocatedFileStatus next = remoteIterator.next();
// we want to skip this file, this also means there cant be a table with name represented
- // by constantReplExternalTables.FILE_NAME
+ // by constant ReplExternalTables.FILE_NAME
if(next.getPath().toString().endsWith(ReplExternalTables.FILE_NAME)) {
continue;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
index 076165a..99a8d5d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
@@ -123,9 +123,6 @@ public class FSTableEvent implements TableEvent {
tableDesc.setExternal(true);
}
tableDesc.setReplicationSpec(replicationSpec());
- if (table.getTableType() == TableType.EXTERNAL_TABLE) {
- tableDesc.setExternal(true);
- }
return tableDesc;
} catch (Exception e) {
throw new SemanticException(e);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index ad41276..65b7aa0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -159,13 +159,6 @@ public class LoadPartitions {
}
private TaskTracker forNewTable() throws Exception {
- Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName());
- // If table doesn't exist, allow creating a new one only if the database state is older than the update.
- // This in-turn applicable for partitions creation as well.
- if ((parentDb != null) && (!event.replicationSpec().allowReplacementInto(parentDb.getParameters()))) {
- return tracker;
- }
-
Iterator<AddPartitionDesc> iterator = event.partitionDescriptions(tableDesc).iterator();
while (iterator.hasNext() && tracker.canAddMoreTasks()) {
AddPartitionDesc currentPartitionDesc = iterator.next();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
index 5638ace..f2c8e8f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
import java.io.IOException;
@@ -42,7 +42,7 @@ public class IncrementalLoadEventsIterator implements Iterator<FileStatus> {
public IncrementalLoadEventsIterator(String loadPath, HiveConf conf) throws IOException {
Path eventPath = new Path(loadPath);
FileSystem fs = eventPath.getFileSystem(conf);
- eventDirs = fs.listStatus(eventPath, EximUtil.getDirectoryFilter(fs));
+ eventDirs = fs.listStatus(eventPath, ReplUtils.getEventsDirectoryFilter(fs));
if ((eventDirs == null) || (eventDirs.length == 0)) {
currentIndex = 0;
numEvents = 0;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
index 7ae33e3..5302191 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hive.metastore.api.ReplLastIdInfo;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
@@ -92,7 +91,7 @@ public class IncrementalLoadTasksBuilder {
}
public Task<? extends Serializable> build(DriverContext driverContext, Hive hive, Logger log,
- ReplLoadWork loadWork, TaskTracker tracker) throws Exception {
+ TaskTracker tracker) throws Exception {
Task<? extends Serializable> evTaskRoot = TaskFactory.get(new DependencyCollectionWork());
Task<? extends Serializable> taskChainTail = evTaskRoot;
Long lastReplayedEvent = null;
@@ -173,9 +172,6 @@ public class IncrementalLoadTasksBuilder {
this.log.debug("Added {}:{} as a precursor of barrier task {}:{}",
taskChainTail.getClass(), taskChainTail.getId(),
barrierTask.getClass(), barrierTask.getId());
- if (loadWork.getPathsToCopyIterator().hasNext()) {
- taskChainTail.addDependentTask(TaskFactory.get(loadWork, conf));
- }
}
return evTaskRoot;
}
@@ -396,6 +392,10 @@ public class IncrementalLoadTasksBuilder {
return tasks;
}
+ public Long eventTo() {
+ return eventTo;
+ }
+
public static long getNumIteration() {
return numIteration;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 4fdd12a..91eeb13 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hive.ql.exec.repl.util;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
@@ -56,12 +58,19 @@ import static org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration.TableMig
public class ReplUtils {
+ public static final String LAST_REPL_ID_KEY = "hive.repl.last.repl.id";
public static final String REPL_CHECKPOINT_KEY = "hive.repl.ckpt.key";
// write id allocated in the current execution context which will be passed through config to be used by different
// tasks.
public static final String REPL_CURRENT_TBL_WRITE_ID = "hive.repl.current.table.write.id";
+ public static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
+ public static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints";
+
+ // Root directory for dumping bootstrapped tables along with the incremental events dump.
+ public static final String INC_BOOTSTRAP_ROOT_DIR_NAME = "_bootstrap";
+
// Migrating to transactional tables in bootstrap load phase.
// It is enough to copy all the original files under base_1 dir and so write-id is hardcoded to 1.
public static final Long REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID = 1L;
@@ -167,4 +176,15 @@ public class ReplUtils {
}
return taskList;
}
+
+ // Path filter that lists only event directories, excluding "_bootstrap"
+ public static PathFilter getEventsDirectoryFilter(final FileSystem fs) {
+ return p -> {
+ try {
+ return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ };
+ }
}
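As consumed by IncrementalLoadEventsIterator above, the new filter keeps only per-event directories and hides the bootstrap payload; a minimal usage sketch (dumpPath and conf are assumed placeholders):

    FileSystem fs = dumpPath.getFileSystem(conf);
    // Per-event directories only; the _bootstrap directory is skipped.
    FileStatus[] eventDirs = fs.listStatus(dumpPath, ReplUtils.getEventsDirectoryFilter(fs));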
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 4e7595c..2036d69 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -85,10 +85,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
private static String testInjectDumpDir = null; // unit tests can overwrite this to affect default dump behaviour
private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
- public static final String LAST_REPL_ID_KEY = "hive.repl.last.repl.id";
- public static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
- public static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints";
-
ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
super(queryState);
this.db = super.db;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java
index 2fa3676..d01e24c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java
@@ -62,6 +62,10 @@ public class HiveWrapper {
getColStats));
}
+ public Tuple<Table> table(final Table tblObj) throws HiveException {
+ return new Tuple<>(functionForSpec, () -> tblObj);
+ }
+
public static class Tuple<T> {
interface Function<T> {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index 1eee3fd..4cd4d70 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -178,7 +178,7 @@ public class TableExport {
}
private boolean shouldExport() {
- return Utils.shouldReplicate(replicationSpec, tableSpec.tableHandle, conf);
+ return Utils.shouldReplicate(replicationSpec, tableSpec.tableHandle, false, conf);
}
/**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index 21df63c..3cac813 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -166,8 +166,8 @@ public class Utils {
* validates if a table can be exported, similar to EximUtil.shouldExport with few replication
* specific checks.
*/
- public static Boolean shouldReplicate(ReplicationSpec replicationSpec, Table tableHandle,
- HiveConf hiveConf) {
+ public static boolean shouldReplicate(ReplicationSpec replicationSpec, Table tableHandle,
+ boolean isEventDump, HiveConf hiveConf) {
if (replicationSpec == null) {
replicationSpec = new ReplicationSpec();
}
@@ -187,14 +187,21 @@ public class Utils {
}
if (MetaStoreUtils.isExternalTable(tableHandle.getTTable())) {
- return hiveConf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) || replicationSpec.isMetadataOnly();
+ boolean shouldReplicateExternalTables = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)
+ || replicationSpec.isMetadataOnly();
+ if (isEventDump) {
+ // Skip dumping of events on external tables if bootstrap is enabled for them.
+ shouldReplicateExternalTables = shouldReplicateExternalTables
+ && !hiveConf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES);
+ }
+ return shouldReplicateExternalTables;
}
}
return true;
}
public static boolean shouldReplicate(NotificationEvent tableForEvent,
- ReplicationSpec replicationSpec, Hive db, HiveConf hiveConf) {
+ ReplicationSpec replicationSpec, Hive db, boolean isEventDump, HiveConf hiveConf) {
Table table;
try {
table = db.getTable(tableForEvent.getDbName(), tableForEvent.getTableName());
@@ -204,7 +211,7 @@ public class Utils {
.getTableName(), e);
return false;
}
- return shouldReplicate(replicationSpec, table, hiveConf);
+ return shouldReplicate(replicationSpec, table, isEventDump, hiveConf);
}
static List<Path> getDataPathList(Path fromPath, ReplicationSpec replicationSpec, HiveConf conf)
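For an external table, the decision above works out as sketched here (illustrative only; replicationSpec, externalTable, and hiveConf are assumed placeholders):

    // include=false                             -> not replicated (unless the dump is metadata-only)
    // include=true, bootstrap=false             -> replicated during both event dump and bootstrap/export
    // include=true, bootstrap=true, event dump  -> skipped; the table is re-bootstrapped instead
    boolean duringEventDump = Utils.shouldReplicate(replicationSpec, externalTable, true, hiveConf);
    boolean duringBootstrap = Utils.shouldReplicate(replicationSpec, externalTable, false, hiveConf);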
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java
index 672f402..d938cc1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java
@@ -31,6 +31,7 @@ abstract class AbstractConstraintEventHandler<T extends EventMessage> extends Ab
event,
withinContext.replicationSpec,
withinContext.db,
+ true,
withinContext.hiveConf
);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
index 415e954..0756f59 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
@@ -67,7 +67,7 @@ class AddPartitionHandler extends AbstractEventHandler {
}
final Table qlMdTable = new Table(tobj);
- if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) {
+ if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true, withinContext.hiveConf)) {
return;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
index 1b91e3e..e59bdf6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
@@ -100,7 +100,7 @@ class AlterPartitionHandler extends AbstractEventHandler<AlterPartitionMessage>
}
Table qlMdTable = new Table(tableObject);
- if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) {
+ if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true, withinContext.hiveConf)) {
return;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
index ff43399..4deb551 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
@@ -87,7 +87,7 @@ class AlterTableHandler extends AbstractEventHandler<AlterTableMessage> {
Table qlMdTableBefore = new Table(before);
if (!Utils
- .shouldReplicate(withinContext.replicationSpec, qlMdTableBefore, withinContext.hiveConf)) {
+ .shouldReplicate(withinContext.replicationSpec, qlMdTableBefore, true, withinContext.hiveConf)) {
return;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
index a8bf671..8a838db 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
@@ -54,7 +54,7 @@ class CreateTableHandler extends AbstractEventHandler<CreateTableMessage> {
Table qlMdTable = new Table(tobj);
- if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) {
+ if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true, withinContext.hiveConf)) {
return;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
index f029fee..1bcd529 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
@@ -58,7 +58,7 @@ class InsertHandler extends AbstractEventHandler<InsertMessage> {
withinContext.replicationSpec.setNoop(true);
}
- if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) {
+ if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true, withinContext.hiveConf)) {
return;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java
index f3f00c5..79e1361 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java
@@ -59,7 +59,7 @@ class UpdatePartColStatHandler extends AbstractEventHandler<UpdatePartitionColum
return;
}
- if (!Utils.shouldReplicate(withinContext.replicationSpec, new Table(tableObj),
+ if (!Utils.shouldReplicate(withinContext.replicationSpec, new Table(tableObj), true,
withinContext.hiveConf)) {
return;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
index bd8182d..ca9af5e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
@@ -38,7 +38,7 @@ class UpdateTableColStatHandler extends AbstractEventHandler<UpdateTableColumnSt
public void handle(Context withinContext) throws Exception {
LOG.info("Processing#{} UpdateTableColumnStat message : {}", fromEventId(), eventMessageAsJSON);
Table qlMdTable = new Table(eventMessage.getTableObject());
- if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) {
+ if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true, withinContext.hiveConf)) {
return;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
index dac20d2..4d8ffe9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
@@ -52,7 +52,7 @@ public class TableSerializer implements JsonWriter.Serializer {
@Override
public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
throws SemanticException, IOException {
- if (!Utils.shouldReplicate(additionalPropertiesProvider, tableHandle, hiveConf)) {
+ if (!Utils.shouldReplicate(additionalPropertiesProvider, tableHandle, false, hiveConf)) {
return;
}