Posted to commits@hive.apache.org by ek...@apache.org on 2017/07/18 17:26:12 UTC
hive git commit: HIVE-17069 Refactor OrcRawRecrodMerger.ReaderPair (Eugene Koifman, reviewed by Alan Gates)
Repository: hive
Updated Branches:
refs/heads/master 9807a560c -> 0534b9048
HIVE-17069 Refactor OrcRawRecrodMerger.ReaderPair (Eugene Koifman, reviewed by Alan Gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0534b904
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0534b904
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0534b904
Branch: refs/heads/master
Commit: 0534b9048e8d3de222bf41763f651f0946013c2d
Parents: 9807a56
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Tue Jul 18 10:25:56 2017 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Tue Jul 18 10:25:56 2017 -0700
----------------------------------------------------------------------
.../hive/ql/io/orc/OrcRawRecordMerger.java | 594 ++++++++++---------
.../hive/ql/io/orc/TestOrcRawRecordMerger.java | 91 ++-
2 files changed, 367 insertions(+), 318 deletions(-)
----------------------------------------------------------------------
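The patch replaces the old ReaderPair class with a ReaderPair interface plus three
implementations (ReaderPairAcid, OriginalReaderPairToRead, OriginalReaderPairToCompact)
and folds the old advnaceToMinKey() step into the constructors, so every pair is fully
primed the moment it is built. A minimal sketch of that read-ahead contract, with an
invented InMemoryPair standing in for the real ORC-backed implementations:

import java.util.Arrays;
import java.util.Iterator;

// Read-ahead contract: nextRecord() is the row that would be returned
// next; next() advances the underlying source and refreshes the slot.
interface Pair<T> {
  T nextRecord();
  void next();
}

// Toy implementation over an in-memory list. The constructor primes the
// first record, mirroring how the new constructors advance to the minimum
// key instead of requiring a separate advnaceToMinKey() call.
final class InMemoryPair implements Pair<String> {
  private final Iterator<String> rows;
  private String nextRecord;

  InMemoryPair(String... data) {
    this.rows = Arrays.asList(data).iterator();
    next(); // prime the read-ahead slot during construction
  }
  @Override public String nextRecord() { return nextRecord; }
  @Override public void next() { nextRecord = rows.hasNext() ? rows.next() : null; }
}
----------------------------------------------------------------------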
http://git-wip-us.apache.org/repos/asf/hive/blob/0534b904/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 650f2af..814782a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.io.orc;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -54,7 +53,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
private static final Logger LOG = LoggerFactory.getLogger(OrcRawRecordMerger.class);
- private final Configuration conf;
private final boolean collapse;
private final RecordReader baseReader;
private final ObjectInspector objectInspector;
@@ -186,30 +184,37 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
currentTransactionId + ", statementId: "+ statementId + "}";
}
}
-
+ interface ReaderPair {
+ OrcStruct nextRecord();
+ int getColumns();
+ RecordReader getRecordReader();
+ Reader getReader();
+ RecordIdentifier getMinKey();
+ RecordIdentifier getMaxKey();
+ ReaderKey getKey();
+ void next(OrcStruct next) throws IOException;
+ }
/**
* A reader and the next record from that reader. The code reads ahead so that
* we can return the lowest ReaderKey from each of the readers. Thus, the
* next available row is nextRecord and only following records are still in
* the reader.
*/
- static class ReaderPair {
- OrcStruct nextRecord;
- final Reader reader;
- final RecordReader recordReader;
- final ReaderKey key;
+ @VisibleForTesting
+ final static class ReaderPairAcid implements ReaderPair {
+ private OrcStruct nextRecord;
+ private final Reader reader;
+ private final RecordReader recordReader;
+ private final ReaderKey key;
private final RecordIdentifier minKey;
private final RecordIdentifier maxKey;
- final int bucket;
private final int statementId;
- boolean advancedToMinKey = false;
/**
* Create a reader that reads from the first key larger than minKey to any
* keys equal to maxKey.
* @param key the key to read into
* @param reader the ORC file reader
- * @param bucket the bucket number for the file
* @param minKey only return keys larger than minKey if it is non-null
* @param maxKey only return keys less than or equal to maxKey if it is
* non-null
@@ -217,70 +222,71 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* @param statementId id of SQL statement within a transaction
* @throws IOException
*/
- ReaderPair(ReaderKey key, Reader reader, int bucket,
- RecordIdentifier minKey, RecordIdentifier maxKey,
- ReaderImpl.Options options, int statementId) throws IOException {
+ @VisibleForTesting
+ ReaderPairAcid(ReaderKey key, Reader reader,
+ RecordIdentifier minKey, RecordIdentifier maxKey,
+ ReaderImpl.Options options, int statementId) throws IOException {
this.reader = reader;
this.key = key;
- this.minKey = minKey;
- this.maxKey = maxKey;
- this.bucket = bucket;
// TODO use stripe statistics to jump over stripes
recordReader = reader.rowsOptions(options);
this.statementId = statementId;
+ this.minKey = minKey;
+ this.maxKey = maxKey;
+ // advance the reader until we reach the minimum key
+ do {
+ next(nextRecord());
+ } while (nextRecord() != null &&
+ (minKey != null && key.compareRow(getMinKey()) <= 0));
+ }
+ @Override public final OrcStruct nextRecord() {
+ return nextRecord;
}
- RecordReader getRecordReader() {
+ @Override
+ public final int getColumns() {
+ return getReader().getTypes().get(OrcRecordUpdater.ROW + 1).getSubtypesCount();
+ }
+
+ @Override public RecordReader getRecordReader() {
return recordReader;
}
- /**
- * This must be called right after the constructor but not in the constructor to make sure
- * sub-classes are fully initialized before their {@link #next(OrcStruct)} is called
- */
- void advnaceToMinKey() throws IOException {
- advancedToMinKey = true;
- // advance the reader until we reach the minimum key
- do {
- next(nextRecord);
- } while (nextRecord != null &&
- (getMinKey() != null && key.compareRow(getMinKey()) <= 0));
+ @Override public Reader getReader() { return reader; }
+ @Override public RecordIdentifier getMinKey() {
+ return minKey;
+ }
+ @Override public RecordIdentifier getMaxKey() {
+ return maxKey;
+ }
+ @Override public ReaderKey getKey() {
+ return key;
}
- void next(OrcStruct next) throws IOException {
- assert advancedToMinKey : "advnaceToMinKey() was not called";
+ @Override
+ public void next(OrcStruct next) throws IOException {
if (getRecordReader().hasNext()) {
nextRecord = (OrcStruct) getRecordReader().next(next);
// set the key
- key.setValues(OrcRecordUpdater.getOriginalTransaction(nextRecord),
- OrcRecordUpdater.getBucket(nextRecord),
- OrcRecordUpdater.getRowId(nextRecord),
- OrcRecordUpdater.getCurrentTransaction(nextRecord),
+ getKey().setValues(OrcRecordUpdater.getOriginalTransaction(nextRecord()),
+ OrcRecordUpdater.getBucket(nextRecord()),
+ OrcRecordUpdater.getRowId(nextRecord()),
+ OrcRecordUpdater.getCurrentTransaction(nextRecord()),
statementId);
// if this record is larger than maxKey, we need to stop
- if (getMaxKey() != null && key.compareRow(getMaxKey()) > 0) {
- LOG.debug("key " + key + " > maxkey " + getMaxKey());
+ if (getMaxKey() != null && getKey().compareRow(getMaxKey()) > 0) {
+ LOG.debug("key " + getKey() + " > maxkey " + getMaxKey());
nextRecord = null;
getRecordReader().close();
}
} else {
nextRecord = null;
- recordReader.close();
+ getRecordReader().close();
}
}
-
- RecordIdentifier getMinKey() {
- return minKey;
- }
- RecordIdentifier getMaxKey() {
- return maxKey;
- }
- int getColumns() {
- return reader.getTypes().get(OrcRecordUpdater.ROW + 1).getSubtypesCount();
- }
}
/**
- * A reader that pretends an original base file is a new version base file.
+ * A reader that pretends an original base file is a new versioned base file.
* It wraps the underlying reader's row with an ACID event object and
* makes the relevant translations.
*
@@ -297,178 +303,38 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* {@link OrcRawRecordMerger#minKey} and {@link OrcRawRecordMerger#maxKey} are computed for each
* split of the original file and used to filter rows from all the deltas. The ROW__ID.rowid for
* the rows of the 'original' file of course, must be assigned from the beginning of logical
- * bucket.
+ * bucket. The last split of the logical bucket, i.e. the split that contains the end of the last file,
+ * should include all insert events from deltas.
*/
- static final class OriginalReaderPair extends ReaderPair {
- private final Options mergerOptions;
- /**
- * Sum total of all rows in all the files before the 'current' one in {@link #originalFiles} list
- */
- private long rowIdOffset = 0;
- /**
- * See {@link AcidUtils.Directory#getOriginalFiles()}. This list has a fixed sort order. This
- * is the full list when compacting and empty when doing a simple read. The later is because we
- * only need to read the current split from 1 file for simple read.
- */
- private final List<HadoopShims.HdfsFileStatusWithId> originalFiles;
- /**
- * index into {@link #originalFiles}
- */
- private int nextFileIndex = 0;
- private long numRowsInCurrentFile = 0;
- private RecordReader originalFileRecordReader = null;
- private final Configuration conf;
- private final Reader.Options options;
- private final RecordIdentifier minKey;//shadow parent minKey to make final
- private final RecordIdentifier maxKey;//shadow parent maxKey to make final
-
- OriginalReaderPair(ReaderKey key, Reader reader, int bucket,
- final RecordIdentifier minKey, final RecordIdentifier maxKey,
- Reader.Options options, Options mergerOptions, Configuration conf,
- ValidTxnList validTxnList) throws IOException {
- super(key, reader, bucket, minKey, maxKey, options, 0);
- this.mergerOptions = mergerOptions;
- this.conf = conf;
- this.options = options;
- assert mergerOptions.getRootPath() != null : "Since we have original files";
- assert bucket >= 0 : "don't support non-bucketed tables yet";
+ private static abstract class OriginalReaderPair implements ReaderPair {
+ OrcStruct nextRecord;
+ private final ReaderKey key;
+ final int bucketId;
- RecordIdentifier newMinKey = minKey;
- RecordIdentifier newMaxKey = maxKey;
- if(mergerOptions.isCompacting()) {
- {
- //when compacting each split needs to process the whole logical bucket
- assert options.getOffset() == 0;
- assert options.getMaxOffset() == Long.MAX_VALUE;
- assert minKey == null;
- assert maxKey == null;
- }
- AcidUtils.Directory directoryState = AcidUtils.getAcidState(
- mergerOptions.getRootPath(), conf, validTxnList, false, true);
- originalFiles = directoryState.getOriginalFiles();
- assert originalFiles.size() > 0;
- /**
- * when there are no copyN files, the {@link #recordReader} will be the the one and only
- * file for for 'bucket' but closing here makes flow cleaner and only happens once in the
- * life of the table. With copyN files, the caller may pass in any one of the copyN files.
- * This is less prone to bugs than expecting the reader to pass in a Reader for the 1st file
- * of a logical bucket.*/
- recordReader.close();
- reader = advanceToNextFile();//in case of Compaction, this is the 1st file of the current bucket
- if(reader == null) {
- //Compactor generated a split for a bucket that has no data?
- throw new IllegalStateException("No 'original' files found for bucketId=" + bucket +
- " in " + mergerOptions.getRootPath());
- }
- numRowsInCurrentFile = reader.getNumberOfRows();
- originalFileRecordReader = reader.rowsOptions(options);
- }
- else {
- /**
- * Logically each bucket consists of 0000_0, 0000_0_copy_1... 0000_0_copyN. etc We don't
- * know N a priori so if this is true, then the current split is from 0000_0_copyN file.
- * It's needed to correctly set maxKey. In particular, set maxKey==null if this split
- * is the tail of the last file for this logical bucket to include all deltas written after
- * non-acid to acid table conversion.
- */
- boolean isLastFileForThisBucket = false;
- boolean haveSeenCurrentFile = false;
- originalFiles = Collections.emptyList();
- if (mergerOptions.getCopyIndex() > 0) {
- //the split is from something other than the 1st file of the logical bucket - compute offset
-
- AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
- conf, validTxnList, false, true);
- for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
- AcidOutputFormat.Options bucketOptions =
- AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
- if (bucketOptions.getBucketId() != bucket) {
- continue;
- }
- if(haveSeenCurrentFile) {
- //if here we already saw current file and now found another file for the same bucket
- //so the current file is not the last file of the logical bucket
- isLastFileForThisBucket = false;
- break;
- }
- if(f.getFileStatus().getPath().equals(mergerOptions.getBucketPath())) {
- /**
- * found the file whence the current split is from so we're done
- * counting {@link rowIdOffset}
- */
- haveSeenCurrentFile = true;
- isLastFileForThisBucket = true;
- continue;
- }
- Reader copyReader = OrcFile.createReader(f.getFileStatus().getPath(),
- OrcFile.readerOptions(conf));
- rowIdOffset += copyReader.getNumberOfRows();
- }
- if (rowIdOffset > 0) {
- //rowIdOffset could be 0 if all files before current one are empty
- /**
- * Since we already done {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader,
- * int, Reader.Options)} need to fix min/max key since these are used by
- * {@link #next(OrcStruct)} which uses {@link #rowIdOffset} to generate rowId for
- * the key. Clear? */
- if (minKey != null) {
- minKey.setRowId(minKey.getRowId() + rowIdOffset);
- }
- else {
- /**
- * If this is not the 1st file, set minKey 1 less than the start of current file
- * (Would not need to set minKey if we knew that there are no delta files)
- * {@link #advanceToMinKey()} needs this */
- newMinKey = new RecordIdentifier(0, bucket, rowIdOffset - 1);
- }
- if (maxKey != null) {
- maxKey.setRowId(maxKey.getRowId() + rowIdOffset);
- }
- }
- } else {
- isLastFileForThisBucket = true;
- AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
- conf, validTxnList, false, true);
- int numFilesInBucket= 0;
- for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
- AcidOutputFormat.Options bucketOptions =
- AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
- if (bucketOptions.getBucketId() == bucket) {
- numFilesInBucket++;
- if(numFilesInBucket > 1) {
- isLastFileForThisBucket = false;
- break;
- }
- }
- }
- }
- originalFileRecordReader = recordReader;
- if(!isLastFileForThisBucket && maxKey == null) {
- /*
- * If this is the last file for this bucket, maxKey == null means the split is the tail
- * of the file so we want to leave it blank to make sure any insert events in delta
- * files are included; Conversely, if it's not the last file, set the maxKey so that
- * events from deltas that don't modify anything in the current split are excluded*/
- newMaxKey = new RecordIdentifier(0, bucket,
- rowIdOffset + reader.getNumberOfRows() - 1);
- }
- }
- this.minKey = newMinKey;
- this.maxKey = newMaxKey;
- }
- @Override RecordReader getRecordReader() {
- return originalFileRecordReader;
+ OriginalReaderPair(ReaderKey key, int bucketId) throws IOException {
+ this.key = key;
+ this.bucketId = bucketId;
+ assert bucketId >= 0 : "don't support non-bucketed tables yet";
}
- @Override RecordIdentifier getMinKey() {
- return minKey;
+ @Override public final OrcStruct nextRecord() {
+ return nextRecord;
}
- @Override RecordIdentifier getMaxKey() {
- return maxKey;
+ @Override
+ public int getColumns() {
+ return getReader().getTypes().get(0).getSubtypesCount();
}
- private boolean nextFromCurrentFile(OrcStruct next) throws IOException {
- if (originalFileRecordReader.hasNext()) {
+ @Override
+ public final ReaderKey getKey() { return key; }
+ /**
+ * The cumulative number of rows in all files of the logical bucket that precede the file
+ * represented by {@link #getRecordReader()}
+ */
+ abstract long getRowIdOffset();
+
+ final boolean nextFromCurrentFile(OrcStruct next) throws IOException {
+ if (getRecordReader().hasNext()) {
//RecordReader.getRowNumber() produces a file-global row number even with PPD
- long nextRowId = originalFileRecordReader.getRowNumber() + rowIdOffset;
+ long nextRowId = getRecordReader().getRowNumber() + getRowIdOffset();
// have to do initialization here, because the super's constructor
// calls next and thus we need to initialize before our constructor
// runs
@@ -476,17 +342,17 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
nextRecord = new OrcStruct(OrcRecordUpdater.FIELDS);
IntWritable operation =
new IntWritable(OrcRecordUpdater.INSERT_OPERATION);
- nextRecord.setFieldValue(OrcRecordUpdater.OPERATION, operation);
- nextRecord.setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION,
+ nextRecord().setFieldValue(OrcRecordUpdater.OPERATION, operation);
+ nextRecord().setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION,
new LongWritable(0));
- nextRecord.setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION,
+ nextRecord().setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION,
new LongWritable(0));
- nextRecord.setFieldValue(OrcRecordUpdater.BUCKET,
- new IntWritable(bucket));
- nextRecord.setFieldValue(OrcRecordUpdater.ROW_ID,
+ nextRecord().setFieldValue(OrcRecordUpdater.BUCKET,
+ new IntWritable(bucketId));
+ nextRecord().setFieldValue(OrcRecordUpdater.ROW_ID,
new LongWritable(nextRowId));
- nextRecord.setFieldValue(OrcRecordUpdater.ROW,
- originalFileRecordReader.next(null));
+ nextRecord().setFieldValue(OrcRecordUpdater.ROW,
+ getRecordReader().next(null));
} else {
nextRecord = next;
((IntWritable) next.getFieldValue(OrcRecordUpdater.OPERATION))
@@ -494,18 +360,18 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
((LongWritable) next.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION))
.set(0);
((IntWritable) next.getFieldValue(OrcRecordUpdater.BUCKET))
- .set(bucket);
+ .set(bucketId);
((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION))
.set(0);
((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID))
.set(nextRowId);
- nextRecord.setFieldValue(OrcRecordUpdater.ROW,
- originalFileRecordReader.next(OrcRecordUpdater.getRow(next)));
+ nextRecord().setFieldValue(OrcRecordUpdater.ROW,
+ getRecordReader().next(OrcRecordUpdater.getRow(next)));
}
- key.setValues(0L, bucket, nextRowId, 0L, 0);
- if (maxKey != null && key.compareRow(maxKey) > 0) {
+ key.setValues(0L, bucketId, nextRowId, 0L, 0);
+ if (getMaxKey() != null && key.compareRow(getMaxKey()) > 0) {
if (LOG.isDebugEnabled()) {
- LOG.debug("key " + key + " > maxkey " + maxKey);
+ LOG.debug("key " + key + " > maxkey " + getMaxKey());
}
return false;//reached End Of Split
}
@@ -513,9 +379,199 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
}
return false;//reached EndOfFile
}
+ }
+ @VisibleForTesting
+ final static class OriginalReaderPairToRead extends OriginalReaderPair {
+ private final long rowIdOffset;
+ private final Reader reader;
+ private final RecordReader recordReader;
+ private final RecordIdentifier minKey;
+ private final RecordIdentifier maxKey;
+
+ OriginalReaderPairToRead(ReaderKey key, Reader reader, int bucketId,
+ final RecordIdentifier minKey, final RecordIdentifier maxKey,
+ Reader.Options options, Options mergerOptions, Configuration conf,
+ ValidTxnList validTxnList) throws IOException {
+ super(key, bucketId);
+ this.reader = reader;
+ assert !mergerOptions.isCompacting();
+ assert mergerOptions.getRootPath() != null : "Since we have original files";
+
+ RecordIdentifier newMinKey = minKey;
+ RecordIdentifier newMaxKey = maxKey;
+ recordReader = reader.rowsOptions(options);
+ /**
+ * Logically each bucket consists of 0000_0, 0000_0_copy_1... 0000_0_copyN. etc We don't
+ * know N a priori so if this is true, then the current split is from 0000_0_copyN file.
+ * It's needed to correctly set maxKey. In particular, set maxKey==null if this split
+ * is the tail of the last file for this logical bucket to include all deltas written after
+ * non-acid to acid table conversion.
+ */
+ boolean isLastFileForThisBucket = false;
+ boolean haveSeenCurrentFile = false;
+ long rowIdOffsetTmp = 0;
+ if (mergerOptions.getCopyIndex() > 0) {
+ //the split is from something other than the 1st file of the logical bucket - compute offset
+
+ AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
+ conf, validTxnList, false, true);
+ for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
+ AcidOutputFormat.Options bucketOptions =
+ AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
+ if (bucketOptions.getBucketId() != bucketId) {
+ continue;
+ }
+ if (haveSeenCurrentFile) {
+ //if here we already saw current file and now found another file for the same bucket
+ //so the current file is not the last file of the logical bucket
+ isLastFileForThisBucket = false;
+ break;
+ }
+ if (f.getFileStatus().getPath().equals(mergerOptions.getBucketPath())) {
+ /**
+ * found the file whence the current split is from so we're done
+ * counting {@link rowIdOffset}
+ */
+ haveSeenCurrentFile = true;
+ isLastFileForThisBucket = true;
+ continue;
+ }
+ Reader copyReader = OrcFile.createReader(f.getFileStatus().getPath(),
+ OrcFile.readerOptions(conf));
+ rowIdOffsetTmp += copyReader.getNumberOfRows();
+ }
+ this.rowIdOffset = rowIdOffsetTmp;
+ if (rowIdOffset > 0) {
+ //rowIdOffset could be 0 if all files before current one are empty
+ /**
+ * Since we already done {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader,
+ * int, Reader.Options)} need to fix min/max key since these are used by
+ * {@link #next(OrcStruct)} which uses {@link #rowIdOffset} to generate rowId for
+ * the key. Clear? */
+ if (minKey != null) {
+ minKey.setRowId(minKey.getRowId() + rowIdOffset);
+ } else {
+ /**
+ * If this is not the 1st file, set minKey 1 less than the start of current file
+ * (Would not need to set minKey if we knew that there are no delta files)
+ * {@link #advanceToMinKey()} needs this */
+ newMinKey = new RecordIdentifier(0, bucketId, rowIdOffset - 1);
+ }
+ if (maxKey != null) {
+ maxKey.setRowId(maxKey.getRowId() + rowIdOffset);
+ }
+ }
+ } else {
+ rowIdOffset = 0;
+ isLastFileForThisBucket = true;
+ AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
+ conf, validTxnList, false, true);
+ int numFilesInBucket = 0;
+ for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
+ AcidOutputFormat.Options bucketOptions =
+ AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
+ if (bucketOptions.getBucketId() == bucketId) {
+ numFilesInBucket++;
+ if (numFilesInBucket > 1) {
+ isLastFileForThisBucket = false;
+ break;
+ }
+ }
+ }
+ }
+ if (!isLastFileForThisBucket && maxKey == null) {
+ /*
+ * If this is the last file for this bucket, maxKey == null means the split is the tail
+ * of the file so we want to leave it blank to make sure any insert events in delta
+ * files are included; Conversely, if it's not the last file, set the maxKey so that
+ * events from deltas that don't modify anything in the current split are excluded*/
+ newMaxKey = new RecordIdentifier(0, bucketId,
+ rowIdOffset + reader.getNumberOfRows() - 1);
+ }
+ this.minKey = newMinKey;
+ this.maxKey = newMaxKey;
+
+ // advance the reader until we reach the minimum key
+ do {
+ next(nextRecord());
+ } while (nextRecord() != null &&
+ (getMinKey() != null && this.getKey().compareRow(getMinKey()) <= 0));
+ }
+ @Override public RecordReader getRecordReader() {
+ return recordReader;
+ }
+ @Override public Reader getReader() { return reader; }
+ @Override public RecordIdentifier getMinKey() { return minKey; }
+ @Override public RecordIdentifier getMaxKey() {
+ return maxKey;
+ }
+ @Override public long getRowIdOffset() { return rowIdOffset; }
+
+ @Override
+ public void next(OrcStruct next) throws IOException {
+ if(!nextFromCurrentFile(next)) {
+ //only have 1 file so done
+ nextRecord = null;
+ getRecordReader().close();
+ }
+ }
+ }
+ @VisibleForTesting
+ final static class OriginalReaderPairToCompact extends OriginalReaderPair {
+ /**
+ * See {@link AcidUtils.Directory#getOriginalFiles()}. This list has a fixed sort order.
+ * It includes all original files (for all buckets).
+ */
+ private final List<HadoopShims.HdfsFileStatusWithId> originalFiles;
+ /**
+ * index into {@link #originalFiles}
+ */
+ private int nextFileIndex = 0;
+ private Reader reader;
+ private RecordReader recordReader = null;
+ private final Configuration conf;
+ private final Reader.Options options;
+ private long rowIdOffset = 0;
+
+ OriginalReaderPairToCompact(ReaderKey key, int bucketId,
+ Reader.Options options, Options mergerOptions, Configuration conf,
+ ValidTxnList validTxnList) throws IOException {
+ super(key, bucketId);
+ assert mergerOptions.isCompacting() : "Should only be used for Compaction";
+ this.conf = conf;
+ this.options = options;
+ assert mergerOptions.getRootPath() != null : "Since we have original files";
+ assert this.bucketId >= 0 : "don't support non-bucketed tables yet";
+ //when compacting each split needs to process the whole logical bucket
+ assert options.getOffset() == 0;
+ assert options.getMaxOffset() == Long.MAX_VALUE;
+ AcidUtils.Directory directoryState = AcidUtils.getAcidState(
+ mergerOptions.getRootPath(), conf, validTxnList, false, true);
+ originalFiles = directoryState.getOriginalFiles();
+ assert originalFiles.size() > 0;
+ this.reader = advanceToNextFile();//in case of Compaction, this is the 1st file of the current bucket
+ if (reader == null) {
+ //Compactor generated a split for a bucket that has no data?
+ throw new IllegalStateException("No 'original' files found for bucketId=" + this.bucketId +
+ " in " + mergerOptions.getRootPath());
+ }
+ recordReader = getReader().rowsOptions(options);
+ next(nextRecord());//load 1st row
+ }
+ @Override public RecordReader getRecordReader() {
+ return recordReader;
+ }
+ @Override public Reader getReader() { return reader; }
+ @Override public RecordIdentifier getMinKey() {
+ return null;
+ }
+ @Override public RecordIdentifier getMaxKey() {
+ return null;
+ }
+ @Override public long getRowIdOffset() { return rowIdOffset; }
+
@Override
- void next(OrcStruct next) throws IOException {
- assert advancedToMinKey : "advnaceToMinKey() was not called";
+ public void next(OrcStruct next) throws IOException {
while(true) {
if(nextFromCurrentFile(next)) {
return;
@@ -523,19 +579,17 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
if (originalFiles.size() <= nextFileIndex) {
//no more original files to read
nextRecord = null;
- originalFileRecordReader.close();
+ recordReader.close();
return;
} else {
- assert mergerOptions.isCompacting() : "originalFiles.size() should be 0 when not compacting";
- rowIdOffset += numRowsInCurrentFile;
- originalFileRecordReader.close();
- Reader reader = advanceToNextFile();
+ rowIdOffset += reader.getNumberOfRows();
+ recordReader.close();
+ reader = advanceToNextFile();
if(reader == null) {
nextRecord = null;
return;
}
- numRowsInCurrentFile = reader.getNumberOfRows();
- originalFileRecordReader = reader.rowsOptions(options);
+ recordReader = reader.rowsOptions(options);
}
}
}
@@ -546,21 +600,19 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
*/
private Reader advanceToNextFile() throws IOException {
while(nextFileIndex < originalFiles.size()) {
- AcidOutputFormat.Options bucketOptions = AcidUtils.parseBaseOrDeltaBucketFilename(originalFiles.get(nextFileIndex).getFileStatus().getPath(), conf);
- if (bucketOptions.getBucketId() == bucket) {
+ AcidOutputFormat.Options bucketOptions = AcidUtils.parseBaseOrDeltaBucketFilename(
+ originalFiles.get(nextFileIndex).getFileStatus().getPath(), conf);
+ if (bucketOptions.getBucketId() == bucketId) {
break;
}
+ //not the bucket we care about here
nextFileIndex++;
}
if(originalFiles.size() <= nextFileIndex) {
return null;//no more files for current bucket
}
- return OrcFile.createReader(originalFiles.get(nextFileIndex++).getFileStatus().getPath(), OrcFile.readerOptions(conf));
- }
-
- @Override
- int getColumns() {
- return reader.getTypes().get(0).getSubtypesCount();
+ return OrcFile.createReader(originalFiles.get(nextFileIndex++).getFileStatus().
+ getPath(), OrcFile.readerOptions(conf));
}
}
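----------------------------------------------------------------------
OriginalReaderPairToCompact walks every pre-ACID file of the logical bucket in
order, bumping rowIdOffset by getNumberOfRows() of each file it finishes, so
rowIds stay globally increasing across 00000_0, 00000_0_copy_1, ... A
self-contained sketch of that bookkeeping, with invented per-file row counts
standing in for Reader.getNumberOfRows():

final class RowIdOffsetSketch {
  // offsets[f] is the rowIdOffset for file f: the total number of rows
  // in all files of the bucket that precede it.
  static long[] offsets(long[] fileRowCounts) {
    long[] offsets = new long[fileRowCounts.length];
    long running = 0;
    for (int f = 0; f < fileRowCounts.length; f++) {
      offsets[f] = running;
      running += fileRowCounts[f];
    }
    return offsets;
  }

  public static void main(String[] args) {
    // three files with 100, 0 and 50 rows -> offsets 0, 100, 100;
    // an empty copy file leaves the offset unchanged, which is why the
    // code above allows rowIdOffset == 0 for a non-first file
    for (long o : offsets(new long[]{100, 0, 50})) {
      System.out.println(o);
    }
  }
}
----------------------------------------------------------------------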
@@ -569,8 +621,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* {@link ReaderKey} ascending. The output of this Reader should be a global order across these
* files. The root of this tree is always the next 'file' to read from.
*/
- private final TreeMap<ReaderKey, ReaderPair> readers =
- new TreeMap<ReaderKey, ReaderPair>();
+ private final TreeMap<ReaderKey, ReaderPair> readers = new TreeMap<>();
// The reader that currently has the lowest key.
private ReaderPair primary;
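----------------------------------------------------------------------
The readers map is the heart of the merge: each pair sits in the TreeMap under
its current ReaderKey, the first entry is always the globally lowest row, and
an advanced pair is re-inserted under its new key. A minimal sketch of that
k-way merge, with Integer keys and an IntSource interface standing in for
ReaderKey and ReaderPair (and ignoring duplicate keys, which the real merger
must arbitrate):

import java.util.Map;
import java.util.TreeMap;

final class MergeSketch {
  interface IntSource {
    Integer currentKey(); // null once the source is exhausted
    void advance();
  }

  static void drain(TreeMap<Integer, IntSource> readers) {
    while (!readers.isEmpty()) {
      Map.Entry<Integer, IntSource> first = readers.pollFirstEntry();
      IntSource src = first.getValue();
      System.out.println("emit " + first.getKey()); // lowest key wins
      src.advance();
      if (src.currentKey() != null) {
        readers.put(src.currentKey(), src); // back under its new key
      }
    }
  }
}
----------------------------------------------------------------------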
@@ -637,7 +688,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
}
/**
- * Find the key range for bucket files.
+ * Find the key range for the split (of the base). These are used to filter delta files since
+ * both are sorted by key.
* @param reader the reader
* @param options the options for reading with
* @throws IOException
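----------------------------------------------------------------------
Both reader-pair constructors implement the same range discipline: rows at or
below minKey are consumed during construction, and next() shuts the reader
down as soon as a key exceeds maxKey. A sketch with keys modeled as plain
longs (RecordIdentifier/ReaderKey in the patch); null bounds mean "unbounded",
as in the real code:

final class KeyRangeSketch {
  private final long[] rows;  // pre-sorted keys in the file
  private final Long maxKey;  // null means no upper bound
  private int i = 0;
  private Long next;          // read-ahead slot, null when exhausted

  KeyRangeSketch(long[] rows, Long minKey, Long maxKey) {
    this.rows = rows;
    this.maxKey = maxKey;
    // skip everything at or below minKey, as the constructors now do
    do {
      advance();
    } while (next != null && minKey != null && next <= minKey);
  }

  void advance() {
    next = (i < rows.length) ? rows[i++] : null;
    if (next != null && maxKey != null && next > maxKey) {
      next = null; // key > maxKey: the rest belongs to another split
    }
  }

  Long nextRecord() { return next; }
}
----------------------------------------------------------------------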
@@ -681,7 +733,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
*/
static Reader.Options createEventOptions(Reader.Options options) {
Reader.Options result = options.clone();
- result.range(options.getOffset(), Long.MAX_VALUE);
+ //result.range(options.getOffset(), Long.MAX_VALUE);WTF?
result.include(options.getInclude());
// slide the column names down by 6 for the name array
@@ -757,7 +809,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
ValidTxnList validTxnList,
Reader.Options options,
Path[] deltaDirectory, Options mergerOptions) throws IOException {
- this.conf = conf;
this.collapse = collapseEvents;
this.offset = options.getOffset();
this.length = options.getLength();
@@ -788,18 +839,23 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
ReaderKey key = new ReaderKey();
if (isOriginal) {
options = options.clone();
- pair = new OriginalReaderPair(key, reader, bucket, keyInterval.getMinKey(), keyInterval.getMaxKey(),
- options, mergerOptions, conf, validTxnList);
+ if(mergerOptions.isCompacting()) {
+ pair = new OriginalReaderPairToCompact(key, bucket, options, mergerOptions,
+ conf, validTxnList);
+ }
+ else {
+ pair = new OriginalReaderPairToRead(key, reader, bucket, keyInterval.getMinKey(),
+ keyInterval.getMaxKey(), options, mergerOptions, conf, validTxnList);
+ }
} else {
- pair = new ReaderPair(key, reader, bucket, keyInterval.getMinKey(), keyInterval.getMaxKey(),
- eventOptions, 0);
+ pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
+ eventOptions, 0);
}
minKey = pair.getMinKey();
maxKey = pair.getMaxKey();
LOG.info("updated min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey());
- pair.advnaceToMinKey();
// if there is at least one record, put it in the map
- if (pair.nextRecord != null) {
+ if (pair.nextRecord() != null) {
readers.put(key, pair);
}
baseReader = pair.getRecordReader();
@@ -828,11 +884,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
deltaEventOptions = eventOptions.clone().searchArgument(null, null);
}
}
- ReaderPair deltaPair;
- deltaPair = new ReaderPair(key, deltaReader, bucket, minKey,
- maxKey, deltaEventOptions != null ? deltaEventOptions : eventOptions, deltaDir.getStatementId());
- deltaPair.advnaceToMinKey();
- if (deltaPair.nextRecord != null) {
+ ReaderPairAcid deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey,
+ deltaEventOptions != null ? deltaEventOptions : eventOptions, deltaDir.getStatementId());
+ if (deltaPair.nextRecord() != null) {
readers.put(key, deltaPair);
}
}
@@ -883,8 +937,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
while (keysSame && primary != null) {
// The primary's nextRecord is the next value to return
- OrcStruct current = primary.nextRecord;
- recordIdentifier.set(primary.key);
+ OrcStruct current = primary.nextRecord();
+ recordIdentifier.set(primary.getKey());
// Advance the primary reader to the next record
primary.next(extraValue);
@@ -895,12 +949,12 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
// now that the primary reader has advanced, we need to see if we
// continue to read it or move to the secondary.
- if (primary.nextRecord == null ||
- primary.key.compareTo(secondaryKey) > 0) {
+ if (primary.nextRecord() == null ||
+ primary.getKey().compareTo(secondaryKey) > 0) {
// if the primary isn't done, push it back into the readers
- if (primary.nextRecord != null) {
- readers.put(primary.key, primary);
+ if (primary.nextRecord() != null) {
+ readers.put(primary.getKey(), primary);
}
// update primary and secondaryKey
@@ -967,10 +1021,10 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
@Override
public void close() throws IOException {
if (primary != null) {
- primary.recordReader.close();
+ primary.getRecordReader().close();
}
for(ReaderPair pair: readers.values()) {
- pair.recordReader.close();
+ pair.getRecordReader().close();
}
}
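----------------------------------------------------------------------
nextFromCurrentFile() above frames each raw row from a pre-ACID file as a
synthetic INSERT event: both transaction ids are fixed at 0 and the rowId
comes from the file-local row number plus the bucket-global offset. A
stand-alone sketch of that framing, using an Object[] as a stand-in for
OrcStruct (field positions mirror OrcRecordUpdater.FIELDS):

final class AcidEventSketch {
  static final int OPERATION = 0, ORIGINAL_TXN = 1, BUCKET = 2,
                   ROW_ID = 3, CURRENT_TXN = 4, ROW = 5;
  static final int INSERT_OPERATION = 0;

  static Object[] wrapOriginalRow(Object row, int bucketId,
                                  long rowIdOffset, long fileRowNumber) {
    Object[] event = new Object[6];
    event[OPERATION] = INSERT_OPERATION;
    event[ORIGINAL_TXN] = 0L;                    // pre-ACID data: txn 0
    event[BUCKET] = bucketId;
    event[ROW_ID] = rowIdOffset + fileRowNumber; // file-local -> global
    event[CURRENT_TXN] = 0L;
    event[ROW] = row;                            // the original payload
    return event;
  }
}
----------------------------------------------------------------------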
http://git-wip-us.apache.org/repos/asf/hive/blob/0534b904/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index 2406af5..ba8d675 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.io.RecordUpdater;
-import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair;
import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderKey;
import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderPair;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -186,25 +185,24 @@ public class TestOrcRawRecordMerger {
Reader reader = createMockReader();
RecordIdentifier minKey = new RecordIdentifier(10, 20, 30);
RecordIdentifier maxKey = new RecordIdentifier(40, 50, 60);
- ReaderPair pair = new ReaderPair(key, reader, 20, minKey, maxKey,
+ ReaderPair pair = new OrcRawRecordMerger.ReaderPairAcid(key, reader, minKey, maxKey,
new Reader.Options(), 0);
- pair.advnaceToMinKey();
- RecordReader recordReader = pair.recordReader;
+ RecordReader recordReader = pair.getRecordReader();
assertEquals(10, key.getTransactionId());
assertEquals(20, key.getBucketProperty());
assertEquals(40, key.getRowId());
assertEquals(120, key.getCurrentTransactionId());
- assertEquals("third", value(pair.nextRecord));
+ assertEquals("third", value(pair.nextRecord()));
- pair.next(pair.nextRecord);
+ pair.next(pair.nextRecord());
assertEquals(40, key.getTransactionId());
assertEquals(50, key.getBucketProperty());
assertEquals(60, key.getRowId());
assertEquals(130, key.getCurrentTransactionId());
- assertEquals("fourth", value(pair.nextRecord));
+ assertEquals("fourth", value(pair.nextRecord()));
- pair.next(pair.nextRecord);
- assertEquals(null, pair.nextRecord);
+ pair.next(pair.nextRecord());
+ assertEquals(null, pair.nextRecord());
Mockito.verify(recordReader).close();
}
@@ -213,46 +211,45 @@ public class TestOrcRawRecordMerger {
ReaderKey key = new ReaderKey();
Reader reader = createMockReader();
- ReaderPair pair = new ReaderPair(key, reader, 20, null, null,
+ ReaderPair pair = new OrcRawRecordMerger.ReaderPairAcid(key, reader, null, null,
new Reader.Options(), 0);
- pair.advnaceToMinKey();
- RecordReader recordReader = pair.recordReader;
+ RecordReader recordReader = pair.getRecordReader();
assertEquals(10, key.getTransactionId());
assertEquals(20, key.getBucketProperty());
assertEquals(20, key.getRowId());
assertEquals(100, key.getCurrentTransactionId());
- assertEquals("first", value(pair.nextRecord));
+ assertEquals("first", value(pair.nextRecord()));
- pair.next(pair.nextRecord);
+ pair.next(pair.nextRecord());
assertEquals(10, key.getTransactionId());
assertEquals(20, key.getBucketProperty());
assertEquals(30, key.getRowId());
assertEquals(110, key.getCurrentTransactionId());
- assertEquals("second", value(pair.nextRecord));
+ assertEquals("second", value(pair.nextRecord()));
- pair.next(pair.nextRecord);
+ pair.next(pair.nextRecord());
assertEquals(10, key.getTransactionId());
assertEquals(20, key.getBucketProperty());
assertEquals(40, key.getRowId());
assertEquals(120, key.getCurrentTransactionId());
- assertEquals("third", value(pair.nextRecord));
+ assertEquals("third", value(pair.nextRecord()));
- pair.next(pair.nextRecord);
+ pair.next(pair.nextRecord());
assertEquals(40, key.getTransactionId());
assertEquals(50, key.getBucketProperty());
assertEquals(60, key.getRowId());
assertEquals(130, key.getCurrentTransactionId());
- assertEquals("fourth", value(pair.nextRecord));
+ assertEquals("fourth", value(pair.nextRecord()));
- pair.next(pair.nextRecord);
+ pair.next(pair.nextRecord());
assertEquals(40, key.getTransactionId());
assertEquals(50, key.getBucketProperty());
assertEquals(61, key.getRowId());
assertEquals(140, key.getCurrentTransactionId());
- assertEquals("fifth", value(pair.nextRecord));
+ assertEquals("fifth", value(pair.nextRecord()));
- pair.next(pair.nextRecord);
- assertEquals(null, pair.nextRecord);
+ pair.next(pair.nextRecord());
+ assertEquals(null, pair.nextRecord());
Mockito.verify(recordReader).close();
}
@@ -296,25 +293,24 @@ public class TestOrcRawRecordMerger {
Path root = new Path(tmpDir, "testOriginalReaderPair");
fs.makeQualified(root);
fs.create(root);
- ReaderPair pair = new OriginalReaderPair(key, reader, 10, minKey, maxKey,
+ ReaderPair pair = new OrcRawRecordMerger.OriginalReaderPairToRead(key, reader, 10, minKey, maxKey,
new Reader.Options().include(includes), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList());
- pair.advnaceToMinKey();
- RecordReader recordReader = pair.recordReader;
+ RecordReader recordReader = pair.getRecordReader();
assertEquals(0, key.getTransactionId());
assertEquals(10, key.getBucketProperty());
assertEquals(2, key.getRowId());
assertEquals(0, key.getCurrentTransactionId());
- assertEquals("third", value(pair.nextRecord));
+ assertEquals("third", value(pair.nextRecord()));
- pair.next(pair.nextRecord);
+ pair.next(pair.nextRecord());
assertEquals(0, key.getTransactionId());
assertEquals(10, key.getBucketProperty());
assertEquals(3, key.getRowId());
assertEquals(0, key.getCurrentTransactionId());
- assertEquals("fourth", value(pair.nextRecord));
+ assertEquals("fourth", value(pair.nextRecord()));
- pair.next(pair.nextRecord);
- assertEquals(null, pair.nextRecord);
+ pair.next(pair.nextRecord());
+ assertEquals(null, pair.nextRecord());
Mockito.verify(recordReader).close();
}
@@ -331,46 +327,45 @@ public class TestOrcRawRecordMerger {
Path root = new Path(tmpDir, "testOriginalReaderPairNoMin");
fs.makeQualified(root);
fs.create(root);
- ReaderPair pair = new OriginalReaderPair(key, reader, 10, null, null,
+ ReaderPair pair = new OrcRawRecordMerger.OriginalReaderPairToRead(key, reader, 10, null, null,
new Reader.Options(), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList());
- pair.advnaceToMinKey();
- assertEquals("first", value(pair.nextRecord));
+ assertEquals("first", value(pair.nextRecord()));
assertEquals(0, key.getTransactionId());
assertEquals(10, key.getBucketProperty());
assertEquals(0, key.getRowId());
assertEquals(0, key.getCurrentTransactionId());
- pair.next(pair.nextRecord);
- assertEquals("second", value(pair.nextRecord));
+ pair.next(pair.nextRecord());
+ assertEquals("second", value(pair.nextRecord()));
assertEquals(0, key.getTransactionId());
assertEquals(10, key.getBucketProperty());
assertEquals(1, key.getRowId());
assertEquals(0, key.getCurrentTransactionId());
- pair.next(pair.nextRecord);
- assertEquals("third", value(pair.nextRecord));
+ pair.next(pair.nextRecord());
+ assertEquals("third", value(pair.nextRecord()));
assertEquals(0, key.getTransactionId());
assertEquals(10, key.getBucketProperty());
assertEquals(2, key.getRowId());
assertEquals(0, key.getCurrentTransactionId());
- pair.next(pair.nextRecord);
- assertEquals("fourth", value(pair.nextRecord));
+ pair.next(pair.nextRecord());
+ assertEquals("fourth", value(pair.nextRecord()));
assertEquals(0, key.getTransactionId());
assertEquals(10, key.getBucketProperty());
assertEquals(3, key.getRowId());
assertEquals(0, key.getCurrentTransactionId());
- pair.next(pair.nextRecord);
- assertEquals("fifth", value(pair.nextRecord));
+ pair.next(pair.nextRecord());
+ assertEquals("fifth", value(pair.nextRecord()));
assertEquals(0, key.getTransactionId());
assertEquals(10, key.getBucketProperty());
assertEquals(4, key.getRowId());
assertEquals(0, key.getCurrentTransactionId());
- pair.next(pair.nextRecord);
- assertEquals(null, pair.nextRecord);
- Mockito.verify(pair.recordReader).close();
+ pair.next(pair.nextRecord());
+ assertEquals(null, pair.nextRecord());
+ Mockito.verify(pair.getRecordReader()).close();
}
@Test
@@ -437,11 +432,11 @@ public class TestOrcRawRecordMerger {
OrcRawRecordMerger merger = new OrcRawRecordMerger(conf, false, reader,
false, 10, createMaximalTxnList(),
new Reader.Options().range(1000, 1000), null, new OrcRawRecordMerger.Options());
- RecordReader rr = merger.getCurrentReader().recordReader;
+ RecordReader rr = merger.getCurrentReader().getRecordReader();
assertEquals(0, merger.getOtherReaders().size());
- assertEquals(new RecordIdentifier(10, 20, 30), merger.getMinKey());
- assertEquals(new RecordIdentifier(40, 50, 60), merger.getMaxKey());
+ assertEquals("" + merger.getMinKey(),new RecordIdentifier(10, 20, 30), merger.getMinKey());
+ assertEquals("" + merger.getMaxKey(), new RecordIdentifier(40, 50, 60), merger.getMaxKey());
RecordIdentifier id = merger.createKey();
OrcStruct event = merger.createValue();