You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sa...@apache.org on 2018/05/11 16:51:14 UTC
hive git commit: HIVE-19435: Incremental replication cause data loss
if a table is dropped followed by create and insert-into with different
partition type (Sankar Hariappan, reviewed by Mahesh Kumar Behera,
Thejas M Nair)
Repository: hive
Updated Branches:
refs/heads/master 598dcf40f -> 9cfc15a86
HIVE-19435: Incremental replication cause data loss if a table is dropped followed by create and insert-into with different partition type (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Thejas M Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9cfc15a8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9cfc15a8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9cfc15a8
Branch: refs/heads/master
Commit: 9cfc15a86faebb5cb654afc1bfc0c47d347b7ae2
Parents: 598dcf4
Author: Sankar Hariappan <sa...@apache.org>
Authored: Fri May 11 22:20:06 2018 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Fri May 11 22:20:06 2018 +0530
----------------------------------------------------------------------
...TestReplicationScenariosAcrossInstances.java | 58 ++++++++++++++++++++
.../hive/ql/parse/ImportSemanticAnalyzer.java | 38 ++++++++++---
2 files changed, 88 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/9cfc15a8/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 70e1aa7..df9bde0 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -675,4 +675,62 @@ public class TestReplicationScenariosAcrossInstances {
.run("select id from table2 order by id")
.verifyResults(new String[] {"2"});
}
+
+ @Test
+ public void testIncrementalReplWithDropAndCreateTableDifferentPartitionTypeAndInsert() throws Throwable {
+ // Bootstrap dump with empty db
+ WarehouseInstance.Tuple bootstrapTuple = primary.dump(primaryDbName, null);
+
+ // Bootstrap load in replica
+ replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
+ .status(replicatedDbName)
+ .verifyResult(bootstrapTuple.lastReplicationId);
+
+ // First incremental dump
+ WarehouseInstance.Tuple firstIncremental = primary.run("use " + primaryDbName)
+ .run("create table table1 (id int) partitioned by (country string)")
+ .run("create table table2 (id int)")
+ .run("create table table3 (id int) partitioned by (country string)")
+ .run("insert into table1 partition(country='india') values(1)")
+ .run("insert into table2 values(2)")
+ .run("insert into table3 partition(country='india') values(3)")
+ .dump(primaryDbName, bootstrapTuple.lastReplicationId);
+
+ // Second incremental dump
+ WarehouseInstance.Tuple secondIncremental = primary.run("use " + primaryDbName)
+ .run("drop table table1")
+ .run("drop table table2")
+ .run("drop table table3")
+ .run("create table table1 (id int)")
+ .run("insert into table1 values (10)")
+ .run("create table table2 (id int) partitioned by (country string)")
+ .run("insert into table2 partition(country='india') values(20)")
+ .run("create table table3 (id int) partitioned by (name string, rank int)")
+ .run("insert into table3 partition(name='adam', rank=100) values(30)")
+ .dump(primaryDbName, firstIncremental.lastReplicationId);
+
+ // First incremental load
+ replica.load(replicatedDbName, firstIncremental.dumpLocation)
+ .status(replicatedDbName)
+ .verifyResult(firstIncremental.lastReplicationId)
+ .run("use " + replicatedDbName)
+ .run("select id from table1")
+ .verifyResults(new String[] {"1"})
+ .run("select * from table2")
+ .verifyResults(new String[] {"2"})
+ .run("select id from table3")
+ .verifyResults(new String[] {"3"});
+
+ // Second incremental load
+ replica.load(replicatedDbName, secondIncremental.dumpLocation)
+ .status(replicatedDbName)
+ .verifyResult(secondIncremental.lastReplicationId)
+ .run("use " + replicatedDbName)
+ .run("select * from table1")
+ .verifyResults(new String[] {"10"})
+ .run("select id from table2")
+ .verifyResults(new String[] {"20"})
+ .run("select id from table3")
+ .verifyResults(new String[] {"30"});
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9cfc15a8/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index e597872..682b641 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -891,14 +891,14 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
- private static Table createNewTableMetadataObject(ImportTableDesc tblDesk)
+ private static Table createNewTableMetadataObject(ImportTableDesc tblDesc)
throws SemanticException {
- Table newTable = new Table(tblDesk.getDatabaseName(), tblDesk.getTableName());
+ Table newTable = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName());
//so that we know the type of table we are creating: acid/MM to match what was exported
- newTable.setParameters(tblDesk.getTblProps());
- if(tblDesk.isExternal() && AcidUtils.isTransactionalTable(newTable)) {
+ newTable.setParameters(tblDesc.getTblProps());
+ if(tblDesc.isExternal() && AcidUtils.isTransactionalTable(newTable)) {
throw new SemanticException("External tables may not be transactional: " +
- Warehouse.getQualifiedName(tblDesk.getDatabaseName(), tblDesk.getTableName()));
+ Warehouse.getQualifiedName(tblDesc.getDatabaseName(), tblDesc.getTableName()));
}
return newTable;
}
@@ -1019,14 +1019,36 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
x.getTasks().add(t);
}
} else {
+      // If the table of the current event has a partition flag different from the existing table, it
+      // means some of the previous events in the same batch had drop and create table events with the
+      // same name but a different partition flag. In this case, we should go with the current event's
+      // table type, and so create a dummy table object for adding repl tasks.
+ boolean isOldTableValid = true;
+ if (table.isPartitioned() != isPartitioned(tblDesc)) {
+ table = createNewTableMetadataObject(tblDesc);
+ isOldTableValid = false;
+ }
+
// Table existed, and is okay to replicate into, not dropping and re-creating.
- if (table.isPartitioned()) {
+ if (isPartitioned(tblDesc)) {
x.getLOG().debug("table partitioned");
for (AddPartitionDesc addPartitionDesc : partitionDescs) {
addPartitionDesc.setReplicationSpec(replicationSpec);
Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
- if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
+ if (isOldTableValid) {
+ // If existing table is valid but the partition spec is different, then ignore partition
+ // validation and create new partition.
+ try {
+ ptn = x.getHive().getPartition(table, partSpec, false);
+ } catch (HiveException ex) {
+ ptn = null;
+ table = createNewTableMetadataObject(tblDesc);
+ isOldTableValid = false;
+ }
+ }
+
+ if (ptn == null) {
if (!replicationSpec.isMetadataOnly()){
x.getTasks().add(addSinglePartition(
fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
@@ -1071,7 +1093,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
x.getLOG().debug("table non-partitioned");
if (!replicationSpec.isMetadataOnly()) {
// repl-imports are replace-into unless the event is insert-into
- loadTable(fromURI, table, replicationSpec.isReplace(), table.getDataLocation(),
+ loadTable(fromURI, table, replicationSpec.isReplace(), new Path(tblDesc.getLocation()),
replicationSpec, x, writeId, stmtId);
} else {
x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec));