Posted to commits@hive.apache.org by dj...@apache.org on 2018/05/08 18:42:19 UTC
[14/58] [abbrv] hive git commit: HIVE-18988: Support bootstrap replication of ACID tables (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Thejas M Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4c0475ff
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4c0475ff
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4c0475ff
Branch: refs/heads/storage-branch-2.6
Commit: 4c0475ffb4a3157270cd5244cf39823085a68f57
Parents: dcc733b
Author: Sankar Hariappan <sa...@apache.org>
Authored: Wed May 2 22:15:29 2018 +0530
Committer: Deepak Jaiswal <dj...@apache.org>
Committed: Tue May 8 11:35:04 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 5 +
.../listener/DbNotificationListener.java | 2 +-
.../TestReplicationScenariosAcidTables.java | 167 +
.../java/org/apache/hadoop/hive/ql/Driver.java | 1 +
.../hadoop/hive/ql/exec/ReplCopyTask.java | 8 +-
.../apache/hadoop/hive/ql/exec/ReplTxnTask.java | 27 +-
.../apache/hadoop/hive/ql/exec/ReplTxnWork.java | 106 -
.../apache/hadoop/hive/ql/exec/TaskFactory.java | 1 +
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 86 +-
.../hadoop/hive/ql/exec/repl/ReplDumpWork.java | 4 +-
.../ql/exec/repl/bootstrap/ReplLoadTask.java | 9 +-
.../ql/exec/repl/bootstrap/ReplLoadWork.java | 8 +-
.../exec/repl/bootstrap/events/TableEvent.java | 3 +
.../events/filesystem/FSPartitionEvent.java | 6 +
.../events/filesystem/FSTableEvent.java | 16 +
.../bootstrap/load/table/LoadPartitions.java | 49 +-
.../repl/bootstrap/load/table/LoadTable.java | 51 +-
.../exec/repl/bootstrap/load/util/Context.java | 4 +-
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 36 +
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 10 +
.../hadoop/hive/ql/lockmgr/DummyTxnManager.java | 6 +
.../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 17 +-
.../hive/ql/parse/ImportSemanticAnalyzer.java | 5 +-
.../hive/ql/parse/LoadSemanticAnalyzer.java | 2 +-
.../ql/parse/ReplicationSemanticAnalyzer.java | 7 +-
.../hadoop/hive/ql/parse/ReplicationSpec.java | 28 +-
.../hadoop/hive/ql/parse/repl/CopyUtils.java | 98 +-
.../dump/BootStrapReplicationSpecFunction.java | 10 +-
.../hive/ql/parse/repl/dump/HiveWrapper.java | 6 +-
.../ql/parse/repl/dump/PartitionExport.java | 6 +-
.../hive/ql/parse/repl/dump/TableExport.java | 14 +-
.../hadoop/hive/ql/parse/repl/dump/Utils.java | 10 +
.../ql/parse/repl/dump/io/FileOperations.java | 69 +-
.../parse/repl/dump/io/FunctionSerializer.java | 2 +-
.../repl/load/message/AbortTxnHandler.java | 2 +-
.../repl/load/message/AllocWriteIdHandler.java | 2 +-
.../repl/load/message/CommitTxnHandler.java | 2 +-
.../parse/repl/load/message/OpenTxnHandler.java | 2 +-
.../hadoop/hive/ql/plan/ImportTableDesc.java | 11 +
.../apache/hadoop/hive/ql/plan/ReplTxnWork.java | 124 +
.../gen/thrift/gen-cpp/ThriftHiveMetastore.cpp | 2632 +++++----
.../gen/thrift/gen-cpp/ThriftHiveMetastore.h | 107 +
.../ThriftHiveMetastore_server.skeleton.cpp | 5 +
.../gen/thrift/gen-cpp/hive_metastore_types.cpp | 3303 +++++------
.../gen/thrift/gen-cpp/hive_metastore_types.h | 75 +
.../metastore/api/AddDynamicPartitions.java | 32 +-
.../api/AllocateTableWriteIdsRequest.java | 68 +-
.../api/AllocateTableWriteIdsResponse.java | 36 +-
.../metastore/api/ClearFileMetadataRequest.java | 32 +-
.../hive/metastore/api/ClientCapabilities.java | 32 +-
.../hive/metastore/api/CompactionRequest.java | 44 +-
.../hive/metastore/api/CreationMetadata.java | 32 +-
.../metastore/api/FindSchemasByColsResp.java | 36 +-
.../hive/metastore/api/FireEventRequest.java | 32 +-
.../metastore/api/GetAllFunctionsResponse.java | 36 +-
.../api/GetFileMetadataByExprRequest.java | 32 +-
.../api/GetFileMetadataByExprResult.java | 48 +-
.../metastore/api/GetFileMetadataRequest.java | 32 +-
.../metastore/api/GetFileMetadataResult.java | 44 +-
.../hive/metastore/api/GetTablesRequest.java | 32 +-
.../hive/metastore/api/GetTablesResult.java | 36 +-
.../metastore/api/GetValidWriteIdsRequest.java | 32 +-
.../metastore/api/GetValidWriteIdsResponse.java | 36 +-
.../api/HeartbeatTxnRangeResponse.java | 64 +-
.../metastore/api/InsertEventRequestData.java | 64 +-
.../hadoop/hive/metastore/api/LockRequest.java | 36 +-
.../hive/metastore/api/Materialization.java | 32 +-
.../api/NotificationEventResponse.java | 36 +-
.../metastore/api/PutFileMetadataRequest.java | 64 +-
.../api/ReplTblWriteIdStateRequest.java | 952 ++++
.../hive/metastore/api/SchemaVersion.java | 36 +-
.../hive/metastore/api/ShowCompactResponse.java | 36 +-
.../hive/metastore/api/ShowLocksResponse.java | 36 +-
.../hive/metastore/api/TableValidWriteIds.java | 32 +-
.../hive/metastore/api/ThriftHiveMetastore.java | 5145 ++++++++++--------
.../hive/metastore/api/WMFullResourcePlan.java | 144 +-
.../api/WMGetAllResourcePlanResponse.java | 36 +-
.../WMGetTriggersForResourePlanResponse.java | 36 +-
.../api/WMValidateResourcePlanResponse.java | 64 +-
.../gen-php/metastore/ThriftHiveMetastore.php | 2066 +++----
.../src/gen/thrift/gen-php/metastore/Types.php | 1028 ++--
.../hive_metastore/ThriftHiveMetastore-remote | 7 +
.../hive_metastore/ThriftHiveMetastore.py | 1482 ++---
.../gen/thrift/gen-py/hive_metastore/ttypes.py | 676 ++-
.../gen/thrift/gen-rb/hive_metastore_types.rb | 31 +
.../gen/thrift/gen-rb/thrift_hive_metastore.rb | 52 +
.../hadoop/hive/metastore/HiveMetaStore.java | 5 +
.../hive/metastore/HiveMetaStoreClient.java | 27 +
.../hadoop/hive/metastore/IMetaStoreClient.java | 19 +-
.../hive/metastore/ReplChangeManager.java | 63 +-
.../hadoop/hive/metastore/txn/TxnHandler.java | 326 +-
.../hadoop/hive/metastore/txn/TxnStore.java | 11 +-
.../hadoop/hive/metastore/txn/TxnUtils.java | 51 +-
.../src/main/thrift/hive_metastore.thrift | 10 +
.../HiveMetaStoreClientPreCatalog.java | 27 +
.../hadoop/hive/common/ValidReadTxnList.java | 3 -
96 files changed, 12249 insertions(+), 8264 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 049a594..668750c 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -469,6 +469,11 @@ public class HiveConf extends Configuration {
+ "metadata for acid tables which do not require the corresponding transaction \n"
+ "semantics to be applied on target. This can be removed when ACID table \n"
+ "replication is supported."),
+ REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT("hive.repl.bootstrap.dump.open.txn.timeout", "1h",
+ new TimeValidator(TimeUnit.HOURS),
+ "Indicates the timeout for all transactions which are opened before triggering bootstrap REPL DUMP. "
+ + "If these open transactions are not closed within the timeout value, then REPL DUMP will "
+ + "forcefully abort those transactions and continue with bootstrap dump."),
//https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/TransparentEncryption.html#Running_as_the_superuser
REPL_ADD_RAW_RESERVED_NAMESPACE("hive.repl.add.raw.reserved.namespace", false,
"For TDE with same encryption keys on source and target, allow Distcp super user to access \n"
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 59d1e3a..7835691 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -240,7 +240,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
FileStatus file = files[i];
i++;
return ReplChangeManager.encodeFileUri(file.getPath().toString(),
- ReplChangeManager.checksumFor(file.getPath(), fs));
+ ReplChangeManager.checksumFor(file.getPath(), fs), null);
} catch (IOException e) {
throw new RuntimeException(e);
}
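Note: ReplChangeManager.encodeFileUri() and getFileInfo() both gain a third,
nullable parameter in this patch (left null here for notification events). A
minimal sketch of the round trip, using only signatures visible in this diff;
the decode side is from ReplCopyTask below:

    // Encode: the third argument is the new nullable component added by this patch.
    String cmUri = ReplChangeManager.encodeFileUri(file.getPath().toString(),
        ReplChangeManager.checksumFor(file.getPath(), fs), null);

    // Decode: the returned array now has three slots, fed back into getFileInfo().
    String[] parts = ReplChangeManager.getFileWithChksumFromURI(cmUri);
    ReplChangeManager.FileInfo info =
        ReplChangeManager.getFileInfo(new Path(parts[0]), parts[1], parts[2], conf);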
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 2ad83b6..8ad507f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -19,12 +19,21 @@ package org.apache.hadoop.hive.ql.parse;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.shims.Utils;
import org.junit.rules.TestName;
import org.junit.rules.TestRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -32,7 +41,9 @@ import org.junit.BeforeClass;
import org.junit.AfterClass;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
/**
* TestReplicationScenariosAcidTables - test replication for ACID tables
@@ -59,6 +70,9 @@ public class TestReplicationScenariosAcidTables {
put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
put("hive.support.concurrency", "true");
put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+ put("hive.repl.dump.include.acid.tables", "true");
+ put("hive.metastore.client.capability.check", "false");
+ put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
}};
primary = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
replica = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
@@ -66,6 +80,8 @@ public class TestReplicationScenariosAcidTables {
put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
put("hive.support.concurrency", "false");
put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
+ put("hive.repl.dump.include.acid.tables", "true");
+ put("hive.metastore.client.capability.check", "false");
}};
replicaNonAcid = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf1);
}
@@ -92,6 +108,157 @@ public class TestReplicationScenariosAcidTables {
}
@Test
+ public void testAcidTablesBootstrap() throws Throwable {
+ WarehouseInstance.Tuple bootstrapDump = primary
+ .run("use " + primaryDbName)
+ .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+ "tblproperties (\"transactional\"=\"true\")")
+ .run("insert into t1 values(1)")
+ .run("insert into t1 values(2)")
+ .run("create table t2 (place string) partitioned by (country string) clustered by(place) " +
+ "into 3 buckets stored as orc tblproperties (\"transactional\"=\"true\")")
+ .run("insert into t2 partition(country='india') values ('bangalore')")
+ .run("insert into t2 partition(country='us') values ('austin')")
+ .run("insert into t2 partition(country='france') values ('paris')")
+ .run("alter table t2 add partition(country='italy')")
+ .run("create table t3 (rank int) tblproperties(\"transactional\"=\"true\", " +
+ "\"transactional_properties\"=\"insert_only\")")
+ .run("insert into t3 values(11)")
+ .run("insert into t3 values(22)")
+ .run("create table t4 (id int)")
+ .run("insert into t4 values(111), (222)")
+ .run("create table t5 (id int) stored as orc ")
+ .run("insert into t5 values(1111), (2222)")
+ .run("alter table t5 set tblproperties (\"transactional\"=\"true\")")
+ .run("insert into t5 values(3333)")
+ .dump(primaryDbName, null);
+
+ replica.load(replicatedDbName, bootstrapDump.dumpLocation)
+ .run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(new String[] {"t1", "t2", "t3", "t4", "t5"})
+ .run("repl status " + replicatedDbName)
+ .verifyResult(bootstrapDump.lastReplicationId)
+ .run("select id from t1 order by id")
+ .verifyResults(new String[]{"1", "2"})
+ .run("select country from t2 order by country")
+ .verifyResults(new String[] {"france", "india", "us"})
+ .run("select rank from t3 order by rank")
+ .verifyResults(new String[] {"11", "22"})
+ .run("select id from t4 order by id")
+ .verifyResults(new String[] {"111", "222"})
+ .run("select id from t5 order by id")
+ .verifyResults(new String[] {"1111", "2222", "3333"});
+ }
+
+ @Test
+ public void testAcidTablesBootstrapWithOpenTxnsTimeout() throws Throwable {
+ // Open 5 txns
+ HiveConf primaryConf = primary.getConf();
+ TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+ OpenTxnsResponse otResp = txnHandler.openTxns(new OpenTxnRequest(5, "u1", "localhost"));
+ List<Long> txns = otResp.getTxn_ids();
+ String txnIdRange = " txn_id >= " + txns.get(0) + " and txn_id <= " + txns.get(4);
+ Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXNS"),
+ 5, TxnDbUtil.countQueryAgent(primaryConf,
+ "select count(*) from TXNS where txn_state = 'o' and " + txnIdRange));
+
+ // Create 2 tables, one partitioned and one not, covering both transactional types: full ACID and MM (insert-only).
+ primary.run("use " + primaryDbName)
+ .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+ "tblproperties (\"transactional\"=\"true\")")
+ .run("insert into t1 values(1)")
+ .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " +
+ "\"transactional_properties\"=\"insert_only\")")
+ .run("insert into t2 partition(name='Bob') values(11)")
+ .run("insert into t2 partition(name='Carl') values(10)");
+ // Allocate write ids for both tables t1 and t2 for all txns.
+ // Expected counts: t1 = 5 + 1 (insert), t2 = 5 + 2 (inserts).
+ AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest(primaryDbName, "t1");
+ rqst.setTxnIds(txns);
+ txnHandler.allocateTableWriteIds(rqst);
+ rqst.setTableName("t2");
+ txnHandler.allocateTableWriteIds(rqst);
+ Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXN_TO_WRITE_ID"),
+ 6, TxnDbUtil.countQueryAgent(primaryConf,
+ "select count(*) from TXN_TO_WRITE_ID where t2w_database = '" + primaryDbName.toLowerCase()
+ + "' and t2w_table = 't1'"));
+ Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXN_TO_WRITE_ID"),
+ 7, TxnDbUtil.countQueryAgent(primaryConf,
+ "select count(*) from TXN_TO_WRITE_ID where t2w_database = '" + primaryDbName.toLowerCase()
+ + "' and t2w_table = 't2'"));
+
+ // Bootstrap dump with open txn timeout as 1s.
+ List<String> withConfigs = Arrays.asList(
+ "'hive.repl.bootstrap.dump.open.txn.timeout'='1s'");
+ WarehouseInstance.Tuple bootstrapDump = primary
+ .run("use " + primaryDbName)
+ .dump(primaryDbName, null, withConfigs);
+
+ // After bootstrap dump, all the opened txns should be aborted. Verify it.
+ Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXNS"),
+ 0, TxnDbUtil.countQueryAgent(primaryConf,
+ "select count(*) from TXNS where txn_state = 'o' and " + txnIdRange));
+ Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXNS"),
+ 5, TxnDbUtil.countQueryAgent(primaryConf,
+ "select count(*) from TXNS where txn_state = 'a' and " + txnIdRange));
+
+ // Verify the next write id
+ String[] nextWriteId = TxnDbUtil.queryToString(primaryConf, "select nwi_next from NEXT_WRITE_ID where "
+ + " nwi_database = '" + primaryDbName.toLowerCase() + "' and nwi_table = 't1'")
+ .split("\n");
+ Assert.assertEquals(Long.parseLong(nextWriteId[1].trim()), 7L);
+ nextWriteId = TxnDbUtil.queryToString(primaryConf, "select nwi_next from NEXT_WRITE_ID where "
+ + " nwi_database = '" + primaryDbName.toLowerCase() + "' and nwi_table = 't2'")
+ .split("\n");
+ Assert.assertEquals(Long.parseLong(nextWriteId[1].trim()), 8L);
+
+ // Bootstrap load which should also replicate the aborted write ids on both tables.
+ HiveConf replicaConf = replica.getConf();
+ replica.load(replicatedDbName, bootstrapDump.dumpLocation)
+ .run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(new String[] {"t1", "t2"})
+ .run("repl status " + replicatedDbName)
+ .verifyResult(bootstrapDump.lastReplicationId)
+ .run("select id from t1")
+ .verifyResults(new String[]{"1"})
+ .run("select rank from t2 order by rank")
+ .verifyResults(new String[] {"10", "11"});
+
+ // Verify that the HWM (next write id) is properly set after REPL LOAD
+ nextWriteId = TxnDbUtil.queryToString(replicaConf, "select nwi_next from NEXT_WRITE_ID where "
+ + " nwi_database = '" + replicatedDbName.toLowerCase() + "' and nwi_table = 't1'")
+ .split("\n");
+ Assert.assertEquals(Long.parseLong(nextWriteId[1].trim()), 7L);
+ nextWriteId = TxnDbUtil.queryToString(replicaConf, "select nwi_next from NEXT_WRITE_ID where "
+ + " nwi_database = '" + replicatedDbName.toLowerCase() + "' and nwi_table = 't2'")
+ .split("\n");
+ Assert.assertEquals(Long.parseLong(nextWriteId[1].trim()), 8L);
+
+ // Verify that all the aborted write ids are replicated to the replicated DB
+ Assert.assertEquals(TxnDbUtil.queryToString(replicaConf, "select * from TXN_TO_WRITE_ID"),
+ 5, TxnDbUtil.countQueryAgent(replicaConf,
+ "select count(*) from TXN_TO_WRITE_ID where t2w_database = '" + replicatedDbName.toLowerCase()
+ + "' and t2w_table = 't1'"));
+ Assert.assertEquals(TxnDbUtil.queryToString(replicaConf, "select * from TXN_TO_WRITE_ID"),
+ 5, TxnDbUtil.countQueryAgent(replicaConf,
+ "select count(*) from TXN_TO_WRITE_ID where t2w_database = '" + replicatedDbName.toLowerCase()
+ + "' and t2w_table = 't2'"));
+
+ // Verify that entries were added in COMPACTION_QUEUE for each table/partition:
+ // t1 -> 1 entry and t2 -> 2 entries (1 per partition)
+ Assert.assertEquals(TxnDbUtil.queryToString(replicaConf, "select * from COMPACTION_QUEUE"),
+ 1, TxnDbUtil.countQueryAgent(replicaConf,
+ "select count(*) from COMPACTION_QUEUE where cq_database = '" + replicatedDbName
+ + "' and cq_table = 't1'"));
+ Assert.assertEquals(TxnDbUtil.queryToString(replicaConf, "select * from COMPACTION_QUEUE"),
+ 2, TxnDbUtil.countQueryAgent(replicaConf,
+ "select count(*) from COMPACTION_QUEUE where cq_database = '" + replicatedDbName
+ + "' and cq_table = 't2'"));
+ }
+
+ @Test
public void testOpenTxnEvent() throws Throwable {
String tableName = testName.getMethodName();
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null);
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 41ad002..9f4e6f2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -591,6 +591,7 @@ public class Driver implements IDriver {
setTriggerContext(queryId);
}
+ ctx.setHiveTxnManager(queryTxnMgr);
ctx.setStatsSource(statsSource);
ctx.setCmd(command);
ctx.setHDFSCleanup(true);
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index de270cf..24e7324 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -83,7 +83,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
if (ReplChangeManager.isCMFileUri(fromPath, srcFs)) {
String[] result = ReplChangeManager.getFileWithChksumFromURI(fromPath.toString());
ReplChangeManager.FileInfo sourceInfo = ReplChangeManager
- .getFileInfo(new Path(result[0]), result[1], conf);
+ .getFileInfo(new Path(result[0]), result[1], result[2], conf);
if (FileUtils.copy(
sourceInfo.getSrcFs(), sourceInfo.getSourcePath(),
dstFs, toPath, false, false, conf)) {
@@ -130,7 +130,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
console.printInfo("Copying file: " + oneSrc.getPath().toString());
LOG.debug("ReplCopyTask :cp:{}=>{}", oneSrc.getPath(), toPath);
srcFiles.add(new ReplChangeManager.FileInfo(oneSrc.getPath().getFileSystem(conf),
- oneSrc.getPath()));
+ oneSrc.getPath(), null));
}
}
@@ -183,14 +183,14 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(fileListing)))) {
// TODO : verify if skipping charset here is okay
- String line = null;
+ String line;
while ((line = br.readLine()) != null) {
LOG.debug("ReplCopyTask :_filesReadLine: {}", line);
String[] fileWithChksum = ReplChangeManager.getFileWithChksumFromURI(line);
try {
ReplChangeManager.FileInfo f = ReplChangeManager
- .getFileInfo(new Path(fileWithChksum[0]), fileWithChksum[1], conf);
+ .getFileInfo(new Path(fileWithChksum[0]), fileWithChksum[1], fileWithChksum[2], conf);
filePaths.add(f);
} catch (MetaException e) {
// issue warning for missing file and throw exception
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
index 2615072..5bbc25a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
@@ -46,18 +47,13 @@ public class ReplTxnTask extends Task<ReplTxnWork> {
@Override
public int execute(DriverContext driverContext) {
String replPolicy = work.getReplPolicy();
- if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
- Utilities.FILE_OP_LOGGER.trace("Executing ReplTxnTask " + work.getOperationType().toString() +
- " for txn ids : " + work.getTxnIds().toString() + " replPolicy : " + replPolicy);
- }
-
- String tableName = work.getTableName() == null || work.getTableName().isEmpty() ? null : work.getTableName();
- if (tableName != null) {
+ String tableName = work.getTableName();
+ ReplicationSpec replicationSpec = work.getReplicationSpec();
+ if ((tableName != null) && (replicationSpec != null)) {
Table tbl;
try {
tbl = Hive.get().getTable(work.getDbName(), tableName);
- ReplicationSpec replicationSpec = work.getReplicationSpec();
- if (replicationSpec != null && !replicationSpec.allowReplacementInto(tbl.getParameters())) {
+ if (!replicationSpec.allowReplacementInto(tbl.getParameters())) {
// if the event is already replayed, then no need to replay it again.
LOG.debug("ReplTxnTask: Event is skipped as it is already replayed. Event Id: " +
replicationSpec.getReplicationState() + " Event Type: " + work.getOperationType());
@@ -75,7 +71,6 @@ public class ReplTxnTask extends Task<ReplTxnWork> {
try {
HiveTxnManager txnManager = driverContext.getCtx().getHiveTxnManager();
String user = UserGroupInformation.getCurrentUser().getUserName();
- LOG.debug("Replaying " + work.getOperationType().toString() + " Event for policy " + replPolicy);
switch(work.getOperationType()) {
case REPL_OPEN_TXN:
List<Long> txnIds = txnManager.replOpenTxn(replPolicy, work.getTxnIds(), user);
@@ -98,11 +93,17 @@ public class ReplTxnTask extends Task<ReplTxnWork> {
case REPL_ALLOC_WRITE_ID:
assert work.getTxnToWriteIdList() != null;
String dbName = work.getDbName();
- String tblName = work.getTableName();
List <TxnToWriteId> txnToWriteIdList = work.getTxnToWriteIdList();
- txnManager.replAllocateTableWriteIdsBatch(dbName, tblName, replPolicy, txnToWriteIdList);
+ txnManager.replAllocateTableWriteIdsBatch(dbName, tableName, replPolicy, txnToWriteIdList);
LOG.info("Replayed alloc write Id Event for repl policy: " + replPolicy + " db Name : " + dbName +
- " txnToWriteIdList: " +txnToWriteIdList.toString() + " table name: " + tblName);
+ " txnToWriteIdList: " +txnToWriteIdList.toString() + " table name: " + tableName);
+ return 0;
+ case REPL_WRITEID_STATE:
+ txnManager.replTableWriteIdState(work.getValidWriteIdList(),
+ work.getDbName(), tableName, work.getPartNames());
+ LOG.info("Replicated WriteId state for DbName: " + work.getDbName()
+ + " TableName: " + tableName
+ + " ValidWriteIdList: " + work.getValidWriteIdList());
return 0;
default:
LOG.error("Operation Type " + work.getOperationType() + " is not supported ");
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java
deleted file mode 100644
index 530e9be..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec;
-
-import java.io.Serializable;
-
-import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
-import org.apache.hadoop.hive.ql.plan.Explain;
-import org.apache.hadoop.hive.ql.plan.Explain.Level;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * ReplTxnTask.
- * Used for replaying the transaction related events.
- */
-@Explain(displayName = "Replication Transaction", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
-public class ReplTxnWork implements Serializable {
- private static final long serialVersionUID = 1L;
- private String dbName;
- private String tableName;
- private String replPolicy;
- private List<Long> txnIds;
- private List<TxnToWriteId> txnToWriteIdList;
- private ReplicationSpec replicationSpec;
-
- /**
- * OperationType.
- * Different kind of events supported for replaying.
- */
- public enum OperationType {
- REPL_OPEN_TXN, REPL_ABORT_TXN, REPL_COMMIT_TXN, REPL_ALLOC_WRITE_ID
- }
-
- OperationType operation;
-
- public ReplTxnWork(String replPolicy, String dbName, String tableName, List<Long> txnIds, OperationType type,
- List<TxnToWriteId> txnToWriteIdList, ReplicationSpec replicationSpec) {
- this.txnIds = txnIds;
- this.dbName = dbName;
- this.tableName = tableName;
- this.operation = type;
- this.replPolicy = replPolicy;
- this.txnToWriteIdList = txnToWriteIdList;
- this.replicationSpec = replicationSpec;
- }
-
- public ReplTxnWork(String replPolicy, String dbName, String tableName, List<Long> txnIds, OperationType type,
- ReplicationSpec replicationSpec) {
- this(replPolicy, dbName, tableName, txnIds, type, null, replicationSpec);
- }
-
- public ReplTxnWork(String replPolicy, String dbName, String tableName, Long txnId,
- OperationType type, ReplicationSpec replicationSpec) {
- this(replPolicy, dbName, tableName, Collections.singletonList(txnId), type, null, replicationSpec);
- }
-
- public ReplTxnWork(String replPolicy, String dbName, String tableName, OperationType type,
- List<TxnToWriteId> txnToWriteIdList, ReplicationSpec replicationSpec) {
- this(replPolicy, dbName, tableName, null, type, txnToWriteIdList, replicationSpec);
- }
-
- public List<Long> getTxnIds() {
- return txnIds;
- }
-
- public String getDbName() {
- return dbName;
- }
-
- public String getTableName() {
- return tableName;
- }
-
- public String getReplPolicy() {
- return replPolicy;
- }
-
- public OperationType getOperationType() {
- return operation;
- }
-
- public List<TxnToWriteId> getTxnToWriteIdList() {
- return txnToWriteIdList;
- }
-
- public ReplicationSpec getReplicationSpec() {
- return replicationSpec;
- }
-}
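ReplTxnWork is relocated from ql.exec to ql.plan; the replacement file
(ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java, 124 lines in the
diffstat) isn't included in this mail. Inferring only from its call sites in
ReplTxnTask and LoadTable elsewhere in this patch, the relocated class gains a
fifth operation type plus write-id-state accessors; a hedged sketch of that
visible surface:

    // Assumed shape, reconstructed from call sites in this patch.
    public enum OperationType {
      REPL_OPEN_TXN, REPL_ABORT_TXN, REPL_COMMIT_TXN,
      REPL_ALLOC_WRITE_ID,
      REPL_WRITEID_STATE   // new: replays a table's ValidWriteIdList state
    }
    // Exercised by ReplTxnTask's REPL_WRITEID_STATE case:
    //   work.getValidWriteIdList();  work.getPartNames();
    // Exercised by LoadTable when bootstrapping a transactional table:
    //   new ReplTxnWork(dbName, tableName, partNames, validWriteIdList,
    //       ReplTxnWork.OperationType.REPL_WRITEID_STATE);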
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index 2da6b0f..3a107b7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.ReplCopyWork;
+import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.plan.TezWork;
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index ce0757c..88d352b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -21,6 +21,8 @@ import com.google.common.primitives.Ints;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;
@@ -39,6 +41,8 @@ import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFil
import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
@@ -65,9 +69,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
@@ -90,6 +97,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
}
}
+ private static long sleepTime = 60000;
private Logger LOG = LoggerFactory.getLogger(ReplDumpTask.class);
private ReplLogger replLogger;
@@ -204,20 +212,21 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
// bootstrap case
Hive hiveDb = getHive();
Long bootDumpBeginReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
+ String validTxnList = getValidTxnListForReplDump(hiveDb);
for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
replLogger = new BootstrapDumpLogger(dbName, dumpRoot.toString(),
Utils.getAllTables(getHive(), dbName).size(),
getHive().getAllFunctions().size());
replLogger.startLog();
- Path dbRoot = dumpDbMetadata(dbName, dumpRoot);
+ Path dbRoot = dumpDbMetadata(dbName, dumpRoot, bootDumpBeginReplId);
dumpFunctionMetadata(dbName, dumpRoot);
String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName);
for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
LOG.debug(
"analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri());
- dumpTable(dbName, tblName, dbRoot);
+ dumpTable(dbName, tblName, validTxnList, dbRoot);
dumpConstraintMetadata(dbName, tblName, dbRoot);
}
Utils.resetDbBootstrapDumpState(hiveDb, dbName, uniqueKey);
@@ -257,17 +266,17 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
return bootDumpBeginReplId;
}
- private Path dumpDbMetadata(String dbName, Path dumpRoot) throws Exception {
+ private Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId) throws Exception {
Path dbRoot = new Path(dumpRoot, dbName);
// TODO : instantiating FS objects are generally costly. Refactor
FileSystem fs = dbRoot.getFileSystem(conf);
Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME);
- HiveWrapper.Tuple<Database> database = new HiveWrapper(getHive(), dbName).database();
+ HiveWrapper.Tuple<Database> database = new HiveWrapper(getHive(), dbName, lastReplId).database();
EximUtil.createDbExportDump(fs, dumpPath, database.object, database.replicationSpec);
return dbRoot;
}
- private void dumpTable(String dbName, String tblName, Path dbRoot) throws Exception {
+ private void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot) throws Exception {
try {
Hive db = getHive();
HiveWrapper.Tuple<Table> tuple = new HiveWrapper(db, dbName).table(tblName);
@@ -276,6 +285,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf, true);
String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
tuple.replicationSpec.setIsReplace(true); // by default for all other objects this is false
+ if (AcidUtils.isTransactionalTable(tableSpec.tableHandle)) {
+ tuple.replicationSpec.setValidWriteIdList(getValidWriteIdList(dbName, tblName, validTxnList));
+ }
new TableExport(exportPaths, tableSpec, tuple.replicationSpec, db, distCpDoAsUser, conf).write();
replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType());
@@ -286,6 +298,70 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
}
}
+ private String getValidWriteIdList(String dbName, String tblName, String validTxnString) throws LockException {
+ if ((validTxnString == null) || validTxnString.isEmpty()) {
+ return null;
+ }
+ String fullTableName = AcidUtils.getFullTableName(dbName, tblName);
+ ValidWriteIdList validWriteIds = getTxnMgr()
+ .getValidWriteIds(Collections.singletonList(fullTableName), validTxnString)
+ .getTableValidWriteIdList(fullTableName);
+ return ((validWriteIds != null) ? validWriteIds.toString() : null);
+ }
+
+ private List<Long> getOpenTxns(ValidTxnList validTxnList) {
+ long[] invalidTxns = validTxnList.getInvalidTransactions();
+ List<Long> openTxns = new ArrayList<>();
+ for (long invalidTxn : invalidTxns) {
+ if (!validTxnList.isTxnAborted(invalidTxn)) {
+ openTxns.add(invalidTxn);
+ }
+ }
+ return openTxns;
+ }
+
+ private String getValidTxnListForReplDump(Hive hiveDb) throws HiveException {
+ // The key design point for REPL DUMP is to have no txns older than the current txn in which the dump runs.
+ // This is needed to ensure that the dump doesn't copy any data files written by open txns, mainly for the
+ // streaming ingest case, where a single delta file may hold data from committed, aborted and open txns.
+ // It can also cause data inconsistency: if the ongoing txns don't have corresponding open/write events
+ // captured, the catch-up incremental phase won't be able to replicate those txns.
+ // So the logic is to wait a configured amount of time for all open txns below the current txn to be
+ // aborted or committed. If they aren't, we forcefully abort them, just as AcidHouseKeeperService does.
+ ValidTxnList validTxnList = getTxnMgr().getValidTxns();
+ long timeoutInMs = HiveConf.getTimeVar(conf,
+ HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
+ long waitUntilTime = System.currentTimeMillis() + timeoutInMs;
+ while (System.currentTimeMillis() < waitUntilTime) {
+ // If there are no txns which are open for the given ValidTxnList snapshot, then just return it.
+ if (getOpenTxns(validTxnList).isEmpty()) {
+ return validTxnList.toString();
+ }
+
+ // Wait for 1 minute and check again.
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ LOG.info("REPL DUMP thread sleep interrupted", e);
+ }
+ validTxnList = getTxnMgr().getValidTxns();
+ }
+
+ // After the timeout just force abort the open txns
+ List<Long> openTxns = getOpenTxns(validTxnList);
+ if (!openTxns.isEmpty()) {
+ hiveDb.abortTransactions(openTxns);
+ validTxnList = getTxnMgr().getValidTxns();
+ if (validTxnList.getMinOpenTxn() != null) {
+ openTxns = getOpenTxns(validTxnList);
+ LOG.warn("REPL DUMP unable to force abort all the open txns: {} after timeout due to unknown reasons. " +
+ "However, this is rare case that shouldn't happen.", openTxns);
+ throw new IllegalStateException("REPL DUMP triggered abort txns failed for unknown reasons.");
+ }
+ }
+ return validTxnList.toString();
+ }
+
private ReplicationSpec getNewReplicationSpec(String evState, String objState,
boolean isMetadataOnly) {
return new ReplicationSpec(true, isMetadataOnly, evState, objState, false, true, true);
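Condensed from the hunks above, the bootstrap dump path for a database now runs
(names and ordering taken directly from this patch):

    // 1. Freeze a txn snapshot; wait out, then force-abort, older open txns.
    String validTxnList = getValidTxnListForReplDump(hiveDb);
    // 2. Dump db metadata, bounded by the event id captured at dump start.
    Path dbRoot = dumpDbMetadata(dbName, dumpRoot, bootDumpBeginReplId);
    // 3. Dump each table; transactional tables get a per-table ValidWriteIdList
    //    (derived from the frozen snapshot) stamped on their ReplicationSpec.
    dumpTable(dbName, tblName, validTxnList, dbRoot);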
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
index 323c73d..61fa424 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
@@ -39,8 +39,8 @@ public class ReplDumpWork implements Serializable {
}
public ReplDumpWork(String dbNameOrPattern, String tableNameOrPattern,
- Long eventFrom, Long eventTo, String astRepresentationForErrorMsg, Integer maxEventLimit,
- String resultTempPath) {
+ Long eventFrom, Long eventTo, String astRepresentationForErrorMsg, Integer maxEventLimit,
+ String resultTempPath) {
this.dbNameOrPattern = dbNameOrPattern;
this.tableNameOrPattern = tableNameOrPattern;
this.eventFrom = eventFrom;
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
index 6f217cf..748d318 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
@@ -72,8 +72,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
protected int execute(DriverContext driverContext) {
try {
int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
- Context context = new Context(conf, getHive(), work.sessionStateLineageState,
- work.currentTransactionId, driverContext.getCtx());
+ Context context = new Context(conf, getHive(), work.sessionStateLineageState, driverContext.getCtx());
TaskTracker loadTaskTracker = new TaskTracker(maxTasks);
/*
for now for simplicity we are doing just one directory ( one database ), come back to use
@@ -127,7 +126,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
new TableContext(dbTracker, work.dbNameToLoadIn, work.tableNameToLoadIn);
TableEvent tableEvent = (TableEvent) next;
LoadTable loadTable = new LoadTable(tableEvent, context, iterator.replLogger(),
- tableContext, loadTaskTracker, getTxnMgr());
+ tableContext, loadTaskTracker);
tableTracker = loadTable.tasks();
if (!scope.database) {
scope.rootTasks.addAll(tableTracker.tasks());
@@ -145,7 +144,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
// for a table we explicitly try to load partitions as there is no separate partitions events.
LoadPartitions loadPartitions =
new LoadPartitions(context, iterator.replLogger(), loadTaskTracker, tableEvent,
- work.dbNameToLoadIn, tableContext, getTxnMgr());
+ work.dbNameToLoadIn, tableContext);
TaskTracker partitionsTracker = loadPartitions.tasks();
partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker,
partitionsTracker);
@@ -163,7 +162,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
work.tableNameToLoadIn);
LoadPartitions loadPartitions =
new LoadPartitions(context, iterator.replLogger(), tableContext, loadTaskTracker,
- event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated(), getTxnMgr());
+ event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated());
/*
the tableTracker here should be a new instance and not an existing one as this can
only happen when we break in between loading partitions.
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
index 91ec93e..c1a9a62 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
@@ -44,22 +44,20 @@ public class ReplLoadWork implements Serializable {
taken care when using other methods.
*/
final LineageState sessionStateLineageState;
- public final long currentTransactionId;
public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn,
- String tableNameToLoadIn, LineageState lineageState, long currentTransactionId)
+ String tableNameToLoadIn, LineageState lineageState)
throws IOException {
this.tableNameToLoadIn = tableNameToLoadIn;
sessionStateLineageState = lineageState;
- this.currentTransactionId = currentTransactionId;
this.iterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf);
this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf);
this.dbNameToLoadIn = dbNameToLoadIn;
}
public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameOrPattern,
- LineageState lineageState, long currentTransactionId) throws IOException {
- this(hiveConf, dumpDirectory, dbNameOrPattern, null, lineageState, currentTransactionId);
+ LineageState lineageState) throws IOException {
+ this(hiveConf, dumpDirectory, dbNameOrPattern, null, lineageState);
}
public BootstrapEventsIterator iterator() {
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java
index e817f5f..3dcc1d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java
@@ -31,6 +31,9 @@ public interface TableEvent extends BootstrapEvent {
List<AddPartitionDesc> partitionDescriptions(ImportTableDesc tblDesc)
throws SemanticException;
+ List<String> partitions(ImportTableDesc tblDesc)
+ throws SemanticException;
+
ReplicationSpec replicationSpec();
boolean shouldNotReplicate();
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java
index ef73d89..ee804e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java
@@ -68,6 +68,12 @@ public class FSPartitionEvent implements PartitionEvent {
}
@Override
+ public List<String> partitions(ImportTableDesc tblDesc)
+ throws SemanticException {
+ return tableEvent.partitions(tblDesc);
+ }
+
+ @Override
public ReplicationSpec replicationSpec() {
return tableEvent.replicationSpec();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
index cfd1640..0fabf5a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -88,6 +89,21 @@ public class FSTableEvent implements TableEvent {
return descs;
}
+ @Override
+ public List<String> partitions(ImportTableDesc tblDesc)
+ throws SemanticException {
+ List<String> partitions = new ArrayList<>();
+ try {
+ for (Partition partition : metadata.getPartitions()) {
+ String partName = Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues());
+ partitions.add(partName);
+ }
+ } catch (MetaException e) {
+ throw new SemanticException(e);
+ }
+ return partitions;
+ }
+
private AddPartitionDesc partitionDesc(Path fromPath,
ImportTableDesc tblDesc, Partition partition) throws SemanticException {
try {
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index a42c299..df7f30d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils;
import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
-import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -45,10 +45,10 @@ import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
import org.apache.hadoop.hive.ql.plan.DDLWork;
import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
+import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
import org.apache.hadoop.hive.ql.plan.MoveWork;
-import org.apache.hadoop.hive.ql.session.SessionState;
import org.datanucleus.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +56,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -77,19 +78,16 @@ public class LoadPartitions {
private final ImportTableDesc tableDesc;
private Table table;
- private final HiveTxnManager txnMgr;
public LoadPartitions(Context context, ReplLogger replLogger, TaskTracker tableTracker,
TableEvent event, String dbNameToLoadIn,
- TableContext tableContext,
- HiveTxnManager txnMgr) throws HiveException, IOException {
- this(context, replLogger, tableContext, tableTracker, event, dbNameToLoadIn, null, txnMgr);
+ TableContext tableContext) throws HiveException, IOException {
+ this(context, replLogger, tableContext, tableTracker, event, dbNameToLoadIn, null);
}
public LoadPartitions(Context context, ReplLogger replLogger, TableContext tableContext,
TaskTracker limiter, TableEvent event, String dbNameToLoadIn,
- AddPartitionDesc lastReplicatedPartition,
- HiveTxnManager txnMgr) throws HiveException, IOException {
+ AddPartitionDesc lastReplicatedPartition) throws HiveException, IOException {
this.tracker = new TaskTracker(limiter);
this.event = event;
this.context = context;
@@ -99,7 +97,6 @@ public class LoadPartitions {
this.tableDesc = tableContext.overrideProperties(event.tableDesc(dbNameToLoadIn));
this.table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb);
- this.txnMgr = txnMgr;
}
private String location() throws MetaException, HiveException {
@@ -141,8 +138,7 @@ public class LoadPartitions {
if (table == null) {
//new table
-
- table = new Table(tableDesc.getDatabaseName(), tableDesc.getTableName());
+ table = tableDesc.toTable(context.hiveConf);
if (isPartitioned(tableDesc)) {
updateReplicationState(initialReplicationState());
if (!forNewTable().hasReplicationState()) {
@@ -153,7 +149,6 @@ public class LoadPartitions {
}
} else {
// existing
-
if (table.isPartitioned()) {
List<AddPartitionDesc> partitionDescs = event.partitionDescriptions(tableDesc);
if (!event.replicationSpec().isMetadataOnly() && !partitionDescs.isEmpty()) {
@@ -242,18 +237,24 @@ public class LoadPartitions {
/**
* This will create the move of partition data from temp path to actual path
*/
- private Task<?> movePartitionTask(Table table, AddPartitionDesc.OnePartitionDesc partSpec,
- Path tmpPath) {
- // Note: this sets LoadFileType incorrectly for ACID; is that relevant for load?
- // See setLoadFileType and setIsAcidIow calls elsewhere for an example.
- LoadTableDesc loadTableWork = new LoadTableDesc(
- tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(),
- event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING,
- txnMgr.getCurrentTxnId()
- );
- loadTableWork.setInheritTableSpecs(false);
- MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false);
- return TaskFactory.get(work, context.hiveConf);
+ private Task<?> movePartitionTask(Table table, AddPartitionDesc.OnePartitionDesc partSpec, Path tmpPath) {
+ MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false);
+ if (AcidUtils.isTransactionalTable(table)) {
+ LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
+ Collections.singletonList(tmpPath),
+ Collections.singletonList(new Path(partSpec.getLocation())),
+ true, null, null);
+ moveWork.setMultiFilesDesc(loadFilesWork);
+ } else {
+ LoadTableDesc loadTableWork = new LoadTableDesc(
+ tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(),
+ event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, 0L
+ );
+ loadTableWork.setInheritTableSpecs(false);
+ moveWork.setLoadTableWork(loadTableWork);
+ }
+
+ return TaskFactory.get(moveWork, context.hiveConf);
}
private Path locationOnReplicaWarehouse(Table table, AddPartitionDesc.OnePartitionDesc partSpec)
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index ddb26e5..e2ec4af 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils;
import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
-import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
@@ -42,16 +42,18 @@ import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
+import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
import org.apache.hadoop.hive.ql.plan.MoveWork;
-import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.TreeMap;
@@ -66,17 +68,15 @@ public class LoadTable {
private final TableContext tableContext;
private final TaskTracker tracker;
private final TableEvent event;
- private final HiveTxnManager txnMgr;
public LoadTable(TableEvent event, Context context, ReplLogger replLogger,
- TableContext tableContext, TaskTracker limiter, HiveTxnManager txnMgr)
+ TableContext tableContext, TaskTracker limiter)
throws SemanticException, IOException {
this.event = event;
this.context = context;
this.replLogger = replLogger;
this.tableContext = tableContext;
this.tracker = new TaskTracker(limiter);
- this.txnMgr = txnMgr;
}
private void createTableReplLogTask(String tableName, TableType tableType) throws SemanticException {
@@ -189,9 +189,8 @@ public class LoadTable {
}
}
- private void newTableTasks(ImportTableDesc tblDesc) throws SemanticException {
- Table table;
- table = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName());
+ private void newTableTasks(ImportTableDesc tblDesc) throws Exception {
+ Table table = tblDesc.toTable(context.hiveConf);
// Either we're dropping and re-creating, or the table didn't exist, and we're creating.
Task<?> createTableTask =
tblDesc.getCreateTableTask(new HashSet<>(), new HashSet<>(), context.hiveConf);
@@ -199,12 +198,22 @@ public class LoadTable {
tracker.addTask(createTableTask);
return;
}
+
+ Task<?> parentTask = createTableTask;
+ if (event.replicationSpec().isTransactionalTableDump()) {
+ List<String> partNames = isPartitioned(tblDesc) ? event.partitions(tblDesc) : null;
+ ReplTxnWork replTxnWork = new ReplTxnWork(tblDesc.getDatabaseName(), tblDesc.getTableName(), partNames,
+ event.replicationSpec().getValidWriteIdList(), ReplTxnWork.OperationType.REPL_WRITEID_STATE);
+ Task<?> replTxnTask = TaskFactory.get(replTxnWork, context.hiveConf);
+ createTableTask.addDependentTask(replTxnTask);
+ parentTask = replTxnTask;
+ }
if (!isPartitioned(tblDesc)) {
- LOG.debug("adding dependent CopyWork/MoveWork for table");
+ LOG.debug("adding dependent ReplTxnTask/CopyWork/MoveWork for table");
Task<?> loadTableTask =
loadTableTask(table, event.replicationSpec(), new Path(tblDesc.getLocation()),
event.metadataPath());
- createTableTask.addDependentTask(loadTableTask);
+ parentTask.addDependentTask(loadTableTask);
}
tracker.addTask(createTableTask);
}
@@ -229,14 +238,20 @@ public class LoadTable {
Task<?> copyTask =
ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf);
- LoadTableDesc loadTableWork = new LoadTableDesc(
- tmpPath, Utilities.getTableDesc(table), new TreeMap<>(),
- replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING,
- //todo: what is the point of this? If this is for replication, who would have opened a txn?
- txnMgr.getCurrentTxnId()
- );
- MoveWork moveWork =
- new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false);
+ MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false);
+ if (AcidUtils.isTransactionalTable(table)) {
+ LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
+ Collections.singletonList(tmpPath),
+ Collections.singletonList(tgtPath),
+ true, null, null);
+ moveWork.setMultiFilesDesc(loadFilesWork);
+ } else {
+ LoadTableDesc loadTableWork = new LoadTableDesc(
+ tmpPath, Utilities.getTableDesc(table), new TreeMap<>(),
+ replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, 0L
+ );
+ moveWork.setLoadTableWork(loadTableWork);
+ }
Task<?> loadTableTask = TaskFactory.get(moveWork, context.hiveConf);
copyTask.addDependentTask(loadTableTask);
return copyTask;
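Taken together, a bootstrap load of a transactional table now assembles this dependency chain: create the table, replay the write-id state, copy the ACID directories, then move them into place. A hedged sketch of the wiring (variable names hypothetical, calls as used in this patch):

  Task<?> replTxnTask = TaskFactory.get(
      new ReplTxnWork(dbName, tableName, partNames,
          replSpec.getValidWriteIdList(), ReplTxnWork.OperationType.REPL_WRITEID_STATE),
      conf);
  createTableTask.addDependentTask(replTxnTask);  // write-id state before any data lands
  replTxnTask.addDependentTask(copyTask);         // ReplCopyTask stages base_/delta_ dirs
  copyTask.addDependentTask(moveTask);            // MoveWork places them at the table location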
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
index 6fbc657..7eae1ea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
@@ -36,18 +36,16 @@ public class Context {
taken care when using other methods.
*/
public final LineageState sessionStateLineageState;
- public final long currentTransactionId;
public Context(HiveConf hiveConf, Hive hiveDb,
- LineageState lineageState, long currentTransactionId,
+ LineageState lineageState,
org.apache.hadoop.hive.ql.Context nestedContext) throws MetaException {
this.hiveConf = hiveConf;
this.hiveDb = hiveDb;
this.warehouse = new Warehouse(hiveConf);
this.pathInfo = new PathInfo(hiveConf);
sessionStateLineageState = lineageState;
- this.currentTransactionId = currentTransactionId;
this.nestedContext = nestedContext;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 445e126..183515a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.common.HiveStatsUtils;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -1831,6 +1832,41 @@ public class AcidUtils {
return fileList;
}
+ public static List<Path> getValidDataPaths(Path dataPath, Configuration conf, String validWriteIdStr)
+ throws IOException {
+ List<Path> pathList = new ArrayList<>();
+ if ((validWriteIdStr == null) || validWriteIdStr.isEmpty()) {
+ // Non-ACID case: all files live directly under the base data path, so just return it.
+ pathList.add(dataPath);
+ return pathList;
+ }
+
+ // For ACID/MM tables, find the valid state w.r.t. the given ValidWriteIdList.
+ ValidWriteIdList validWriteIdList = new ValidReaderWriteIdList(validWriteIdStr);
+ Directory acidInfo = AcidUtils.getAcidState(dataPath, conf, validWriteIdList);
+
+ for (HdfsFileStatusWithId hfs : acidInfo.getOriginalFiles()) {
+ pathList.add(hfs.getFileStatus().getPath());
+ }
+ for (ParsedDelta delta : acidInfo.getCurrentDirectories()) {
+ pathList.add(delta.getPath());
+ }
+ if (acidInfo.getBaseDirectory() != null) {
+ pathList.add(acidInfo.getBaseDirectory());
+ }
+ return pathList;
+ }
+
+ public static String getAcidSubDir(Path dataPath) {
+ String dataDir = dataPath.getName();
+ if (dataDir.startsWith(AcidUtils.BASE_PREFIX)
+ || dataDir.startsWith(AcidUtils.DELTA_PREFIX)
+ || dataDir.startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
+ return dataDir;
+ }
+ return null;
+ }
+
public static boolean isAcidEnabled(HiveConf hiveConf) {
String txnMgr = hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER);
boolean concurrency = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
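These helpers let the dump side enumerate only the directories visible under the captured write-id snapshot: original files, current deltas, and the base, if any. A hedged usage sketch (table path and serialized snapshot are made up; the string format is an assumption):

  Configuration conf = new Configuration();
  Path tableDir = new Path("/warehouse/acid_tbl");
  // Assumed serialized form: <db.table>:<highWatermark>:<minOpenWriteId>:<open>:<aborted>
  String writeIds = "default.acid_tbl:5:" + Long.MAX_VALUE + "::";
  for (Path p : AcidUtils.getValidDataPaths(tableDir, conf, writeIds)) {
    // base_N / delta_X_Y directories yield their name; plain files yield null.
    String subDir = AcidUtils.getAcidSubDir(p);
  }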
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 76569d5..515c08b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -690,6 +690,16 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
}
@Override
+ public void replTableWriteIdState(String validWriteIdList, String dbName, String tableName, List<String> partNames)
+ throws LockException {
+ try {
+ getMS().replTableWriteIdState(validWriteIdList, dbName, tableName, partNames);
+ } catch (TException e) {
+ throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
+ }
+ }
+
+ @Override
public void heartbeat() throws LockException {
List<HiveLock> locks;
if(isTxnOpen()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index a74670b..ab9d67e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -235,6 +235,12 @@ class DummyTxnManager extends HiveTxnManagerImpl {
}
@Override
+ public void replTableWriteIdState(String validWriteIdList, String dbName, String tableName, List<String> partNames)
+ throws LockException {
+ // No-op
+ }
+
+ @Override
public void heartbeat() throws LockException {
// No-op
}
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index f239535..5f68e08 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -57,7 +57,7 @@ public interface HiveTxnManager {
* @return The new transaction id.
* @throws LockException in case of failure to start the transaction.
*/
- List<Long> replOpenTxn(String replPolicy, List<Long> srcTxnIds, String user) throws LockException;
+ List<Long> replOpenTxn(String replPolicy, List<Long> srcTxnIds, String user) throws LockException;
/**
* Commit the transaction in target cluster.
@@ -65,7 +65,7 @@ public interface HiveTxnManager {
* @param srcTxnId The id of the transaction at the source cluster
* @throws LockException in case of failure to commit the transaction.
*/
- void replCommitTxn(String replPolicy, long srcTxnId) throws LockException;
+ void replCommitTxn(String replPolicy, long srcTxnId) throws LockException;
/**
* Abort the transaction in target cluster.
@@ -73,7 +73,18 @@ public interface HiveTxnManager {
* @param srcTxnId The id of the transaction at the source cluster
* @throws LockException in case of failure to abort the transaction.
*/
- void replRollbackTxn(String replPolicy, long srcTxnId) throws LockException;
+ void replRollbackTxn(String replPolicy, long srcTxnId) throws LockException;
+
+ /**
+ * Replicate the table write-id state to mark aborted write ids and the write-id high watermark.
+ * @param validWriteIdList Snapshot of the valid write-id list taken when the table/partition was dumped.
+ * @param dbName Database name.
+ * @param tableName Table being written to.
+ * @param partNames List of partitions being written, or null for an unpartitioned table.
+ * @throws LockException in case of failure.
+ */
+ void replTableWriteIdState(String validWriteIdList, String dbName, String tableName, List<String> partNames)
+ throws LockException;
/**
* Get the lock manager. This must be used rather than instantiating an
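During a bootstrap load this is called once per transactional table, before any data moves, so the target metastore records the aborted ids and high watermark that are baked into the dumped files. A hedged call sketch (values hypothetical; the txn manager accessor is an assumption):

  HiveTxnManager txnMgr = SessionState.get().getTxnMgr();
  txnMgr.replTableWriteIdState(
      "default.acid_tbl:5:" + Long.MAX_VALUE + "::",  // snapshot captured at dump time
      "default", "acid_tbl",
      null);                                          // null partNames => unpartitioned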
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index b850ddc..fa32807 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -327,8 +327,9 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
Long writeId = 0L; // Initialize with 0 for non-ACID and non-MM tables.
int stmtId = 0;
- if ((tableExists && AcidUtils.isTransactionalTable(table))
- || (!tableExists && AcidUtils.isTablePropertyTransactional(tblDesc.getTblProps()))) {
+ if (!replicationSpec.isInReplicationScope()
+ && ((tableExists && AcidUtils.isTransactionalTable(table))
+ || (!tableExists && AcidUtils.isTablePropertyTransactional(tblDesc.getTblProps())))) {
//if importing into existing transactional table or will create a new transactional table
//(because Export was done from transactional table), need a writeId
// Explain plan doesn't open a txn and hence no need to allocate write id.
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index c07991d..8332bcc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -81,7 +81,7 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
}
});
if ((srcs != null) && srcs.length == 1) {
- if (srcs[0].isDir()) {
+ if (srcs[0].isDirectory()) {
srcs = fs.listStatus(srcs[0].getPath(), new PathFilter() {
@Override
public boolean accept(Path p) {
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 05eca1f..562f497 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -368,9 +368,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
}
if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) {
- ReplLoadWork replLoadWork =
- new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, tblNameOrPattern,
- queryState.getLineageState(), getTxnMgr().getCurrentTxnId());
+ ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,
+ tblNameOrPattern, queryState.getLineageState());
rootTasks.add(TaskFactory.get(replLoadWork, conf));
return;
}
@@ -401,7 +400,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
}
ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,
- queryState.getLineageState(), getTxnMgr().getCurrentTxnId());
+ queryState.getLineageState());
rootTasks.add(TaskFactory.get(replLoadWork, conf));
//
// for (FileStatus dir : dirsInLoadPath) {
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index 824da21..7d901f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -43,6 +43,7 @@ public class ReplicationSpec {
private boolean isNoop = false;
private boolean isLazy = false; // lazy mode => we only list files, and expect that the eventual copy will pull data in.
private boolean isReplace = true; // default is that the import mode is insert overwrite
+ private String validWriteIdList = null; // WriteIds snapshot for replicating ACID/MM tables.
private Type specType = Type.DEFAULT; // DEFAULT means REPL_LOAD or BOOTSTRAP_DUMP or EXPORT
// Key definitions related to replication
@@ -52,7 +53,8 @@ public class ReplicationSpec {
CURR_STATE_ID("repl.last.id"),
NOOP("repl.noop"),
LAZY("repl.lazy"),
- IS_REPLACE("repl.is.replace")
+ IS_REPLACE("repl.is.replace"),
+ VALID_WRITEID_LIST("repl.valid.writeid.list")
;
private final String keyName;
@@ -140,6 +142,7 @@ public class ReplicationSpec {
this.isNoop = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.NOOP.toString()));
this.isLazy = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.LAZY.toString()));
this.isReplace = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.IS_REPLACE.toString()));
+ this.validWriteIdList = keyFetcher.apply(ReplicationSpec.KEY.VALID_WRITEID_LIST.toString());
}
/**
@@ -325,6 +328,27 @@ public class ReplicationSpec {
this.isLazy = isLazy;
}
+ /**
+ * @return the WriteIds snapshot for the current ACID/MM table being replicated
+ */
+ public String getValidWriteIdList() {
+ return validWriteIdList;
+ }
+
+ /**
+ * @param validWriteIdList WriteIds snapshot for the current ACID/MM table being replicated
+ */
+ public void setValidWriteIdList(String validWriteIdList) {
+ this.validWriteIdList = validWriteIdList;
+ }
+
+ /**
+ * @return whether the currently dumped replication object belongs to an ACID/MM table
+ */
+ public boolean isTransactionalTableDump() {
+ return (validWriteIdList != null);
+ }
+
public String get(KEY key) {
switch (key){
case REPL_SCOPE:
@@ -346,6 +370,8 @@ public class ReplicationSpec {
return String.valueOf(isLazy());
case IS_REPLACE:
return String.valueOf(isReplace());
+ case VALID_WRITEID_LIST:
+ return getValidWriteIdList();
}
return null;
}
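Like the other repl markers, the new key round-trips through the dumped object's properties: the dump stamps it, and the load keys off it to decide whether write-id state must be replayed. A small sketch (values hypothetical):

  ReplicationSpec spec = new ReplicationSpec();
  spec.setValidWriteIdList("default.acid_tbl:5:" + Long.MAX_VALUE + "::");
  String snapshot = spec.get(ReplicationSpec.KEY.VALID_WRITEID_LIST);
  if (spec.isTransactionalTableDump()) {
    // schedule ReplTxnWork(REPL_WRITEID_STATE) ahead of the move tasks
  }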
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index 4e61280..529ea21 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -66,43 +66,52 @@ public class CopyUtils {
// Used by replication, copy files from source to destination. It is possible source file is
// changed/removed during copy, so double check the checksum after copy,
// if not match, copy again from cm
- public void copyAndVerify(FileSystem destinationFs, Path destination,
+ public void copyAndVerify(FileSystem destinationFs, Path destRoot,
List<ReplChangeManager.FileInfo> srcFiles) throws IOException, LoginException {
- Map<FileSystem, List<ReplChangeManager.FileInfo>> map = fsToFileMap(srcFiles);
- for (Map.Entry<FileSystem, List<ReplChangeManager.FileInfo>> entry : map.entrySet()) {
+ Map<FileSystem, Map< Path, List<ReplChangeManager.FileInfo>>> map = fsToFileMap(srcFiles, destRoot);
+ for (Map.Entry<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> entry : map.entrySet()) {
FileSystem sourceFs = entry.getKey();
- List<ReplChangeManager.FileInfo> fileInfoList = entry.getValue();
- boolean useRegularCopy = regularCopy(destinationFs, sourceFs, fileInfoList);
+ Map<Path, List<ReplChangeManager.FileInfo>> destMap = entry.getValue();
+ for (Map.Entry<Path, List<ReplChangeManager.FileInfo>> destMapEntry : destMap.entrySet()) {
+ Path destination = destMapEntry.getKey();
+ List<ReplChangeManager.FileInfo> fileInfoList = destMapEntry.getValue();
+ boolean useRegularCopy = regularCopy(destinationFs, sourceFs, fileInfoList);
- doCopyRetry(sourceFs, fileInfoList, destinationFs, destination, useRegularCopy);
-
- // Verify checksum, retry if checksum changed
- List<ReplChangeManager.FileInfo> retryFileInfoList = new ArrayList<>();
- for (ReplChangeManager.FileInfo srcFile : srcFiles) {
- if(!srcFile.isUseSourcePath()) {
- // If already use cmpath, nothing we can do here, skip this file
- continue;
+ if (!destinationFs.exists(destination)
+ && !FileUtils.mkdir(destinationFs, destination, hiveConf)) {
+ LOG.error("Failed to create destination directory: " + destination);
+ throw new IOException("Destination directory creation failed");
}
- String sourceChecksumString = srcFile.getCheckSum();
- if (sourceChecksumString != null) {
- String verifySourceChecksumString;
- try {
- verifySourceChecksumString
- = ReplChangeManager.checksumFor(srcFile.getSourcePath(), sourceFs);
- } catch (IOException e) {
- // Retry with CM path
- verifySourceChecksumString = null;
+ doCopyRetry(sourceFs, fileInfoList, destinationFs, destination, useRegularCopy);
+
+ // Verify checksum, retry if checksum changed
+ List<ReplChangeManager.FileInfo> retryFileInfoList = new ArrayList<>();
+ for (ReplChangeManager.FileInfo srcFile : srcFiles) {
+ if (!srcFile.isUseSourcePath()) {
+ // If already using the CM path, nothing more we can do here; skip this file
+ continue;
}
- if ((verifySourceChecksumString == null)
- || !sourceChecksumString.equals(verifySourceChecksumString)) {
- // If checksum does not match, likely the file is changed/removed, copy again from cm
- srcFile.setIsUseSourcePath(false);
- retryFileInfoList.add(srcFile);
+ String sourceChecksumString = srcFile.getCheckSum();
+ if (sourceChecksumString != null) {
+ String verifySourceChecksumString;
+ try {
+ verifySourceChecksumString
+ = ReplChangeManager.checksumFor(srcFile.getSourcePath(), sourceFs);
+ } catch (IOException e) {
+ // Retry with CM path
+ verifySourceChecksumString = null;
+ }
+ if ((verifySourceChecksumString == null)
+ || !sourceChecksumString.equals(verifySourceChecksumString)) {
+ // If the checksum does not match, the file was likely changed/removed; copy again from CM
+ srcFile.setIsUseSourcePath(false);
+ retryFileInfoList.add(srcFile);
+ }
}
}
- }
- if (!retryFileInfoList.isEmpty()) {
- doCopyRetry(sourceFs, retryFileInfoList, destinationFs, destination, useRegularCopy);
+ if (!retryFileInfoList.isEmpty()) {
+ doCopyRetry(sourceFs, retryFileInfoList, destinationFs, destination, useRegularCopy);
+ }
}
}
}
@@ -212,7 +221,7 @@ public class CopyUtils {
for (Map.Entry<FileSystem, List<Path>> entry : map.entrySet()) {
final FileSystem sourceFs = entry.getKey();
List<ReplChangeManager.FileInfo> fileList = Lists.transform(entry.getValue(),
- path -> { return new ReplChangeManager.FileInfo(sourceFs, path);});
+ path -> new ReplChangeManager.FileInfo(sourceFs, path, null));
doCopyOnce(sourceFs, entry.getValue(),
destinationFs, destination,
regularCopy(destinationFs, sourceFs, fileList));
@@ -287,16 +296,33 @@ public class CopyUtils {
return result;
}
- private Map<FileSystem, List<ReplChangeManager.FileInfo>> fsToFileMap(
- List<ReplChangeManager.FileInfo> srcFiles) throws IOException {
- Map<FileSystem, List<ReplChangeManager.FileInfo>> result = new HashMap<>();
+ // Build a map: source file system -> destination path -> list of files to copy
+ private Map<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> fsToFileMap(
+ List<ReplChangeManager.FileInfo> srcFiles, Path destRoot) throws IOException {
+ Map<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> result = new HashMap<>();
for (ReplChangeManager.FileInfo file : srcFiles) {
FileSystem fileSystem = file.getSrcFs();
if (!result.containsKey(fileSystem)) {
- result.put(fileSystem, new ArrayList<ReplChangeManager.FileInfo>());
+ result.put(fileSystem, new HashMap<>());
}
- result.get(fileSystem).add(file);
+ Path destination = getCopyDestination(file, destRoot);
+ if (!result.get(fileSystem).containsKey(destination)) {
+ result.get(fileSystem).put(destination, new ArrayList<>());
+ }
+ result.get(fileSystem).get(destination).add(file);
}
return result;
}
+
+ private Path getCopyDestination(ReplChangeManager.FileInfo fileInfo, Path destRoot) {
+ if (fileInfo.getSubDir() == null) {
+ return destRoot;
+ }
+ String[] subDirs = fileInfo.getSubDir().split(Path.SEPARATOR);
+ Path destination = destRoot;
+ for (String subDir: subDirs) {
+ destination = new Path(destination, subDir);
+ }
+ return destination;
+ }
}
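Grouping by destination gives one copy batch per ACID sub-directory, and getCopyDestination simply re-roots each FileInfo's sub-directory under destRoot. The path arithmetic, spelled out with hypothetical values:

  Path destRoot = new Path("/warehouse/acid_tbl");
  String subDir = "delta_0000005_0000005";  // what FileInfo.getSubDir() might return
  Path destination = destRoot;
  for (String dir : subDir.split(Path.SEPARATOR)) {
    destination = new Path(destination, dir);
  }
  // destination == /warehouse/acid_tbl/delta_0000005_0000005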
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java
index 5c1850c..4b2812e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java
@@ -24,16 +24,18 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
class BootStrapReplicationSpecFunction implements HiveWrapper.Tuple.Function<ReplicationSpec> {
private final Hive db;
+ private final long currentNotificationId;
- BootStrapReplicationSpecFunction(Hive db) {
+ BootStrapReplicationSpecFunction(Hive db, long currentNotificationId) {
this.db = db;
+ this.currentNotificationId = currentNotificationId;
}
@Override
public ReplicationSpec fromMetaStore() throws HiveException {
try {
- long currentNotificationId = db.getMSC()
- .getCurrentNotificationEventId().getEventId();
+ long currentReplicationState = (this.currentNotificationId > 0)
+ ? this.currentNotificationId : db.getMSC().getCurrentNotificationEventId().getEventId();
ReplicationSpec replicationSpec =
new ReplicationSpec(
true,
@@ -45,7 +47,7 @@ class BootStrapReplicationSpecFunction implements HiveWrapper.Tuple.Function<Rep
false
);
- replicationSpec.setCurrentReplicationState(String.valueOf(currentNotificationId));
+ replicationSpec.setCurrentReplicationState(String.valueOf(currentReplicationState));
return replicationSpec;
} catch (Exception e) {
throw new SemanticException(e);
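Passing the notification id in, rather than fetching it per table, pins every object in one bootstrap dump to the same replication state. A sketch of the caller side (variable name hypothetical; the metastore call is the one used above):

  long bootstrapLastId = db.getMSC().getCurrentNotificationEventId().getEventId();
  // reuse the same id for every table and partition dumped in this bootstrap cycle
  new BootStrapReplicationSpecFunction(db, bootstrapLastId);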