Posted to commits@hive.apache.org by pv...@apache.org on 2019/10/17 08:17:59 UTC
[hive] branch master updated: HIVE-22298: Allow Llap IO cache for reading tables without delete delta (Peter Vary reviewed by Slim Bouguerra)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new c6626ed HIVE-22298: Allow Llap IO cache for reading tables without delete delta (Peter Vary reviewed by Slim Bouguerra)
c6626ed is described below
commit c6626edb65c2cd00576647e54db1995628fe64da
Author: Peter Vary <pv...@cloudera.com>
AuthorDate: Thu Oct 17 09:55:30 2019 +0200
HIVE-22298: Allow Llap IO cache for reading tables without delete delta (Peter Vary reviewed by Slim Bouguerra)
---
.../hive/llap/io/api/impl/LlapRecordReader.java | 12 ++++--
.../org/apache/hadoop/hive/ql/io/orc/OrcSplit.java | 39 +++++------------
.../ql/io/orc/VectorizedOrcAcidRowBatchReader.java | 49 ++++++++++++----------
.../clientpositive/acid_vectorization_original.q | 4 ++
4 files changed, 49 insertions(+), 55 deletions(-)
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index 8cc81cc..77966aa 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -102,6 +102,7 @@ class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>
private final ReadPipeline rp;
private final ExecutorService executor;
private final boolean isAcidScan;
+ private final boolean isAcidFormat;
/**
* Creates the record reader and checks the input-specific compatibility.
@@ -182,10 +183,15 @@ class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>
this.isVectorized = HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
if (isAcidScan) {
+ OrcSplit orcSplit = (OrcSplit) split;
this.acidReader = new VectorizedOrcAcidRowBatchReader(
- (OrcSplit) split, jobConf, Reporter.NULL, null, rbCtx, true);
+ orcSplit, jobConf, Reporter.NULL, null, rbCtx, true);
+ isAcidFormat = !orcSplit.isOriginal();
+ } else {
+ isAcidFormat = false;
}
- this.includes = new IncludesImpl(tableIncludedCols, isAcidScan, rbCtx,
+
+ this.includes = new IncludesImpl(tableIncludedCols, isAcidFormat, rbCtx,
schema, job, isAcidScan && acidReader.includeAcidColumns());
// Create the consumer of encoded data; it will coordinate decoding to CVBs.
@@ -361,7 +367,7 @@ class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>
counters.incrWallClockCounter(LlapIOCounters.CONSUMER_TIME_NS, firstReturnTime);
return false;
}
- if (isAcidScan) {
+ if (isAcidFormat) {
vrb.selectedInUse = true;//why?
if (isVectorized) {
// TODO: relying everywhere on the magical constants and columns being together means ACID
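[Editorial sketch, not part of the patch] The LlapRecordReader hunk above introduces isAcidFormat next to the existing isAcidScan. The helper name computeIsAcidFormat below is hypothetical; isAcidScan and the split's isOriginal() come from the diff. isAcidScan still drives creation of the VectorizedOrcAcidRowBatchReader and the includeAcidColumns() handling, while the new isAcidFormat guards the per-batch ROW__ID post-processing and is false for "original" (pre-ACID) files even inside an ACID scan.

    // Hypothetical helper, for illustration only: mirrors the assignment in the '+' lines above.
    // An original file under a full ACID table is part of an ACID scan but is not in ACID format,
    // so the extra meta-column / selected[] handling in next() is skipped for it.
    static boolean computeIsAcidFormat(boolean isAcidScan, boolean splitIsOriginal) {
      return isAcidScan && !splitIsOriginal;
    }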
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 3eadc26..0a96fc3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -273,38 +273,19 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
@Override
public boolean canUseLlapIo(Configuration conf) {
- final boolean hasDelta = deltas != null && !deltas.isEmpty();
- final boolean isAcidRead = AcidUtils.isFullAcidScan(conf);
- final boolean isVectorized = Utilities.getIsVectorized(conf);
- Boolean isSplitUpdate = null;
- if (isAcidRead) {
- final AcidUtils.AcidOperationalProperties acidOperationalProperties
- = AcidUtils.getAcidOperationalProperties(conf);
- isSplitUpdate = acidOperationalProperties.isSplitUpdate();
- // TODO: this is brittle. Who said everyone has to upgrade using upgrade process?
- assert isSplitUpdate : "should be true in Hive 3.0";
- }
-
- if (isOriginal) {
- if (!isAcidRead && !hasDelta) {
- // Original scan only
- return true;
+ if (AcidUtils.isFullAcidScan(conf)) {
+ if (HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ACID_ENABLED)
+ && Utilities.getIsVectorized(conf)) {
+ boolean hasDeleteDelta = deltas != null && !deltas.isEmpty();
+ return VectorizedOrcAcidRowBatchReader.canUseLlapIoForAcid(this, hasDeleteDelta, conf);
+ } else {
+ LOG.info("Skipping Llap IO based on the following: [vectorized={}, hive.llap.io.acid={}] for {}",
+ Utilities.getIsVectorized(conf), HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ACID_ENABLED), this);
+ return false;
}
} else {
- boolean isAcidEnabled = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ACID_ENABLED);
- if (isAcidEnabled && isAcidRead && hasBase && isVectorized) {
- if (hasDelta) {
- if (isSplitUpdate) {
- // Base with delete deltas
- return true;
- }
- } else {
- // Base scan only
- return true;
- }
- }
+ return true;
}
- return false;
}
/**
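[Editorial sketch, not part of the commit] The rewritten OrcSplit#canUseLlapIo above is easier to read with the interleaved '-' and '+' lines folded into one flow, so here it is condensed into a pure function. The name canUseLlapIoSketch and the boolean parameters are illustrative; the real method reads the flags from the Configuration and delegates the final, per-split step to VectorizedOrcAcidRowBatchReader.canUseLlapIoForAcid.

    // Illustrative condensation of the post-patch decision in OrcSplit#canUseLlapIo.
    static boolean canUseLlapIoSketch(boolean fullAcidScan, boolean llapIoAcidEnabled,
        boolean vectorized, boolean acidReaderAllowsIt) {
      if (!fullAcidScan) {
        return true;                // non-ACID reads keep using LLAP IO as before
      }
      if (!llapIoAcidEnabled || !vectorized) {
        return false;               // hive.llap.io.acid disabled or scan not vectorized
      }
      return acidReaderAllowsIt;    // per-split check done by canUseLlapIoForAcid
    }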
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 374b105..2543dc6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -272,7 +272,7 @@ public class VectorizedOrcAcidRowBatchReader
* isOriginal - don't have meta columns - nothing to skip
* there no relevant delete events && ROW__ID is not needed higher up
* (e.g. this is not a delete statement)*/
- if (!isOriginal && deleteEventRegistry.isEmpty() && !rowIdProjected) {
+ if (deleteEventRegistry.isEmpty() && !rowIdProjected) {
Path parent = orcSplit.getPath().getParent();
while (parent != null && !rootPath.equals(parent)) {
if (parent.getName().startsWith(AcidUtils.BASE_PREFIX)) {
@@ -749,7 +749,7 @@ public class VectorizedOrcAcidRowBatchReader
* @param hasDeletes - if there are any deletes that apply to this split
* todo: HIVE-17944
*/
- static boolean canUseLlapForAcid(OrcSplit split, boolean hasDeletes, Configuration conf) {
+ static boolean canUseLlapIoForAcid(OrcSplit split, boolean hasDeletes, Configuration conf) {
if(!split.isOriginal()) {
return true;
}
@@ -906,12 +906,8 @@ public class VectorizedOrcAcidRowBatchReader
}
}
- if (isOriginal) {
- /* Just copy the payload. {@link recordIdColumnVector} has already been populated */
- System.arraycopy(vectorizedRowBatchBase.cols, 0, value.cols, 0, value.getDataColumnCount());
- } else {
- copyFromBase(value);
- }
+ copyFromBase(value);
+
if (rowIdProjected) {
int ix = rbCtx.findVirtualColumnNum(VirtualColumn.ROWID);
value.cols[ix] = recordIdColumnVector;
@@ -923,7 +919,11 @@ public class VectorizedOrcAcidRowBatchReader
//ColumnVectors for acid meta cols to create a single ColumnVector
//representing RecordIdentifier and (optionally) set it in 'value'
private void copyFromBase(VectorizedRowBatch value) {
- assert !isOriginal;
+ if (isOriginal) {
+ /* Just copy the payload. {@link recordIdColumnVector} has already been populated if needed */
+ System.arraycopy(vectorizedRowBatchBase.cols, 0, value.cols, 0, value.getDataColumnCount());
+ return;
+ }
if (isFlatPayload) {
int payloadCol = includeAcidColumns ? OrcRecordUpdater.ROW : 0;
// Ignore the struct column and just copy all the following data columns.
@@ -1266,7 +1266,6 @@ public class VectorizedOrcAcidRowBatchReader
* A simple wrapper class to hold the (owid, bucketProperty, rowId) pair.
*/
static class DeleteRecordKey implements Comparable<DeleteRecordKey> {
- private static final DeleteRecordKey otherKey = new DeleteRecordKey();
private long originalWriteId;
/**
* see {@link BucketCodec}
@@ -1288,25 +1287,29 @@ public class VectorizedOrcAcidRowBatchReader
if (other == null) {
return -1;
}
- if (originalWriteId != other.originalWriteId) {
- return originalWriteId < other.originalWriteId ? -1 : 1;
- }
- if(bucketProperty != other.bucketProperty) {
- return bucketProperty < other.bucketProperty ? -1 : 1;
- }
- if (rowId != other.rowId) {
- return rowId < other.rowId ? -1 : 1;
- }
- return 0;
+ return compareTo(other.originalWriteId, other.bucketProperty, other.rowId);
}
+
private int compareTo(RecordIdentifier other) {
if (other == null) {
return -1;
}
- otherKey.set(other.getWriteId(), other.getBucketProperty(),
- other.getRowId());
- return compareTo(otherKey);
+ return compareTo(other.getWriteId(), other.getBucketProperty(), other.getRowId());
}
+
+ private int compareTo(long oOriginalWriteId, int oBucketProperty, long oRowId) {
+ if (originalWriteId != oOriginalWriteId) {
+ return originalWriteId < oOriginalWriteId ? -1 : 1;
+ }
+ if(bucketProperty != oBucketProperty) {
+ return bucketProperty < oBucketProperty ? -1 : 1;
+ }
+ if (rowId != oRowId) {
+ return rowId < oRowId ? -1 : 1;
+ }
+ return 0;
+ }
+
@Override
public String toString() {
return "DeleteRecordKey(" + originalWriteId + "," +
diff --git a/ql/src/test/queries/clientpositive/acid_vectorization_original.q b/ql/src/test/queries/clientpositive/acid_vectorization_original.q
index 1a9fc57..c6d790d 100644
--- a/ql/src/test/queries/clientpositive/acid_vectorization_original.q
+++ b/ql/src/test/queries/clientpositive/acid_vectorization_original.q
@@ -110,6 +110,10 @@ explain select ROW__ID, count(*) from over10k_orc_bucketed group by ROW__ID havi
-- this test that there are no duplicate ROW__IDs so should produce no output
-- on LLAP this produces "NULL, 6"; on tez it produces nothing: HIVE-17921
+-- this makes sure that the same code is running on the Ptest and on localhost. The target is:
+-- Original split count is 11 grouped split count is 1, for bucket: 1
+set tez.grouping.split-count=1;
+
select ROW__ID, count(*) from over10k_orc_bucketed group by ROW__ID having count(*) > 1;
-- this produces nothing (as it should)
select ROW__ID, * from over10k_orc_bucketed where ROW__ID is null;