You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sa...@apache.org on 2019/02/15 04:22:20 UTC
[hive] branch master updated: HIVE-21261: Incremental REPL LOAD
adds redundant COPY and MOVE tasks for external table events (Sankar
Hariappan, reviewed by Mahesh Kumar Behera)
This is an automated email from the ASF dual-hosted git repository.
sankarh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 4b38b88 HIVE-21261: Incremental REPL LOAD adds redundant COPY and MOVE tasks for external table events (Sankar Hariappan, reviewed by Mahesh Kumar Behera)
4b38b88 is described below
commit 4b38b881ea03d979648559ba79ac9ddb1a9f6019
Author: Sankar Hariappan <sa...@apache.org>
AuthorDate: Fri Feb 15 09:50:51 2019 +0530
HIVE-21261: Incremental REPL LOAD adds redundant COPY and MOVE tasks for external table events (Sankar Hariappan, reviewed by Mahesh Kumar Behera)
Signed-off-by: Sankar Hariappan <sa...@apache.org>
---
.../TestReplicationScenariosExternalTables.java | 2 +-
.../hive/ql/parse/ImportSemanticAnalyzer.java | 167 ++++++++++++---------
2 files changed, 95 insertions(+), 74 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index c25e6e2..81af2fe 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -512,7 +512,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
tableNames.add(components[0]);
assertTrue(components[1].length() > 0);
}
- assertTrue(expected.containsAll(tableNames));
+ assertTrue(tableNames.containsAll(expected));
reader.close();
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 8242b47..d4fb191 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -133,9 +133,9 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
// db exists.
boolean waitOnPrecursor = false;
- for (int i = 1; i < ast.getChildCount(); ++i){
+ for (int i = 1; i < ast.getChildCount(); ++i) {
ASTNode child = (ASTNode) ast.getChild(i);
- switch (child.getToken().getType()){
+ switch (child.getToken().getType()) {
case HiveParser.KW_EXTERNAL:
isExternalSet = true;
break;
@@ -145,7 +145,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
break;
case HiveParser.TOK_TAB:
ASTNode tableNameNode = (ASTNode) child.getChild(0);
- Map.Entry<String,String> dbTablePair = getDbTableNamePair(tableNameNode);
+ Map.Entry<String, String> dbTablePair = getDbTableNamePair(tableNameNode);
parsedDbName = dbTablePair.getKey();
parsedTableName = dbTablePair.getValue();
// get partition metadata if partition specified
@@ -164,10 +164,11 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
// parsing statement is now done, on to logic.
tableExists = prepareImport(true,
- isLocationSet, isExternalSet, isPartSpecSet, waitOnPrecursor,
- parsedLocation, parsedTableName, parsedDbName, parsedPartSpec, fromTree.getText(),
- new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, rootTasks, LOG, ctx),
- null, getTxnMgr(), 0);
+ isLocationSet, isExternalSet, isPartSpecSet, waitOnPrecursor,
+ parsedLocation, parsedTableName, parsedDbName, parsedPartSpec, fromTree.getText(),
+ new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, rootTasks, LOG, ctx),
+
+ null, getTxnMgr(), 0);
} catch (SemanticException e) {
throw e;
@@ -185,12 +186,12 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
ASTNode partspec_val = (ASTNode) partspec.getChild(j);
String val = null;
String colName = unescapeIdentifier(partspec_val.getChild(0)
- .getText().toLowerCase());
+ .getText().toLowerCase());
if (partspec_val.getChildCount() < 2) { // DP in the form of T
// partition (ds, hr)
throw new SemanticException(
- ErrorMsg.INVALID_PARTITION
- .getMsg(" - Dynamic partitions not allowed"));
+ ErrorMsg.INVALID_PARTITION
+ .getMsg(" - Dynamic partitions not allowed"));
} else { // in the form of T partition (ds="2010-03-03")
val = stripQuotes(partspec_val.getChild(1).getText());
}
@@ -220,16 +221,17 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
* Given that "repl load" now supports two modes "repl load dbName [location]" and
* "repl load [location]" in which case the database name has to be taken from the table metadata
* by default and then over-ridden if something specified on the command line.
- *
+ * <p>
* hence for import to work correctly we have to pass in the sessionState default Db via the
* parsedDbName parameter
*/
public static boolean prepareImport(boolean isImportCmd,
- boolean isLocationSet, boolean isExternalSet, boolean isPartSpecSet, boolean waitOnPrecursor,
- String parsedLocation, String parsedTableName, String overrideDBName,
- LinkedHashMap<String, String> parsedPartSpec,
- String fromLocn, EximUtil.SemanticAnalyzerWrapperContext x,
- UpdatedMetaDataTracker updatedMetadata, HiveTxnManager txnMgr,
+ boolean isLocationSet, boolean isExternalSet, boolean isPartSpecSet,
+ boolean waitOnPrecursor,
+ String parsedLocation, String parsedTableName, String overrideDBName,
+ LinkedHashMap<String, String> parsedPartSpec,
+ String fromLocn, EximUtil.SemanticAnalyzerWrapperContext x,
+ UpdatedMetaDataTracker updatedMetadata, HiveTxnManager txnMgr,
long writeId // Initialize with 0 for non-ACID and non-MM tables.
) throws IOException, MetaException, HiveException, URISyntaxException {
@@ -242,7 +244,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
MetaData rv;
try {
- rv = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME));
+ rv = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME));
} catch (IOException e) {
throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
}
@@ -253,10 +255,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
ReplicationSpec replicationSpec = rv.getReplicationSpec();
- if (replicationSpec.isNoop()){
+ if (replicationSpec.isNoop()) {
// nothing to do here, silently return.
x.getLOG().debug("Current update with ID:{} is noop",
- replicationSpec.getCurrentReplicationState());
+ replicationSpec.getCurrentReplicationState());
return false;
}
@@ -265,7 +267,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
String dbname = rv.getTable().getDbName();
- if ((overrideDBName !=null) && (!overrideDBName.isEmpty())){
+ if ((overrideDBName != null) && (!overrideDBName.isEmpty())) {
// If the parsed statement contained a db.tablename specification, prefer that.
dbname = overrideDBName;
}
@@ -305,7 +307,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
boolean inReplicationScope = false;
- if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){
+ if ((replicationSpec != null) && replicationSpec.isInReplicationScope()) {
tblDesc.setReplicationSpec(replicationSpec);
// Statistics for a non-transactional table will be replicated separately. Don't bother
// with it here.
@@ -329,7 +331,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
x.getInputs().add(toReadEntity(new Path(parsedLocation), x.getConf()));
}
- if ((parsedTableName!= null) && (!parsedTableName.isEmpty())){
+ if ((parsedTableName != null) && (!parsedTableName.isEmpty())) {
tblDesc.setTableName(parsedTableName);
}
@@ -338,9 +340,9 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
for (Partition partition : partitions) {
// TODO: this should ideally not create AddPartitionDesc per partition
AddPartitionDesc partsDesc =
- getBaseAddPartitionDescFromPartition(fromPath, dbname, tblDesc, partition,
- replicationSpec, x.getConf());
- if (inReplicationScope){
+ getBaseAddPartitionDescFromPartition(fromPath, dbname, tblDesc, partition,
+ replicationSpec, x.getConf());
+ if (inReplicationScope) {
// Statistics for a non-transactional table will be replicated separately. Don't bother
// with it here.
if (TxnUtils.isTransactionalTable(tblDesc.getTblProps())) {
@@ -350,12 +352,12 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
partitionDescs.add(partsDesc);
}
- if (isPartSpecSet){
+ if (isPartSpecSet) {
// The import specification asked for only a particular partition to be loaded
// We load only that, and ignore all the others.
boolean found = false;
for (Iterator<AddPartitionDesc> partnIter = partitionDescs
- .listIterator(); partnIter.hasNext();) {
+ .listIterator(); partnIter.hasNext(); ) {
AddPartitionDesc addPartitionDesc = partnIter.next();
if (!found && addPartitionDesc.getPartition(0).getPartSpec().equals(parsedPartSpec)) {
found = true;
@@ -365,8 +367,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
if (!found) {
throw new SemanticException(
- ErrorMsg.INVALID_PARTITION
- .getMsg(" - Specified partition not found in import directory"));
+ ErrorMsg.INVALID_PARTITION
+ .getMsg(" - Specified partition not found in import directory"));
}
}
@@ -412,25 +414,25 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if (inReplicationScope) {
createReplImportTasks(
- tblDesc, partitionDescs,
- replicationSpec, waitOnPrecursor, table,
- fromURI, wh, x, writeId, stmtId, updatedMetadata);
+ tblDesc, partitionDescs,
+ replicationSpec, waitOnPrecursor, table,
+ fromURI, wh, x, writeId, stmtId, updatedMetadata);
} else {
createRegularImportTasks(
- tblDesc, partitionDescs,
- isPartSpecSet, replicationSpec, table,
- fromURI, fs, wh, x, writeId, stmtId);
+ tblDesc, partitionDescs,
+ isPartSpecSet, replicationSpec, table,
+ fromURI, fs, wh, x, writeId, stmtId);
}
return tableExists;
}
private static AddPartitionDesc getBaseAddPartitionDescFromPartition(
- Path fromPath, String dbName, ImportTableDesc tblDesc, Partition partition,
- ReplicationSpec replicationSpec, HiveConf conf)
- throws MetaException, SemanticException {
+ Path fromPath, String dbName, ImportTableDesc tblDesc, Partition partition,
+ ReplicationSpec replicationSpec, HiveConf conf)
+ throws MetaException, SemanticException {
AddPartitionDesc partsDesc = new AddPartitionDesc(dbName, tblDesc.getTableName(),
- EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()),
- partition.getSd().getLocation(), partition.getParameters());
+ EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()),
+ partition.getSd().getLocation(), partition.getParameters());
AddPartitionDesc.OnePartitionDesc partDesc = partsDesc.getPartition(0);
partDesc.setInputFormat(partition.getSd().getInputFormat());
partDesc.setOutputFormat(partition.getSd().getOutputFormat());
@@ -441,27 +443,28 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
partDesc.setBucketCols(partition.getSd().getBucketCols());
partDesc.setSortCols(partition.getSd().getSortCols());
if (replicationSpec.isInReplicationScope() && tblDesc.isExternal()
- && !replicationSpec.isMigratingToExternalTable()) {
+ && !replicationSpec.isMigratingToExternalTable()) {
String newLocation = ReplExternalTables
- .externalTableLocation(conf, partition.getSd().getLocation());
+ .externalTableLocation(conf, partition.getSd().getLocation());
LOG.debug("partition {} has data location: {}", partition, newLocation);
partDesc.setLocation(newLocation);
} else {
partDesc.setLocation(new Path(fromPath,
- Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString());
+ Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString());
}
return partsDesc;
}
private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
- org.apache.hadoop.hive.metastore.api.Table tblObj) throws Exception {
+ org.apache.hadoop.hive.metastore.api.Table tblObj)
+ throws Exception {
Table table = new Table(tblObj);
return new ImportTableDesc(dbName, table);
}
private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath,
- ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x,
- Long writeId, int stmtId) throws HiveException {
+ ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x,
+ Long writeId, int stmtId) throws HiveException {
assert table != null;
assert table.getParameters() != null;
Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME);
@@ -508,10 +511,11 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("adding import work for table with source location: " +
- dataPath + "; table: " + tgtPath + "; copy destination " + destPath + "; mm " + writeId +
- " for " + table.getTableName() + ": " +
+ dataPath + "; table: " + tgtPath + "; copy destination " + destPath + "; mm " +
+ writeId +
+ " for " + table.getTableName() + ": " +
(AcidUtils.isFullAcidTable(table) ? "acid" :
- (AcidUtils.isInsertOnlyTable(table) ? "mm" : "flat")
+ (AcidUtils.isInsertOnlyTable(table) ? "mm" : "flat")
)
);
}
@@ -553,7 +557,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
return loadTableTask;
}
- private static Task<?> createTableTask(ImportTableDesc tableDesc, EximUtil.SemanticAnalyzerWrapperContext x){
+ private static Task<?> createTableTask(ImportTableDesc tableDesc, EximUtil.SemanticAnalyzerWrapperContext x) {
return tableDesc.getCreateTableTask(x.getInputs(), x.getOutputs(), x.getConf());
}
@@ -565,20 +569,21 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
private static Task<? extends Serializable> alterTableTask(ImportTableDesc tableDesc,
- EximUtil.SemanticAnalyzerWrapperContext x, ReplicationSpec replicationSpec) {
+ EximUtil.SemanticAnalyzerWrapperContext x,
+ ReplicationSpec replicationSpec) {
tableDesc.setReplaceMode(true);
- if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())){
+ if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())) {
tableDesc.setReplicationSpec(replicationSpec);
}
return tableDesc.getCreateTableTask(x.getInputs(), x.getOutputs(), x.getConf());
}
private static Task<? extends Serializable> alterSinglePartition(
- ImportTableDesc tblDesc, Table table, Warehouse wh, AddPartitionDesc addPartitionDesc,
- ReplicationSpec replicationSpec, org.apache.hadoop.hive.ql.metadata.Partition ptn,
- EximUtil.SemanticAnalyzerWrapperContext x) throws MetaException, IOException, HiveException {
+ ImportTableDesc tblDesc, Table table, Warehouse wh, AddPartitionDesc addPartitionDesc,
+ ReplicationSpec replicationSpec, org.apache.hadoop.hive.ql.metadata.Partition ptn,
+ EximUtil.SemanticAnalyzerWrapperContext x) throws MetaException, IOException, HiveException {
addPartitionDesc.setReplaceMode(true);
- if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())){
+ if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())) {
addPartitionDesc.setReplicationSpec(replicationSpec);
}
AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
@@ -591,17 +596,19 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
private static Task<?> addSinglePartition(ImportTableDesc tblDesc,
- Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec,
- EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId)
- throws MetaException, IOException, HiveException {
+ Table table, Warehouse wh, AddPartitionDesc addPartitionDesc,
+ ReplicationSpec replicationSpec,
+ EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId)
+ throws MetaException, IOException, HiveException {
AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
boolean isAutoPurge = false;
boolean needRecycle = false;
boolean copyToMigratedTxnTable = false;
- if (tblDesc.isExternal() && tblDesc.getLocation() == null) {
- x.getLOG().debug("Importing in-place: adding AddPart for partition "
- + partSpecToString(partSpec.getPartSpec()));
+ if (shouldSkipDataCopyInReplScope(tblDesc, replicationSpec)
+ || (tblDesc.isExternal() && tblDesc.getLocation() == null)) {
+ x.getLOG().debug("Adding AddPart and skipped data copy for partition "
+ + partSpecToString(partSpec.getPartSpec()));
// addPartitionDesc already has the right partition location
@SuppressWarnings("unchecked")
Task<?> addPartTask = TaskFactory.get(
@@ -611,8 +618,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
String srcLocation = partSpec.getLocation();
fixLocationInPartSpec(tblDesc, table, wh, replicationSpec, partSpec, x);
x.getLOG().debug("adding dependent CopyWork/AddPart/MoveWork for partition "
- + partSpecToString(partSpec.getPartSpec())
- + " with source location: " + srcLocation);
+ + partSpecToString(partSpec.getPartSpec())
+ + " with source location: " + srcLocation);
Path tgtLocation = new Path(partSpec.getLocation());
LoadFileType loadFileType;
@@ -620,7 +627,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if (replicationSpec.isInReplicationScope() &&
x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false)) {
loadFileType = LoadFileType.IGNORE;
- destPath = tgtLocation;
+ destPath = tgtLocation;
isAutoPurge = "true".equalsIgnoreCase(table.getProperty("auto.purge"));
if (table.isTemporary()) {
needRecycle = false;
@@ -636,19 +643,19 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
//Replication scope the write id will be invalid
boolean useStagingDirectory = !AcidUtils.isTransactionalTable(table.getParameters()) ||
replicationSpec.isInReplicationScope();
- destPath = useStagingDirectory ? x.getCtx().getExternalTmpPath(tgtLocation)
+ destPath = useStagingDirectory ? x.getCtx().getExternalTmpPath(tgtLocation)
: new Path(tgtLocation, AcidUtils.deltaSubdir(writeId, writeId, stmtId));
}
- Path moveTaskSrc = !AcidUtils.isTransactionalTable(table.getParameters()) ||
+ Path moveTaskSrc = !AcidUtils.isTransactionalTable(table.getParameters()) ||
replicationSpec.isInReplicationScope() ? destPath : tgtLocation;
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("adding import work for partition with source location: "
- + srcLocation + "; target: " + tgtLocation + "; copy dest " + destPath + "; mm "
- + writeId + " for " + partSpecToString(partSpec.getPartSpec()) + ": " +
- (AcidUtils.isFullAcidTable(table) ? "acid" :
- (AcidUtils.isInsertOnlyTable(table) ? "mm" : "flat")
- )
+ + srcLocation + "; target: " + tgtLocation + "; copy dest " + destPath + "; mm "
+ + writeId + " for " + partSpecToString(partSpec.getPartSpec()) + ": " +
+ (AcidUtils.isFullAcidTable(table) ? "acid" :
+ (AcidUtils.isInsertOnlyTable(table) ? "mm" : "flat")
+ )
);
}
@@ -711,6 +718,19 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
/**
+ * In the REPL LOAD flow, data for external tables is copied separately, using the data locations
+ * dumped in the file {@link ReplExternalTables#FILE_NAME}, so the copy can be skipped here.
+ * When a managed table is being migrated to an external table, its data path is not listed in
+ * that file, so the data must still be copied while applying the event.
+ */
+ private static boolean shouldSkipDataCopyInReplScope(ImportTableDesc tblDesc, ReplicationSpec replicationSpec) {
+ return ((replicationSpec != null)
+ && replicationSpec.isInReplicationScope()
+ && tblDesc.isExternal()
+ && !replicationSpec.isMigratingToExternalTable());
+ }
+
+ /**
* Helper method to set location properly in partSpec
*/
private static void fixLocationInPartSpec(ImportTableDesc tblDesc, Table table,
@@ -1210,7 +1230,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
addPartitionDesc.getPartition(0).getPartSpec());
}
}
- } else if (!replicationSpec.isMetadataOnly()) {
+ } else if (!replicationSpec.isMetadataOnly()
+ && !shouldSkipDataCopyInReplScope(tblDesc, replicationSpec)) {
x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
t.addDependentTask(loadTable(fromURI, table, replicationSpec.isReplace(),
new Path(tblDesc.getLocation()), replicationSpec, x, writeId, stmtId));