You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jv...@apache.org on 2011/09/13 04:20:53 UTC
svn commit: r1170007 [1/3] - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/ data/files/
ql/src/java/org/apache/hadoop/hive/ql/index/
ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/
ql/src/java/org/apache/hadoop/hive/ql/index/compact/ ...
Author: jvs
Date: Tue Sep 13 02:20:52 2011
New Revision: 1170007
URL: http://svn.apache.org/viewvc?rev=1170007&view=rev
Log:
HIVE-1694. Accelerate GROUP BY execution using indexes
(Prajakta Kalmegh via jvs)
Added:
hive/trunk/data/files/lineitem.txt
hive/trunk/data/files/tbl.txt
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java
hive/trunk/ql/src/test/queries/clientpositive/ql_rewrite_gbtoidx.q
hive/trunk/ql/src/test/results/clientpositive/ql_rewrite_gbtoidx.q.out
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndex.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereTaskDispatcher.java
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1170007&r1=1170006&r2=1170007&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue Sep 13 02:20:52 2011
@@ -431,6 +431,10 @@ public class HiveConf extends Configurat
// For har files
HIVEARCHIVEENABLED("hive.archive.enabled", false),
HIVEHARPARENTDIRSETTABLE("hive.archive.har.parentdir.settable", false),
+
+ //Enable/Disable gbToIdx rewrite rule
+ HIVEOPTGBYUSINGINDEX("hive.optimize.index.groupby", false),
+
HIVEOUTERJOINSUPPORTSFILTERS("hive.outerjoin.supports.filters", true),
// Serde for FetchTask
Added: hive/trunk/data/files/lineitem.txt
URL: http://svn.apache.org/viewvc/hive/trunk/data/files/lineitem.txt?rev=1170007&view=auto
==============================================================================
--- hive/trunk/data/files/lineitem.txt (added)
+++ hive/trunk/data/files/lineitem.txt Tue Sep 13 02:20:52 2011
@@ -0,0 +1,100 @@
+1|155190|7706|1|17|21168.23|0.04|0.02|N|O|1996-03-13|1996-02-12|1996-03-22|DELIVER IN PERSON|TRUCK|egular courts above the|
+1|67310|7311|2|36|45983.16|0.09|0.06|N|O|1996-04-12|1996-02-28|1996-04-20|TAKE BACK RETURN|MAIL|ly final dependencies: slyly bold |
+1|63700|3701|3|8|13309.60|0.10|0.02|N|O|1996-01-29|1996-03-05|1996-01-31|TAKE BACK RETURN|REG AIR|riously. regular, express dep|
+1|2132|4633|4|28|28955.64|0.09|0.06|N|O|1996-04-21|1996-03-30|1996-05-16|NONE|AIR|lites. fluffily even de|
+1|24027|1534|5|24|22824.48|0.10|0.04|N|O|1996-03-30|1996-03-14|1996-04-01|NONE|FOB| pending foxes. slyly re|
+1|15635|638|6|32|49620.16|0.07|0.02|N|O|1996-01-30|1996-02-07|1996-02-03|DELIVER IN PERSON|MAIL|arefully slyly ex|
+2|106170|1191|1|38|44694.46|0.00|0.05|N|O|1997-01-28|1997-01-14|1997-02-02|TAKE BACK RETURN|RAIL|ven requests. deposits breach a|
+3|4297|1798|1|45|54058.05|0.06|0.00|R|F|1994-02-02|1994-01-04|1994-02-23|NONE|AIR|ongside of the furiously brave acco|
+3|19036|6540|2|49|46796.47|0.10|0.00|R|F|1993-11-09|1993-12-20|1993-11-24|TAKE BACK RETURN|RAIL| unusual accounts. eve|
+3|128449|3474|3|27|39890.88|0.06|0.07|A|F|1994-01-16|1993-11-22|1994-01-23|DELIVER IN PERSON|SHIP|nal foxes wake. |
+3|29380|1883|4|2|2618.76|0.01|0.06|A|F|1993-12-04|1994-01-07|1994-01-01|NONE|TRUCK|y. fluffily pending d|
+3|183095|650|5|28|32986.52|0.04|0.00|R|F|1993-12-14|1994-01-10|1994-01-01|TAKE BACK RETURN|FOB|ages nag slyly pending|
+3|62143|9662|6|26|28733.64|0.10|0.02|A|F|1993-10-29|1993-12-18|1993-11-04|TAKE BACK RETURN|RAIL|ges sleep after the caref|
+4|88035|5560|1|30|30690.90|0.03|0.08|N|O|1996-01-10|1995-12-14|1996-01-18|DELIVER IN PERSON|REG AIR|- quickly regular packages sleep. idly|
+5|108570|8571|1|15|23678.55|0.02|0.04|R|F|1994-10-31|1994-08-31|1994-11-20|NONE|AIR|ts wake furiously |
+5|123927|3928|2|26|50723.92|0.07|0.08|R|F|1994-10-16|1994-09-25|1994-10-19|NONE|FOB|sts use slyly quickly special instruc|
+5|37531|35|3|50|73426.50|0.08|0.03|A|F|1994-08-08|1994-10-13|1994-08-26|DELIVER IN PERSON|AIR|eodolites. fluffily unusual|
+6|139636|2150|1|37|61998.31|0.08|0.03|A|F|1992-04-27|1992-05-15|1992-05-02|TAKE BACK RETURN|TRUCK|p furiously special foxes|
+7|182052|9607|1|12|13608.60|0.07|0.03|N|O|1996-05-07|1996-03-13|1996-06-03|TAKE BACK RETURN|FOB|ss pinto beans wake against th|
+7|145243|7758|2|9|11594.16|0.08|0.08|N|O|1996-02-01|1996-03-02|1996-02-19|TAKE BACK RETURN|SHIP|es. instructions|
+7|94780|9799|3|46|81639.88|0.10|0.07|N|O|1996-01-15|1996-03-27|1996-02-03|COLLECT COD|MAIL| unusual reques|
+7|163073|3074|4|28|31809.96|0.03|0.04|N|O|1996-03-21|1996-04-08|1996-04-20|NONE|FOB|. slyly special requests haggl|
+7|151894|9440|5|38|73943.82|0.08|0.01|N|O|1996-02-11|1996-02-24|1996-02-18|DELIVER IN PERSON|TRUCK|ns haggle carefully ironic deposits. bl|
+7|79251|1759|6|35|43058.75|0.06|0.03|N|O|1996-01-16|1996-02-23|1996-01-22|TAKE BACK RETURN|FOB|jole. excuses wake carefully alongside of |
+7|157238|2269|7|5|6476.15|0.04|0.02|N|O|1996-02-10|1996-03-26|1996-02-13|NONE|FOB|ithely regula|
+32|82704|7721|1|28|47227.60|0.05|0.08|N|O|1995-10-23|1995-08-27|1995-10-26|TAKE BACK RETURN|TRUCK|sleep quickly. req|
+32|197921|441|2|32|64605.44|0.02|0.00|N|O|1995-08-14|1995-10-07|1995-08-27|COLLECT COD|AIR|lithely regular deposits. fluffily |
+32|44161|6666|3|2|2210.32|0.09|0.02|N|O|1995-08-07|1995-10-07|1995-08-23|DELIVER IN PERSON|AIR| express accounts wake according to the|
+32|2743|7744|4|4|6582.96|0.09|0.03|N|O|1995-08-04|1995-10-01|1995-09-03|NONE|REG AIR|e slyly final pac|
+32|85811|8320|5|44|79059.64|0.05|0.06|N|O|1995-08-28|1995-08-20|1995-09-14|DELIVER IN PERSON|AIR|symptotes nag according to the ironic depo|
+32|11615|4117|6|6|9159.66|0.04|0.03|N|O|1995-07-21|1995-09-23|1995-07-25|COLLECT COD|RAIL| gifts cajole carefully.|
+33|61336|8855|1|31|40217.23|0.09|0.04|A|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic package|
+33|60519|5532|2|32|47344.32|0.02|0.05|A|F|1993-12-09|1994-01-04|1993-12-28|COLLECT COD|MAIL|gular theodolites|
+33|137469|9983|3|5|7532.30|0.05|0.03|A|F|1993-12-09|1993-12-25|1993-12-23|TAKE BACK RETURN|AIR|. stealthily bold exc|
+33|33918|3919|4|41|75928.31|0.09|0.00|R|F|1993-11-09|1994-01-24|1993-11-11|TAKE BACK RETURN|MAIL|unusual packages doubt caref|
+34|88362|871|1|13|17554.68|0.00|0.07|N|O|1998-10-23|1998-09-14|1998-11-06|NONE|REG AIR|nic accounts. deposits are alon|
+34|89414|1923|2|22|30875.02|0.08|0.06|N|O|1998-10-09|1998-10-16|1998-10-12|NONE|FOB|thely slyly p|
+34|169544|4577|3|6|9681.24|0.02|0.06|N|O|1998-10-30|1998-09-20|1998-11-05|NONE|FOB|ar foxes sleep |
+35|450|2951|1|24|32410.80|0.02|0.00|N|O|1996-02-21|1996-01-03|1996-03-18|TAKE BACK RETURN|FOB|, regular tithe|
+35|161940|4457|2|34|68065.96|0.06|0.08|N|O|1996-01-22|1996-01-06|1996-01-27|DELIVER IN PERSON|RAIL|s are carefully against the f|
+35|120896|8433|3|7|13418.23|0.06|0.04|N|O|1996-01-19|1995-12-22|1996-01-29|NONE|MAIL| the carefully regular |
+35|85175|7684|4|25|29004.25|0.06|0.05|N|O|1995-11-26|1995-12-25|1995-12-21|DELIVER IN PERSON|SHIP| quickly unti|
+35|119917|4940|5|34|65854.94|0.08|0.06|N|O|1995-11-08|1996-01-15|1995-11-26|COLLECT COD|MAIL|. silent, unusual deposits boost|
+35|30762|3266|6|28|47397.28|0.03|0.02|N|O|1996-02-01|1995-12-24|1996-02-28|COLLECT COD|RAIL|ly alongside of |
+36|119767|9768|1|42|75043.92|0.09|0.00|N|O|1996-02-03|1996-01-21|1996-02-23|COLLECT COD|SHIP| careful courts. special |
+37|22630|5133|1|40|62105.20|0.09|0.03|A|F|1992-07-21|1992-08-01|1992-08-15|NONE|REG AIR|luffily regular requests. slyly final acco|
+37|126782|1807|2|39|70542.42|0.05|0.02|A|F|1992-07-02|1992-08-18|1992-07-28|TAKE BACK RETURN|RAIL|the final requests. ca|
+37|12903|5405|3|43|78083.70|0.05|0.08|A|F|1992-07-10|1992-07-06|1992-08-02|DELIVER IN PERSON|TRUCK|iously ste|
+38|175839|874|1|44|84252.52|0.04|0.02|N|O|1996-09-29|1996-11-17|1996-09-30|COLLECT COD|MAIL|s. blithely unusual theodolites am|
+39|2320|9821|1|44|53782.08|0.09|0.06|N|O|1996-11-14|1996-12-15|1996-12-12|COLLECT COD|RAIL|eodolites. careful|
+39|186582|4137|2|26|43383.08|0.08|0.04|N|O|1996-11-04|1996-10-20|1996-11-20|NONE|FOB|ckages across the slyly silent|
+39|67831|5350|3|46|82746.18|0.06|0.08|N|O|1996-09-26|1996-12-19|1996-10-26|DELIVER IN PERSON|AIR|he carefully e|
+39|20590|3093|4|32|48338.88|0.07|0.05|N|O|1996-10-02|1996-12-19|1996-10-14|COLLECT COD|MAIL|heodolites sleep silently pending foxes. ac|
+39|54519|9530|5|43|63360.93|0.01|0.01|N|O|1996-10-17|1996-11-14|1996-10-26|COLLECT COD|MAIL|yly regular i|
+39|94368|6878|6|40|54494.40|0.06|0.05|N|O|1996-12-08|1996-10-22|1997-01-01|COLLECT COD|AIR|quickly ironic fox|
+64|85951|5952|1|21|40675.95|0.05|0.02|R|F|1994-09-30|1994-09-18|1994-10-26|DELIVER IN PERSON|REG AIR|ch slyly final, thin platelets.|
+65|59694|4705|1|26|42995.94|0.03|0.03|A|F|1995-04-20|1995-04-25|1995-05-13|NONE|TRUCK|pending deposits nag even packages. ca|
+65|73815|8830|2|22|39353.82|0.00|0.05|N|O|1995-07-17|1995-06-04|1995-07-19|COLLECT COD|FOB| ideas. special, r|
+65|1388|3889|3|21|27076.98|0.09|0.07|N|O|1995-07-06|1995-05-14|1995-07-31|DELIVER IN PERSON|RAIL|bove the even packages. accounts nag carefu|
+66|115118|7630|1|31|35126.41|0.00|0.08|R|F|1994-02-19|1994-03-11|1994-02-20|TAKE BACK RETURN|RAIL|ut the unusual accounts sleep at the bo|
+66|173489|3490|2|41|64061.68|0.04|0.07|A|F|1994-02-21|1994-03-01|1994-03-18|COLLECT COD|AIR| regular de|
+67|21636|9143|1|4|6230.52|0.09|0.04|N|O|1997-04-17|1997-01-31|1997-04-20|NONE|SHIP| cajole thinly expres|
+67|20193|5198|2|12|13358.28|0.09|0.05|N|O|1997-01-27|1997-02-21|1997-02-22|NONE|REG AIR| even packages cajole|
+67|173600|6118|3|5|8368.00|0.03|0.07|N|O|1997-02-20|1997-02-12|1997-02-21|DELIVER IN PERSON|TRUCK|y unusual packages thrash pinto |
+67|87514|7515|4|44|66066.44|0.08|0.06|N|O|1997-03-18|1997-01-29|1997-04-13|DELIVER IN PERSON|RAIL|se quickly above the even, express reques|
+67|40613|8126|5|23|35733.03|0.05|0.07|N|O|1997-04-19|1997-02-14|1997-05-06|DELIVER IN PERSON|REG AIR|ly regular deposit|
+67|178306|824|6|29|40144.70|0.02|0.05|N|O|1997-01-25|1997-01-27|1997-01-27|DELIVER IN PERSON|FOB|ultipliers |
+68|7068|9569|1|3|2925.18|0.05|0.02|N|O|1998-07-04|1998-06-05|1998-07-21|NONE|RAIL|fully special instructions cajole. furious|
+68|175180|2732|2|46|57738.28|0.02|0.05|N|O|1998-06-26|1998-06-07|1998-07-05|NONE|MAIL| requests are unusual, regular pinto |
+68|34980|7484|3|46|88089.08|0.04|0.05|N|O|1998-08-13|1998-07-08|1998-08-29|NONE|RAIL|egular dependencies affix ironically along |
+68|94728|2256|4|20|34454.40|0.07|0.01|N|O|1998-06-27|1998-05-23|1998-07-02|NONE|REG AIR| excuses integrate fluffily |
+68|82758|5267|5|27|47000.25|0.03|0.06|N|O|1998-06-19|1998-06-25|1998-06-29|DELIVER IN PERSON|SHIP|ccounts. deposits use. furiously|
+68|102561|5072|6|30|46906.80|0.05|0.06|N|O|1998-08-11|1998-07-11|1998-08-14|NONE|RAIL|oxes are slyly blithely fin|
+68|139247|1761|7|41|52735.84|0.09|0.08|N|O|1998-06-24|1998-06-27|1998-07-06|NONE|SHIP|eposits nag special ideas. furiousl|
+69|115209|7721|1|48|58761.60|0.01|0.07|A|F|1994-08-17|1994-08-11|1994-09-08|NONE|TRUCK|regular epitaphs. carefully even ideas hag|
+69|104180|9201|2|32|37893.76|0.08|0.06|A|F|1994-08-24|1994-08-17|1994-08-31|NONE|REG AIR|s sleep carefully bold, |
+69|137267|4807|3|17|22172.42|0.09|0.00|A|F|1994-07-02|1994-07-07|1994-07-03|TAKE BACK RETURN|AIR|final, pending instr|
+69|37502|2509|4|3|4318.50|0.09|0.04|R|F|1994-06-06|1994-07-27|1994-06-15|NONE|MAIL| blithely final d|
+69|92070|7089|5|42|44606.94|0.07|0.04|R|F|1994-07-31|1994-07-26|1994-08-28|DELIVER IN PERSON|REG AIR|tect regular, speci|
+69|18504|1006|6|23|32717.50|0.05|0.00|A|F|1994-10-03|1994-08-06|1994-10-24|NONE|SHIP|nding accounts ca|
+70|64128|9141|1|8|8736.96|0.03|0.08|R|F|1994-01-12|1994-02-27|1994-01-14|TAKE BACK RETURN|FOB|ggle. carefully pending dependenc|
+70|196156|1195|2|13|16277.95|0.06|0.06|A|F|1994-03-03|1994-02-13|1994-03-26|COLLECT COD|AIR|lyly special packag|
+70|179809|7361|3|1|1888.80|0.03|0.05|R|F|1994-01-26|1994-03-05|1994-01-28|TAKE BACK RETURN|RAIL|quickly. fluffily unusual theodolites c|
+70|45734|743|4|11|18477.03|0.01|0.05|A|F|1994-03-17|1994-03-17|1994-03-27|NONE|MAIL|alongside of the deposits. fur|
+70|37131|2138|5|37|39520.81|0.09|0.04|R|F|1994-02-13|1994-03-16|1994-02-21|COLLECT COD|MAIL|n accounts are. q|
+70|55655|3171|6|19|30602.35|0.06|0.03|A|F|1994-01-26|1994-02-17|1994-02-06|TAKE BACK RETURN|SHIP| packages wake pending accounts.|
+71|61931|1932|1|25|47323.25|0.09|0.07|N|O|1998-04-10|1998-04-22|1998-04-11|COLLECT COD|FOB|ckly. slyly|
+71|65916|3435|2|3|5645.73|0.09|0.07|N|O|1998-05-23|1998-04-03|1998-06-02|COLLECT COD|SHIP|y. pinto beans haggle after the|
+71|34432|1942|3|45|61489.35|0.00|0.07|N|O|1998-02-23|1998-03-20|1998-03-24|DELIVER IN PERSON|SHIP| ironic packages believe blithely a|
+71|96645|9155|4|33|54174.12|0.00|0.01|N|O|1998-04-12|1998-03-20|1998-04-15|NONE|FOB| serve quickly fluffily bold deposi|
+71|103255|5766|5|39|49071.75|0.08|0.06|N|O|1998-01-29|1998-04-07|1998-02-18|DELIVER IN PERSON|RAIL|l accounts sleep across the pack|
+71|195635|674|6|34|58841.42|0.04|0.01|N|O|1998-03-05|1998-04-22|1998-03-30|DELIVER IN PERSON|TRUCK|s cajole. |
+96|123076|613|1|23|25278.61|0.10|0.06|A|F|1994-07-19|1994-06-29|1994-07-25|DELIVER IN PERSON|TRUCK|ep-- carefully reg|
+96|135390|5391|2|30|42761.70|0.01|0.06|R|F|1994-06-03|1994-05-29|1994-06-22|DELIVER IN PERSON|TRUCK|e quickly even ideas. furiou|
+97|119477|1989|1|13|19454.11|0.00|0.02|R|F|1993-04-01|1993-04-04|1993-04-08|NONE|TRUCK|ayers cajole against the furiously|
+97|49568|2073|2|37|56149.72|0.02|0.06|A|F|1993-04-13|1993-03-30|1993-04-14|DELIVER IN PERSON|SHIP|ic requests boost carefully quic|
+97|77699|5221|3|19|31857.11|0.06|0.08|R|F|1993-05-14|1993-03-05|1993-05-25|TAKE BACK RETURN|RAIL|gifts. furiously ironic packages cajole. |
+98|40216|217|1|28|32373.88|0.06|0.07|A|F|1994-12-24|1994-10-25|1995-01-16|COLLECT COD|REG AIR| pending, regular accounts s|
+98|109743|7274|2|1|1752.74|0.00|0.00|A|F|1994-12-01|1994-12-12|1994-12-15|DELIVER IN PERSON|TRUCK|. unusual instructions against|
+98|44706|4707|3|14|23109.80|0.05|0.02|A|F|1994-12-30|1994-11-22|1995-01-27|COLLECT COD|AIR| cajole furiously. blithely ironic ideas |
Added: hive/trunk/data/files/tbl.txt
URL: http://svn.apache.org/viewvc/hive/trunk/data/files/tbl.txt?rev=1170007&view=auto
==============================================================================
--- hive/trunk/data/files/tbl.txt (added)
+++ hive/trunk/data/files/tbl.txt Tue Sep 13 02:20:52 2011
@@ -0,0 +1,10 @@
+1|1997
+2|1997
+2|1994
+2|1998
+3|1997
+3|1998
+4|1996
+4|1997
+6|1997
+7|1997
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java?rev=1170007&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java Tue Sep 13 02:20:52 2011
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.index;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.optimizer.IndexUtils;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+
+
+/**
+ * Index handler for indexes that have aggregate functions on indexed columns.
+ *
+ */
+public class AggregateIndexHandler extends CompactIndexHandler {
+
+ private static Index index = null;
+
+ @Override
+ public void analyzeIndexDefinition(Table baseTable, Index idx,
+ Table indexTable) throws HiveException {
+ index = idx;
+ StorageDescriptor storageDesc = index.getSd();
+ if (this.usesIndexTable() && indexTable != null) {
+ StorageDescriptor indexTableSd = storageDesc.deepCopy();
+ List<FieldSchema> indexTblCols = indexTableSd.getCols();
+ FieldSchema bucketFileName = new FieldSchema("_bucketname", "string", "");
+ indexTblCols.add(bucketFileName);
+ FieldSchema offSets = new FieldSchema("_offsets", "array<bigint>", "");
+ indexTblCols.add(offSets);
+ Map<String, String> paraList = index.getParameters();
+
+ if(paraList != null && paraList.containsKey("AGGREGATES")){
+ String propValue = paraList.get("AGGREGATES");
+ if(propValue.contains(",")){
+ String[] aggFuncs = propValue.split(",");
+ for (int i = 0; i < aggFuncs.length; i++) {
+ createAggregationFunction(indexTblCols, aggFuncs[i]);
+ }
+ }else{
+ createAggregationFunction(indexTblCols, propValue);
+ }
+ }
+ indexTable.setSd(indexTableSd);
+ }
+ }
+
+ private void createAggregationFunction(List<FieldSchema> indexTblCols, String property){
+ String[] aggFuncCol = property.split("\\(");
+ String funcName = aggFuncCol[0];
+ String colName = aggFuncCol[1].substring(0, aggFuncCol[1].length() - 1);
+ if(colName.contains("*")){
+ colName = colName.replace("*", "ALL");
+ }
+ FieldSchema aggregationFunction =
+ new FieldSchema("_" + funcName + "_Of_" + colName + "", "bigint", "");
+ indexTblCols.add(aggregationFunction);
+ }
+
+ @Override
+ protected Task<?> getIndexBuilderMapRedTask(Set<ReadEntity> inputs,
+ Set<WriteEntity> outputs,
+ List<FieldSchema> indexField, boolean partitioned,
+ PartitionDesc indexTblPartDesc, String indexTableName,
+ PartitionDesc baseTablePartDesc, String baseTableName, String dbName) {
+
+ String indexCols = HiveUtils.getUnparsedColumnNamesFromFieldSchema(indexField);
+
+ //form a new insert overwrite query.
+ StringBuilder command= new StringBuilder();
+ Map<String, String> partSpec = indexTblPartDesc.getPartSpec();
+
+ command.append("INSERT OVERWRITE TABLE " + HiveUtils.unparseIdentifier(indexTableName));
+ if (partitioned && indexTblPartDesc != null) {
+ command.append(" PARTITION ( ");
+ List<String> ret = getPartKVPairStringArray((LinkedHashMap<String, String>) partSpec);
+ for (int i = 0; i < ret.size(); i++) {
+ String partKV = ret.get(i);
+ command.append(partKV);
+ if (i < ret.size() - 1) {
+ command.append(",");
+ }
+ }
+ command.append(" ) ");
+ }
+
+ command.append(" SELECT ");
+ command.append(indexCols);
+ command.append(",");
+
+ command.append(VirtualColumn.FILENAME.getName());
+ command.append(",");
+ command.append(" collect_set (");
+ command.append(VirtualColumn.BLOCKOFFSET.getName());
+ command.append(") ");
+ command.append(",");
+
+ assert indexField.size()==1;
+
+ Map<String, String> paraList = index.getParameters();
+ if(paraList != null && paraList.containsKey("AGGREGATES")){
+ command.append(paraList.get("AGGREGATES") + " ");
+ }
+
+ command.append(" FROM " + HiveUtils.unparseIdentifier(baseTableName));
+ Map<String, String> basePartSpec = baseTablePartDesc.getPartSpec();
+ if(basePartSpec != null) {
+ command.append(" WHERE ");
+ List<String> pkv = getPartKVPairStringArray((LinkedHashMap<String, String>) basePartSpec);
+ for (int i = 0; i < pkv.size(); i++) {
+ String partKV = pkv.get(i);
+ command.append(partKV);
+ if (i < pkv.size() - 1) {
+ command.append(" AND ");
+ }
+ }
+ }
+ command.append(" GROUP BY ");
+ command.append(indexCols + ", " + VirtualColumn.FILENAME.getName());
+
+ HiveConf builderConf = new HiveConf(getConf(), AggregateIndexHandler.class);
+ Task<?> rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs,
+ command, (LinkedHashMap<String, String>) partSpec, indexTableName, dbName);
+
+ return rootTask;
+ }
+ }
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndex.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndex.java?rev=1170007&r1=1170006&r2=1170007&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndex.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndex.java Tue Sep 13 02:20:52 2011
@@ -30,6 +30,7 @@ public class HiveIndex {
public static String INDEX_TABLE_CREATETIME = "hive.index.basetbl.dfs.lastModifiedTime";
public static enum IndexType {
+ AGGREGATE_TABLE("aggregate", "org.apache.hadoop.hive.ql.AggregateIndexHandler"),
COMPACT_SUMMARY_TABLE("compact", "org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler"),
BITMAP_TABLE("bitmap",
"org.apache.hadoop.hive.ql.index.bitmap.BitmapIndexHandler");
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java?rev=1170007&r1=1170006&r2=1170007&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java Tue Sep 13 02:20:52 2011
@@ -39,8 +39,6 @@ import org.apache.hadoop.hive.ql.hooks.R
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.index.HiveIndexQueryContext;
import org.apache.hadoop.hive.ql.index.HiveIndexedInputFormat;
-import org.apache.hadoop.hive.ql.index.IndexMetadataChangeTask;
-import org.apache.hadoop.hive.ql.index.IndexMetadataChangeWork;
import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
import org.apache.hadoop.hive.ql.index.TableBasedIndexHandler;
@@ -48,6 +46,7 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.optimizer.IndexUtils;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -224,10 +223,8 @@ public class BitmapIndexHandler extends
PartitionDesc indexTblPartDesc, String indexTableName,
PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException {
- HiveConf conf = new HiveConf(getConf(), BitmapIndexHandler.class);
- HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVEROWOFFSET, true);
- // Don't try to index optimize the query to build the index
- HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVEOPTINDEXFILTER, false);
+ HiveConf builderConf = new HiveConf(getConf(), BitmapIndexHandler.class);
+ HiveConf.setBoolVar(builderConf, HiveConf.ConfVars.HIVEROWOFFSET, true);
String indexCols = HiveUtils.getUnparsedColumnNamesFromFieldSchema(indexField);
@@ -283,21 +280,12 @@ public class BitmapIndexHandler extends
// Require clusterby ROWOFFSET if map-size aggregation is off.
// TODO: Make this work without map side aggregation
- if (!conf.get("hive.map.aggr", null).equals("true")) {
+ if (!builderConf.get("hive.map.aggr", null).equals("true")) {
throw new HiveException("Cannot construct index without map-side aggregation");
}
- Driver driver = new Driver(conf);
- driver.compile(command.toString());
-
- Task<?> rootTask = driver.getPlan().getRootTasks().get(0);
- inputs.addAll(driver.getPlan().getInputs());
- outputs.addAll(driver.getPlan().getOutputs());
-
- IndexMetadataChangeWork indexMetaChange = new IndexMetadataChangeWork(partSpec, indexTableName, dbName);
- IndexMetadataChangeTask indexMetaChangeTsk = new IndexMetadataChangeTask();
- indexMetaChangeTsk.setWork(indexMetaChange);
- rootTask.addDependentTask(indexMetaChangeTsk);
+ Task<?> rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs,
+ command, (LinkedHashMap<String, String>) partSpec, indexTableName, dbName);
return rootTask;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java?rev=1170007&r1=1170006&r2=1170007&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java Tue Sep 13 02:20:52 2011
@@ -36,8 +36,6 @@ import org.apache.hadoop.hive.ql.exec.Ta
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.index.HiveIndexQueryContext;
-import org.apache.hadoop.hive.ql.index.IndexMetadataChangeTask;
-import org.apache.hadoop.hive.ql.index.IndexMetadataChangeWork;
import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
import org.apache.hadoop.hive.ql.index.TableBasedIndexHandler;
@@ -46,9 +44,9 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler.DecomposedPredicate;
+import org.apache.hadoop.hive.ql.optimizer.IndexUtils;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
@@ -129,20 +127,8 @@ public class CompactIndexHandler extends
command.append(indexCols + ", " + VirtualColumn.FILENAME.getName());
HiveConf builderConf = new HiveConf(getConf(), CompactIndexHandler.class);
- // Don't try to index optimize the query to build the index
- HiveConf.setBoolVar(builderConf, HiveConf.ConfVars.HIVEOPTINDEXFILTER, false);
- Driver driver = new Driver(builderConf);
- driver.compile(command.toString());
-
- Task<?> rootTask = driver.getPlan().getRootTasks().get(0);
- inputs.addAll(driver.getPlan().getInputs());
- outputs.addAll(driver.getPlan().getOutputs());
-
- IndexMetadataChangeWork indexMetaChange = new IndexMetadataChangeWork(partSpec, indexTableName, dbName);
- IndexMetadataChangeTask indexMetaChangeTsk = new IndexMetadataChangeTask();
- indexMetaChangeTsk.setWork(indexMetaChange);
- rootTask.addDependentTask(indexMetaChangeTsk);
-
+ Task<?> rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs,
+ command, partSpec, indexTableName, dbName);
return rootTask;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1170007&r1=1170006&r2=1170007&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Tue Sep 13 02:20:52 2011
@@ -657,13 +657,14 @@ public class Hive {
Index indexDesc = new Index(indexName, indexHandlerClass, dbName, tableName, time, time, indexTblName,
storageDescriptor, params, deferredRebuild);
indexDesc.getParameters().put("comment", indexComment);
- indexHandler.analyzeIndexDefinition(baseTbl, indexDesc, tt);
if (idxProps != null)
{
indexDesc.getParameters().putAll(idxProps);
}
+ indexHandler.analyzeIndexDefinition(baseTbl, indexDesc, tt);
+
this.getMSC().createIndex(indexDesc, tt);
} catch (Exception e) {
@@ -703,7 +704,7 @@ public class Hive {
} catch (NoSuchObjectException e) {
throw new HiveException("Partition or table doesn't exist.", e);
} catch (Exception e) {
- throw new HiveException("Unknow error. Please check logs.", e);
+ throw new HiveException("Unknown error. Please check logs.", e);
}
}
@@ -1419,7 +1420,7 @@ public class Hive {
} catch (NoSuchObjectException e) {
throw new HiveException("Partition or table doesn't exist.", e);
} catch (Exception e) {
- throw new HiveException("Unknow error. Please check logs.", e);
+ throw new HiveException("Unknown error. Please check logs.", e);
}
}
@@ -2098,6 +2099,4 @@ public class Hive {
private static String[] getQualifiedNames(String qualifiedName) {
return qualifiedName.split("\\.");
}
-
-
};
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java?rev=1170007&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java Tue Sep 13 02:20:52 2011
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.index.IndexMetadataChangeTask;
+import org.apache.hadoop.hive.ql.index.IndexMetadataChangeWork;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.physical.index.IndexWhereProcessor;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * Utility class for index support.
+ * Currently used for BITMAP and AGGREGATE indexes.
+ */
+public final class IndexUtils {
+
+  // Log under this class's own name (the original logged as IndexWhereProcessor,
+  // which misattributes these messages).
+  private static final Log LOG = LogFactory.getLog(IndexUtils.class.getName());
+
+  // Cache from index metadata to its backing index table; populated by the
+  // getIndexTables() overloads and read by containsPartition().
+  // NOTE(review): this is static mutable state shared across queries and
+  // threads — confirm the optimizer only runs single-threaded, or make this
+  // per-invocation.
+  private static final Map<Index, Table> indexToIndexTable = new HashMap<Index, Table>();
+
+  private IndexUtils(){
+    // utility class; not instantiable
+  }
+
+  /**
+   * Check the partitions used by the table scan to make sure they also exist in the
+   * index table.
+   * @param tableScan table scan whose partitions are being checked
+   * @param pctx parse context of the query
+   * @param indexes map from base table to the indexes defined on it
+   * @return partitions used by query. null if they do not exist in index table
+   * @throws HiveException
+   */
+  public static Set<Partition> checkPartitionsCoveredByIndex(TableScanOperator tableScan,
+      ParseContext pctx,
+      Map<Table, List<Index>> indexes)
+    throws HiveException {
+    Hive hive = Hive.get(pctx.getConf());
+    Set<Partition> queryPartitions = null;
+    // make sure each partition exists on the index table
+    PrunedPartitionList queryPartitionList = pctx.getOpToPartList().get(tableScan);
+    if(queryPartitionList.getConfirmedPartns() != null
+        && !queryPartitionList.getConfirmedPartns().isEmpty()){
+      queryPartitions = queryPartitionList.getConfirmedPartns();
+    }else if(queryPartitionList.getUnknownPartns() != null
+        && !queryPartitionList.getUnknownPartns().isEmpty()){
+      queryPartitions = queryPartitionList.getUnknownPartns();
+    }
+
+    if (queryPartitions == null) {
+      // Neither confirmed nor unknown partitions: nothing to verify against the
+      // index table. Returning null (no coverage) is the conservative answer;
+      // the original code would have thrown NullPointerException here.
+      return null;
+    }
+
+    for (Partition part : queryPartitions) {
+      // Called for its side effect: populates indexToIndexTable, which
+      // containsPartition() consults below. The returned list is not needed.
+      getIndexTables(hive, part, indexes);
+      if (!containsPartition(hive, part, indexes)) {
+        return null; // problem if it doesn't contain the partition
+      }
+    }
+
+    return queryPartitions;
+  }
+
+  /**
+   * Return index tables associated with a given base table.
+   * Also caches each index -> index table mapping in indexToIndexTable.
+   * (Made static: this class has a private constructor, so an instance
+   * method here was uncallable dead code.)
+   */
+  private static List<Table> getIndexTables(Hive hive, Table table,
+      Map<Table, List<Index>> indexes) throws
+    HiveException {
+    List<Table> indexTables = new ArrayList<Table>();
+    if (indexes == null || indexes.get(table) == null) {
+      return indexTables;
+    }
+    for (Index index : indexes.get(table)) {
+      Table indexTable = hive.getTable(index.getIndexTableName());
+      indexToIndexTable.put(index, indexTable);
+      indexTables.add(indexTable);
+    }
+    return indexTables;
+  }
+
+  /**
+   * Return index tables associated with the base table of the partition.
+   * Also caches each index -> index table mapping in indexToIndexTable.
+   */
+  private static List<Table> getIndexTables(Hive hive, Partition part,
+      Map<Table, List<Index>> indexes) throws HiveException {
+    List<Table> indexTables = new ArrayList<Table>();
+    Table partitionedTable = part.getTable();
+    if (indexes == null || indexes.get(partitionedTable) == null) {
+      return indexTables;
+    }
+    for (Index index : indexes.get(partitionedTable)) {
+      Table indexTable = hive.getTable(index.getIndexTableName());
+      indexToIndexTable.put(index, indexTable);
+      indexTables.add(indexTable);
+    }
+    return indexTables;
+  }
+
+  /**
+   * Check that every index table contains the given partition and is fresh.
+   * Relies on indexToIndexTable having been populated by getIndexTables().
+   */
+  private static boolean containsPartition(Hive hive, Partition part,
+      Map<Table, List<Index>> indexes)
+    throws HiveException {
+    HashMap<String, String> partSpec = part.getSpec();
+
+    if (indexes == null || indexes.get(part.getTable()) == null) {
+      return false;
+    }
+
+    if (partSpec.isEmpty()) {
+      // empty specs come from non-partitioned tables
+      return isIndexTableFresh(hive, indexes.get(part.getTable()), part.getTable());
+    }
+
+    for (Index index : indexes.get(part.getTable())) {
+      Table indexTable = indexToIndexTable.get(index);
+      // get partitions that match the spec
+      List<Partition> matchingPartitions = hive.getPartitions(indexTable, partSpec);
+      if (matchingPartitions == null || matchingPartitions.size() == 0) {
+        // original message lacked the separating space after the table name
+        LOG.info("Index table " + indexTable + " did not contain built partition that matched " + partSpec);
+        return false;
+      } else if (!isIndexPartitionFresh(hive, index, part)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Check the index partitions on a partitioned table exist and are fresh.
+   * Freshness: the base partition's data must not have been modified after
+   * the timestamp recorded in the index metadata for that partition spec.
+   */
+  private static boolean isIndexPartitionFresh(Hive hive, Index index,
+      Partition part) throws HiveException {
+    LOG.info("checking index staleness...");
+    try {
+      FileSystem partFs = part.getPartitionPath().getFileSystem(hive.getConf());
+      FileStatus partFss = partFs.getFileStatus(part.getPartitionPath());
+      String ts = index.getParameters().get(part.getSpec().toString());
+      if (ts == null) {
+        // no build timestamp recorded for this partition => treat as stale
+        return false;
+      }
+      long indexTs = Long.parseLong(ts);
+      LOG.info(partFss.getModificationTime());
+      LOG.info(ts);
+      if (partFss.getModificationTime() > indexTs) {
+        LOG.info("index is stale on the partitions that matched " + part.getSpec());
+        return false;
+      }
+    } catch (IOException e) {
+      LOG.info("failed to grab timestamp info");
+      throw new HiveException(e);
+    }
+    return true;
+  }
+
+  /**
+   * Check that the indexes on the unpartitioned table exist and are fresh.
+   * Freshness: the base table's data must not have been modified after the
+   * "base_timestamp" recorded in each index's metadata.
+   */
+  private static boolean isIndexTableFresh(Hive hive, List<Index> indexes, Table src)
+    throws HiveException {
+    //check that they exist
+    if (indexes == null || indexes.size() == 0) {
+      return false;
+    }
+    //check that they are not stale
+    for (Index index : indexes) {
+      LOG.info("checking index staleness...");
+      try {
+        FileSystem srcFs = src.getPath().getFileSystem(hive.getConf());
+        FileStatus srcFss= srcFs.getFileStatus(src.getPath());
+        String ts = index.getParameters().get("base_timestamp");
+        if (ts == null) {
+          return false;
+        }
+        long indexTs = Long.parseLong(ts);
+        LOG.info(srcFss.getModificationTime());
+        LOG.info(ts);
+        if (srcFss.getModificationTime() > indexTs) {
+          LOG.info("index is stale ");
+          return false;
+        }
+      } catch (IOException e) {
+        LOG.info("failed to grab timestamp info");
+        throw new HiveException(e);
+      }
+    }
+    return true;
+  }
+
+
+  /**
+   * Get a list of indexes on a table that match given types.
+   * @param baseTableMetaData table whose indexes are listed
+   * @param matchIndexTypes index handler class names to match against
+   * @return indexes whose handler class is in matchIndexTypes (possibly empty)
+   * @throws SemanticException if the metastore cannot be reached
+   */
+  public static List<Index> getIndexes(Table baseTableMetaData, List<String> matchIndexTypes)
+    throws SemanticException {
+    List<Index> matchingIndexes = new ArrayList<Index>();
+    List<Index> indexesOnTable = null;
+
+    try {
+      indexesOnTable = baseTableMetaData.getAllIndexes((short) -1); // get all indexes
+    } catch (HiveException e) {
+      throw new SemanticException("Error accessing metastore", e);
+    }
+
+    for (Index index : indexesOnTable) {
+      String indexType = index.getIndexHandlerClass();
+      if (matchIndexTypes.contains(indexType)) {
+        matchingIndexes.add(index);
+      }
+    }
+    return matchingIndexes;
+  }
+
+
+  /**
+   * Compile the index-builder query and return its root task, with an
+   * {@link IndexMetadataChangeTask} chained after it to record index metadata.
+   * The plan's inputs/outputs are merged into the supplied sets.
+   */
+  public static Task<?> createRootTask(HiveConf builderConf, Set<ReadEntity> inputs,
+      Set<WriteEntity> outputs, StringBuilder command,
+      LinkedHashMap<String, String> partSpec,
+      String indexTableName, String dbName){
+    // Don't try to index optimize the query to build the index
+    HiveConf.setBoolVar(builderConf, HiveConf.ConfVars.HIVEOPTINDEXFILTER, false);
+    Driver driver = new Driver(builderConf);
+    // NOTE(review): compile()'s return code is ignored here; a failed compile
+    // would surface as an NPE/empty plan below — consider checking it.
+    driver.compile(command.toString());
+
+    Task<?> rootTask = driver.getPlan().getRootTasks().get(0);
+    inputs.addAll(driver.getPlan().getInputs());
+    outputs.addAll(driver.getPlan().getOutputs());
+
+    IndexMetadataChangeWork indexMetaChange = new IndexMetadataChangeWork(partSpec,
+        indexTableName, dbName);
+    IndexMetadataChangeTask indexMetaChangeTsk = new IndexMetadataChangeTask();
+    indexMetaChangeTsk.setWork(indexMetaChange);
+    rootTask.addDependentTask(indexMetaChangeTsk);
+
+    return rootTask;
+  }
+
+
+}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1170007&r1=1170006&r2=1170007&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Tue Sep 13 02:20:52 2011
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.optimizer.index.RewriteGBUsingIndex;
import org.apache.hadoop.hive.ql.optimizer.lineage.Generator;
import org.apache.hadoop.hive.ql.optimizer.pcr.PartitionConditionRemover;
import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
@@ -54,11 +55,12 @@ public class Optimizer {
transformations.add(new PartitionPruner());
transformations.add(new PartitionConditionRemover());
}
-
+ if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTGBYUSINGINDEX)) {
+ transformations.add(new RewriteGBUsingIndex());
+ }
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTGROUPBY)) {
transformations.add(new GroupByOptimizer());
}
-
transformations.add(new SamplePruner());
transformations.add(new MapJoinProcessor());
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) {
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java?rev=1170007&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java Tue Sep 13 02:20:52 2011
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.index;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * RewriteCanApplyCtx class stores the context for the {@link RewriteCanApplyProcFactory}
+ * to determine if any index can be used and if the input query
+ * meets all the criteria for rewrite optimization.
+ */
+public final class RewriteCanApplyCtx implements NodeProcessorCtx {
+
+  private static final Log LOG = LogFactory.getLog(RewriteCanApplyCtx.class.getName());
+
+  private RewriteCanApplyCtx(ParseContext parseContext) {
+    this.parseContext = parseContext;
+  }
+
+  /** Static factory; one context is used per candidate query branch. */
+  public static RewriteCanApplyCtx getInstance(ParseContext parseContext){
+    return new RewriteCanApplyCtx(parseContext);
+  }
+
+  // Rewrite Variables: flags populated while walking the operator tree
+  private int aggFuncCnt = 0;
+  private boolean queryHasGroupBy = false;
+  private boolean aggFuncIsNotCount = false;
+  private boolean aggFuncColsFetchException = false;
+  private boolean whrClauseColsFetchException = false;
+  private boolean selClauseColsFetchException = false;
+  private boolean gbyKeysFetchException = false;
+  private boolean countOnAllCols = false;
+  private boolean countOfOne = false;
+  private boolean queryHasMultipleTables = false;
+
+  //Data structures that are populated in the RewriteCanApplyProcFactory
+  //methods to check if the index key meets all criteria
+  private Set<String> selectColumnsList = new LinkedHashSet<String>();
+  private Set<String> predicateColumnsList = new LinkedHashSet<String>();
+  private Set<String> gbKeyNameList = new LinkedHashSet<String>();
+  private Set<String> aggFuncColList = new LinkedHashSet<String>();
+
+  //Map for base table to index table mapping
+  //TableScan operator for base table will be modified to read from index table
+  //(declared with the other fields; the original declared it mid-class with a
+  //stray double semicolon)
+  private final Map<String, String> baseToIdxTableMap =
+    new HashMap<String, String>();
+
+  private final ParseContext parseContext;
+  private String baseTableName;
+  private String aggFunction;
+
+  /** Reset all rewrite flags and collections so the context can be reused. */
+  void resetCanApplyCtx(){
+    setAggFuncCnt(0);
+    setQueryHasGroupBy(false);
+    setAggFuncIsNotCount(false);
+    setAggFuncColsFetchException(false);
+    setWhrClauseColsFetchException(false);
+    setSelClauseColsFetchException(false);
+    setGbyKeysFetchException(false);
+    setCountOnAllCols(false);
+    setCountOfOne(false);
+    setQueryHasMultipleTables(false);
+    selectColumnsList.clear();
+    predicateColumnsList.clear();
+    gbKeyNameList.clear();
+    aggFuncColList.clear();
+    setBaseTableName("");
+    setAggFunction("");
+  }
+
+  public boolean isQueryHasGroupBy() {
+    return queryHasGroupBy;
+  }
+
+  public void setQueryHasGroupBy(boolean queryHasGroupBy) {
+    this.queryHasGroupBy = queryHasGroupBy;
+  }
+
+  public boolean isAggFuncIsNotCount() {
+    return aggFuncIsNotCount;
+  }
+
+  public void setAggFuncIsNotCount(boolean aggFuncIsNotCount) {
+    this.aggFuncIsNotCount = aggFuncIsNotCount;
+  }
+
+  public Map<String, String> getBaseToIdxTableMap() {
+    return baseToIdxTableMap;
+  }
+
+  public void setAggFunction(String aggFunction) {
+    this.aggFunction = aggFunction;
+  }
+
+  public String getAggFunction() {
+    return aggFunction;
+  }
+
+  public void setAggFuncColsFetchException(boolean aggFuncColsFetchException) {
+    this.aggFuncColsFetchException = aggFuncColsFetchException;
+  }
+
+  public boolean isAggFuncColsFetchException() {
+    return aggFuncColsFetchException;
+  }
+
+  public void setWhrClauseColsFetchException(boolean whrClauseColsFetchException) {
+    this.whrClauseColsFetchException = whrClauseColsFetchException;
+  }
+
+  public boolean isWhrClauseColsFetchException() {
+    return whrClauseColsFetchException;
+  }
+
+  public void setSelClauseColsFetchException(boolean selClauseColsFetchException) {
+    this.selClauseColsFetchException = selClauseColsFetchException;
+  }
+
+  public boolean isSelClauseColsFetchException() {
+    return selClauseColsFetchException;
+  }
+
+  public void setGbyKeysFetchException(boolean gbyKeysFetchException) {
+    this.gbyKeysFetchException = gbyKeysFetchException;
+  }
+
+  public boolean isGbyKeysFetchException() {
+    return gbyKeysFetchException;
+  }
+
+  public void setCountOnAllCols(boolean countOnAllCols) {
+    this.countOnAllCols = countOnAllCols;
+  }
+
+  public boolean isCountOnAllCols() {
+    return countOnAllCols;
+  }
+
+  public void setCountOfOne(boolean countOfOne) {
+    this.countOfOne = countOfOne;
+  }
+
+  public boolean isCountOfOne() {
+    return countOfOne;
+  }
+
+  public void setQueryHasMultipleTables(boolean queryHasMultipleTables) {
+    this.queryHasMultipleTables = queryHasMultipleTables;
+  }
+
+  public boolean isQueryHasMultipleTables() {
+    return queryHasMultipleTables;
+  }
+
+  public Set<String> getSelectColumnsList() {
+    return selectColumnsList;
+  }
+
+  public void setSelectColumnsList(Set<String> selectColumnsList) {
+    this.selectColumnsList = selectColumnsList;
+  }
+
+  public Set<String> getPredicateColumnsList() {
+    return predicateColumnsList;
+  }
+
+  public void setPredicateColumnsList(Set<String> predicateColumnsList) {
+    this.predicateColumnsList = predicateColumnsList;
+  }
+
+  public Set<String> getGbKeyNameList() {
+    return gbKeyNameList;
+  }
+
+  public void setGbKeyNameList(Set<String> gbKeyNameList) {
+    this.gbKeyNameList = gbKeyNameList;
+  }
+
+  public Set<String> getAggFuncColList() {
+    return aggFuncColList;
+  }
+
+  public void setAggFuncColList(Set<String> aggFuncColList) {
+    this.aggFuncColList = aggFuncColList;
+  }
+
+  public int getAggFuncCnt() {
+    return aggFuncCnt;
+  }
+
+  public void setAggFuncCnt(int aggFuncCnt) {
+    this.aggFuncCnt = aggFuncCnt;
+  }
+
+  public String getBaseTableName() {
+    return baseTableName;
+  }
+
+  public void setBaseTableName(String baseTableName) {
+    this.baseTableName = baseTableName;
+  }
+
+  public ParseContext getParseContext() {
+    return parseContext;
+  }
+
+
+  /**
+   * This method walks all the nodes starting from topOp TableScanOperator node
+   * and invokes methods from {@link RewriteCanApplyProcFactory} for each of the rules
+   * added to the opRules map. We use the {@link PreOrderWalker} for a pre-order
+   * traversal of the operator tree.
+   *
+   * The methods from {@link RewriteCanApplyProcFactory} set appropriate flags and
+   * column lists in this context.
+   * (The original javadoc referenced a nonexistent RewriteVars enum.)
+   *
+   * @param topOp
+   * @throws SemanticException
+   */
+  void populateRewriteVars(Operator<? extends Serializable> topOp) throws SemanticException{
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp("R1", "FIL%"),
+        RewriteCanApplyProcFactory.canApplyOnFilterOperator());
+    opRules.put(new RuleRegExp("R2", "GBY%"),
+        RewriteCanApplyProcFactory.canApplyOnGroupByOperator());
+    opRules.put(new RuleRegExp("R3", "SEL%"),
+        RewriteCanApplyProcFactory.canApplyOnSelectOperator());
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, this);
+    GraphWalker ogw = new PreOrderWalker(disp);
+
+    // Create a list of topop nodes
+    List<Node> topNodes = new ArrayList<Node>();
+    topNodes.add(topOp);
+
+    try {
+      ogw.startWalking(topNodes, null);
+    } catch (SemanticException e) {
+      LOG.error("Exception in walking operator tree. Rewrite variables not populated");
+      LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+      throw new SemanticException(e.getMessage(), e);
+    }
+  }
+
+
+  /**
+   * Default procedure for {@link DefaultRuleDispatcher}: a no-op processor
+   * for operators that match none of the registered rules.
+   * @return no-op NodeProcessor
+   */
+  private NodeProcessor getDefaultProc() {
+    return new NodeProcessor() {
+      @Override
+      public Object process(Node nd, Stack<Node> stack,
+          NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
+        return null;
+      }
+    };
+  }
+
+
+  /** Record that the scan of baseTableName should be redirected to indexTableName. */
+  public void addTable(String baseTableName, String indexTableName) {
+    baseToIdxTableMap.put(baseTableName, indexTableName);
+  }
+
+  /** Return the index table registered for baseTableName, or null. */
+  public String findBaseTable(String baseTableName) {
+    return baseToIdxTableMap.get(baseTableName);
+  }
+
+
+  boolean isIndexUsableForQueryBranchRewrite(Index index, Set<String> indexKeyNames){
+
+    //--------------------------------------------
+    //Check if all columns in select list are part of index key columns
+    if (!indexKeyNames.containsAll(selectColumnsList)) {
+      LOG.info("Select list has non index key column : " +
+          " Cannot use index " + index.getIndexName());
+      return false;
+    }
+
+    //--------------------------------------------
+    // Check if all columns in where predicate are part of index key columns
+    if (!indexKeyNames.containsAll(predicateColumnsList)) {
+      LOG.info("Predicate column ref list has non index key column : " +
+          " Cannot use index " + index.getIndexName());
+      return false;
+    }
+
+    //--------------------------------------------
+    // For group by, we need to check if all keys are from index columns
+    // itself. Here GB key order can be different than index columns but that does
+    // not really matter for final result.
+    if (!indexKeyNames.containsAll(gbKeyNameList)) {
+      LOG.info("Group by key has some non-indexed columns, " +
+          " Cannot use index " + index.getIndexName());
+      return false;
+    }
+
+    // If we have agg function (currently only COUNT is supported), check if its inputs are
+    // from index. we currently support only that.
+    if (aggFuncColList.size() > 0) {
+      if (!indexKeyNames.containsAll(aggFuncColList)){
+        LOG.info("Agg Func input is not present in index key columns. Currently " +
+            "only agg func on index columns are supported by rewrite optimization");
+        return false;
+      }
+    }
+
+    //Now that we are good to do this optimization, set parameters in context
+    //which would be used by transformation procedure as inputs.
+    if(queryHasGroupBy
+        && aggFuncCnt == 1
+        && !aggFuncIsNotCount){
+      addTable(baseTableName, index.getIndexTableName());
+    }else{
+      LOG.info("No valid criteria met to apply rewrite.");
+      return false;
+    }
+    return true;
+  }
+
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java?rev=1170007&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java Tue Sep 13 02:20:52 2011
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.index;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+
+/**
+ * Factory of methods used by {@link RewriteGBUsingIndex}
+ * to determine if the rewrite optimization can be applied to the input query.
+ *
+ */
+public final class RewriteCanApplyProcFactory {
+ private static RewriteCanApplyCtx canApplyCtx = null;
+
+ private RewriteCanApplyProcFactory(){
+ //this prevents the class from getting instantiated
+ }
+
+ /**
+ * Check for conditions in FilterOperator that do not meet rewrite criteria.
+ */
+ private static class CheckFilterProc implements NodeProcessor {
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+ Object... nodeOutputs) throws SemanticException {
+ FilterOperator operator = (FilterOperator)nd;
+ canApplyCtx = (RewriteCanApplyCtx)ctx;
+ FilterDesc conf = (FilterDesc)operator.getConf();
+ //The filter operator should have a predicate of ExprNodeGenericFuncDesc type.
+ //This represents the comparison operator
+ ExprNodeGenericFuncDesc oldengfd = (ExprNodeGenericFuncDesc) conf.getPredicate();
+ if(oldengfd == null){
+ canApplyCtx.setWhrClauseColsFetchException(true);
+ }
+ //The predicate should have valid left and right columns
+ List<String> colList = oldengfd.getCols();
+ if(colList == null || colList.size() == 0){
+ canApplyCtx.setWhrClauseColsFetchException(true);
+ }
+ //Add the predicate columns to RewriteCanApplyCtx's predColRefs list to check later
+ //if index keys contain all filter predicate columns and vice-a-versa
+ for (String col : colList) {
+ canApplyCtx.getPredicateColumnsList().add(col);
+ }
+ return null;
+ }
+ }
+
+ public static CheckFilterProc canApplyOnFilterOperator() {
+ return new CheckFilterProc();
+ }
+
+ /**
+ * Check for conditions in GroupByOperator that do not meet rewrite criteria.
+ *
+ */
+ private static class CheckGroupByProc implements NodeProcessor {
+
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+ Object... nodeOutputs) throws SemanticException {
+ GroupByOperator operator = (GroupByOperator)nd;
+ canApplyCtx = (RewriteCanApplyCtx)ctx;
+ //for each group-by clause in query, only one GroupByOperator of the
+ //GBY-RS-GBY sequence is stored in getGroupOpToInputTables
+ //we need to process only this operator
+ //Also, we do not rewrite for cases when same query branch has multiple group-by constructs
+ if(canApplyCtx.getParseContext().getGroupOpToInputTables().containsKey(operator) &&
+ !canApplyCtx.isQueryHasGroupBy()){
+
+ canApplyCtx.setQueryHasGroupBy(true);
+ GroupByDesc conf = (GroupByDesc) operator.getConf();
+ List<AggregationDesc> aggrList = conf.getAggregators();
+ if(aggrList != null && aggrList.size() > 0){
+ for (AggregationDesc aggregationDesc : aggrList) {
+ canApplyCtx.setAggFuncCnt(canApplyCtx.getAggFuncCnt() + 1);
+ //In the current implementation, we do not support more than 1 agg funcs in group-by
+ if(canApplyCtx.getAggFuncCnt() > 1) {
+ return false;
+ }
+ String aggFunc = aggregationDesc.getGenericUDAFName();
+ if(!("count".equals(aggFunc))){
+ canApplyCtx.setAggFuncIsNotCount(true);
+ }else{
+ List<ExprNodeDesc> para = aggregationDesc.getParameters();
+ //for a valid aggregation, it needs to have non-null parameter list
+ if(para == null){
+ canApplyCtx.setAggFuncColsFetchException(true);
+ }else if(para.size() == 0){
+ //count(*) case
+ canApplyCtx.setCountOnAllCols(true);
+ canApplyCtx.setAggFunction("_count_Of_ALL");
+ }else{
+ assert para.size()==1;
+ for(int i=0; i< para.size(); i++){
+ ExprNodeDesc expr = para.get(i);
+ if(expr instanceof ExprNodeColumnDesc){
+ //Add the columns to RewriteCanApplyCtx's selectColumnsList list
+ //to check later if index keys contain all select clause columns
+ //and vice-a-versa. We get the select column 'actual' names only here
+ //if we have a agg func along with group-by
+ //SelectOperator has internal names in its colList data structure
+ canApplyCtx.getSelectColumnsList().add(
+ ((ExprNodeColumnDesc) expr).getColumn());
+ //Add the columns to RewriteCanApplyCtx's aggFuncColList list to check later
+ //if columns contained in agg func are index key columns
+ canApplyCtx.getAggFuncColList().add(
+ ((ExprNodeColumnDesc) expr).getColumn());
+ canApplyCtx.setAggFunction("_count_Of_" +
+ ((ExprNodeColumnDesc) expr).getColumn() + "");
+ }else if(expr instanceof ExprNodeConstantDesc){
+ //count(1) case
+ canApplyCtx.setCountOfOne(true);
+ canApplyCtx.setAggFunction("_count_Of_1");
+ }
+ }
+ }
+ }
+ }
+ }
+
+ //we need to have non-null group-by keys for a valid group-by operator
+ List<ExprNodeDesc> keyList = conf.getKeys();
+ if(keyList == null || keyList.size() == 0){
+ canApplyCtx.setGbyKeysFetchException(true);
+ }
+ for (ExprNodeDesc expr : keyList) {
+ checkExpression(expr);
+ }
+ }
+ return null;
+ }
+
+ private void checkExpression(ExprNodeDesc expr){
+ if(expr instanceof ExprNodeColumnDesc){
+ //Add the group-by keys to RewriteCanApplyCtx's gbKeyNameList list to check later
+ //if all keys are from index columns
+ canApplyCtx.getGbKeyNameList().addAll(expr.getCols());
+ }else if(expr instanceof ExprNodeGenericFuncDesc){
+ ExprNodeGenericFuncDesc funcExpr = (ExprNodeGenericFuncDesc)expr;
+ List<ExprNodeDesc> childExprs = funcExpr.getChildExprs();
+ for (ExprNodeDesc childExpr : childExprs) {
+ if(childExpr instanceof ExprNodeColumnDesc){
+ canApplyCtx.getGbKeyNameList().addAll(expr.getCols());
+ canApplyCtx.getSelectColumnsList().add(((ExprNodeColumnDesc) childExpr).getColumn());
+ }else if(childExpr instanceof ExprNodeGenericFuncDesc){
+ checkExpression(childExpr);
+ }
+ }
+ }
+ }
+ }
+
+
+ public static CheckGroupByProc canApplyOnGroupByOperator() {
+ return new CheckGroupByProc();
+ }
+
+
+ /**
+ * Check for conditions in SelectOperator that do not meet rewrite criteria.
+ */
+ private static class CheckSelectProc implements NodeProcessor {
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+ Object... nodeOutputs) throws SemanticException {
+ SelectOperator operator = (SelectOperator)nd;
+ canApplyCtx = (RewriteCanApplyCtx)ctx;
+
+ List<Operator<? extends Serializable>> childrenList = operator.getChildOperators();
+ Operator<? extends Serializable> child = childrenList.get(0);
+ if(child instanceof FileSinkOperator){
+ Map<String, String> internalToAlias = new LinkedHashMap<String, String>();
+ RowSchema rs = operator.getSchema();
+ //to get the internal to alias mapping
+ List<ColumnInfo> sign = rs.getSignature();
+ for (ColumnInfo columnInfo : sign) {
+ internalToAlias.put(columnInfo.getInternalName(), columnInfo.getAlias());
+ }
+
+ //if FilterOperator predicate has internal column names,
+ //we need to retrieve the 'actual' column names to
+ //check if index keys contain all filter predicate columns and vice-a-versa
+ Iterator<String> predItr = canApplyCtx.getPredicateColumnsList().iterator();
+ while(predItr.hasNext()){
+ String predCol = predItr.next();
+ String newPredCol = "";
+ if(internalToAlias.get(predCol) != null){
+ newPredCol = internalToAlias.get(predCol);
+ canApplyCtx.getPredicateColumnsList().remove(predCol);
+ canApplyCtx.getPredicateColumnsList().add(newPredCol);
+ }
+ }
+ }
+ return null;
+ }
+ }
+
+ public static CheckSelectProc canApplyOnSelectOperator() {
+ return new CheckSelectProc();
+ }
+
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java?rev=1170007&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java Tue Sep 13 02:20:52 2011
@@ -0,0 +1,474 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.index;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.index.AggregateIndexHandler;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.IndexUtils;
+import org.apache.hadoop.hive.ql.optimizer.Transform;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+
+/**
+ * RewriteGBUsingIndex is implemented as one of the Rule-based Optimizations.
+ * Implements optimizations for GroupBy clause rewrite using aggregate index.
+ * This optimization rewrites a GroupBy query over the base table into a query over a simple
+ * table-scan of the index table, if there is an index on the group by key(s) or the distinct column(s).
+ * E.g.
+ * <code>
+ * select count(key)
+ * from table
+ * group by key;
+ * </code>
+ * to
+ * <code>
+ * select sum(_count_Of_key)
+ * from idx_table
+ * group by key;
+ * </code>
+ *
+ * The rewrite supports following queries:
+ * <ul>
+ * <li> Queries having only those col refs that are in the index key.
+ * <li> Queries that have index key col refs
+ * <ul>
+ * <li> in SELECT
+ * <li> in WHERE
+ * <li> in GROUP BY
+ * </ul>
+ * <li> Queries with agg func COUNT(index key col ref) in SELECT
+ * <li> Queries with SELECT DISTINCT index_key_col_refs
+ * <li> Queries having a subquery satisfying above condition (only the subquery is rewritten)
+ * </ul>
+ *
+ * @see AggregateIndexHandler
+ * @see IndexUtils
+ * @see RewriteCanApplyCtx
+ * @see RewriteCanApplyProcFactory
+ * @see RewriteParseContextGenerator
+ * @see RewriteQueryUsingAggregateIndexCtx
+ * @see RewriteQueryUsingAggregateIndex
+ * For test cases, @see ql_rewrite_gbtoidx.q
+ */
+
+public class RewriteGBUsingIndex implements Transform {
+ private ParseContext parseContext;
+ private Hive hiveDb;
+ private HiveConf hiveConf;
+ private static final Log LOG = LogFactory.getLog(RewriteGBUsingIndex.class.getName());
+
+ /*
+ * Stores the list of top TableScanOperator names for which the rewrite
+ * can be applied and the action that needs to be performed for operator tree
+ * starting from this TableScanOperator
+ */
+ private final Map<String, RewriteCanApplyCtx> tsOpToProcess =
+ new LinkedHashMap<String, RewriteCanApplyCtx>();
+
+ //Name of the current table on which rewrite is being performed
+ private String baseTableName = null;
+ //Name of the current index which is used for rewrite
+ private String indexTableName = null;
+
+ //Index Validation Variables
+ private static final String IDX_BUCKET_COL = "_bucketname";
+ private static final String IDX_OFFSETS_ARRAY_COL = "_offsets";
+
+
+ @Override
+ public ParseContext transform(ParseContext pctx) throws SemanticException {
+ parseContext = pctx;
+ hiveConf = parseContext.getConf();
+ try {
+ hiveDb = Hive.get(hiveConf);
+ } catch (HiveException e) {
+ LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ throw new SemanticException(e.getMessage(), e);
+ }
+
+ // Don't try to index optimize the query to build the index
+ HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTINDEXFILTER, false);
+
+ /* Check if the input query passes all the tests to be eligible for a rewrite
+ * If yes, rewrite original query; else, return the current parseContext
+ */
+ if(shouldApplyOptimization()){
+ LOG.info("Rewriting Original Query using " + getName() + " optimization.");
+ rewriteOriginalQuery();
+ }
+ return parseContext;
+ }
+
+ private String getName() {
+ return "RewriteGBUsingIndex";
+ }
+
+ /**
+ * We traverse the current operator tree to check for conditions in which the
+ * optimization cannot be applied.
+ *
+ * At the end, we check if all conditions have passed for rewrite. If yes, we
+ * determine if the the index is usable for rewrite. Else, we log the condition which
+ * did not meet the rewrite criterion.
+ *
+ * @return
+ * @throws SemanticException
+ */
+ boolean shouldApplyOptimization() throws SemanticException{
+ boolean canApply = false;
+ if(ifQueryHasMultipleTables()){
+ //We do not apply this optimization for this case as of now.
+ return false;
+ }else{
+ /*
+ * This code iterates over each TableScanOperator from the topOps map from ParseContext.
+ * For each operator tree originating from this top TableScanOperator, we determine
+ * if the optimization can be applied. If yes, we add the name of the top table to
+ * the tsOpToProcess to apply rewrite later on.
+ * */
+ Map<TableScanOperator, Table> topToTable = parseContext.getTopToTable();
+ Iterator<TableScanOperator> topOpItr = topToTable.keySet().iterator();
+ while(topOpItr.hasNext()){
+
+ TableScanOperator topOp = topOpItr.next();
+ Table table = topToTable.get(topOp);
+ baseTableName = table.getTableName();
+ Map<Table, List<Index>> indexes = getIndexesForRewrite();
+ if(indexes == null){
+ LOG.debug("Error getting valid indexes for rewrite, " +
+ "skipping " + getName() + " optimization");
+ return false;
+ }
+
+ if(indexes.size() == 0){
+ LOG.debug("No Valid Index Found to apply Rewrite, " +
+ "skipping " + getName() + " optimization");
+ return false;
+ }else{
+ //we need to check if the base table has confirmed or unknown partitions
+ if(parseContext.getOpToPartList() != null && parseContext.getOpToPartList().size() > 0){
+ //if base table has partitions, we need to check if index is built for
+ //all partitions. If not, then we do not apply the optimization
+ if(checkIfIndexBuiltOnAllTablePartitions(topOp, indexes)){
+ //check if rewrite can be applied for operator tree
+ //if partitions condition returns true
+ canApply = checkIfRewriteCanBeApplied(topOp, table, indexes);
+ }else{
+ LOG.debug("Index is not built for all table partitions, " +
+ "skipping " + getName() + " optimization");
+ return false;
+ }
+ }else{
+ //check if rewrite can be applied for operator tree
+ //if there are no partitions on base table
+ canApply = checkIfRewriteCanBeApplied(topOp, table, indexes);
+ }
+ }
+ }
+ }
+ return canApply;
+ }
+
+ /**
+ * This methods checks if rewrite can be applied using the index and also
+ * verifies all conditions of the operator tree.
+ *
+ * @param topOp - TableScanOperator for a single the operator tree branch
+ * @param indexes - Map of a table and list of indexes on it
+ * @return - true if rewrite can be applied on the current branch; false otherwise
+ * @throws SemanticException
+ */
+ private boolean checkIfRewriteCanBeApplied(TableScanOperator topOp, Table baseTable,
+ Map<Table, List<Index>> indexes) throws SemanticException{
+ boolean canApply = false;
+ //Context for checking if this optimization can be applied to the input query
+ RewriteCanApplyCtx canApplyCtx = RewriteCanApplyCtx.getInstance(parseContext);
+ Map<String, Operator<? extends Serializable>> topOps = parseContext.getTopOps();
+
+ canApplyCtx.setBaseTableName(baseTableName);
+ canApplyCtx.populateRewriteVars(topOp);
+
+ Map<Index, Set<String>> indexTableMap = getIndexToKeysMap(indexes.get(baseTable));
+ Iterator<Index> indexMapItr = indexTableMap.keySet().iterator();
+ Index index = null;
+ while(indexMapItr.hasNext()){
+ //we rewrite the original query using the first valid index encountered
+ //this can be changed if we have a better mechanism to
+ //decide which index will produce a better rewrite
+ index = indexMapItr.next();
+ canApply = canApplyCtx.isIndexUsableForQueryBranchRewrite(index,
+ indexTableMap.get(index));
+ if(canApply){
+ canApply = checkIfAllRewriteCriteriaIsMet(canApplyCtx);
+ //break here if any valid index is found to apply rewrite
+ if(canApply){
+ //check if aggregation function is set.
+ //If not, set it using the only indexed column
+ if(canApplyCtx.getAggFunction() == null){
+ //strip of the start and end braces [...]
+ String aggregationFunction = indexTableMap.get(index).toString();
+ aggregationFunction = aggregationFunction.substring(1,
+ aggregationFunction.length() - 1);
+ canApplyCtx.setAggFunction("_count_Of_" + aggregationFunction + "");
+ }
+ }
+ break;
+ }
+ }
+ indexTableName = index.getIndexTableName();
+
+ if(canApply && topOps.containsValue(topOp)) {
+ Iterator<String> topOpNamesItr = topOps.keySet().iterator();
+ while(topOpNamesItr.hasNext()){
+ String topOpName = topOpNamesItr.next();
+ if(topOps.get(topOpName).equals(topOp)){
+ tsOpToProcess.put(topOpName, canApplyCtx);
+ }
+ }
+ }
+
+ if(tsOpToProcess.size() == 0){
+ canApply = false;
+ }else{
+ canApply = true;
+ }
+ return canApply;
+ }
+
+ /**
+ * This block of code iterates over the topToTable map from ParseContext
+ * to determine if the query has a scan over multiple tables.
+ * @return
+ */
+ boolean ifQueryHasMultipleTables(){
+ Map<TableScanOperator, Table> topToTable = parseContext.getTopToTable();
+ Iterator<Table> valuesItr = topToTable.values().iterator();
+ Set<String> tableNameSet = new HashSet<String>();
+ while(valuesItr.hasNext()){
+ Table table = valuesItr.next();
+ tableNameSet.add(table.getTableName());
+ }
+ if(tableNameSet.size() > 1){
+ LOG.debug("Query has more than one table " +
+ "that is not supported with " + getName() + " optimization.");
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Get a list of indexes which can be used for rewrite.
+ * @return
+ * @throws SemanticException
+ */
+ private Map<Table, List<Index>> getIndexesForRewrite() throws SemanticException{
+ List<String> supportedIndexes = new ArrayList<String>();
+ supportedIndexes.add(AggregateIndexHandler.class.getName());
+
+ // query the metastore to know what columns we have indexed
+ Collection<Table> topTables = parseContext.getTopToTable().values();
+ Map<Table, List<Index>> indexes = new HashMap<Table, List<Index>>();
+ for (Table tbl : topTables){
+ List<Index> tblIndexes = IndexUtils.getIndexes(tbl, supportedIndexes);
+ if (tblIndexes.size() > 0) {
+ indexes.put(tbl, tblIndexes);
+ }
+ }
+
+ return indexes;
+ }
+
+ /**
+ * This method checks if the index is built on all partitions of the base
+ * table. If not, then the method returns false as we do not apply optimization
+ * for this case.
+ * @param tableScan
+ * @param indexes
+ * @return
+ * @throws SemanticException
+ */
+ private boolean checkIfIndexBuiltOnAllTablePartitions(TableScanOperator tableScan,
+ Map<Table, List<Index>> indexes) throws SemanticException{
+ // check if we have indexes on all partitions in this table scan
+ Set<Partition> queryPartitions;
+ try {
+ queryPartitions = IndexUtils.checkPartitionsCoveredByIndex(tableScan, parseContext, indexes);
+ if (queryPartitions == null) { // partitions not covered
+ return false;
+ }
+ } catch (HiveException e) {
+ LOG.error("Fatal Error: problem accessing metastore", e);
+ throw new SemanticException(e);
+ }
+ if(queryPartitions.size() != 0){
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * This code block iterates over indexes on the table and populates the indexToKeys map
+ * for all the indexes that satisfy the rewrite criteria.
+ * @param indexTables
+ * @return
+ * @throws SemanticException
+ */
+ Map<Index, Set<String>> getIndexToKeysMap(List<Index> indexTables) throws SemanticException{
+ Index index = null;
+ Hive hiveInstance = hiveDb;
+ Map<Index, Set<String>> indexToKeysMap = new LinkedHashMap<Index, Set<String>>();
+ for (int idxCtr = 0; idxCtr < indexTables.size(); idxCtr++) {
+ final Set<String> indexKeyNames = new LinkedHashSet<String>();
+ index = indexTables.get(idxCtr);
+ //Getting index key columns
+ StorageDescriptor sd = index.getSd();
+ List<FieldSchema> idxColList = sd.getCols();
+ for (FieldSchema fieldSchema : idxColList) {
+ indexKeyNames.add(fieldSchema.getName());
+ }
+ assert indexKeyNames.size()==1;
+ // Check that the index schema is as expected. This code block should
+ // catch problems of this rewrite breaking when the AggregateIndexHandler
+ // index is changed.
+ List<String> idxTblColNames = new ArrayList<String>();
+ try {
+ Table idxTbl = hiveInstance.getTable(index.getDbName(),
+ index.getIndexTableName());
+ for (FieldSchema idxTblCol : idxTbl.getCols()) {
+ idxTblColNames.add(idxTblCol.getName());
+ }
+ } catch (HiveException e) {
+ LOG.error("Got exception while locating index table, " +
+ "skipping " + getName() + " optimization");
+ LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ throw new SemanticException(e.getMessage(), e);
+ }
+ assert(idxTblColNames.contains(IDX_BUCKET_COL));
+ assert(idxTblColNames.contains(IDX_OFFSETS_ARRAY_COL));
+ // we add all index tables which can be used for rewrite
+ // and defer the decision of using a particular index for later
+ // this is to allow choosing a index if a better mechanism is
+ // designed later to chose a better rewrite
+ indexToKeysMap.put(index, indexKeyNames);
+ }
+ return indexToKeysMap;
+ }
+
+ /**
+ * Method to rewrite the input query if all optimization criteria is passed.
+ * The method iterates over the tsOpToProcess {@link ArrayList} to apply the rewrites
+ * @throws SemanticException
+ *
+ */
+ @SuppressWarnings("unchecked")
+ private void rewriteOriginalQuery() throws SemanticException {
+ Map<String, Operator<? extends Serializable>> topOpMap =
+ (HashMap<String, Operator<? extends Serializable>>) parseContext.getTopOps().clone();
+ Iterator<String> tsOpItr = tsOpToProcess.keySet().iterator();
+
+ while(tsOpItr.hasNext()){
+ baseTableName = tsOpItr.next();
+ RewriteCanApplyCtx canApplyCtx = tsOpToProcess.get(baseTableName);
+ TableScanOperator topOp = (TableScanOperator) topOpMap.get(baseTableName);
+ RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx =
+ RewriteQueryUsingAggregateIndexCtx.getInstance(parseContext, hiveDb,
+ indexTableName, baseTableName, canApplyCtx.getAggFunction());
+ rewriteQueryCtx.invokeRewriteQueryProc(topOp);
+ parseContext = rewriteQueryCtx.getParseContext();
+ parseContext.setOpParseCtx((LinkedHashMap<Operator<? extends Serializable>,
+ OpParseContext>) rewriteQueryCtx.getOpc());
+ }
+ LOG.info("Finished Rewriting query");
+ }
+
+
+ /**
+ * This method logs the reason for which we cannot apply the rewrite optimization.
+ * @return
+ */
+ boolean checkIfAllRewriteCriteriaIsMet(RewriteCanApplyCtx canApplyCtx){
+ if (canApplyCtx.getAggFuncCnt() > 1){
+ LOG.debug("More than 1 agg funcs: " +
+ "Not supported by " + getName() + " optimization.");
+ return false;
+ }
+ if (canApplyCtx.isAggFuncIsNotCount()){
+ LOG.debug("Agg func other than count is " +
+ "not supported by " + getName() + " optimization.");
+ return false;
+ }
+ if (canApplyCtx.isCountOnAllCols()){
+ LOG.debug("Currently count function needs group by on key columns. This is a count(*) case.,"
+ + "Cannot apply this " + getName() + " optimization.");
+ return false;
+ }
+ if (canApplyCtx.isCountOfOne()){
+ LOG.debug("Currently count function needs group by on key columns. This is a count(1) case.,"
+ + "Cannot apply this " + getName() + " optimization.");
+ return false;
+ }
+ if (canApplyCtx.isAggFuncColsFetchException()){
+ LOG.debug("Got exception while locating child col refs " +
+ "of agg func, skipping " + getName() + " optimization.");
+ return false;
+ }
+ if (canApplyCtx.isWhrClauseColsFetchException()){
+ LOG.debug("Got exception while locating child col refs for where clause, "
+ + "skipping " + getName() + " optimization.");
+ return false;
+ }
+ if (canApplyCtx.isSelClauseColsFetchException()){
+ LOG.debug("Got exception while locating child col refs for select list, "
+ + "skipping " + getName() + " optimization.");
+ return false;
+ }
+ if (canApplyCtx.isGbyKeysFetchException()){
+ LOG.debug("Got exception while locating child col refs for GroupBy key, "
+ + "skipping " + getName() + " optimization.");
+ return false;
+ }
+ return true;
+ }
+}
+