Posted to commits@hive.apache.org by ek...@apache.org on 2017/12/01 02:40:06 UTC
[2/3] hive git commit: HIVE-17361 Support LOAD DATA for transactional tables (Eugene Koifman, reviewed by Alan Gates)
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 95a60dc..73f27e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -88,11 +88,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
*/
private int statementId;//sort on this descending, like currentTransactionId
- public ReaderKey() {
+ ReaderKey() {
this(-1, -1, -1, -1, 0);
}
- public ReaderKey(long originalTransaction, int bucket, long rowId,
+ ReaderKey(long originalTransaction, int bucket, long rowId,
long currentTransactionId) {
this(originalTransaction, bucket, rowId, currentTransactionId, 0);
}
@@ -196,6 +196,34 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
void next(OrcStruct next) throws IOException;
}
/**
+ * Used when base_x/bucket_N is missing - makes control flow a bit easier
+ */
+ private class EmptyReaderPair implements ReaderPair {
+ @Override public OrcStruct nextRecord() {
+ return null;
+ }
+ @Override public int getColumns() {
+ return 0;
+ }
+ @Override public RecordReader getRecordReader() {
+ return null;
+ }
+ @Override public Reader getReader() {
+ return null;
+ }
+ @Override public RecordIdentifier getMinKey() {
+ return null;
+ }
+ @Override public RecordIdentifier getMaxKey() {
+ return null;
+ }
+ @Override public ReaderKey getKey() {
+ return null;
+ }
+ @Override public void next(OrcStruct next) throws IOException {
+ }
+ }
+ /**
* A reader and the next record from that reader. The code reads ahead so that
* we can return the lowest ReaderKey from each of the readers. Thus, the
* next available row is nextRecord and only following records are still in
@@ -209,6 +237,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
private final ReaderKey key;
private final RecordIdentifier minKey;
private final RecordIdentifier maxKey;
+ @Deprecated//HIVE-18158
private final int statementId;
/**
@@ -320,12 +349,18 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
private final ReaderKey key;
final int bucketId;
final int bucketProperty;
+ /**
+ * TransactionId to use when generating synthetic ROW_IDs
+ */
+ final long transactionId;
- OriginalReaderPair(ReaderKey key, int bucketId, Configuration conf) throws IOException {
+ OriginalReaderPair(ReaderKey key, int bucketId, Configuration conf, Options mergeOptions,
+ int statementId) throws IOException {
this.key = key;
this.bucketId = bucketId;
assert bucketId >= 0 : "don't support non-bucketed tables yet";
- this.bucketProperty = encodeBucketId(conf, bucketId);
+ this.bucketProperty = encodeBucketId(conf, bucketId, statementId);
+ transactionId = mergeOptions.getTransactionId();
}
@Override public final OrcStruct nextRecord() {
return nextRecord;
@@ -337,7 +372,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
@Override
public final ReaderKey getKey() { return key; }
/**
- * The cumulative number of row in all files of the logical bucket that precede the file
+ * The cumulative number of rows in all files of the logical bucket that precede the file
* represented by {@link #getRecordReader()}
*/
abstract long getRowIdOffset();
@@ -355,9 +390,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
new IntWritable(OrcRecordUpdater.INSERT_OPERATION);
nextRecord().setFieldValue(OrcRecordUpdater.OPERATION, operation);
nextRecord().setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION,
- new LongWritable(0));
+ new LongWritable(transactionId));
nextRecord().setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION,
- new LongWritable(0));
+ new LongWritable(transactionId));
nextRecord().setFieldValue(OrcRecordUpdater.BUCKET,
new IntWritable(bucketProperty));
nextRecord().setFieldValue(OrcRecordUpdater.ROW_ID,
@@ -369,17 +404,17 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
((IntWritable) next.getFieldValue(OrcRecordUpdater.OPERATION))
.set(OrcRecordUpdater.INSERT_OPERATION);
((LongWritable) next.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION))
- .set(0);
+ .set(transactionId);
((IntWritable) next.getFieldValue(OrcRecordUpdater.BUCKET))
.set(bucketProperty);
((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION))
- .set(0);
+ .set(transactionId);
((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID))
.set(nextRowId);
nextRecord().setFieldValue(OrcRecordUpdater.ROW,
getRecordReader().next(OrcRecordUpdater.getRow(next)));
}
- key.setValues(0L, bucketProperty, nextRowId, 0L, 0);
+ key.setValues(transactionId, bucketProperty, nextRowId, transactionId, 0);
if (getMaxKey() != null && key.compareRow(getMaxKey()) > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("key " + key + " > maxkey " + getMaxKey());
@@ -391,9 +426,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
return false;//reached EndOfFile
}
}
- static int encodeBucketId(Configuration conf, int bucketId) {
- return BucketCodec.V1.encode(new AcidOutputFormat.Options(conf).bucket(bucketId));
+ static int encodeBucketId(Configuration conf, int bucketId, int statementId) {
+ return BucketCodec.V1.encode(new AcidOutputFormat.Options(conf).bucket(bucketId)
+ .statementId(statementId));
}
+ /**
+ * This handles normal read (as opposed to Compaction) of a {@link AcidUtils.AcidBaseFileType#ORIGINAL_BASE}
+ * file. Such a file may be the result of Load Data or it may have been written to the table
+ * before it was converted to acid.
+ */
@VisibleForTesting
final static class OriginalReaderPairToRead extends OriginalReaderPair {
private final long rowIdOffset;
@@ -401,12 +442,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
private final RecordReader recordReader;
private final RecordIdentifier minKey;
private final RecordIdentifier maxKey;
-
OriginalReaderPairToRead(ReaderKey key, Reader reader, int bucketId,
final RecordIdentifier minKey, final RecordIdentifier maxKey,
Reader.Options options, Options mergerOptions, Configuration conf,
- ValidTxnList validTxnList) throws IOException {
- super(key, bucketId, conf);
+ ValidTxnList validTxnList, int statementId) throws IOException {
+ super(key, bucketId, conf, mergerOptions, statementId);
this.reader = reader;
assert !mergerOptions.isCompacting();
assert mergerOptions.getRootPath() != null : "Since we have original files";
@@ -426,6 +466,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
boolean haveSeenCurrentFile = false;
long rowIdOffsetTmp = 0;
{
+ /**
+ * Note that for reading base_x/ or delta_x_x/ with non-acid schema,
+ * {@link Options#getRootPath()} is set to base_x/ or delta_x_x/ which causes all its
+ * contents to be in {@link org.apache.hadoop.hive.ql.io.AcidUtils.Directory#getOriginalFiles()}
+ */
//the split is from something other than the 1st file of the logical bucket - compute offset
AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
conf, validTxnList, false, true);
@@ -458,7 +503,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
if (rowIdOffset > 0) {
//rowIdOffset could be 0 if all files before current one are empty
/**
- * Since we already done {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader, int, Reader.Options, Configuration)}
+ * Since we have already done {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader, int, Reader.Options, Configuration, Options)} we
* need to fix min/max key since these are used by
* {@link #next(OrcStruct)} which uses {@link #rowIdOffset} to generate rowId for
* the key. Clear? */
@@ -469,7 +514,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* If this is not the 1st file, set minKey 1 less than the start of current file
* (Would not need to set minKey if we knew that there are no delta files)
* {@link #advanceToMinKey()} needs this */
- newMinKey = new RecordIdentifier(0, bucketProperty,rowIdOffset - 1);
+ newMinKey = new RecordIdentifier(transactionId, bucketProperty,rowIdOffset - 1);
}
if (maxKey != null) {
maxKey.setRowId(maxKey.getRowId() + rowIdOffset);
@@ -482,7 +527,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* of the file so we want to leave it blank to make sure any insert events in delta
* files are included; Conversely, if it's not the last file, set the maxKey so that
* events from deltas that don't modify anything in the current split are excluded*/
- newMaxKey = new RecordIdentifier(0, bucketProperty,
+ newMaxKey = new RecordIdentifier(transactionId, bucketProperty,
rowIdOffset + reader.getNumberOfRows() - 1);
}
this.minKey = newMinKey;
@@ -532,8 +577,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
OriginalReaderPairToCompact(ReaderKey key, int bucketId,
Reader.Options options, Options mergerOptions, Configuration conf,
- ValidTxnList validTxnList) throws IOException {
- super(key, bucketId, conf);
+ ValidTxnList validTxnList, int statementId) throws IOException {
+ super(key, bucketId, conf, mergerOptions, statementId);
assert mergerOptions.isCompacting() : "Should only be used for Compaction";
this.conf = conf;
this.options = options;
@@ -544,9 +589,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
assert options.getMaxOffset() == Long.MAX_VALUE;
AcidUtils.Directory directoryState = AcidUtils.getAcidState(
mergerOptions.getRootPath(), conf, validTxnList, false, true);
+ /**
+ * Note that for reading base_x/ or delta_x_x/ with non-acid schema,
+ * {@link Options#getRootPath()} is set to base_x/ or delta_x_x/ which causes all its
+ * contents to be in {@link org.apache.hadoop.hive.ql.io.AcidUtils.Directory#getOriginalFiles()}
+ */
originalFiles = directoryState.getOriginalFiles();
assert originalFiles.size() > 0;
- this.reader = advanceToNextFile();//in case of Compaction, this is the 1st file of the current bucket
+ //in case of Compaction, this is the 1st file of the current bucket
+ this.reader = advanceToNextFile();
if (reader == null) {
//Compactor generated a split for a bucket that has no data?
throw new IllegalStateException("No 'original' files found for bucketId=" + this.bucketId +
@@ -655,7 +706,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
*/
private KeyInterval discoverOriginalKeyBounds(Reader reader, int bucket,
Reader.Options options,
- Configuration conf) throws IOException {
+ Configuration conf, Options mergerOptions) throws IOException {
long rowLength = 0;
long rowOffset = 0;
long offset = options.getOffset();//this would usually be at block boundary
@@ -663,7 +714,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
boolean isTail = true;
RecordIdentifier minKey = null;
RecordIdentifier maxKey = null;
- int bucketProperty = encodeBucketId(conf, bucket);
+ TransactionMetaData tfp = TransactionMetaData.findTransactionIDForSynthetcRowIDs(
+ mergerOptions.getBucketPath(), mergerOptions.getRootPath(), conf);
+ int bucketProperty = encodeBucketId(conf, bucket, tfp.statementId);
/**
* options.getOffset() and getMaxOffset() would usually be at block boundary which doesn't
* necessarily match stripe boundary. So we want to come up with minKey to be one before the 1st
@@ -755,13 +808,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* {@link OrcRawRecordMerger} Acid reader is used slightly differently in various contexts.
* This makes the "context" explicit.
*/
- static class Options {
+ static class Options implements Cloneable {
private int copyIndex = 0;
private boolean isCompacting = false;
private Path bucketPath;
private Path rootPath;
+ private Path baseDir;
private boolean isMajorCompaction = false;
private boolean isDeleteReader = false;
+ private long transactionId = 0;
Options copyIndex(int copyIndex) {
assert copyIndex >= 0;
this.copyIndex = copyIndex;
@@ -790,6 +845,14 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
assert !isCompacting;
return this;
}
+ Options transactionId(long transactionId) {
+ this.transactionId = transactionId;
+ return this;
+ }
+ Options baseDir(Path baseDir) {
+ this.baseDir = baseDir;
+ return this;
+ }
/**
* 0 means it's the original file, without {@link Utilities#COPY_KEYWORD} suffix
*/
@@ -825,13 +888,48 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
boolean isDeleteReader() {
return isDeleteReader;
}
+ /**
+ * for reading "original" files - i.e. not native acid schema. Default value of 0 is
+ * appropriate for files that existed in a table before it was made transactional. 0 is the
+ * primordial transaction. Non-native files resulting from the Load Data command
+ * are located in base_x or delta_x_x and then transactionId == x.
+ */
+ long getTransactionId() {
+ return transactionId;
+ }
+
+ /**
+ * In case of isMajorCompaction() this is the base dir from the Compactor, i.e. either a base_x
+ * or {@link #rootPath} if it's the 1st major compaction after non-acid2acid conversion
+ */
+ Path getBaseDir() {
+ return baseDir;
+ }
+ /**
+ * shallow clone
+ */
+ public Options clone() {
+ try {
+ return (Options) super.clone();
+ }
+ catch(CloneNotSupportedException ex) {
+ throw new AssertionError();
+ }
+ }
}
/**
- * Create a reader that merge sorts the ACID events together.
+ * Create a reader that merge sorts the ACID events together. This handles
+ * 1. 'normal' reads on behalf of a query (non vectorized)
+ * 2. Compaction reads (major/minor)
+ * 3. Delete event reads - to create a sorted view of all delete events for vectorized read
+ *
+ * This makes the logic in the constructor confusing and needs to be refactored. Liberal use of
+ * asserts below is primarily for documentation purposes.
+ *
* @param conf the configuration
* @param collapseEvents should the events on the same row be collapsed
- * @param isOriginal is the base file a pre-acid file
- * @param bucket the bucket we are reading
+ * @param isOriginal if reading files w/o acid schema - {@link AcidUtils.AcidBaseFileType#ORIGINAL_BASE}
+ * @param bucket the bucket/writer id of the file we are reading
* @param options the options to read with
* @param deltaDirectory the list of delta directories to include
* @throws IOException
@@ -887,11 +985,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
objectInspector = OrcRecordUpdater.createEventSchema
(OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr)));
+ assert !(mergerOptions.isCompacting() && reader != null) : "don't need a reader for compaction";
// modify the options to reflect the event instead of the base row
Reader.Options eventOptions = createEventOptions(options);
+ //suppose it's the first Major compaction so we only have deltas
+ boolean isMajorNoBase = mergerOptions.isCompacting() && mergerOptions.isMajorCompaction()
+ && mergerOptions.getBaseDir() == null;
if((mergerOptions.isCompacting() && mergerOptions.isMinorCompaction()) ||
- mergerOptions.isDeleteReader()) {
+ mergerOptions.isDeleteReader() || isMajorNoBase) {
//for minor compaction, there is no progress report and we don't filter deltas
baseReader = null;
minKey = maxKey = null;
@@ -906,27 +1008,68 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
} else {
// find the min/max based on the offset and length (and more for 'original')
if (isOriginal) {
- keyInterval = discoverOriginalKeyBounds(reader, bucket, options, conf);
+ //note that this KeyInterval may be adjusted later due to copy_N files
+ keyInterval = discoverOriginalKeyBounds(reader, bucket, options, conf, mergerOptions);
} else {
keyInterval = discoverKeyBounds(reader, options);
}
}
LOG.info("min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey());
// use the min/max instead of the byte range
- ReaderPair pair;
+ ReaderPair pair = null;
ReaderKey key = new ReaderKey();
if (isOriginal) {
options = options.clone();
if(mergerOptions.isCompacting()) {
- pair = new OriginalReaderPairToCompact(key, bucket, options, mergerOptions,
- conf, validTxnList);
+ assert mergerOptions.isMajorCompaction();
+ Options readerPairOptions = mergerOptions;
+ if(mergerOptions.getBaseDir().getName().startsWith(AcidUtils.BASE_PREFIX)) {
+ readerPairOptions = modifyForNonAcidSchemaRead(mergerOptions,
+ AcidUtils.parseBase(mergerOptions.getBaseDir()), mergerOptions.getBaseDir());
+ }
+ pair = new OriginalReaderPairToCompact(key, bucket, options, readerPairOptions,
+ conf, validTxnList,
+ 0);//0 since base_x doesn't have a suffix (neither does pre acid write)
} else {
+ assert mergerOptions.getBucketPath() != null : " since this is not compaction: "
+ + mergerOptions.getRootPath();
+ //if here it's a non-acid schema file - check if from before table was marked transactional
+ //or in base_x/delta_x_x from Load Data
+ Options readerPairOptions = mergerOptions;
+ TransactionMetaData tfp = TransactionMetaData.findTransactionIDForSynthetcRowIDs(
+ mergerOptions.getBucketPath(), mergerOptions.getRootPath(), conf);
+ if(tfp.syntheticTransactionId > 0) {
+ readerPairOptions = modifyForNonAcidSchemaRead(mergerOptions,
+ tfp.syntheticTransactionId, tfp.folder);
+ }
pair = new OriginalReaderPairToRead(key, reader, bucket, keyInterval.getMinKey(),
- keyInterval.getMaxKey(), options, mergerOptions, conf, validTxnList);
+ keyInterval.getMaxKey(), options, readerPairOptions, conf, validTxnList, tfp.statementId);
}
} else {
- pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
- eventOptions, 0);
+ if(mergerOptions.isCompacting()) {
+ assert mergerOptions.isMajorCompaction() : "expected major compaction: "
+ + mergerOptions.getBaseDir() + ":" + bucket;
+ assert mergerOptions.getBaseDir() != null : "no baseDir?: " + mergerOptions.getRootPath();
+ //we are compacting and it's acid schema so create a reader for the 1st bucket file that is not empty
+ FileSystem fs = mergerOptions.getBaseDir().getFileSystem(conf);
+ Path bucketPath = AcidUtils.createBucketFile(mergerOptions.getBaseDir(), bucket);
+ if(fs.exists(bucketPath) && fs.getFileStatus(bucketPath).getLen() > 0) {
+ //doing major compaction - it's possible (on Tez, where a full complement of bucket
+ //files is not required) that base_x/ doesn't have a file for 'bucket'
+ reader = OrcFile.createReader(bucketPath, OrcFile.readerOptions(conf));
+ pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
+ eventOptions, 0);
+ }
+ else {
+ pair = new EmptyReaderPair();
+ LOG.info("No non-empty " + bucketPath + " was found for Major compaction");
+ }
+ }
+ else {
+ assert reader != null : "no reader? " + mergerOptions.getRootPath();
+ pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
+ eventOptions, 0);
+ }
}
minKey = pair.getMinKey();
maxKey = pair.getMaxKey();
@@ -937,11 +1080,13 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
}
baseReader = pair.getRecordReader();
}
-
- if (deltaDirectory != null) {
- /*whatever SARG maybe applicable to base it's not applicable to delete_delta since it has no
- * user columns
- * HIVE-17320: we should compute a SARG to push down min/max key to delete_delta*/
+ /*now process the delta files. For normal read these should only be delete deltas. For
+ * Compaction these may be any delta_x_y/. The files inside any delta_x_y/ may be in Acid
+ * format (i.e. with Acid metadata columns) or 'original'.*/
+ if (deltaDirectory != null && deltaDirectory.length > 0) {
+ /*For reads, whatever SARG may be applicable to the base is not applicable to delete_delta since it has no
+ * user columns. For Compaction there is never a SARG.
+ * */
Reader.Options deltaEventOptions = eventOptions.clone()
.searchArgument(null, null).range(0, Long.MAX_VALUE);
for(Path delta: deltaDirectory) {
@@ -950,17 +1095,50 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
throw new IllegalStateException(delta + " is not delete delta and is not compacting.");
}
ReaderKey key = new ReaderKey();
- AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta);
+ AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta, delta.getFileSystem(conf));
+ if(deltaDir.isRawFormat()) {
+ assert !deltaDir.isDeleteDelta() : delta.toString();
+ assert mergerOptions.isCompacting() : "during regular read anything which is not a" +
+ " delete_delta is treated like base: " + delta;
+ Options rawCompactOptions = modifyForNonAcidSchemaRead(mergerOptions,
+ deltaDir.getMinTransaction(), delta);
+ //this will also handle copy_N files if any
+ ReaderPair deltaPair = new OriginalReaderPairToCompact(key, bucket, options,
+ rawCompactOptions, conf, validTxnList, deltaDir.getStatementId());
+ if (deltaPair.nextRecord() != null) {
+ readers.put(key, deltaPair);
+ }
+ continue;
+ }
for (Path deltaFile : getDeltaFiles(delta, bucket, conf, mergerOptions, isBucketed)) {
FileSystem fs = deltaFile.getFileSystem(conf);
if(!fs.exists(deltaFile)) {
+ /**
+ * it's possible that the file for a specific {@link bucket} doesn't exist in any given
+ * delta since no rows hashed to it (and not configured to create empty buckets)
+ */
continue;
}
+ if(deltaDir.isDeleteDelta()) {
+ //if here it may be compaction, regular read, or the Delete event sorter
+ //in the latter 2 cases we should do:
+ //HIVE-17320: we should compute a SARG to push down min/max key to delete_delta
+ Reader deltaReader = OrcFile.createReader(deltaFile, OrcFile.readerOptions(conf));
+ ReaderPair deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey,
+ deltaEventOptions, deltaDir.getStatementId());
+ if (deltaPair.nextRecord() != null) {
+ readers.put(key, deltaPair);
+ }
+ continue;
+ }
+ //if here then we must be compacting
+ assert mergerOptions.isCompacting() : "not compacting and not delete delta : " + delta;
/* side files are only created by streaming ingest. If this is a compaction, we may
* have an insert delta/ here with side files there because the original writer died.*/
long length = AcidUtils.getLogicalLength(fs, fs.getFileStatus(deltaFile));
assert length >= 0;
Reader deltaReader = OrcFile.createReader(deltaFile, OrcFile.readerOptions(conf).maxLength(length));
+ //must get statementId from file name since Acid 1.0 doesn't write it into bucketProperty
ReaderPairAcid deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey,
deltaEventOptions, deltaDir.getStatementId());
if (deltaPair.nextRecord() != null) {
@@ -988,6 +1166,76 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
}
/**
+ * For use with Load Data statement which places {@link AcidUtils.AcidBaseFileType#ORIGINAL_BASE}
+ * type files into a base_x/ or delta_x_x. The data in these are then assigned ROW_IDs at read
+ * time and made permanent at compaction time. This is identical to how 'original' files (i.e.
+ * those that existed in the table before it was converted to an Acid table) are handled, except that the
+ * transaction ID to use in the ROW_ID should be that of the transaction that ran the Load Data.
+ */
+ static final class TransactionMetaData {
+ final long syntheticTransactionId;
+ /**
+ * folder which determines the transaction id to use in synthetic ROW_IDs
+ */
+ final Path folder;
+ final int statementId;
+ TransactionMetaData(long syntheticTransactionId, Path folder) {
+ this(syntheticTransactionId, folder, 0);
+ }
+ TransactionMetaData(long syntheticTransactionId, Path folder, int statementId) {
+ this.syntheticTransactionId = syntheticTransactionId;
+ this.folder = folder;
+ this.statementId = statementId;
+ }
+ static TransactionMetaData findTransactionIDForSynthetcRowIDs(Path splitPath, Path rootPath,
+ Configuration conf) throws IOException {
+ Path parent = splitPath.getParent();
+ if(rootPath.equals(parent)) {
+ //the 'isOriginal' file is at the root of the partition (or table) thus it is
+ //from a pre-acid conversion write and belongs to primordial txnid:0.
+ return new TransactionMetaData(0, parent);
+ }
+ while(parent != null && !rootPath.equals(parent)) {
+ boolean isBase = parent.getName().startsWith(AcidUtils.BASE_PREFIX);
+ boolean isDelta = parent.getName().startsWith(AcidUtils.DELTA_PREFIX);
+ if(isBase || isDelta) {
+ if(isBase) {
+ return new TransactionMetaData(AcidUtils.parseBase(parent), parent);
+ }
+ else {
+ AcidUtils.ParsedDelta pd = AcidUtils.parsedDelta(parent, AcidUtils.DELTA_PREFIX,
+ parent.getFileSystem(conf));
+ assert pd.getMinTransaction() == pd.getMaxTransaction() :
+ "This a delta with raw non acid schema, must be result of single write, no compaction: "
+ + splitPath;
+ return new TransactionMetaData(pd.getMinTransaction(), parent, pd.getStatementId());
+ }
+ }
+ parent = parent.getParent();
+ }
+ if(parent == null) {
+ //split is marked isOriginal but it's not an immediate child of a partition nor is it in a
+ //base/ or delta/ - this should never happen
+ throw new IllegalStateException("Cannot determine transaction id for original file "
+ + splitPath + " in " + rootPath);
+ }
+ //"warehouse/t/HIVE_UNION_SUBDIR_15/000000_0" is a meaningful path for nonAcid2acid
+ // converted table
+ return new TransactionMetaData(0, rootPath);
+ }
+ }
+ /**
+ * This is done to read non-acid schema files ("original") located in base_x/ or delta_x_x/ which
+ * happens as a result of Load Data statement. Setting {@code rootPath} to base_x/ or delta_x_x
+ * causes {@link AcidUtils#getAcidState(Path, Configuration, ValidTxnList)} in subsequent
+ * {@link OriginalReaderPair} object to return the files in this dir
+ * in {@link AcidUtils.Directory#getOriginalFiles()}
+ * @return modified clone of {@code baseOptions}
+ */
+ private Options modifyForNonAcidSchemaRead(Options baseOptions, long transactionId, Path rootPath) {
+ return baseOptions.clone().transactionId(transactionId).rootPath(rootPath);
+ }
+ /**
* This determines the set of {@link ReaderPairAcid} to create for a given delta/.
* For unbucketed tables {@code bucket} can be thought of as a write tranche.
*/
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 315cc1d..8af38b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -196,7 +196,9 @@ public class OrcRecordUpdater implements RecordUpdater {
fields.add(new OrcStruct.Field("row", rowInspector, ROW));
return new OrcStruct.OrcStructInspector(fields);
}
-
+ /**
+ * @param path - partition root
+ */
OrcRecordUpdater(Path path,
AcidOutputFormat.Options options) throws IOException {
this.options = options;
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 58638b5..edffa5b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -51,6 +51,9 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
private static final Logger LOG = LoggerFactory.getLogger(OrcSplit.class);
private OrcTail orcTail;
private boolean hasFooter;
+ /**
+ * This means {@link AcidUtils.AcidBaseFileType#ORIGINAL_BASE}
+ */
private boolean isOriginal;
private boolean hasBase;
//partition root
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index bcde4fc..d571bd0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.orc.impl.AcidStats;
@@ -156,7 +155,7 @@ public class VectorizedOrcAcidRowBatchReader
this.vectorizedRowBatchBase = baseReader.createValue();
}
- private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit inputSplit, Reporter reporter,
+ private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, Reporter reporter,
VectorizedRowBatchCtx rowBatchCtx) throws IOException {
this.rbCtx = rowBatchCtx;
final boolean isAcidRead = HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
@@ -165,12 +164,10 @@ public class VectorizedOrcAcidRowBatchReader
// This type of VectorizedOrcAcidRowBatchReader can only be created when split-update is
// enabled for an ACID case and the file format is ORC.
- boolean isReadNotAllowed = !isAcidRead || !acidOperationalProperties.isSplitUpdate()
- || !(inputSplit instanceof OrcSplit);
+ boolean isReadNotAllowed = !isAcidRead || !acidOperationalProperties.isSplitUpdate();
if (isReadNotAllowed) {
OrcInputFormat.raiseAcidTablesMustBeReadWithAcidReaderException(conf);
}
- final OrcSplit orcSplit = (OrcSplit) inputSplit;
reporter.setStatus(orcSplit.toString());
readerOptions = OrcRawRecordMerger.createEventOptions(OrcInputFormat.createOptionsForReader(conf));
@@ -226,9 +223,11 @@ public class VectorizedOrcAcidRowBatchReader
private static final class OffsetAndBucketProperty {
private final long rowIdOffset;
private final int bucketProperty;
- private OffsetAndBucketProperty(long rowIdOffset, int bucketProperty) {
+ private final long syntheticTxnId;
+ private OffsetAndBucketProperty(long rowIdOffset, int bucketProperty, long syntheticTxnId) {
this.rowIdOffset = rowIdOffset;
this.bucketProperty = bucketProperty;
+ this.syntheticTxnId = syntheticTxnId;
}
}
/**
@@ -240,17 +239,34 @@ public class VectorizedOrcAcidRowBatchReader
*
* todo: This logic is executed per split of every "original" file. The computed result is the
* same for every split from the same file so this could be optimized by moving it to
- * before/during splt computation and passing the info in the split. (HIVE-17917)
+ * before/during split computation and passing the info in the split. (HIVE-17917)
*/
private OffsetAndBucketProperty computeOffsetAndBucket(
OrcSplit split, JobConf conf,ValidTxnList validTxnList) throws IOException {
- if(!needSyntheticRowIds(split, !deleteEventRegistry.isEmpty(), rowIdProjected)) {
- return new OffsetAndBucketProperty(0,0);
+ if(!needSyntheticRowIds(split.isOriginal(), !deleteEventRegistry.isEmpty(), rowIdProjected)) {
+ if(split.isOriginal()) {
+ /**
+ * Even if we don't need to project ROW_IDs, we still need to check the transaction ID that
+ * created the file to see if it's committed. See more in
+ * {@link #next(NullWritable, VectorizedRowBatch)}. (In practice getAcidState() should
+ * filter out base/delta files but this makes fewer dependencies)
+ */
+ OrcRawRecordMerger.TransactionMetaData syntheticTxnInfo =
+ OrcRawRecordMerger.TransactionMetaData.findTransactionIDForSynthetcRowIDs(split.getPath(),
+ split.getRootDir(), conf);
+ return new OffsetAndBucketProperty(-1,-1,
+ syntheticTxnInfo.syntheticTransactionId);
+ }
+ return null;
}
long rowIdOffset = 0;
+ OrcRawRecordMerger.TransactionMetaData syntheticTxnInfo =
+ OrcRawRecordMerger.TransactionMetaData.findTransactionIDForSynthetcRowIDs(split.getPath(),
+ split.getRootDir(), conf);
int bucketId = AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf).getBucketId();
- int bucketProperty = BucketCodec.V1.encode(new AcidOutputFormat.Options(conf).statementId(0).bucket(bucketId));
- AcidUtils.Directory directoryState = AcidUtils.getAcidState(split.getRootDir(), conf,
+ int bucketProperty = BucketCodec.V1.encode(new AcidOutputFormat.Options(conf)
+ .statementId(syntheticTxnInfo.statementId).bucket(bucketId));
+ AcidUtils.Directory directoryState = AcidUtils.getAcidState( syntheticTxnInfo.folder, conf,
validTxnList, false, true);
for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
AcidOutputFormat.Options bucketOptions =
@@ -266,7 +282,8 @@ public class VectorizedOrcAcidRowBatchReader
OrcFile.readerOptions(conf));
rowIdOffset += reader.getNumberOfRows();
}
- return new OffsetAndBucketProperty(rowIdOffset, bucketProperty);
+ return new OffsetAndBucketProperty(rowIdOffset, bucketProperty,
+ syntheticTxnInfo.syntheticTransactionId);
}
/**
* {@link VectorizedOrcAcidRowBatchReader} is always used for vectorized reads of acid tables.
@@ -284,7 +301,7 @@ public class VectorizedOrcAcidRowBatchReader
if(rbCtx == null) {
throw new IllegalStateException("Could not create VectorizedRowBatchCtx for " + split.getPath());
}
- return !needSyntheticRowIds(split, hasDeletes, areRowIdsProjected(rbCtx));
+ return !needSyntheticRowIds(split.isOriginal(), hasDeletes, areRowIdsProjected(rbCtx));
}
/**
@@ -292,8 +309,8 @@ public class VectorizedOrcAcidRowBatchReader
* Even if ROW__ID is not projected you still need to decorate the rows with them to see if
* any of the delete events apply.
*/
- private static boolean needSyntheticRowIds(OrcSplit split, boolean hasDeletes, boolean rowIdProjected) {
- return split.isOriginal() && (hasDeletes || rowIdProjected);
+ private static boolean needSyntheticRowIds(boolean isOriginal, boolean hasDeletes, boolean rowIdProjected) {
+ return isOriginal && (hasDeletes || rowIdProjected);
}
private static boolean areRowIdsProjected(VectorizedRowBatchCtx rbCtx) {
if(rbCtx.getVirtualColumnCount() == 0) {
@@ -316,7 +333,7 @@ public class VectorizedOrcAcidRowBatchReader
if (orcSplit.isOriginal()) {
root = orcSplit.getRootDir();
} else {
- root = path.getParent().getParent();
+ root = path.getParent().getParent();//todo: why not just use getRootDir()?
assert root.equals(orcSplit.getRootDir()) : "root mismatch: baseDir=" + orcSplit.getRootDir() +
" path.p.p=" + root;
}
@@ -398,7 +415,9 @@ public class VectorizedOrcAcidRowBatchReader
* If there are deletes and reading original file, we must produce synthetic ROW_IDs in order
* to see if any deletes apply
*/
- if(rowIdProjected || !deleteEventRegistry.isEmpty()) {
+ if(needSyntheticRowIds(true, !deleteEventRegistry.isEmpty(), rowIdProjected)) {
+ assert syntheticProps != null && syntheticProps.rowIdOffset >= 0 : "" + syntheticProps;
+ assert syntheticProps != null && syntheticProps.bucketProperty >= 0 : "" + syntheticProps;
if(innerReader == null) {
throw new IllegalStateException(getClass().getName() + " requires " +
org.apache.orc.RecordReader.class +
@@ -409,8 +428,7 @@ public class VectorizedOrcAcidRowBatchReader
*/
recordIdColumnVector.fields[0].noNulls = true;
recordIdColumnVector.fields[0].isRepeating = true;
- //all "original" is considered written by txnid:0 which committed
- ((LongColumnVector)recordIdColumnVector.fields[0]).vector[0] = 0;
+ ((LongColumnVector)recordIdColumnVector.fields[0]).vector[0] = syntheticProps.syntheticTxnId;
/**
* This is {@link RecordIdentifier#getBucketProperty()}
* Also see {@link BucketCodec}
@@ -433,15 +451,21 @@ public class VectorizedOrcAcidRowBatchReader
innerRecordIdColumnVector[OrcRecordUpdater.ORIGINAL_TRANSACTION] = recordIdColumnVector.fields[0];
innerRecordIdColumnVector[OrcRecordUpdater.BUCKET] = recordIdColumnVector.fields[1];
innerRecordIdColumnVector[OrcRecordUpdater.ROW_ID] = recordIdColumnVector.fields[2];
+ //these are insert events so (original txn == current) txn for all rows
+ innerRecordIdColumnVector[OrcRecordUpdater.CURRENT_TRANSACTION] = recordIdColumnVector.fields[0];
+ }
+ if(syntheticProps.syntheticTxnId > 0) {
+ //"originals" (written before table was converted to acid) is considered written by
+ // txnid:0 which is always committed so there is no need to check wrt invalid transactions
+ //But originals written by Load Data for example can be in base_x or delta_x_x so we must
+ //check if 'x' is committed or not evn if ROW_ID is not needed in the Operator pipeline.
+ findRecordsWithInvalidTransactionIds(innerRecordIdColumnVector,
+ vectorizedRowBatchBase.size, selectedBitSet);
}
}
else {
// Case 1- find rows which belong to transactions that are not valid.
findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet);
- /**
- * All "original" data belongs to txnid:0 and is always valid/committed for every reader
- * So only do findRecordsWithInvalidTransactionIds() wrt {@link validTxnList} for !isOriginal
- */
}
// Case 2- find rows which have been deleted.
@@ -473,11 +497,6 @@ public class VectorizedOrcAcidRowBatchReader
}
else {
// Finally, link up the columnVector from the base VectorizedRowBatch to outgoing batch.
- // NOTE: We only link up the user columns and not the ACID metadata columns because this
- // vectorized code path is not being used in cases of update/delete, when the metadata columns
- // would be expected to be passed up the operator pipeline. This is because
- // currently the update/delete specifically disable vectorized code paths.
- // This happens at ql/exec/Utilities.java::3293 when it checks for mapWork.getVectorMode()
StructColumnVector payloadStruct = (StructColumnVector) vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW];
// Transfer columnVector objects from base batch to outgoing batch.
System.arraycopy(payloadStruct.fields, 0, value.cols, 0, value.getDataColumnCount());
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index f7388a4..736034d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -27,12 +27,10 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
import org.apache.hadoop.io.NullWritable;
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 6fb0c43..fdb3603 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.LockTableDesc;
import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
+import org.apache.hadoop.hive.ql.plan.api.Query;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.common.util.ShutdownHookManager;
import org.slf4j.Logger;
@@ -297,6 +298,10 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
break;
default:
if(!queryPlan.getOperation().isAllowedInTransaction() && isExplicitTransaction) {
+ if(allowOperationInATransaction(queryPlan)) {
+ break;
+ }
+ //look at queryPlan.outputs(WriteEntity.t - that's the table)
//for example, drop table in an explicit txn is not allowed
//in some cases this requires looking at more than just the operation
//for example HiveOperation.LOAD - OK if target is MM table but not OK if non-acid table
@@ -311,6 +316,33 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
any non acid and raise an appropriate error
* Driver.acidSinks and Driver.acidInQuery can be used if any acid is in the query*/
}
+
+ /**
+ * This modifies the logic wrt what operations are allowed in a transaction. Multi-statement
+ * transaction support is incomplete but it makes some Acid test cases much easier to write.
+ */
+ private boolean allowOperationInATransaction(QueryPlan queryPlan) {
+ //Acid and MM tables support Load Data with transactional semantics. This will allow Load Data
+ //in a txn assuming we can determine the target is a suitable table type.
+ if(queryPlan.getOperation() == HiveOperation.LOAD && queryPlan.getOutputs() != null && queryPlan.getOutputs().size() == 1) {
+ WriteEntity writeEntity = queryPlan.getOutputs().iterator().next();
+ if(AcidUtils.isFullAcidTable(writeEntity.getTable()) || AcidUtils.isInsertOnlyTable(writeEntity.getTable())) {
+ switch (writeEntity.getWriteType()) {
+ case INSERT:
+ //allow operation in a txn
+ return true;
+ case INSERT_OVERWRITE:
+ //see HIVE-18154
+ return false;
+ default:
+ //not relevant for LOAD
+ return false;
+ }
+ }
+ }
+ //todo: handle Insert Overwrite as well: HIVE-18154
+ return false;
+ }
/**
* Normally client should call {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, org.apache.hadoop.hive.ql.Context, String)}
* @param isBlocking if false, the method will return immediately; thus the locks may be in LockState.WAITING
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 1a37bf7..9f2c6d8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -146,6 +146,7 @@ import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
@@ -1705,18 +1706,20 @@ public class Hive {
* location/inputformat/outputformat/serde details from table spec
* @param isSrcLocal
* If the source directory is LOCAL
- * @param isAcid
- * true if this is an ACID operation
+ * @param isAcidIUDoperation
+ * true if this is an ACID Insert/Update/Delete operation
* @param hasFollowingStatsTask
* true if there is a following task which updates the stats, so, this method need not update.
* @return Partition object being loaded with data
*/
public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec,
LoadFileType loadFileType, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
- boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask, Long txnId, int stmtId)
+ boolean isSrcLocal, boolean isAcidIUDoperation, boolean hasFollowingStatsTask, Long txnId, int stmtId)
throws HiveException {
Path tblDataLocationPath = tbl.getDataLocation();
boolean isMmTableWrite = AcidUtils.isInsertOnlyTable(tbl.getParameters());
+ assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
+ boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl);
try {
// Get the partition object if it already exists
Partition oldPart = getPartition(tbl, partSpec, false);
@@ -1768,7 +1771,7 @@ public class Hive {
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("not moving " + loadPath + " to " + newPartPath + " (MM)");
}
- assert !isAcid;
+ assert !isAcidIUDoperation;
if (areEventsForDmlNeeded(tbl, oldPart)) {
newFiles = listFilesCreatedByQuery(loadPath, txnId, stmtId);
}
@@ -1792,16 +1795,22 @@ public class Hive {
filter = (loadFileType == LoadFileType.REPLACE_ALL)
? new JavaUtils.IdPathFilter(txnId, stmtId, false, true) : filter;
}
+ else if(!isAcidIUDoperation && isFullAcidTable) {
+ destPath = fixFullAcidPathForLoadData(loadFileType, destPath, txnId, stmtId, tbl);
+ }
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("moving " + loadPath + " to " + destPath);
}
- if ((loadFileType == LoadFileType.REPLACE_ALL) || (oldPart == null && !isAcid)) {
+ //todo: why is "&& !isAcidIUDoperation" needed here?
+ if (!isFullAcidTable && ((loadFileType == LoadFileType.REPLACE_ALL) || (oldPart == null && !isAcidIUDoperation))) {
+ //for fullAcid tables we don't delete files for commands with OVERWRITE - we create a new
+ // base_x. (there is Insert Overwrite and Load Data Overwrite)
boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(),
isSrcLocal, isAutoPurge, newFiles, filter, isMmTableWrite);
} else {
FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
- copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcid,
+ copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation,
(loadFileType == LoadFileType.OVERWRITE_EXISTING), newFiles);
}
}
@@ -1891,6 +1900,38 @@ public class Hive {
}
}
+ /**
+ * Load Data commands for fullAcid tables write to base_x (if there is an overwrite clause) or
+ * delta_x_x directory - same as any other Acid write. This method modifies the destPath to add
+ * this path component.
+ * @param txnId - id of current transaction (in which this operation is running)
+ * @param stmtId - see {@link DbTxnManager#getWriteIdAndIncrement()}
+ * @return appropriately modified path
+ */
+ private Path fixFullAcidPathForLoadData(LoadFileType loadFileType, Path destPath, long txnId, int stmtId, Table tbl) throws HiveException {
+ switch (loadFileType) {
+ case REPLACE_ALL:
+ destPath = new Path(destPath, AcidUtils.baseDir(txnId));
+ break;
+ case KEEP_EXISTING:
+ destPath = new Path(destPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
+ break;
+ case OVERWRITE_EXISTING:
+ //should not happen here - this is for replication
+ default:
+ throw new IllegalArgumentException("Unexpected " + LoadFileType.class.getName() + " " + loadFileType);
+ }
+ try {
+ FileSystem fs = tbl.getDataLocation().getFileSystem(SessionState.getSessionConf());
+ if(!FileUtils.mkdir(fs, destPath, conf)) {
+ LOG.warn(destPath + " already exists?!?!");
+ }
+ AcidUtils.MetaDataFile.createMetaFile(destPath, fs, true);
+ } catch (IOException e) {
+ throw new HiveException("load: error while creating " + destPath + ";loadFileType=" + loadFileType, e);
+ }
+ return destPath;
+ }
private boolean areEventsForDmlNeeded(Table tbl, Partition oldPart) {
return conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null;
@@ -2125,7 +2166,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
* @param partSpec
* @param loadFileType
* @param numDP number of dynamic partitions
- * @param listBucketingEnabled
* @param isAcid true if this is an ACID operation
* @param txnId txnId, can be 0 unless isAcid == true
* @return partition map details (PartitionSpec and Partition)
@@ -2273,14 +2313,16 @@ private void constructOneLBLocationMap(FileStatus fSta,
* if list bucketing enabled
* @param hasFollowingStatsTask
* if there is any following stats task
- * @param isAcid true if this is an ACID based write
+ * @param isAcidIUDoperation true if this is an ACID based Insert [overwrite]/update/delete
*/
public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType, boolean isSrcLocal,
- boolean isSkewedStoreAsSubdir, boolean isAcid, boolean hasFollowingStatsTask,
- Long txnId, int stmtId, boolean isMmTable) throws HiveException {
-
+ boolean isSkewedStoreAsSubdir, boolean isAcidIUDoperation, boolean hasFollowingStatsTask,
+ Long txnId, int stmtId) throws HiveException {
List<Path> newFiles = null;
Table tbl = getTable(tableName);
+ assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
+ boolean isMmTable = AcidUtils.isInsertOnlyTable(tbl);
+ boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl);
HiveConf sessionConf = SessionState.getSessionConf();
if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
newFiles = Collections.synchronizedList(new ArrayList<Path>());
@@ -2298,24 +2340,31 @@ private void constructOneLBLocationMap(FileStatus fSta,
newFiles = listFilesCreatedByQuery(loadPath, txnId, stmtId);
} else {
// Either a non-MM query, or a load into MM table from an external source.
- Path tblPath = tbl.getPath(), destPath = tblPath;
+ Path tblPath = tbl.getPath();
+ Path destPath = tblPath;
PathFilter filter = FileUtils.HIDDEN_FILES_PATH_FILTER;
if (isMmTable) {
+ assert !isAcidIUDoperation;
// We will load into MM directory, and delete from the parent if needed.
destPath = new Path(destPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
filter = loadFileType == LoadFileType.REPLACE_ALL
? new JavaUtils.IdPathFilter(txnId, stmtId, false, true) : filter;
}
+ else if(!isAcidIUDoperation && isFullAcidTable) {
+ destPath = fixFullAcidPathForLoadData(loadFileType, destPath, txnId, stmtId, tbl);
+ }
Utilities.FILE_OP_LOGGER.debug("moving " + loadPath + " to " + tblPath
+ " (replace = " + loadFileType + ")");
- if (loadFileType == LoadFileType.REPLACE_ALL) {
+ if (loadFileType == LoadFileType.REPLACE_ALL && !isFullAcidTable) {
+ //for fullAcid we don't want to delete any files even for OVERWRITE see HIVE-14988/HIVE-17361
+ //todo: should probably do the same for MM IOW
boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
replaceFiles(tblPath, loadPath, destPath, tblPath,
sessionConf, isSrcLocal, isAutopurge, newFiles, filter, isMmTable);
} else {
try {
FileSystem fs = tbl.getDataLocation().getFileSystem(sessionConf);
- copyFiles(sessionConf, loadPath, destPath, fs, isSrcLocal, isAcid,
+ copyFiles(sessionConf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation,
loadFileType == LoadFileType.OVERWRITE_EXISTING, newFiles);
} catch (IOException e) {
throw new HiveException("addFiles: filesystem error in check phase", e);
@@ -2358,7 +2407,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), newFiles);
}
-
/**
* Creates a partition.
*
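
As the fixFullAcidPathForLoadData() javadoc above explains, LOAD DATA into a full acid table
lands in a base_x/ (OVERWRITE) or delta_x_x/ (append) subdirectory of the partition, just like
any other acid write. The sketch below only illustrates the resulting directory names; the
exact zero padding is an assumption here, since Hive builds the names via AcidUtils.baseDir()
and AcidUtils.deltaSubdir().

public class LoadDataDestSketch {
  static String loadDataSubdir(boolean overwrite, long txnId, int stmtId) {
    return overwrite
        ? String.format("base_%07d", txnId)                            // LOAD DATA ... OVERWRITE
        : String.format("delta_%07d_%07d_%04d", txnId, txnId, stmtId); // LOAD DATA (append)
  }
  public static void main(String[] args) {
    System.out.println(loadDataSubdir(true, 25, 0));  // base_0000025
    System.out.println(loadDataSubdir(false, 25, 0)); // delta_0000025_0000025_0000
  }
}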
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index cd75130..a1b6cda 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -391,7 +391,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
LoadTableDesc loadTableWork = new LoadTableDesc(destPath,
Utilities.getTableDesc(table), new TreeMap<>(),
replace ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, txnId);
- loadTableWork.setTxnId(txnId);
loadTableWork.setStmtId(stmtId);
MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState());
Task<?> loadTableTask = TaskFactory.get(mv, x.getConf());
@@ -400,6 +399,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
return loadTableTask;
}
+ /**
+ * todo: this is odd: transactions are opened for all statements. what is this supposed to check?
+ */
+ @Deprecated
private static boolean isAcid(Long txnId) {
return (txnId != null) && (txnId != 0);
}
@@ -490,7 +493,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
partSpec.getPartSpec(),
replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING,
txnId);
- loadTableWork.setTxnId(txnId);
loadTableWork.setStmtId(stmtId);
loadTableWork.setInheritTableSpecs(false);
Task<?> loadPartTask = TaskFactory.get(new MoveWork(
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index 238fbd6..cc956da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -136,7 +136,7 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
}
private List<FileStatus> applyConstraintsAndGetFiles(URI fromURI, Tree ast,
- boolean isLocal) throws SemanticException {
+ boolean isLocal, Table table) throws SemanticException {
FileStatus[] srcs = null;
@@ -159,6 +159,14 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast,
"source contains directory: " + oneSrc.getPath().toString()));
}
+ if(AcidUtils.isFullAcidTable(table)) {
+ if(!AcidUtils.originalBucketFilter.accept(oneSrc.getPath())) {
+ //acid files (e.g. bucket_0000) have ROW_ID embedded in them and so can't be simply
+ //copied to a table so only allow non-acid files for now
+ throw new SemanticException(ErrorMsg.ACID_LOAD_DATA_INVALID_FILE_NAME,
+ oneSrc.getPath().getName(), table.getDbName() + "." + table.getTableName());
+ }
+ }
}
} catch (IOException e) {
// Has to use full name to make sure it does not conflict with
@@ -230,11 +238,8 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
- if(AcidUtils.isAcidTable(ts.tableHandle) && !AcidUtils.isInsertOnlyTable(ts.tableHandle.getParameters())) {
- throw new SemanticException(ErrorMsg.LOAD_DATA_ON_ACID_TABLE, ts.tableHandle.getCompleteName());
- }
// make sure the arguments make sense
- List<FileStatus> files = applyConstraintsAndGetFiles(fromURI, fromTree, isLocal);
+ List<FileStatus> files = applyConstraintsAndGetFiles(fromURI, fromTree, isLocal, ts.tableHandle);
// for managed tables, make sure the file formats match
if (TableType.MANAGED_TABLE.equals(ts.tableHandle.getTableType())
@@ -277,17 +282,16 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
}
Long txnId = null;
- int stmtId = 0;
- Table tbl = ts.tableHandle;
- if (AcidUtils.isInsertOnlyTable(tbl.getParameters())) {
+ int stmtId = -1;
+ if (AcidUtils.isAcidTable(ts.tableHandle)) {
txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
+ stmtId = SessionState.get().getTxnMgr().getWriteIdAndIncrement();
}
LoadTableDesc loadTableWork;
loadTableWork = new LoadTableDesc(new Path(fromURI),
Utilities.getTableDesc(ts.tableHandle), partSpec,
isOverWrite ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING, txnId);
- loadTableWork.setTxnId(txnId);
loadTableWork.setStmtId(stmtId);
if (preservePartitionSpecs){
// Note : preservePartitionSpecs=true implies inheritTableSpecs=false but
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
index 1fa7b40..4683c9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
@@ -47,9 +47,22 @@ public class LoadTableDesc extends LoadDesc implements Serializable {
private Map<String, String> partitionSpec; // NOTE: this partitionSpec has to be ordered map
public enum LoadFileType {
- REPLACE_ALL, // Remove all existing data before copy/move
- KEEP_EXISTING, // If any file exist while copy, then just duplicate the file
- OVERWRITE_EXISTING // If any file exist while copy, then just overwrite the file
+ /**
+ * This corresponds to INSERT OVERWRITE and to REPL LOAD for an INSERT OVERWRITE event.
+ * Remove all existing data before the copy/move
+ */
+ REPLACE_ALL,
+ /**
+ * This corresponds to INSERT INTO and LOAD DATA.
+ * If a file with the same name exists during the copy, just duplicate the file
+ */
+ KEEP_EXISTING,
+ /**
+ * This corresponds to REPL LOAD: if we re-apply the same event, we need to overwrite
+ * the file instead of making a duplicate copy.
+ * If a file with the same name exists during the copy, just overwrite the file
+ */
+ OVERWRITE_EXISTING
}
public LoadTableDesc(final LoadTableDesc o) {
super(o.getSourcePath(), o.getWriteType());
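To make the three modes documented above concrete, here is a hypothetical helper (not Hive's loader) showing how each LoadFileType behaves when the target directory already contains a file with the same name.

enum LoadFileType { REPLACE_ALL, KEEP_EXISTING, OVERWRITE_EXISTING }

public class LoadFileTypeDemo {
  static String decide(LoadFileType type, boolean targetHasSameName) {
    switch (type) {
      case REPLACE_ALL:
        return "wipe target directory first, then copy";   // INSERT OVERWRITE
      case KEEP_EXISTING:
        return targetHasSameName
            ? "copy under a renamed file (keep both)"       // INSERT INTO / LOAD DATA
            : "copy as-is";
      case OVERWRITE_EXISTING:
        return targetHasSameName
            ? "overwrite the existing file"                 // REPL LOAD re-applying an event
            : "copy as-is";
      default:
        throw new AssertionError(type);
    }
  }

  public static void main(String[] args) {
    System.out.println(decide(LoadFileType.KEEP_EXISTING, true));
  }
}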
@@ -215,14 +228,10 @@ public class LoadTableDesc extends LoadDesc implements Serializable {
return currentTransactionId == null ? 0 : currentTransactionId;
}
- public void setTxnId(Long txnId) {
- this.currentTransactionId = txnId;
- }
-
public int getStmtId() {
return stmtId;
}
-
+ //todo: should this not be passed in the c'tor?
public void setStmtId(int stmtId) {
this.stmtId = stmtId;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 7d4d379..a804527 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hive.common.StringableMap;
import org.apache.hadoop.hive.common.ValidCompactorTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -577,11 +576,16 @@ public class CompactorMR {
dir.getName().startsWith(AcidUtils.DELTA_PREFIX) ||
dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX);
+ boolean isRawFormat = !dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)
+ && AcidUtils.MetaDataFile.isRawFormat(dir, fs);//deletes can't be raw format
- FileStatus[] files = fs.listStatus(dir, AcidUtils.bucketFileFilter);
+ FileStatus[] files = fs.listStatus(dir, isRawFormat ? AcidUtils.originalBucketFilter
+ : AcidUtils.bucketFileFilter);
for(FileStatus f : files) {
// For each file, figure out which bucket it is.
- Matcher matcher = AcidUtils.BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName());
+ Matcher matcher = isRawFormat ?
+ AcidUtils.LEGACY_BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName())
+ : AcidUtils.BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName());
addFileToMap(matcher, f.getPath(), sawBase, splitToBucketMap);
}
} else {
@@ -612,8 +616,12 @@ public class CompactorMR {
private void addFileToMap(Matcher matcher, Path file, boolean sawBase,
Map<Integer, BucketTracker> splitToBucketMap) {
if (!matcher.find()) {
- LOG.warn("Found a non-bucket file that we thought matched the bucket pattern! " +
- file.toString() + " Matcher=" + matcher.toString());
+ String msg = "Found a non-bucket file that we thought matched the bucket pattern! " +
+ file.toString() + " Matcher=" + matcher.toString();
+ LOG.error(msg);
+ //the following matcher.group() would fail anyway, and we don't want to skip files since
+ //that could be a data loss scenario
+ throw new IllegalArgumentException(msg);
}
int bucketNum = Integer.parseInt(matcher.group());
BucketTracker bt = splitToBucketMap.get(bucketNum);
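The two CompactorMR hunks above do two things: pick a different file-name pattern depending on whether the directory is in raw (pre-acid) format, and fail fast instead of warning when a file name does not match. The standalone sketch below illustrates both; the regexes are assumptions approximating AcidUtils.BUCKET_DIGIT_PATTERN and LEGACY_BUCKET_DIGIT_PATTERN, not the actual Hive patterns.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BucketIdFromFileName {
  // assumed: acid bucket files look like bucket_00000
  private static final Pattern ACID_BUCKET_DIGITS   = Pattern.compile("bucket_([0-9]+)");
  // assumed: raw-format (original) files look like 000000_0
  private static final Pattern LEGACY_BUCKET_DIGITS = Pattern.compile("^([0-9]+)_[0-9]+");

  static int bucketId(String fileName, boolean isRawFormat) {
    Matcher m = (isRawFormat ? LEGACY_BUCKET_DIGITS : ACID_BUCKET_DIGITS).matcher(fileName);
    if (!m.find()) {
      // fail fast rather than silently skipping the file, since skipping could lose data
      throw new IllegalArgumentException("Unexpected file name: " + fileName);
    }
    return Integer.parseInt(m.group(1));
  }

  public static void main(String[] args) {
    System.out.println(bucketId("bucket_00001", false)); // 1
    System.out.println(bucketId("000002_0", true));      // 2
  }
}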
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 52257c4..319e0ee 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -344,7 +344,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
//this should fail because txn aborted due to timeout
CommandProcessorResponse cpr = runStatementOnDriverNegative("delete from " + Table.ACIDTBL + " where a = 5");
Assert.assertTrue("Actual: " + cpr.getErrorMessage(), cpr.getErrorMessage().contains("Transaction manager has aborted the transaction txnid:1"));
-
+
//now test that we don't timeout locks we should not
//heartbeater should be running in the background every 1/2 second
hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.SECONDS);
@@ -354,9 +354,9 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
runStatementOnDriver("start transaction");
runStatementOnDriver("select count(*) from " + Table.ACIDTBL + " where a = 17");
pause(750);
-
+
TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
-
+
//since there is txn open, we are heartbeating the txn not individual locks
GetOpenTxnsInfoResponse txnsInfoResponse = txnHandler.getOpenTxnsInfo();
Assert.assertEquals(2, txnsInfoResponse.getOpen_txns().size());
@@ -377,7 +377,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
//these 2 values are equal when TXN entry is made. Should never be equal after 1st heartbeat, which we
//expect to have happened by now since HIVE_TXN_TIMEOUT=1sec
Assert.assertNotEquals("Didn't see heartbeat happen", Long.parseLong(vals[0]), lastHeartbeat);
-
+
ShowLocksResponse slr = txnHandler.showLocks(new ShowLocksRequest());
TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks());
pause(750);
@@ -525,7 +525,8 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
String stmt = "merge into target t using (" + teeCurMatch + ") s on t.key=s.key and t.cur=1 and s.`o/p\\n`=1 " +
"when matched then update set cur=0 " +
"when not matched then insert values(s.key,s.data,1)";
-
+ //to allow cross join from 'teeCurMatch'
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false);
runStatementOnDriver(stmt);
int[][] resultVals = {{1,5,0},{1,7,1},{1,18,0},{2,6,1},{3,8,1}};
List<String> r = runStatementOnDriver("select * from target order by key,data,cur");
@@ -569,7 +570,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
List<String> r = runStatementOnDriver("select * from target order by key,data,cur");
Assert.assertEquals(stringifyValues(resultVals), r);
}
-
+
@Test
public void testMergeOnTezEdges() throws Exception {
String query = "merge into " + Table.ACIDTBL +
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 17d976a..ab5f969 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -77,7 +77,7 @@ public class TestTxnCommands2 {
).getPath().replaceAll("\\\\", "/");
protected static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
//bucket count for test tables; set it to 1 for easier debugging
- protected static int BUCKET_COUNT = 2;
+ static int BUCKET_COUNT = 2;
@Rule
public TestName testName = new TestName();
@@ -117,12 +117,11 @@ public class TestTxnCommands2 {
setUpWithTableProperties("'transactional'='true'");
}
- protected void setUpWithTableProperties(String tableProperties) throws Exception {
+ void setUpWithTableProperties(String tableProperties) throws Exception {
hiveConf = new HiveConf(this.getClass());
hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
- hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
hiveConf
.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
@@ -406,7 +405,7 @@ public class TestTxnCommands2 {
expectedException.expect(RuntimeException.class);
expectedException.expectMessage("TBLPROPERTIES with 'transactional_properties' cannot be altered after the table is created");
runStatementOnDriver("create table acidTblLegacy (a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
- runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'default')");
+ runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'insert_only')");
}
/**
* Test the query correctness and directory layout for ACID table conversion