Posted to commits@hive.apache.org by pv...@apache.org on 2020/07/20 09:32:47 UTC
[hive] branch master updated: HIVE-23840: Use LLAP to get orc metadata (Peter Vary reviewed by Adam Szita)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new fa086ec HIVE-23840: Use LLAP to get orc metadata (Peter Vary reviewed by Adam Szita)
fa086ec is described below
commit fa086ecce543384993a61d5154b14fa3b80df3b5
Author: pvary <pv...@cloudera.com>
AuthorDate: Mon Jul 20 11:32:29 2020 +0200
HIVE-23840: Use LLAP to get orc metadata (Peter Vary reviewed by Adam Szita)
Closes (#1251)
---
.../hive/llap/io/api/impl/LlapRecordReader.java | 2 +-
.../org/apache/hadoop/hive/llap/LlapHiveUtils.java | 7 +-
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 5 +
.../hadoop/hive/ql/io/orc/OrcRawRecordMerger.java | 4 +
.../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 29 ++-
.../org/apache/hadoop/hive/ql/io/orc/OrcSplit.java | 3 +-
.../ql/io/orc/VectorizedOrcAcidRowBatchReader.java | 255 ++++++++++++++-------
.../hive/ql/io/orc/TestOrcRawRecordMerger.java | 47 ++++
.../orc/TestVectorizedOrcAcidRowBatchReader.java | 63 ++++-
9 files changed, 321 insertions(+), 94 deletions(-)
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index 417a42a..c148dd4 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -191,7 +191,7 @@ class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>
if (isAcidScan) {
OrcSplit orcSplit = (OrcSplit) split;
this.acidReader = new VectorizedOrcAcidRowBatchReader(
- orcSplit, jobConf, Reporter.NULL, null, rbCtx, true);
+ orcSplit, jobConf, Reporter.NULL, null, rbCtx, true, mapWork);
isAcidFormat = !orcSplit.isOriginal();
} else {
isAcidFormat = false;
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
index 2779969..88834ea 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -105,7 +106,7 @@ public final class LlapHiveUtils {
* @return the MapWork instance. Might be null if missing.
* @throws HiveException
*/
- public static MapWork findMapWork(JobConf job) throws HiveException {
+ public static MapWork findMapWork(JobConf job) {
String inputName = job.get(Utilities.INPUT_NAME, null);
if (LOG.isDebugEnabled()) {
LOG.debug("Initializing for input " + inputName);
@@ -142,8 +143,8 @@ public final class LlapHiveUtils {
}
}
- public static boolean isLlapMode(HiveConf conf) {
- return "llap".equalsIgnoreCase(conf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_MODE));
+ public static boolean isLlapMode(Configuration conf) {
+ return "llap".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 85d8531..add20ec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -505,15 +505,18 @@ public class AcidUtils {
if (ORIGINAL_PATTERN.matcher(filename).matches() || ORIGINAL_PATTERN_COPY.matcher(filename).matches()) {
long minWriteId = 0;
long maxWriteId = 0;
+ int statementId = -1;
if (bucketFile.getParent().getName().startsWith(DELTA_PREFIX)) {
ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), false);
minWriteId = parsedDelta.getMinWriteId();
maxWriteId = parsedDelta.getMaxWriteId();
+ statementId = parsedDelta.getStatementId();
}
result
.setOldStyle(true)
.minimumWriteId(minWriteId)
.maximumWriteId(maxWriteId)
+ .statementId(statementId)
.bucket(bucket)
.writingBase(!bucketFile.getParent().getName().startsWith(DELTA_PREFIX));
}
@@ -531,6 +534,7 @@ public class AcidUtils {
.setOldStyle(false)
.minimumWriteId(parsedDelta.minWriteId)
.maximumWriteId(parsedDelta.maxWriteId)
+ .statementId(parsedDelta.statementId)
.bucket(bucket)
.attemptId(attemptId);
} else if (bucketFile.getParent().getName().startsWith(DELETE_DELTA_PREFIX)) {
@@ -539,6 +543,7 @@ public class AcidUtils {
.setOldStyle(false)
.minimumWriteId(parsedDelta.minWriteId)
.maximumWriteId(parsedDelta.maxWriteId)
+ .statementId(parsedDelta.statementId)
.bucket(bucket);
}
} else {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 6739a2a..ee31e6b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -695,6 +695,10 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
return Objects.hash(minKey, maxKey);
}
+ public boolean isIntersects(KeyInterval other) {
+ return (minKey == null || other.maxKey == null || minKey.compareTo(other.maxKey) <= 0) &&
+ (maxKey == null || other.minKey == null || maxKey.compareTo(other.minKey) >= 0);
+ }
}
/**
* Find the key range for original bucket files.
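The new KeyInterval.isIntersects above treats a null minKey or maxKey as an unbounded endpoint: two intervals overlap unless one of them ends strictly before the other begins, and the comparison is inclusive at both ends. A minimal standalone sketch of the same semantics on plain longs (LongInterval is a hypothetical simplification, not part of the patch):

    // Null-as-unbounded, inclusive interval overlap test, mirroring KeyInterval.isIntersects.
    final class LongInterval {
      final Long min; // null means "no lower bound"
      final Long max; // null means "no upper bound"
      LongInterval(Long min, Long max) { this.min = min; this.max = max; }

      boolean intersects(LongInterval other) {
        return (min == null || other.max == null || min <= other.max)
            && (max == null || other.min == null || max >= other.min);
      }
    }

For example, (1000, 2000) does not intersect (500, 999), but it does intersect (500, 1000) because the endpoints are inclusive, and it always intersects (null, null); testIntersect in TestOrcRawRecordMerger below exercises the same cases on RecordIdentifier-based intervals.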
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index d0c8006..6477c96 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -51,9 +51,11 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.orc.OrcConf;
+import org.apache.orc.OrcProto;
import org.apache.orc.TypeDescription;
import org.apache.orc.impl.AcidStats;
import org.apache.orc.impl.OrcAcidUtils;
+import org.apache.orc.impl.OrcTail;
import org.apache.orc.impl.SchemaEvolution;
import org.apache.orc.impl.WriterImpl;
import org.slf4j.Logger;
@@ -648,15 +650,28 @@ public class OrcRecordUpdater implements RecordUpdater {
}
static RecordIdentifier[] parseKeyIndex(Reader reader) {
- String[] stripes;
- try {
- if (!reader.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME)) {
- return null;
+ if (!reader.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME)) {
+ return null;
+ }
+
+ ByteBuffer val =
+ reader.getMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME)
+ .duplicate();
+ return parseKeyIndex(val);
+ }
+
+ static RecordIdentifier[] parseKeyIndex(OrcTail orcTail) {
+ for(OrcProto.UserMetadataItem item: orcTail.getFooter().getMetadataList()) {
+ if (item.hasName() && item.getName().equals(OrcRecordUpdater.ACID_KEY_INDEX_NAME)) {
+ return parseKeyIndex(item.getValue().asReadOnlyByteBuffer().duplicate());
}
+ }
+ return null;
+ }
- ByteBuffer val =
- reader.getMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME)
- .duplicate();
+ private static RecordIdentifier[] parseKeyIndex(ByteBuffer val) {
+ String[] stripes;
+ try {
CharsetDecoder utf8Decoder = UTF8.newDecoder();
stripes = utf8Decoder.decode(val).toString().split(";");
} catch (CharacterCodingException e) {
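The parseKeyIndex refactoring above keeps a single ByteBuffer-based parser and adds two thin entry points: one for an open Reader and one for a cached OrcTail, whose footer already carries the file's user metadata, so the acid key index can be read without reopening the file. A hedged sketch of the footer-side lookup (readAcidKeyIndex is an illustrative name; the committed code matches against OrcRecordUpdater.ACID_KEY_INDEX_NAME, whose value is the string literal used below):

    import org.apache.orc.OrcProto;

    final class AcidKeyIndexLookup {
      // Returns the raw acid key index ("writeId,bucket,rowId;" per stripe),
      // or null when the footer carries no such user metadata entry.
      static String readAcidKeyIndex(OrcProto.Footer footer) {
        for (OrcProto.UserMetadataItem item : footer.getMetadataList()) {
          if (item.hasName() && "hive.acid.key.index".equals(item.getName())) {
            return item.getValue().toStringUtf8();
          }
        }
        return null;
      }
    }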
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index e71dc7d..25da7a5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -346,8 +346,7 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
OrcRawRecordMerger.TransactionMetaData.findWriteIDForSynthetcRowIDs(getPath(), rootPath, conf);
writeId = tmd.syntheticWriteId;
stmtId = tmd.statementId;
- AcidOutputFormat.Options opt = AcidUtils.parseBaseOrDeltaBucketFilename(getPath(), conf);
- bucketId = opt.getBucketId();
+ bucketId = AcidUtils.parseBucketId(getPath());
}
@Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 832ce19..d0cc983 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -25,8 +25,12 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.IllegalCacheConfigurationException;
+import org.apache.hadoop.hive.llap.LlapHiveUtils;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
@@ -42,6 +46,8 @@ import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
@@ -50,9 +56,10 @@ import org.apache.hive.common.util.Ref;
import org.apache.orc.ColumnStatistics;
import org.apache.orc.IntegerColumnStatistics;
import org.apache.orc.OrcConf;
+import org.apache.orc.OrcProto;
import org.apache.orc.StripeInformation;
-import org.apache.orc.StripeStatistics;
-import org.apache.orc.impl.OrcAcidUtils;
+import org.apache.orc.impl.ColumnStatisticsImpl;
+import org.apache.orc.impl.OrcTail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -129,6 +136,16 @@ public class VectorizedOrcAcidRowBatchReader
*/
private SearchArgument deleteEventSarg = null;
+ /**
+ * CacheTag associated with the split
+ */
+ private final CacheTag cacheTag;
+
+ /**
+ * Skip using Llap IO cache for checking delete_delta files if the configuration is not correct
+ */
+ private static boolean skipLlapCache = false;
+
//OrcInputFormat c'tor
VectorizedOrcAcidRowBatchReader(OrcSplit inputSplit, JobConf conf,
Reporter reporter) throws IOException {
@@ -138,7 +155,7 @@ public class VectorizedOrcAcidRowBatchReader
VectorizedOrcAcidRowBatchReader(OrcSplit inputSplit, JobConf conf,
Reporter reporter, VectorizedRowBatchCtx rbCtx) throws IOException {
this(conf, inputSplit, reporter,
- rbCtx == null ? Utilities.getVectorizedRowBatchCtx(conf) : rbCtx, false);
+ rbCtx == null ? Utilities.getVectorizedRowBatchCtx(conf) : rbCtx, false, null);
final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, inputSplit);
// Careful with the range here now, we do not want to read the whole base file like deltas.
@@ -189,15 +206,15 @@ public class VectorizedOrcAcidRowBatchReader
*/
public VectorizedOrcAcidRowBatchReader(OrcSplit inputSplit, JobConf conf, Reporter reporter,
org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> baseReader,
- VectorizedRowBatchCtx rbCtx, boolean isFlatPayload) throws IOException {
- this(conf, inputSplit, reporter, rbCtx, isFlatPayload);
+ VectorizedRowBatchCtx rbCtx, boolean isFlatPayload, MapWork mapWork) throws IOException {
+ this(conf, inputSplit, reporter, rbCtx, isFlatPayload, mapWork);
if (baseReader != null) {
setBaseAndInnerReader(baseReader);
}
}
private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, Reporter reporter,
- VectorizedRowBatchCtx rowBatchCtx, boolean isFlatPayload) throws IOException {
+ VectorizedRowBatchCtx rowBatchCtx, boolean isFlatPayload, MapWork mapWork) throws IOException {
this.isFlatPayload = isFlatPayload;
this.rbCtx = rowBatchCtx;
final boolean isAcidRead = AcidUtils.isFullAcidScan(conf);
@@ -232,6 +249,19 @@ public class VectorizedOrcAcidRowBatchReader
this.syntheticProps = orcSplit.getSyntheticAcidProps();
+ if (LlapHiveUtils.isLlapMode(conf) && LlapProxy.isDaemon()
+ && HiveConf.getBoolVar(conf, ConfVars.LLAP_TRACK_CACHE_USAGE))
+ {
+ if (mapWork == null) {
+ mapWork = LlapHiveUtils.findMapWork(conf);
+ }
+ PartitionDesc partitionDesc =
+ LlapHiveUtils.partitionDescForPath(orcSplit.getPath(), mapWork.getPathToPartitionInfo());
+ cacheTag = LlapHiveUtils.getDbAndTableNameForMetrics(orcSplit.getPath(), true, partitionDesc);
+ } else {
+ cacheTag = null;
+ }
+
// Clone readerOptions for deleteEvents.
Reader.Options deleteEventReaderOptions = readerOptions.clone();
// Set the range on the deleteEventReaderOptions to 0 to INTEGER_MAX because
@@ -244,7 +274,7 @@ public class VectorizedOrcAcidRowBatchReader
// See if we can load all the relevant delete events from all the
// delete deltas in memory...
der = new ColumnizedDeleteEventRegistry(conf, orcSplit,
- deleteEventReaderOptions, keyInterval);
+ deleteEventReaderOptions, keyInterval, cacheTag);
} catch (DeleteEventsOverflowMemoryException e) {
// If not, then create a set of hanging readers that do sort-merge to find the next smallest
// delete event on-demand. Caps the memory consumption to (some_const * no. of readers).
@@ -403,11 +433,7 @@ public class VectorizedOrcAcidRowBatchReader
return new OrcRawRecordMerger.KeyInterval(null, null);
}
- //todo: since we already have OrcSplit.orcTail, should somehow use it to
- // get the acid.index, stats, etc rather than fetching the footer again
- // though it seems that orcTail is mostly null....
- Reader reader = OrcFile.createReader(orcSplit.getPath(),
- OrcFile.readerOptions(conf));
+ OrcTail orcTail = getOrcTail(orcSplit.getPath(), conf, cacheTag).orcTail;
if(orcSplit.isOriginal()) {
/**
@@ -421,10 +447,10 @@ public class VectorizedOrcAcidRowBatchReader
* Reader.Options, Configuration, OrcRawRecordMerger.Options)}*/
LOG.debug("findMinMaxKeys(original split)");
- return findOriginalMinMaxKeys(orcSplit, reader, deleteEventReaderOptions);
+ return findOriginalMinMaxKeys(orcSplit, orcTail, deleteEventReaderOptions);
}
- List<StripeInformation> stripes = reader.getStripes();
+ List<StripeInformation> stripes = orcTail.getStripes();
final long splitStart = orcSplit.getStart();
final long splitEnd = splitStart + orcSplit.getLength();
int firstStripeIndex = -1;
@@ -480,7 +506,7 @@ public class VectorizedOrcAcidRowBatchReader
lastStripeIndex + ")");
return new OrcRawRecordMerger.KeyInterval(null, null);
}
- RecordIdentifier[] keyIndex = OrcRecordUpdater.parseKeyIndex(reader);
+ RecordIdentifier[] keyIndex = OrcRecordUpdater.parseKeyIndex(orcTail);
if(keyIndex == null) {
LOG.warn("Could not find keyIndex (" + firstStripeIndex + "," +
@@ -500,12 +526,12 @@ public class VectorizedOrcAcidRowBatchReader
* are actually computed. Streaming ingest used to set it 0 and Minor
* compaction so there are lots of legacy files with no (rather, bad)
* column stats*/
- boolean columnStatsPresent = reader.getRowIndexStride() > 0;
+ boolean columnStatsPresent = orcTail.getFooter().getRowIndexStride() > 0;
if(!columnStatsPresent) {
LOG.debug("findMinMaxKeys() No ORC column stats");
}
- List<StripeStatistics> stats = reader.getStripeStatistics();
+ List<OrcProto.StripeStatistics> stats = orcTail.getStripeStatisticsProto();
assert stripes.size() == stats.size() : "str.s=" + stripes.size() +
" sta.s=" + stats.size();
@@ -518,29 +544,7 @@ public class VectorizedOrcAcidRowBatchReader
}
else {
if(columnStatsPresent) {
- ColumnStatistics[] colStats =
- stats.get(firstStripeIndex).getColumnStatistics();
- /*
- Structure in data is like this:
- <op, owid, writerId, rowid, cwid, <f1, ... fn>>
- The +1 is to account for the top level struct which has a
- ColumnStatistics object in colsStats. Top level struct is normally
- dropped by the Reader (I guess because of orc.impl.SchemaEvolution)
- */
- IntegerColumnStatistics origWriteId = (IntegerColumnStatistics)
- colStats[OrcRecordUpdater.ORIGINAL_WRITEID + 1];
- IntegerColumnStatistics bucketProperty = (IntegerColumnStatistics)
- colStats[OrcRecordUpdater.BUCKET + 1];
- IntegerColumnStatistics rowId = (IntegerColumnStatistics)
- colStats[OrcRecordUpdater.ROW_ID + 1];
- //we may want to change bucketProperty from int to long in the
- // future(across the stack) this protects the following cast to int
- assert bucketProperty.getMinimum() <= Integer.MAX_VALUE :
- "was bucketProperty changed to a long (" +
- bucketProperty.getMinimum() + ")?!:" + orcSplit;
- //this a lower bound but not necessarily greatest lower bound
- minKey = new RecordIdentifier(origWriteId.getMinimum(),
- (int) bucketProperty.getMinimum(), rowId.getMinimum());
+ minKey = getKeyInterval(stats.get(firstStripeIndex).getColStatsList()).getMinKey();
}
}
@@ -550,22 +554,7 @@ public class VectorizedOrcAcidRowBatchReader
maxKey = keyIndex[lastStripeIndex];
} else {
if(columnStatsPresent) {
- ColumnStatistics[] colStats =
- stats.get(lastStripeIndex).getColumnStatistics();
- IntegerColumnStatistics origWriteId = (IntegerColumnStatistics)
- colStats[OrcRecordUpdater.ORIGINAL_WRITEID + 1];
- IntegerColumnStatistics bucketProperty = (IntegerColumnStatistics)
- colStats[OrcRecordUpdater.BUCKET + 1];
- IntegerColumnStatistics rowId = (IntegerColumnStatistics)
- colStats[OrcRecordUpdater.ROW_ID + 1];
-
- assert bucketProperty.getMaximum() <= Integer.MAX_VALUE :
- "was bucketProperty changed to a long (" +
- bucketProperty.getMaximum() + ")?!:" + orcSplit;
-
- // this is an upper bound but not necessarily the least upper bound
- maxKey = new RecordIdentifier(origWriteId.getMaximum(),
- (int) bucketProperty.getMaximum(), rowId.getMaximum());
+ maxKey = getKeyInterval(stats.get(firstStripeIndex).getColStatsList()).getMaxKey();
}
}
OrcRawRecordMerger.KeyInterval keyInterval =
@@ -593,23 +582,18 @@ public class VectorizedOrcAcidRowBatchReader
* writeId is the same in both cases
*/
for(int i = firstStripeIndex; i <= lastStripeIndex; i++) {
- ColumnStatistics[] colStats = stats.get(firstStripeIndex)
- .getColumnStatistics();
- IntegerColumnStatistics bucketProperty = (IntegerColumnStatistics)
- colStats[OrcRecordUpdater.BUCKET + 1];
- IntegerColumnStatistics rowId = (IntegerColumnStatistics)
- colStats[OrcRecordUpdater.ROW_ID + 1];
- if(bucketProperty.getMinimum() < minBucketProp) {
- minBucketProp = bucketProperty.getMinimum();
+ OrcRawRecordMerger.KeyInterval key = getKeyInterval(stats.get(i).getColStatsList());
+ if(key.getMinKey().getBucketProperty() < minBucketProp) {
+ minBucketProp = key.getMinKey().getBucketProperty();
}
- if(bucketProperty.getMaximum() > maxBucketProp) {
- maxBucketProp = bucketProperty.getMaximum();
+ if(key.getMaxKey().getBucketProperty() > maxBucketProp) {
+ maxBucketProp = key.getMaxKey().getBucketProperty();
}
- if(rowId.getMinimum() < minRowId) {
- minRowId = rowId.getMinimum();
+ if(key.getMinKey().getRowId() < minRowId) {
+ minRowId = key.getMinKey().getRowId();
}
- if(rowId.getMaximum() > maxRowId) {
- maxRowId = rowId.getMaximum();
+ if(key.getMaxKey().getRowId() > maxRowId) {
+ maxRowId = key.getMaxKey().getRowId();
}
}
}
@@ -623,7 +607,7 @@ public class VectorizedOrcAcidRowBatchReader
return keyInterval;
}
- private OrcRawRecordMerger.KeyInterval findOriginalMinMaxKeys(OrcSplit orcSplit, Reader reader,
+ private OrcRawRecordMerger.KeyInterval findOriginalMinMaxKeys(OrcSplit orcSplit, OrcTail orcTail,
Reader.Options deleteEventReaderOptions) {
// This method returns the minimum and maximum synthetic row ids that are present in this split
@@ -641,7 +625,7 @@ public class VectorizedOrcAcidRowBatchReader
long minRowId = syntheticProps.getRowIdOffset();
long maxRowId = syntheticProps.getRowIdOffset();
- for(StripeInformation stripe: reader.getStripes()) {
+ for(StripeInformation stripe: orcTail.getStripes()) {
if (splitStart > stripe.getOffset()) {
// This stripe starts before the current split starts. This stripe is not included in this split.
minRowId += stripe.getNumberOfRows();
@@ -685,6 +669,36 @@ public class VectorizedOrcAcidRowBatchReader
return keyIntervalTmp;
}
+ private static class ReaderData {
+ OrcTail orcTail;
+ Reader reader;
+ }
+
+ /**
+ * Gets the OrcTail from cache if LLAP IO is enabled, otherwise creates the reader to get the tail.
+ * If a reader is created, it is returned as well so we do not have to recreate it if needed.
+ * @param path The Orc file path we want to get the OrcTail for
+ * @param conf The Configuration to access LLAP
+ * @param cacheTag The cacheTag needed to get OrcTail from LLAP IO cache
+ * @return ReaderData object where the orcTail is not null. Reader can be null, but if we had to create
+ * one we return that as well for further reuse.
+ */
+ private static ReaderData getOrcTail(Path path, Configuration conf, CacheTag cacheTag) throws IOException {
+ ReaderData readerData = new ReaderData();
+ if (!skipLlapCache && LlapHiveUtils.isLlapMode(conf) && LlapProxy.isDaemon()) {
+ try {
+ readerData.orcTail = LlapProxy.getIo().getOrcTailFromCache(path, conf, cacheTag);
+ } catch (IllegalCacheConfigurationException icce) {
+ LOG.warn("Cache is not usable. Please fix the configuration", icce);
+ skipLlapCache = true;
+ }
+ } else {
+ readerData.reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+ readerData.orcTail = new OrcTail(readerData.reader.getFileTail(), readerData.reader.getSerializedFileFooter());
+ }
+ return readerData;
+ }
+
/**
* See {@link #next(NullWritable, VectorizedRowBatch)} first and
* {@link OrcRawRecordMerger.OriginalReaderPair}.
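The ReaderData/getOrcTail pair above is the heart of the change: inside an LLAP daemon running in llap execution mode, the ORC tail is served by the LLAP metadata cache and no file reader is opened at all; outside LLAP the file is opened once and the tail is rebuilt from the reader, which is kept so callers do not reopen it. A condensed paraphrase of that decision (tailFor is an illustrative name, not the committed method; the committed version additionally sets the static skipLlapCache flag on an IllegalCacheConfigurationException instead of falling back immediately):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.io.CacheTag;
    import org.apache.hadoop.hive.llap.IllegalCacheConfigurationException;
    import org.apache.hadoop.hive.llap.LlapHiveUtils;
    import org.apache.hadoop.hive.llap.io.api.LlapProxy;
    import org.apache.hadoop.hive.ql.io.orc.OrcFile;
    import org.apache.hadoop.hive.ql.io.orc.Reader;
    import org.apache.orc.impl.OrcTail;

    final class OrcTailLookup {
      // Cache-first tail lookup: LLAP daemons read the tail from the metadata cache,
      // everyone else opens the file once and rebuilds the tail from the reader.
      static OrcTail tailFor(Path path, Configuration conf, CacheTag tag) throws IOException {
        if (LlapHiveUtils.isLlapMode(conf) && LlapProxy.isDaemon()) {
          try {
            return LlapProxy.getIo().getOrcTailFromCache(path, conf, tag);
          } catch (IllegalCacheConfigurationException e) {
            // Cache is unusable; fall through and read the tail from the file directly.
          }
        }
        Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
        return new OrcTail(reader.getFileTail(), reader.getSerializedFileFooter());
      }
    }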
@@ -1543,7 +1557,7 @@ public class VectorizedOrcAcidRowBatchReader
ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit,
Reader.Options readerOptions,
- OrcRawRecordMerger.KeyInterval keyInterval)
+ OrcRawRecordMerger.KeyInterval keyInterval, CacheTag cacheTag)
throws IOException, DeleteEventsOverflowMemoryException {
this.testMode = conf.getBoolean(ConfVars.HIVE_IN_TEST.varname, false);
int bucket = AcidUtils.parseBucketId(orcSplit.getPath());
@@ -1562,20 +1576,31 @@ public class VectorizedOrcAcidRowBatchReader
try {
final Path[] deleteDeltaDirs = getDeleteDeltaDirsFromSplit(orcSplit);
if (deleteDeltaDirs.length > 0) {
+ AcidOutputFormat.Options orcSplitMinMaxWriteIds =
+ AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf);
int totalDeleteEventCount = 0;
for (Path deleteDeltaDir : deleteDeltaDirs) {
- FileSystem fs = deleteDeltaDir.getFileSystem(conf);
+ if (!isQualifiedDeleteDeltaForSplit(orcSplitMinMaxWriteIds, deleteDeltaDir)) {
+ continue;
+ }
Path[] deleteDeltaFiles = OrcRawRecordMerger.getDeltaFiles(deleteDeltaDir, bucket,
new OrcRawRecordMerger.Options().isCompacting(false), null);
for (Path deleteDeltaFile : deleteDeltaFiles) {
try {
- /**
- * todo: we have OrcSplit.orcTail so we should be able to get stats from there
- */
- Reader deleteDeltaReader = OrcFile.createReader(deleteDeltaFile, OrcFile.readerOptions(conf));
- if (deleteDeltaReader.getNumberOfRows() <= 0) {
+ ReaderData readerData = getOrcTail(deleteDeltaFile, conf, cacheTag);
+ OrcTail orcTail = readerData.orcTail;
+ if (orcTail.getFooter().getNumberOfRows() <= 0) {
continue; // just a safe check to ensure that we are not reading empty delete files.
}
+ OrcRawRecordMerger.KeyInterval deleteKeyInterval = findDeleteMinMaxKeys(orcTail, deleteDeltaFile);
+ if (!deleteKeyInterval.isIntersects(keyInterval)) {
+ // If there is no intersection between data and delete delta, do not read delete file
+ continue;
+ }
+ // Reader can be reused if it was created before for getting orcTail: mostly for non-LLAP cache cases.
+ // For LLAP cases we need to create it here.
+ Reader deleteDeltaReader = readerData.reader != null ? readerData.reader :
+ OrcFile.createReader(deleteDeltaFile, OrcFile.readerOptions(conf));
totalDeleteEventCount += deleteDeltaReader.getNumberOfRows();
DeleteReaderValue deleteReaderValue = new DeleteReaderValue(deleteDeltaReader,
deleteDeltaFile, readerOptions, bucket, validWriteIdList, isBucketedTable, conf,
@@ -1601,6 +1626,48 @@ public class VectorizedOrcAcidRowBatchReader
throw e; // rethrow the exception so that the caller can handle.
}
}
+
+ private static OrcRawRecordMerger.KeyInterval findDeleteMinMaxKeys(OrcTail orcTail, Path path) {
+ boolean columnStatsPresent = orcTail.getFooter().getRowIndexStride() > 0;
+ if(!columnStatsPresent) {
+ LOG.debug("findMinMaxKeys() No ORC column stats");
+ return new OrcRawRecordMerger.KeyInterval(null, null);
+ }
+
+ return getKeyInterval(orcTail.getFooter().getStatisticsList());
+ }
+
+ /**
+ * Check if the delete delta folder needs to be scanned for a given split's min/max write ids.
+ *
+ * @param orcSplitMinMaxWriteIds
+ * @param deleteDeltaDir
+ * @return true when delete delta dir has to be scanned.
+ */
+ @VisibleForTesting
+ protected static boolean isQualifiedDeleteDeltaForSplit(AcidOutputFormat.Options orcSplitMinMaxWriteIds,
+ Path deleteDeltaDir)
+ {
+ AcidUtils.ParsedDelta deleteDelta = AcidUtils.parsedDelta(deleteDeltaDir, false);
+ // We allow equal writeIds so we are prepared for multi statement transactions.
+ // In this case we have to check the stmt id.
+ if (orcSplitMinMaxWriteIds.getMinimumWriteId() == deleteDelta.getMaxWriteId()) {
+ int orcSplitStmtId = orcSplitMinMaxWriteIds.getStatementId();
+ int deltaStmtId = deleteDelta.getStatementId();
+ // StatementId -1 and 0 is also used as the default one if it is not provided.
+ // Not brave enough to fix generally, so just fix here.
+ if (orcSplitStmtId == -1) {
+ orcSplitStmtId = 0;
+ }
+ if (deltaStmtId == -1) {
+ deltaStmtId = 0;
+ }
+ return orcSplitStmtId < deltaStmtId;
+ }
+ // For delta_0000012_0000014_0000, no need to read delete delta folders < 12.
+ return orcSplitMinMaxWriteIds.getMinimumWriteId() < deleteDelta.getMaxWriteId();
+ }
+
private void checkSize(int index) throws DeleteEventsOverflowMemoryException {
if(index > maxEventsInMemory) {
//check to prevent OOM errors
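isQualifiedDeleteDeltaForSplit lets the registry skip whole delete_delta directories whose contents cannot affect the split: deletes are only ever written with a write id at or above the data they delete, so a delete delta whose maximum write id is below the split's minimum write id (or equal to it with a statement id that is not greater) is irrelevant. A standalone restatement of the predicate on primitives (the method name and parameters are illustrative):

    final class DeleteDeltaPruning {
      // True when the delete delta may contain deletes for the split's rows and must be scanned.
      static boolean mustScanDeleteDelta(long splitMinWriteId, int splitStmtId,
                                         long deleteMaxWriteId, int deleteStmtId) {
        // -1 and 0 are both used as the "no statement id" default, so normalize them.
        if (splitStmtId == -1) {
          splitStmtId = 0;
        }
        if (deleteStmtId == -1) {
          deleteStmtId = 0;
        }
        if (splitMinWriteId == deleteMaxWriteId) {
          // Same transaction: only a later statement of that transaction can delete these rows.
          return splitStmtId < deleteStmtId;
        }
        return splitMinWriteId < deleteMaxWriteId;
      }
    }

For example, a split carved from delta_0000002_0000002_0001 skips delete_delta_0000002_0000002_0000 but must still scan delete_delta_0000002_0000002_0002 and anything with a higher write id, matching testIsQualifiedDeleteDeltaForSplit below.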
@@ -1790,4 +1857,34 @@ public class VectorizedOrcAcidRowBatchReader
SearchArgument getDeleteEventSarg() {
return deleteEventSarg;
}
+
+ private static IntegerColumnStatistics deserializeIntColumnStatistics(List<OrcProto.ColumnStatistics> colStats, int id) {
+ return (IntegerColumnStatistics) ColumnStatisticsImpl.deserialize(null, colStats.get(id));
+ }
+
+ /**
+ * Calculates the min/max record key.
+ * Structure in data is like this:
+ * <op, owid, writerId, rowid, cwid, <f1, ... fn>>
+ * The +1 is to account for the top level struct which has a
+ * ColumnStatistics object in colsStats. Top level struct is normally
+ * dropped by the Reader (I guess because of orc.impl.SchemaEvolution)
+ * @param colStats The statistics array
+ * @return The min/max record keys as a KeyInterval
+ */
+ private static OrcRawRecordMerger.KeyInterval getKeyInterval(List<OrcProto.ColumnStatistics> colStats) {
+ IntegerColumnStatistics origWriteId = deserializeIntColumnStatistics(colStats, OrcRecordUpdater.ORIGINAL_WRITEID + 1);
+ IntegerColumnStatistics bucketProperty = deserializeIntColumnStatistics(colStats, OrcRecordUpdater.BUCKET + 1);
+ IntegerColumnStatistics rowId = deserializeIntColumnStatistics(colStats, OrcRecordUpdater.ROW_ID + 1);
+
+ // We may want to change bucketProperty from int to long in the future(across the stack) this protects
+ // the following cast to int
+ assert bucketProperty.getMaximum() <= Integer.MAX_VALUE :
+ "was bucketProperty (max) changed to a long (" + bucketProperty.getMaximum() + ")?!";
+ assert bucketProperty.getMinimum() <= Integer.MAX_VALUE :
+ "was bucketProperty (min) changed to a long (" + bucketProperty.getMaximum() + ")?!";
+ RecordIdentifier maxKey = new RecordIdentifier(origWriteId.getMaximum(), (int) bucketProperty.getMaximum(), rowId.getMaximum());
+ RecordIdentifier minKey = new RecordIdentifier(origWriteId.getMinimum(), (int) bucketProperty.getMinimum(), rowId.getMinimum());
+ return new OrcRawRecordMerger.KeyInterval(minKey, maxKey);
+ }
}
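getKeyInterval and deserializeIntColumnStatistics replace the two hand-rolled min/max blocks that previously worked on fully materialized StripeStatistics from a Reader: the bounds for (originalWriteId, bucketProperty, rowId) are now pulled from the protobuf column statistics available in the (possibly cached) OrcTail, and the same helper also feeds findDeleteMinMaxKeys so delete deltas with no overlapping keys are skipped entirely. A minimal sketch of reading one integer column's bounds from the proto form (class and method names are illustrative; it assumes the entry holds integer statistics, as the acid key columns do):

    import java.util.List;
    import org.apache.orc.IntegerColumnStatistics;
    import org.apache.orc.OrcProto;
    import org.apache.orc.impl.ColumnStatisticsImpl;

    final class StripeStatsSketch {
      // Returns {min, max} for the statistics entry at 'index'; the acid readers pass
      // the logical field id + 1 to skip the top-level struct's own statistics entry.
      static long[] minMax(List<OrcProto.ColumnStatistics> colStats, int index) {
        IntegerColumnStatistics s =
            (IntegerColumnStatistics) ColumnStatisticsImpl.deserialize(null, colStats.get(index));
        return new long[] { s.getMinimum(), s.getMaximum() };
      }
    }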
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index e6fae44..99d5fe6 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -74,6 +74,7 @@ import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
@@ -115,6 +116,52 @@ public class TestOrcRawRecordMerger {
assertEquals(false, left.equals(ri));
}
+ @Test
+ public void testIntersect() {
+ OrcRawRecordMerger.KeyInterval ki1 = generateKeyInterval(1000L, 2000L);
+ OrcRawRecordMerger.KeyInterval ki2 = generateKeyInterval(1500L, 2500L);
+ checkIntersect(ki1, ki2, true);
+
+ ki2 = generateKeyInterval(500L, 1000L);
+ checkIntersect(ki1, ki2, true);
+
+ ki2 = generateKeyInterval(500L, 999L);
+ checkIntersect(ki1, ki2, false);
+
+ ki2 = generateKeyInterval(500L, null);
+ checkIntersect(ki1, ki2, true);
+
+ ki2 = generateKeyInterval(2500L, null);
+ checkIntersect(ki1, ki2, false);
+
+ ki2 = generateKeyInterval(null, null);
+ checkIntersect(ki1, ki2, true);
+ }
+
+ private static OrcRawRecordMerger.KeyInterval generateKeyInterval(Long minRowId, Long maxRowId) {
+ RecordIdentifier min = null;
+ if (minRowId != null) {
+ min = new RecordIdentifier(1, 100, minRowId);
+ }
+ RecordIdentifier max = null;
+ if (maxRowId != null) {
+ max = new RecordIdentifier(1, 100, maxRowId);
+ }
+ return new OrcRawRecordMerger.KeyInterval(min, max);
+ }
+
+ private static void checkIntersect(OrcRawRecordMerger.KeyInterval ki1, OrcRawRecordMerger.KeyInterval ki2,
+ boolean isIntersect)
+ {
+ if (isIntersect) {
+ assertTrue(ki1.isIntersects(ki2));
+ assertTrue(ki2.isIntersects(ki1));
+ } else {
+ assertFalse(ki1.isIntersects(ki2));
+ assertFalse(ki2.isIntersects(ki1));
+ }
+ }
+
private static void setRow(OrcStruct event,
int operation,
long originalTransaction,
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
index 6fe47d5..ee81cd3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.hive.ql.io.orc;
import java.io.File;
-import java.util.ArrayList;
+import java.io.IOException;
import java.util.BitSet;
import java.util.List;
import java.util.Properties;
@@ -955,7 +955,7 @@ public class TestVectorizedOrcAcidRowBatchReader {
new RecordIdentifier(0, bucketProperty, 2), filterOn);
}
- @Test
+ @Test
public void testVectorizedOrcAcidRowBatchReader() throws Exception {
conf.set("bucket_count", "1");
conf.set(ValidTxnList.VALID_TXNS_KEY,
@@ -1093,4 +1093,63 @@ public class TestVectorizedOrcAcidRowBatchReader {
null, null, true);
}
+
+ @Test
+ public void testIsQualifiedDeleteDeltaForSplit() throws IOException {
+ // Original file
+ checkPath("00000_0", "delete_delta_000012_000012_0000", true);
+ checkPath("00000_0", "delete_delta_000001_000001", true);
+
+ // Original copy
+ checkPath("00000_0_copy", "delete_delta_0000012_0000012_0000", true);
+ checkPath("00000_0_copy", "delete_delta_0000001_0000001", true);
+
+ // Base file
+ checkPath("base_00000002/bucket_0000001", "delete_delta_0000012_0000012_0000", true);
+
+ // Compacted base file
+ checkPath("base_0000002_v123/bucket_00000_0", "delete_delta_0000012_0000012_0000", true);
+
+ // Delta file
+ checkPath("delta_00000002_0000002/bucket_00001_1", "delete_delta_0000012_0000012_0000", true);
+ checkPath("delta_00000002_0000002/bucket_00001_1", "delete_delta_0000002_0000002", false);
+ checkPath("delta_00000002_0000002/bucket_00001_1", "delete_delta_0000001_0000001_0001", false);
+
+ // Delta with statement id
+ checkPath("delta_0000002_0000002_124/bucket_00001", "delete_delta_000012_000012_0000", true);
+ checkPath("delta_0000002_0000002_124/bucket_00001", "delete_delta_000002_000002", false);
+ checkPath("delta_0000002_0000002_124/bucket_00001", "delete_delta_000001_000001_0001", false);
+
+ // Delta file with data loaded by LOAD DATA command
+ checkPath("delta_0000002_0000002_0000/000000_0", "delete_delta_0000012_0000012_0000", true);
+ checkPath("delta_0000002_0000002_0000/000000_0", "delete_delta_0000002_0000002", false);
+ checkPath("delta_0000002_0000002_0000/000000_0", "delete_delta_0000001_0000001_0001", false);
+
+ // Compacted delta
+ checkPath("delta_0000002_0000005_124/bucket_00001", "delete_delta_0000012_0000012_0000", true);
+ checkPath("delta_0000002_0000005_124/bucket_00001", "delete_delta_0000003_0000003", true);
+ checkPath("delta_0000002_0000005_124/bucket_00001", "delete_delta_0000002_0000005", true);
+ checkPath("delta_0000002_0000005_124/bucket_00001", "delete_delta_0000002_0000002", false);
+ checkPath("delta_0000002_0000005_124/bucket_00001", "delete_delta_0000001_0000001_0001", false);
+
+ // Multi statement transaction check
+ checkPath("delta_0000002_0000002_0000/bucket_00001", "delete_delta_0000002_0000002_0000", false);
+ checkPath("delta_0000002_0000002_0001/bucket_00001", "delete_delta_0000002_0000002_0000", false);
+ checkPath("delta_0000002_0000002_0001/bucket_00001", "delete_delta_0000002_0000002_0002", true);
+ checkPath("delta_0000002_0000002_0001/bucket_00001", "delete_delta_0000002_0000002", false);
+ checkPath("delta_0000002_0000002/bucket_00001", "delete_delta_0000002_0000002", false);
+ checkPath("delta_0000002_0000002/bucket_00001", "delete_delta_0000002_0000002_0001", true);
+ }
+
+ private void checkPath(String splitPath, String deleteDeltaPath, boolean expected) throws IOException {
+ String tableDir = "";//hdfs://localhost:59316/base/warehouse/acid_test/";
+ AcidOutputFormat.Options ao = AcidUtils.parseBaseOrDeltaBucketFilename(new Path(tableDir + splitPath), conf);
+ if (expected) {
+ assertTrue(VectorizedOrcAcidRowBatchReader.ColumnizedDeleteEventRegistry.isQualifiedDeleteDeltaForSplit(ao,
+ new Path(tableDir + deleteDeltaPath)));
+ } else {
+ assertFalse(VectorizedOrcAcidRowBatchReader.ColumnizedDeleteEventRegistry.isQualifiedDeleteDeltaForSplit(ao,
+ new Path(tableDir + deleteDeltaPath)));
+ }
+ }
}