Posted to commits@hive.apache.org by ai...@apache.org on 2016/01/08 15:38:21 UTC
hive git commit: HIVE-12762: Common join on parquet tables returns incorrect result when hive.optimize.index.filter set to true (reviewed by Sergio Pena)
Repository: hive
Updated Branches:
refs/heads/master a28f6cd84 -> 3401efce6
HIVE-12762: Common join on parquet tables returns incorrect result when hive.optimize.index.filter set to true (reviewed by Sergio Pena)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3401efce
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3401efce
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3401efce
Branch: refs/heads/master
Commit: 3401efce64be84f2605e51341bbb41dc1ba11899
Parents: a28f6cd
Author: Aihua Xu <ai...@apache.org>
Authored: Wed Dec 30 19:32:33 2015 -0500
Committer: Aihua Xu <ai...@apache.org>
Committed: Fri Jan 8 09:37:08 2016 -0500
----------------------------------------------------------------------
.../apache/hadoop/hive/ql/exec/Utilities.java | 30 +++++--
.../hive/ql/io/parquet/ProjectionPusher.java | 87 ++++++++++++++------
.../test/queries/clientpositive/parquet_join2.q | 14 ++++
.../results/clientpositive/parquet_join2.q.out | 62 ++++++++++++++
.../hadoop/hive/ql/io/sarg/ExpressionTree.java | 6 +-
.../hive/ql/io/sarg/SearchArgumentImpl.java | 31 +++++--
6 files changed, 189 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/3401efce/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 9a7d990..2d317a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -2398,12 +2398,11 @@ public final class Utilities {
return builder.toString();
}
- public static void setColumnNameList(JobConf jobConf, Operator op) {
- setColumnNameList(jobConf, op, false);
+ public static void setColumnNameList(JobConf jobConf, RowSchema rowSchema) {
+ setColumnNameList(jobConf, rowSchema, false);
}
- public static void setColumnNameList(JobConf jobConf, Operator op, boolean excludeVCs) {
- RowSchema rowSchema = op.getSchema();
+ public static void setColumnNameList(JobConf jobConf, RowSchema rowSchema, boolean excludeVCs) {
if (rowSchema == null) {
return;
}
@@ -2421,12 +2420,20 @@ public final class Utilities {
jobConf.set(serdeConstants.LIST_COLUMNS, columnNamesString);
}
- public static void setColumnTypeList(JobConf jobConf, Operator op) {
- setColumnTypeList(jobConf, op, false);
+ public static void setColumnNameList(JobConf jobConf, Operator op) {
+ setColumnNameList(jobConf, op, false);
}
- public static void setColumnTypeList(JobConf jobConf, Operator op, boolean excludeVCs) {
+ public static void setColumnNameList(JobConf jobConf, Operator op, boolean excludeVCs) {
RowSchema rowSchema = op.getSchema();
+ setColumnNameList(jobConf, rowSchema, excludeVCs);
+ }
+
+ public static void setColumnTypeList(JobConf jobConf, RowSchema rowSchema) {
+ setColumnTypeList(jobConf, rowSchema, false);
+ }
+
+ public static void setColumnTypeList(JobConf jobConf, RowSchema rowSchema, boolean excludeVCs) {
if (rowSchema == null) {
return;
}
@@ -2444,6 +2451,15 @@ public final class Utilities {
jobConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypesString);
}
+ public static void setColumnTypeList(JobConf jobConf, Operator op) {
+ setColumnTypeList(jobConf, op, false);
+ }
+
+ public static void setColumnTypeList(JobConf jobConf, Operator op, boolean excludeVCs) {
+ RowSchema rowSchema = op.getSchema();
+ setColumnTypeList(jobConf, rowSchema, excludeVCs);
+ }
+
public static String suffix = ".hashtable";
public static Path generatePath(Path basePath, String dumpFilePrefix,
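For reference, a minimal sketch (not part of the patch) of a caller using the new RowSchema-based overloads; the existing Operator-based overloads now just fetch the schema and delegate to them. The caller class, JobConf and TableScanOperator below are hypothetical:

    import org.apache.hadoop.hive.ql.exec.RowSchema;
    import org.apache.hadoop.hive.ql.exec.TableScanOperator;
    import org.apache.hadoop.hive.ql.exec.Utilities;
    import org.apache.hadoop.mapred.JobConf;

    class ColumnListSketch {
      // Push column names/types into the JobConf from a schema alone,
      // without needing the originating Operator.
      static void configure(JobConf jobConf, TableScanOperator tableScan) {
        RowSchema rowSchema = tableScan.getSchema();
        Utilities.setColumnNameList(jobConf, rowSchema);        // excludeVCs defaults to false
        Utilities.setColumnTypeList(jobConf, rowSchema, true);  // true: exclude virtual columns
      }
    }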
http://git-wip-us.apache.org/repos/asf/hive/blob/3401efce/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
index 017676b..db923fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
@@ -16,11 +16,14 @@ package org.apache.hadoop.hive.ql.io.parquet;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.slf4j.Logger;
@@ -28,12 +31,16 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.JobConf;
@@ -68,7 +75,8 @@ public class ProjectionPusher {
@Deprecated // Uses deprecated methods on ColumnProjectionUtils
private void pushProjectionsAndFilters(final JobConf jobConf,
- final String splitPath, final String splitPathWithNoSchema) {
+ final String splitPath,
+ final String splitPathWithNoSchema) {
if (mapWork == null) {
return;
@@ -76,53 +84,80 @@ public class ProjectionPusher {
return;
}
- final ArrayList<String> aliases = new ArrayList<String>();
- final Iterator<Entry<String, ArrayList<String>>> iterator = mapWork.getPathToAliases().entrySet().iterator();
+ final Set<String> aliases = new HashSet<String>();
+ final Iterator<Entry<String, ArrayList<String>>> iterator =
+ mapWork.getPathToAliases().entrySet().iterator();
while (iterator.hasNext()) {
final Entry<String, ArrayList<String>> entry = iterator.next();
final String key = new Path(entry.getKey()).toUri().getPath();
if (splitPath.equals(key) || splitPathWithNoSchema.equals(key)) {
- final ArrayList<String> list = entry.getValue();
- for (final String val : list) {
- aliases.add(val);
- }
+ aliases.addAll(entry.getValue());
}
}
- for (final String alias : aliases) {
- final Operator<? extends Serializable> op = mapWork.getAliasToWork().get(
- alias);
+ // Collect the needed columns from all the aliases and create ORed filter
+ // expression for the table.
+ boolean allColumnsNeeded = false;
+ boolean noFilters = false;
+ Set<Integer> neededColumnIDs = new HashSet<Integer>();
+ List<ExprNodeGenericFuncDesc> filterExprs = new ArrayList<ExprNodeGenericFuncDesc>();
+ RowSchema rowSchema = null;
+
+ for(String alias : aliases) {
+ final Operator<? extends Serializable> op =
+ mapWork.getAliasToWork().get(alias);
if (op != null && op instanceof TableScanOperator) {
- final TableScanOperator tableScan = (TableScanOperator) op;
-
- // push down projections
- final List<Integer> list = tableScan.getNeededColumnIDs();
+ final TableScanOperator ts = (TableScanOperator) op;
- if (list != null) {
- ColumnProjectionUtils.appendReadColumnIDs(jobConf, list);
+ if (ts.getNeededColumnIDs() == null) {
+ allColumnsNeeded = true;
} else {
- ColumnProjectionUtils.setFullyReadColumns(jobConf);
+ neededColumnIDs.addAll(ts.getNeededColumnIDs());
}
- pushFilters(jobConf, tableScan);
+ rowSchema = ts.getSchema();
+ ExprNodeGenericFuncDesc filterExpr =
+ ts.getConf() == null ? null : ts.getConf().getFilterExpr();
+ noFilters = filterExpr == null; // No filter if any TS has no filter expression
+ filterExprs.add(filterExpr);
}
}
- }
- private void pushFilters(final JobConf jobConf, final TableScanOperator tableScan) {
+ ExprNodeGenericFuncDesc tableFilterExpr = null;
+ if (!noFilters) {
+ try {
+ for (ExprNodeGenericFuncDesc filterExpr : filterExprs) {
+ if (tableFilterExpr == null ) {
+ tableFilterExpr = filterExpr;
+ } else {
+ tableFilterExpr = ExprNodeGenericFuncDesc.newInstance(new GenericUDFOPOr(),
+ Arrays.<ExprNodeDesc>asList(tableFilterExpr, filterExpr));
+ }
+ }
+ } catch(UDFArgumentException ex) {
+ LOG.debug("Turn off filtering due to " + ex);
+ tableFilterExpr = null;
+ }
+ }
- final TableScanDesc scanDesc = tableScan.getConf();
- if (scanDesc == null) {
- LOG.debug("Not pushing filters because TableScanDesc is null");
- return;
+ // push down projections
+ if (!allColumnsNeeded) {
+ if (!neededColumnIDs.isEmpty()) {
+ ColumnProjectionUtils.appendReadColumnIDs(jobConf, new ArrayList<Integer>(neededColumnIDs));
+ }
+ } else {
+ ColumnProjectionUtils.setFullyReadColumns(jobConf);
}
+ pushFilters(jobConf, rowSchema, tableFilterExpr);
+ }
+
+ private void pushFilters(final JobConf jobConf, RowSchema rowSchema, ExprNodeGenericFuncDesc filterExpr) {
// construct column name list for reference by filter push down
- Utilities.setColumnNameList(jobConf, tableScan);
+ Utilities.setColumnNameList(jobConf, rowSchema);
// push down filters
- final ExprNodeGenericFuncDesc filterExpr = scanDesc.getFilterExpr();
if (filterExpr == null) {
LOG.debug("Not pushing filters because FilterExpr is null");
return;
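The rework above turns pushProjectionsAndFilters from a per-alias push into a single merged push: the needed column IDs of every alias mapped to the split path are unioned, and the per-alias filter expressions are combined with OR (filtering is skipped entirely if any table scan has no filter). Since the split is read once for all aliases, a row wanted by any of them has to survive row-group filtering, which is presumably why a disjunction is the safe combination. A minimal sketch of that combination, using the same API the patch calls; f1 and f2 stand in for the filters of two table scans sharing one Parquet split:

    import java.util.Arrays;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
    import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;

    class OrFilterSketch {
      // Combine two pushed-down filters into one predicate: a row that
      // satisfies either filter must be kept for its alias, so the merged
      // filter is f1 OR f2.
      static ExprNodeGenericFuncDesc or(ExprNodeGenericFuncDesc f1, ExprNodeGenericFuncDesc f2)
          throws UDFArgumentException {
        return ExprNodeGenericFuncDesc.newInstance(new GenericUDFOPOr(),
            Arrays.<ExprNodeDesc>asList(f1, f2));
      }
    }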
http://git-wip-us.apache.org/repos/asf/hive/blob/3401efce/ql/src/test/queries/clientpositive/parquet_join2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/parquet_join2.q b/ql/src/test/queries/clientpositive/parquet_join2.q
new file mode 100644
index 0000000..9d107c7
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/parquet_join2.q
@@ -0,0 +1,14 @@
+set hive.optimize.index.filter = true;
+set hive.auto.convert.join=false;
+
+CREATE TABLE tbl1(id INT) STORED AS PARQUET;
+INSERT INTO tbl1 VALUES(1), (2);
+
+CREATE TABLE tbl2(id INT, value STRING) STORED AS PARQUET;
+INSERT INTO tbl2 VALUES(1, 'value1');
+INSERT INTO tbl2 VALUES(1, 'value2');
+
+select tbl1.id, t1.value, t2.value
+FROM tbl1
+JOIN (SELECT * FROM tbl2 WHERE value='value1') t1 ON tbl1.id=t1.id
+JOIN (SELECT * FROM tbl2 WHERE value='value2') t2 ON tbl1.id=t2.id;
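The two inline views apply different filters to the same Parquet table, so both aliases map to the same split; with hive.optimize.index.filter=true and a common (non-map) join, pushing only one of those filters to the shared scan could drop the rows the other view needs, which appears to be the incorrect-result case this patch fixes. The expected output, shown in the .q.out below, is the single row (1, value1, value2).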
http://git-wip-us.apache.org/repos/asf/hive/blob/3401efce/ql/src/test/results/clientpositive/parquet_join2.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/parquet_join2.q.out b/ql/src/test/results/clientpositive/parquet_join2.q.out
new file mode 100644
index 0000000..f25dcd8
--- /dev/null
+++ b/ql/src/test/results/clientpositive/parquet_join2.q.out
@@ -0,0 +1,62 @@
+PREHOOK: query: CREATE TABLE tbl1(id INT) STORED AS PARQUET
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl1
+POSTHOOK: query: CREATE TABLE tbl1(id INT) STORED AS PARQUET
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl1
+PREHOOK: query: INSERT INTO tbl1 VALUES(1), (2)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__1
+PREHOOK: Output: default@tbl1
+POSTHOOK: query: INSERT INTO tbl1 VALUES(1), (2)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__1
+POSTHOOK: Output: default@tbl1
+POSTHOOK: Lineage: tbl1.id EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+PREHOOK: query: CREATE TABLE tbl2(id INT, value STRING) STORED AS PARQUET
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl2
+POSTHOOK: query: CREATE TABLE tbl2(id INT, value STRING) STORED AS PARQUET
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl2
+PREHOOK: query: INSERT INTO tbl2 VALUES(1, 'value1')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__2
+PREHOOK: Output: default@tbl2
+POSTHOOK: query: INSERT INTO tbl2 VALUES(1, 'value1')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__2
+POSTHOOK: Output: default@tbl2
+POSTHOOK: Lineage: tbl2.id EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: tbl2.value SIMPLE [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: INSERT INTO tbl2 VALUES(1, 'value2')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__3
+PREHOOK: Output: default@tbl2
+POSTHOOK: query: INSERT INTO tbl2 VALUES(1, 'value2')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__3
+POSTHOOK: Output: default@tbl2
+POSTHOOK: Lineage: tbl2.id EXPRESSION [(values__tmp__table__3)values__tmp__table__3.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: tbl2.value SIMPLE [(values__tmp__table__3)values__tmp__table__3.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: select tbl1.id, t1.value, t2.value
+FROM tbl1
+JOIN (SELECT * FROM tbl2 WHERE value='value1') t1 ON tbl1.id=t1.id
+JOIN (SELECT * FROM tbl2 WHERE value='value2') t2 ON tbl1.id=t2.id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl1
+PREHOOK: Input: default@tbl2
+#### A masked pattern was here ####
+POSTHOOK: query: select tbl1.id, t1.value, t2.value
+FROM tbl1
+JOIN (SELECT * FROM tbl2 WHERE value='value1') t1 ON tbl1.id=t1.id
+JOIN (SELECT * FROM tbl2 WHERE value='value2') t2 ON tbl1.id=t2.id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl1
+POSTHOOK: Input: default@tbl2
+#### A masked pattern was here ####
+1 value1 value2
http://git-wip-us.apache.org/repos/asf/hive/blob/3401efce/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/ExpressionTree.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/ExpressionTree.java b/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/ExpressionTree.java
index 577d95d..443083d 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/ExpressionTree.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/ExpressionTree.java
@@ -31,7 +31,7 @@ public class ExpressionTree {
public enum Operator {OR, AND, NOT, LEAF, CONSTANT}
private final Operator operator;
private final List<ExpressionTree> children;
- private final int leaf;
+ private int leaf;
private final SearchArgument.TruthValue constant;
ExpressionTree() {
@@ -153,4 +153,8 @@ public class ExpressionTree {
public int getLeaf() {
return leaf;
}
+
+ public void setLeaf(int leaf) {
+ this.leaf = leaf;
+ }
}
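leaf was previously final; making it settable lets the reworked rewriteLeaves in SearchArgumentImpl (next hunk) renumber leaf nodes in place, which matters when the same node object is reachable from more than one parent. A sketch of that hazard follows the next hunk.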
http://git-wip-us.apache.org/repos/asf/hive/blob/3401efce/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java b/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
index eeff131..be5e67b 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
@@ -24,8 +24,12 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
/**
* The implementation of SearchArguments.
@@ -429,15 +433,28 @@ final class SearchArgumentImpl implements SearchArgument {
* @return the fixed root
*/
static ExpressionTree rewriteLeaves(ExpressionTree root,
- int[] leafReorder) {
- if (root.getOperator() == ExpressionTree.Operator.LEAF) {
- return new ExpressionTree(leafReorder[root.getLeaf()]);
- } else if (root.getChildren() != null){
- List<ExpressionTree> children = root.getChildren();
- for(int i=0; i < children.size(); ++i) {
- children.set(i, rewriteLeaves(children.get(i), leafReorder));
+ int[] leafReorder) {
+ // The leaves could be shared in the tree. Use Set to remove the duplicates.
+ Set<ExpressionTree> leaves = new HashSet<ExpressionTree>();
+ Queue<ExpressionTree> nodes = new LinkedList<ExpressionTree>();
+ nodes.add(root);
+
+ while(!nodes.isEmpty()) {
+ ExpressionTree node = nodes.remove();
+ if (node.getOperator() == ExpressionTree.Operator.LEAF) {
+ leaves.add(node);
+ } else {
+ if (node.getChildren() != null){
+ nodes.addAll(node.getChildren());
+ }
}
}
+
+ // Update the leaf in place
+ for(ExpressionTree leaf : leaves) {
+ leaf.setLeaf(leafReorder[leaf.getLeaf()]);
+ }
+
return root;
}
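The previous recursive rewrite replaced each LEAF with a new node while mutating inner nodes' child lists in place, so a subtree shared by two parents could have leafReorder applied to its leaves twice. The new version first collects the distinct leaf nodes into a Set and then remaps each exactly once via setLeaf. A standalone sketch (plain Java, not Hive code) of why double application goes wrong, using a hypothetical reorder map:

    public class LeafReorderSketch {
      public static void main(String[] args) {
        int[] leafReorder = {1, 0};  // hypothetical reordering of two leaves
        int shared = 0;              // leaf index reachable from two parents
        System.out.println(leafReorder[shared]);               // 1 -> remapped once, correct
        System.out.println(leafReorder[leafReorder[shared]]);  // 0 -> remapped twice, wrong
      }
    }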