You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/05/07 22:15:34 UTC
hive git commit: HIVE-19312 : MM tables don't work with BucketizedHIF
(Sergey Shelukhin, reviewed by Gunther Hagleitner)
Repository: hive
Updated Branches:
refs/heads/master d159f24f6 -> 0930aec69
HIVE-19312 : MM tables don't work with BucketizedHIF (Sergey Shelukhin, reviewed by Gunther Hagleitner)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0930aec6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0930aec6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0930aec6
Branch: refs/heads/master
Commit: 0930aec69bc6af835470e6e42460730f3db3ad34
Parents: d159f24
Author: sergey <se...@apache.org>
Authored: Mon May 7 15:15:23 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Mon May 7 15:15:23 2018 -0700
----------------------------------------------------------------------
.../test/resources/testconfiguration.properties | 1 +
.../hive/ql/io/BucketizedHiveInputFormat.java | 56 +++++--
.../hadoop/hive/ql/io/HiveInputFormat.java | 29 ++--
.../rcfile/truncate/ColumnTruncateMapper.java | 4 +-
ql/src/test/queries/clientpositive/mm_bhif.q | 27 ++++
.../test/results/clientpositive/mm_bhif.q.out | 146 +++++++++++++++++++
6 files changed, 234 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/0930aec6/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 13c08de..e7463ca 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -578,6 +578,7 @@ minillaplocal.query.files=\
mapjoin_hint.q,\
mapjoin_emit_interval.q,\
mergejoin_3way.q,\
+ mm_bhif.q,\
mm_conversions.q,\
mm_exim.q,\
mm_loaddata.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/0930aec6/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
index e09c6ec..58f0480 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
@@ -18,10 +18,12 @@
package org.apache.hadoop.hive.ql.io;
+import org.apache.curator.shaded.com.google.common.collect.Lists;
+
+import org.apache.hadoop.hive.common.ValidWriteIdList;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileStatus;
@@ -50,8 +52,7 @@ import org.apache.hadoop.mapred.Reporter;
public class BucketizedHiveInputFormat<K extends WritableComparable, V extends Writable>
extends HiveInputFormat<K, V> {
- public static final Logger LOG = LoggerFactory
- .getLogger("org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat");
+ public static final Logger LOG = LoggerFactory.getLogger(BucketizedHiveInputFormat.class);
@Override
public RecordReader getRecordReader(InputSplit split, JobConf job,
@@ -123,25 +124,34 @@ public class BucketizedHiveInputFormat<K extends WritableComparable, V extends W
// for each dir, get all files under the dir, do getSplits to each
// individual file,
// and then create a BucketizedHiveInputSplit on it
+
+ ArrayList<Path> currentDir = null;
for (Path dir : dirs) {
PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);
- // create a new InputFormat instance if this is the first time to see this
- // class
+ // create a new InputFormat instance if this is the first time to see this class
Class inputFormatClass = part.getInputFileFormatClass();
InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
newjob.setInputFormat(inputFormat.getClass());
- FileStatus[] listStatus = listStatus(newjob, dir);
-
- for (FileStatus status : listStatus) {
- LOG.info("block size: " + status.getBlockSize());
- LOG.info("file length: " + status.getLen());
- FileInputFormat.setInputPaths(newjob, status.getPath());
- InputSplit[] iss = inputFormat.getSplits(newjob, 0);
- if (iss != null && iss.length > 0) {
- numOrigSplits += iss.length;
- result.add(new BucketizedHiveInputSplit(iss, inputFormatClass
- .getName()));
+ ValidWriteIdList mmIds = null;
+ // The table desc can be null, e.g. in the truncate table case for non-MM tables;
+ // mmIds then stays null and the directory is used as-is below.
+ if (part.getTableDesc() != null) {
+   // Returns null for tables that are not insert-only (MM), so mmIds may still be null here.
+   mmIds = getMmValidWriteIds(newjob, part.getTableDesc(), null);
+ }
+ // TODO: should this also handle ACID operation, etc.? seems to miss a lot of stuff from HIF.
+ Path[] finalDirs = (mmIds == null) ? new Path[] { dir }
+ : processPathsForMmRead(Lists.newArrayList(dir), newjob, mmIds);
+ if (finalDirs == null) {
+ continue; // No valid inputs - possible in MM case.
+ }
+
+ for (Path finalDir : finalDirs) {
+ FileStatus[] listStatus = listStatus(newjob, finalDir);
+
+ for (FileStatus status : listStatus) {
+ numOrigSplits = addBHISplit(
+ status, inputFormat, inputFormatClass, numOrigSplits, newjob, result);
}
}
}
@@ -149,4 +159,18 @@ public class BucketizedHiveInputFormat<K extends WritableComparable, V extends W
+ numOrigSplits + " original splits.");
return result.toArray(new BucketizedHiveInputSplit[result.size()]);
}
+
+ /**
+  * Asks the input format for the splits of a single file and, when it returns any, wraps
+  * all of them into one {@link BucketizedHiveInputSplit} that is appended to {@code result}.
+  *
+  * @param status the file to split; its path is set as the input path of {@code newjob}
+  * @param inputFormat the input format that computes the underlying splits
+  * @param inputFormatClass the class whose name is recorded in the created split
+  * @param numOrigSplits the running count of underlying splits found so far
+  * @param newjob the job configuration handed to the input format
+  * @param result the list that receives the new bucketized split
+  * @return {@code numOrigSplits} increased by the number of splits found for this file
+  * @throws IOException propagated from the underlying path setup or split computation
+  */
+ private int addBHISplit(FileStatus status, InputFormat inputFormat, Class inputFormatClass,
+     int numOrigSplits, JobConf newjob, ArrayList<InputSplit> result) throws IOException {
+   LOG.info("block size: " + status.getBlockSize());
+   LOG.info("file length: " + status.getLen());
+   FileInputFormat.setInputPaths(newjob, status.getPath());
+   InputSplit[] fileSplits = inputFormat.getSplits(newjob, 0);
+   if (fileSplits == null || fileSplits.length == 0) {
+     // Nothing was produced for this file; the running count is unchanged.
+     return numOrigSplits;
+   }
+   String formatName = inputFormatClass.getName();
+   result.add(new BucketizedHiveInputSplit(fileSplits, formatName));
+   return numOrigSplits + fileSplits.length;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0930aec6/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index b25bb1d..655d10b 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -478,18 +478,9 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
InputFormat inputFormat, Class<? extends InputFormat> inputFormatClass, int splits,
TableDesc table, List<InputSplit> result)
throws IOException {
- ValidWriteIdList validWriteIdList = AcidUtils.getTableValidWriteIdList(conf, table.getTableName());
- ValidWriteIdList validMmWriteIdList;
- if (AcidUtils.isInsertOnlyTable(table.getProperties())) {
- if (validWriteIdList == null) {
- throw new IOException("Insert-Only table: " + table.getTableName()
- + " is missing from the ValidWriteIdList config: "
- + conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
- }
- validMmWriteIdList = validWriteIdList;
- } else {
- validMmWriteIdList = null; // for non-MM case
- }
+ ValidWriteIdList validWriteIdList = AcidUtils.getTableValidWriteIdList(
+ conf, table.getTableName());
+ ValidWriteIdList validMmWriteIdList = getMmValidWriteIds(conf, table, validWriteIdList);
try {
Utilities.copyTablePropertiesToConf(table, conf);
@@ -555,6 +546,20 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
}
}
+ /**
+  * Resolves the write ID list to use when reading an insert-only (MM) table.
+  *
+  * @param conf the job configuration the write ID list is looked up in when none is supplied
+  * @param table the descriptor of the table being read
+  * @param validWriteIdList an already resolved list for the table, or {@code null} to look it
+  *        up from {@code conf}
+  * @return {@code null} if the table is not insert-only; otherwise the supplied list, or the
+  *         one looked up from {@code conf}
+  * @throws IOException if the table is insert-only and no write ID list could be found for it
+  */
+ protected ValidWriteIdList getMmValidWriteIds(
+     JobConf conf, TableDesc table, ValidWriteIdList validWriteIdList) throws IOException {
+   boolean isInsertOnly = AcidUtils.isInsertOnlyTable(table.getProperties());
+   if (!isInsertOnly) {
+     return null;
+   }
+   if (validWriteIdList != null) {
+     // The caller already resolved the list; use it as-is.
+     return validWriteIdList;
+   }
+   ValidWriteIdList fromConf = AcidUtils.getTableValidWriteIdList(conf, table.getTableName());
+   if (fromConf != null) {
+     return fromConf;
+   }
+   throw new IOException("Insert-Only table: " + table.getTableName()
+       + " is missing from the ValidWriteIdList config: "
+       + conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
+ }
+
public static Path[] processPathsForMmRead(List<Path> dirs, JobConf conf,
ValidWriteIdList validWriteIdList) throws IOException {
if (validWriteIdList == null) {
http://git-wip-us.apache.org/repos/asf/hive/blob/0930aec6/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
index 591c4b8..c112978 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
@@ -236,7 +236,9 @@ public class ColumnTruncateMapper extends MapReduceBase implements
Path backupPath = backupOutputPath(fs, outputPath, job);
Utilities.mvFileToFinalPath(outputPath, job, success, LOG, dynPartCtx, null,
reporter);
- fs.delete(backupPath, true);
+ if (backupPath != null) {
+ fs.delete(backupPath, true);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0930aec6/ql/src/test/queries/clientpositive/mm_bhif.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mm_bhif.q b/ql/src/test/queries/clientpositive/mm_bhif.q
new file mode 100644
index 0000000..f9c7f8a
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/mm_bhif.q
@@ -0,0 +1,27 @@
+SET hive.vectorized.execution.enabled=true;
+SET hive.vectorized.execution.reduce.enabled=true;
+set hive.mapred.mode=nonstrict;
+set hive.exec.reducers.max = 10;
+set hive.map.groupby.sorted=true;
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+-- SORT_QUERY_RESULTS
+
+CREATE TABLE T1_mm(key STRING, val STRING) PARTITIONED BY (ds string)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE tblproperties ("transactional"="true", "transactional_properties"="insert_only");
+
+LOAD DATA LOCAL INPATH '../../data/files/bucket_files/000000_0' INTO TABLE T1_mm PARTITION (ds='1');
+
+INSERT OVERWRITE TABLE T1_mm PARTITION (ds='1') select key, val from T1_mm where ds = '1';
+
+
+set hive.fetch.task.conversion=none;
+
+select * from T1_mm;
+
+explain
+select count(distinct key) from T1_mm;
+select count(distinct key) from T1_mm;
+
+DROP TABLE T1_mm;
http://git-wip-us.apache.org/repos/asf/hive/blob/0930aec6/ql/src/test/results/clientpositive/mm_bhif.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/mm_bhif.q.out b/ql/src/test/results/clientpositive/mm_bhif.q.out
new file mode 100644
index 0000000..4774007
--- /dev/null
+++ b/ql/src/test/results/clientpositive/mm_bhif.q.out
@@ -0,0 +1,146 @@
+PREHOOK: query: CREATE TABLE T1_mm(key STRING, val STRING) PARTITIONED BY (ds string)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE tblproperties ("transactional"="true", "transactional_properties"="insert_only")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@T1_mm
+POSTHOOK: query: CREATE TABLE T1_mm(key STRING, val STRING) PARTITIONED BY (ds string)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE tblproperties ("transactional"="true", "transactional_properties"="insert_only")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@T1_mm
+PREHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/bucket_files/000000_0' INTO TABLE T1_mm PARTITION (ds='1')
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@t1_mm
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/bucket_files/000000_0' INTO TABLE T1_mm PARTITION (ds='1')
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@t1_mm
+POSTHOOK: Output: default@t1_mm@ds=1
+PREHOOK: query: INSERT OVERWRITE TABLE T1_mm PARTITION (ds='1') select key, val from T1_mm where ds = '1'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1_mm
+PREHOOK: Input: default@t1_mm@ds=1
+PREHOOK: Output: default@t1_mm@ds=1
+POSTHOOK: query: INSERT OVERWRITE TABLE T1_mm PARTITION (ds='1') select key, val from T1_mm where ds = '1'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1_mm
+POSTHOOK: Input: default@t1_mm@ds=1
+POSTHOOK: Output: default@t1_mm@ds=1
+POSTHOOK: Lineage: t1_mm PARTITION(ds=1).key SIMPLE [(t1_mm)t1_mm.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: t1_mm PARTITION(ds=1).val SIMPLE [(t1_mm)t1_mm.FieldSchema(name:val, type:string, comment:null), ]
+PREHOOK: query: select * from T1_mm
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1_mm
+PREHOOK: Input: default@t1_mm@ds=1
+#### A masked pattern was here ####
+POSTHOOK: query: select * from T1_mm
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1_mm
+POSTHOOK: Input: default@t1_mm@ds=1
+#### A masked pattern was here ####
+1 11 1
+2 12 1
+3 13 1
+7 17 1
+8 18 1
+8 28 1
+PREHOOK: query: explain
+select count(distinct key) from T1_mm
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(distinct key) from T1_mm
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-2 depends on stages: Stage-1
+ Stage-0 depends on stages: Stage-2
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ alias: t1_mm
+ Statistics: Num rows: 6 Data size: 24 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string)
+ outputColumnNames: key
+ Statistics: Num rows: 6 Data size: 24 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ keys: key (type: string)
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 6 Data size: 24 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 6 Data size: 24 Basic stats: COMPLETE Column stats: NONE
+ Execution mode: vectorized
+ Reduce Operator Tree:
+ Group By Operator
+ keys: KEY._col0 (type: string)
+ mode: partial2
+ outputColumnNames: _col0
+ Statistics: Num rows: 6 Data size: 24 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: count(_col0)
+ mode: partial2
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+ Stage: Stage-2
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Execution mode: vectorized
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: count(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: select count(distinct key) from T1_mm
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1_mm
+PREHOOK: Input: default@t1_mm@ds=1
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct key) from T1_mm
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1_mm
+POSTHOOK: Input: default@t1_mm@ds=1
+#### A masked pattern was here ####
+5
+PREHOOK: query: DROP TABLE T1_mm
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@t1_mm
+PREHOOK: Output: default@t1_mm
+POSTHOOK: query: DROP TABLE T1_mm
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@t1_mm
+POSTHOOK: Output: default@t1_mm