Posted to commits@hive.apache.org by sa...@apache.org on 2018/05/11 16:51:14 UTC

hive git commit: HIVE-19435: Incremental replication causes data loss if a table is dropped, then re-created with a different partition type and inserted into (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Thejas M Nair)

Repository: hive
Updated Branches:
  refs/heads/master 598dcf40f -> 9cfc15a86


HIVE-19435: Incremental replication causes data loss if a table is dropped, then re-created with a different partition type and inserted into (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Thejas M Nair)

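In short: within a single incremental batch, a DROP TABLE followed by a CREATE TABLE
with a different partitioning scheme and an INSERT could lose the inserted data on the
replica, because the load path kept consulting the stale (pre-drop) table object. The
sketch below condenses the failing sequence using the WarehouseInstance fixtures from
the new test in this commit; previousReplId is a placeholder for the last replicated
event id, not a name from the patch.

    // Condensed repro sketch (assumes the test class's primary/replica fixtures).
    WarehouseInstance.Tuple incr = primary.run("use " + primaryDbName)
            .run("drop table table1")                // was: partitioned by (country string)
            .run("create table table1 (id int)")     // re-created non-partitioned
            .run("insert into table1 values (10)")
            .dump(primaryDbName, previousReplId);

    replica.load(replicatedDbName, incr.dumpLocation)
            .run("use " + replicatedDbName)
            .run("select * from table1")
            .verifyResults(new String[] {"10"});     // lost the row before this fix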

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9cfc15a8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9cfc15a8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9cfc15a8

Branch: refs/heads/master
Commit: 9cfc15a86faebb5cb654afc1bfc0c47d347b7ae2
Parents: 598dcf4
Author: Sankar Hariappan <sa...@apache.org>
Authored: Fri May 11 22:20:06 2018 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Fri May 11 22:20:06 2018 +0530

----------------------------------------------------------------------
 ...TestReplicationScenariosAcrossInstances.java | 58 ++++++++++++++++++++
 .../hive/ql/parse/ImportSemanticAnalyzer.java   | 38 ++++++++++---
 2 files changed, 88 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9cfc15a8/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 70e1aa7..df9bde0 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -675,4 +675,62 @@ public class TestReplicationScenariosAcrossInstances {
             .run("select id from table2 order by id")
             .verifyResults(new String[] {"2"});
   }
+
+  @Test
+  public void testIncrementalReplWithDropAndCreateTableDifferentPartitionTypeAndInsert() throws Throwable {
+    // Bootstrap dump with empty db
+    WarehouseInstance.Tuple bootstrapTuple = primary.dump(primaryDbName, null);
+
+    // Bootstrap load in replica
+    replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
+            .status(replicatedDbName)
+            .verifyResult(bootstrapTuple.lastReplicationId);
+
+    // First incremental dump
+    WarehouseInstance.Tuple firstIncremental = primary.run("use " + primaryDbName)
+            .run("create table table1 (id int) partitioned by (country string)")
+            .run("create table table2 (id int)")
+            .run("create table table3 (id int) partitioned by (country string)")
+            .run("insert into table1 partition(country='india') values(1)")
+            .run("insert into table2 values(2)")
+            .run("insert into table3 partition(country='india') values(3)")
+            .dump(primaryDbName, bootstrapTuple.lastReplicationId);
+
+    // Second incremental dump
+    WarehouseInstance.Tuple secondIncremental = primary.run("use " + primaryDbName)
+            .run("drop table table1")
+            .run("drop table table2")
+            .run("drop table table3")
+            .run("create table table1 (id int)")
+            .run("insert into table1 values (10)")
+            .run("create table table2 (id int) partitioned by (country string)")
+            .run("insert into table2 partition(country='india') values(20)")
+            .run("create table table3 (id int) partitioned by (name string, rank int)")
+            .run("insert into table3 partition(name='adam', rank=100) values(30)")
+            .dump(primaryDbName, firstIncremental.lastReplicationId);
+
+    // First incremental load
+    replica.load(replicatedDbName, firstIncremental.dumpLocation)
+            .status(replicatedDbName)
+            .verifyResult(firstIncremental.lastReplicationId)
+            .run("use " + replicatedDbName)
+            .run("select id from table1")
+            .verifyResults(new String[] {"1"})
+            .run("select * from table2")
+            .verifyResults(new String[] {"2"})
+            .run("select id from table3")
+            .verifyResults(new String[] {"3"});
+
+    // Second incremental load
+    replica.load(replicatedDbName, secondIncremental.dumpLocation)
+            .status(replicatedDbName)
+            .verifyResult(secondIncremental.lastReplicationId)
+            .run("use " + replicatedDbName)
+            .run("select * from table1")
+            .verifyResults(new String[] {"10"})
+            .run("select id from table2")
+            .verifyResults(new String[] {"20"})
+            .run("select id from table3")
+            .verifyResults(new String[] {"30"});
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9cfc15a8/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index e597872..682b641 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -891,14 +891,14 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
   }
 
-  private static Table createNewTableMetadataObject(ImportTableDesc tblDesk)
+  private static Table createNewTableMetadataObject(ImportTableDesc tblDesc)
       throws SemanticException {
-    Table newTable = new Table(tblDesk.getDatabaseName(), tblDesk.getTableName());
+    Table newTable = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName());
     //so that we know the type of table we are creating: acid/MM to match what was exported
-    newTable.setParameters(tblDesk.getTblProps());
-    if(tblDesk.isExternal() && AcidUtils.isTransactionalTable(newTable)) {
+    newTable.setParameters(tblDesc.getTblProps());
+    if(tblDesc.isExternal() && AcidUtils.isTransactionalTable(newTable)) {
       throw new SemanticException("External tables may not be transactional: " +
-          Warehouse.getQualifiedName(tblDesk.getDatabaseName(), tblDesk.getTableName()));
+          Warehouse.getQualifiedName(tblDesc.getDatabaseName(), tblDesc.getTableName()));
     }
     return newTable;
   }
@@ -1019,14 +1019,36 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         x.getTasks().add(t);
       }
     } else {
+      // If the table of the current event has a partition flag different from the existing table,
+      // then some previous event in the same batch must have dropped and re-created the table with
+      // the same name but a different partition flag. In that case, go with the current event's
+      // table type, and so create a dummy table object for adding the repl tasks.
+      boolean isOldTableValid = true;
+      if (table.isPartitioned() != isPartitioned(tblDesc)) {
+        table = createNewTableMetadataObject(tblDesc);
+        isOldTableValid = false;
+      }
+
       // Table existed, and is okay to replicate into, not dropping and re-creating.
-      if (table.isPartitioned()) {
+      if (isPartitioned(tblDesc)) {
         x.getLOG().debug("table partitioned");
         for (AddPartitionDesc addPartitionDesc : partitionDescs) {
           addPartitionDesc.setReplicationSpec(replicationSpec);
           Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
           org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
-          if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
+          if (isOldTableValid) {
+            // The existing table is valid, but if fetching the partition fails because the event's
+            // partition spec does not match it, fall back to the event's table object and create the partition afresh.
+            try {
+              ptn = x.getHive().getPartition(table, partSpec, false);
+            } catch (HiveException ex) {
+              ptn = null;
+              table = createNewTableMetadataObject(tblDesc);
+              isOldTableValid = false;
+            }
+          }
+
+          if (ptn == null) {
             if (!replicationSpec.isMetadataOnly()){
               x.getTasks().add(addSinglePartition(
                   fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
@@ -1071,7 +1093,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         x.getLOG().debug("table non-partitioned");
         if (!replicationSpec.isMetadataOnly()) {
           // repl-imports are replace-into unless the event is insert-into
-          loadTable(fromURI, table, replicationSpec.isReplace(), table.getDataLocation(),
+          loadTable(fromURI, table, replicationSpec.isReplace(), new Path(tblDesc.getLocation()),
             replicationSpec, x, writeId, stmtId);
         } else {
           x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec));
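
The heart of the fix is the reconciliation step above: when the incoming event's
descriptor disagrees with the cached table object on the partition flag, or a partition
lookup throws a HiveException, the loader rebuilds the table object from the event via
createNewTableMetadataObject and proceeds with the event's view of the schema. The last
hunk follows the same logic: loadTable now takes its target path from
tblDesc.getLocation() rather than table.getDataLocation(), presumably because the table
object may now be a freshly built dummy. A minimal standalone sketch of the pattern,
using made-up types rather than Hive's API:

    // Hedged sketch; TableMeta and ReplReconciler are illustrative, not Hive classes.
    final class TableMeta {
        final String name;
        final boolean partitioned;
        TableMeta(String name, boolean partitioned) {
            this.name = name;
            this.partitioned = partitioned;
        }
    }

    final class ReplReconciler {
        // Returns the table object the load tasks should trust. A mismatched partition
        // flag means an earlier drop+create in the same batch changed the schema, so
        // the cached object is stale and the event's descriptor wins.
        static TableMeta reconcile(TableMeta existing, TableMeta fromEvent) {
            if (existing == null || existing.partitioned != fromEvent.partitioned) {
                return new TableMeta(fromEvent.name, fromEvent.partitioned);
            }
            return existing;
        }
    }

    // Example: table1 was partitioned, then dropped and re-created non-partitioned.
    // reconcile(new TableMeta("table1", true), new TableMeta("table1", false))
    // returns the event's non-partitioned view, so the subsequent insert is applied.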