You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ay...@apache.org on 2022/04/17 06:03:04 UTC
[hive] branch master updated: HIVE-25921: Overwrite table metadata for bootstraped tables. (#2993). (Ayush Saxena, reviewed by Denys Kuzmenko)
This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 61d4ff2be4 HIVE-25921: Overwrite table metadata for bootstraped tables. (#2993). (Ayush Saxena, reviewed by Denys Kuzmenko)
61d4ff2be4 is described below
commit 61d4ff2be48b20df9fd24692c372ee9c2606babe
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Sun Apr 17 11:32:51 2022 +0530
HIVE-25921: Overwrite table metadata for bootstraped tables. (#2993). (Ayush Saxena, reviewed by Denys Kuzmenko)
---
.../parse/TestReplicationOptimisedBootstrap.java | 133 +++++++++++++++++++++
.../ql/ddl/table/create/CreateTableOperation.java | 3 +-
.../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 11 +-
.../hadoop/hive/ql/exec/repl/ReplLoadWork.java | 21 +++-
.../repl/bootstrap/load/table/LoadPartitions.java | 33 +++--
.../exec/repl/bootstrap/load/table/LoadTable.java | 22 ++--
.../hadoop/hive/ql/parse/ReplicationSpec.java | 9 ++
.../hadoop/hive/ql/plan/ImportTableDesc.java | 4 +
8 files changed, 207 insertions(+), 29 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
index 5782d20640..537d860434 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
@@ -934,6 +934,139 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
.verifyResults(new String[] { "beejing", "chengdu" });
}
+ @Test
+ public void testOverwriteDuringBootstrap() throws Throwable {
+ List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+ // Do a bootstrap cycle.
+ primary.dump(primaryDbName, withClause);
+ replica.load(replicatedDbName, primaryDbName, withClause);
+
+ // Create some partitioned and non partitioned tables and do a dump & load.
+ WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+ .run("create table t1 (id int)")
+ .run("insert into table t1 values (1)")
+ .run("insert into table t1 values (2),(3),(4)")
+ .run("create table t2 (id int)")
+ .run("insert into table t2 values (15),(16)")
+ .run("create table t3 (place string) partitioned by (country string)")
+ .run("insert into table t3 partition(country='india') values ('chennai')")
+ .run("insert into table t3 partition(country='us') values ('new york')")
+ .run("create table t4 (place string) partitioned by (country string)")
+ .run("insert into table t4 partition(country='china') values ('beejing')")
+ .run("insert into table t4 partition(country='nepal') values ('kathmandu')")
+ .dump(primaryDbName, withClause);
+
+ // Do the load and check all the external & managed tables are present.
+ replica.load(replicatedDbName, primaryDbName, withClause)
+ .run("repl status " + replicatedDbName)
+ .verifyResult(tuple.lastReplicationId)
+ .run("use " + replicatedDbName)
+ .run("show tables like 't1'")
+ .verifyResult("t1")
+ .run("show tables like 't2'")
+ .verifyResult("t2")
+ .run("show tables like 't3'")
+ .verifyResult("t3")
+ .run("show tables like 't4'")
+ .verifyResult("t4")
+ .verifyReplTargetProperty(replicatedDbName);
+
+ // Prepare for reverse bootstrap.
+ // Do some modifications on the original source cluster. The diff becomes (tnew_managed, t1, t2, t3).
+ // Create one new table: it should get dropped. (tnew_managed)
+ // Create a new partition: the new partition should get dropped. (t3: france)
+ // Modify a table: the data should get overwritten. (t1)
+ // Modify a partition: the data should get overwritten. (t3: india value delhi)
+ // Drop a table: the table should be recreated. (t2)
+ primary.run("use " + primaryDbName)
+ .run("create table tnew_managed (id int)")
+ .run("insert into table t1 values (25)")
+ .run("insert into table tnew_managed values (110)")
+ .run("insert into table t3 partition(country='france') values ('lyon')")
+ .run("insert into table t3 partition(country='india') values ('delhi')")
+ .run("drop table t2");
+
+ // Do some modifications on the target cluster. (t1, t2, t3: bootstrap & t4, t5: incremental)
+ replica.run("use " + replicatedDbName)
+ .run("insert into table t1 values (101)")
+ .run("insert into table t1 values (121),(211)")
+ .run("insert into table t3 partition(country='india') values ('lucknow')")
+ .run("insert into table t2 values (11)")
+ .run("insert into table t4 partition(country='india') values ('kanpur')")
+ .run("create table t5 (place string) partitioned by (country string)")
+ .run("insert into table t5 partition(country='china') values ('beejing')")
+ .run("insert into table t4 partition(country='china') values ('Shanghai')");
+
+ // Prepare for reverse replication.
+ DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+ Path newReplDir = new Path(replica.repldDir + "1");
+ replicaFs.mkdirs(newReplDir);
+ withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+ // Do a reverse dump
+ tuple = replica.dump(replicatedDbName, withClause);
+
+ // Check the event ack file got created.
+ assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+ replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+ Path dumpPath = new Path(tuple.dumpLocation);
+
+ // Do a load, this should create a table_diff_complete directory
+ primary.load(primaryDbName, replicatedDbName, withClause);
+
+ // Check the table diff directory exists.
+ assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+ replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+ // Check the table diff has all the modified tables, including the dropped and empty ones
+ HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(dumpPath, conf);
+ assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+ .containsAll(Arrays.asList("tnew_managed", "t1", "t2", "t3")));
+
+ // Do a reverse second dump, this should do a bootstrap dump for the tables in the table_diff and incremental for
+ // rest.
+ tuple = replica.dump(replicatedDbName, withClause);
+
+ String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
+ // _bootstrap directory should be created as bootstrap enabled on external tables.
+ Path dumpPath1 = new Path(hiveDumpDir, INC_BOOTSTRAP_ROOT_DIR_NAME +"/metadata/" + replicatedDbName);
+ FileStatus[] listStatus =
+ dumpPath1.getFileSystem(conf).listStatus(dumpPath1);
+ ArrayList<String> tablesBootstrapped = new ArrayList<String>();
+ for (FileStatus file : listStatus) {
+ tablesBootstrapped.add(file.getPath().getName());
+ }
+
+ assertTrue(tablesBootstrapped.containsAll(Arrays.asList("t1", "t2", "t3")));
+
+ // Do a reverse load, this should do a bootstrap load for the tables in table_diff and incremental for the rest.
+ primary.load(primaryDbName, replicatedDbName, withClause);
+
+ primary.run("use " + primaryDbName)
+ .run("select id from t1")
+ .verifyResults(new String[] { "1", "2", "3", "4", "101", "121", "211" })
+ .run("select id from t2")
+ .verifyResults(new String[] { "15", "16", "11" })
+ .run("select place from t3 where country = 'india'")
+ .verifyResults(new String[] {"chennai", "lucknow" })
+ .run("select place from t3 where country = 'us'")
+ .verifyResults(new String[] {"new york" })
+ .run("select place from t3 where country = 'france'")
+ .verifyFailure(new String[] { "lyon" })
+ .run("select place from t4 where country = 'china'")
+ .verifyResults(new String[] { "beejing", "Shanghai" })
+ .run("select place from t4 where country = 'india'")
+ .verifyResults(new String[] { "kanpur" })
+ .run("select place from t5 where country = 'china'")
+ .verifyResults(new String[] { "beejing" })
+ .run("show tables like 'tnew_managed'")
+ .verifyFailure(new String[]{"tnew_managed"});
+ }
+
@NotNull
private List<String> setUpFirstIterForOptimisedBootstrap() throws Throwable {
List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java
index ef327f4435..6dacbb0b7e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java
@@ -66,7 +66,8 @@ public class CreateTableOperation extends DDLOperation<CreateTableDesc> {
Table existingTable = context.getDb().getTable(tbl.getDbName(), tbl.getTableName(), false);
if (existingTable != null) {
Map<String, String> dbParams = context.getDb().getDatabase(existingTable.getDbName()).getParameters();
- if (desc.getReplicationSpec().allowEventReplacementInto(dbParams)) {
+ if (desc.getReplicationSpec().allowEventReplacementInto(dbParams) || desc.getReplicationSpec()
+ .isForceOverwrite()) {
desc.setReplaceMode(true); // we replace existing table.
// If location of an existing managed table is changed, then need to delete the old location if exists.
// This scenario occurs when a managed table is converted into external table at source. In this case,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index f3b0a0ecd6..a8a333f640 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -311,7 +311,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
} else {
LoadTable loadTable = new LoadTable(tableEvent, loadContext, iterator.replLogger(), tableContext,
loadTaskTracker, work.getMetricCollector());
- tableTracker = loadTable.tasks(work.isIncrementalLoad());
+ tableTracker = loadTable.tasks(work.isIncrementalLoad(), work.isSecondFailover);
}
setUpDependencies(dbTracker, tableTracker);
@@ -336,7 +336,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
// for a table we explicitly try to load partitions as there is no separate partitions events.
LoadPartitions loadPartitions =
new LoadPartitions(loadContext, iterator.replLogger(), loadTaskTracker, tableEvent,
- work.dbNameToLoadIn, tableContext, work.getMetricCollector());
+ work.dbNameToLoadIn, tableContext, work.getMetricCollector(), work.tablesToBootstrap);
TaskTracker partitionsTracker = loadPartitions.tasks();
partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker,
partitionsTracker);
@@ -445,7 +445,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
LoadPartitions loadPartitions =
new LoadPartitions(loadContext, iterator.replLogger(), tableContext, loadTaskTracker,
event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated(), work.getMetricCollector(),
- event.lastPartSpecReplicated(), event.lastStageReplicated());
+ event.lastPartSpecReplicated(), event.lastStageReplicated(), getWork().tablesToBootstrap);
/*
the tableTracker here should be a new instance and not an existing one as this can
only happen when we break in between loading partitions.
@@ -733,10 +733,11 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
return 0;
}
} else if (work.isSecondFailover) {
- // DROP the tables to be bootstrapped.
+ // Drop the extra tables on the target that are not present on the source cluster.
Hive db = getHive();
- for (String table : work.tablesToBootstrap) {
+ for (String table : work.tablesToDrop) {
+ LOG.info("Dropping table {} for optimised bootstarap", work.dbNameToLoadIn + "." + table);
db.dropTable(work.dbNameToLoadIn + "." + table, true);
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index 271862aa59..b6072912c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec.repl;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -51,10 +52,12 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.apache.hadoop.hive.conf.Constants.SCHEDULED_QUERY_EXECUTIONID;
import static org.apache.hadoop.hive.conf.Constants.SCHEDULED_QUERY_SCHEDULENAME;
@@ -93,7 +96,8 @@ public class ReplLoadWork implements Serializable, ReplLoadWorkMBean {
private boolean shouldFailover;
public boolean isFirstFailover;
public boolean isSecondFailover;
- public List<String> tablesToBootstrap = new ArrayList<>();
+ public List<String> tablesToBootstrap = new LinkedList<>();
+ public List<String> tablesToDrop = new LinkedList<>();
/*
these are sessionState objects that are copied over to work to allow for parallel execution.
@@ -157,10 +161,17 @@ public class ReplLoadWork implements Serializable, ReplLoadWorkMBean {
Path incBootstrapDir = new Path(dumpDirectory, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME);
if (fs.exists(incBootstrapDir)) {
if (isSecondFailover) {
- String[] tableList = getBootstrapTableList(dumpDirParent, hiveConf);
- tablesToBootstrap = Arrays.asList(tableList);
- LOG.info("Optimised bootstrap for database {} with load with bootstrap table list as {}", dbNameToLoadIn,
- tablesToBootstrap);
+ String[] bootstrappedTables = getBootstrapTableList(new Path(dumpDirectory).getParent(), hiveConf);
+ LOG.info("Optimised bootstrap load for database {} with initial bootstrapped table list as {}",
+ dbNameToLoadIn, tablesToBootstrap);
+ // Get list of tables bootstrapped.
+ Path tableMetaPath = new Path(incBootstrapDir, EximUtil.METADATA_PATH_NAME + "/" + sourceDbName);
+ tablesToBootstrap =
+ Stream.of(fs.listStatus(tableMetaPath)).map(st -> st.getPath().getName()).collect(Collectors.toList());
+ List<String> tableList = Arrays.asList(bootstrappedTables);
+ tablesToDrop = ListUtils.subtract(tableList, tablesToBootstrap);
+ LOG.info("Optimised bootstrap for database {} with drop table list as {} and bootstrap table list as {}",
+ dbNameToLoadIn, tablesToDrop, tablesToBootstrap);
}
this.bootstrapIterator = new BootstrapEventsIterator(
new Path(incBootstrapDir, EximUtil.METADATA_PATH_NAME).toString(), dbNameToLoadIn, true,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index bcbf20c53e..b043426c49 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -80,21 +81,19 @@ public class LoadPartitions {
private final ReplicationMetricCollector metricCollector;
private final ImportTableDesc tableDesc;
+ private final List<String> tablesToBootstrap;
private Table table;
- public LoadPartitions(Context context, ReplLogger replLogger, TaskTracker tableTracker,
- TableEvent event, String dbNameToLoadIn,
- TableContext tableContext, ReplicationMetricCollector metricCollector) throws HiveException {
- this(context, replLogger, tableContext, tableTracker, event, dbNameToLoadIn, null,
- metricCollector, null, PartitionState.Stage.PARTITION);
+ public LoadPartitions(Context context, ReplLogger replLogger, TaskTracker tableTracker, TableEvent event, String dbNameToLoadIn,
+ TableContext tableContext, ReplicationMetricCollector metricCollector, List<String> tablesToBootstrap) throws HiveException {
+ this(context, replLogger, tableContext, tableTracker, event, dbNameToLoadIn, null, metricCollector, null,
+ PartitionState.Stage.PARTITION, tablesToBootstrap);
}
- public LoadPartitions(Context context, ReplLogger replLogger, TableContext tableContext,
- TaskTracker limiter, TableEvent event, String dbNameToLoadIn,
- AlterTableAddPartitionDesc lastReplicatedPartition,
- ReplicationMetricCollector metricCollector,
- AlterTableAddPartitionDesc.PartitionDesc lastReplicatedPartitionDesc,
- ReplicationState.PartitionState.Stage lastReplicatedStage) throws HiveException {
+ public LoadPartitions(Context context, ReplLogger replLogger, TableContext tableContext, TaskTracker limiter,
+ TableEvent event, String dbNameToLoadIn, AlterTableAddPartitionDesc lastReplicatedPartition,
+ ReplicationMetricCollector metricCollector, AlterTableAddPartitionDesc.PartitionDesc lastReplicatedPartitionDesc,
+ PartitionState.Stage lastReplicatedStage, List<String> tablesToBootstrap) throws HiveException {
this.tracker = new TaskTracker(limiter);
this.event = event;
this.context = context;
@@ -106,6 +105,7 @@ public class LoadPartitions {
this.metricCollector = metricCollector;
this.lastReplicatedPartitionDesc = lastReplicatedPartitionDesc;
this.lastReplicatedStage = lastReplicatedStage;
+ this.tablesToBootstrap = tablesToBootstrap;
}
public TaskTracker tasks() throws Exception {
@@ -134,6 +134,17 @@ public class LoadPartitions {
} else {
// existing
if (table.isPartitioned()) {
+ if (tablesToBootstrap.stream().anyMatch(table.getTableName()::equalsIgnoreCase)) {
+ Hive hiveDb = Hive.get(context.hiveConf);
+ // Drop the existing partitions that are not among the partitions in the event.
+ List<Partition> partitions = hiveDb.getPartitions(table);
+ List<String> newParts = event.partitions(tableDesc);
+ for (Partition part : partitions) {
+ if (!newParts.contains(part.getName())) {
+ hiveDb.dropPartition(table.getDbName(), table.getTableName(), part.getValues(), true);
+ }
+ }
+ }
List<AlterTableAddPartitionDesc> partitionDescs = event.partitionDescriptions(tableDesc);
if (!event.replicationSpec().isMetadataOnly() && !partitionDescs.isEmpty()) {
updateReplicationState(initialReplicationState());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 824d8c70a0..54ca6e02fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -82,7 +82,7 @@ public class LoadTable {
this.metricCollector = metricCollector;
}
- public TaskTracker tasks(boolean isBootstrapDuringInc) throws Exception {
+ public TaskTracker tasks(boolean isBootstrapDuringInc, boolean isSecondFailover) throws Exception {
// Path being passed to us is a table dump location. We go ahead and load it in as needed.
// If tblName is null, then we default to the table name specified in _metadata, which is good.
// or are both specified, in which case, that's what we are intended to create the new table as.
@@ -95,6 +95,10 @@ public class LoadTable {
ImportTableDesc tableDesc = event.tableDesc(dbName);
Table table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb);
+ if (isSecondFailover) {
+ tableDesc.setForceOverwriteTable();
+ }
+
// Normally, on import, trying to create a table or a partition in a db that does not yet exist
// is a error condition. However, in the case of a REPL LOAD, it is possible that we are trying
// to create tasks to create a table inside a db that as-of-now does not exist, but there is
@@ -112,7 +116,7 @@ public class LoadTable {
}
Task<?> tblRootTask = null;
- ReplLoadOpType loadTblType = getLoadTableType(table, isBootstrapDuringInc);
+ ReplLoadOpType loadTblType = getLoadTableType(table, isBootstrapDuringInc, isSecondFailover);
switch (loadTblType) {
case LOAD_NEW:
break;
@@ -138,7 +142,7 @@ public class LoadTable {
or in the case of an unpartitioned table. In all other cases, it should
behave like a noop or a pure MD alter.
*/
- newTableTasks(tableDesc, tblRootTask, tableLocationTuple);
+ newTableTasks(tableDesc, tblRootTask, tableLocationTuple, isSecondFailover && table != null);
// Set Checkpoint task as dependant to create table task. So, if same dump is retried for
// bootstrap, we skip current table update.
@@ -159,9 +163,13 @@ public class LoadTable {
return tracker;
}
- private ReplLoadOpType getLoadTableType(Table table, boolean isBootstrapDuringInc)
+ private ReplLoadOpType getLoadTableType(Table table, boolean isBootstrapDuringInc, boolean isSecondFailover)
throws InvalidOperationException, HiveException {
- if (table == null) {
+ // In the second iteration of the optimised bootstrap, we don't drop & re-create the table; instead we
+ // rewrite the metadata, to prevent deletion of the data in the target cluster. That way, while copying
+ // data from the source cluster, we operate only on the modified/missing/additional files and avoid
+ // copying the files which are already present on both clusters and are identical.
+ if (table == null || isSecondFailover) {
return ReplLoadOpType.LOAD_NEW;
}
@@ -180,11 +188,11 @@ public class LoadTable {
return ReplLoadOpType.LOAD_REPLACE;
}
- private void newTableTasks(ImportTableDesc tblDesc, Task<?> tblRootTask, TableLocationTuple tuple)
+ private void newTableTasks(ImportTableDesc tblDesc, Task<?> tblRootTask, TableLocationTuple tuple, boolean setLoc)
throws Exception {
Table table = tblDesc.toTable(context.hiveConf);
ReplicationSpec replicationSpec = event.replicationSpec();
- if (!tblDesc.isExternal()) {
+ if (!tblDesc.isExternal() && !setLoc) {
tblDesc.setLocation(null);
}
Task<?> createTableTask =
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index dd2224c64e..48d3f5cec7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -49,6 +49,7 @@ public class ReplicationSpec {
//Determine if replication is done using repl or export-import
private boolean isRepl = false;
private boolean isMetadataOnlyForExternalTables = false;
+ private boolean isForceOverwrite = false;
public void setInReplicationScope(boolean inReplicationScope) {
isInReplicationScope = inReplicationScope;
@@ -418,4 +419,12 @@ public class ReplicationSpec {
public void setRepl(boolean repl) {
isRepl = repl;
}
+
+ public boolean isForceOverwrite() {
+ return isForceOverwrite;
+ }
+
+ public void setForceOverwrite(boolean forceOverwrite) {
+ isForceOverwrite = forceOverwrite;
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
index 232ea09035..23128ebfed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
@@ -190,4 +190,8 @@ public class ImportTableDesc {
public Long getReplWriteId() {
return this.createTblDesc.getReplWriteId();
}
+
+ public void setForceOverwriteTable(){
+ this.createTblDesc.getReplicationSpec().setForceOverwrite(true);
+ }
}