Posted to commits@hive.apache.org by ek...@apache.org on 2018/06/09 16:30:30 UTC
hive git commit: OrcRawRecordMerger doesn't work for more than one file in non vectorized case (Sergey Shelukhin, reviewed by Eugene Koifman)
Repository: hive
Updated Branches:
refs/heads/branch-3 1c5ce3f61 -> 7981d38cb
OrcRawRecordMerger doesn't work for more than one file in non vectorized case (Sergey Shelukhin, reviewed by Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7981d38c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7981d38c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7981d38c
Branch: refs/heads/branch-3
Commit: 7981d38cbf6ef43384f5fb51b1560d7cf5add729
Parents: 1c5ce3f
Author: sergey <se...@apache.org>
Authored: Sat Jun 9 09:10:15 2018 -0700
Committer: Eugene Koifman <ek...@apache.org>
Committed: Sat Jun 9 09:10:15 2018 -0700
----------------------------------------------------------------------
.../hadoop/hive/ql/io/RecordIdentifier.java | 1 +
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 6 ++--
.../hive/ql/io/orc/OrcRawRecordMerger.java | 38 ++++++++++++++------
3 files changed, 33 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/7981d38c/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java b/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
index 607abfd..ea7ba53 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
@@ -226,6 +226,7 @@ public class RecordIdentifier implements WritableComparable<RecordIdentifier> {
return "{originalWriteId: " + writeId + ", " + bucketToString() + ", row: " + getRowId() +"}";
}
protected String bucketToString() {
+ if (bucketId == -1) return ("bucket: " + bucketId);
BucketCodec codec =
BucketCodec.determineVersion(bucketId);
return "bucket: " + bucketId + "(" + codec.getVersion() + "." +
http://git-wip-us.apache.org/repos/asf/hive/blob/7981d38c/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 694cf75..732e233 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -2076,9 +2076,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
String txnString = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY);
ValidWriteIdList validWriteIdList
= (txnString == null) ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(txnString);
- LOG.debug("getReader:: Read ValidWriteIdList: " + validWriteIdList.toString()
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getReader:: Read ValidWriteIdList: " + validWriteIdList.toString()
+ " isTransactionalTable: " + HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN));
-
+ LOG.debug("Creating merger for {} and {}", split.getPath(), Arrays.toString(deltas));
+ }
final OrcRawRecordMerger records =
new OrcRawRecordMerger(conf, true, reader, split.isOriginal(), bucket,
validWriteIdList, readOptions, deltas, mergerOptions);
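The logging change above does two things: it puts the string-concatenating ValidWriteIdList message behind LOG.isDebugEnabled(), and it adds a debug line recording which split and delta directories the merger is built for. A small standalone sketch of that pattern follows, assuming an SLF4J-style logger; the class and method names are hypothetical and only illustrate the guard.

import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Concatenation-heavy messages sit behind isDebugEnabled(); parameterized "{}"
// messages are cheap either way but are grouped under the same guard here.
class MergerLoggingSketch {
  private static final Logger LOG = LoggerFactory.getLogger(MergerLoggingSketch.class);

  void logMergerInputs(String writeIdList, boolean isTransactional,
                       String splitPath, String[] deltas) {
    if (LOG.isDebugEnabled()) {
      // String concatenation only happens when debug logging is actually on.
      LOG.debug("getReader:: Read ValidWriteIdList: " + writeIdList
          + " isTransactionalTable: " + isTransactional);
      LOG.debug("Creating merger for {} and {}", splitPath, Arrays.toString(deltas));
    }
  }
}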
http://git-wip-us.apache.org/repos/asf/hive/blob/7981d38c/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 8c7c72e..9d954ca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -249,6 +249,10 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
} while (nextRecord() != null &&
(minKey != null && key.compareRow(getMinKey()) <= 0));
}
+ @Override
+ public String toString() {
+ return "[key=" + key + ", nextRecord=" + nextRecord + ", reader=" + reader + "]";
+ }
@Override public final OrcStruct nextRecord() {
return nextRecord;
}
@@ -281,7 +285,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
OrcRecordUpdater.getRowId(nextRecord()),
OrcRecordUpdater.getCurrentTransaction(nextRecord()),
OrcRecordUpdater.getOperation(nextRecord()) == OrcRecordUpdater.DELETE_OPERATION);
-
// if this record is larger than maxKey, we need to stop
if (getMaxKey() != null && getKey().compareRow(getMaxKey()) > 0) {
LOG.debug("key " + getKey() + " > maxkey " + getMaxKey());
@@ -999,7 +1002,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
LOG.info("min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey());
// use the min/max instead of the byte range
ReaderPair pair = null;
- ReaderKey key = new ReaderKey();
+ ReaderKey baseKey = new ReaderKey();
if (isOriginal) {
options = options.clone();
if(mergerOptions.isCompacting()) {
@@ -1009,7 +1012,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
readerPairOptions = modifyForNonAcidSchemaRead(mergerOptions,
AcidUtils.parseBase(mergerOptions.getBaseDir()), mergerOptions.getBaseDir());
}
- pair = new OriginalReaderPairToCompact(key, bucket, options, readerPairOptions,
+ pair = new OriginalReaderPairToCompact(baseKey, bucket, options, readerPairOptions,
conf, validWriteIdList,
0);//0 since base_x doesn't have a suffix (neither does pre acid write)
} else {
@@ -1024,7 +1027,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
readerPairOptions = modifyForNonAcidSchemaRead(mergerOptions,
tfp.syntheticWriteId, tfp.folder);
}
- pair = new OriginalReaderPairToRead(key, reader, bucket, keyInterval.getMinKey(),
+ pair = new OriginalReaderPairToRead(baseKey, reader, bucket, keyInterval.getMinKey(),
keyInterval.getMaxKey(), options, readerPairOptions, conf, validWriteIdList, tfp.statementId);
}
} else {
@@ -1039,7 +1042,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
//doing major compaction - it's possible where full compliment of bucket files is not
//required (on Tez) that base_x/ doesn't have a file for 'bucket'
reader = OrcFile.createReader(bucketPath, OrcFile.readerOptions(conf));
- pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
+ pair = new ReaderPairAcid(baseKey, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
eventOptions);
}
else {
@@ -1049,7 +1052,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
}
else {
assert reader != null : "no reader? " + mergerOptions.getRootPath();
- pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
+ pair = new ReaderPairAcid(baseKey, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
eventOptions);
}
}
@@ -1058,7 +1061,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
LOG.info("updated min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey());
// if there is at least one record, put it in the map
if (pair.nextRecord() != null) {
- readers.put(key, pair);
+ ensurePutReader(baseKey, pair);
+ baseKey = null;
}
baseReader = pair.getRecordReader();
}
@@ -1088,7 +1092,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
ReaderPair deltaPair = new OriginalReaderPairToCompact(key, bucket, options,
rawCompactOptions, conf, validWriteIdList, deltaDir.getStatementId());
if (deltaPair.nextRecord() != null) {
- readers.put(key, deltaPair);
+ ensurePutReader(key, deltaPair);
+ key = new ReaderKey();
}
continue;
}
@@ -1101,6 +1106,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
*/
continue;
}
+ LOG.debug("Looking at delta file {}", deltaFile);
if(deltaDir.isDeleteDelta()) {
//if here it maybe compaction or regular read or Delete event sorter
//in the later 2 cases we should do:
@@ -1109,7 +1115,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
ReaderPair deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey,
deltaEventOptions);
if (deltaPair.nextRecord() != null) {
- readers.put(key, deltaPair);
+ ensurePutReader(key, deltaPair);
+ key = new ReaderKey();
}
continue;
}
@@ -1123,13 +1130,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
//must get statementId from file name since Acid 1.0 doesn't write it into bucketProperty
ReaderPairAcid deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey, deltaEventOptions);
if (deltaPair.nextRecord() != null) {
- readers.put(key, deltaPair);
+ ensurePutReader(key, deltaPair);
+ key = new ReaderKey();
}
}
}
}
// get the first record
+ LOG.debug("Final reader map {}", readers);
Map.Entry<ReaderKey, ReaderPair> entry = readers.pollFirstEntry();
if (entry == null) {
columns = 0;
@@ -1146,6 +1155,14 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
}
}
+ private void ensurePutReader(ReaderKey key, ReaderPair deltaPair) throws IOException {
+ ReaderPair oldPair = readers.put(key, deltaPair);
+ if (oldPair == null) return;
+ String error = "Two readers for " + key + ": new " + deltaPair + ", old " + oldPair;
+ LOG.error(error);
+ throw new IOException(error);
+ }
+
/**
* For use with Load Data statement which places {@link AcidUtils.AcidBaseFileType#ORIGINAL_BASE}
* type files into a base_x/ or delta_x_x. The data in these are then assigned ROW_IDs at read
@@ -1352,6 +1369,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
boolean isSameRow = prevKey.isSameRow((ReaderKey)recordIdentifier);
// if we are collapsing, figure out if this is a new row
if (collapse || isSameRow) {
+ // Note: for collapse == false, this just sets keysSame.
keysSame = (collapse && prevKey.compareRow(recordIdentifier) == 0) || (isSameRow);
if (!keysSame) {
prevKey.set(recordIdentifier);
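Judging by the diff, the substantive fix is in how the readers map is populated. Previously a single mutable ReaderKey instance was shared by the base pair and every delta pair, so once more than one file contributed a ReaderPair the later puts collided on the same key object and replaced the earlier entry, leaving the merger with effectively one reader; the patch allocates a fresh ReaderKey after every successful put and, via ensurePutReader, throws instead of silently overwriting if two pairs ever resolve to the same key. The standalone sketch below shows the underlying TreeMap pitfall and the fixed discipline; the Key class and reader strings are illustrative stand-ins, not Hive classes.

import java.io.IOException;
import java.util.TreeMap;

// Demonstrates why reusing one mutable key across TreeMap.put() calls loses
// entries, and the "fresh key after every put" discipline the patch adopts.
public class ReaderMapSketch {
  // Hypothetical stand-in for ReaderKey, ordered like (writeId, rowId).
  static class Key implements Comparable<Key> {
    long writeId;
    long rowId;
    @Override public int compareTo(Key o) {
      int c = Long.compare(writeId, o.writeId);
      return c != 0 ? c : Long.compare(rowId, o.rowId);
    }
    @Override public String toString() { return "{" + writeId + "," + rowId + "}"; }
  }

  private final TreeMap<Key, String> readers = new TreeMap<>();

  // Mirrors ensurePutReader: refuse to silently overwrite an existing entry.
  private void ensurePut(Key key, String pair) throws IOException {
    String old = readers.put(key, pair);
    if (old != null) {
      throw new IOException("Two readers for " + key + ": new " + pair + ", old " + old);
    }
  }

  public static void main(String[] args) throws IOException {
    ReaderMapSketch m = new ReaderMapSketch();

    // Buggy pattern (not shown running here): reuse one Key object and mutate it
    // between puts. TreeMap compares keys only at insertion time, so the second
    // put compares the shared object against itself, finds it equal, and replaces
    // the first entry; only one reader survives.
    // Fixed pattern, as in the patch: allocate a fresh key for every reader pair.
    Key k = new Key();
    k.writeId = 1; k.rowId = 0;
    m.ensurePut(k, "base reader");

    k = new Key();               // fresh key instead of mutating the previous one
    k.writeId = 2; k.rowId = 0;
    m.ensurePut(k, "delta reader");

    System.out.println(m.readers);  // {{1,0}=base reader, {2,0}=delta reader}
  }
}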