Posted to commits@hive.apache.org by ek...@apache.org on 2018/10/16 18:23:41 UTC
hive git commit: HIVE-17231: ColumnizedDeleteEventRegistry.DeleteReaderValue optimization (Eugene Koifman, reviewed by Gopal V)
Repository: hive
Updated Branches:
refs/heads/master 0f2f999bb -> d7be4b9f2
HIVE-17231: ColumnizedDeleteEventRegistry.DeleteReaderValue optimization (Eugene Koifman, reviewed by Gopal V)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d7be4b9f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d7be4b9f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d7be4b9f
Branch: refs/heads/master
Commit: d7be4b9f26345439c472969461d3d2c81f7e5057
Parents: 0f2f999
Author: Eugene Koifman <ek...@apache.org>
Authored: Tue Oct 16 11:23:11 2018 -0700
Committer: Eugene Koifman <ek...@apache.org>
Committed: Tue Oct 16 11:23:11 2018 -0700
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/FileSinkOperator.java | 4 ++
.../hive/ql/io/orc/OrcRawRecordMerger.java | 55 +++-----------------
.../io/orc/VectorizedOrcAcidRowBatchReader.java | 13 +----
3 files changed, 12 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/d7be4b9f/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 8c7a78b..79e41d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -388,6 +388,10 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
*
* A new FSP is created for each partition, so this only requires the bucket numbering and that
* is mapped in directly as an index.
+ *
+ * This relies on ReduceSinkOperator to shuffle update/delete rows by
+ * UDFToInteger(RecordIdentifier), i.e. by writerId in ROW__ID.
+ * {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer#getPartitionColsFromBucketColsForUpdateDelete(Operator, boolean)}
*/
public int createDynamicBucket(int bucketNum) {
// this assumes all paths are bucket names (which means no lookup is needed)
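Editor's note on the new javadoc above: because ReduceSinkOperator already routes each update/delete row to the reducer matching its writerId, FileSinkOperator can use the bucket number directly as an array index, with no hashing or lookup table. The following is a conceptual sketch only (plain Java with a placeholder Writer type), not the actual FileSinkOperator code:

final class BucketRoutingSketch {
  interface Writer { void write(String row); }

  // One writer slot per bucket of the table; createDynamicBucket's "mapped in
  // directly as an index" amounts to this kind of direct array access.
  private final Writer[] writersByBucket;

  BucketRoutingSketch(int numBuckets, java.util.function.IntFunction<Writer> factory) {
    writersByBucket = new Writer[numBuckets];
    for (int b = 0; b < numBuckets; b++) {
      writersByBucket[b] = factory.apply(b);
    }
  }

  // No hash and no lookup: the bucket id shuffled with the row is the index.
  void route(int bucketNum, String row) {
    writersByBucket[bucketNum].write(row);
  }
}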
http://git-wip-us.apache.org/repos/asf/hive/blob/d7be4b9f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 6be0c74..8cabf96 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -1122,7 +1122,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
}
continue;
}
- for (Path deltaFile : getDeltaFiles(delta, bucket, conf, mergerOptions, isBucketed)) {
+ for (Path deltaFile : getDeltaFiles(delta, bucket, mergerOptions)) {
FileSystem fs = deltaFile.getFileSystem(conf);
if(!fs.exists(deltaFile)) {
/**
@@ -1262,53 +1262,12 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* This determines the set of {@link ReaderPairAcid} to create for a given delta/.
* For unbucketed tables {@code bucket} can be thought of as a write tranche.
*/
- static Path[] getDeltaFiles(Path deltaDirectory, int bucket, Configuration conf,
- Options mergerOptions, boolean isBucketed) throws IOException {
- if(isBucketed) {
- /**
- * for bucketed tables (for now) we always trust that the N in bucketN file name means that
- * all records have {@link RecordIdentifier#getBucketProperty()} encoding bucketId = N. This
- * means that a delete event in bucketN can only modify an insert in another bucketN file for
- * the same N. (Down the road we may trust it only in certain delta dirs)
- *
- * Compactor takes all types of deltas for a given bucket. For regular read, any file that
- * contains (only) insert events is treated as base and only
- * delete_delta/ are treated as deltas.
- */
- assert (!mergerOptions.isCompacting &&
- deltaDirectory.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)
- ) || mergerOptions.isCompacting : "Unexpected delta: " + deltaDirectory;
- Path deltaFile = AcidUtils.createBucketFile(deltaDirectory, bucket);
- return new Path[]{deltaFile};
- }
- /**
- * For unbucketed tables insert events are also stored in bucketN files but here N is
- * the writer ID. We can trust that N matches info in {@link RecordIdentifier#getBucketProperty()}
- * delta_x_y but it's not required since we can't trust N for delete_delta_x_x/bucketN.
- * Thus we always have to take all files in a delete_delta.
- * For regular read, any file that has (only) insert events is treated as base so
- * {@link deltaDirectory} can only be delete_delta and so we take all files in it.
- * For compacting, every split contains base/bN + delta(s)/bN + delete_delta(s){all buckets} for
- * a given N.
- */
- if(deltaDirectory.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
- //it's not wrong to take all delete events for bucketed tables but it's more efficient
- //to only take those that belong to the 'bucket' assuming we trust the file name
- //un-bucketed table - get all files
- FileSystem fs = deltaDirectory.getFileSystem(conf);
- FileStatus[] dataFiles = fs.listStatus(deltaDirectory, AcidUtils.bucketFileFilter);
- Path[] deltaFiles = new Path[dataFiles.length];
- int i = 0;
- for (FileStatus stat : dataFiles) {
- deltaFiles[i++] = stat.getPath();
- }//todo: need a test where we actually have more than 1 file
- return deltaFiles;
- }
- //if here it must be delta_x_y - insert events only, so we must be compacting
- assert mergerOptions.isCompacting() : "Expected to be called as part of compaction";
- Path deltaFile = AcidUtils.createBucketFile(deltaDirectory, bucket);
- return new Path[] {deltaFile};
-
+ static Path[] getDeltaFiles(Path deltaDirectory, int bucket, Options mergerOptions) {
+ assert (!mergerOptions.isCompacting &&
+ deltaDirectory.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)
+ ) || mergerOptions.isCompacting : "Unexpected delta: " + deltaDirectory +
+ "(isCompacting=" + mergerOptions.isCompacting() + ")";
+ return new Path[] {AcidUtils.createBucketFile(deltaDirectory, bucket)};
}
@VisibleForTesting
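Editor's note: after this change getDeltaFiles() returns exactly one bucket file per (delta directory, bucket), whether compacting or doing a regular read of a delete_delta/. A small sketch of what that single path looks like, assuming the standard bucket_%05d file naming; the warehouse path below is made up:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;

public class DeltaFileSketch {
  public static void main(String[] args) {
    // Hypothetical delete delta directory; real paths come from the AcidUtils directory listing.
    Path deleteDelta = new Path("/warehouse/t/delete_delta_0000005_0000005");
    // The simplified getDeltaFiles() boils down to a single createBucketFile() call per bucket.
    Path bucketFile = AcidUtils.createBucketFile(deleteDelta, 3);
    // Expected to print .../delete_delta_0000005_0000005/bucket_00003 (assuming bucket_%05d naming).
    System.out.println(bucketFile);
  }
}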
http://git-wip-us.apache.org/repos/asf/hive/blob/d7be4b9f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 66280b2..2f809de 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -1420,19 +1420,8 @@ public class VectorizedOrcAcidRowBatchReader
/**
* Whenever we are reading a batch, we must ensure that all the records in the batch
* have the same bucket id as the bucket id of the split. If not, throw exception.
- * NOTE: this assertion might not hold, once virtual bucketing is in place. However,
- * it should be simple to fix that case. Just replace check for bucket equality with
- * a check for valid bucket mapping. Until virtual bucketing is added, it means
- * either the split computation got messed up or we found some corrupted records.
*/
private void checkBucketId(int bucketPropertyFromRecord) throws IOException {
- if(!isBucketedTable) {
- /**
- * in this case a file inside a delete_delta_x_y/bucketN may contain any value for
- * bucketId in {@link RecordIdentifier#getBucketProperty()}
- */
- return;
- }
int bucketIdFromRecord = BucketCodec.determineVersion(bucketPropertyFromRecord)
.decodeWriterId(bucketPropertyFromRecord);
if(bucketIdFromRecord != bucketForSplit) {
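Editor's note: with the unbucketed-table early return removed, every record in a split must now decode to the split's bucket id regardless of table type. The check mirrors the lines shown above; here it is as a self-contained sketch (the exception wording is illustrative, not the reader's exact message):

import java.io.IOException;
import org.apache.hadoop.hive.ql.io.BucketCodec;

public class BucketCheckSketch {
  // Decode the writerId from the record's bucketProperty and compare it with the
  // bucket the split was created for, as checkBucketId() does after this patch.
  static void checkBucketId(int bucketPropertyFromRecord, int bucketForSplit) throws IOException {
    int bucketIdFromRecord = BucketCodec.determineVersion(bucketPropertyFromRecord)
        .decodeWriterId(bucketPropertyFromRecord);
    if (bucketIdFromRecord != bucketForSplit) {
      throw new IOException("Corrupted records with different bucket ids from the containing"
          + " bucket file found: expected bucket " + bucketForSplit
          + " but got " + bucketIdFromRecord);
    }
  }
}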
@@ -1534,7 +1523,7 @@ public class VectorizedOrcAcidRowBatchReader
for (Path deleteDeltaDir : deleteDeltaDirs) {
FileSystem fs = deleteDeltaDir.getFileSystem(conf);
Path[] deleteDeltaFiles = OrcRawRecordMerger.getDeltaFiles(deleteDeltaDir, bucket,
- conf, new OrcRawRecordMerger.Options().isCompacting(false), isBucketedTable);
+ new OrcRawRecordMerger.Options().isCompacting(false));
for (Path deleteDeltaFile : deleteDeltaFiles) {
// NOTE: Calling last flush length below is more for future-proofing when we have
// streaming deletes. But currently we don't support streaming deletes, and this can