Posted to commits@hive.apache.org by ku...@apache.org on 2020/02/21 08:23:49 UTC
[hive] branch master updated: HIVE-21164: ACID: explore how we can avoid a move step during inserts/compaction (Marta Kuczora, reviewed by Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
kuczoram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new ffee30e HIVE-21164: ACID: explore how we can avoid a move step during inserts/compaction (Marta Kuczora, reviewed by Peter Vary)
ffee30e is described below
commit ffee30e6267e85f00a22767262192abb9681cfb7
Author: Marta Kuczora <ku...@cloudera.com>
AuthorDate: Fri Feb 21 09:22:06 2020 +0100
HIVE-21164: ACID: explore how we can avoid a move step during inserts/compaction (Marta Kuczora, reviewed by Peter Vary)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 3 +
.../hive/hcatalog/streaming/TestStreaming.java | 2 +-
.../org/apache/hadoop/hive/ql/TestAcidOnTez.java | 20 +-
.../hadoop/hive/ql/history/TestHiveHistory.java | 2 +-
.../hive/ql/txn/compactor/CompactorTestUtil.java | 3 +-
.../test/resources/testconfiguration.properties | 3 +
.../hive/ql/exec/AbstractFileMergeOperator.java | 6 +-
.../hadoop/hive/ql/exec/FileSinkOperator.java | 126 +-
.../org/apache/hadoop/hive/ql/exec/MoveTask.java | 8 +-
.../org/apache/hadoop/hive/ql/exec/Utilities.java | 84 +-
.../apache/hadoop/hive/ql/io/AcidInputFormat.java | 4 +-
.../apache/hadoop/hive/ql/io/AcidOutputFormat.java | 10 +
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 52 +-
.../hadoop/hive/ql/io/HiveFileFormatUtils.java | 14 +-
.../apache/hadoop/hive/ql/io/RecordUpdater.java | 6 +
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 5 +-
.../hadoop/hive/ql/io/orc/OrcOutputFormat.java | 5 +
.../hadoop/hive/ql/io/orc/OrcRawRecordMerger.java | 31 +-
.../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 14 +-
.../ql/io/orc/VectorizedOrcAcidRowBatchReader.java | 4 +-
.../org/apache/hadoop/hive/ql/metadata/Hive.java | 42 +-
.../hadoop/hive/ql/optimizer/GenMapRedUtils.java | 24 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 91 +-
.../hadoop/hive/ql/parse/spark/GenSparkUtils.java | 2 +-
.../apache/hadoop/hive/ql/plan/FileSinkDesc.java | 28 +-
.../apache/hadoop/hive/ql/plan/LoadTableDesc.java | 9 +
.../hadoop/hive/ql/txn/compactor/CompactorMR.java | 44 +-
.../apache/hadoop/hive/ql/util/UpgradeTool.java | 1 -
.../apache/hadoop/hive/ql/TestTxnAddPartition.java | 2 +-
.../org/apache/hadoop/hive/ql/TestTxnCommands.java | 2 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 4 +-
.../apache/hadoop/hive/ql/TestTxnCommands3.java | 14 +-
.../apache/hadoop/hive/ql/TestTxnConcatenate.java | 14 +-
.../org/apache/hadoop/hive/ql/TestTxnExIm.java | 2 +-
.../org/apache/hadoop/hive/ql/TestTxnLoadData.java | 20 +-
.../apache/hadoop/hive/ql/TestTxnNoBuckets.java | 154 +-
.../hadoop/hive/ql/TxnCommandsBaseForTests.java | 17 +-
.../apache/hadoop/hive/ql/exec/TestExecDriver.java | 2 +-
.../hadoop/hive/ql/exec/TestFileSinkOperator.java | 10 +-
.../hadoop/hive/ql/lockmgr/TestDbTxnManager2.java | 1 +
.../hive/ql/txn/compactor/CompactorTest.java | 2 +-
.../hadoop/hive/ql/txn/compactor/TestWorker.java | 4 +-
.../tez_acid_union_dynamic_partition.q | 20 +
.../tez_acid_union_dynamic_partition_2.q | 25 +
.../clientpositive/tez_acid_union_multiinsert.q | 94 +
.../results/clientpositive/acid_subquery.q.out | 9 +
.../create_transactional_full_acid.q.out | 9 +
.../encryption_insert_partition_dynamic.q.out | 4 +
.../clientpositive/llap/acid_no_buckets.q.out | 56 +-
.../clientpositive/llap/insert_overwrite.q.out | 10 +
.../test/results/clientpositive/llap/mm_all.q.out | 15 +
.../llap/tez_acid_union_dynamic_partition.q.out | 63 +
.../llap/tez_acid_union_dynamic_partition_2.q.out | 85 +
.../llap/tez_acid_union_multiinsert.q.out | 3481 ++++++++++++++++++++
ql/src/test/results/clientpositive/mm_all.q.out | 15 +
.../org/apache/hive/streaming/TestStreaming.java | 6 +-
56 files changed, 4479 insertions(+), 304 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index c872e69..d0a552a 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3996,6 +3996,9 @@ public class HiveConf extends Configuration {
HIVE_CREATE_TABLES_AS_INSERT_ONLY("hive.create.as.insert.only", false,
"Whether the eligible tables should be created as ACID insert-only by default. Does \n" +
"not apply to external tables, the ones using storage handlers, etc."),
+ HIVE_ACID_DIRECT_INSERT_ENABLED("hive.acid.direct.insert.enabled", true,
+ "Enable writing the data files directly to the table's final destination instead of the staging directory."
+ + "This optimization only applies on INSERT operations on ACID tables."),
// role names are case-insensitive
USERS_IN_ADMIN_ROLE("hive.users.in.admin.role", "", false,
"Comma separated list of users who are in admin role for bootstrapping.\n" +
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index da677c7..569de70 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -392,7 +392,7 @@ public class TestStreaming {
rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
- Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000"));
+ Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000_0"));
Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
index 056cd27..01b7a36 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
@@ -681,11 +681,11 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree ~/dev/hiverwgit/itests/h
}
String[][] expected2 = {
- {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000001_0000001_0001/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "warehouse/t/delta_0000001_0000001_0001/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":0}\t5\t6", "warehouse/t/delta_0000001_0000001_0002/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":1}\t7\t8", "warehouse/t/delta_0000001_0000001_0002/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536870915,\"rowid\":0}\t9\t10", "warehouse/t/delta_0000001_0000001_0003/bucket_00000"}
+ {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000001_0000001_0001/bucket_00000_0"},
+ {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "warehouse/t/delta_0000001_0000001_0001/bucket_00000_0"},
+ {"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":0}\t5\t6", "warehouse/t/delta_0000001_0000001_0002/bucket_00000_0"},
+ {"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":1}\t7\t8", "warehouse/t/delta_0000001_0000001_0002/bucket_00000_0"},
+ {"{\"writeid\":1,\"bucketid\":536870915,\"rowid\":0}\t9\t10", "warehouse/t/delta_0000001_0000001_0003/bucket_00000_0"}
};
Assert.assertEquals("Unexpected row count", expected2.length, rs.size());
for(int i = 0; i < expected2.length; i++) {
@@ -727,11 +727,11 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree ~/dev/hiverwgit/itests/h
LOG.warn(s);
}
String[][] expected2 = {
- {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000001_0000001_0000/bucket_00001"},
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t4", "warehouse/t/delta_0000001_0000001_0000/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t5\t6", "warehouse/t/delta_0000001_0000001_0000/bucket_00001"},
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t8", "warehouse/t/delta_0000001_0000001_0000/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t9\t10", "warehouse/t/delta_0000001_0000001_0000/bucket_00001"}
+ {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000001_0000001_0000/bucket_00001_0"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t4", "warehouse/t/delta_0000001_0000001_0000/bucket_00000_0"},
+ {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t5\t6", "warehouse/t/delta_0000001_0000001_0000/bucket_00001_0"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t8", "warehouse/t/delta_0000001_0000001_0000/bucket_00000_0"},
+ {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t9\t10", "warehouse/t/delta_0000001_0000001_0000/bucket_00001_0"}
};
Assert.assertEquals("Unexpected row count", expected2.length, rs.size());
for(int i = 0; i < expected2.length; i++) {
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
index 31d15fd..127de23 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
@@ -112,7 +112,7 @@ public class TestHiveHistory {
db.createTable(src, cols, null, TextInputFormat.class,
IgnoreKeyTextOutputFormat.class);
db.loadTable(hadoopDataFile[i], src,
- LoadFileType.KEEP_EXISTING, false, false, false, false, null, 0, false);
+ LoadFileType.KEEP_EXISTING, false, false, false, false, null, 0, false, false);
i++;
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
index c2aa73b..e70d878 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
@@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -346,7 +347,7 @@ class CompactorTestUtil {
conf.setBoolean("orc.schema.evolution.case.sensitive", false);
HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);
AcidInputFormat.RawReader<OrcStruct> reader =
- aif.getRawReader(conf, true, bucket, writeIdList, base, deltas);
+ aif.getRawReader(conf, true, bucket, writeIdList, base, deltas, new HashMap<String, String>());
RecordIdentifier identifier = reader.createKey();
OrcStruct value = reader.createValue();
long currentTxn = min;
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 94467a4..42bc5df 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -435,6 +435,8 @@ minillap.query.files=acid_bucket_pruning.q,\
results_cache_diff_fs.q,\
tez_union_dynamic_partition.q,\
tez_union_dynamic_partition_2.q,\
+ tez_acid_union_dynamic_partition.q,\
+ tez_acid_union_dynamic_partition_2.q,\
unionDistinct_1.q,\
whroot_external1.q,\
load_fs2.q,\
@@ -839,6 +841,7 @@ minillaplocal.query.files=\
tez_union_decimal.q,\
tez_union_group_by.q,\
tez_union_multiinsert.q,\
+ tez_acid_union_multiinsert.q,\
tez_vector_dynpart_hashjoin_1.q,\
tez_vector_dynpart_hashjoin_2.q,\
timestamp_4.q,\
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
index 9a32581..d68d8f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
@@ -257,7 +257,7 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
assert finalPath.equals(outPath);
// There's always just one file that we have merged.
// The union/DP/etc. should already be accounted for in the path.
- Utilities.writeMmCommitManifest(Lists.newArrayList(outPath),
+ Utilities.writeCommitManifest(Lists.newArrayList(outPath),
tmpPath.getParent(), fs, taskId, conf.getWriteId(), conf.getStmtId(), null, false);
LOG.info("Merged into " + finalPath + "(" + fss.getLen() + " bytes).");
}
@@ -337,8 +337,8 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
lbLevels = conf.getListBucketingDepth();
// We don't expect missing buckets from merge (actually there should be no buckets),
// so just pass null as bucketing context. Union suffix should also be accounted for.
- Utilities.handleMmTableFinalPath(outputDir.getParent(), null, hconf, success,
- dpLevels, lbLevels, null, mmWriteId, stmtId, reporter, isMmTable, false, false);
+ Utilities.handleDirectInsertTableFinalPath(outputDir.getParent(), null, hconf, success,
+ dpLevels, lbLevels, null, mmWriteId, stmtId, reporter, isMmTable, false, false, false);
}
} catch (IOException e) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 9ad4e71..d5e1b5b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -161,6 +161,8 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
private final String subdirForTxn;
private Path taskOutputTempPathRoot;
Path[] outPaths;
+ // The bucket files we successfully wrote to in this writer
+ Path[] outPathsCommitted;
Path[] finalPaths;
RecordWriter[] outWriters;
RecordUpdater[] updaters;
@@ -168,19 +170,31 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
int acidLastBucket = -1;
int acidFileOffset = -1;
private boolean isMmTable;
+ private boolean isDirectInsert;
+ private boolean isInsertOverwrite;
String dpDirForCounters;
- public FSPaths(Path specPath, boolean isMmTable) {
+ public FSPaths(Path specPath, boolean isMmTable, boolean isDirectInsert, boolean isInsertOverwrite) {
this.isMmTable = isMmTable;
- if (!isMmTable) {
+ this.isDirectInsert = isDirectInsert;
+ this.isInsertOverwrite = isInsertOverwrite;
+ if (!isMmTable && !isDirectInsert) {
tmpPathRoot = Utilities.toTempPath(specPath);
taskOutputTempPathRoot = Utilities.toTaskTempPath(specPath);
subdirForTxn = null;
} else {
tmpPathRoot = specPath;
taskOutputTempPathRoot = null; // Should not be used.
- subdirForTxn = AcidUtils.baseOrDeltaSubdir(conf.getInsertOverwrite(),
- conf.getTableWriteId(), conf.getTableWriteId(), conf.getStatementId());
+ if (isMmTable) {
+ subdirForTxn = AcidUtils.baseOrDeltaSubdir(conf.getInsertOverwrite(),
+ conf.getTableWriteId(), conf.getTableWriteId(), conf.getStatementId());
+ } else {
+ /**
+ * For direct write to final path during ACID insert, we create the delta directories
+ * later when we create the RecordUpdater using AcidOutputFormat.Options
+ */
+ subdirForTxn = null;
+ }
}
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("new FSPaths for " + numFiles
@@ -188,6 +202,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
}
outPaths = new Path[numFiles];
+ outPathsCommitted = new Path[numFiles];
finalPaths = new Path[numFiles];
outWriters = new RecordWriter[numFiles];
updaters = new RecordUpdater[numFiles];
@@ -211,6 +226,11 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
try {
for (int i = 0; i < updaters.length; i++) {
if (updaters[i] != null) {
+ SerDeStats stats = updaters[i].getStats();
+ // Ignore 0 row files except in case of insert overwrite
+ if (isDirectInsert && (stats.getRowCount() > 0 || isInsertOverwrite)) {
+ outPathsCommitted[i] = updaters[i].getUpdatedFilePath();
+ }
updaters[i].close(abort);
}
}
@@ -243,13 +263,19 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
}
if(outPaths[idx] != null && fs.exists(outPaths[idx])) {
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
- Utilities.FILE_OP_LOGGER.trace("committing " + outPaths[idx] + " to "
- + finalPaths[idx] + " (" + isMmTable + ")");
+ Utilities.FILE_OP_LOGGER.trace("committing " + outPaths[idx] + " to " + finalPaths[idx] + " (mm table ="
+ + isMmTable + ", direct insert = " + isDirectInsert + ")");
}
if (isMmTable) {
assert outPaths[idx].equals(finalPaths[idx]);
commitPaths.add(outPaths[idx]);
- } else if (!fs.rename(outPaths[idx], finalPaths[idx])) {
+ } else if (isDirectInsert && (outPathsCommitted[idx] != null)) {
+ if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
+ Utilities.FILE_OP_LOGGER
+ .trace("committing " + outPathsCommitted[idx] + " (direct insert = " + isDirectInsert + ")");
+ }
+ commitPaths.add(outPathsCommitted[idx]);
+ } else if (!isDirectInsert && !fs.rename(outPaths[idx], finalPaths[idx])) {
FileStatus fileStatus = FileUtils.getFileStatusOrNull(fs, finalPaths[idx]);
if (fileStatus != null) {
LOG.warn("Target path " + finalPaths[idx] + " with a size " + fileStatus.getLen() + " exists. Trying to delete it.");
@@ -264,15 +290,14 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
}
}
}
-
updateProgress();
}
- public void abortWriters(FileSystem fs, boolean abort, boolean delete) throws HiveException {
- //should this close updaters[]?
+ public void abortWritersAndUpdaters(FileSystem fs, boolean abort, boolean delete) throws HiveException {
for (int idx = 0; idx < outWriters.length; idx++) {
if (outWriters[idx] != null) {
try {
+ LOG.debug("Aborted: closing: " + outWriters[idx].toString());
outWriters[idx].close(abort);
if (delete) {
fs.delete(outPaths[idx], true);
@@ -283,6 +308,20 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
}
}
}
+ for (int idx = 0; idx < updaters.length; idx++) {
+ if (updaters[idx] != null) {
+ try {
+ LOG.debug("Aborted: closing: " + updaters[idx].toString());
+ updaters[idx].close(abort);
+ if (delete) {
+ fs.delete(outPaths[idx], true);
+ }
+ updateProgress();
+ } catch (IOException e) {
+ throw new HiveException(e);
+ }
+ }
+ }
}
public void initializeBucketPaths(int filesIdx, String taskId, boolean isNativeTable,
@@ -290,7 +329,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
if (isNativeTable) {
String extension = Utilities.getFileExtension(jc, isCompressed, hiveOutputFormat);
String taskWithExt = extension == null ? taskId : taskId + extension;
- if (!isMmTable) {
+ if (!isMmTable && !isDirectInsert) {
if (!bDynParts && !isSkewedStoredAsSubDirectories) {
finalPaths[filesIdx] = new Path(parent, taskWithExt);
} else {
@@ -308,7 +347,12 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
taskIdPath += extension;
}
- Path finalPath = new Path(buildTmpPath(), taskIdPath);
+ Path finalPath;
+ if (isDirectInsert) {
+ finalPath = buildTmpPath();
+ } else {
+ finalPath = new Path(buildTmpPath(), taskIdPath);
+ }
// In the cases that have multi-stage insert, e.g. a "hive.skewjoin.key"-based skew join,
// it can happen that we want multiple commits into the same directory from different
@@ -319,7 +363,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
// affects some less obscure scenario.
try {
FileSystem fpfs = finalPath.getFileSystem(hconf);
- if (fpfs.exists(finalPath)) {
+ if ((!isDirectInsert) && fpfs.exists(finalPath)) {
throw new RuntimeException(finalPath + " already exists");
}
} catch (IOException e) {
@@ -330,7 +374,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
}
if (LOG.isInfoEnabled()) {
LOG.info("Final Path: FS " + finalPaths[filesIdx]);
- if (LOG.isInfoEnabled() && !isMmTable) {
+ if (LOG.isInfoEnabled() && (!isMmTable && !isDirectInsert)) {
LOG.info("Writing to temp file: FS " + outPaths[filesIdx]);
}
}
@@ -468,7 +512,10 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
unionPath = null;
} else {
isUnionDp = (dpCtx != null);
- if (conf.isMmTable() || isUnionDp) {
+ if (conf.isDirectInsert()) {
+ specPath = conf.getParentDir();
+ unionPath = null;
+ } else if (conf.isMmTable() || isUnionDp) {
// MM tables need custom handling for union suffix; DP tables use parent too.
specPath = conf.getParentDir();
unionPath = conf.getDirName().getName();
@@ -526,7 +573,11 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
throw ex;
}
isCompressed = conf.getCompressed();
- parent = Utilities.toTempPath(conf.getDirName());
+ if (conf.isLinkedFileSink() && conf.isDirectInsert()) {
+ parent = Utilities.toTempPath(conf.getFinalDirName());
+ } else {
+ parent = Utilities.toTempPath(conf.getDirName());
+ }
statsFromRecordWriter = new boolean[numFiles];
serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance();
serializer.initialize(unsetNestedColumnPaths(hconf), conf.getTableInfo().getProperties());
@@ -565,7 +616,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
}
if (!bDynParts) {
- fsp = new FSPaths(specPath, conf.isMmTable());
+ fsp = new FSPaths(specPath, conf.isMmTable(), conf.isDirectInsert(), conf.getInsertOverwrite());
fsp.subdirAfterTxn = combinePathFragments(generateListBucketingDirName(null), unionPath);
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("creating new paths " + System.identityHashCode(fsp)
@@ -740,7 +791,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
assert filesIdx == numFiles;
// in recent hadoop versions, use deleteOnExit to clean tmp files.
- if (isNativeTable() && fs != null && fsp != null && !conf.isMmTable()) {
+ if (isNativeTable() && fs != null && fsp != null && !conf.isMmTable() && !conf.isDirectInsert()) {
autoDelete = fs.deleteOnExit(fsp.outPaths[0]);
}
} catch (Exception e) {
@@ -764,7 +815,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]);
}
- if (isNativeTable() && !conf.isMmTable()) {
+ if (isNativeTable() && !conf.isMmTable() && !conf.isDirectInsert()) {
// in recent hadoop versions, use deleteOnExit to clean tmp files.
autoDelete = fs.deleteOnExit(fsp.outPaths[filesIdx]);
}
@@ -790,12 +841,21 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
StatsProvidingRecordWriter;
// increment the CREATED_FILES counter
} else if (conf.getWriteType() == AcidUtils.Operation.INSERT) {
+ Path outPath = fsp.outPaths[filesIdx];
+ if (conf.isDirectInsert()
+ && !FileUtils.mkdir(fs, outPath.getParent(), hconf)) {
+ LOG.warn("Unable to create directory: " + outPath);
+ }
// Only set up the updater for insert. For update and delete we don't know until we see
// the row.
ObjectInspector inspector = bDynParts ? subSetOI : outputObjInspector;
int acidBucketNum = Integer.parseInt(Utilities.getTaskIdFromFilename(taskId));
+ String attemptId = null;
+ if (conf.isDirectInsert()) {
+ attemptId = taskId.split("_")[1];
+ }
fsp.updaters[filesIdx] = HiveFileFormatUtils.getAcidRecordUpdater(jc, conf.getTableInfo(),
- acidBucketNum, conf, fsp.outPaths[filesIdx], inspector, reporter, -1);
+ acidBucketNum, conf, fsp.outPaths[filesIdx], inspector, reporter, -1, attemptId); // outPath.getParent()
}
if (reporter != null) {
@@ -824,9 +884,9 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
// <table-dir>/<staging-dir>/<partition-dir>/<union-dir>
// for non-MM tables, the final destination partition directory is created during move task via rename
- // for MM tables, the final destination partition directory is created by the tasks themselves
+ // for MM tables and ACID insert, the final destination partition directory is created by the tasks themselves
try {
- if (conf.isMmTable()) {
+ if (conf.isMmTable() || conf.isDirectInsert()) {
createDpDir(destPartPath);
} else {
// outPath will be
@@ -1086,7 +1146,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
* @throws HiveException
*/
private FSPaths createNewPaths(String dpDir, String lbDir) throws HiveException {
- FSPaths fsp2 = new FSPaths(specPath, conf.isMmTable());
+ FSPaths fsp2 = new FSPaths(specPath, conf.isMmTable(), conf.isDirectInsert(), conf.getInsertOverwrite());
fsp2.subdirAfterTxn = combinePathFragments(lbDir, unionPath);
fsp2.subdirBeforeTxn = dpDir;
String pathKey = combinePathFragments(dpDir, lbDir);
@@ -1337,9 +1397,9 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
SparkMetricUtils.updateSparkBytesWrittenMetrics(LOG, fs, fsp.finalPaths);
}
}
- if (conf.isMmTable()) {
- Utilities.writeMmCommitManifest(commitPaths, specPath, fs, originalTaskId,
- conf.getTableWriteId(), conf.getStatementId(), unionPath, conf.getInsertOverwrite());
+ if (conf.isMmTable() || conf.isDirectInsert()) {
+ Utilities.writeCommitManifest(commitPaths, specPath, fs, originalTaskId, conf.getTableWriteId(), conf
+ .getStatementId(), unionPath, conf.getInsertOverwrite());
}
// Only publish stats if this operator's flag was set to gather stats
if (conf.isGatherStats()) {
@@ -1350,7 +1410,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
// Hadoop always call close() even if an Exception was thrown in map() or
// reduce().
for (FSPaths fsp : valToPaths.values()) {
- fsp.abortWriters(fs, abort, !autoDelete && isNativeTable() && !conf.isMmTable());
+ fsp.abortWritersAndUpdaters(fs, abort, !autoDelete && isNativeTable() && !conf.isMmTable());
}
}
fsp = prevFsp = null;
@@ -1383,10 +1443,14 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
specPath = conf.getParentDir();
unionSuffix = conf.getDirName().getName();
}
+ if (conf.isLinkedFileSink() && conf.isDirectInsert()) {
+ specPath = conf.getParentDir();
+ unionSuffix = null;
+ }
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("jobCloseOp using specPath " + specPath);
}
- if (!conf.isMmTable()) {
+ if (!conf.isMmTable() && !conf.isDirectInsert()) {
Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf, reporter);
} else {
int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(),
@@ -1396,9 +1460,9 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
: (dpCtx != null ? dpCtx.getNumBuckets() : 0);
MissingBucketsContext mbc = new MissingBucketsContext(
conf.getTableInfo(), numBuckets, conf.getCompressed());
- Utilities.handleMmTableFinalPath(specPath, unionSuffix, hconf, success,
- dpLevels, lbLevels, mbc, conf.getTableWriteId(), conf.getStatementId(), reporter,
- conf.isMmTable(), conf.isMmCtas(), conf.getInsertOverwrite());
+ Utilities.handleDirectInsertTableFinalPath(specPath, unionSuffix, hconf, success, dpLevels, lbLevels, mbc,
+ conf.getTableWriteId(), conf.getStatementId(), reporter, conf.isMmTable(), conf.isMmCtas(), conf
+ .getInsertOverwrite(), conf.isDirectInsert());
}
}
} catch (IOException e) {
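For reference, a small sketch of how the direct-insert bucket file names come together, assuming the usual <taskId>_<attemptId> task id form (as in the FileSinkOperator hunk above); the delta path and the ids are illustrative:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class DirectInsertFileNameSketch {
      public static void main(String[] args) {
        String taskId = "000000_3";                 // illustrative <task>_<attempt> id
        String attemptId = taskId.split("_")[1];    // "3"

        Path delta = new Path("/warehouse/t/delta_0000001_0000001_0000");
        // The new AcidUtils overload (see the AcidUtils.java hunk below) appends "_<attemptId>".
        Path bucketFile = AcidUtils.createBucketFile(delta, 0, attemptId);
        System.out.println(bucketFile);             // .../delta_0000001_0000001_0000/bucket_00000_3
      }
    }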
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 06e4ebe..51de87f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -419,7 +419,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getLoadFileType(),
work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isFullAcidOp,
resetStatisticsProps(table), tbd.getWriteId(), tbd.getStmtId(),
- tbd.isInsertOverwrite());
+ tbd.isInsertOverwrite(), tbd.isDirectInsert());
if (work.getOutputs() != null) {
DDLUtils.addIfAbsentByName(new WriteEntity(table,
getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs());
@@ -521,7 +521,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID &&
!tbd.isMmTable(),
resetStatisticsProps(table), tbd.getWriteId(), tbd.getStmtId(),
- tbd.isInsertOverwrite());
+ tbd.isInsertOverwrite(), tbd.isDirectInsert());
Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
// See the comment inside updatePartitionBucketSortColumns.
@@ -568,7 +568,9 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
tbd.getStmtId(),
resetStatisticsProps(table),
work.getLoadTableWork().getWriteType(),
- tbd.isInsertOverwrite());
+ tbd.isInsertOverwrite(),
+ tbd.isDirectInsert()
+ );
// publish DP columns to its subscribers
if (dps != null && dps.size() > 0) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 6c67bc7..e9966e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1963,9 +1963,9 @@ public final class Utilities {
// This can happen in ACID cases when we have splits on delta files, where the filenames
// are of the form delta_x_y/bucket_a.
if (bucketName.startsWith(AcidUtils.BUCKET_PREFIX)) {
- m = AcidUtils.BUCKET_DIGIT_PATTERN.matcher(bucketName);
+ m = AcidUtils.BUCKET_PATTERN.matcher(bucketName);
if (m.find()) {
- return Integer.parseInt(m.group());
+ return Integer.parseInt(m.group(1));
}
// Note that legacy bucket digit patterns are being ignored here.
}
@@ -3694,8 +3694,9 @@ public final class Utilities {
if (op instanceof FileSinkOperator) {
FileSinkDesc fdesc = ((FileSinkOperator) op).getConf();
- if (fdesc.isMmTable()) {
- continue; // No need to create for MM tables
+ if (fdesc.isMmTable() || fdesc.isDirectInsert()) {
+ // No need to create for MM tables, or ACID insert
+ continue;
}
Path tempDir = fdesc.getDirName();
if (tempDir != null) {
@@ -4108,7 +4109,7 @@ public final class Utilities {
}
}
- public static Path[] getMmDirectoryCandidates(FileSystem fs, Path path, int dpLevels,
+ public static Path[] getDirectInsertDirectoryCandidates(FileSystem fs, Path path, int dpLevels,
PathFilter filter, long writeId, int stmtId, Configuration conf,
Boolean isBaseDir) throws IOException {
int skipLevels = dpLevels;
@@ -4124,9 +4125,9 @@ public final class Utilities {
// /want/ to know isBaseDir because that is error prone; so, it ends up never being used.
if (stmtId < 0 || isBaseDir == null
|| (HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_AVOID_GLOBSTATUS_ON_S3) && isS3(fs))) {
- return getMmDirectoryCandidatesRecursive(fs, path, skipLevels, filter);
+ return getDirectInsertDirectoryCandidatesRecursive(fs, path, skipLevels, filter);
}
- return getMmDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, writeId, stmtId, isBaseDir);
+ return getDirectInsertDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, writeId, stmtId, isBaseDir);
}
private static boolean isS3(FileSystem fs) {
@@ -4149,7 +4150,7 @@ public final class Utilities {
return paths;
}
- private static Path[] getMmDirectoryCandidatesRecursive(FileSystem fs,
+ private static Path[] getDirectInsertDirectoryCandidatesRecursive(FileSystem fs,
Path path, int skipLevels, PathFilter filter) throws IOException {
String lastRelDir = null;
HashSet<Path> results = new HashSet<Path>();
@@ -4202,7 +4203,7 @@ public final class Utilities {
return results.toArray(new Path[results.size()]);
}
- private static Path[] getMmDirectoryCandidatesGlobStatus(FileSystem fs, Path path, int skipLevels,
+ private static Path[] getDirectInsertDirectoryCandidatesGlobStatus(FileSystem fs, Path path, int skipLevels,
PathFilter filter, long writeId, int stmtId, boolean isBaseDir) throws IOException {
StringBuilder sb = new StringBuilder(path.toUri().getPath());
for (int i = 0; i < skipLevels; i++) {
@@ -4219,10 +4220,10 @@ public final class Utilities {
return statusToPath(fs.globStatus(pathPattern, filter));
}
- private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manifestDir,
+ private static void tryDeleteAllDirectInsertFiles(FileSystem fs, Path specPath, Path manifestDir,
int dpLevels, int lbLevels, AcidUtils.IdPathFilter filter, long writeId, int stmtId,
Configuration conf) throws IOException {
- Path[] files = getMmDirectoryCandidates(
+ Path[] files = getDirectInsertDirectoryCandidates(
fs, specPath, dpLevels, filter, writeId, stmtId, conf, null);
if (files != null) {
for (Path path : files) {
@@ -4235,7 +4236,7 @@ public final class Utilities {
}
- public static void writeMmCommitManifest(List<Path> commitPaths, Path specPath, FileSystem fs,
+ public static void writeCommitManifest(List<Path> commitPaths, Path specPath, FileSystem fs,
String taskId, Long writeId, int stmtId, String unionSuffix, boolean isInsertOverwrite) throws HiveException {
if (commitPaths.isEmpty()) {
return;
@@ -4280,15 +4281,15 @@ public final class Utilities {
}
}
- public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Configuration hconf,
+ public static void handleDirectInsertTableFinalPath(Path specPath, String unionSuffix, Configuration hconf,
boolean success, int dpLevels, int lbLevels, MissingBucketsContext mbc, long writeId, int stmtId,
- Reporter reporter, boolean isMmTable, boolean isMmCtas, boolean isInsertOverwrite)
- throws IOException, HiveException {
+ Reporter reporter, boolean isMmTable, boolean isMmCtas, boolean isInsertOverwrite, boolean isDirectInsert)
+ throws IOException, HiveException {
FileSystem fs = specPath.getFileSystem(hconf);
Path manifestDir = getManifestDir(specPath, writeId, stmtId, unionSuffix, isInsertOverwrite);
if (!success) {
AcidUtils.IdPathFilter filter = new AcidUtils.IdPathFilter(writeId, stmtId);
- tryDeleteAllMmFiles(fs, specPath, manifestDir, dpLevels, lbLevels,
+ tryDeleteAllDirectInsertFiles(fs, specPath, manifestDir, dpLevels, lbLevels,
filter, writeId, stmtId, hconf);
return;
}
@@ -4317,13 +4318,13 @@ public final class Utilities {
Utilities.FILE_OP_LOGGER.info("Creating directory with no output at {}", specPath);
FileUtils.mkdir(fs, specPath, hconf);
}
- Path[] files = getMmDirectoryCandidates(
+ Path[] files = getDirectInsertDirectoryCandidates(
fs, specPath, dpLevels, filter, writeId, stmtId, hconf, isInsertOverwrite);
- ArrayList<Path> mmDirectories = new ArrayList<>();
+ ArrayList<Path> directInsertDirectories = new ArrayList<>();
if (files != null) {
for (Path path : files) {
Utilities.FILE_OP_LOGGER.trace("Looking at path: {}", path);
- mmDirectories.add(path);
+ directInsertDirectories.add(path);
}
}
@@ -4356,15 +4357,15 @@ public final class Utilities {
}
}
- for (Path path : mmDirectories) {
- cleanMmDirectory(path, fs, unionSuffix, lbLevels, committed);
+ for (Path path : directInsertDirectories) {
+ cleanDirectInsertDirectory(path, fs, unionSuffix, lbLevels, committed);
}
if (!committed.isEmpty()) {
throw new HiveException("The following files were committed but not found: " + committed);
}
- if (mmDirectories.isEmpty()) {
+ if (directInsertDirectories.isEmpty()) {
return;
}
@@ -4373,19 +4374,22 @@ public final class Utilities {
if (lbLevels != 0) {
return;
}
- // Create fake file statuses to avoid querying the file system. removeTempOrDuplicateFiles
- // doesn't need tocheck anything except path and directory status for MM directories.
- FileStatus[] finalResults = new FileStatus[mmDirectories.size()];
- for (int i = 0; i < mmDirectories.size(); ++i) {
- finalResults[i] = new PathOnlyFileStatus(mmDirectories.get(i));
- }
- List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(fs, finalResults,
- unionSuffix, dpLevels, mbc == null ? 0 : mbc.numBuckets, hconf, writeId, stmtId,
+
+ if (!isDirectInsert) {
+ // Create fake file statuses to avoid querying the file system. removeTempOrDuplicateFiles
+ // doesn't need to check anything except path and directory status for MM directories.
+ FileStatus[] finalResults = new FileStatus[directInsertDirectories.size()];
+ for (int i = 0; i < directInsertDirectories.size(); ++i) {
+ finalResults[i] = new PathOnlyFileStatus(directInsertDirectories.get(i));
+ }
+ List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(fs, finalResults,
+ unionSuffix, dpLevels, mbc == null ? 0 : mbc.numBuckets, hconf, writeId, stmtId,
isMmTable, null, isInsertOverwrite);
- // create empty buckets if necessary
- if (!emptyBuckets.isEmpty()) {
- assert mbc != null;
- Utilities.createEmptyBuckets(hconf, emptyBuckets, mbc.isCompressed, mbc.tableInfo, reporter);
+ // create empty buckets if necessary
+ if (!emptyBuckets.isEmpty()) {
+ assert mbc != null;
+ Utilities.createEmptyBuckets(hconf, emptyBuckets, mbc.isCompressed, mbc.tableInfo, reporter);
+ }
}
}
@@ -4395,7 +4399,7 @@ public final class Utilities {
}
}
- private static void cleanMmDirectory(Path dir, FileSystem fs, String unionSuffix,
+ private static void cleanDirectInsertDirectory(Path dir, FileSystem fs, String unionSuffix,
int lbLevels, HashSet<Path> committed) throws IOException, HiveException {
for (FileStatus child : fs.listStatus(dir)) {
Path childPath = child.getPath();
@@ -4406,7 +4410,7 @@ public final class Utilities {
if (child.isDirectory()) {
Utilities.FILE_OP_LOGGER.trace(
"Recursion into LB directory {}; levels remaining ", childPath, lbLevels - 1);
- cleanMmDirectory(childPath, fs, unionSuffix, lbLevels - 1, committed);
+ cleanDirectInsertDirectory(childPath, fs, unionSuffix, lbLevels - 1, committed);
} else {
if (committed.contains(childPath)) {
throw new HiveException("LB FSOP has commited "
@@ -4421,7 +4425,9 @@ public final class Utilities {
if (committed.remove(childPath)) {
continue; // A good file.
}
- deleteUncommitedFile(childPath, fs);
+ if (!childPath.getName().equals(AcidUtils.OrcAcidVersion.ACID_FORMAT)) {
+ deleteUncommitedFile(childPath, fs);
+ }
} else if (!child.isDirectory()) {
if (committed.contains(childPath)) {
throw new HiveException("Union FSOP has commited "
@@ -4429,8 +4435,8 @@ public final class Utilities {
}
deleteUncommitedFile(childPath, fs);
} else if (childPath.getName().equals(unionSuffix)) {
- // Found the right union directory; treat it as "our" MM directory.
- cleanMmDirectory(childPath, fs, null, 0, committed);
+ // Found the right union directory; treat it as "our" directory.
+ cleanDirectInsertDirectory(childPath, fs, null, 0, committed);
} else {
String childName = childPath.getName();
if (!childName.startsWith(AbstractFileMergeOperator.UNION_SUDBIR_PREFIX)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
index bba3960..034d2d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
@@ -34,6 +34,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
/**
* The interface required for input formats that want to support ACID
@@ -258,7 +259,8 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE>
int bucket,
ValidWriteIdList validWriteIdList,
Path baseDirectory,
- Path[] deltaDirectory
+ Path[] deltaDirectory,
+ Map<String, String> deltasToAttemptId
) throws IOException;
/**
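A minimal sketch of the new deltasToAttemptId argument, assuming keys are the delta directory paths as strings (which is how OrcRawRecordMerger looks them up further below) and values are the attempt id suffix of the bucket files inside; the path and ids are illustrative:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.fs.Path;

    public class DeltasToAttemptIdSketch {
      public static void main(String[] args) {
        Map<String, String> deltasToAttemptId = new HashMap<>();
        Path delta = new Path("/warehouse/t/delta_0000001_0000001_0000");
        // This delta was written directly by attempt 0, so its files are bucket_<n>_0.
        deltasToAttemptId.put(delta.toString(), "0");
        // An empty map (as passed by CompactorTestUtil above) keeps the legacy
        // bucket_<n> lookup, because the lookup below would return null.
        System.out.println(deltasToAttemptId.get(delta.toString()));   // 0
      }
    }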
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
index 1e8bb22..a36630a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
@@ -51,6 +51,7 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
private Reporter reporter;
private long minimumWriteId;
private long maximumWriteId;
+ private String attemptId;
/**
* actual bucketId (as opposed to bucket property via BucketCodec)
*/
@@ -240,6 +241,11 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
return this;
}
+ public Options attemptId(String attemptId) {
+ this.attemptId = attemptId;
+ return this;
+ }
+
/**
* @since 1.3.0
* This can be set to -1 to make the system generate old style (delta_xxxx_yyyy) file names.
@@ -313,6 +319,10 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
return bucketId;
}
+ public String getAttemptId() {
+ return attemptId;
+ }
+
public int getRecordIdColumn() {
return recIdCol;
}
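A minimal sketch of how the new builder option composes with the existing ones, assuming the usual Options builder methods (minimumWriteId, maximumWriteId, bucket, statementId) alongside the new attemptId; the concrete values are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.io.AcidOutputFormat;

    public class AttemptIdOptionSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
            .minimumWriteId(1)
            .maximumWriteId(1)
            .statementId(0)
            .bucket(0)
            .attemptId("0");   // new in this patch; null keeps the legacy bucket_00000 name
        System.out.println("attempt id: " + options.getAttemptId());
      }
    }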
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 2f5ec52..5d57509 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -35,6 +35,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.google.common.base.Strings;
@@ -160,8 +161,8 @@ public class AcidUtils {
* This must be in sync with {@link #STATEMENT_DIGITS}
*/
public static final int MAX_STATEMENTS_PER_TXN = 10000;
- public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
- public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{6}");
+ public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{6}");
+ public static final Pattern BUCKET_PATTERN = Pattern.compile("bucket_([0-9]+)(_[0-9]+)?$");
/**
* A write into a non-acid table produces files like 0000_0 or 0000_0_copy_1
@@ -180,7 +181,6 @@ public class AcidUtils {
}
private static final Logger LOG = LoggerFactory.getLogger(AcidUtils.class);
- public static final Pattern BUCKET_PATTERN = Pattern.compile(BUCKET_PREFIX + "_[0-9]{5}$");
public static final Pattern ORIGINAL_PATTERN =
Pattern.compile("[0-9]+_[0-9]+");
/**
@@ -241,7 +241,11 @@ public class AcidUtils {
* @return the filename
*/
public static Path createBucketFile(Path subdir, int bucket) {
- return createBucketFile(subdir, bucket, true);
+ return createBucketFile(subdir, bucket, null, true);
+ }
+
+ public static Path createBucketFile(Path subdir, int bucket, String attemptId) {
+ return createBucketFile(subdir, bucket, attemptId, true);
}
/**
@@ -250,10 +254,13 @@ public class AcidUtils {
* @param bucket the bucket number
* @return the filename
*/
- private static Path createBucketFile(Path subdir, int bucket, boolean isAcidSchema) {
+ private static Path createBucketFile(Path subdir, int bucket, String attemptId, boolean isAcidSchema) {
if(isAcidSchema) {
- return new Path(subdir,
- BUCKET_PREFIX + String.format(BUCKET_DIGITS, bucket));
+ String fileName = BUCKET_PREFIX + String.format(BUCKET_DIGITS, bucket);
+ if (attemptId != null) {
+ fileName = fileName + "_" + attemptId;
+ }
+ return new Path(subdir, fileName);
}
else {
return new Path(subdir,
@@ -353,7 +360,7 @@ public class AcidUtils {
return new Path(directory, String.format(LEGACY_FILE_BUCKET_DIGITS,
options.getBucketId()) + "_0");
} else {
- return createBucketFile(baseOrDeltaSubdirPath(directory, options), options.getBucketId());
+ return createBucketFile(baseOrDeltaSubdirPath(directory, options), options.getBucketId(), options.getAttemptId());
}
}
@@ -421,11 +428,32 @@ public class AcidUtils {
if (ORIGINAL_PATTERN.matcher(filename).matches() || ORIGINAL_PATTERN_COPY.matcher(filename).matches()) {
return Integer.parseInt(filename.substring(0, filename.indexOf('_')));
} else if (filename.startsWith(BUCKET_PREFIX)) {
- return Integer.parseInt(filename.substring(filename.indexOf('_') + 1));
+ Matcher matcher = BUCKET_PATTERN.matcher(filename);
+ if (matcher.matches()) {
+ String bucketId = matcher.group(1);
+ filename = filename.substring(0,matcher.end(1));
+ if (Utilities.FILE_OP_LOGGER.isDebugEnabled()) {
+ Utilities.FILE_OP_LOGGER.debug("Parsing bucket ID = " + bucketId + " from file name '" + filename + "'");
+ }
+ return Integer.parseInt(bucketId);
+ }
}
return -1;
}
+ public static String parseAttemptId(Path bucketFile) {
+ String filename = bucketFile.getName();
+ Matcher matcher = BUCKET_PATTERN.matcher(filename);
+ String attemptId = null;
+ if (matcher.matches()) {
+ attemptId = matcher.group(2) != null ? matcher.group(2).substring(1) : null;
+ }
+ if (Utilities.FILE_OP_LOGGER.isDebugEnabled()) {
+ Utilities.FILE_OP_LOGGER.debug("Parsing attempt ID = " + attemptId + " from file name '" + bucketFile + "'");
+ }
+ return attemptId;
+ }
+
/**
* Read the first row of an ORC file and determine the bucket ID based on the bucket column. This only works with
* files with ACID schema.
@@ -460,6 +488,7 @@ public class AcidUtils {
AcidOutputFormat.Options result = new AcidOutputFormat.Options(conf);
String filename = bucketFile.getName();
int bucket = parseBucketId(bucketFile);
+ String attemptId = parseAttemptId(bucketFile);
if (ORIGINAL_PATTERN.matcher(filename).matches()) {
result
.setOldStyle(true)
@@ -494,7 +523,8 @@ public class AcidUtils {
.setOldStyle(false)
.minimumWriteId(parsedDelta.minWriteId)
.maximumWriteId(parsedDelta.maxWriteId)
- .bucket(bucket);
+ .bucket(bucket)
+ .attemptId(attemptId);
} else if (bucketFile.getParent().getName().startsWith(DELETE_DELTA_PREFIX)) {
ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELETE_DELTA_PREFIX,
bucketFile.getFileSystem(conf), null);
@@ -2559,7 +2589,7 @@ public class AcidUtils {
*/
public static final class OrcAcidVersion {
private static final String ACID_VERSION_KEY = "hive.acid.version";
- private static final String ACID_FORMAT = "_orc_acid_version";
+ public static final String ACID_FORMAT = "_orc_acid_version";
private static final Charset UTF8 = Charset.forName("UTF-8");
public static final int ORC_ACID_VERSION_DEFAULT = 0;
/**
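For reference, a short sketch of how the widened BUCKET_PATTERN splits a direct-insert file name into bucket id and attempt id, mirroring parseBucketId/parseAttemptId above; the file names are illustrative:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class BucketPatternSketch {
      public static void main(String[] args) {
        Pattern bucketPattern = Pattern.compile("bucket_([0-9]+)(_[0-9]+)?$");

        // Pre-existing name without an attempt id suffix.
        Matcher m = bucketPattern.matcher("bucket_00000");
        if (m.matches()) {
          System.out.println(m.group(1) + " / " + m.group(2));                   // 00000 / null
        }

        // Direct-insert name: bucket 0 written by task attempt 0.
        m = bucketPattern.matcher("bucket_00000_0");
        if (m.matches()) {
          String attemptId = m.group(2) != null ? m.group(2).substring(1) : null;
          System.out.println(Integer.parseInt(m.group(1)) + " / " + attemptId);  // 0 / 0
        }
      }
    }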
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
index 8980a62..22099e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
@@ -306,10 +306,15 @@ public final class HiveFileFormatUtils {
return (HiveOutputFormat<?, ?>) outputFormat;
}
+ public static RecordUpdater getAcidRecordUpdater(JobConf jc, TableDesc tableInfo, int bucket, FileSinkDesc conf,
+ Path outPath, ObjectInspector inspector, Reporter reporter, int rowIdColNum) throws HiveException, IOException {
+ return getAcidRecordUpdater(jc, tableInfo, bucket, conf, outPath, inspector, reporter, rowIdColNum, null);
+ }
+
public static RecordUpdater getAcidRecordUpdater(JobConf jc, TableDesc tableInfo, int bucket,
FileSinkDesc conf, Path outPath,
ObjectInspector inspector,
- Reporter reporter, int rowIdColNum)
+ Reporter reporter, int rowIdColNum, String attemptId)
throws HiveException, IOException {
HiveOutputFormat<?, ?> hiveOutputFormat = getHiveOutputFormat(jc, tableInfo);
AcidOutputFormat<?, ?> acidOutputFormat = null;
@@ -323,10 +328,9 @@ public final class HiveFileFormatUtils {
// file the way getHiveRecordWriter does, as ORC appears to read the value for itself. Not
// sure if this is correct or not.
return getRecordUpdater(jc, acidOutputFormat,
- bucket, inspector, tableInfo.getProperties(), outPath, reporter, rowIdColNum, conf);
+ bucket, inspector, tableInfo.getProperties(), outPath, reporter, rowIdColNum, conf, attemptId);
}
-
private static RecordUpdater getRecordUpdater(JobConf jc,
AcidOutputFormat<?, ?> acidOutputFormat,
int bucket,
@@ -335,7 +339,8 @@ public final class HiveFileFormatUtils {
Path outPath,
Reporter reporter,
int rowIdColNum,
- FileSinkDesc conf) throws IOException {
+ FileSinkDesc conf,
+ String attemptId) throws IOException {
return acidOutputFormat.getRecordUpdater(outPath, new AcidOutputFormat.Options(jc)
.isCompressed(conf.getCompressed())
.tableProperties(tableProp)
@@ -348,6 +353,7 @@ public final class HiveFileFormatUtils {
.recordIdColumn(rowIdColNum)
.statementId(conf.getStatementId())
.finalDestination(conf.getDestPath())
+ .attemptId(attemptId)
.temporary(conf.isTemporary()));
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java
index 737e677..bb257d1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io;
import java.io.IOException;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.SerDeStats;
/**
@@ -79,4 +80,9 @@ public interface RecordUpdater {
* @return - buffered row count
*/
long getBufferedRowCount();
+
+ /**
+ * Returns the path of the file this updater wrote to
+ */
+ public Path getUpdatedFilePath();
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index a069032..03f7086 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -2459,7 +2459,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
int bucket,
ValidWriteIdList validWriteIdList,
Path baseDirectory,
- Path[] deltaDirectory
+ Path[] deltaDirectory,
+ Map<String, String> deltasToAttemptId
) throws IOException {
boolean isOriginal = false;
OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(true)
@@ -2481,7 +2482,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
mergerOptions.rootPath(deltaDirectory[0].getParent());
}
return new OrcRawRecordMerger(conf, collapseEvents, null, isOriginal,
- bucket, validWriteIdList, new Reader.Options(conf), deltaDirectory, mergerOptions);
+ bucket, validWriteIdList, new Reader.Options(conf), deltaDirectory, mergerOptions, deltasToAttemptId);
}
/**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
index c4c56f8..202f78b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
@@ -268,6 +268,11 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
stringifyObject(buffer, obj, inspector);
return buffer.toString();
}
+
+ @Override
+ public Path getUpdatedFilePath() {
+ return null;
+ }
}
@Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index b8a0f04..f543418 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -23,7 +23,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -928,6 +927,20 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
}
}
}
+
+ OrcRawRecordMerger(Configuration conf,
+ boolean collapseEvents,
+ Reader reader,
+ boolean isOriginal,
+ int bucket,
+ ValidWriteIdList validWriteIdList,
+ Reader.Options options,
+ Path[] deltaDirectory,
+ Options mergerOptions) throws IOException {
+ this(conf, collapseEvents, reader, isOriginal, bucket, validWriteIdList, options, deltaDirectory, mergerOptions,
+ null);
+ }
+
/**
* Create a reader that merge sorts the ACID events together. This handles
* 1. 'normal' reads on behalf of a query (non vectorized)
@@ -952,7 +965,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
int bucket,
ValidWriteIdList validWriteIdList,
Reader.Options options,
- Path[] deltaDirectory, Options mergerOptions) throws IOException {
+ Path[] deltaDirectory,
+ Options mergerOptions,
+ Map<String, String> deltasToAttemptId) throws IOException {
this.collapse = collapseEvents;
this.offset = options.getOffset();
this.length = options.getLength();
@@ -1126,7 +1141,13 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
}
continue;
}
- for (Path deltaFile : getDeltaFiles(delta, bucket, mergerOptions)) {
+
+ String attemptId = null;
+ if (deltasToAttemptId != null) {
+ attemptId = deltasToAttemptId.get(delta.toString());
+ }
+
+ for (Path deltaFile : getDeltaFiles(delta, bucket, mergerOptions, attemptId)) {
FileSystem fs = deltaFile.getFileSystem(conf);
if(!fs.exists(deltaFile)) {
/**
@@ -1264,12 +1285,12 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* This determines the set of {@link ReaderPairAcid} to create for a given delta/.
* For unbucketed tables {@code bucket} can be thought of as a write tranche.
*/
- static Path[] getDeltaFiles(Path deltaDirectory, int bucket, Options mergerOptions) {
+ static Path[] getDeltaFiles(Path deltaDirectory, int bucket, Options mergerOptions, String attemptId) {
assert (!mergerOptions.isCompacting &&
deltaDirectory.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)
) || mergerOptions.isCompacting : "Unexpected delta: " + deltaDirectory +
"(isCompacting=" + mergerOptions.isCompacting() + ")";
- return new Path[] {AcidUtils.createBucketFile(deltaDirectory, bucket)};
+ return new Path[] {AcidUtils.createBucketFile(deltaDirectory, bucket, attemptId)};
}
@VisibleForTesting
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 398698e..2d6a771 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -621,7 +621,14 @@ public class OrcRecordUpdater implements RecordUpdater {
if (writer == null) {
writer = OrcFile.createWriter(path, writerOptions);
AcidUtils.OrcAcidVersion.setAcidVersionInDataFile(writer);
- AcidUtils.OrcAcidVersion.writeVersionFile(path.getParent(), fs);
+ try {
+ AcidUtils.OrcAcidVersion.writeVersionFile(path.getParent(), fs);
+ } catch (Exception e) {
+ e.printStackTrace();
+ // Ignore; might have been created by another concurrent writer, writing to a different bucket
+ // within this delta/base directory
+ LOG.trace(e.fillInStackTrace().toString());
+ }
}
}
@@ -806,4 +813,9 @@ public class OrcRecordUpdater implements RecordUpdater {
bucket.set(bucketProperty);
return currentBucketProperty;
}
+
+ @Override
+ public Path getUpdatedFilePath() {
+ return path;
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 0361872..598220b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -1134,7 +1134,7 @@ public class VectorizedOrcAcidRowBatchReader
assert !orcSplit.isOriginal() : "If this now supports Original splits, set up mergeOptions properly";
this.deleteRecords = new OrcRawRecordMerger(conf, true, null, false, bucket,
validWriteIdList, readerOptions, deleteDeltas,
- mergerOptions);
+ mergerOptions, null);
this.deleteRecordKey = new OrcRawRecordMerger.ReaderKey();
this.deleteRecordValue = this.deleteRecords.createValue();
// Initialize the first value in the delete reader.
@@ -1565,7 +1565,7 @@ public class VectorizedOrcAcidRowBatchReader
for (Path deleteDeltaDir : deleteDeltaDirs) {
FileSystem fs = deleteDeltaDir.getFileSystem(conf);
Path[] deleteDeltaFiles = OrcRawRecordMerger.getDeltaFiles(deleteDeltaDir, bucket,
- new OrcRawRecordMerger.Options().isCompacting(false));
+ new OrcRawRecordMerger.Options().isCompacting(false), null);
for (Path deleteDeltaFile : deleteDeltaFiles) {
// NOTE: Calling last flush length below is more for future-proofing when we have
// streaming deletes. But currently we don't support streaming deletes, and this can
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 1eb9c12..d40a67f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2211,7 +2211,7 @@ public class Hive {
boolean isSkewedStoreAsSubdir,
boolean isSrcLocal, boolean isAcidIUDoperation,
boolean resetStatistics, Long writeId,
- int stmtId, boolean isInsertOverwrite) throws HiveException {
+ int stmtId, boolean isInsertOverwrite, boolean isDirectInsert) throws HiveException {
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_PARTITION);
@@ -2228,7 +2228,7 @@ public class Hive {
Partition newTPart = loadPartitionInternal(loadPath, tbl, partSpec, oldPart,
loadFileType, inheritTableSpecs,
inheritLocation, isSkewedStoreAsSubdir, isSrcLocal, isAcidIUDoperation,
- resetStatistics, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles);
+ resetStatistics, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles, isDirectInsert);
AcidUtils.TableSnapshot tableSnapshot = isTxnTable ? getTableSnapshot(tbl, writeId) : null;
if (tableSnapshot != null) {
@@ -2299,7 +2299,7 @@ public class Hive {
boolean inheritLocation, boolean isSkewedStoreAsSubdir,
boolean isSrcLocal, boolean isAcidIUDoperation, boolean resetStatistics,
Long writeId, int stmtId, boolean isInsertOverwrite,
- boolean isTxnTable, List<Path> newFiles) throws HiveException {
+ boolean isTxnTable, List<Path> newFiles, boolean isDirectInsert) throws HiveException {
Path tblDataLocationPath = tbl.getDataLocation();
boolean isMmTableWrite = AcidUtils.isInsertOnlyTable(tbl.getParameters());
assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
@@ -2346,15 +2346,18 @@ public class Hive {
// to ACID updates. So they are not themselves ACID.
// Note: this assumes both paths are qualified; which they are, currently.
- if (((isMmTableWrite || isFullAcidTable) && loadPath.equals(newPartPath)) ||
+ if (((isMmTableWrite || isDirectInsert || isFullAcidTable) && loadPath.equals(newPartPath)) ||
(loadFileType == LoadFileType.IGNORE)) {
- // MM insert query, move itself is a no-op.
+ // MM insert query or direct insert; move itself is a no-op.
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
- Utilities.FILE_OP_LOGGER.trace("not moving " + loadPath + " to " + newPartPath + " (MM)");
+ Utilities.FILE_OP_LOGGER.trace("not moving " + loadPath + " to " + newPartPath + " (MM = " + isMmTableWrite
+ + ", Direct insert = " + isDirectInsert + ")");
}
- assert !isAcidIUDoperation;
if (newFiles != null) {
- listFilesCreatedByQuery(loadPath, writeId, stmtId, isMmTableWrite ? isInsertOverwrite : false, newFiles);
+ if (!isMmTableWrite && !isDirectInsert) {
+ isInsertOverwrite = false;
+ }
+ listFilesCreatedByQuery(loadPath, writeId, stmtId, isInsertOverwrite, newFiles);
}
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("maybe deleting stuff from " + oldPartPath
@@ -2364,7 +2367,6 @@ public class Hive {
// Either a non-MM query, or a load into MM table from an external source.
Path destPath = newPartPath;
if (isMmTableWrite) {
- assert !isAcidIUDoperation;
// We will load into MM directory, and hide previous directories if needed.
destPath = new Path(destPath, isInsertOverwrite
? AcidUtils.baseDir(writeId) : AcidUtils.deltaSubdir(writeId, writeId, stmtId));
@@ -2783,11 +2785,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
*/
private Set<Path> getValidPartitionsInPath(
int numDP, int numLB, Path loadPath, Long writeId, int stmtId,
- boolean isMmTable, boolean isInsertOverwrite) throws HiveException {
+ boolean isMmTable, boolean isInsertOverwrite, boolean isDirectInsert) throws HiveException {
Set<Path> validPartitions = new HashSet<Path>();
try {
FileSystem fs = loadPath.getFileSystem(conf);
- if (!isMmTable) {
+ if (!isMmTable || !isDirectInsert) {
List<FileStatus> leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP, fs);
// Check for empty partitions
for (FileStatus s : leafStatus) {
@@ -2805,7 +2807,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
// we have multiple statements anyway is union.
Utilities.FILE_OP_LOGGER.trace(
"Looking for dynamic partitions in {} ({} levels)", loadPath, numDP);
- Path[] leafStatus = Utilities.getMmDirectoryCandidates(
+ Path[] leafStatus = Utilities.getDirectInsertDirectoryCandidates(
fs, loadPath, numDP, null, writeId, -1, conf, isInsertOverwrite);
for (Path p : leafStatus) {
Path dpPath = p.getParent(); // Skip the MM directory that we have found.
@@ -2853,7 +2855,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
final String tableName, final Map<String, String> partSpec, final LoadFileType loadFileType,
final int numDP, final int numLB, final boolean isAcid, final long writeId, final int stmtId,
final boolean resetStatistics, final AcidUtils.Operation operation,
- boolean isInsertOverwrite) throws HiveException {
+ boolean isInsertOverwrite, boolean isDirectInsert) throws HiveException {
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_DYNAMIC_PARTITIONS);
@@ -2861,7 +2863,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
// Get all valid partition paths and existing partitions for them (if any)
final Table tbl = getTable(tableName);
final Set<Path> validPartitions = getValidPartitionsInPath(numDP, numLB, loadPath, writeId, stmtId,
- AcidUtils.isInsertOnlyTable(tbl.getParameters()), isInsertOverwrite);
+ AcidUtils.isInsertOnlyTable(tbl.getParameters()), isInsertOverwrite, isDirectInsert);
final int partsToLoad = validPartitions.size();
final AtomicInteger partitionsLoaded = new AtomicInteger(0);
@@ -2929,7 +2931,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
// load the partition
Partition partition = loadPartitionInternal(entry.getKey(), tbl,
fullPartSpec, oldPartition, loadFileType, true, false, numLB > 0, false, isAcid,
- resetStatistics, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles);
+ resetStatistics, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles, isDirectInsert);
// if the partition already existed before the loading, no need to add it again to the
// metastore
@@ -3081,7 +3083,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
*/
public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType, boolean isSrcLocal,
boolean isSkewedStoreAsSubdir, boolean isAcidIUDoperation, boolean resetStatistics,
- Long writeId, int stmtId, boolean isInsertOverwrite) throws HiveException {
+ Long writeId, int stmtId, boolean isInsertOverwrite, boolean isDirectInsert) throws HiveException {
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_TABLE);
@@ -3098,7 +3100,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
// Note: this assumes both paths are qualified; which they are, currently.
- if (((isMmTable || isFullAcidTable) && loadPath.equals(tbl.getPath())) || (loadFileType == LoadFileType.IGNORE)) {
+ if (((isMmTable || isDirectInsert || isFullAcidTable) && loadPath.equals(tbl.getPath())) || (loadFileType == LoadFileType.IGNORE)) {
/**
* some operations on Transactional tables (e.g. Import) write directly to the final location
* and avoid the 'move' operation. Since MoveTask does other things, setting 'loadPath' to be
@@ -3111,14 +3113,16 @@ private void constructOneLBLocationMap(FileStatus fSta,
//new files list is required only for event notification.
if (newFiles != null) {
- listFilesCreatedByQuery(loadPath, writeId, stmtId, isMmTable ? isInsertOverwrite : false, newFiles);
+ if (!isMmTable && !isDirectInsert) {
+ isInsertOverwrite = false;
+ }
+ listFilesCreatedByQuery(loadPath, writeId, stmtId, isInsertOverwrite, newFiles);
}
} else {
// Either a non-MM query, or a load into MM table from an external source.
Path tblPath = tbl.getPath();
Path destPath = tblPath;
if (isMmTable) {
- assert !isAcidIUDoperation;
// We will load into MM directory, and hide previous directories if needed.
destPath = new Path(destPath, isInsertOverwrite
? AcidUtils.baseDir(writeId) : AcidUtils.deltaSubdir(writeId, writeId, stmtId));
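
The heart of the Hive.loadTable/loadPartitionInternal change is that the move becomes a no-op whenever the query already wrote its files to the final ACID location. A condensed sketch of that decision, reduced to the boolean flags used in the hunks above (a simplification for illustration, not the real method):

    public class MoveDecisionSketch {
      enum LoadFileType { REPLACE_ALL, KEEP_EXISTING, IGNORE }

      // True when no physical move is needed: an MM, direct-insert or full ACID write
      // whose load path already equals the destination, or a load type of IGNORE.
      static boolean isMoveNoOp(boolean isMmTableWrite, boolean isDirectInsert,
          boolean isFullAcidTable, boolean loadPathEqualsDestPath, LoadFileType loadFileType) {
        return ((isMmTableWrite || isDirectInsert || isFullAcidTable) && loadPathEqualsDestPath)
            || loadFileType == LoadFileType.IGNORE;
      }

      public static void main(String[] args) {
        // Direct insert into a full ACID table: data is already under the final path.
        System.out.println(isMoveNoOp(false, true, true, true, LoadFileType.KEEP_EXISTING)); // true
        // Classic insert through a staging directory: a real move is still required.
        System.out.println(isMoveNoOp(false, false, false, false, LoadFileType.REPLACE_ALL)); // false
      }
    }
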
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 73ca658..1ea3bd3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -1382,7 +1382,7 @@ public final class GenMapRedUtils {
Path fsopPath = srcMmWriteId != null ? fsInputDesc.getFinalDirName() : finalName;
Task<MoveWork> mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(
- mvTasks, fsopPath, fsInputDesc.isMmTable());
+ mvTasks, fsopPath, fsInputDesc.isMmTable(), fsInputDesc.isDirectInsert());
ConditionalTask cndTsk = GenMapRedUtils.createCondTask(conf, currTask, dummyMv, work,
fsInputDesc.getMergeInputDirName(), finalName, mvTask, dependencyTask, lineageState);
@@ -1869,8 +1869,8 @@ public final class GenMapRedUtils {
.isSkewedStoredAsDir();
}
- public static Task<MoveWork> findMoveTaskForFsopOutput(
- List<Task<MoveWork>> mvTasks, Path fsopFinalDir, boolean isMmFsop) {
+ public static Task<MoveWork> findMoveTaskForFsopOutput(List<Task<MoveWork>> mvTasks, Path fsopFinalDir,
+ boolean isMmFsop, boolean isDirectInsert) {
// find the move task
for (Task<MoveWork> mvTsk : mvTasks) {
MoveWork mvWork = mvTsk.getWork();
@@ -1879,7 +1879,7 @@ public final class GenMapRedUtils {
if (mvWork.getLoadFileWork() != null) {
srcDir = mvWork.getLoadFileWork().getSourcePath();
isLfd = true;
- if (isMmFsop) {
+ if (isMmFsop || isDirectInsert) {
srcDir = srcDir.getParent();
}
} else if (mvWork.getLoadTableWork() != null) {
@@ -1910,8 +1910,8 @@ public final class GenMapRedUtils {
// no need of merging if the move is to a local file system
// We are looking based on the original FSOP, so use the original path as is.
- MoveTask mvTask = (MoveTask) GenMapRedUtils.findMoveTaskForFsopOutput(
- mvTasks, fsOp.getConf().getFinalDirName(), fsOp.getConf().isMmTable());
+ MoveTask mvTask = (MoveTask) GenMapRedUtils.findMoveTaskForFsopOutput(mvTasks, fsOp.getConf().getFinalDirName(),
+ fsOp.getConf().isMmTable(), fsOp.getConf().isDirectInsert());
// TODO: wtf?!! why is this in this method? This has nothing to do with anything.
if (isInsertTable && hconf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)
@@ -1985,9 +1985,15 @@ public final class GenMapRedUtils {
FileSinkDesc fileSinkDesc = fsOp.getConf();
boolean isMmTable = fileSinkDesc.isMmTable();
+ boolean isDirectInsert = fileSinkDesc.isDirectInsert();
if (chDir) {
dest = fileSinkDesc.getMergeInputDirName();
- if (!isMmTable) {
+ /**
+ * Skip temporary file generation for:
+ * 1. MM Tables
+ * 2. INSERT operation on full ACID table
+ */
+ if ((!isMmTable) && (!isDirectInsert)) {
// generate the temporary file
// it must be on the same file system as the current destination
Context baseCtx = parseCtx.getContext();
@@ -2018,8 +2024,8 @@ public final class GenMapRedUtils {
Task<MoveWork> mvTask = null;
if (!chDir) {
- mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(
- mvTasks, fsOp.getConf().getFinalDirName(), fsOp.getConf().isMmTable());
+ mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(mvTasks, fsOp.getConf().getFinalDirName(), isMmTable,
+ isDirectInsert);
}
// Set the move task to be dependent on the current task
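
In GenMapRedUtils the same flag also decides whether a merge stage needs a staging directory at all: per the comment added above, MM tables and direct inserts into full ACID tables skip the temporary file. A loose, self-contained simplification of that choice (directory names below are made up for the sketch):

    public class MergeStagingSketch {
      // When a file-merge stage is planned (chDir), only "classic" sinks are redirected
      // to a scratch directory; MM and direct-insert sinks are merged from their
      // final/delta directory in place.
      static String mergeInputDir(boolean chDir, boolean isMmTable, boolean isDirectInsert,
          String finalOrDeltaDir, String scratchDir) {
        if (chDir && !isMmTable && !isDirectInsert) {
          return scratchDir;
        }
        return finalOrDeltaDir;
      }

      public static void main(String[] args) {
        System.out.println(mergeInputDir(true, false, true,
            "/warehouse/t/delta_0000001_0000001_0000", "/tmp/hive/_tmp.t")); // delta dir kept
        System.out.println(mergeInputDir(true, false, false,
            "/warehouse/t", "/tmp/hive/_tmp.t"));                            // scratch dir used
      }
    }
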
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 2328eed..fed890f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7329,6 +7329,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
Table destinationTable = null; // destination table if any
boolean destTableIsTransactional; // true for full ACID table and MM table
boolean destTableIsFullAcid; // should the destination table be written to using ACID
+ boolean isDirectInsert = false; // should we add files directly to the final path
boolean destTableIsTemporary = false;
boolean destTableIsMaterialization = false;
Partition destinationPartition = null;// destination partition if any
@@ -7342,7 +7343,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
LoadTableDesc ltd = null;
ListBucketingCtx lbCtx = null;
Map<String, String> partSpec = null;
- boolean isMmTable = false, isMmCreate = false;
+ boolean isMmTable = false, isMmCreate = false, isNonNativeTable = false;
Long writeId = null;
HiveTxnManager txnMgr = getTxnMgr();
@@ -7386,13 +7387,19 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
destinationPath = new Path(destinationTable.getPath(), dpCtx.getSPPath());
}
- boolean isNonNativeTable = destinationTable.isNonNative();
+ isNonNativeTable = destinationTable.isNonNative();
isMmTable = AcidUtils.isInsertOnlyTable(destinationTable.getParameters());
- if (isNonNativeTable || isMmTable) {
- queryTmpdir = destinationPath;
- } else {
- queryTmpdir = ctx.getTempDirForFinalJobPath(destinationPath);
+ AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
+ // this table_desc does not contain the partitioning columns
+ tableDescriptor = Utilities.getTableDesc(destinationTable);
+
+ if (!isNonNativeTable) {
+ if (destTableIsTransactional) {
+ acidOp = getAcidType(tableDescriptor.getOutputFileFormatClass(), dest, isMmTable);
+ }
}
+ isDirectInsert = isDirectInsert(destTableIsFullAcid, acidOp);
+ queryTmpdir = getTmpDir(isNonNativeTable, isMmTable, isDirectInsert, destinationPath);
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("create filesink w/DEST_TABLE specifying " + queryTmpdir
+ " from " + destinationPath);
@@ -7401,8 +7408,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
// set the root of the temporary path where dynamic partition columns will populate
dpCtx.setRootPath(queryTmpdir);
}
- // this table_desc does not contain the partitioning columns
- tableDescriptor = Utilities.getTableDesc(destinationTable);
// Add NOT NULL constraint check
input = genConstraintsPlan(dest, qb, input);
@@ -7436,7 +7441,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
// Create the work for moving the table
// NOTE: specify Dynamic partitions in dest_tab for WriteEntity
if (!isNonNativeTable) {
- AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
if (destTableIsTransactional) {
acidOp = getAcidType(tableDescriptor.getOutputFileFormatClass(), dest, isMmTable);
checkAcidConstraints();
@@ -7465,10 +7469,17 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
// deltas and base and leave them up to the cleaner to clean up
boolean isInsertInto = qb.getParseInfo().isInsertIntoTable(
destinationTable.getDbName(), destinationTable.getTableName());
- LoadFileType loadType = (!isInsertInto && !destTableIsTransactional)
- ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING;
+ LoadFileType loadType;
+ if (isDirectInsert) {
+ loadType = LoadFileType.IGNORE;
+ } else if (!isInsertInto && !destTableIsTransactional) {
+ loadType = LoadFileType.REPLACE_ALL;
+ } else {
+ loadType = LoadFileType.KEEP_EXISTING;
+ }
ltd.setLoadFileType(loadType);
ltd.setInsertOverwrite(!isInsertInto);
+ ltd.setIsDirectInsert(isDirectInsert);
ltd.setLbCtx(lbCtx);
loadTableWork.add(ltd);
} else {
@@ -7525,13 +7536,23 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
+ isNonNativeTable = destinationTable.isNonNative();
isMmTable = AcidUtils.isInsertOnlyTable(destinationTable.getParameters());
- queryTmpdir = isMmTable ? destinationPath : ctx.getTempDirForFinalJobPath(destinationPath);
+ AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
+ // this table_desc does not contain the partitioning columns
+ tableDescriptor = Utilities.getTableDesc(destinationTable);
+
+ if (!isNonNativeTable) {
+ if (destTableIsTransactional) {
+ acidOp = getAcidType(tableDescriptor.getOutputFileFormatClass(), dest, isMmTable);
+ }
+ }
+ isDirectInsert = isDirectInsert(destTableIsFullAcid, acidOp);
+ queryTmpdir = getTmpDir(isNonNativeTable, isMmTable, isDirectInsert, destinationPath);
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("create filesink w/DEST_PARTITION specifying "
+ queryTmpdir + " from " + destinationPath);
}
- tableDescriptor = Utilities.getTableDesc(destinationTable);
// Add NOT NULL constraint check
input = genConstraintsPlan(dest, qb, input);
@@ -7561,7 +7582,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
lbCtx = constructListBucketingCtx(destinationPartition.getSkewedColNames(),
destinationPartition.getSkewedColValues(), destinationPartition.getSkewedColValueLocationMaps(),
destinationPartition.isStoredAsSubDirectories());
- AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
if (destTableIsTransactional) {
acidOp = getAcidType(tableDescriptor.getOutputFileFormatClass(), dest, isMmTable);
checkAcidConstraints();
@@ -7589,10 +7609,17 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
boolean isInsertInto = !qb.getParseInfo().isDestToOpTypeInsertOverwrite(dest);
// For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old
// deltas and base and leave them up to the cleaner to clean up
- LoadFileType loadType = (!isInsertInto && !destTableIsTransactional)
- ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING;
+ LoadFileType loadType;
+ if (isDirectInsert) {
+ loadType = LoadFileType.IGNORE;
+ } else if (!isInsertInto && !destTableIsTransactional) {
+ loadType = LoadFileType.REPLACE_ALL;
+ } else {
+ loadType = LoadFileType.KEEP_EXISTING;
+ }
ltd.setLoadFileType(loadType);
ltd.setInsertOverwrite(!isInsertInto);
+ ltd.setIsDirectInsert(isDirectInsert);
ltd.setLbCtx(lbCtx);
loadTableWork.add(ltd);
@@ -7818,7 +7845,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
// the metastore table, then we move and commit the partitions. At least for the time being,
// this order needs to be enforced because metastore expects a table to exist before we can
// add any partitions to it.
- boolean isNonNativeTable = tableDescriptor.isNonNative();
+ isNonNativeTable = tableDescriptor.isNonNative();
if (!isNonNativeTable) {
AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
if (destTableIsTransactional) {
@@ -7905,7 +7932,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
FileSinkDesc fileSinkDesc = createFileSinkDesc(dest, tableDescriptor, destinationPartition,
destinationPath, currentTableId, destTableIsFullAcid, destTableIsTemporary,//this was 1/4 acid
destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS,
- canBeMerged, destinationTable, writeId, isMmCreate, destType, qb);
+ canBeMerged, destinationTable, writeId, isMmCreate, destType, qb, isDirectInsert);
if (isMmCreate) {
// Add FSD so that the LoadTask compilation could fix up its path to avoid the move.
if (tableDesc != null) {
@@ -7961,6 +7988,30 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
return output;
}
+ private boolean isDirectInsert(boolean destTableIsFullAcid, AcidUtils.Operation acidOp) {
+ boolean directInsertEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_ACID_DIRECT_INSERT_ENABLED);
+ boolean directInsert = directInsertEnabled && destTableIsFullAcid && (acidOp == AcidUtils.Operation.INSERT);
+ if (LOG.isDebugEnabled() && directInsert) {
+ LOG.debug("Direct insert for ACID tables is enabled.");
+ }
+ return directInsert;
+ }
+
+ private Path getTmpDir(boolean isNonNativeTable, boolean isMmTable, boolean isDirectInsert,
+ Path destinationPath) {
+ /**
+ * We will directly insert to the final destination in the following cases:
+ * 1. Non native table
+ * 2. Micro-managed (insert only table)
+ * 3. Full ACID table and operation type is INSERT
+ */
+ if (isNonNativeTable || isMmTable || isDirectInsert) {
+ return destinationPath;
+ } else {
+ return ctx.getTempDirForFinalJobPath(destinationPath);
+ }
+ }
+
private boolean useBatchingSerializer(String serdeClassName) {
return SessionState.get().isHiveServerQuery() &&
hasSetBatchSerializer(serdeClassName);
@@ -8115,7 +8166,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
boolean destTableIsMaterialization, Path queryTmpdir,
SortBucketRSCtx rsCtx, DynamicPartitionCtx dpCtx, ListBucketingCtx lbCtx,
RowSchema fsRS, boolean canBeMerged, Table dest_tab, Long mmWriteId, boolean isMmCtas,
- Integer dest_type, QB qb) throws SemanticException {
+ Integer dest_type, QB qb, boolean isDirectInsert) throws SemanticException {
boolean isInsertOverwrite = false;
switch (dest_type) {
case QBMetaData.DEST_PARTITION:
@@ -8140,7 +8191,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), currentTableId, rsCtx.isMultiFileSpray(),
canBeMerged, rsCtx.getNumFiles(), rsCtx.getTotalFiles(), rsCtx.getPartnCols(), dpCtx,
dest_path, mmWriteId, isMmCtas, isInsertOverwrite, qb.getIsQuery(),
- qb.isCTAS() || qb.isMaterializedView());
+ qb.isCTAS() || qb.isMaterializedView(), isDirectInsert);
boolean isHiveServerQuery = SessionState.get().isHiveServerQuery();
fileSinkDesc.setHiveServerQuery(isHiveServerQuery);
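
Putting the two new SemanticAnalyzer helpers together: direct insert is chosen only for an INSERT into a full ACID table with the feature flag on, and in that case (as for non-native and MM tables) the sink writes straight to the final path instead of a scratch directory. A self-contained sketch of that decision; the flag and scratch-path arguments are placeholders, the real code consults HiveConf.ConfVars.HIVE_ACID_DIRECT_INSERT_ENABLED and ctx.getTempDirForFinalJobPath:

    public class TargetDirSketch {
      enum Operation { NOT_ACID, INSERT, UPDATE, DELETE }

      static boolean isDirectInsert(boolean directInsertEnabled, boolean destTableIsFullAcid,
          Operation acidOp) {
        return directInsertEnabled && destTableIsFullAcid && acidOp == Operation.INSERT;
      }

      // Non-native, MM (insert-only) and direct-insert ACID writes skip the staging
      // directory and target the final location; everything else goes via scratch space.
      static String getTmpDir(boolean isNonNativeTable, boolean isMmTable, boolean isDirectInsert,
          String destinationPath, String scratchDirForDest) {
        return (isNonNativeTable || isMmTable || isDirectInsert) ? destinationPath : scratchDirForDest;
      }

      public static void main(String[] args) {
        System.out.println(getTmpDir(false, false,
            isDirectInsert(true, true, Operation.INSERT),
            "/warehouse/t", "/tmp/hive/_tmp.t"));   // /warehouse/t
        System.out.println(getTmpDir(false, false,
            isDirectInsert(true, true, Operation.UPDATE),
            "/warehouse/t", "/tmp/hive/_tmp.t"));   // /tmp/hive/_tmp.t
      }
    }
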
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index c102a69..5fb5fd3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -433,7 +433,7 @@ public class GenSparkUtils {
Task<MoveWork> mvTask = null;
if (!chDir) {
- mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(mvTasks, fileSinkDesc.getFinalDirName(), false);
+ mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(mvTasks, fileSinkDesc.getFinalDirName(), false, false);
}
// Set the move task to be dependent on the current task
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index ecc7bde..bbf73cb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -112,6 +112,8 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
private boolean isInsertOverwrite = false;
+ private boolean isDirectInsert = false;
+
private boolean isQuery = false;
private boolean isCTASorCM = false;
@@ -122,12 +124,10 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
/**
* @param destPath - the final destination for data
*/
- public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
- final boolean compressed, final int destTableId, final boolean multiFileSpray,
- final boolean canBeMerged, final int numFiles, final int totalFiles,
- final List<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx, Path destPath,
- Long mmWriteId, boolean isMmCtas, boolean isInsertOverwrite, boolean isQuery, boolean isCTASorCM) {
-
+ public FileSinkDesc(final Path dirName, final TableDesc tableInfo, final boolean compressed, final int destTableId,
+ final boolean multiFileSpray, final boolean canBeMerged, final int numFiles, final int totalFiles,
+ final List<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx, Path destPath, Long mmWriteId,
+ boolean isMmCtas, boolean isInsertOverwrite, boolean isQuery, boolean isCTASorCM, boolean isDirectInsert) {
this.dirName = dirName;
this.tableInfo = tableInfo;
this.compressed = compressed;
@@ -145,6 +145,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
this.isInsertOverwrite = isInsertOverwrite;
this.isQuery = isQuery;
this.isCTASorCM = isCTASorCM;
+ this.isDirectInsert = isDirectInsert;
}
public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
@@ -164,9 +165,9 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
@Override
public Object clone() throws CloneNotSupportedException {
- FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed,
- destTableId, multiFileSpray, canBeMerged, numFiles, totalFiles,
- partitionCols, dpCtx, destPath, mmWriteId, isMmCtas, isInsertOverwrite, isQuery, isCTASorCM);
+ FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed, destTableId, multiFileSpray, canBeMerged,
+ numFiles, totalFiles, partitionCols, dpCtx, destPath, mmWriteId, isMmCtas, isInsertOverwrite, isQuery,
+ isCTASorCM, isDirectInsert);
ret.setCompressCodec(compressCodec);
ret.setCompressType(compressType);
ret.setGatherStats(gatherStats);
@@ -184,6 +185,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
ret.setFilesToFetch(filesToFetch);
ret.setIsQuery(isQuery);
ret.setIsCTASorCM(isCTASorCM);
+ ret.setIsDirectInsert(isDirectInsert);
return ret;
}
@@ -223,6 +225,14 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
this.isUsingBatchingSerDe = isUsingBatchingSerDe;
}
+ public void setIsDirectInsert(boolean isDirectInsert) {
+ this.isDirectInsert = isDirectInsert;
+ }
+
+ public boolean isDirectInsert() {
+ return this.isDirectInsert;
+ }
+
@Explain(displayName = "directory", explainLevels = { Level.EXTENDED })
public Path getDirName() {
return dirName;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
index bed0581..a62b3cc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
@@ -43,6 +43,7 @@ public class LoadTableDesc extends LoadDesc implements Serializable {
private int stmtId;
private Long currentWriteId;
private boolean isInsertOverwrite;
+ private boolean isDirectInsert;
// TODO: the below seem like they should just be combined into partitionDesc
private Table mdTable;
@@ -235,6 +236,14 @@ public class LoadTableDesc extends LoadDesc implements Serializable {
this.isInsertOverwrite = v;
}
+ public void setIsDirectInsert(boolean isDirectInsert) {
+ this.isDirectInsert = isDirectInsert;
+ }
+
+ public boolean isDirectInsert() {
+ return this.isDirectInsert;
+ }
+
/**
* @return the lbCtx
*/
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 739f2b6..b4bc3c7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -404,6 +404,7 @@ public class CompactorMR {
private int bucketNum;
private Path base;
private Path[] deltas;
+ private Map<String, String> deltasToAttemptId;
public CompactorInputSplit() {
}
@@ -420,12 +421,13 @@ public class CompactorMR {
* @throws IOException
*/
CompactorInputSplit(Configuration hadoopConf, int bucket, List<Path> files, Path base,
- Path[] deltas)
+ Path[] deltas, Map<String, String> deltasToAttemptId)
throws IOException {
bucketNum = bucket;
this.base = base;
this.deltas = deltas;
locations = new ArrayList<String>();
+ this.deltasToAttemptId = deltasToAttemptId;
for (Path path : files) {
FileSystem fs = path.getFileSystem(hadoopConf);
@@ -470,6 +472,13 @@ public class CompactorMR {
for (int i = 0; i < deltas.length; i++) {
dataOutput.writeInt(deltas[i].toString().length());
dataOutput.writeBytes(deltas[i].toString());
+ String attemptId = deltasToAttemptId.get(deltas[i].toString());
+ if (attemptId == null) {
+ dataOutput.writeInt(0);
+ } else {
+ dataOutput.writeInt(attemptId.length());
+ dataOutput.writeBytes(attemptId.toString());
+ }
}
}
@@ -502,11 +511,20 @@ public class CompactorMR {
}
numElements = dataInput.readInt();
deltas = new Path[numElements];
+ deltasToAttemptId = new HashMap<>();
for (int i = 0; i < numElements; i++) {
len = dataInput.readInt();
buf = new byte[len];
dataInput.readFully(buf);
deltas[i] = new Path(new String(buf));
+ len = dataInput.readInt();
+ String attemptId = null;
+ if (len > 0) {
+ buf = new byte[len];
+ dataInput.readFully(buf);
+ attemptId = new String(buf);
+ }
+ deltasToAttemptId.put(deltas[i].toString(), attemptId);
}
}
@@ -516,6 +534,7 @@ public class CompactorMR {
bucketNum = other.bucketNum;
base = other.base;
deltas = other.deltas;
+ deltasToAttemptId = other.deltasToAttemptId;
}
int getBucket() {
@@ -530,6 +549,10 @@ public class CompactorMR {
return deltas;
}
+ Map<String, String> getDeltasToAttemptId() {
+ return deltasToAttemptId;
+ }
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
@@ -587,7 +610,7 @@ public class CompactorMR {
// For each file, figure out which bucket it is.
Matcher matcher = isRawFormat ?
AcidUtils.LEGACY_BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName())
- : AcidUtils.BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName());
+ : AcidUtils.BUCKET_PATTERN.matcher(f.getPath().getName());
addFileToMap(matcher, f.getPath(), sawBase, splitToBucketMap);
}
} else {
@@ -606,7 +629,7 @@ public class CompactorMR {
// multiple ingestions of various sizes.
Path[] deltasForSplit = isTableBucketed ? deltaDirs : getDeltaDirsFromBucketTracker(bt);
splits.add(new CompactorInputSplit(entries, e.getKey(), bt.buckets,
- bt.sawBase ? baseDir : null, deltasForSplit));
+ bt.sawBase ? baseDir : null, deltasForSplit, bt.deltasToAttemptId));
}
LOG.debug("Returning " + splits.size() + " splits");
@@ -643,7 +666,15 @@ public class CompactorMR {
//may be a data loss scenario
throw new IllegalArgumentException(msg);
}
- int bucketNum = Integer.parseInt(matcher.group());
+ int bucketNum = -1;
+ String attemptId = null;
+ if (matcher.groupCount() > 0) {
+ bucketNum = Integer.parseInt(matcher.group(1));
+ attemptId = matcher.group(2) != null ? matcher.group(2).substring(1) : null;
+ } else {
+ bucketNum = Integer.parseInt(matcher.group());
+ }
+
BucketTracker bt = splitToBucketMap.get(bucketNum);
if (bt == null) {
bt = new BucketTracker();
@@ -652,16 +683,19 @@ public class CompactorMR {
LOG.debug("Adding " + file.toString() + " to list of files for splits");
bt.buckets.add(file);
bt.sawBase |= sawBase;
+ bt.deltasToAttemptId.put(file.getParent().toString(), attemptId);
}
private static class BucketTracker {
BucketTracker() {
sawBase = false;
buckets = new ArrayList<Path>();
+ deltasToAttemptId = new HashMap<>();
}
boolean sawBase;
List<Path> buckets;
+ Map<String, String> deltasToAttemptId;
}
}
@@ -734,7 +768,7 @@ public class CompactorMR {
boolean isMajor = jobConf.getBoolean(IS_MAJOR, false);
AcidInputFormat.RawReader<V> reader =
aif.getRawReader(jobConf, isMajor, split.getBucket(),
- writeIdList, split.getBaseDir(), split.getDeltaDirs());
+ writeIdList, split.getBaseDir(), split.getDeltaDirs(), split.getDeltasToAttemptId());
RecordIdentifier identifier = reader.createKey();
V value = reader.createValue();
getWriter(reporter, reader.getObjectInspector(), split.getBucket());
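
CompactorInputSplit now has to ship the per-delta attempt id to the map tasks, and it does so with a length-prefixed string per delta path where a length of 0 stands for "no attempt id". A self-contained round-trip sketch of that wire format; the class and method names are invented for illustration, only the encoding mirrors the hunk above:

    import java.io.*;
    import java.util.*;

    public class AttemptIdSerializationSketch {
      static void write(DataOutput out, List<String> deltas, Map<String, String> attemptIds)
          throws IOException {
        out.writeInt(deltas.size());
        for (String delta : deltas) {
          out.writeInt(delta.length());
          out.writeBytes(delta);
          String attemptId = attemptIds.get(delta);
          if (attemptId == null) {
            out.writeInt(0);               // 0 length means "no attempt id"
          } else {
            out.writeInt(attemptId.length());
            out.writeBytes(attemptId);
          }
        }
      }

      static Map<String, String> read(DataInput in) throws IOException {
        Map<String, String> result = new HashMap<>();
        int n = in.readInt();
        for (int i = 0; i < n; i++) {
          byte[] buf = new byte[in.readInt()];
          in.readFully(buf);
          String delta = new String(buf);
          int len = in.readInt();
          String attemptId = null;
          if (len > 0) {
            buf = new byte[len];
            in.readFully(buf);
            attemptId = new String(buf);
          }
          result.put(delta, attemptId);
        }
        return result;
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        Map<String, String> ids = new HashMap<>();
        ids.put("delta_0000001_0000001_0000", "0");  // direct-insert delta, attempt id known
        ids.put("delta_0000002_0000002_0000", null); // older delta, no attempt id
        write(new DataOutputStream(bos),
            Arrays.asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000"), ids);
        System.out.println(read(new DataInputStream(new ByteArrayInputStream(bos.toByteArray()))));
      }
    }
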
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/UpgradeTool.java b/ql/src/java/org/apache/hadoop/hive/ql/util/UpgradeTool.java
index 58e6289..e9e49a6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/util/UpgradeTool.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/util/UpgradeTool.java
@@ -246,7 +246,6 @@ public class UpgradeTool {
*/
static void handleRenameFiles(Table t, Path p, boolean execute, Configuration conf,
boolean isBucketed, PrintWriter pw) throws IOException {
- AcidUtils.BUCKET_DIGIT_PATTERN.matcher("foo");
if (isBucketed) {
/* For bucketed tables we assume that Hive wrote them and 0000M_0 and 0000M_0_copy_8
are the only possibilities. Since we can't move files across buckets the only thing we
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java
index c9cb669..fa15e28 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java
@@ -294,6 +294,6 @@ public class TestTxnAddPartition extends TxnCommandsBaseForTests {
runStatementOnDriver("insert into Tstage partition(p=1) values(0,2),(1,4)");
runStatementOnDriver("ALTER TABLE T ADD PARTITION (p=0) location '"
- + getWarehouseDir() + "/tstage/p=1/delta_0000001_0000001_0000/bucket_00001'");
+ + getWarehouseDir() + "/tstage/p=1/delta_0000001_0000001_0000/bucket_00001_0'");
}
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 8421408..a1f59a8 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -1167,7 +1167,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
{"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "nonacidorctbl/000001_0"},
{"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t0\t12", "nonacidorctbl/000001_0_copy_1"},
{"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":2}\t1\t5", "nonacidorctbl/000001_0_copy_1"},
- {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t1\t17", "nonacidorctbl/delta_10000001_10000001_0000/bucket_00001"}
+ {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t1\t17", "nonacidorctbl/delta_10000001_10000001_0000/bucket_00001_0"}
};
checkResult(expected, query, isVectorized, "before compact", LOG);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index e56d831..79dfb02 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -474,7 +474,7 @@ public class TestTxnCommands2 {
sawNewDelta = true;
FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Assert.assertEquals(1, buckets.length); // only one bucket file
- Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00000"));
+ Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00000_0"));
} else {
Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
}
@@ -776,7 +776,7 @@ public class TestTxnCommands2 {
} else if (numDelta == 2) {
Assert.assertEquals("delta_10000002_10000002_0000", status[i].getPath().getName());
Assert.assertEquals(1, buckets.length);
- Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
+ Assert.assertEquals("bucket_00000_0", buckets[0].getPath().getName());
}
} else if (status[i].getPath().getName().matches("delete_delta_.*")) {
numDeleteDelta++;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
index 908ceb4..4f4bca2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
@@ -76,7 +76,7 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests {
String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from mydb1.S";
String[][] expected = new String[][] {
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2",
- "s/delta_0000001_0000001_0000/bucket_00000"},
+ "s/delta_0000001_0000001_0000/bucket_00000_0"},
{"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t4\t6",
"s/delta_0000002_0000002_0000/bucket_00000"}};
checkResult(expected, testQuery, false, "check data", LOG);
@@ -273,14 +273,14 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests {
"select ROW__ID, a, b, ds, INPUT__FILE__NAME from acid_uap order by ds, a, b";
String[][] expected = new String[][]{
{"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\tbah\ttoday",
- "warehouse/acid_uap/ds=today/delta_0000002_0000002_0000/bucket_00001"},
+ "warehouse/acid_uap/ds=today/delta_0000002_0000002_0000/bucket_00001_0"},
{"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t2\tyah\ttoday",
- "warehouse/acid_uap/ds=today/delta_0000002_0000002_0000/bucket_00000"},
+ "warehouse/acid_uap/ds=today/delta_0000002_0000002_0000/bucket_00000_0"},
{"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\tbah\ttomorrow",
- "warehouse/acid_uap/ds=tomorrow/delta_0000001_0000001_0000/bucket_00001"},
+ "warehouse/acid_uap/ds=tomorrow/delta_0000001_0000001_0000/bucket_00001_0"},
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\tyah\ttomorrow",
- "warehouse/acid_uap/ds=tomorrow/delta_0000001_0000001_0000/bucket_00000"}};
+ "warehouse/acid_uap/ds=tomorrow/delta_0000001_0000001_0000/bucket_00000_0"}};
checkResult(expected, testQuery, isVectorized, "after insert", LOG);
runStatementOnDriver("update acid_uap set b = 'fred'");
@@ -324,9 +324,9 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests {
String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from T";
String[][] expected = new String[][] {
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2",
- "t/delta_0000001_0000001_0000/bucket_00000"},
+ "t/delta_0000001_0000001_0000/bucket_00000_0"},
{"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t4",
- "t/delta_0000002_0000002_0000/bucket_00000"}};
+ "t/delta_0000002_0000002_0000/bucket_00000_0"}};
checkResult(expected, testQuery, false, "check data", LOG);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
index 8676e0d..6394429 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
@@ -61,9 +61,9 @@ public class TestTxnConcatenate extends TxnCommandsBaseForTests {
{"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t4",
"acidtbl/delta_0000002_0000002_0000/bucket_00001"},
{"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t5\t6",
- "acidtbl/delta_0000003_0000003_0000/bucket_00001"},
+ "acidtbl/delta_0000003_0000003_0000/bucket_00001_0"},
{"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":1}\t8\t8",
- "acidtbl/delta_0000003_0000003_0000/bucket_00001"}};
+ "acidtbl/delta_0000003_0000003_0000/bucket_00001_0"}};
checkResult(expected, testQuery, false, "check data", LOG);
/*in UTs, there is no standalone HMS running to kick off compaction so it's done via runWorker()
@@ -100,11 +100,11 @@ public class TestTxnConcatenate extends TxnCommandsBaseForTests {
{"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t4",
"acidtblpart/p=p1/delta_0000002_0000002_0000/bucket_00001"},
{"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t4\t5",
- "acidtblpart/p=p2/delta_0000001_0000001_0000/bucket_00001"},
+ "acidtblpart/p=p2/delta_0000001_0000001_0000/bucket_00001_0"},
{"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t5\t6",
- "acidtblpart/p=p1/delta_0000003_0000003_0000/bucket_00001"},
+ "acidtblpart/p=p1/delta_0000003_0000003_0000/bucket_00001_0"},
{"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
- "acidtblpart/p=p2/delta_0000003_0000003_0000/bucket_00001"}};
+ "acidtblpart/p=p2/delta_0000003_0000003_0000/bucket_00001_0"}};
checkResult(expected, testQuery, false, "check data", LOG);
/*in UTs, there is no standalone HMS running to kick off compaction so it's done via runWorker()
@@ -124,11 +124,11 @@ public class TestTxnConcatenate extends TxnCommandsBaseForTests {
{"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t4",
"acidtblpart/p=p1/base_0000003_v0000019/bucket_00001"},
{"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t4\t5",
- "acidtblpart/p=p2/delta_0000001_0000001_0000/bucket_00001"},
+ "acidtblpart/p=p2/delta_0000001_0000001_0000/bucket_00001_0"},
{"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t5\t6",
"acidtblpart/p=p1/base_0000003_v0000019/bucket_00001"},
{"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
- "acidtblpart/p=p2/delta_0000003_0000003_0000/bucket_00001"}};
+ "acidtblpart/p=p2/delta_0000003_0000003_0000/bucket_00001_0"}};
checkResult(expected2, testQuery, false, "check data after concatenate", LOG);
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
index 66b2b27..ba53417 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
@@ -379,7 +379,7 @@ target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1521148657811/
"select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
String[][] expected = new String[][] {
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0",
- "t/p=10/delta_0000001_0000001_0000/bucket_00000"},
+ "t/p=10/delta_0000001_0000001_0000/bucket_00000_0"},
{"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t4",
"t/p=11/delta_0000002_0000002_0000/000000_0"},
{"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t5\t6",
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
index bb55d9f..125c76a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
@@ -126,13 +126,13 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
String[][] expectedInter2 = new String[][] {
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/000000_0"},
{"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000002_0000002_0000/bucket_00000"},
- {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000003_0000003_0000/bucket_00000"}
+ {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000003_0000003_0000/bucket_00000_0"}
};
checkResult(expectedInter2, testQuery, isVectorized, "insert");
runStatementOnDriver("delete from T where a = 3");
String[][] expectedInter3 = new String[][] {
{"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000002_0000002_0000/bucket_00000"},
- {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000003_0000003_0000/bucket_00000"}
+ {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000003_0000003_0000/bucket_00000_0"}
};
checkResult(expectedInter3, testQuery, isVectorized, "delete");
//test minor compaction
@@ -165,7 +165,7 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
String[][] expected5 = new String[][]{
{"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000007_0000007_0000/bucket_00000"},
{"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/delta_0000007_0000007_0000/bucket_00000"},
- {"{\"writeid\":9,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000009_0000009_0000/bucket_00000"}
+ {"{\"writeid\":9,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000009_0000009_0000/bucket_00000_0"}
};
checkResult(expected5, testQuery, isVectorized, "load data inpath overwrite update");
@@ -199,8 +199,8 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
"select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
String[][] expected = new String[][] {
//normal insert
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000001_0000001_0000/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000001_0000001_0000/bucket_00000"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000001_0000001_0000/bucket_00000_0"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000001_0000001_0000/bucket_00000_0"},
//Load Data
{"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000002_0000002_0000/000000_0"},
{"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000002_0000002_0000/000000_0"}};
@@ -444,8 +444,8 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
"select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
String[][] expected = new String[][] {
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/bucket_00000"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/bucket_00000_0"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/bucket_00000_0"},
{"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/delta_0000001_0000001_0001/000000_0"},
{"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/delta_0000001_0000001_0001/000000_0"}
};
@@ -483,8 +483,8 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
"select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
String[][] expected = new String[][] {
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/bucket_00000"}
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/bucket_00000_0"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/bucket_00000_0"}
};
checkResult(expected, testQuery, isVectorized, "load data inpath");
}
@@ -505,7 +505,7 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
List<String> rs = runStatementOnDriver("select INPUT__FILE__NAME from T");
Assert.assertEquals(1, rs.size());
Assert.assertTrue("Unexpcted file name", rs.get(0)
- .endsWith("t/delta_0000001_0000001_0000/bucket_00000"));
+ .endsWith("t/delta_0000001_0000001_0000/bucket_00000_0"));
//T2 is an acid table so this should fail
CommandProcessorException e =
runStatementOnDriverNegative("load data local inpath '" + rs.get(0) + "' into table T2");
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
index ea6b1d9..88d5d04 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
@@ -48,6 +48,7 @@ import java.util.Set;
public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
static final private Logger LOG = LoggerFactory.getLogger(TestTxnNoBuckets.class);
+ private static final String NO_BUCKETS_TBL_NAME = "nobuckets";
private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
File.separator + TestTxnNoBuckets.class.getCanonicalName()
+ "-" + System.currentTimeMillis()
@@ -80,13 +81,13 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
runStatementOnDriver("create table tmp (c1 integer, c2 integer, c3 integer) stored as orc tblproperties('transactional'='false')");
runStatementOnDriver("insert into tmp " + makeValuesClause(sourceVals1));
runStatementOnDriver("insert into tmp " + makeValuesClause(sourceVals2));
- runStatementOnDriver("drop table if exists nobuckets");
- runStatementOnDriver("create table nobuckets (c1 integer, c2 integer, c3 integer) stored " +
+ runStatementOnDriver(String.format("drop table if exists %s", NO_BUCKETS_TBL_NAME));
+ runStatementOnDriver("create table " + NO_BUCKETS_TBL_NAME + " (c1 integer, c2 integer, c3 integer) stored " +
"as orc tblproperties('transactional'='true', 'transactional_properties'='default')");
- String stmt = "insert into nobuckets select * from tmp";
+ String stmt = String.format("insert into %s select * from tmp", NO_BUCKETS_TBL_NAME);
runStatementOnDriver(stmt);
List<String> rs = runStatementOnDriver(
- "select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from nobuckets order by ROW__ID");
+ String.format("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from %s order by ROW__ID", NO_BUCKETS_TBL_NAME));
Assert.assertEquals("", 4, rs.size());
LOG.warn("after insert");
for(String s : rs) {
@@ -96,54 +97,58 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
* The number in the file name is writerId. This is the number encoded in ROW__ID.bucketId -
* see {@link org.apache.hadoop.hive.ql.io.BucketCodec}*/
Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t0\t"));
- Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nobuckets/delta_0000001_0000001_0000/bucket_00000"));
+ Assert.assertTrue(rs.get(0), rs.get(0).endsWith(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00000_0"));
Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t3\t3\t"));
- Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nobuckets/delta_0000001_0000001_0000/bucket_00000"));
+ Assert.assertTrue(rs.get(1), rs.get(1).endsWith(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00000_0"));
Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t1\t"));
- Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nobuckets/delta_0000001_0000001_0000/bucket_00001"));
+ Assert.assertTrue(rs.get(2), rs.get(2).endsWith(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00001_0"));
Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t"));
- Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/delta_0000001_0000001_0000/bucket_00001"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00001_0"));
hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false);
- rs = runStatementOnDriver("explain update nobuckets set c3 = 17 where c3 in(0,1)");
+ rs = runStatementOnDriver(String.format("explain update %s set c3 = 17 where c3 in(0,1)", NO_BUCKETS_TBL_NAME));
LOG.warn("Query Plan: ");
for (String s : rs) {
LOG.warn(s);
}
- runStatementOnDriver("update nobuckets set c3 = 17 where c3 in(0,1)");
- rs = runStatementOnDriver("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from nobuckets order by INPUT__FILE__NAME, ROW__ID");
+ runStatementOnDriver(String.format("update %s set c3 = 17 where c3 in(0,1)", NO_BUCKETS_TBL_NAME));
+ rs = runStatementOnDriver(
+ String.format("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from %s order by INPUT__FILE__NAME, ROW__ID",
+ NO_BUCKETS_TBL_NAME));
LOG.warn("after update");
for(String s : rs) {
LOG.warn(s);
}
Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t3\t3\t"));
- Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nobuckets/delta_0000001_0000001_0000/bucket_00000"));
+ Assert.assertTrue(rs.get(0), rs.get(0).endsWith(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00000_0"));
Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t"));
- Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nobuckets/delta_0000001_0000001_0000/bucket_00001"));
+ Assert.assertTrue(rs.get(1), rs.get(1).endsWith(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00001_0"));
//so update has 1 writer, which creates buckets where the new rows land
Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t17\t"));
- Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nobuckets/delta_0000002_0000002_0000/bucket_00000"));
+ Assert.assertTrue(rs.get(2), rs.get(2).endsWith(NO_BUCKETS_TBL_NAME + "/delta_0000002_0000002_0000/bucket_00000"));
// update for "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t1\t"
Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t17\t"));
- Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/delta_0000002_0000002_0000/bucket_00001"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith(NO_BUCKETS_TBL_NAME + "/delta_0000002_0000002_0000/bucket_00001"));
Set<String> expectedFiles = new HashSet<>();
//both delete events land in corresponding buckets to the original row-ids
- expectedFiles.add("ts/delete_delta_0000002_0000002_0000/bucket_00000");
- expectedFiles.add("ts/delete_delta_0000002_0000002_0000/bucket_00001");
- expectedFiles.add("nobuckets/delta_0000001_0000001_0000/bucket_00000");
- expectedFiles.add("nobuckets/delta_0000001_0000001_0000/bucket_00001");
- expectedFiles.add("nobuckets/delta_0000002_0000002_0000/bucket_00000");
- expectedFiles.add("nobuckets/delta_0000002_0000002_0000/bucket_00001");
+ expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delete_delta_0000002_0000002_0000/bucket_00000");
+ expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delete_delta_0000002_0000002_0000/bucket_00001");
+ expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00000_0");
+ expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00001_0");
+ expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000002_0000002_0000/bucket_00000");
+ expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000002_0000002_0000/bucket_00001");
//check that we get the right files on disk
- assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/nobuckets");
+ assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/" + NO_BUCKETS_TBL_NAME, NO_BUCKETS_TBL_NAME);
//todo: it would be nice to check the contents of the files... could use orc.FileDump - it has
// methods to print to a supplied stream but those are package private
- runStatementOnDriver("alter table nobuckets compact 'major'");
+ runStatementOnDriver(String.format("alter table %s compact 'major'", NO_BUCKETS_TBL_NAME));
TestTxnCommands2.runWorker(hiveConf);
- rs = runStatementOnDriver("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from nobuckets order by INPUT__FILE__NAME, ROW__ID");
+ rs = runStatementOnDriver(
+ String.format("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from %s order by INPUT__FILE__NAME, ROW__ID",
+ NO_BUCKETS_TBL_NAME));
LOG.warn("after major compact");
for(String s : rs) {
LOG.warn(s);
@@ -163,37 +168,37 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
*/
String expected[][] = {
- {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t17", "nobuckets/base_0000002_v0000025/bucket_00000"},
- {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t17", "nobuckets/base_0000002_v0000025/bucket_00001"},
- {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2", "nobuckets/base_0000002_v0000025/bucket_00001"},
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t3\t3", "nobuckets/base_0000002_v0000025/bucket_00000"}
+ {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t17", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000025/bucket_00000"},
+ {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t17", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000025/bucket_00001"},
+ {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000025/bucket_00001"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t3\t3", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000025/bucket_00000"}
};
checkResult(expected,
"select ROW__ID, c1, c2, c3" + (shouldVectorize() ? "" : ", INPUT__FILE__NAME")
- + " from nobuckets order by c1, c2, c3",
+ + " from " + NO_BUCKETS_TBL_NAME + " order by c1, c2, c3",
shouldVectorize(),
"After Major Compaction", LOG);
expectedFiles.clear();
- expectedFiles.add("obuckets/delete_delta_0000002_0000002_0000/bucket_00000");
- expectedFiles.add("obuckets/delete_delta_0000002_0000002_0000/bucket_00001");
- expectedFiles.add("house/nobuckets/delta_0000001_0000001_0000/bucket_00000");
- expectedFiles.add("house/nobuckets/delta_0000001_0000001_0000/bucket_00001");
- expectedFiles.add("house/nobuckets/delta_0000002_0000002_0000/bucket_00000");
- expectedFiles.add("house/nobuckets/delta_0000002_0000002_0000/bucket_00001");
- expectedFiles.add("/warehouse/nobuckets/base_0000002_v0000025/bucket_00000");
- expectedFiles.add("/warehouse/nobuckets/base_0000002_v0000025/bucket_00001");
- assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/nobuckets");
+ expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delete_delta_0000002_0000002_0000/bucket_00000");
+ expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delete_delta_0000002_0000002_0000/bucket_00001");
+ expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00000_0");
+ expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00001_0");
+ expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000002_0000002_0000/bucket_00000");
+ expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000002_0000002_0000/bucket_00001");
+ expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000025/bucket_00000");
+ expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000025/bucket_00001");
+ assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/" + NO_BUCKETS_TBL_NAME, NO_BUCKETS_TBL_NAME);
TestTxnCommands2.runCleaner(hiveConf);
- rs = runStatementOnDriver("select c1, c2, c3 from nobuckets order by c1, c2, c3");
+ rs = runStatementOnDriver(String.format("select c1, c2, c3 from %s order by c1, c2, c3", NO_BUCKETS_TBL_NAME));
int[][] result = {{0,0,17},{1,1,17},{2,2,2},{3,3,3}};
Assert.assertEquals("Unexpected result after clean", stringifyValues(result), rs);
expectedFiles.clear();
- expectedFiles.add("/warehouse/nobuckets/base_0000002_v0000025/bucket_00000");
- expectedFiles.add("/warehouse/nobuckets/base_0000002_v0000025/bucket_00001");
- assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/nobuckets");
+ expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000025/bucket_00000");
+ expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000025/bucket_00001");
+ assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/" + NO_BUCKETS_TBL_NAME, NO_BUCKETS_TBL_NAME);
}
@Test
@@ -209,13 +214,13 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
runStatementOnDriver("insert into tmp " + makeValuesClause(sourceVals2));
runStatementOnDriver("insert into tmp " + makeValuesClause(sourceVals3));
runStatementOnDriver("insert into tmp " + makeValuesClause(sourceVals4));
- runStatementOnDriver("drop table if exists nobuckets");
- runStatementOnDriver("create table nobuckets (c1 integer, c2 integer) partitioned by (c3 integer) stored " +
- "as orc tblproperties('transactional'='true', 'transactional_properties'='default')");
- String stmt = "insert into nobuckets partition(c3) select * from tmp";
+ runStatementOnDriver("drop table if exists " + NO_BUCKETS_TBL_NAME);
+ runStatementOnDriver(String.format("create table %s (c1 integer, c2 integer) partitioned by (c3 integer) stored " +
+ "as orc tblproperties('transactional'='true', 'transactional_properties'='default')", NO_BUCKETS_TBL_NAME));
+ String stmt = String.format("insert into %s partition(c3) select * from tmp", NO_BUCKETS_TBL_NAME);
runStatementOnDriver(stmt);
List<String> rs = runStatementOnDriver(
- "select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from nobuckets order by ROW__ID");
+ String.format("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from %s order by ROW__ID", NO_BUCKETS_TBL_NAME));
Assert.assertEquals("", 8, rs.size());
LOG.warn("after insert");
for(String s : rs) {
@@ -223,16 +228,17 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
}
rs = runStatementOnDriver(
- "select * from nobuckets where c2 in (0,3)");
+ String.format("select * from %s where c2 in (0,3)", NO_BUCKETS_TBL_NAME));
Assert.assertEquals(3, rs.size());
- runStatementOnDriver("update nobuckets set c2 = 17 where c2 in(0,3)");
- rs = runStatementOnDriver("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from nobuckets order by INPUT__FILE__NAME, ROW__ID");
+ runStatementOnDriver(String.format("update %s set c2 = 17 where c2 in(0,3)", NO_BUCKETS_TBL_NAME));
+ rs = runStatementOnDriver(
+ String.format("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from %s order by INPUT__FILE__NAME, ROW__ID",
+ NO_BUCKETS_TBL_NAME));
LOG.warn("after update");
for(String s : rs) {
LOG.warn(s);
}
- rs = runStatementOnDriver(
- "select * from nobuckets where c2=17");
+ rs = runStatementOnDriver(String.format("select * from %s where c2=17", NO_BUCKETS_TBL_NAME));
Assert.assertEquals(3, rs.size());
}
@@ -332,11 +338,11 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID");
String expected[][] = {
- {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000001_0000001_0001/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000001_0000001_0001/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":0}\t5\t6", "/delta_0000001_0000001_0002/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536870915,\"rowid\":0}\t9\t10", "/delta_0000001_0000001_0003/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536936450,\"rowid\":0}\t7\t8", "/delta_0000001_0000001_0002/bucket_00001"},
+ {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000001_0000001_0001/bucket_00000_0"},
+ {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000001_0000001_0001/bucket_00000_0"},
+ {"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":0}\t5\t6", "/delta_0000001_0000001_0002/bucket_00000_0"},
+ {"{\"writeid\":1,\"bucketid\":536870915,\"rowid\":0}\t9\t10", "/delta_0000001_0000001_0003/bucket_00000_0"},
+ {"{\"writeid\":1,\"bucketid\":536936450,\"rowid\":0}\t7\t8", "/delta_0000001_0000001_0002/bucket_00001_0"},
};
checkExpected(rs, expected, "Unexpected row count after ctas");
}
@@ -563,9 +569,9 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
{"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t0\t13",
"bucket_00000", "000000_0_copy_1"},
{"{\"writeid\":10000001,\"bucketid\":536870912,\"rowid\":0}\t0\t15",
- "bucket_00000", "bucket_00000"},
+ "bucket_00000", "bucket_00000_0"},
{"{\"writeid\":10000003,\"bucketid\":536870912,\"rowid\":0}\t0\t17",
- "bucket_00000", "bucket_00000"},
+ "bucket_00000", "bucket_00000_0"},
{"{\"writeid\":10000002,\"bucketid\":536870912,\"rowid\":0}\t0\t120",
"bucket_00000", "bucket_00000"},
{"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2",
@@ -577,7 +583,7 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
{"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":6}\t1\t6",
"bucket_00000", "000000_0_copy_2"},
{"{\"writeid\":10000001,\"bucketid\":536870912,\"rowid\":1}\t1\t16",
- "bucket_00000", "bucket_00000"}
+ "bucket_00000", "bucket_00000_0"}
};
Assert.assertEquals("Unexpected row count before compaction", expected.length, rs.size());
for(int i = 0; i < expected.length; i++) {
@@ -792,14 +798,14 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
String query = "select ROW__ID, p, q, a, b, INPUT__FILE__NAME from T order by p, q, a, b";
List<String> rs = runStatementOnDriver(query);
String[][] expected = {
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t4\t1", "t/p=1/q=1/delta_0000001_0000001_0000/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t4\t3", "t/p=1/q=1/delta_0000001_0000001_0000/bucket_00000"},
- {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t5\t1", "t/p=1/q=1/delta_0000003_0000003_0000/bucket_00000"},
- {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t5\t3", "t/p=1/q=1/delta_0000003_0000003_0000/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t4\t2", "t/p=1/q=2/delta_0000001_0000001_0000/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t4\t4", "t/p=1/q=2/delta_0000001_0000001_0000/bucket_00000"},
- {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t5\t2", "t/p=1/q=2/delta_0000003_0000003_0000/bucket_00000"},
- {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t5\t4", "t/p=1/q=2/delta_0000003_0000003_0000/bucket_00000"}
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t4\t1", "t/p=1/q=1/delta_0000001_0000001_0000/bucket_00000_0"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t4\t3", "t/p=1/q=1/delta_0000001_0000001_0000/bucket_00000_0"},
+ {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t5\t1", "t/p=1/q=1/delta_0000003_0000003_0000/bucket_00000_0"},
+ {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t5\t3", "t/p=1/q=1/delta_0000003_0000003_0000/bucket_00000_0"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t4\t2", "t/p=1/q=2/delta_0000001_0000001_0000/bucket_00000_0"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t4\t4", "t/p=1/q=2/delta_0000001_0000001_0000/bucket_00000_0"},
+ {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t5\t2", "t/p=1/q=2/delta_0000003_0000003_0000/bucket_00000_0"},
+ {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t5\t4", "t/p=1/q=2/delta_0000003_0000003_0000/bucket_00000_0"}
};
checkExpected(rs, expected, "insert data");
@@ -810,10 +816,10 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
query = "select ROW__ID, p, q, a, b, INPUT__FILE__NAME from T order by p, q, a, b";
rs = runStatementOnDriver(query);
String[][] expected2 = {
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t4\t1", "t/p=1/q=1/delta_0000001_0000001_0000/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t4\t3", "t/p=1/q=1/delta_0000001_0000001_0000/bucket_00000"},
- {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t5\t1", "t/p=1/q=1/delta_0000003_0000003_0000/bucket_00000"},
- {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t5\t3", "t/p=1/q=1/delta_0000003_0000003_0000/bucket_00000"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t4\t1", "t/p=1/q=1/delta_0000001_0000001_0000/bucket_00000_0"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t4\t3", "t/p=1/q=1/delta_0000001_0000001_0000/bucket_00000_0"},
+ {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t5\t1", "t/p=1/q=1/delta_0000003_0000003_0000/bucket_00000_0"},
+ {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t5\t3", "t/p=1/q=1/delta_0000003_0000003_0000/bucket_00000_0"},
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t4\t2", "t/p=1/q=2/base_0000003_v0000020/bucket_00000"},
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t4\t4", "t/p=1/q=2/base_0000003_v0000020/bucket_00000"},
{"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t5\t2", "t/p=1/q=2/base_0000003_v0000020/bucket_00000"},
@@ -842,8 +848,8 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
List<String> rs = runStatementOnDriver(query);
String[][] expected = {
//this proves data is written in Acid layout so T was made Acid
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/bucket_00000"}
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/bucket_00000_0"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/bucket_00000_0"}
};
checkExpected(rs, expected, "insert data");
}
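(Editor's note, not part of the patch.) The expected-file changes above reflect the new direct-insert layout: bucket files that an insert writes straight into the delta directory carry an extra numeric suffix (e.g. bucket_00000_0), which the rest of this patch plumbs around as an attempt id, while compactor output keeps the plain bucket_00000 name. Purely as an illustration of that naming, here is a small standalone Java sketch; the class name, method and regex are invented for this example, and the real parsing lives in AcidUtils and handles more cases.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BucketFileNameDemo {
  // bucket_00000 or bucket_00000_0 : group 1 = bucket id, group 2 = optional attempt suffix
  private static final Pattern BUCKET_FILE = Pattern.compile("bucket_(\\d{5})(?:_(\\d+))?");

  public static void main(String[] args) {
    for (String name : new String[] {"bucket_00000", "bucket_00001_0"}) {
      Matcher m = BUCKET_FILE.matcher(name);
      if (m.matches()) {
        int bucketId = Integer.parseInt(m.group(1));
        String attemptId = m.group(2); // null when there is no attempt suffix
        System.out.println(name + " -> bucket " + bucketId + ", attempt " + attemptId);
      }
    }
  }
}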
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index af14e62..1435269 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -23,6 +23,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileSystem;
@@ -231,14 +233,8 @@ public abstract class TxnCommandsBaseForTests {
* @param expectedFiles - suffixes of expected Paths. Must be the same length
* @param rootPath - table or partition root where to start looking for actual files, recursively
*/
- void assertExpectedFileSet(Set<String> expectedFiles, String rootPath) throws Exception {
- int suffixLength = 0;
- for(String s : expectedFiles) {
- if(suffixLength > 0) {
- assert suffixLength == s.length() : "all entries must be the same length. current: " + s;
- }
- suffixLength = s.length();
- }
+ void assertExpectedFileSet(Set<String> expectedFiles, String rootPath, String tableName) throws Exception {
+ Pattern pattern = Pattern.compile("(.+)/(" + tableName + "/[delete_delta|delta|base].+)");
FileSystem fs = FileSystem.get(hiveConf);
Set<String> actualFiles = new HashSet<>();
RemoteIterator<LocatedFileStatus> remoteIterator = fs.listFiles(new Path(rootPath), true);
@@ -246,7 +242,10 @@ public abstract class TxnCommandsBaseForTests {
LocatedFileStatus lfs = remoteIterator.next();
if(!lfs.isDirectory() && org.apache.hadoop.hive.common.FileUtils.HIDDEN_FILES_PATH_FILTER.accept(lfs.getPath())) {
String p = lfs.getPath().toString();
- actualFiles.add(p.substring(p.length() - suffixLength, p.length()));
+ Matcher matcher = pattern.matcher(p);
+ if (matcher.matches()) {
+ actualFiles.add(matcher.group(2));
+ }
}
}
Assert.assertEquals("Unexpected file list", expectedFiles, actualFiles);
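(Editor's note, not part of the patch.) For context on the assertExpectedFileSet rewrite: instead of requiring every expected suffix to have the same length, the helper now matches each listed file against a pattern anchored at the table directory and asserts on the table-relative path (group 2). Below is a minimal standalone sketch of the same idea; it uses an explicit alternation group (delete_delta|delta|base) rather than the bracketed form in the patch (which Java treats as a character class, though that still happens to accept these directory names), and the sample paths are made up for illustration.

import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RelativeAcidPathDemo {
  public static void main(String[] args) {
    String tableName = "nobuckets";
    // Group 1 = everything up to the table directory, group 2 = the table-relative path we assert on.
    Pattern pattern = Pattern.compile("(.+)/(" + tableName + "/(?:delete_delta|delta|base).+)");
    String[] listed = {
        "hdfs://nn:8020/warehouse/nobuckets/delta_0000001_0000001_0000/bucket_00000_0",
        "hdfs://nn:8020/warehouse/nobuckets/base_0000002_v0000025/bucket_00001",
        "hdfs://nn:8020/warehouse/nobuckets/.hive-staging_123/000000_0"  // no delta/base dir -> ignored
    };
    Set<String> actualFiles = new TreeSet<>();
    for (String p : listed) {
      Matcher matcher = pattern.matcher(p);
      if (matcher.matches()) {
        actualFiles.add(matcher.group(2));
      }
    }
    // Prints: [nobuckets/base_0000002_v0000025/bucket_00001, nobuckets/delta_0000001_0000001_0000/bucket_00000_0]
    System.out.println(actualFiles);
  }
}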
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
index 83db48e..801133d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
@@ -146,7 +146,7 @@ public class TestExecDriver {
db.createTable(src, cols, null, TextInputFormat.class,
HiveIgnoreKeyTextOutputFormat.class);
db.loadTable(hadoopDataFile[i], src, LoadFileType.KEEP_EXISTING,
- true, false, false, true, null, 0, false);
+ true, false, false, true, null, 0, false, false);
i++;
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index 2c4b69b..ebb51c4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -286,7 +286,7 @@ public class TestFileSinkOperator {
DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(partColMap, "Sunday", 100);
//todo: does this need the finalDestination?
desc = new FileSinkDesc(basePath, tableDesc, false, 1, false,
- false, 1, 1, partCols, dpCtx, null, null, false, false, false, false);
+ false, 1, 1, partCols, dpCtx, null, null, false, false, false, false, false);
} else {
desc = new FileSinkDesc(basePath, tableDesc, false);
}
@@ -705,7 +705,8 @@ public class TestFileSinkOperator {
int bucket,
ValidWriteIdList validWriteIdList,
Path baseDirectory,
- Path[] deltaDirectory) throws
+ Path[] deltaDirectory,
+ Map<String,String> deltaToAttemptId) throws
IOException {
return null;
}
@@ -778,6 +779,11 @@ public class TestFileSinkOperator {
public long getBufferedRowCount() {
return records.size();
}
+
+ @Override
+ public Path getUpdatedFilePath() {
+ return null;
+ }
};
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 48e9afc..80fb1af 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -1243,6 +1243,7 @@ public class TestDbTxnManager2 {
//================
//test with predicates such that partition pruning doesn't kick in
+ driver.run("drop table if exists tab1");
driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
"clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"); //txnid:4
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index cfd7290..9a9ab53 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -383,7 +383,7 @@ public abstract class CompactorTest {
@Override
public RawReader<Text> getRawReader(Configuration conf, boolean collapseEvents, int bucket,
ValidWriteIdList validWriteIdList,
- Path baseDirectory, Path... deltaDirectory) throws IOException {
+ Path baseDirectory, Path[] deltaDirectory, Map<String, String> deltaToAttemptId) throws IOException {
List<Path> filesToRead = new ArrayList<Path>();
if (baseDirectory != null) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index 70ae85c..443f982 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -162,7 +162,7 @@ public class TestWorker extends CompactorTest {
deltas[1] = new Path(delta2);
CompactorMR.CompactorInputSplit split =
- new CompactorMR.CompactorInputSplit(conf, 3, files, new Path(basename), deltas);
+ new CompactorMR.CompactorInputSplit(conf, 3, files, new Path(basename), deltas, new HashMap<String, String>());
Assert.assertEquals(520L, split.getLength());
String[] locations = split.getLocations();
@@ -207,7 +207,7 @@ public class TestWorker extends CompactorTest {
deltas[1] = new Path(delta2);
CompactorMR.CompactorInputSplit split =
- new CompactorMR.CompactorInputSplit(conf, 3, files, null, deltas);
+ new CompactorMR.CompactorInputSplit(conf, 3, files, null, deltas, new HashMap<String, String>());
ByteArrayOutputStream buf = new ByteArrayOutputStream();
DataOutput out = new DataOutputStream(buf);
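(Editor's note, not part of the patch.) The CompactorInputSplit constructor and getRawReader now also take a Map<String, String> from delta directory to attempt id; the tests above simply pass an empty HashMap. Only as a hypothetical illustration of how such a map could be keyed (delta directory name mapped to the attempt suffix of its bucket file, per the bucket_<id>_<attempt> naming shown earlier in this patch), here is a self-contained sketch; the listing is hard-coded, whereas real code would walk the filesystem.

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DeltaToAttemptIdDemo {
  // bucket_00000_0 -> attempt "0"; a plain bucket_00000 has no attempt suffix
  private static final Pattern BUCKET_WITH_ATTEMPT = Pattern.compile("bucket_\\d{5}_(\\d+)");

  public static void main(String[] args) {
    String[][] deltaDirAndBucketFile = {
        {"delta_0000001_0000001_0000", "bucket_00000_0"},
        {"delta_0000002_0000002_0000", "bucket_00000"}   // no suffix -> no map entry
    };
    Map<String, String> deltaToAttemptId = new HashMap<>();
    for (String[] f : deltaDirAndBucketFile) {
      Matcher m = BUCKET_WITH_ATTEMPT.matcher(f[1]);
      if (m.matches()) {
        deltaToAttemptId.put(f[0], m.group(1));
      }
    }
    System.out.println(deltaToAttemptId); // {delta_0000001_0000001_0000=0}
  }
}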
diff --git a/ql/src/test/queries/clientpositive/tez_acid_union_dynamic_partition.q b/ql/src/test/queries/clientpositive/tez_acid_union_dynamic_partition.q
new file mode 100644
index 0000000..554ac30
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/tez_acid_union_dynamic_partition.q
@@ -0,0 +1,20 @@
+SET hive.vectorized.execution.enabled=false;
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.support.concurrency=true;
+set hive.acid.direct.insert.enabled=true;
+
+create table dummy_n2(i int);
+insert into table dummy_n2 values (1);
+select * from dummy_n2;
+
+create table partunion1(id1 int) partitioned by (part1 string) stored as orc tblproperties('transactional'='true');
+
+insert into table partunion1 partition(part1)
+select temps.* from (
+select 1 as id1, '2014' as part1 from dummy_n2
+union all
+select 2 as id1, '2014' as part1 from dummy_n2 ) temps;
+
+select * from partunion1;
diff --git a/ql/src/test/queries/clientpositive/tez_acid_union_dynamic_partition_2.q b/ql/src/test/queries/clientpositive/tez_acid_union_dynamic_partition_2.q
new file mode 100644
index 0000000..3cdaef8
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/tez_acid_union_dynamic_partition_2.q
@@ -0,0 +1,25 @@
+SET hive.vectorized.execution.enabled=false;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.support.concurrency=true;
+set hive.acid.direct.insert.enabled=true;
+
+drop table if exists dummy_n7;
+drop table if exists partunion1_n0;
+
+create table dummy_n7(i int);
+insert into table dummy_n7 values (1);
+select * from dummy_n7;
+
+create table partunion1_n0(id1 int) partitioned by (part1 string) stored as orc tblproperties('transactional'='true');
+
+set hive.merge.tezfiles=true;
+
+insert into table partunion1_n0 partition(part1)
+select 1 as id1, '2014' as part1 from dummy_n7
+union all
+select 2 as id1, '2014' as part1 from dummy_n7;
+
+select * from partunion1_n0;
+
+drop table dummy_n7;
+drop table partunion1_n0;
diff --git a/ql/src/test/queries/clientpositive/tez_acid_union_multiinsert.q b/ql/src/test/queries/clientpositive/tez_acid_union_multiinsert.q
new file mode 100644
index 0000000..788d850
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/tez_acid_union_multiinsert.q
@@ -0,0 +1,94 @@
+--! qt:dataset:src
+set hive.explain.user=false;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.support.concurrency=true;
+set hive.acid.direct.insert.enabled=true;
+
+-- SORT_QUERY_RESULTS
+
+DROP TABLE IF EXISTS DEST1_acid_1;
+DROP TABLE IF EXISTS DEST1_acid_2;
+DROP TABLE IF EXISTS DEST1_acid_3;
+DROP TABLE IF EXISTS DEST1_acid_4;
+DROP TABLE IF EXISTS DEST1_acid_5;
+DROP TABLE IF EXISTS DEST1_acid_6;
+
+CREATE TABLE DEST1_acid_1(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true');
+CREATE TABLE DEST1_acid_2(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true');
+CREATE TABLE DEST1_acid_3(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true');
+CREATE TABLE DEST1_acid_4(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true');
+CREATE TABLE DEST1_acid_5(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true');
+CREATE TABLE DEST1_acid_6(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true');
+CREATE TABLE DEST1_acid_7(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true');
+CREATE TABLE DEST1_acid_8(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true');
+CREATE TABLE DEST1_acid_9(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true');
+CREATE TABLE DEST1_acid_10(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true');
+
+FROM (
+ select key, value from (
+ select 'tst1' as key, cast(count(1) as string) as value, 'tst1' as value2 from src s1
+ UNION all
+ select s2.key as key, s2.value as value, 'tst1' as value2 from src s2) unionsub
+ UNION all
+ select key, value from src s0
+ ) unionsrc
+INSERT INTO TABLE DEST1_acid_1 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_2 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5))
+GROUP BY unionsrc.key, unionsrc.value;
+
+select * from DEST1_acid_1;
+select * from DEST1_acid_2;
+
+FROM (
+ select key, value from src s0
+ UNION all
+ select key, value from (
+ select 'tst1' as key, cast(count(1) as string) as value, 'tst1' as value2 from src s1
+ UNION all
+ select s2.key as key, s2.value as value, 'tst1' as value2 from src s2) unionsub) unionsrc
+INSERT INTO TABLE DEST1_acid_3 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_4 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5))
+GROUP BY unionsrc.key, unionsrc.value;
+
+select * from DEST1_acid_3;
+select * from DEST1_acid_4;
+
+FROM (
+ select key, value from src s0
+ UNION all
+ select 'tst1' as key, cast(count(1) as string) as value from src s1
+ UNION all
+ select s2.key as key, s2.value as value from src s2) unionsrc
+INSERT INTO TABLE DEST1_acid_5 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_6 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5))
+GROUP BY unionsrc.key, unionsrc.value;
+
+select * from DEST1_acid_5;
+select * from DEST1_acid_6;
+
+FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1
+ UNION all
+ select s2.key as key, s2.value as value from src s2) unionsrc
+INSERT INTO TABLE DEST1_acid_7 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_8 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5))
+GROUP BY unionsrc.key, unionsrc.value;
+
+select * from DEST1_acid_7;
+select * from DEST1_acid_8;
+
+FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1
+ UNION distinct
+ select s2.key as key, s2.value as value from src s2) unionsrc
+INSERT INTO TABLE DEST1_acid_9 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_10 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5))
+GROUP BY unionsrc.key, unionsrc.value;
+
+select * from DEST1_acid_9;
+select * from DEST1_acid_10;
+
+DROP TABLE IF EXISTS DEST1_acid_1;
+DROP TABLE IF EXISTS DEST1_acid_2;
+DROP TABLE IF EXISTS DEST1_acid_3;
+DROP TABLE IF EXISTS DEST1_acid_4;
+DROP TABLE IF EXISTS DEST1_acid_5;
+DROP TABLE IF EXISTS DEST1_acid_6;
\ No newline at end of file
diff --git a/ql/src/test/results/clientpositive/acid_subquery.q.out b/ql/src/test/results/clientpositive/acid_subquery.q.out
index 1dc1775..4ca2e5a 100644
--- a/ql/src/test/results/clientpositive/acid_subquery.q.out
+++ b/ql/src/test/results/clientpositive/acid_subquery.q.out
@@ -95,8 +95,17 @@ POSTHOOK: Input: default@target@p=2/q=2
POSTHOOK: Output: default@merge_tmp_table
POSTHOOK: Output: default@target@p=1/q=2
POSTHOOK: Output: default@target@p=1/q=2
+POSTHOOK: Output: default@target@p=1/q=2
+POSTHOOK: Output: default@target@p=1/q=3
POSTHOOK: Output: default@target@p=1/q=3
POSTHOOK: Output: default@target@p=1/q=3
POSTHOOK: Output: default@target@p=2/q=2
POSTHOOK: Output: default@target@p=2/q=2
+POSTHOOK: Output: default@target@p=2/q=2
POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(target)t.FieldSchema(name:ROW__ID, type:struct<writeId:bigint,bucketId:int,rowId:bigint>, comment:), (target)t.FieldSchema(name:p, type:int, comment:null), (target)t.FieldSchema(name:q, type:int, comment:null), ]
+POSTHOOK: Lineage: target PARTITION(p=1,q=2).a SIMPLE [(source)s.FieldSchema(name:a1, type:int, comment:null), ]
+POSTHOOK: Lineage: target PARTITION(p=1,q=2).b SIMPLE [(source)s.FieldSchema(name:b1, type:int, comment:null), ]
+POSTHOOK: Lineage: target PARTITION(p=1,q=3).a SIMPLE [(source)s.FieldSchema(name:a1, type:int, comment:null), ]
+POSTHOOK: Lineage: target PARTITION(p=1,q=3).b SIMPLE [(source)s.FieldSchema(name:b1, type:int, comment:null), ]
+POSTHOOK: Lineage: target PARTITION(p=2,q=2).a SIMPLE [(source)s.FieldSchema(name:a1, type:int, comment:null), ]
+POSTHOOK: Lineage: target PARTITION(p=2,q=2).b SIMPLE [(source)s.FieldSchema(name:b1, type:int, comment:null), ]
diff --git a/ql/src/test/results/clientpositive/create_transactional_full_acid.q.out b/ql/src/test/results/clientpositive/create_transactional_full_acid.q.out
index e324d5e..04b16a0 100644
--- a/ql/src/test/results/clientpositive/create_transactional_full_acid.q.out
+++ b/ql/src/test/results/clientpositive/create_transactional_full_acid.q.out
@@ -190,8 +190,17 @@ POSTHOOK: Input: default@target@p=2/q=2
POSTHOOK: Output: default@merge_tmp_table
POSTHOOK: Output: default@target@p=1/q=2
POSTHOOK: Output: default@target@p=1/q=2
+POSTHOOK: Output: default@target@p=1/q=2
+POSTHOOK: Output: default@target@p=1/q=3
POSTHOOK: Output: default@target@p=1/q=3
POSTHOOK: Output: default@target@p=1/q=3
POSTHOOK: Output: default@target@p=2/q=2
POSTHOOK: Output: default@target@p=2/q=2
+POSTHOOK: Output: default@target@p=2/q=2
POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(target)t.FieldSchema(name:ROW__ID, type:struct<writeId:bigint,bucketId:int,rowId:bigint>, comment:), (target)t.FieldSchema(name:p, type:int, comment:null), (target)t.FieldSchema(name:q, type:int, comment:null), ]
+POSTHOOK: Lineage: target PARTITION(p=1,q=2).a SIMPLE [(source)s.FieldSchema(name:a1, type:int, comment:null), ]
+POSTHOOK: Lineage: target PARTITION(p=1,q=2).b SIMPLE [(source)s.FieldSchema(name:b1, type:int, comment:null), ]
+POSTHOOK: Lineage: target PARTITION(p=1,q=3).a SIMPLE [(source)s.FieldSchema(name:a1, type:int, comment:null), ]
+POSTHOOK: Lineage: target PARTITION(p=1,q=3).b SIMPLE [(source)s.FieldSchema(name:b1, type:int, comment:null), ]
+POSTHOOK: Lineage: target PARTITION(p=2,q=2).a SIMPLE [(source)s.FieldSchema(name:a1, type:int, comment:null), ]
+POSTHOOK: Lineage: target PARTITION(p=2,q=2).b SIMPLE [(source)s.FieldSchema(name:b1, type:int, comment:null), ]
diff --git a/ql/src/test/results/clientpositive/encrypted/encryption_insert_partition_dynamic.q.out b/ql/src/test/results/clientpositive/encrypted/encryption_insert_partition_dynamic.q.out
index 61b0057..f9c7060 100644
--- a/ql/src/test/results/clientpositive/encrypted/encryption_insert_partition_dynamic.q.out
+++ b/ql/src/test/results/clientpositive/encrypted/encryption_insert_partition_dynamic.q.out
@@ -73,8 +73,12 @@ insert into table encryptedTable_n0 partition (key)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@src
POSTHOOK: Output: default@encryptedtable_n0@key=238
+POSTHOOK: Output: default@encryptedtable_n0@key=501
+POSTHOOK: Output: default@encryptedtable_n0@key=502
POSTHOOK: Output: default@encryptedtable_n0@key=86
POSTHOOK: Lineage: encryptedtable_n0 PARTITION(key=238).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: encryptedtable_n0 PARTITION(key=501).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: encryptedtable_n0 PARTITION(key=502).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
POSTHOOK: Lineage: encryptedtable_n0 PARTITION(key=86).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
PREHOOK: query: select * from encryptedTable_n0 order by key
PREHOOK: type: QUERY
diff --git a/ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out b/ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out
index 03b8dc3..699398b 100644
--- a/ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out
+++ b/ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out
@@ -611,13 +611,25 @@ POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
POSTHOOK: Output: default@merge_tmp_table
POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(srcpart_acid)t.FieldSchema(name:ROW__ID, type:struct<writeId:bigint,bucketId:int,rowId:bigint>, comment:), (srcpart_acid)t.FieldSchema(name:ds, type:string, comment:null), (srcpart_acid)t.FieldSchema(name:hr, type:string, comment:null), ]
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-08,hr=11).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-08,hr=11).value SIMPLE []
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-08,hr=12).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-08,hr=12).value SIMPLE []
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-09,hr=11).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-09,hr=11).value SIMPLE []
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-09,hr=12).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-09,hr=12).value SIMPLE []
PREHOOK: query: select count(*) from srcpart_acid where ds='2008-04-08' and hr=='12'
PREHOOK: type: QUERY
PREHOOK: Input: default@srcpart_acid
@@ -626,7 +638,7 @@ POSTHOOK: query: select count(*) from srcpart_acid where ds='2008-04-08' and hr=
POSTHOOK: type: QUERY
POSTHOOK: Input: default@srcpart_acid
#### A masked pattern was here ####
-0
+497
PREHOOK: query: select ds, hr, key, value from srcpart_acid where value like '%updated by merge'
PREHOOK: type: QUERY
PREHOOK: Input: default@srcpart_acid
@@ -1150,13 +1162,25 @@ POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
POSTHOOK: Output: default@merge_tmp_table
POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(srcpart_acidb)t.FieldSchema(name:ROW__ID, type:struct<writeId:bigint,bucketId:int,rowId:bigint>, comment:), (srcpart_acidb)t.FieldSchema(name:ds, type:string, comment:null), (srcpart_acidb)t.FieldSchema(name:hr, type:string, comment:null), ]
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-08,hr=11).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-08,hr=11).value SIMPLE []
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-08,hr=12).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-08,hr=12).value SIMPLE []
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-09,hr=11).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-09,hr=11).value SIMPLE []
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-09,hr=12).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-09,hr=12).value SIMPLE []
PREHOOK: query: select count(*) from srcpart_acidb where ds='2008-04-08' and hr=='12'
PREHOOK: type: QUERY
PREHOOK: Input: default@srcpart_acidb
@@ -1165,7 +1189,7 @@ POSTHOOK: query: select count(*) from srcpart_acidb where ds='2008-04-08' and hr
POSTHOOK: type: QUERY
POSTHOOK: Input: default@srcpart_acidb
#### A masked pattern was here ####
-0
+497
PREHOOK: query: select ds, hr, key, value from srcpart_acidb where value like '%updated by merge'
PREHOOK: type: QUERY
PREHOOK: Input: default@srcpart_acidb
@@ -1997,13 +2021,25 @@ POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
POSTHOOK: Output: default@merge_tmp_table
POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(srcpart_acidv)t.FieldSchema(name:ROW__ID, type:struct<writeId:bigint,bucketId:int,rowId:bigint>, comment:), (srcpart_acidv)t.FieldSchema(name:ds, type:string, comment:null), (srcpart_acidv)t.FieldSchema(name:hr, type:string, comment:null), ]
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-08,hr=11).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-08,hr=11).value SIMPLE []
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-08,hr=12).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-08,hr=12).value SIMPLE []
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-09,hr=11).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-09,hr=11).value SIMPLE []
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-09,hr=12).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-09,hr=12).value SIMPLE []
PREHOOK: query: select count(*) from srcpart_acidv where ds='2008-04-08' and hr=='12'
PREHOOK: type: QUERY
PREHOOK: Input: default@srcpart_acidv
@@ -2012,7 +2048,7 @@ POSTHOOK: query: select count(*) from srcpart_acidv where ds='2008-04-08' and hr
POSTHOOK: type: QUERY
POSTHOOK: Input: default@srcpart_acidv
#### A masked pattern was here ####
-0
+497
PREHOOK: query: select ds, hr, key, value from srcpart_acidv where value like '%updated by merge'
PREHOOK: type: QUERY
PREHOOK: Input: default@srcpart_acidv
@@ -2853,13 +2889,25 @@ POSTHOOK: Input: default@srcpart_acidvb@ds=2008-04-09/hr=12
POSTHOOK: Output: default@merge_tmp_table
POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=11
POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=12
POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=12
POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=12
POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=11
POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=12
POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=12
POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=12
POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(srcpart_acidvb)t.FieldSchema(name:ROW__ID, type:struct<writeId:bigint,bucketId:int,rowId:bigint>, comment:), (srcpart_acidvb)t.FieldSchema(name:ds, type:string, comment:null), (srcpart_acidvb)t.FieldSchema(name:hr, type:string, comment:null), ]
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-08,hr=11).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-08,hr=11).value SIMPLE []
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-08,hr=12).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-08,hr=12).value SIMPLE []
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-09,hr=11).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-09,hr=11).value SIMPLE []
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-09,hr=12).key SIMPLE []
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-09,hr=12).value SIMPLE []
PREHOOK: query: select count(*) from srcpart_acidvb where ds='2008-04-08' and hr=='12'
PREHOOK: type: QUERY
PREHOOK: Input: default@srcpart_acidvb
@@ -2868,7 +2916,7 @@ POSTHOOK: query: select count(*) from srcpart_acidvb where ds='2008-04-08' and h
POSTHOOK: type: QUERY
POSTHOOK: Input: default@srcpart_acidvb
#### A masked pattern was here ####
-0
+497
PREHOOK: query: select ds, hr, key, value from srcpart_acidvb where value like '%updated by merge'
PREHOOK: type: QUERY
PREHOOK: Input: default@srcpart_acidvb
diff --git a/ql/src/test/results/clientpositive/llap/insert_overwrite.q.out b/ql/src/test/results/clientpositive/llap/insert_overwrite.q.out
index fbc3326..1ee38d3 100644
--- a/ql/src/test/results/clientpositive/llap/insert_overwrite.q.out
+++ b/ql/src/test/results/clientpositive/llap/insert_overwrite.q.out
@@ -372,6 +372,12 @@ PREHOOK: Output: default@int_part
POSTHOOK: query: INSERT OVERWRITE TABLE int_part PARTITION (par) SELECT * FROM b
POSTHOOK: type: QUERY
POSTHOOK: Input: default@b
+POSTHOOK: Output: default@int_part@par=1
+POSTHOOK: Output: default@int_part@par=2
+POSTHOOK: Output: default@int_part@par=3
+POSTHOOK: Lineage: int_part PARTITION(par=1).col SIMPLE [(b)b.FieldSchema(name:par, type:string, comment:null), ]
+POSTHOOK: Lineage: int_part PARTITION(par=2).col SIMPLE [(b)b.FieldSchema(name:par, type:string, comment:null), ]
+POSTHOOK: Lineage: int_part PARTITION(par=3).col SIMPLE [(b)b.FieldSchema(name:par, type:string, comment:null), ]
PREHOOK: query: SELECT count(*) FROM int_part
PREHOOK: type: QUERY
PREHOOK: Input: default@int_part
@@ -429,7 +435,11 @@ POSTHOOK: query: INSERT OVERWRITE TABLE int_part PARTITION (par) SELECT * FROM b
POSTHOOK: type: QUERY
POSTHOOK: Input: default@b
POSTHOOK: Output: default@int_part@par=1
+POSTHOOK: Output: default@int_part@par=2
+POSTHOOK: Output: default@int_part@par=3
POSTHOOK: Lineage: int_part PARTITION(par=1).col SIMPLE [(b)b.FieldSchema(name:par, type:string, comment:null), ]
+POSTHOOK: Lineage: int_part PARTITION(par=2).col SIMPLE [(b)b.FieldSchema(name:par, type:string, comment:null), ]
+POSTHOOK: Lineage: int_part PARTITION(par=3).col SIMPLE [(b)b.FieldSchema(name:par, type:string, comment:null), ]
PREHOOK: query: SELECT count(*) FROM int_part
PREHOOK: type: QUERY
PREHOOK: Input: default@int_part
diff --git a/ql/src/test/results/clientpositive/llap/mm_all.q.out b/ql/src/test/results/clientpositive/llap/mm_all.q.out
index 226f2a9..fd28d39 100644
--- a/ql/src/test/results/clientpositive/llap/mm_all.q.out
+++ b/ql/src/test/results/clientpositive/llap/mm_all.q.out
@@ -1633,6 +1633,7 @@ POSTHOOK: Input: default@intermediate_n0@p=455
POSTHOOK: Input: default@intermediate_n0@p=456
POSTHOOK: Input: default@intermediate_n0@p=457
POSTHOOK: Output: default@multi1_mm@p=1
+POSTHOOK: Output: default@multi1_mm@p=2
POSTHOOK: Output: default@multi1_mm@p=455
POSTHOOK: Output: default@multi1_mm@p=456
POSTHOOK: Output: default@multi1_mm@p=457
@@ -1646,6 +1647,8 @@ POSTHOOK: Lineage: ###Masked###
POSTHOOK: Lineage: ###Masked###
POSTHOOK: Lineage: ###Masked###
POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
PREHOOK: query: select key, key2, p from multi1_mm order by key, key2, p
PREHOOK: type: QUERY
PREHOOK: Input: default@multi1_mm
@@ -1713,6 +1716,18 @@ POSTHOOK: Input: default@intermediate_n0@p=455
POSTHOOK: Input: default@intermediate_n0@p=456
POSTHOOK: Input: default@intermediate_n0@p=457
POSTHOOK: Output: default@multi1_mm@p=1
+POSTHOOK: Output: default@multi1_mm@p=2
+POSTHOOK: Output: default@multi1_mm@p=455
+POSTHOOK: Output: default@multi1_mm@p=456
+POSTHOOK: Output: default@multi1_mm@p=457
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
POSTHOOK: Lineage: ###Masked###
POSTHOOK: Lineage: ###Masked###
POSTHOOK: Lineage: ###Masked###
diff --git a/ql/src/test/results/clientpositive/llap/tez_acid_union_dynamic_partition.q.out b/ql/src/test/results/clientpositive/llap/tez_acid_union_dynamic_partition.q.out
new file mode 100644
index 0000000..07b0efb
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/tez_acid_union_dynamic_partition.q.out
@@ -0,0 +1,63 @@
+PREHOOK: query: create table dummy_n2(i int)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@dummy_n2
+POSTHOOK: query: create table dummy_n2(i int)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@dummy_n2
+PREHOOK: query: insert into table dummy_n2 values (1)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@dummy_n2
+POSTHOOK: query: insert into table dummy_n2 values (1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@dummy_n2
+POSTHOOK: Lineage: dummy_n2.i SCRIPT []
+PREHOOK: query: select * from dummy_n2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dummy_n2
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from dummy_n2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dummy_n2
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1
+PREHOOK: query: create table partunion1(id1 int) partitioned by (part1 string) stored as orc tblproperties('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@partunion1
+POSTHOOK: query: create table partunion1(id1 int) partitioned by (part1 string) stored as orc tblproperties('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@partunion1
+PREHOOK: query: insert into table partunion1 partition(part1)
+select temps.* from (
+select 1 as id1, '2014' as part1 from dummy_n2
+union all
+select 2 as id1, '2014' as part1 from dummy_n2 ) temps
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dummy_n2
+PREHOOK: Output: default@partunion1
+POSTHOOK: query: insert into table partunion1 partition(part1)
+select temps.* from (
+select 1 as id1, '2014' as part1 from dummy_n2
+union all
+select 2 as id1, '2014' as part1 from dummy_n2 ) temps
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dummy_n2
+POSTHOOK: Output: default@partunion1@part1=2014
+POSTHOOK: Lineage: partunion1 PARTITION(part1=2014).id1 EXPRESSION []
+PREHOOK: query: select * from partunion1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@partunion1
+PREHOOK: Input: default@partunion1@part1=2014
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from partunion1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@partunion1
+POSTHOOK: Input: default@partunion1@part1=2014
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 2014
+2 2014
diff --git a/ql/src/test/results/clientpositive/llap/tez_acid_union_dynamic_partition_2.q.out b/ql/src/test/results/clientpositive/llap/tez_acid_union_dynamic_partition_2.q.out
new file mode 100644
index 0000000..7e664e4
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/tez_acid_union_dynamic_partition_2.q.out
@@ -0,0 +1,85 @@
+PREHOOK: query: drop table if exists dummy_n7
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists dummy_n7
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table if exists partunion1_n0
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists partunion1_n0
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table dummy_n7(i int)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@dummy_n7
+POSTHOOK: query: create table dummy_n7(i int)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@dummy_n7
+PREHOOK: query: insert into table dummy_n7 values (1)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@dummy_n7
+POSTHOOK: query: insert into table dummy_n7 values (1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@dummy_n7
+POSTHOOK: Lineage: dummy_n7.i SCRIPT []
+PREHOOK: query: select * from dummy_n7
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dummy_n7
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from dummy_n7
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dummy_n7
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1
+PREHOOK: query: create table partunion1_n0(id1 int) partitioned by (part1 string) stored as orc tblproperties('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@partunion1_n0
+POSTHOOK: query: create table partunion1_n0(id1 int) partitioned by (part1 string) stored as orc tblproperties('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@partunion1_n0
+PREHOOK: query: insert into table partunion1_n0 partition(part1)
+select 1 as id1, '2014' as part1 from dummy_n7
+union all
+select 2 as id1, '2014' as part1 from dummy_n7
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dummy_n7
+PREHOOK: Output: default@partunion1_n0
+POSTHOOK: query: insert into table partunion1_n0 partition(part1)
+select 1 as id1, '2014' as part1 from dummy_n7
+union all
+select 2 as id1, '2014' as part1 from dummy_n7
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dummy_n7
+POSTHOOK: Output: default@partunion1_n0@part1=2014
+POSTHOOK: Lineage: partunion1_n0 PARTITION(part1=2014).id1 EXPRESSION []
+PREHOOK: query: select * from partunion1_n0
+PREHOOK: type: QUERY
+PREHOOK: Input: default@partunion1_n0
+PREHOOK: Input: default@partunion1_n0@part1=2014
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from partunion1_n0
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@partunion1_n0
+POSTHOOK: Input: default@partunion1_n0@part1=2014
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 2014
+2 2014
+PREHOOK: query: drop table dummy_n7
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@dummy_n7
+PREHOOK: Output: default@dummy_n7
+POSTHOOK: query: drop table dummy_n7
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@dummy_n7
+POSTHOOK: Output: default@dummy_n7
+PREHOOK: query: drop table partunion1_n0
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@partunion1_n0
+PREHOOK: Output: default@partunion1_n0
+POSTHOOK: query: drop table partunion1_n0
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@partunion1_n0
+POSTHOOK: Output: default@partunion1_n0
diff --git a/ql/src/test/results/clientpositive/llap/tez_acid_union_multiinsert.q.out b/ql/src/test/results/clientpositive/llap/tez_acid_union_multiinsert.q.out
new file mode 100644
index 0000000..d91a321
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/tez_acid_union_multiinsert.q.out
@@ -0,0 +1,3481 @@
+PREHOOK: query: DROP TABLE IF EXISTS DEST1_acid_1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS DEST1_acid_1
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: DROP TABLE IF EXISTS DEST1_acid_2
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS DEST1_acid_2
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: DROP TABLE IF EXISTS DEST1_acid_3
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS DEST1_acid_3
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: DROP TABLE IF EXISTS DEST1_acid_4
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS DEST1_acid_4
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: DROP TABLE IF EXISTS DEST1_acid_5
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS DEST1_acid_5
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: DROP TABLE IF EXISTS DEST1_acid_6
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS DEST1_acid_6
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE DEST1_acid_1(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@DEST1_acid_1
+POSTHOOK: query: CREATE TABLE DEST1_acid_1(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@DEST1_acid_1
+PREHOOK: query: CREATE TABLE DEST1_acid_2(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@DEST1_acid_2
+POSTHOOK: query: CREATE TABLE DEST1_acid_2(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@DEST1_acid_2
+PREHOOK: query: CREATE TABLE DEST1_acid_3(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@DEST1_acid_3
+POSTHOOK: query: CREATE TABLE DEST1_acid_3(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@DEST1_acid_3
+PREHOOK: query: CREATE TABLE DEST1_acid_4(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@DEST1_acid_4
+POSTHOOK: query: CREATE TABLE DEST1_acid_4(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@DEST1_acid_4
+PREHOOK: query: CREATE TABLE DEST1_acid_5(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@DEST1_acid_5
+POSTHOOK: query: CREATE TABLE DEST1_acid_5(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@DEST1_acid_5
+PREHOOK: query: CREATE TABLE DEST1_acid_6(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@DEST1_acid_6
+POSTHOOK: query: CREATE TABLE DEST1_acid_6(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@DEST1_acid_6
+PREHOOK: query: CREATE TABLE DEST1_acid_7(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@DEST1_acid_7
+POSTHOOK: query: CREATE TABLE DEST1_acid_7(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@DEST1_acid_7
+PREHOOK: query: CREATE TABLE DEST1_acid_8(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@DEST1_acid_8
+POSTHOOK: query: CREATE TABLE DEST1_acid_8(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@DEST1_acid_8
+PREHOOK: query: CREATE TABLE DEST1_acid_9(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@DEST1_acid_9
+POSTHOOK: query: CREATE TABLE DEST1_acid_9(key STRING, value STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@DEST1_acid_9
+PREHOOK: query: CREATE TABLE DEST1_acid_10(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@DEST1_acid_10
+POSTHOOK: query: CREATE TABLE DEST1_acid_10(key STRING, val1 STRING, val2 STRING) STORED AS ORC TBLPROPERTIES('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@DEST1_acid_10
+PREHOOK: query: FROM (
+ select key, value from (
+ select 'tst1' as key, cast(count(1) as string) as value, 'tst1' as value2 from src s1
+ UNION all
+ select s2.key as key, s2.value as value, 'tst1' as value2 from src s2) unionsub
+ UNION all
+ select key, value from src s0
+ ) unionsrc
+INSERT INTO TABLE DEST1_acid_1 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_2 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5))
+GROUP BY unionsrc.key, unionsrc.value
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@dest1_acid_1
+PREHOOK: Output: default@dest1_acid_2
+POSTHOOK: query: FROM (
+ select key, value from (
+ select 'tst1' as key, cast(count(1) as string) as value, 'tst1' as value2 from src s1
+ UNION all
+ select s2.key as key, s2.value as value, 'tst1' as value2 from src s2) unionsub
+ UNION all
+ select key, value from src s0
+ ) unionsrc
+INSERT INTO TABLE DEST1_acid_1 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_2 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5))
+GROUP BY unionsrc.key, unionsrc.value
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@dest1_acid_1
+POSTHOOK: Output: default@dest1_acid_2
+POSTHOOK: Lineage: dest1_acid_1.key EXPRESSION [(src)s2.FieldSchema(name:key, type:string, comment:default), (src)s0.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_1.value EXPRESSION [(src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), (src)s0.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_2.key EXPRESSION [(src)s2.FieldSchema(name:key, type:string, comment:default), (src)s0.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_2.val1 EXPRESSION [(src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), (src)s0.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_2.val2 EXPRESSION [(src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), (src)s0.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select * from DEST1_acid_1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dest1_acid_1
+#### A masked pattern was here ####
+POSTHOOK: query: select * from DEST1_acid_1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dest1_acid_1
+#### A masked pattern was here ####
+0 1
+10 1
+100 1
+103 1
+104 1
+105 1
+11 1
+111 1
+113 1
+114 1
+116 1
+118 1
+119 1
+12 1
+120 1
+125 1
+126 1
+128 1
+129 1
+131 1
+133 1
+134 1
+136 1
+137 1
+138 1
+143 1
+145 1
+146 1
+149 1
+15 1
+150 1
+152 1
+153 1
+155 1
+156 1
+157 1
+158 1
+160 1
+162 1
+163 1
+164 1
+165 1
+166 1
+167 1
+168 1
+169 1
+17 1
+170 1
+172 1
+174 1
+175 1
+176 1
+177 1
+178 1
+179 1
+18 1
+180 1
+181 1
+183 1
+186 1
+187 1
+189 1
+19 1
+190 1
+191 1
+192 1
+193 1
+194 1
+195 1
+196 1
+197 1
+199 1
+2 1
+20 1
+200 1
+201 1
+202 1
+203 1
+205 1
+207 1
+208 1
+209 1
+213 1
+214 1
+216 1
+217 1
+218 1
+219 1
+221 1
+222 1
+223 1
+224 1
+226 1
+228 1
+229 1
+230 1
+233 1
+235 1
+237 1
+238 1
+239 1
+24 1
+241 1
+242 1
+244 1
+247 1
+248 1
+249 1
+252 1
+255 1
+256 1
+257 1
+258 1
+26 1
+260 1
+262 1
+263 1
+265 1
+266 1
+27 1
+272 1
+273 1
+274 1
+275 1
+277 1
+278 1
+28 1
+280 1
+281 1
+282 1
+283 1
+284 1
+285 1
+286 1
+287 1
+288 1
+289 1
+291 1
+292 1
+296 1
+298 1
+30 1
+302 1
+305 1
+306 1
+307 1
+308 1
+309 1
+310 1
+311 1
+315 1
+316 1
+317 1
+318 1
+321 1
+322 1
+323 1
+325 1
+327 1
+33 1
+331 1
+332 1
+333 1
+335 1
+336 1
+338 1
+339 1
+34 1
+341 1
+342 1
+344 1
+345 1
+348 1
+35 1
+351 1
+353 1
+356 1
+360 1
+362 1
+364 1
+365 1
+366 1
+367 1
+368 1
+369 1
+37 1
+373 1
+374 1
+375 1
+377 1
+378 1
+379 1
+382 1
+384 1
+386 1
+389 1
+392 1
+393 1
+394 1
+395 1
+396 1
+397 1
+399 1
+4 1
+400 1
+401 1
+402 1
+403 1
+404 1
+406 1
+407 1
+409 1
+41 1
+411 1
+413 1
+414 1
+417 1
+418 1
+419 1
+42 1
+421 1
+424 1
+427 1
+429 1
+43 1
+430 1
+431 1
+432 1
+435 1
+436 1
+437 1
+438 1
+439 1
+44 1
+443 1
+444 1
+446 1
+448 1
+449 1
+452 1
+453 1
+454 1
+455 1
+457 1
+458 1
+459 1
+460 1
+462 1
+463 1
+466 1
+467 1
+468 1
+469 1
+47 1
+470 1
+472 1
+475 1
+477 1
+478 1
+479 1
+480 1
+481 1
+482 1
+483 1
+484 1
+485 1
+487 1
+489 1
+490 1
+491 1
+492 1
+493 1
+494 1
+495 1
+496 1
+497 1
+498 1
+5 1
+51 1
+53 1
+54 1
+57 1
+58 1
+64 1
+65 1
+66 1
+67 1
+69 1
+70 1
+72 1
+74 1
+76 1
+77 1
+78 1
+8 1
+80 1
+82 1
+83 1
+84 1
+85 1
+86 1
+87 1
+9 1
+90 1
+92 1
+95 1
+96 1
+97 1
+98 1
+tst1 1
+PREHOOK: query: select * from DEST1_acid_2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dest1_acid_2
+#### A masked pattern was here ####
+POSTHOOK: query: select * from DEST1_acid_2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dest1_acid_2
+#### A masked pattern was here ####
+0 val_0 1
+10 val_10 1
+100 val_100 1
+103 val_103 1
+104 val_104 1
+105 val_105 1
+11 val_11 1
+111 val_111 1
+113 val_113 1
+114 val_114 1
+116 val_116 1
+118 val_118 1
+119 val_119 1
+12 val_12 1
+120 val_120 1
+125 val_125 1
+126 val_126 1
+128 val_128 1
+129 val_129 1
+131 val_131 1
+133 val_133 1
+134 val_134 1
+136 val_136 1
+137 val_137 1
+138 val_138 1
+143 val_143 1
+145 val_145 1
+146 val_146 1
+149 val_149 1
+15 val_15 1
+150 val_150 1
+152 val_152 1
+153 val_153 1
+155 val_155 1
+156 val_156 1
+157 val_157 1
+158 val_158 1
+160 val_160 1
+162 val_162 1
+163 val_163 1
+164 val_164 1
+165 val_165 1
+166 val_166 1
+167 val_167 1
+168 val_168 1
+169 val_169 1
+17 val_17 1
+170 val_170 1
+172 val_172 1
+174 val_174 1
+175 val_175 1
+176 val_176 1
+177 val_177 1
+178 val_178 1
+179 val_179 1
+18 val_18 1
+180 val_180 1
+181 val_181 1
+183 val_183 1
+186 val_186 1
+187 val_187 1
+189 val_189 1
+19 val_19 1
+190 val_190 1
+191 val_191 1
+192 val_192 1
+193 val_193 1
+194 val_194 1
+195 val_195 1
+196 val_196 1
+197 val_197 1
+199 val_199 1
+2 val_2 1
+20 val_20 1
+200 val_200 1
+201 val_201 1
+202 val_202 1
+203 val_203 1
+205 val_205 1
+207 val_207 1
+208 val_208 1
+209 val_209 1
+213 val_213 1
+214 val_214 1
+216 val_216 1
+217 val_217 1
+218 val_218 1
+219 val_219 1
+221 val_221 1
+222 val_222 1
+223 val_223 1
+224 val_224 1
+226 val_226 1
+228 val_228 1
+229 val_229 1
+230 val_230 1
+233 val_233 1
+235 val_235 1
+237 val_237 1
+238 val_238 1
+239 val_239 1
+24 val_24 1
+241 val_241 1
+242 val_242 1
+244 val_244 1
+247 val_247 1
+248 val_248 1
+249 val_249 1
+252 val_252 1
+255 val_255 1
+256 val_256 1
+257 val_257 1
+258 val_258 1
+26 val_26 1
+260 val_260 1
+262 val_262 1
+263 val_263 1
+265 val_265 1
+266 val_266 1
+27 val_27 1
+272 val_272 1
+273 val_273 1
+274 val_274 1
+275 val_275 1
+277 val_277 1
+278 val_278 1
+28 val_28 1
+280 val_280 1
+281 val_281 1
+282 val_282 1
+283 val_283 1
+284 val_284 1
+285 val_285 1
+286 val_286 1
+287 val_287 1
+288 val_288 1
+289 val_289 1
+291 val_291 1
+292 val_292 1
+296 val_296 1
+298 val_298 1
+30 val_30 1
+302 val_302 1
+305 val_305 1
+306 val_306 1
+307 val_307 1
+308 val_308 1
+309 val_309 1
+310 val_310 1
+311 val_311 1
+315 val_315 1
+316 val_316 1
+317 val_317 1
+318 val_318 1
+321 val_321 1
+322 val_322 1
+323 val_323 1
+325 val_325 1
+327 val_327 1
+33 val_33 1
+331 val_331 1
+332 val_332 1
+333 val_333 1
+335 val_335 1
+336 val_336 1
+338 val_338 1
+339 val_339 1
+34 val_34 1
+341 val_341 1
+342 val_342 1
+344 val_344 1
+345 val_345 1
+348 val_348 1
+35 val_35 1
+351 val_351 1
+353 val_353 1
+356 val_356 1
+360 val_360 1
+362 val_362 1
+364 val_364 1
+365 val_365 1
+366 val_366 1
+367 val_367 1
+368 val_368 1
+369 val_369 1
+37 val_37 1
+373 val_373 1
+374 val_374 1
+375 val_375 1
+377 val_377 1
+378 val_378 1
+379 val_379 1
+382 val_382 1
+384 val_384 1
+386 val_386 1
+389 val_389 1
+392 val_392 1
+393 val_393 1
+394 val_394 1
+395 val_395 1
+396 val_396 1
+397 val_397 1
+399 val_399 1
+4 val_4 1
+400 val_400 1
+401 val_401 1
+402 val_402 1
+403 val_403 1
+404 val_404 1
+406 val_406 1
+407 val_407 1
+409 val_409 1
+41 val_41 1
+411 val_411 1
+413 val_413 1
+414 val_414 1
+417 val_417 1
+418 val_418 1
+419 val_419 1
+42 val_42 1
+421 val_421 1
+424 val_424 1
+427 val_427 1
+429 val_429 1
+43 val_43 1
+430 val_430 1
+431 val_431 1
+432 val_432 1
+435 val_435 1
+436 val_436 1
+437 val_437 1
+438 val_438 1
+439 val_439 1
+44 val_44 1
+443 val_443 1
+444 val_444 1
+446 val_446 1
+448 val_448 1
+449 val_449 1
+452 val_452 1
+453 val_453 1
+454 val_454 1
+455 val_455 1
+457 val_457 1
+458 val_458 1
+459 val_459 1
+460 val_460 1
+462 val_462 1
+463 val_463 1
+466 val_466 1
+467 val_467 1
+468 val_468 1
+469 val_469 1
+47 val_47 1
+470 val_470 1
+472 val_472 1
+475 val_475 1
+477 val_477 1
+478 val_478 1
+479 val_479 1
+480 val_480 1
+481 val_481 1
+482 val_482 1
+483 val_483 1
+484 val_484 1
+485 val_485 1
+487 val_487 1
+489 val_489 1
+490 val_490 1
+491 val_491 1
+492 val_492 1
+493 val_493 1
+494 val_494 1
+495 val_495 1
+496 val_496 1
+497 val_497 1
+498 val_498 1
+5 val_5 1
+51 val_51 1
+53 val_53 1
+54 val_54 1
+57 val_57 1
+58 val_58 1
+64 val_64 1
+65 val_65 1
+66 val_66 1
+67 val_67 1
+69 val_69 1
+70 val_70 1
+72 val_72 1
+74 val_74 1
+76 val_76 1
+77 val_77 1
+78 val_78 1
+8 val_8 1
+80 val_80 1
+82 val_82 1
+83 val_83 1
+84 val_84 1
+85 val_85 1
+86 val_86 1
+87 val_87 1
+9 val_9 1
+90 val_90 1
+92 val_92 1
+95 val_95 1
+96 val_96 1
+97 val_97 1
+98 val_98 1
+tst1 500 1
+PREHOOK: query: FROM (
+ select key, value from src s0
+ UNION all
+ select key, value from (
+ select 'tst1' as key, cast(count(1) as string) as value, 'tst1' as value2 from src s1
+ UNION all
+ select s2.key as key, s2.value as value, 'tst1' as value2 from src s2) unionsub) unionsrc
+INSERT INTO TABLE DEST1_acid_3 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_4 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5))
+GROUP BY unionsrc.key, unionsrc.value
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@dest1_acid_3
+PREHOOK: Output: default@dest1_acid_4
+POSTHOOK: query: FROM (
+ select key, value from src s0
+ UNION all
+ select key, value from (
+ select 'tst1' as key, cast(count(1) as string) as value, 'tst1' as value2 from src s1
+ UNION all
+ select s2.key as key, s2.value as value, 'tst1' as value2 from src s2) unionsub) unionsrc
+INSERT INTO TABLE DEST1_acid_3 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_4 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5))
+GROUP BY unionsrc.key, unionsrc.value
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@dest1_acid_3
+POSTHOOK: Output: default@dest1_acid_4
+POSTHOOK: Lineage: dest1_acid_3.key EXPRESSION [(src)s0.FieldSchema(name:key, type:string, comment:default), (src)s2.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_3.value EXPRESSION [(src)s0.FieldSchema(name:value, type:string, comment:default), (src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_4.key EXPRESSION [(src)s0.FieldSchema(name:key, type:string, comment:default), (src)s2.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_4.val1 EXPRESSION [(src)s0.FieldSchema(name:value, type:string, comment:default), (src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_4.val2 EXPRESSION [(src)s0.FieldSchema(name:value, type:string, comment:default), (src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select * from DEST1_acid_3
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dest1_acid_3
+#### A masked pattern was here ####
+POSTHOOK: query: select * from DEST1_acid_3
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dest1_acid_3
+#### A masked pattern was here ####
+0 1
+10 1
+100 1
+103 1
+104 1
+105 1
+11 1
+111 1
+113 1
+114 1
+116 1
+118 1
+119 1
+12 1
+120 1
+125 1
+126 1
+128 1
+129 1
+131 1
+133 1
+134 1
+136 1
+137 1
+138 1
+143 1
+145 1
+146 1
+149 1
+15 1
+150 1
+152 1
+153 1
+155 1
+156 1
+157 1
+158 1
+160 1
+162 1
+163 1
+164 1
+165 1
+166 1
+167 1
+168 1
+169 1
+17 1
+170 1
+172 1
+174 1
+175 1
+176 1
+177 1
+178 1
+179 1
+18 1
+180 1
+181 1
+183 1
+186 1
+187 1
+189 1
+19 1
+190 1
+191 1
+192 1
+193 1
+194 1
+195 1
+196 1
+197 1
+199 1
+2 1
+20 1
+200 1
+201 1
+202 1
+203 1
+205 1
+207 1
+208 1
+209 1
+213 1
+214 1
+216 1
+217 1
+218 1
+219 1
+221 1
+222 1
+223 1
+224 1
+226 1
+228 1
+229 1
+230 1
+233 1
+235 1
+237 1
+238 1
+239 1
+24 1
+241 1
+242 1
+244 1
+247 1
+248 1
+249 1
+252 1
+255 1
+256 1
+257 1
+258 1
+26 1
+260 1
+262 1
+263 1
+265 1
+266 1
+27 1
+272 1
+273 1
+274 1
+275 1
+277 1
+278 1
+28 1
+280 1
+281 1
+282 1
+283 1
+284 1
+285 1
+286 1
+287 1
+288 1
+289 1
+291 1
+292 1
+296 1
+298 1
+30 1
+302 1
+305 1
+306 1
+307 1
+308 1
+309 1
+310 1
+311 1
+315 1
+316 1
+317 1
+318 1
+321 1
+322 1
+323 1
+325 1
+327 1
+33 1
+331 1
+332 1
+333 1
+335 1
+336 1
+338 1
+339 1
+34 1
+341 1
+342 1
+344 1
+345 1
+348 1
+35 1
+351 1
+353 1
+356 1
+360 1
+362 1
+364 1
+365 1
+366 1
+367 1
+368 1
+369 1
+37 1
+373 1
+374 1
+375 1
+377 1
+378 1
+379 1
+382 1
+384 1
+386 1
+389 1
+392 1
+393 1
+394 1
+395 1
+396 1
+397 1
+399 1
+4 1
+400 1
+401 1
+402 1
+403 1
+404 1
+406 1
+407 1
+409 1
+41 1
+411 1
+413 1
+414 1
+417 1
+418 1
+419 1
+42 1
+421 1
+424 1
+427 1
+429 1
+43 1
+430 1
+431 1
+432 1
+435 1
+436 1
+437 1
+438 1
+439 1
+44 1
+443 1
+444 1
+446 1
+448 1
+449 1
+452 1
+453 1
+454 1
+455 1
+457 1
+458 1
+459 1
+460 1
+462 1
+463 1
+466 1
+467 1
+468 1
+469 1
+47 1
+470 1
+472 1
+475 1
+477 1
+478 1
+479 1
+480 1
+481 1
+482 1
+483 1
+484 1
+485 1
+487 1
+489 1
+490 1
+491 1
+492 1
+493 1
+494 1
+495 1
+496 1
+497 1
+498 1
+5 1
+51 1
+53 1
+54 1
+57 1
+58 1
+64 1
+65 1
+66 1
+67 1
+69 1
+70 1
+72 1
+74 1
+76 1
+77 1
+78 1
+8 1
+80 1
+82 1
+83 1
+84 1
+85 1
+86 1
+87 1
+9 1
+90 1
+92 1
+95 1
+96 1
+97 1
+98 1
+tst1 1
+PREHOOK: query: select * from DEST1_acid_4
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dest1_acid_4
+#### A masked pattern was here ####
+POSTHOOK: query: select * from DEST1_acid_4
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dest1_acid_4
+#### A masked pattern was here ####
+0 val_0 1
+10 val_10 1
+100 val_100 1
+103 val_103 1
+104 val_104 1
+105 val_105 1
+11 val_11 1
+111 val_111 1
+113 val_113 1
+114 val_114 1
+116 val_116 1
+118 val_118 1
+119 val_119 1
+12 val_12 1
+120 val_120 1
+125 val_125 1
+126 val_126 1
+128 val_128 1
+129 val_129 1
+131 val_131 1
+133 val_133 1
+134 val_134 1
+136 val_136 1
+137 val_137 1
+138 val_138 1
+143 val_143 1
+145 val_145 1
+146 val_146 1
+149 val_149 1
+15 val_15 1
+150 val_150 1
+152 val_152 1
+153 val_153 1
+155 val_155 1
+156 val_156 1
+157 val_157 1
+158 val_158 1
+160 val_160 1
+162 val_162 1
+163 val_163 1
+164 val_164 1
+165 val_165 1
+166 val_166 1
+167 val_167 1
+168 val_168 1
+169 val_169 1
+17 val_17 1
+170 val_170 1
+172 val_172 1
+174 val_174 1
+175 val_175 1
+176 val_176 1
+177 val_177 1
+178 val_178 1
+179 val_179 1
+18 val_18 1
+180 val_180 1
+181 val_181 1
+183 val_183 1
+186 val_186 1
+187 val_187 1
+189 val_189 1
+19 val_19 1
+190 val_190 1
+191 val_191 1
+192 val_192 1
+193 val_193 1
+194 val_194 1
+195 val_195 1
+196 val_196 1
+197 val_197 1
+199 val_199 1
+2 val_2 1
+20 val_20 1
+200 val_200 1
+201 val_201 1
+202 val_202 1
+203 val_203 1
+205 val_205 1
+207 val_207 1
+208 val_208 1
+209 val_209 1
+213 val_213 1
+214 val_214 1
+216 val_216 1
+217 val_217 1
+218 val_218 1
+219 val_219 1
+221 val_221 1
+222 val_222 1
+223 val_223 1
+224 val_224 1
+226 val_226 1
+228 val_228 1
+229 val_229 1
+230 val_230 1
+233 val_233 1
+235 val_235 1
+237 val_237 1
+238 val_238 1
+239 val_239 1
+24 val_24 1
+241 val_241 1
+242 val_242 1
+244 val_244 1
+247 val_247 1
+248 val_248 1
+249 val_249 1
+252 val_252 1
+255 val_255 1
+256 val_256 1
+257 val_257 1
+258 val_258 1
+26 val_26 1
+260 val_260 1
+262 val_262 1
+263 val_263 1
+265 val_265 1
+266 val_266 1
+27 val_27 1
+272 val_272 1
+273 val_273 1
+274 val_274 1
+275 val_275 1
+277 val_277 1
+278 val_278 1
+28 val_28 1
+280 val_280 1
+281 val_281 1
+282 val_282 1
+283 val_283 1
+284 val_284 1
+285 val_285 1
+286 val_286 1
+287 val_287 1
+288 val_288 1
+289 val_289 1
+291 val_291 1
+292 val_292 1
+296 val_296 1
+298 val_298 1
+30 val_30 1
+302 val_302 1
+305 val_305 1
+306 val_306 1
+307 val_307 1
+308 val_308 1
+309 val_309 1
+310 val_310 1
+311 val_311 1
+315 val_315 1
+316 val_316 1
+317 val_317 1
+318 val_318 1
+321 val_321 1
+322 val_322 1
+323 val_323 1
+325 val_325 1
+327 val_327 1
+33 val_33 1
+331 val_331 1
+332 val_332 1
+333 val_333 1
+335 val_335 1
+336 val_336 1
+338 val_338 1
+339 val_339 1
+34 val_34 1
+341 val_341 1
+342 val_342 1
+344 val_344 1
+345 val_345 1
+348 val_348 1
+35 val_35 1
+351 val_351 1
+353 val_353 1
+356 val_356 1
+360 val_360 1
+362 val_362 1
+364 val_364 1
+365 val_365 1
+366 val_366 1
+367 val_367 1
+368 val_368 1
+369 val_369 1
+37 val_37 1
+373 val_373 1
+374 val_374 1
+375 val_375 1
+377 val_377 1
+378 val_378 1
+379 val_379 1
+382 val_382 1
+384 val_384 1
+386 val_386 1
+389 val_389 1
+392 val_392 1
+393 val_393 1
+394 val_394 1
+395 val_395 1
+396 val_396 1
+397 val_397 1
+399 val_399 1
+4 val_4 1
+400 val_400 1
+401 val_401 1
+402 val_402 1
+403 val_403 1
+404 val_404 1
+406 val_406 1
+407 val_407 1
+409 val_409 1
+41 val_41 1
+411 val_411 1
+413 val_413 1
+414 val_414 1
+417 val_417 1
+418 val_418 1
+419 val_419 1
+42 val_42 1
+421 val_421 1
+424 val_424 1
+427 val_427 1
+429 val_429 1
+43 val_43 1
+430 val_430 1
+431 val_431 1
+432 val_432 1
+435 val_435 1
+436 val_436 1
+437 val_437 1
+438 val_438 1
+439 val_439 1
+44 val_44 1
+443 val_443 1
+444 val_444 1
+446 val_446 1
+448 val_448 1
+449 val_449 1
+452 val_452 1
+453 val_453 1
+454 val_454 1
+455 val_455 1
+457 val_457 1
+458 val_458 1
+459 val_459 1
+460 val_460 1
+462 val_462 1
+463 val_463 1
+466 val_466 1
+467 val_467 1
+468 val_468 1
+469 val_469 1
+47 val_47 1
+470 val_470 1
+472 val_472 1
+475 val_475 1
+477 val_477 1
+478 val_478 1
+479 val_479 1
+480 val_480 1
+481 val_481 1
+482 val_482 1
+483 val_483 1
+484 val_484 1
+485 val_485 1
+487 val_487 1
+489 val_489 1
+490 val_490 1
+491 val_491 1
+492 val_492 1
+493 val_493 1
+494 val_494 1
+495 val_495 1
+496 val_496 1
+497 val_497 1
+498 val_498 1
+5 val_5 1
+51 val_51 1
+53 val_53 1
+54 val_54 1
+57 val_57 1
+58 val_58 1
+64 val_64 1
+65 val_65 1
+66 val_66 1
+67 val_67 1
+69 val_69 1
+70 val_70 1
+72 val_72 1
+74 val_74 1
+76 val_76 1
+77 val_77 1
+78 val_78 1
+8 val_8 1
+80 val_80 1
+82 val_82 1
+83 val_83 1
+84 val_84 1
+85 val_85 1
+86 val_86 1
+87 val_87 1
+9 val_9 1
+90 val_90 1
+92 val_92 1
+95 val_95 1
+96 val_96 1
+97 val_97 1
+98 val_98 1
+tst1 500 1
+PREHOOK: query: FROM (
+ select key, value from src s0
+ UNION all
+ select 'tst1' as key, cast(count(1) as string) as value from src s1
+ UNION all
+ select s2.key as key, s2.value as value from src s2) unionsrc
+INSERT INTO TABLE DEST1_acid_5 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_6 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5))
+GROUP BY unionsrc.key, unionsrc.value
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@dest1_acid_5
+PREHOOK: Output: default@dest1_acid_6
+POSTHOOK: query: FROM (
+ select key, value from src s0
+ UNION all
+ select 'tst1' as key, cast(count(1) as string) as value from src s1
+ UNION all
+ select s2.key as key, s2.value as value from src s2) unionsrc
+INSERT INTO TABLE DEST1_acid_5 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_6 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5))
+GROUP BY unionsrc.key, unionsrc.value
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@dest1_acid_5
+POSTHOOK: Output: default@dest1_acid_6
+POSTHOOK: Lineage: dest1_acid_5.key EXPRESSION [(src)s0.FieldSchema(name:key, type:string, comment:default), (src)s2.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_5.value EXPRESSION [(src)s0.FieldSchema(name:value, type:string, comment:default), (src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_6.key EXPRESSION [(src)s0.FieldSchema(name:key, type:string, comment:default), (src)s2.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_6.val1 EXPRESSION [(src)s0.FieldSchema(name:value, type:string, comment:default), (src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_6.val2 EXPRESSION [(src)s0.FieldSchema(name:value, type:string, comment:default), (src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select * from DEST1_acid_5
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dest1_acid_5
+#### A masked pattern was here ####
+POSTHOOK: query: select * from DEST1_acid_5
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dest1_acid_5
+#### A masked pattern was here ####
+0 1
+10 1
+100 1
+103 1
+104 1
+105 1
+11 1
+111 1
+113 1
+114 1
+116 1
+118 1
+119 1
+12 1
+120 1
+125 1
+126 1
+128 1
+129 1
+131 1
+133 1
+134 1
+136 1
+137 1
+138 1
+143 1
+145 1
+146 1
+149 1
+15 1
+150 1
+152 1
+153 1
+155 1
+156 1
+157 1
+158 1
+160 1
+162 1
+163 1
+164 1
+165 1
+166 1
+167 1
+168 1
+169 1
+17 1
+170 1
+172 1
+174 1
+175 1
+176 1
+177 1
+178 1
+179 1
+18 1
+180 1
+181 1
+183 1
+186 1
+187 1
+189 1
+19 1
+190 1
+191 1
+192 1
+193 1
+194 1
+195 1
+196 1
+197 1
+199 1
+2 1
+20 1
+200 1
+201 1
+202 1
+203 1
+205 1
+207 1
+208 1
+209 1
+213 1
+214 1
+216 1
+217 1
+218 1
+219 1
+221 1
+222 1
+223 1
+224 1
+226 1
+228 1
+229 1
+230 1
+233 1
+235 1
+237 1
+238 1
+239 1
+24 1
+241 1
+242 1
+244 1
+247 1
+248 1
+249 1
+252 1
+255 1
+256 1
+257 1
+258 1
+26 1
+260 1
+262 1
+263 1
+265 1
+266 1
+27 1
+272 1
+273 1
+274 1
+275 1
+277 1
+278 1
+28 1
+280 1
+281 1
+282 1
+283 1
+284 1
+285 1
+286 1
+287 1
+288 1
+289 1
+291 1
+292 1
+296 1
+298 1
+30 1
+302 1
+305 1
+306 1
+307 1
+308 1
+309 1
+310 1
+311 1
+315 1
+316 1
+317 1
+318 1
+321 1
+322 1
+323 1
+325 1
+327 1
+33 1
+331 1
+332 1
+333 1
+335 1
+336 1
+338 1
+339 1
+34 1
+341 1
+342 1
+344 1
+345 1
+348 1
+35 1
+351 1
+353 1
+356 1
+360 1
+362 1
+364 1
+365 1
+366 1
+367 1
+368 1
+369 1
+37 1
+373 1
+374 1
+375 1
+377 1
+378 1
+379 1
+382 1
+384 1
+386 1
+389 1
+392 1
+393 1
+394 1
+395 1
+396 1
+397 1
+399 1
+4 1
+400 1
+401 1
+402 1
+403 1
+404 1
+406 1
+407 1
+409 1
+41 1
+411 1
+413 1
+414 1
+417 1
+418 1
+419 1
+42 1
+421 1
+424 1
+427 1
+429 1
+43 1
+430 1
+431 1
+432 1
+435 1
+436 1
+437 1
+438 1
+439 1
+44 1
+443 1
+444 1
+446 1
+448 1
+449 1
+452 1
+453 1
+454 1
+455 1
+457 1
+458 1
+459 1
+460 1
+462 1
+463 1
+466 1
+467 1
+468 1
+469 1
+47 1
+470 1
+472 1
+475 1
+477 1
+478 1
+479 1
+480 1
+481 1
+482 1
+483 1
+484 1
+485 1
+487 1
+489 1
+490 1
+491 1
+492 1
+493 1
+494 1
+495 1
+496 1
+497 1
+498 1
+5 1
+51 1
+53 1
+54 1
+57 1
+58 1
+64 1
+65 1
+66 1
+67 1
+69 1
+70 1
+72 1
+74 1
+76 1
+77 1
+78 1
+8 1
+80 1
+82 1
+83 1
+84 1
+85 1
+86 1
+87 1
+9 1
+90 1
+92 1
+95 1
+96 1
+97 1
+98 1
+tst1 1
+PREHOOK: query: select * from DEST1_acid_6
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dest1_acid_6
+#### A masked pattern was here ####
+POSTHOOK: query: select * from DEST1_acid_6
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dest1_acid_6
+#### A masked pattern was here ####
+0 val_0 1
+10 val_10 1
+100 val_100 1
+103 val_103 1
+104 val_104 1
+105 val_105 1
+11 val_11 1
+111 val_111 1
+113 val_113 1
+114 val_114 1
+116 val_116 1
+118 val_118 1
+119 val_119 1
+12 val_12 1
+120 val_120 1
+125 val_125 1
+126 val_126 1
+128 val_128 1
+129 val_129 1
+131 val_131 1
+133 val_133 1
+134 val_134 1
+136 val_136 1
+137 val_137 1
+138 val_138 1
+143 val_143 1
+145 val_145 1
+146 val_146 1
+149 val_149 1
+15 val_15 1
+150 val_150 1
+152 val_152 1
+153 val_153 1
+155 val_155 1
+156 val_156 1
+157 val_157 1
+158 val_158 1
+160 val_160 1
+162 val_162 1
+163 val_163 1
+164 val_164 1
+165 val_165 1
+166 val_166 1
+167 val_167 1
+168 val_168 1
+169 val_169 1
+17 val_17 1
+170 val_170 1
+172 val_172 1
+174 val_174 1
+175 val_175 1
+176 val_176 1
+177 val_177 1
+178 val_178 1
+179 val_179 1
+18 val_18 1
+180 val_180 1
+181 val_181 1
+183 val_183 1
+186 val_186 1
+187 val_187 1
+189 val_189 1
+19 val_19 1
+190 val_190 1
+191 val_191 1
+192 val_192 1
+193 val_193 1
+194 val_194 1
+195 val_195 1
+196 val_196 1
+197 val_197 1
+199 val_199 1
+2 val_2 1
+20 val_20 1
+200 val_200 1
+201 val_201 1
+202 val_202 1
+203 val_203 1
+205 val_205 1
+207 val_207 1
+208 val_208 1
+209 val_209 1
+213 val_213 1
+214 val_214 1
+216 val_216 1
+217 val_217 1
+218 val_218 1
+219 val_219 1
+221 val_221 1
+222 val_222 1
+223 val_223 1
+224 val_224 1
+226 val_226 1
+228 val_228 1
+229 val_229 1
+230 val_230 1
+233 val_233 1
+235 val_235 1
+237 val_237 1
+238 val_238 1
+239 val_239 1
+24 val_24 1
+241 val_241 1
+242 val_242 1
+244 val_244 1
+247 val_247 1
+248 val_248 1
+249 val_249 1
+252 val_252 1
+255 val_255 1
+256 val_256 1
+257 val_257 1
+258 val_258 1
+26 val_26 1
+260 val_260 1
+262 val_262 1
+263 val_263 1
+265 val_265 1
+266 val_266 1
+27 val_27 1
+272 val_272 1
+273 val_273 1
+274 val_274 1
+275 val_275 1
+277 val_277 1
+278 val_278 1
+28 val_28 1
+280 val_280 1
+281 val_281 1
+282 val_282 1
+283 val_283 1
+284 val_284 1
+285 val_285 1
+286 val_286 1
+287 val_287 1
+288 val_288 1
+289 val_289 1
+291 val_291 1
+292 val_292 1
+296 val_296 1
+298 val_298 1
+30 val_30 1
+302 val_302 1
+305 val_305 1
+306 val_306 1
+307 val_307 1
+308 val_308 1
+309 val_309 1
+310 val_310 1
+311 val_311 1
+315 val_315 1
+316 val_316 1
+317 val_317 1
+318 val_318 1
+321 val_321 1
+322 val_322 1
+323 val_323 1
+325 val_325 1
+327 val_327 1
+33 val_33 1
+331 val_331 1
+332 val_332 1
+333 val_333 1
+335 val_335 1
+336 val_336 1
+338 val_338 1
+339 val_339 1
+34 val_34 1
+341 val_341 1
+342 val_342 1
+344 val_344 1
+345 val_345 1
+348 val_348 1
+35 val_35 1
+351 val_351 1
+353 val_353 1
+356 val_356 1
+360 val_360 1
+362 val_362 1
+364 val_364 1
+365 val_365 1
+366 val_366 1
+367 val_367 1
+368 val_368 1
+369 val_369 1
+37 val_37 1
+373 val_373 1
+374 val_374 1
+375 val_375 1
+377 val_377 1
+378 val_378 1
+379 val_379 1
+382 val_382 1
+384 val_384 1
+386 val_386 1
+389 val_389 1
+392 val_392 1
+393 val_393 1
+394 val_394 1
+395 val_395 1
+396 val_396 1
+397 val_397 1
+399 val_399 1
+4 val_4 1
+400 val_400 1
+401 val_401 1
+402 val_402 1
+403 val_403 1
+404 val_404 1
+406 val_406 1
+407 val_407 1
+409 val_409 1
+41 val_41 1
+411 val_411 1
+413 val_413 1
+414 val_414 1
+417 val_417 1
+418 val_418 1
+419 val_419 1
+42 val_42 1
+421 val_421 1
+424 val_424 1
+427 val_427 1
+429 val_429 1
+43 val_43 1
+430 val_430 1
+431 val_431 1
+432 val_432 1
+435 val_435 1
+436 val_436 1
+437 val_437 1
+438 val_438 1
+439 val_439 1
+44 val_44 1
+443 val_443 1
+444 val_444 1
+446 val_446 1
+448 val_448 1
+449 val_449 1
+452 val_452 1
+453 val_453 1
+454 val_454 1
+455 val_455 1
+457 val_457 1
+458 val_458 1
+459 val_459 1
+460 val_460 1
+462 val_462 1
+463 val_463 1
+466 val_466 1
+467 val_467 1
+468 val_468 1
+469 val_469 1
+47 val_47 1
+470 val_470 1
+472 val_472 1
+475 val_475 1
+477 val_477 1
+478 val_478 1
+479 val_479 1
+480 val_480 1
+481 val_481 1
+482 val_482 1
+483 val_483 1
+484 val_484 1
+485 val_485 1
+487 val_487 1
+489 val_489 1
+490 val_490 1
+491 val_491 1
+492 val_492 1
+493 val_493 1
+494 val_494 1
+495 val_495 1
+496 val_496 1
+497 val_497 1
+498 val_498 1
+5 val_5 1
+51 val_51 1
+53 val_53 1
+54 val_54 1
+57 val_57 1
+58 val_58 1
+64 val_64 1
+65 val_65 1
+66 val_66 1
+67 val_67 1
+69 val_69 1
+70 val_70 1
+72 val_72 1
+74 val_74 1
+76 val_76 1
+77 val_77 1
+78 val_78 1
+8 val_8 1
+80 val_80 1
+82 val_82 1
+83 val_83 1
+84 val_84 1
+85 val_85 1
+86 val_86 1
+87 val_87 1
+9 val_9 1
+90 val_90 1
+92 val_92 1
+95 val_95 1
+96 val_96 1
+97 val_97 1
+98 val_98 1
+tst1 500 1
+PREHOOK: query: FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1
+ UNION all
+ select s2.key as key, s2.value as value from src s2) unionsrc
+INSERT INTO TABLE DEST1_acid_7 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_8 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5))
+GROUP BY unionsrc.key, unionsrc.value
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@dest1_acid_7
+PREHOOK: Output: default@dest1_acid_8
+POSTHOOK: query: FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1
+ UNION all
+ select s2.key as key, s2.value as value from src s2) unionsrc
+INSERT INTO TABLE DEST1_acid_7 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_8 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5))
+GROUP BY unionsrc.key, unionsrc.value
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@dest1_acid_7
+POSTHOOK: Output: default@dest1_acid_8
+POSTHOOK: Lineage: dest1_acid_7.key EXPRESSION [(src)s2.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_7.value EXPRESSION [(src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_8.key EXPRESSION [(src)s2.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_8.val1 EXPRESSION [(src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_8.val2 EXPRESSION [(src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select * from DEST1_acid_7
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dest1_acid_7
+#### A masked pattern was here ####
+POSTHOOK: query: select * from DEST1_acid_7
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dest1_acid_7
+#### A masked pattern was here ####
+0 1
+10 1
+100 1
+103 1
+104 1
+105 1
+11 1
+111 1
+113 1
+114 1
+116 1
+118 1
+119 1
+12 1
+120 1
+125 1
+126 1
+128 1
+129 1
+131 1
+133 1
+134 1
+136 1
+137 1
+138 1
+143 1
+145 1
+146 1
+149 1
+15 1
+150 1
+152 1
+153 1
+155 1
+156 1
+157 1
+158 1
+160 1
+162 1
+163 1
+164 1
+165 1
+166 1
+167 1
+168 1
+169 1
+17 1
+170 1
+172 1
+174 1
+175 1
+176 1
+177 1
+178 1
+179 1
+18 1
+180 1
+181 1
+183 1
+186 1
+187 1
+189 1
+19 1
+190 1
+191 1
+192 1
+193 1
+194 1
+195 1
+196 1
+197 1
+199 1
+2 1
+20 1
+200 1
+201 1
+202 1
+203 1
+205 1
+207 1
+208 1
+209 1
+213 1
+214 1
+216 1
+217 1
+218 1
+219 1
+221 1
+222 1
+223 1
+224 1
+226 1
+228 1
+229 1
+230 1
+233 1
+235 1
+237 1
+238 1
+239 1
+24 1
+241 1
+242 1
+244 1
+247 1
+248 1
+249 1
+252 1
+255 1
+256 1
+257 1
+258 1
+26 1
+260 1
+262 1
+263 1
+265 1
+266 1
+27 1
+272 1
+273 1
+274 1
+275 1
+277 1
+278 1
+28 1
+280 1
+281 1
+282 1
+283 1
+284 1
+285 1
+286 1
+287 1
+288 1
+289 1
+291 1
+292 1
+296 1
+298 1
+30 1
+302 1
+305 1
+306 1
+307 1
+308 1
+309 1
+310 1
+311 1
+315 1
+316 1
+317 1
+318 1
+321 1
+322 1
+323 1
+325 1
+327 1
+33 1
+331 1
+332 1
+333 1
+335 1
+336 1
+338 1
+339 1
+34 1
+341 1
+342 1
+344 1
+345 1
+348 1
+35 1
+351 1
+353 1
+356 1
+360 1
+362 1
+364 1
+365 1
+366 1
+367 1
+368 1
+369 1
+37 1
+373 1
+374 1
+375 1
+377 1
+378 1
+379 1
+382 1
+384 1
+386 1
+389 1
+392 1
+393 1
+394 1
+395 1
+396 1
+397 1
+399 1
+4 1
+400 1
+401 1
+402 1
+403 1
+404 1
+406 1
+407 1
+409 1
+41 1
+411 1
+413 1
+414 1
+417 1
+418 1
+419 1
+42 1
+421 1
+424 1
+427 1
+429 1
+43 1
+430 1
+431 1
+432 1
+435 1
+436 1
+437 1
+438 1
+439 1
+44 1
+443 1
+444 1
+446 1
+448 1
+449 1
+452 1
+453 1
+454 1
+455 1
+457 1
+458 1
+459 1
+460 1
+462 1
+463 1
+466 1
+467 1
+468 1
+469 1
+47 1
+470 1
+472 1
+475 1
+477 1
+478 1
+479 1
+480 1
+481 1
+482 1
+483 1
+484 1
+485 1
+487 1
+489 1
+490 1
+491 1
+492 1
+493 1
+494 1
+495 1
+496 1
+497 1
+498 1
+5 1
+51 1
+53 1
+54 1
+57 1
+58 1
+64 1
+65 1
+66 1
+67 1
+69 1
+70 1
+72 1
+74 1
+76 1
+77 1
+78 1
+8 1
+80 1
+82 1
+83 1
+84 1
+85 1
+86 1
+87 1
+9 1
+90 1
+92 1
+95 1
+96 1
+97 1
+98 1
+tst1 1
+PREHOOK: query: select * from DEST1_acid_8
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dest1_acid_8
+#### A masked pattern was here ####
+POSTHOOK: query: select * from DEST1_acid_8
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dest1_acid_8
+#### A masked pattern was here ####
+0 val_0 1
+10 val_10 1
+100 val_100 1
+103 val_103 1
+104 val_104 1
+105 val_105 1
+11 val_11 1
+111 val_111 1
+113 val_113 1
+114 val_114 1
+116 val_116 1
+118 val_118 1
+119 val_119 1
+12 val_12 1
+120 val_120 1
+125 val_125 1
+126 val_126 1
+128 val_128 1
+129 val_129 1
+131 val_131 1
+133 val_133 1
+134 val_134 1
+136 val_136 1
+137 val_137 1
+138 val_138 1
+143 val_143 1
+145 val_145 1
+146 val_146 1
+149 val_149 1
+15 val_15 1
+150 val_150 1
+152 val_152 1
+153 val_153 1
+155 val_155 1
+156 val_156 1
+157 val_157 1
+158 val_158 1
+160 val_160 1
+162 val_162 1
+163 val_163 1
+164 val_164 1
+165 val_165 1
+166 val_166 1
+167 val_167 1
+168 val_168 1
+169 val_169 1
+17 val_17 1
+170 val_170 1
+172 val_172 1
+174 val_174 1
+175 val_175 1
+176 val_176 1
+177 val_177 1
+178 val_178 1
+179 val_179 1
+18 val_18 1
+180 val_180 1
+181 val_181 1
+183 val_183 1
+186 val_186 1
+187 val_187 1
+189 val_189 1
+19 val_19 1
+190 val_190 1
+191 val_191 1
+192 val_192 1
+193 val_193 1
+194 val_194 1
+195 val_195 1
+196 val_196 1
+197 val_197 1
+199 val_199 1
+2 val_2 1
+20 val_20 1
+200 val_200 1
+201 val_201 1
+202 val_202 1
+203 val_203 1
+205 val_205 1
+207 val_207 1
+208 val_208 1
+209 val_209 1
+213 val_213 1
+214 val_214 1
+216 val_216 1
+217 val_217 1
+218 val_218 1
+219 val_219 1
+221 val_221 1
+222 val_222 1
+223 val_223 1
+224 val_224 1
+226 val_226 1
+228 val_228 1
+229 val_229 1
+230 val_230 1
+233 val_233 1
+235 val_235 1
+237 val_237 1
+238 val_238 1
+239 val_239 1
+24 val_24 1
+241 val_241 1
+242 val_242 1
+244 val_244 1
+247 val_247 1
+248 val_248 1
+249 val_249 1
+252 val_252 1
+255 val_255 1
+256 val_256 1
+257 val_257 1
+258 val_258 1
+26 val_26 1
+260 val_260 1
+262 val_262 1
+263 val_263 1
+265 val_265 1
+266 val_266 1
+27 val_27 1
+272 val_272 1
+273 val_273 1
+274 val_274 1
+275 val_275 1
+277 val_277 1
+278 val_278 1
+28 val_28 1
+280 val_280 1
+281 val_281 1
+282 val_282 1
+283 val_283 1
+284 val_284 1
+285 val_285 1
+286 val_286 1
+287 val_287 1
+288 val_288 1
+289 val_289 1
+291 val_291 1
+292 val_292 1
+296 val_296 1
+298 val_298 1
+30 val_30 1
+302 val_302 1
+305 val_305 1
+306 val_306 1
+307 val_307 1
+308 val_308 1
+309 val_309 1
+310 val_310 1
+311 val_311 1
+315 val_315 1
+316 val_316 1
+317 val_317 1
+318 val_318 1
+321 val_321 1
+322 val_322 1
+323 val_323 1
+325 val_325 1
+327 val_327 1
+33 val_33 1
+331 val_331 1
+332 val_332 1
+333 val_333 1
+335 val_335 1
+336 val_336 1
+338 val_338 1
+339 val_339 1
+34 val_34 1
+341 val_341 1
+342 val_342 1
+344 val_344 1
+345 val_345 1
+348 val_348 1
+35 val_35 1
+351 val_351 1
+353 val_353 1
+356 val_356 1
+360 val_360 1
+362 val_362 1
+364 val_364 1
+365 val_365 1
+366 val_366 1
+367 val_367 1
+368 val_368 1
+369 val_369 1
+37 val_37 1
+373 val_373 1
+374 val_374 1
+375 val_375 1
+377 val_377 1
+378 val_378 1
+379 val_379 1
+382 val_382 1
+384 val_384 1
+386 val_386 1
+389 val_389 1
+392 val_392 1
+393 val_393 1
+394 val_394 1
+395 val_395 1
+396 val_396 1
+397 val_397 1
+399 val_399 1
+4 val_4 1
+400 val_400 1
+401 val_401 1
+402 val_402 1
+403 val_403 1
+404 val_404 1
+406 val_406 1
+407 val_407 1
+409 val_409 1
+41 val_41 1
+411 val_411 1
+413 val_413 1
+414 val_414 1
+417 val_417 1
+418 val_418 1
+419 val_419 1
+42 val_42 1
+421 val_421 1
+424 val_424 1
+427 val_427 1
+429 val_429 1
+43 val_43 1
+430 val_430 1
+431 val_431 1
+432 val_432 1
+435 val_435 1
+436 val_436 1
+437 val_437 1
+438 val_438 1
+439 val_439 1
+44 val_44 1
+443 val_443 1
+444 val_444 1
+446 val_446 1
+448 val_448 1
+449 val_449 1
+452 val_452 1
+453 val_453 1
+454 val_454 1
+455 val_455 1
+457 val_457 1
+458 val_458 1
+459 val_459 1
+460 val_460 1
+462 val_462 1
+463 val_463 1
+466 val_466 1
+467 val_467 1
+468 val_468 1
+469 val_469 1
+47 val_47 1
+470 val_470 1
+472 val_472 1
+475 val_475 1
+477 val_477 1
+478 val_478 1
+479 val_479 1
+480 val_480 1
+481 val_481 1
+482 val_482 1
+483 val_483 1
+484 val_484 1
+485 val_485 1
+487 val_487 1
+489 val_489 1
+490 val_490 1
+491 val_491 1
+492 val_492 1
+493 val_493 1
+494 val_494 1
+495 val_495 1
+496 val_496 1
+497 val_497 1
+498 val_498 1
+5 val_5 1
+51 val_51 1
+53 val_53 1
+54 val_54 1
+57 val_57 1
+58 val_58 1
+64 val_64 1
+65 val_65 1
+66 val_66 1
+67 val_67 1
+69 val_69 1
+70 val_70 1
+72 val_72 1
+74 val_74 1
+76 val_76 1
+77 val_77 1
+78 val_78 1
+8 val_8 1
+80 val_80 1
+82 val_82 1
+83 val_83 1
+84 val_84 1
+85 val_85 1
+86 val_86 1
+87 val_87 1
+9 val_9 1
+90 val_90 1
+92 val_92 1
+95 val_95 1
+96 val_96 1
+97 val_97 1
+98 val_98 1
+tst1 500 1
+PREHOOK: query: FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1
+ UNION distinct
+ select s2.key as key, s2.value as value from src s2) unionsrc
+INSERT INTO TABLE DEST1_acid_9 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_10 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5))
+GROUP BY unionsrc.key, unionsrc.value
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@dest1_acid_10
+PREHOOK: Output: default@dest1_acid_9
+POSTHOOK: query: FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1
+ UNION distinct
+ select s2.key as key, s2.value as value from src s2) unionsrc
+INSERT INTO TABLE DEST1_acid_9 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT INTO TABLE DEST1_acid_10 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5))
+GROUP BY unionsrc.key, unionsrc.value
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@dest1_acid_10
+POSTHOOK: Output: default@dest1_acid_9
+POSTHOOK: Lineage: dest1_acid_10.key EXPRESSION [(src)s2.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_10.val1 EXPRESSION [(src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_10.val2 EXPRESSION [(src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_9.key EXPRESSION [(src)s2.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1_acid_9.value EXPRESSION [(src)s1.null, (src)s2.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select * from DEST1_acid_9
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dest1_acid_9
+#### A masked pattern was here ####
+POSTHOOK: query: select * from DEST1_acid_9
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dest1_acid_9
+#### A masked pattern was here ####
+0 1
+10 1
+100 1
+103 1
+104 1
+105 1
+11 1
+111 1
+113 1
+114 1
+116 1
+118 1
+119 1
+12 1
+120 1
+125 1
+126 1
+128 1
+129 1
+131 1
+133 1
+134 1
+136 1
+137 1
+138 1
+143 1
+145 1
+146 1
+149 1
+15 1
+150 1
+152 1
+153 1
+155 1
+156 1
+157 1
+158 1
+160 1
+162 1
+163 1
+164 1
+165 1
+166 1
+167 1
+168 1
+169 1
+17 1
+170 1
+172 1
+174 1
+175 1
+176 1
+177 1
+178 1
+179 1
+18 1
+180 1
+181 1
+183 1
+186 1
+187 1
+189 1
+19 1
+190 1
+191 1
+192 1
+193 1
+194 1
+195 1
+196 1
+197 1
+199 1
+2 1
+20 1
+200 1
+201 1
+202 1
+203 1
+205 1
+207 1
+208 1
+209 1
+213 1
+214 1
+216 1
+217 1
+218 1
+219 1
+221 1
+222 1
+223 1
+224 1
+226 1
+228 1
+229 1
+230 1
+233 1
+235 1
+237 1
+238 1
+239 1
+24 1
+241 1
+242 1
+244 1
+247 1
+248 1
+249 1
+252 1
+255 1
+256 1
+257 1
+258 1
+26 1
+260 1
+262 1
+263 1
+265 1
+266 1
+27 1
+272 1
+273 1
+274 1
+275 1
+277 1
+278 1
+28 1
+280 1
+281 1
+282 1
+283 1
+284 1
+285 1
+286 1
+287 1
+288 1
+289 1
+291 1
+292 1
+296 1
+298 1
+30 1
+302 1
+305 1
+306 1
+307 1
+308 1
+309 1
+310 1
+311 1
+315 1
+316 1
+317 1
+318 1
+321 1
+322 1
+323 1
+325 1
+327 1
+33 1
+331 1
+332 1
+333 1
+335 1
+336 1
+338 1
+339 1
+34 1
+341 1
+342 1
+344 1
+345 1
+348 1
+35 1
+351 1
+353 1
+356 1
+360 1
+362 1
+364 1
+365 1
+366 1
+367 1
+368 1
+369 1
+37 1
+373 1
+374 1
+375 1
+377 1
+378 1
+379 1
+382 1
+384 1
+386 1
+389 1
+392 1
+393 1
+394 1
+395 1
+396 1
+397 1
+399 1
+4 1
+400 1
+401 1
+402 1
+403 1
+404 1
+406 1
+407 1
+409 1
+41 1
+411 1
+413 1
+414 1
+417 1
+418 1
+419 1
+42 1
+421 1
+424 1
+427 1
+429 1
+43 1
+430 1
+431 1
+432 1
+435 1
+436 1
+437 1
+438 1
+439 1
+44 1
+443 1
+444 1
+446 1
+448 1
+449 1
+452 1
+453 1
+454 1
+455 1
+457 1
+458 1
+459 1
+460 1
+462 1
+463 1
+466 1
+467 1
+468 1
+469 1
+47 1
+470 1
+472 1
+475 1
+477 1
+478 1
+479 1
+480 1
+481 1
+482 1
+483 1
+484 1
+485 1
+487 1
+489 1
+490 1
+491 1
+492 1
+493 1
+494 1
+495 1
+496 1
+497 1
+498 1
+5 1
+51 1
+53 1
+54 1
+57 1
+58 1
+64 1
+65 1
+66 1
+67 1
+69 1
+70 1
+72 1
+74 1
+76 1
+77 1
+78 1
+8 1
+80 1
+82 1
+83 1
+84 1
+85 1
+86 1
+87 1
+9 1
+90 1
+92 1
+95 1
+96 1
+97 1
+98 1
+tst1 1
+PREHOOK: query: select * from DEST1_acid_10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dest1_acid_10
+#### A masked pattern was here ####
+POSTHOOK: query: select * from DEST1_acid_10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dest1_acid_10
+#### A masked pattern was here ####
+0 val_0 1
+10 val_10 1
+100 val_100 1
+103 val_103 1
+104 val_104 1
+105 val_105 1
+11 val_11 1
+111 val_111 1
+113 val_113 1
+114 val_114 1
+116 val_116 1
+118 val_118 1
+119 val_119 1
+12 val_12 1
+120 val_120 1
+125 val_125 1
+126 val_126 1
+128 val_128 1
+129 val_129 1
+131 val_131 1
+133 val_133 1
+134 val_134 1
+136 val_136 1
+137 val_137 1
+138 val_138 1
+143 val_143 1
+145 val_145 1
+146 val_146 1
+149 val_149 1
+15 val_15 1
+150 val_150 1
+152 val_152 1
+153 val_153 1
+155 val_155 1
+156 val_156 1
+157 val_157 1
+158 val_158 1
+160 val_160 1
+162 val_162 1
+163 val_163 1
+164 val_164 1
+165 val_165 1
+166 val_166 1
+167 val_167 1
+168 val_168 1
+169 val_169 1
+17 val_17 1
+170 val_170 1
+172 val_172 1
+174 val_174 1
+175 val_175 1
+176 val_176 1
+177 val_177 1
+178 val_178 1
+179 val_179 1
+18 val_18 1
+180 val_180 1
+181 val_181 1
+183 val_183 1
+186 val_186 1
+187 val_187 1
+189 val_189 1
+19 val_19 1
+190 val_190 1
+191 val_191 1
+192 val_192 1
+193 val_193 1
+194 val_194 1
+195 val_195 1
+196 val_196 1
+197 val_197 1
+199 val_199 1
+2 val_2 1
+20 val_20 1
+200 val_200 1
+201 val_201 1
+202 val_202 1
+203 val_203 1
+205 val_205 1
+207 val_207 1
+208 val_208 1
+209 val_209 1
+213 val_213 1
+214 val_214 1
+216 val_216 1
+217 val_217 1
+218 val_218 1
+219 val_219 1
+221 val_221 1
+222 val_222 1
+223 val_223 1
+224 val_224 1
+226 val_226 1
+228 val_228 1
+229 val_229 1
+230 val_230 1
+233 val_233 1
+235 val_235 1
+237 val_237 1
+238 val_238 1
+239 val_239 1
+24 val_24 1
+241 val_241 1
+242 val_242 1
+244 val_244 1
+247 val_247 1
+248 val_248 1
+249 val_249 1
+252 val_252 1
+255 val_255 1
+256 val_256 1
+257 val_257 1
+258 val_258 1
+26 val_26 1
+260 val_260 1
+262 val_262 1
+263 val_263 1
+265 val_265 1
+266 val_266 1
+27 val_27 1
+272 val_272 1
+273 val_273 1
+274 val_274 1
+275 val_275 1
+277 val_277 1
+278 val_278 1
+28 val_28 1
+280 val_280 1
+281 val_281 1
+282 val_282 1
+283 val_283 1
+284 val_284 1
+285 val_285 1
+286 val_286 1
+287 val_287 1
+288 val_288 1
+289 val_289 1
+291 val_291 1
+292 val_292 1
+296 val_296 1
+298 val_298 1
+30 val_30 1
+302 val_302 1
+305 val_305 1
+306 val_306 1
+307 val_307 1
+308 val_308 1
+309 val_309 1
+310 val_310 1
+311 val_311 1
+315 val_315 1
+316 val_316 1
+317 val_317 1
+318 val_318 1
+321 val_321 1
+322 val_322 1
+323 val_323 1
+325 val_325 1
+327 val_327 1
+33 val_33 1
+331 val_331 1
+332 val_332 1
+333 val_333 1
+335 val_335 1
+336 val_336 1
+338 val_338 1
+339 val_339 1
+34 val_34 1
+341 val_341 1
+342 val_342 1
+344 val_344 1
+345 val_345 1
+348 val_348 1
+35 val_35 1
+351 val_351 1
+353 val_353 1
+356 val_356 1
+360 val_360 1
+362 val_362 1
+364 val_364 1
+365 val_365 1
+366 val_366 1
+367 val_367 1
+368 val_368 1
+369 val_369 1
+37 val_37 1
+373 val_373 1
+374 val_374 1
+375 val_375 1
+377 val_377 1
+378 val_378 1
+379 val_379 1
+382 val_382 1
+384 val_384 1
+386 val_386 1
+389 val_389 1
+392 val_392 1
+393 val_393 1
+394 val_394 1
+395 val_395 1
+396 val_396 1
+397 val_397 1
+399 val_399 1
+4 val_4 1
+400 val_400 1
+401 val_401 1
+402 val_402 1
+403 val_403 1
+404 val_404 1
+406 val_406 1
+407 val_407 1
+409 val_409 1
+41 val_41 1
+411 val_411 1
+413 val_413 1
+414 val_414 1
+417 val_417 1
+418 val_418 1
+419 val_419 1
+42 val_42 1
+421 val_421 1
+424 val_424 1
+427 val_427 1
+429 val_429 1
+43 val_43 1
+430 val_430 1
+431 val_431 1
+432 val_432 1
+435 val_435 1
+436 val_436 1
+437 val_437 1
+438 val_438 1
+439 val_439 1
+44 val_44 1
+443 val_443 1
+444 val_444 1
+446 val_446 1
+448 val_448 1
+449 val_449 1
+452 val_452 1
+453 val_453 1
+454 val_454 1
+455 val_455 1
+457 val_457 1
+458 val_458 1
+459 val_459 1
+460 val_460 1
+462 val_462 1
+463 val_463 1
+466 val_466 1
+467 val_467 1
+468 val_468 1
+469 val_469 1
+47 val_47 1
+470 val_470 1
+472 val_472 1
+475 val_475 1
+477 val_477 1
+478 val_478 1
+479 val_479 1
+480 val_480 1
+481 val_481 1
+482 val_482 1
+483 val_483 1
+484 val_484 1
+485 val_485 1
+487 val_487 1
+489 val_489 1
+490 val_490 1
+491 val_491 1
+492 val_492 1
+493 val_493 1
+494 val_494 1
+495 val_495 1
+496 val_496 1
+497 val_497 1
+498 val_498 1
+5 val_5 1
+51 val_51 1
+53 val_53 1
+54 val_54 1
+57 val_57 1
+58 val_58 1
+64 val_64 1
+65 val_65 1
+66 val_66 1
+67 val_67 1
+69 val_69 1
+70 val_70 1
+72 val_72 1
+74 val_74 1
+76 val_76 1
+77 val_77 1
+78 val_78 1
+8 val_8 1
+80 val_80 1
+82 val_82 1
+83 val_83 1
+84 val_84 1
+85 val_85 1
+86 val_86 1
+87 val_87 1
+9 val_9 1
+90 val_90 1
+92 val_92 1
+95 val_95 1
+96 val_96 1
+97 val_97 1
+98 val_98 1
+tst1 500 1
+PREHOOK: query: DROP TABLE IF EXISTS DEST1_acid_1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@dest1_acid_1
+PREHOOK: Output: default@dest1_acid_1
+POSTHOOK: query: DROP TABLE IF EXISTS DEST1_acid_1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@dest1_acid_1
+POSTHOOK: Output: default@dest1_acid_1
+PREHOOK: query: DROP TABLE IF EXISTS DEST1_acid_2
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@dest1_acid_2
+PREHOOK: Output: default@dest1_acid_2
+POSTHOOK: query: DROP TABLE IF EXISTS DEST1_acid_2
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@dest1_acid_2
+POSTHOOK: Output: default@dest1_acid_2
+PREHOOK: query: DROP TABLE IF EXISTS DEST1_acid_3
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@dest1_acid_3
+PREHOOK: Output: default@dest1_acid_3
+POSTHOOK: query: DROP TABLE IF EXISTS DEST1_acid_3
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@dest1_acid_3
+POSTHOOK: Output: default@dest1_acid_3
+PREHOOK: query: DROP TABLE IF EXISTS DEST1_acid_4
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@dest1_acid_4
+PREHOOK: Output: default@dest1_acid_4
+POSTHOOK: query: DROP TABLE IF EXISTS DEST1_acid_4
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@dest1_acid_4
+POSTHOOK: Output: default@dest1_acid_4
+PREHOOK: query: DROP TABLE IF EXISTS DEST1_acid_5
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@dest1_acid_5
+PREHOOK: Output: default@dest1_acid_5
+POSTHOOK: query: DROP TABLE IF EXISTS DEST1_acid_5
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@dest1_acid_5
+POSTHOOK: Output: default@dest1_acid_5
+PREHOOK: query: DROP TABLE IF EXISTS DEST1_acid_6
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@dest1_acid_6
+PREHOOK: Output: default@dest1_acid_6
+POSTHOOK: query: DROP TABLE IF EXISTS DEST1_acid_6
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@dest1_acid_6
+POSTHOOK: Output: default@dest1_acid_6
diff --git a/ql/src/test/results/clientpositive/mm_all.q.out b/ql/src/test/results/clientpositive/mm_all.q.out
index 143ebd6..16ba2f2 100644
--- a/ql/src/test/results/clientpositive/mm_all.q.out
+++ b/ql/src/test/results/clientpositive/mm_all.q.out
@@ -1647,6 +1647,7 @@ POSTHOOK: Input: default@intermediate_n0@p=455
POSTHOOK: Input: default@intermediate_n0@p=456
POSTHOOK: Input: default@intermediate_n0@p=457
POSTHOOK: Output: default@multi1_mm@p=1
+POSTHOOK: Output: default@multi1_mm@p=2
POSTHOOK: Output: default@multi1_mm@p=455
POSTHOOK: Output: default@multi1_mm@p=456
POSTHOOK: Output: default@multi1_mm@p=457
@@ -1660,6 +1661,8 @@ POSTHOOK: Lineage: ###Masked###
POSTHOOK: Lineage: ###Masked###
POSTHOOK: Lineage: ###Masked###
POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
PREHOOK: query: select key, key2, p from multi1_mm order by key, key2, p
PREHOOK: type: QUERY
PREHOOK: Input: default@multi1_mm
@@ -1727,6 +1730,18 @@ POSTHOOK: Input: default@intermediate_n0@p=455
POSTHOOK: Input: default@intermediate_n0@p=456
POSTHOOK: Input: default@intermediate_n0@p=457
POSTHOOK: Output: default@multi1_mm@p=1
+POSTHOOK: Output: default@multi1_mm@p=2
+POSTHOOK: Output: default@multi1_mm@p=455
+POSTHOOK: Output: default@multi1_mm@p=456
+POSTHOOK: Output: default@multi1_mm@p=457
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
+POSTHOOK: Lineage: ###Masked###
POSTHOOK: Lineage: ###Masked###
POSTHOOK: Lineage: ###Masked###
POSTHOOK: Lineage: ###Masked###
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index 35a220f..6101caa 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -391,7 +391,7 @@ public class TestStreaming {
rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
- Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000"));
+ Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000_0"));
Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
@@ -569,7 +569,7 @@ public class TestStreaming {
+ "INPUT__FILE__NAME from default.writeidconnection order by a");
Assert.assertEquals(4, rs.size());
Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\ta0\tbar"));
- Assert.assertTrue(rs.get(0), rs.get(0).endsWith("bucket_00000"));
+ Assert.assertTrue(rs.get(0), rs.get(0).endsWith("bucket_00000_0"));
Assert.assertTrue(rs.get(1), rs.get(1).contains("\"rowid\":0}\ta1\tb2"));
Assert.assertTrue(rs.get(1), rs.get(1).endsWith("bucket_00000"));
Assert.assertTrue(rs.get(2), rs.get(2).contains("\"rowid\":1}\ta3\tb4"));
@@ -727,7 +727,7 @@ public class TestStreaming {
rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
- Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000"));
+ Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000_0"));
Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));