You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/05/07 22:15:34 UTC

hive git commit: HIVE-19312 : MM tables don't work with BucketizedHIF (Sergey Shelukhin, reviewed by Gunther Hagleitner)

Repository: hive
Updated Branches:
  refs/heads/master d159f24f6 -> 0930aec69


HIVE-19312 : MM tables don't work with BucketizedHIF (Sergey Shelukhin, reviewed by Gunther Hagleitner)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0930aec6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0930aec6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0930aec6

Branch: refs/heads/master
Commit: 0930aec69bc6af835470e6e42460730f3db3ad34
Parents: d159f24
Author: sergey <se...@apache.org>
Authored: Mon May 7 15:15:23 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Mon May 7 15:15:23 2018 -0700

----------------------------------------------------------------------
 .../test/resources/testconfiguration.properties |   1 +
 .../hive/ql/io/BucketizedHiveInputFormat.java   |  56 +++++--
 .../hadoop/hive/ql/io/HiveInputFormat.java      |  29 ++--
 .../rcfile/truncate/ColumnTruncateMapper.java   |   4 +-
 ql/src/test/queries/clientpositive/mm_bhif.q    |  27 ++++
 .../test/results/clientpositive/mm_bhif.q.out   | 146 +++++++++++++++++++
 6 files changed, 234 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0930aec6/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 13c08de..e7463ca 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -578,6 +578,7 @@ minillaplocal.query.files=\
   mapjoin_hint.q,\
   mapjoin_emit_interval.q,\
   mergejoin_3way.q,\
+  mm_bhif.q,\
   mm_conversions.q,\
   mm_exim.q,\
   mm_loaddata.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/0930aec6/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
index e09c6ec..58f0480 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
@@ -18,10 +18,12 @@
 
 package org.apache.hadoop.hive.ql.io;
 
+import org.apache.curator.shaded.com.google.common.collect.Lists;
+
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileStatus;
@@ -50,8 +52,7 @@ import org.apache.hadoop.mapred.Reporter;
 public class BucketizedHiveInputFormat<K extends WritableComparable, V extends Writable>
     extends HiveInputFormat<K, V> {
 
-  public static final Logger LOG = LoggerFactory
-      .getLogger("org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat");
+  public static final Logger LOG = LoggerFactory.getLogger(BucketizedHiveInputFormat.class);
 
   @Override
   public RecordReader getRecordReader(InputSplit split, JobConf job,
@@ -123,25 +124,34 @@ public class BucketizedHiveInputFormat<K extends WritableComparable, V extends W
     // for each dir, get all files under the dir, do getSplits to each
     // individual file,
     // and then create a BucketizedHiveInputSplit on it
+
+    ArrayList<Path> currentDir = null;
     for (Path dir : dirs) {
       PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);
-      // create a new InputFormat instance if this is the first time to see this
-      // class
+      // create a new InputFormat instance if this is the first time to see this class
       Class inputFormatClass = part.getInputFileFormatClass();
       InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
       newjob.setInputFormat(inputFormat.getClass());
 
-      FileStatus[] listStatus = listStatus(newjob, dir);
-
-      for (FileStatus status : listStatus) {
-        LOG.info("block size: " + status.getBlockSize());
-        LOG.info("file length: " + status.getLen());
-        FileInputFormat.setInputPaths(newjob, status.getPath());
-        InputSplit[] iss = inputFormat.getSplits(newjob, 0);
-        if (iss != null && iss.length > 0) {
-          numOrigSplits += iss.length;
-          result.add(new BucketizedHiveInputSplit(iss, inputFormatClass
-              .getName()));
+      // The TableDesc can be null for the truncate table case for non-MM tables; mmIds then
+      // stays null. Otherwise getMmValidWriteIds decides whether MM write IDs apply.
+      ValidWriteIdList mmIds = null;
+      if (part.getTableDesc() != null) {
+        mmIds = getMmValidWriteIds(newjob, part.getTableDesc(), null);
+      }
+      // TODO: should this also handle ACID operation, etc.? seems to miss a lot of stuff from HIF.
+      Path[] finalDirs = (mmIds == null) ? new Path[] { dir }
+        : processPathsForMmRead(Lists.newArrayList(dir), newjob, mmIds);
+      if (finalDirs == null) {
+        continue; // No valid inputs - possible in MM case.
+      }
+
+      for (Path finalDir : finalDirs) {
+        FileStatus[] listStatus = listStatus(newjob, finalDir);
+
+        for (FileStatus status : listStatus) {
+          numOrigSplits = addBHISplit(
+              status, inputFormat, inputFormatClass, numOrigSplits, newjob, result);
         }
       }
     }
@@ -149,4 +159,18 @@ public class BucketizedHiveInputFormat<K extends WritableComparable, V extends W
         + numOrigSplits + " original splits.");
     return result.toArray(new BucketizedHiveInputSplit[result.size()]);
   }
+
+  /**
+   * Runs the given input format's getSplits on the path of a single file status and, when
+   * at least one split comes back, wraps all of them in one BucketizedHiveInputSplit that
+   * is appended to result.
+   *
+   * @param numOrigSplits running count of underlying splits seen so far
+   * @return numOrigSplits plus the number of splits produced for this status
+   */
+  private int addBHISplit(FileStatus status, InputFormat inputFormat, Class inputFormatClass,
+      int numOrigSplits, JobConf newjob, ArrayList<InputSplit> result) throws IOException {
+    LOG.info("block size: " + status.getBlockSize());
+    LOG.info("file length: " + status.getLen());
+    // Point the job at this one path only, so getSplits below is restricted to it.
+    FileInputFormat.setInputPaths(newjob, status.getPath());
+    InputSplit[] fileSplits = inputFormat.getSplits(newjob, 0);
+    if (fileSplits == null || fileSplits.length == 0) {
+      return numOrigSplits; // nothing to add; the count is unchanged
+    }
+    result.add(new BucketizedHiveInputSplit(fileSplits, inputFormatClass.getName()));
+    return numOrigSplits + fileSplits.length;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0930aec6/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index b25bb1d..655d10b 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -478,18 +478,9 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
       InputFormat inputFormat, Class<? extends InputFormat> inputFormatClass, int splits,
       TableDesc table, List<InputSplit> result)
           throws IOException {
-    ValidWriteIdList validWriteIdList = AcidUtils.getTableValidWriteIdList(conf, table.getTableName());
-    ValidWriteIdList validMmWriteIdList;
-    if (AcidUtils.isInsertOnlyTable(table.getProperties())) {
-      if (validWriteIdList == null) {
-        throw new IOException("Insert-Only table: " + table.getTableName()
-                + " is missing from the ValidWriteIdList config: "
-                + conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
-      }
-      validMmWriteIdList = validWriteIdList;
-    } else {
-      validMmWriteIdList = null;  // for non-MM case
-    }
+    ValidWriteIdList validWriteIdList = AcidUtils.getTableValidWriteIdList(
+        conf, table.getTableName());
+    ValidWriteIdList validMmWriteIdList = getMmValidWriteIds(conf, table, validWriteIdList);
 
     try {
       Utilities.copyTablePropertiesToConf(table, conf);
@@ -555,6 +546,20 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     }
   }
 
+  /**
+   * Resolves the write ID list to use when reading an insert-only (MM) table.
+   *
+   * @param validWriteIdList an already-fetched list for the table, or null to look it up
+   *        from conf by table name
+   * @return null when the table is not insert-only; otherwise the resolved list
+   * @throws IOException if the table is insert-only and no write ID list can be found
+   */
+  protected ValidWriteIdList getMmValidWriteIds(
+      JobConf conf, TableDesc table, ValidWriteIdList validWriteIdList) throws IOException {
+    if (!AcidUtils.isInsertOnlyTable(table.getProperties())) {
+      return null; // non-MM case
+    }
+    // Prefer the caller-supplied list; fall back to the one recorded in the job conf.
+    ValidWriteIdList mmWriteIds = (validWriteIdList != null) ? validWriteIdList
+        : AcidUtils.getTableValidWriteIdList(conf, table.getTableName());
+    if (mmWriteIds == null) {
+      throw new IOException("Insert-Only table: " + table.getTableName()
+              + " is missing from the ValidWriteIdList config: "
+              + conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
+    }
+    return mmWriteIds;
+  }
+
   public static Path[] processPathsForMmRead(List<Path> dirs, JobConf conf,
       ValidWriteIdList validWriteIdList) throws IOException {
      if (validWriteIdList == null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/0930aec6/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
index 591c4b8..c112978 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
@@ -236,7 +236,9 @@ public class ColumnTruncateMapper extends MapReduceBase implements
     Path backupPath = backupOutputPath(fs, outputPath, job);
     Utilities.mvFileToFinalPath(outputPath, job, success, LOG, dynPartCtx, null,
       reporter);
-    fs.delete(backupPath, true);
+    if (backupPath != null) {
+      fs.delete(backupPath, true);
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0930aec6/ql/src/test/queries/clientpositive/mm_bhif.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mm_bhif.q b/ql/src/test/queries/clientpositive/mm_bhif.q
new file mode 100644
index 0000000..f9c7f8a
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/mm_bhif.q
@@ -0,0 +1,27 @@
+SET hive.vectorized.execution.enabled=true;
+SET hive.vectorized.execution.reduce.enabled=true;
+set hive.mapred.mode=nonstrict;
+set hive.exec.reducers.max = 10;
+set hive.map.groupby.sorted=true;
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+-- SORT_QUERY_RESULTS
+
+CREATE TABLE T1_mm(key STRING, val STRING) PARTITIONED BY (ds string)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE  tblproperties ("transactional"="true", "transactional_properties"="insert_only");
+
+LOAD DATA LOCAL INPATH '../../data/files/bucket_files/000000_0' INTO TABLE T1_mm  PARTITION (ds='1');
+
+INSERT OVERWRITE TABLE T1_mm PARTITION (ds='1') select key, val from T1_mm where ds = '1';
+
+
+set hive.fetch.task.conversion=none;
+
+select * from T1_mm;
+
+explain
+select count(distinct key) from T1_mm;
+select count(distinct key) from T1_mm;
+
+DROP TABLE T1_mm;

http://git-wip-us.apache.org/repos/asf/hive/blob/0930aec6/ql/src/test/results/clientpositive/mm_bhif.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/mm_bhif.q.out b/ql/src/test/results/clientpositive/mm_bhif.q.out
new file mode 100644
index 0000000..4774007
--- /dev/null
+++ b/ql/src/test/results/clientpositive/mm_bhif.q.out
@@ -0,0 +1,146 @@
+PREHOOK: query: CREATE TABLE T1_mm(key STRING, val STRING) PARTITIONED BY (ds string)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE  tblproperties ("transactional"="true", "transactional_properties"="insert_only")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@T1_mm
+POSTHOOK: query: CREATE TABLE T1_mm(key STRING, val STRING) PARTITIONED BY (ds string)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE  tblproperties ("transactional"="true", "transactional_properties"="insert_only")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@T1_mm
+PREHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/bucket_files/000000_0' INTO TABLE T1_mm  PARTITION (ds='1')
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@t1_mm
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/bucket_files/000000_0' INTO TABLE T1_mm  PARTITION (ds='1')
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@t1_mm
+POSTHOOK: Output: default@t1_mm@ds=1
+PREHOOK: query: INSERT OVERWRITE TABLE T1_mm PARTITION (ds='1') select key, val from T1_mm where ds = '1'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1_mm
+PREHOOK: Input: default@t1_mm@ds=1
+PREHOOK: Output: default@t1_mm@ds=1
+POSTHOOK: query: INSERT OVERWRITE TABLE T1_mm PARTITION (ds='1') select key, val from T1_mm where ds = '1'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1_mm
+POSTHOOK: Input: default@t1_mm@ds=1
+POSTHOOK: Output: default@t1_mm@ds=1
+POSTHOOK: Lineage: t1_mm PARTITION(ds=1).key SIMPLE [(t1_mm)t1_mm.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: t1_mm PARTITION(ds=1).val SIMPLE [(t1_mm)t1_mm.FieldSchema(name:val, type:string, comment:null), ]
+PREHOOK: query: select * from T1_mm
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1_mm
+PREHOOK: Input: default@t1_mm@ds=1
+#### A masked pattern was here ####
+POSTHOOK: query: select * from T1_mm
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1_mm
+POSTHOOK: Input: default@t1_mm@ds=1
+#### A masked pattern was here ####
+1	11	1
+2	12	1
+3	13	1
+7	17	1
+8	18	1
+8	28	1
+PREHOOK: query: explain
+select count(distinct key) from T1_mm
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(distinct key) from T1_mm
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-2
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: t1_mm
+            Statistics: Num rows: 6 Data size: 24 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: key (type: string)
+              outputColumnNames: key
+              Statistics: Num rows: 6 Data size: 24 Basic stats: COMPLETE Column stats: NONE
+              Group By Operator
+                keys: key (type: string)
+                mode: hash
+                outputColumnNames: _col0
+                Statistics: Num rows: 6 Data size: 24 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: string)
+                  Statistics: Num rows: 6 Data size: 24 Basic stats: COMPLETE Column stats: NONE
+      Execution mode: vectorized
+      Reduce Operator Tree:
+        Group By Operator
+          keys: KEY._col0 (type: string)
+          mode: partial2
+          outputColumnNames: _col0
+          Statistics: Num rows: 6 Data size: 24 Basic stats: COMPLETE Column stats: NONE
+          Group By Operator
+            aggregations: count(_col0)
+            mode: partial2
+            outputColumnNames: _col0
+            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+            File Output Operator
+              compressed: false
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+  Stage: Stage-2
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            Reduce Output Operator
+              sort order: 
+              Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+              value expressions: _col0 (type: bigint)
+      Execution mode: vectorized
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations: count(VALUE._col0)
+          mode: mergepartial
+          outputColumnNames: _col0
+          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+          File Output Operator
+            compressed: false
+            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+            table:
+                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(distinct key) from T1_mm
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1_mm
+PREHOOK: Input: default@t1_mm@ds=1
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct key) from T1_mm
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1_mm
+POSTHOOK: Input: default@t1_mm@ds=1
+#### A masked pattern was here ####
+5
+PREHOOK: query: DROP TABLE T1_mm
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@t1_mm
+PREHOOK: Output: default@t1_mm
+POSTHOOK: query: DROP TABLE T1_mm
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@t1_mm
+POSTHOOK: Output: default@t1_mm