You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/04/02 16:16:37 UTC
svn commit: r1463556 [3/15] - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/ data/files/ ql/if/
ql/src/gen/thrift/gen-cpp/
ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/
ql/src/gen/thrift/gen-php/ ql/src/gen/thrift/gen...
Added: hive/trunk/data/files/part.rc
URL: http://svn.apache.org/viewvc/hive/trunk/data/files/part.rc?rev=1463556&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hive/trunk/data/files/part.rc
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: hive/trunk/data/files/part.seq
URL: http://svn.apache.org/viewvc/hive/trunk/data/files/part.seq?rev=1463556&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hive/trunk/data/files/part.seq
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: hive/trunk/data/files/part_tiny.txt
URL: http://svn.apache.org/viewvc/hive/trunk/data/files/part_tiny.txt?rev=1463556&view=auto
==============================================================================
--- hive/trunk/data/files/part_tiny.txt (added)
+++ hive/trunk/data/files/part_tiny.txt Tue Apr 2 14:16:34 2013
@@ -0,0 +1,26 @@
+121152almond antique burnished rose metallicManufacturer#1Brand#14PROMO PLATED TIN2JUMBO BOX1173.15e pinto beans h
+121152almond antique burnished rose metallicManufacturer#1Brand#14PROMO PLATED TIN2JUMBO BOX1173.15e pinto beans h
+85768almond antique chartreuse lavender yellowManufacturer#1Brand#12LARGE BRUSHED STEEL34SM BAG1753.76refull
+110592almond antique salmon chartreuse burlywoodManufacturer#1Brand#15PROMO BURNISHED NICKEL6JUMBO PKG1602.59 to the furiously
+86428almond aquamarine burnished black steelManufacturer#1Brand#12STANDARD ANODIZED STEEL28WRAP BAG1414.42arefully
+65667almond aquamarine pink moccasin thistleManufacturer#1Brand#12LARGE BURNISHED STEEL42JUMBO CASE1632.66e across the expr
+105685almond antique violet chocolate turquoiseManufacturer#2Brand#22MEDIUM ANODIZED COPPER14MED CAN1690.68ly pending requ
+191709almond antique violet turquoise frostedManufacturer#2Brand#22ECONOMY POLISHED STEEL40MED BOX1800.7 haggle
+146985almond aquamarine midnight light salmonManufacturer#2Brand#23MEDIUM BURNISHED COPPER2SM CASE2031.98s cajole caref
+132666almond aquamarine rose maroon antiqueManufacturer#2Brand#24SMALL POLISHED NICKEL25MED BOX1698.66even
+195606almond aquamarine sandy cyan gainsboroManufacturer#2Brand#25STANDARD PLATED TIN18SM PKG1701.6ic de
+90681almond antique chartreuse khaki whiteManufacturer#3Brand#31MEDIUM BURNISHED TIN17SM CASE1671.68are slyly after the sl
+17273almond antique forest lavender goldenrodManufacturer#3Brand#35PROMO ANODIZED TIN14JUMBO CASE1190.27along the
+112398almond antique metallic orange dimManufacturer#3Brand#32MEDIUM BURNISHED BRASS19JUMBO JAR1410.39ole car
+40982almond antique misty red oliveManufacturer#3Brand#32ECONOMY PLATED COPPER1LG PKG1922.98c foxes can s
+144293almond antique olive coral navajoManufacturer#3Brand#34STANDARD POLISHED STEEL45JUMBO CAN1337.29ag furiously about
+49671almond antique gainsboro frosted violetManufacturer#4Brand#41SMALL BRUSHED BRASS10SM BOX1620.67ccounts run quick
+48427almond antique violet mint lemonManufacturer#4Brand#42PROMO POLISHED STEEL39SM CASE1375.42hely ironic i
+45261almond aquamarine floral ivory bisqueManufacturer#4Brand#42SMALL PLATED STEEL27WRAP CASE1206.26careful
+17927almond aquamarine yellow dodger mintManufacturer#4Brand#41ECONOMY BRUSHED COPPER7SM PKG1844.92ites. eve
+33357almond azure aquamarine papaya violetManufacturer#4Brand#41STANDARD ANODIZED TIN12WRAP CASE1290.35reful
+192697almond antique blue firebrick mintManufacturer#5Brand#52MEDIUM BURNISHED TIN31LG DRUM1789.69ickly ir
+42669almond antique medium spring khakiManufacturer#5Brand#51STANDARD BURNISHED TIN6MED CAN1611.66sits haggl
+155733almond antique sky peru orangeManufacturer#5Brand#53SMALL PLATED BRASS2WRAP DRUM1788.73furiously. bra
+15103almond aquamarine dodger light gainsboroManufacturer#5Brand#53ECONOMY BURNISHED STEEL46LG PACK1018.1packages hinder carefu
+78486almond azure blanched chiffon midnightManufacturer#5Brand#52LARGE BRUSHED BRASS23MED BAG1464.48hely blith
Modified: hive/trunk/ql/if/queryplan.thrift
URL: http://svn.apache.org/viewvc/hive/trunk/ql/if/queryplan.thrift?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/if/queryplan.thrift (original)
+++ hive/trunk/ql/if/queryplan.thrift Tue Apr 2 14:16:34 2013
@@ -53,6 +53,7 @@ enum OperatorType {
LATERALVIEWFORWARD,
HASHTABLESINK,
HASHTABLEDUMMY,
+ PTF,
}
struct Operator {
Modified: hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp (original)
+++ hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp Tue Apr 2 14:16:34 2013
@@ -48,7 +48,8 @@ int _kOperatorTypeValues[] = {
OperatorType::LATERALVIEWJOIN,
OperatorType::LATERALVIEWFORWARD,
OperatorType::HASHTABLESINK,
- OperatorType::HASHTABLEDUMMY
+ OperatorType::HASHTABLEDUMMY,
+ OperatorType::PTF
};
const char* _kOperatorTypeNames[] = {
"JOIN",
@@ -68,9 +69,10 @@ const char* _kOperatorTypeNames[] = {
"LATERALVIEWJOIN",
"LATERALVIEWFORWARD",
"HASHTABLESINK",
- "HASHTABLEDUMMY"
+ "HASHTABLEDUMMY",
+ "PTF"
};
-const std::map<int, const char*> _OperatorType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(18, _kOperatorTypeValues, _kOperatorTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+const std::map<int, const char*> _OperatorType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(19, _kOperatorTypeValues, _kOperatorTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
int _kTaskTypeValues[] = {
TaskType::MAP,
Modified: hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h (original)
+++ hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h Tue Apr 2 14:16:34 2013
@@ -53,7 +53,8 @@ struct OperatorType {
LATERALVIEWJOIN = 14,
LATERALVIEWFORWARD = 15,
HASHTABLESINK = 16,
- HASHTABLEDUMMY = 17
+ HASHTABLEDUMMY = 17,
+ PTF = 18
};
};
Modified: hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Operator.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Operator.java (original)
+++ hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Operator.java Tue Apr 2 14:16:34 2013
@@ -810,7 +810,7 @@ public class Operator implements org.apa
for (int _i25 = 0; _i25 < _map24.size; ++_i25)
{
String _key26; // required
- String _val27; // optional
+ String _val27; // required
_key26 = iprot.readString();
_val27 = iprot.readString();
struct.operatorAttributes.put(_key26, _val27);
@@ -830,7 +830,7 @@ public class Operator implements org.apa
for (int _i29 = 0; _i29 < _map28.size; ++_i29)
{
String _key30; // required
- long _val31; // optional
+ long _val31; // required
_key30 = iprot.readString();
_val31 = iprot.readI64();
struct.operatorCounters.put(_key30, _val31);
@@ -1003,7 +1003,7 @@ public class Operator implements org.apa
for (int _i37 = 0; _i37 < _map36.size; ++_i37)
{
String _key38; // required
- String _val39; // optional
+ String _val39; // required
_key38 = iprot.readString();
_val39 = iprot.readString();
struct.operatorAttributes.put(_key38, _val39);
@@ -1018,7 +1018,7 @@ public class Operator implements org.apa
for (int _i41 = 0; _i41 < _map40.size; ++_i41)
{
String _key42; // required
- long _val43; // optional
+ long _val43; // required
_key42 = iprot.readString();
_val43 = iprot.readI64();
struct.operatorCounters.put(_key42, _val43);
Modified: hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java (original)
+++ hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java Tue Apr 2 14:16:34 2013
@@ -29,7 +29,8 @@ public enum OperatorType implements org.
LATERALVIEWJOIN(14),
LATERALVIEWFORWARD(15),
HASHTABLESINK(16),
- HASHTABLEDUMMY(17);
+ HASHTABLEDUMMY(17),
+ PTF(18);
private final int value;
@@ -86,6 +87,8 @@ public enum OperatorType implements org.
return HASHTABLESINK;
case 17:
return HASHTABLEDUMMY;
+ case 18:
+ return PTF;
default:
return null;
}
Modified: hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Query.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Query.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Query.java (original)
+++ hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Query.java Tue Apr 2 14:16:34 2013
@@ -983,7 +983,7 @@ public class Query implements org.apache
for (int _i101 = 0; _i101 < _map100.size; ++_i101)
{
String _key102; // required
- String _val103; // optional
+ String _val103; // required
_key102 = iprot.readString();
_val103 = iprot.readString();
struct.queryAttributes.put(_key102, _val103);
@@ -1003,7 +1003,7 @@ public class Query implements org.apache
for (int _i105 = 0; _i105 < _map104.size; ++_i105)
{
String _key106; // required
- long _val107; // optional
+ long _val107; // required
_key106 = iprot.readString();
_val107 = iprot.readI64();
struct.queryCounters.put(_key106, _val107);
@@ -1239,7 +1239,7 @@ public class Query implements org.apache
for (int _i118 = 0; _i118 < _map117.size; ++_i118)
{
String _key119; // required
- String _val120; // optional
+ String _val120; // required
_key119 = iprot.readString();
_val120 = iprot.readString();
struct.queryAttributes.put(_key119, _val120);
@@ -1254,7 +1254,7 @@ public class Query implements org.apache
for (int _i122 = 0; _i122 < _map121.size; ++_i122)
{
String _key123; // required
- long _val124; // optional
+ long _val124; // required
_key123 = iprot.readString();
_val124 = iprot.readI64();
struct.queryCounters.put(_key123, _val124);
Modified: hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Stage.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Stage.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Stage.java (original)
+++ hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Stage.java Tue Apr 2 14:16:34 2013
@@ -911,7 +911,7 @@ public class Stage implements org.apache
for (int _i73 = 0; _i73 < _map72.size; ++_i73)
{
String _key74; // required
- String _val75; // optional
+ String _val75; // required
_key74 = iprot.readString();
_val75 = iprot.readString();
struct.stageAttributes.put(_key74, _val75);
@@ -931,7 +931,7 @@ public class Stage implements org.apache
for (int _i77 = 0; _i77 < _map76.size; ++_i77)
{
String _key78; // required
- long _val79; // optional
+ long _val79; // required
_key78 = iprot.readString();
_val79 = iprot.readI64();
struct.stageCounters.put(_key78, _val79);
@@ -1147,7 +1147,7 @@ public class Stage implements org.apache
for (int _i90 = 0; _i90 < _map89.size; ++_i90)
{
String _key91; // required
- String _val92; // optional
+ String _val92; // required
_key91 = iprot.readString();
_val92 = iprot.readString();
struct.stageAttributes.put(_key91, _val92);
@@ -1162,7 +1162,7 @@ public class Stage implements org.apache
for (int _i94 = 0; _i94 < _map93.size; ++_i94)
{
String _key95; // required
- long _val96; // optional
+ long _val96; // required
_key95 = iprot.readString();
_val96 = iprot.readI64();
struct.stageCounters.put(_key95, _val96);
Modified: hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Task.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Task.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Task.java (original)
+++ hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Task.java Tue Apr 2 14:16:34 2013
@@ -996,7 +996,7 @@ public class Task implements org.apache.
for (int _i45 = 0; _i45 < _map44.size; ++_i45)
{
String _key46; // required
- String _val47; // optional
+ String _val47; // required
_key46 = iprot.readString();
_val47 = iprot.readString();
struct.taskAttributes.put(_key46, _val47);
@@ -1016,7 +1016,7 @@ public class Task implements org.apache.
for (int _i49 = 0; _i49 < _map48.size; ++_i49)
{
String _key50; // required
- long _val51; // optional
+ long _val51; // required
_key50 = iprot.readString();
_val51 = iprot.readI64();
struct.taskCounters.put(_key50, _val51);
@@ -1256,7 +1256,7 @@ public class Task implements org.apache.
for (int _i62 = 0; _i62 < _map61.size; ++_i62)
{
String _key63; // required
- String _val64; // optional
+ String _val64; // required
_key63 = iprot.readString();
_val64 = iprot.readString();
struct.taskAttributes.put(_key63, _val64);
@@ -1271,7 +1271,7 @@ public class Task implements org.apache.
for (int _i66 = 0; _i66 < _map65.size; ++_i66)
{
String _key67; // required
- long _val68; // optional
+ long _val68; // required
_key67 = iprot.readString();
_val68 = iprot.readI64();
struct.taskCounters.put(_key67, _val68);
Modified: hive/trunk/ql/src/gen/thrift/gen-php/Types.php
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-php/Types.php?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-php/Types.php (original)
+++ hive/trunk/ql/src/gen/thrift/gen-php/Types.php Tue Apr 2 14:16:34 2013
@@ -53,6 +53,7 @@ final class OperatorType {
const LATERALVIEWFORWARD = 15;
const HASHTABLESINK = 16;
const HASHTABLEDUMMY = 17;
+ const PTF = 18;
static public $__names = array(
0 => 'JOIN',
1 => 'MAPJOIN',
@@ -72,6 +73,7 @@ final class OperatorType {
15 => 'LATERALVIEWFORWARD',
16 => 'HASHTABLESINK',
17 => 'HASHTABLEDUMMY',
+ 18 => 'PTF',
);
}
Modified: hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py (original)
+++ hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py Tue Apr 2 14:16:34 2013
@@ -63,6 +63,7 @@ class OperatorType:
LATERALVIEWFORWARD = 15
HASHTABLESINK = 16
HASHTABLEDUMMY = 17
+ PTF = 18
_VALUES_TO_NAMES = {
0: "JOIN",
@@ -83,6 +84,7 @@ class OperatorType:
15: "LATERALVIEWFORWARD",
16: "HASHTABLESINK",
17: "HASHTABLEDUMMY",
+ 18: "PTF",
}
_NAMES_TO_VALUES = {
@@ -104,6 +106,7 @@ class OperatorType:
"LATERALVIEWFORWARD": 15,
"HASHTABLESINK": 16,
"HASHTABLEDUMMY": 17,
+ "PTF": 18,
}
class TaskType:
Modified: hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb (original)
+++ hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb Tue Apr 2 14:16:34 2013
@@ -39,8 +39,9 @@ module OperatorType
LATERALVIEWFORWARD = 15
HASHTABLESINK = 16
HASHTABLEDUMMY = 17
- VALUE_MAP = {0 => "JOIN", 1 => "MAPJOIN", 2 => "EXTRACT", 3 => "FILTER", 4 => "FORWARD", 5 => "GROUPBY", 6 => "LIMIT", 7 => "SCRIPT", 8 => "SELECT", 9 => "TABLESCAN", 10 => "FILESINK", 11 => "REDUCESINK", 12 => "UNION", 13 => "UDTF", 14 => "LATERALVIEWJOIN", 15 => "LATERALVIEWFORWARD", 16 => "HASHTABLESINK", 17 => "HASHTABLEDUMMY"}
- VALID_VALUES = Set.new([JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF, LATERALVIEWJOIN, LATERALVIEWFORWARD, HASHTABLESINK, HASHTABLEDUMMY]).freeze
+ PTF = 18
+ VALUE_MAP = {0 => "JOIN", 1 => "MAPJOIN", 2 => "EXTRACT", 3 => "FILTER", 4 => "FORWARD", 5 => "GROUPBY", 6 => "LIMIT", 7 => "SCRIPT", 8 => "SELECT", 9 => "TABLESCAN", 10 => "FILESINK", 11 => "REDUCESINK", 12 => "UNION", 13 => "UDTF", 14 => "LATERALVIEWJOIN", 15 => "LATERALVIEWFORWARD", 16 => "HASHTABLESINK", 17 => "HASHTABLEDUMMY", 18 => "PTF"}
+ VALID_VALUES = Set.new([JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF, LATERALVIEWJOIN, LATERALVIEWFORWARD, HASHTABLESINK, HASHTABLEDUMMY, PTF]).freeze
end
module TaskType
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java Tue Apr 2 14:16:34 2013
@@ -37,6 +37,8 @@ public class QueryProperties {
boolean hasOrderBy = false;
boolean hasSortBy = false;
boolean hasJoinFollowedByGroupBy = false;
+ boolean hasPTF = false;
+ boolean hasWindowing = false;
// does the query have a using clause
boolean usesScript = false;
@@ -109,6 +111,22 @@ public class QueryProperties {
this.hasClusterBy = hasClusterBy;
}
+ public boolean hasPTF() {
+ return hasPTF;
+ }
+
+ public void setHasPTF(boolean hasPTF) {
+ this.hasPTF = hasPTF;
+ }
+
+ public boolean hasWindowing() {
+ return hasWindowing;
+ }
+
+ public void setHasWindowing(boolean hasWindowing) {
+ this.hasWindowing = hasWindowing;
+ }
+
public boolean isMapJoinRemoved() {
return mapJoinRemoved;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Tue Apr 2 14:16:34 2013
@@ -135,15 +135,25 @@ import org.apache.hadoop.hive.ql.udf.gen
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCovariance;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCovarianceSample;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCumeDist;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFDenseRank;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEWAHBitmap;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFFirstValue;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFHistogramNumeric;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFLastValue;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMin;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFNTile;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFPercentRank;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFPercentileApprox;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFRank;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFLead;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFLag;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFRowNumber;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStd;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStdSample;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum;
@@ -172,6 +182,8 @@ import org.apache.hadoop.hive.ql.udf.gen
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInFile;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFIndex;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInstr;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag.GenericUDFLag;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag.GenericUDFLead;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLocate;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMap;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMapKeys;
@@ -201,8 +213,8 @@ import org.apache.hadoop.hive.ql.udf.gen
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFStruct;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFTimestamp;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToBinary;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUnixTimeStamp;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDecimal;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUnixTimeStamp;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUtcTimestamp;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFTranslate;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUnion;
@@ -215,6 +227,11 @@ import org.apache.hadoop.hive.ql.udf.gen
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTFParseUrlTuple;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTFStack;
import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
+import org.apache.hadoop.hive.ql.udf.ptf.NPath.NPathResolver;
+import org.apache.hadoop.hive.ql.udf.ptf.Noop.NoopResolver;
+import org.apache.hadoop.hive.ql.udf.ptf.NoopWithMap.NoopWithMapResolver;
+import org.apache.hadoop.hive.ql.udf.ptf.TableFunctionResolver;
+import org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction.WindowingTableFunctionResolver;
import org.apache.hadoop.hive.ql.udf.xml.GenericUDFXPath;
import org.apache.hadoop.hive.ql.udf.xml.UDFXPathBoolean;
import org.apache.hadoop.hive.ql.udf.xml.UDFXPathDouble;
@@ -237,6 +254,7 @@ import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
+
/**
* FunctionRegistry.
*/
@@ -248,6 +266,26 @@ public final class FunctionRegistry {
* The mapping from expression function names to expression classes.
*/
static Map<String, FunctionInfo> mFunctions = Collections.synchronizedMap(new LinkedHashMap<String, FunctionInfo>());
+
+ /*
+ * PTF variables
+ * */
+
+ public static final String LEAD_FUNC_NAME = "lead";
+ public static final String LAG_FUNC_NAME = "lag";
+
+ public static final String WINDOWING_TABLE_FUNCTION = "windowingtablefunction";
+ public static final String NOOP_TABLE_FUNCTION = "noop";
+ public static final String NOOP_MAP_TABLE_FUNCTION = "noopwithmap";
+
+ static Map<String, PTFFunctionInfo> tableFunctions = Collections.synchronizedMap(new LinkedHashMap<String, PTFFunctionInfo>());
+ static Map<String, WindowFunctionInfo> windowFunctions = Collections.synchronizedMap(new LinkedHashMap<String, WindowFunctionInfo>());
+
+ /*
+ * UDAFS that only work when the input rows have an order.
+ */
+ public static final HashSet<String> UDAFS_IMPLY_ORDER = new HashSet<String>();
+
static {
registerUDF("concat", UDFConcat.class, false);
registerUDF("substr", UDFSubstr.class, false);
@@ -482,6 +520,36 @@ public final class FunctionRegistry {
registerGenericUDTF("json_tuple", GenericUDTFJSONTuple.class);
registerGenericUDTF("parse_url_tuple", GenericUDTFParseUrlTuple.class);
registerGenericUDTF("stack", GenericUDTFStack.class);
+
+ //PTF declarations
+ registerGenericUDF(true, LEAD_FUNC_NAME, GenericUDFLead.class);
+ registerGenericUDF(true, LAG_FUNC_NAME, GenericUDFLag.class);
+
+ registerHiveUDAFsAsWindowFunctions();
+ registerWindowFunction("row_number", new GenericUDAFRowNumber());
+ registerWindowFunction("rank", new GenericUDAFRank());
+ registerWindowFunction("dense_rank", new GenericUDAFDenseRank());
+ registerWindowFunction("percent_rank", new GenericUDAFPercentRank());
+ registerWindowFunction("cume_dist", new GenericUDAFCumeDist());
+ registerWindowFunction("ntile", new GenericUDAFNTile());
+ registerWindowFunction("first_value", new GenericUDAFFirstValue());
+ registerWindowFunction("last_value", new GenericUDAFLastValue());
+ registerWindowFunction(LEAD_FUNC_NAME, new GenericUDAFLead(), false);
+ registerWindowFunction(LAG_FUNC_NAME, new GenericUDAFLag(), false);
+
+ UDAFS_IMPLY_ORDER.add("rank");
+ UDAFS_IMPLY_ORDER.add("dense_rank");
+ UDAFS_IMPLY_ORDER.add("percent_rank");
+ UDAFS_IMPLY_ORDER.add("cume_dist");
+ UDAFS_IMPLY_ORDER.add(LEAD_FUNC_NAME);
+ UDAFS_IMPLY_ORDER.add(LAG_FUNC_NAME);
+ UDAFS_IMPLY_ORDER.add("first_value");
+ UDAFS_IMPLY_ORDER.add("last_value");
+
+ registerTableFunction(NOOP_TABLE_FUNCTION, NoopResolver.class);
+ registerTableFunction(NOOP_MAP_TABLE_FUNCTION, NoopWithMapResolver.class);
+ registerTableFunction(WINDOWING_TABLE_FUNCTION, WindowingTableFunctionResolver.class);
+ registerTableFunction("npath", NPathResolver.class);
}
public static void registerTemporaryUDF(String functionName,
@@ -795,6 +863,26 @@ public final class FunctionRegistry {
return udafEvaluator;
}
+ @SuppressWarnings("deprecation")
+ public static GenericUDAFEvaluator getGenericWindowingEvaluator(String name,
+ List<ObjectInspector> argumentOIs, boolean isDistinct,
+ boolean isAllColumns) throws SemanticException {
+
+ WindowFunctionInfo finfo = windowFunctions.get(name.toLowerCase());
+ if (finfo == null) { return null;}
+ if ( !name.toLowerCase().equals(LEAD_FUNC_NAME) &&
+ !name.toLowerCase().equals(LAG_FUNC_NAME) ) {
+ return getGenericUDAFEvaluator(name, argumentOIs, isDistinct, isAllColumns);
+ }
+
+ // this must be lead/lag UDAF
+ ObjectInspector args[] = new ObjectInspector[argumentOIs.size()];
+ GenericUDAFResolver udafResolver = finfo.getfInfo().getGenericUDAFResolver();
+ GenericUDAFParameterInfo paramInfo = new SimpleGenericUDAFParameterInfo(
+ argumentOIs.toArray(args), isDistinct, isAllColumns);
+ return ((GenericUDAFResolver2) udafResolver).getEvaluator(paramInfo);
+ }
+
/**
* This method is shared between UDFRegistry and UDAFRegistry. methodName will
* be "evaluate" for UDFRegistry, and "aggregate"/"evaluate"/"evaluatePartial"
@@ -1033,36 +1121,36 @@ public final class FunctionRegistry {
}
if (udfMethods.size() > 1) {
- // if the only difference is numeric types, pick the method
+ // if the only difference is numeric types, pick the method
// with the smallest overall numeric type.
int lowestNumericType = Integer.MAX_VALUE;
boolean multiple = true;
Method candidate = null;
List<TypeInfo> referenceArguments = null;
-
+
for (Method m: udfMethods) {
int maxNumericType = 0;
-
+
List<TypeInfo> argumentsAccepted = TypeInfoUtils.getParameterTypeInfos(m, argumentsPassed.size());
-
+
if (referenceArguments == null) {
- // keep the arguments for reference - we want all the non-numeric
+ // keep the arguments for reference - we want all the non-numeric
// arguments to be the same
referenceArguments = argumentsAccepted;
}
-
+
Iterator<TypeInfo> referenceIterator = referenceArguments.iterator();
-
+
for (TypeInfo accepted: argumentsAccepted) {
TypeInfo reference = referenceIterator.next();
-
+
if (numericTypes.containsKey(accepted)) {
// We're looking for the udf with the smallest maximum numeric type.
int typeValue = numericTypes.get(accepted);
maxNumericType = typeValue > maxNumericType ? typeValue : maxNumericType;
} else if (!accepted.equals(reference)) {
// There are non-numeric arguments that don't match from one UDF to
- // another. We give up at this point.
+ // another. We give up at this point.
throw new AmbiguousMethodException(udfClass, argumentsPassed, mlist);
}
}
@@ -1351,4 +1439,93 @@ public final class FunctionRegistry {
private FunctionRegistry() {
// prevent instantiation
}
+
+
+ //---------PTF functions------------
+
+ public static void registerWindowFunction(String name, GenericUDAFResolver wFn)
+ {
+ registerWindowFunction(name, wFn, true);
+ }
+
+ /**
+ * Typically a WindowFunction is the same as a UDAF. The only exceptions are Lead & Lag UDAFs. These
+ * are not registered as regular UDAFs because
+ * - we plan to support Lead & Lag as UDFs (usable only within argument expressions
+ * of UDAFs when windowing is involved). Since mFunctions holds both UDFs and UDAFs we cannot
+ * add both FunctionInfos to mFunctions.
+ * We choose to only register UDFs in mFunctions. The implication of this is that Lead/Lag UDAFs
+ * are only usable when windowing is involved.
+ *
+ * @param name
+ * @param wFn
+ * @param registerAsUDAF
+ */
+ public static void registerWindowFunction(String name, GenericUDAFResolver wFn, boolean registerAsUDAF)
+ {
+ FunctionInfo fInfo = null;
+ if (registerAsUDAF) {
+ registerGenericUDAF(true, name, wFn);
+ fInfo = getFunctionInfo(name);
+ }
+ else {
+ fInfo = new FunctionInfo(true,
+ name.toLowerCase(), wFn);
+ }
+
+ WindowFunctionInfo wInfo = new WindowFunctionInfo(fInfo);
+ windowFunctions.put(name.toLowerCase(), wInfo);
+ }
+
+ public static WindowFunctionInfo getWindowFunctionInfo(String name)
+ {
+ return windowFunctions.get(name.toLowerCase());
+ }
+
+ public static boolean impliesOrder(String functionName) {
+ return functionName == null ? false : UDAFS_IMPLY_ORDER.contains(functionName.toLowerCase());
+ }
+
+ static void registerHiveUDAFsAsWindowFunctions()
+ {
+ Set<String> fNames = getFunctionNames();
+ for(String fName : fNames)
+ {
+ FunctionInfo fInfo = getFunctionInfo(fName);
+ if ( fInfo.isGenericUDAF())
+ {
+ WindowFunctionInfo wInfo = new WindowFunctionInfo(fInfo);
+ windowFunctions.put(fName, wInfo);
+ }
+ }
+ }
+
+ public static boolean isTableFunction(String name)
+ {
+ PTFFunctionInfo tFInfo = tableFunctions.get(name.toLowerCase());
+ return tFInfo != null && !tFInfo.isInternal();
+ }
+
+ public static TableFunctionResolver getTableFunctionResolver(String name)
+ {
+ PTFFunctionInfo tfInfo = tableFunctions.get(name.toLowerCase());
+ return (TableFunctionResolver) ReflectionUtils.newInstance(tfInfo.getFunctionResolver(), null);
+ }
+
+ public static TableFunctionResolver getWindowingTableFunction()
+ {
+ return getTableFunctionResolver(WINDOWING_TABLE_FUNCTION);
+ }
+
+ public static TableFunctionResolver getNoopTableFunction()
+ {
+ return getTableFunctionResolver(NOOP_TABLE_FUNCTION);
+ }
+
+ public static void registerTableFunction(String name, Class<? extends TableFunctionResolver> tFnCls)
+ {
+ PTFFunctionInfo tInfo = new PTFFunctionInfo(name, tFnCls);
+ tableFunctions.put(name.toLowerCase(), tInfo);
+ }
+
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Tue Apr 2 14:16:34 2013
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.ql.plan.La
import org.apache.hadoop.hive.ql.plan.LimitDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
import org.apache.hadoop.hive.ql.plan.ScriptDesc;
@@ -74,6 +75,7 @@ public final class OperatorFactory {
opvec.add(new OpTuple<FileSinkDesc>(FileSinkDesc.class, FileSinkOperator.class));
opvec.add(new OpTuple<CollectDesc>(CollectDesc.class, CollectOperator.class));
opvec.add(new OpTuple<ScriptDesc>(ScriptDesc.class, ScriptOperator.class));
+ opvec.add(new OpTuple<PTFDesc>(PTFDesc.class, PTFOperator.class));
opvec.add(new OpTuple<ReduceSinkDesc>(ReduceSinkDesc.class, ReduceSinkOperator.class));
opvec.add(new OpTuple<ExtractDesc>(ExtractDesc.class, ExtractOperator.class));
opvec.add(new OpTuple<GroupByDesc>(GroupByDesc.class, GroupByOperator.class));
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFFunctionInfo.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFFunctionInfo.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFFunctionInfo.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFFunctionInfo.java Tue Apr 2 14:16:34 2013
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.exec.PartitionTableFunctionDescription;
+import org.apache.hadoop.hive.ql.udf.ptf.TableFunctionResolver;
+
+/*
+ * Registry entry for a partitioned table function: the name it was
+ * registered under, the resolver class to instantiate for it, and whether
+ * it is an internal (hidden) function.
+ */
+class PTFFunctionInfo
+{
+ // Name the function was registered under, preserved in its original case.
+ String displayName;
+ // Resolver class instantiated reflectively when the function is invoked.
+ Class<? extends TableFunctionResolver> functionResolver;
+ // Internal functions are not exposed to user queries.
+ boolean isInternal;
+
+ public PTFFunctionInfo(String displayName, Class<? extends TableFunctionResolver> tFnCls)
+ {
+ this.displayName = displayName;
+ this.functionResolver = tFnCls;
+ // A function is internal only when its resolver carries the
+ // PartitionTableFunctionDescription annotation and that annotation
+ // says so; unannotated resolvers default to user-visible.
+ PartitionTableFunctionDescription def =
+ tFnCls.getAnnotation(PartitionTableFunctionDescription.class);
+ this.isInternal = (def != null) && def.isInternal();
+ }
+
+ public String getDisplayName()
+ {
+ return displayName;
+ }
+
+ public Class<? extends TableFunctionResolver> getFunctionResolver()
+ {
+ return functionResolver;
+ }
+
+ public boolean isInternal()
+ {
+ return isInternal;
+ }
+
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Tue Apr 2 14:16:34 2013
@@ -0,0 +1,432 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFExpressionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFInputDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDeserializer;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag;
+import org.apache.hadoop.hive.ql.udf.ptf.TableFunctionEvaluator;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+public class PTFOperator extends Operator<PTFDesc> implements Serializable
+{
+
+ private static final long serialVersionUID = 1L;
+ // Buffers the rows of the partition currently being accumulated.
+ PTFPartition inputPart;
+ // True when this operator runs map-side (parent is not an ExtractOperator).
+ boolean isMapOperator;
+
+ transient KeyWrapperFactory keyWrapperFactory;
+ // Partition key of the rows accumulated so far (null until the first row).
+ protected transient KeyWrapper currentKeys;
+ // Scratch wrapper refreshed with each incoming row's partition key.
+ protected transient KeyWrapper newKeys;
+ transient HiveConf hiveConf;
+
+
+ /*
+ * 1. Find out if the operator is invoked at Map-Side or Reduce-side
+ * 2. Get the deserialized QueryDef
+ * 3. Reconstruct the transient variables in QueryDef
+ * 4. Create input partition to store rows coming from previous operator
+ */
+ @Override
+ protected void initializeOp(Configuration jobConf) throws HiveException
+ {
+ hiveConf = new HiveConf(jobConf, PTFOperator.class);
+ // if the parent is ExtractOperator, this invocation is from reduce-side
+ Operator<? extends OperatorDesc> parentOp = getParentOperators().get(0);
+ if (parentOp instanceof ExtractOperator)
+ {
+ isMapOperator = false;
+ }
+ else
+ {
+ isMapOperator = true;
+ }
+
+ reconstructQueryDef(hiveConf);
+ inputPart = createFirstPartitionForChain(
+ inputObjInspectors[0], hiveConf, isMapOperator);
+
+ // Map-side output is the shape after raw-input transformation; reduce-side
+ // output is the final shape of the whole PTF chain.
+ if (isMapOperator)
+ {
+ PartitionedTableFunctionDef tDef = conf.getStartOfChain();
+ outputObjInspector = tDef.getRawInputShape().getOI();
+ }
+ else
+ {
+ outputObjInspector = conf.getFuncDef().getOutputShape().getOI();
+ }
+
+ setupKeysWrapper(inputObjInspectors[0]);
+
+ super.initializeOp(jobConf);
+ }
+
+ /*
+ * Flush the last accumulated partition: rows arrive with no end-of-group
+ * marker, so the final partition is only complete at operator close.
+ * NOTE(review): the buffered rows are processed even when abort == true --
+ * confirm this is intended.
+ */
+ @Override
+ protected void closeOp(boolean abort) throws HiveException
+ {
+ super.closeOp(abort);
+ if(inputPart.size() != 0){
+ if (isMapOperator)
+ {
+ processMapFunction();
+ }
+ else
+ {
+ processInputPartition();
+ }
+ }
+ }
+
+ @Override
+ public void processOp(Object row, int tag) throws HiveException
+ {
+ if (!isMapOperator )
+ {
+ /*
+ * check if current row belongs to the current accumulated Partition:
+ * - If not:
+ * - process the current Partition
+ * - reset input Partition
+ * - set currentKey to the newKey if it is null or has changed.
+ */
+ newKeys.getNewKey(row, inputPart.getOI());
+ boolean keysAreEqual = (currentKeys != null && newKeys != null)?
+ newKeys.equals(currentKeys) : false;
+
+ if (currentKeys != null && !keysAreEqual)
+ {
+ processInputPartition();
+ inputPart.reset();
+ }
+
+ if (currentKeys == null || !keysAreEqual)
+ {
+ if (currentKeys == null)
+ {
+ // first row seen: take a standalone copy of its key
+ currentKeys = newKeys.copyKey();
+ }
+ else
+ {
+ // partition boundary: overwrite the stored key in place
+ currentKeys.copyKey(newKeys);
+ }
+ }
+ }
+
+ // add row to current Partition.
+ inputPart.append(row);
+ }
+
+ /**
+ * Initialize the visitor to use the QueryDefDeserializer Use the order
+ * defined in QueryDefWalker to visit the QueryDef
+ *
+ * @param hiveConf
+ * @throws HiveException
+ */
+ protected void reconstructQueryDef(HiveConf hiveConf) throws HiveException
+ {
+
+ PTFDeserializer dS =
+ new PTFDeserializer(conf, (StructObjectInspector)inputObjInspectors[0], hiveConf);
+ dS.initializePTFChain(conf.getFuncDef());
+ }
+
+ /*
+ * Build the KeyWrapperFactory used to detect partition-boundary changes.
+ * One evaluator per partitioning expression of the first PTF in the chain.
+ */
+ protected void setupKeysWrapper(ObjectInspector inputOI) throws HiveException
+ {
+ PartitionDef pDef = conf.getStartOfChain().getPartition();
+ ArrayList<PTFExpressionDef> exprs = pDef.getExpressions();
+ int numExprs = exprs.size();
+ ExprNodeEvaluator[] keyFields = new ExprNodeEvaluator[numExprs];
+ ObjectInspector[] keyOIs = new ObjectInspector[numExprs];
+ ObjectInspector[] currentKeyOIs = new ObjectInspector[numExprs];
+
+ for(int i=0; i<numExprs; i++)
+ {
+ PTFExpressionDef exprDef = exprs.get(i);
+ /*
+ * Why cannot we just use the ExprNodeEvaluator on the column?
+ * - because on the reduce-side it is initialized based on the rowOI of the HiveTable
+ * and not the OI of the ExtractOp ( the parent of this Operator on the reduce-side)
+ */
+ keyFields[i] = ExprNodeEvaluatorFactory.get(exprDef.getExprNode());
+ keyOIs[i] = keyFields[i].initialize(inputOI);
+ currentKeyOIs[i] =
+ ObjectInspectorUtils.getStandardObjectInspector(keyOIs[i],
+ ObjectInspectorCopyOption.WRITABLE);
+ }
+
+ keyWrapperFactory = new KeyWrapperFactory(keyFields, keyOIs, currentKeyOIs);
+ newKeys = keyWrapperFactory.getKeyWrapper();
+ }
+
+ /*
+ * Reduce-side handling of a completed partition: run the PTF chain over it,
+ * then either post-process window expressions or forward the rows as-is.
+ */
+ protected void processInputPartition() throws HiveException
+ {
+ PTFPartition outPart = executeChain(inputPart);
+ if ( conf.forWindowing() ) {
+ executeWindowExprs(outPart);
+ }
+ else {
+ PTFPartitionIterator<Object> pItr = outPart.iterator();
+ while (pItr.hasNext())
+ {
+ Object oRow = pItr.next();
+ forward(oRow, outputObjInspector);
+ }
+ }
+ }
+
+ /*
+ * Map-side handling: only the first PTF's raw-input transformation is
+ * applied; the rest of the chain runs on the reduce side.
+ */
+ protected void processMapFunction() throws HiveException
+ {
+ PartitionedTableFunctionDef tDef = conf.getStartOfChain();
+ PTFPartition outPart = tDef.getTFunction().transformRawInput(inputPart);
+ PTFPartitionIterator<Object> pItr = outPart.iterator();
+ while (pItr.hasNext())
+ {
+ Object oRow = pItr.next();
+ forward(oRow, outputObjInspector);
+ }
+ }
+
+ /**
+ * @return the name of the operator
+ */
+ @Override
+ public String getName() {
+ return getOperatorName();
+ }
+
+ static public String getOperatorName() {
+ return "PTF";
+ }
+
+
+ @Override
+ public OperatorType getType()
+ {
+ return OperatorType.PTF;
+ }
+
+ /**
+ * For all the table functions to be applied to the input
+ * hive table or query, push them on a stack.
+ * For each table function popped out of the stack,
+ * execute the function on the input partition
+ * and return an output partition.
+ * @param part
+ * @return
+ * @throws HiveException
+ */
+ private PTFPartition executeChain(PTFPartition part)
+ throws HiveException
+ {
+ Stack<PartitionedTableFunctionDef> fnDefs = new Stack<PartitionedTableFunctionDef>();
+ PTFInputDef iDef = conf.getFuncDef();
+ // walk the chain from the last function back to its source input
+ while (true)
+ {
+ if (iDef instanceof PartitionedTableFunctionDef)
+ {
+ fnDefs.push((PartitionedTableFunctionDef) iDef);
+ iDef = ((PartitionedTableFunctionDef) iDef).getInput();
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ // popping the stack applies the functions in source-to-sink order
+ PartitionedTableFunctionDef currFnDef;
+ while (!fnDefs.isEmpty())
+ {
+ currFnDef = fnDefs.pop();
+ part = currFnDef.getTFunction().execute(part);
+ }
+ return part;
+ }
+
+ /**
+ * If WindowingSpec contains any Windowing Expressions or has a
+ * Having condition then these are processed
+ * and forwarded on. Whereas when there is no having or WdwExprs
+ * just forward the rows in the output Partition.
+ *
+ * For e.g. Consider the following query:
+ * <pre>
+ * {@code
+ * select rank(), lead(rank(),1),...
+ * from xyz
+ * ...
+ * having rank() > 1
+ * }
+ * </pre>
+ * rank() gets processed as a WdwFn; Its in the oPart(output partition)
+ * argument to executeWindowExprs. Here we first evaluate the having expression.
+ * So the first row of oPart gets filtered out.
+ * Next we evaluate lead(rank()) which is held as a WindowExpression and add it to the output.
+ *
+ * @param ptfDesc
+ * @param oPart output partition after Window Fns are processed.
+ * @param op
+ * @throws HiveException
+ */
+ private void executeWindowExprs(PTFPartition oPart)
+ throws HiveException
+ {
+ WindowTableFunctionDef wTFnDef = (WindowTableFunctionDef) conf.getFuncDef();
+ /*
+ * inputOI represents the row with WindowFn results present.
+ * So in the e.g. above it will have a column for 'rank()'
+ */
+ StructObjectInspector inputOI = wTFnDef.getOutputFromWdwFnProcessing().getOI();
+ /*
+ * outputOI represents the final row with the Windowing Expressions added.
+ * So in the e.g. above it will have a column for 'lead(rank())' in addition to
+ * all columns in inputOI.
+ */
+ StructObjectInspector outputOI = wTFnDef.getOutputShape().getOI();
+ int numCols = outputOI.getAllStructFieldRefs().size();
+ ArrayList<WindowExpressionDef> wdwExprs = wTFnDef.getWindowExpressions();
+ int numWdwExprs = wdwExprs == null ? 0 : wdwExprs.size();
+ Object[] output = new Object[numCols];
+
+ /*
+ * If this Windowing invocation has no Window Expressions and doesn't need to be filtered,
+ * we can just forward the row in the oPart partition.
+ */
+ boolean forwardRowsUntouched = (wdwExprs == null || wdwExprs.size() == 0 );
+
+ PTFPartitionIterator<Object> pItr = oPart.iterator();
+ // lead/lag UDFs evaluate relative to this iterator's position
+ PTFOperator.connectLeadLagFunctionsToPartition(conf, pItr);
+ while (pItr.hasNext())
+ {
+ int colCnt = 0;
+ Object oRow = pItr.next();
+
+ /*
+ * when there is no Windowing expressions or having;
+ * just forward the Object coming out of the Partition.
+ */
+ if ( forwardRowsUntouched ) {
+ forward(oRow, outputObjInspector);
+ continue;
+ }
+
+ /*
+ * Setup the output row columns in the following order
+ * - the columns in the SelectList processed by the PTF
+ * (ie the Select Exprs that have navigation expressions)
+ * - the columns from the final PTF.
+ */
+
+ if ( wdwExprs != null ) {
+ for (WindowExpressionDef wdwExpr : wdwExprs)
+ {
+ Object newCol = wdwExpr.getExprEvaluator().evaluate(oRow);
+ output[colCnt++] = newCol;
+ }
+ }
+
+ // copy the remaining columns straight from the input row
+ for(; colCnt < numCols; ) {
+ StructField field = inputOI.getAllStructFieldRefs().get(colCnt - numWdwExprs);
+ output[colCnt++] =
+ ObjectInspectorUtils.copyToStandardObject(inputOI.getStructFieldData(oRow, field),
+ field.getFieldObjectInspector());
+ }
+
+ forward(output, outputObjInspector);
+ }
+ }
+
+ /**
+ * Create a new Partition.
+ * A partition has 2 OIs: the OI for the rows being put in and the OI for the rows
+ * coming out. You specify the output OI by giving the Serde to use to Serialize.
+ * Typically these 2 OIs are the same; but not always. For the
+ * first PTF in a chain the OI of the incoming rows is dictated by the Parent Op
+ * to this PTFOp. The output OI from the Partition is typically LazyBinaryStruct, but
+ * not always. In the case of Noop/NoopMap we keep the Structure the same as
+ * what is given to us.
+ * <p>
+ * The Partition we want to create here is for feeding the First table function in the chain.
+ * So for map-side processing use the Serde from the output Shape its InputDef.
+ * For reduce-side processing use the Serde from its RawInputShape(the shape
+ * after map-side processing).
+ * @param oi
+ * @param hiveConf
+ * @param isMapSide
+ * @return
+ * @throws HiveException
+ */
+ public PTFPartition createFirstPartitionForChain(ObjectInspector oi,
+ HiveConf hiveConf, boolean isMapSide) throws HiveException
+ {
+ PartitionedTableFunctionDef tabDef = conf.getStartOfChain();
+ TableFunctionEvaluator tEval = tabDef.getTFunction();
+ String partClassName = tEval.getPartitionClass();
+ int partMemSize = tEval.getPartitionMemSize();
+
+ PTFPartition part = null;
+ SerDe serde = isMapSide ? tabDef.getInput().getOutputShape().getSerde() :
+ tabDef.getRawInputShape().getSerde();
+ part = new PTFPartition(partClassName, partMemSize, serde,
+ (StructObjectInspector) oi);
+ return part;
+
+ }
+
+ /*
+ * Point every lead/lag UDF in the plan at the given partition iterator, so
+ * the UDFs can navigate relative to the row currently being processed.
+ */
+ public static void connectLeadLagFunctionsToPartition(PTFDesc ptfDesc,
+ PTFPartitionIterator<Object> pItr) throws HiveException
+ {
+ List<ExprNodeGenericFuncDesc> llFnDescs = ptfDesc.getLlInfo().getLeadLagExprs();
+ if (llFnDescs == null) {
+ return;
+ }
+ for (ExprNodeGenericFuncDesc llFnDesc : llFnDescs)
+ {
+ GenericUDFLeadLag llFn = (GenericUDFLeadLag) llFnDesc
+ .getGenericUDF();
+ llFn.setpItr(pItr);
+ }
+ }
+
+
+
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java Tue Apr 2 14:16:34 2013
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.PTFPersistence.ByteBasedList;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Writable;
+
+/*
+ * represents a collection of rows that is acted upon by a TableFunction or a WindowFunction.
+ */
+public class PTFPartition
+{
+ // SerDe used to serialize rows in and deserialize rows out.
+ SerDe serDe;
+ // ObjectInspector describing the rows being appended.
+ StructObjectInspector OI;
+ // Backing storage: serialized rows in a (possibly spillable) byte list.
+ private ByteBasedList elems;
+ // Shared scratch Writable reused for every get/deserialize call.
+ private Writable wRow;
+ // Number of rows appended since construction / last reset.
+ private int sz;
+
+ public PTFPartition(HiveConf cfg, SerDe serDe, StructObjectInspector oI) throws HiveException
+ {
+ String partitionClass = HiveConf.getVar(cfg, ConfVars.HIVE_PTF_PARTITION_PERSISTENCE_CLASS);
+ int partitionMemSize = HiveConf.getIntVar(cfg, ConfVars.HIVE_PTF_PARTITION_PERSISTENT_SIZE);
+ init(partitionClass, partitionMemSize, serDe, oI);
+ }
+
+ public PTFPartition(String partitionClass, int partitionMemSize, SerDe serDe, StructObjectInspector oI) throws HiveException
+ {
+ init(partitionClass, partitionMemSize, serDe, oI);
+ }
+
+ private void init(String partitionClass, int partitionMemSize, SerDe serDe, StructObjectInspector oI) throws HiveException
+ {
+ this.serDe = serDe;
+ OI = oI;
+ elems = PTFPersistence.createList(partitionClass, partitionMemSize);
+ sz = 0;
+ wRow = createWritable();
+ }
+
+ // Empty the partition for reuse; storage is cleared, not reallocated.
+ public void reset() throws HiveException {
+ sz = 0;
+ elems.reset(0);
+ }
+
+ public SerDe getSerDe()
+ {
+ return serDe;
+ }
+ public void setSerDe(SerDe serDe)
+ {
+ this.serDe = serDe;
+ }
+ public StructObjectInspector getOI()
+ {
+ return OI;
+ }
+ public void setOI(StructObjectInspector oI)
+ {
+ OI = oI;
+ }
+
+ // Allocate the scratch Writable of the type this SerDe serializes to.
+ private Writable createWritable() throws HiveException
+ {
+ try
+ {
+ return serDe.getSerializedClass().newInstance();
+ }
+ catch(Throwable t)
+ {
+ throw new HiveException(t);
+ }
+ }
+
+ /*
+ * Deserialize and return the i-th row. The returned object may share
+ * state owned by the SerDe; callers needing a stable copy must copy it.
+ */
+ public Object getAt(int i) throws HiveException
+ {
+ try
+ {
+ elems.get(i, wRow);
+ Object o = serDe.deserialize(wRow);
+ return o;
+ }
+ catch(SerDeException se)
+ {
+ throw new HiveException(se);
+ }
+ }
+
+ /*
+ * Return the i-th row in serialized form. NOTE: the same scratch Writable
+ * is returned on every call, so the previous result is overwritten.
+ */
+ public Object getWritableAt(int i) throws HiveException
+ {
+ elems.get(i, wRow);
+ return wRow;
+ }
+
+ // Append an already-serialized row.
+ public void append(Writable o) throws HiveException
+ {
+ elems.append(o);
+ sz++;
+ }
+
+ // Serialize a row with this partition's SerDe/OI, then append it.
+ public void append(Object o) throws HiveException
+ {
+ try
+ {
+ append(serDe.serialize(o, OI));
+ }
+ catch(SerDeException e)
+ {
+ throw new HiveException(e);
+ }
+ }
+
+ public int size()
+ {
+ return sz;
+ }
+
+ public PTFPartitionIterator<Object> iterator()
+ {
+ return new PItr(0, size());
+ }
+
+ // Iterate over the half-open row range [start, end).
+ public PTFPartitionIterator<Object> range(int start, int end)
+ {
+ assert(start >= 0);
+ assert(end <= size());
+ assert(start <= end);
+ return new PItr(start, end);
+ }
+
+ /*
+ * Iterator over a fixed row range; fails fast (like java.util iterators)
+ * if rows are appended to the partition after the iterator is created.
+ */
+ class PItr implements PTFPartitionIterator<Object>
+ {
+ int idx;
+ final int start;
+ final int end;
+ // partition size captured at creation, for comodification detection
+ final int createTimeSz;
+
+ PItr(int start, int end)
+ {
+ this.idx = start;
+ this.start = start;
+ this.end = end;
+ createTimeSz = PTFPartition.this.size();
+ }
+
+ public boolean hasNext()
+ {
+ checkForComodification() ;
+ return idx < end;
+ }
+
+ public Object next()
+ {
+ checkForComodification();
+ try
+ {
+ return PTFPartition.this.getAt(idx++);
+ }
+ catch(HiveException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ final void checkForComodification()
+ {
+ if (createTimeSz != PTFPartition.this.size()) {
+ throw new ConcurrentModificationException();
+ }
+ }
+
+ @Override
+ public int getIndex()
+ {
+ return idx;
+ }
+
+ // Wrap checked HiveException so lead/lag can be used in expressions.
+ private Object getAt(int i)
+ {
+ try
+ {
+ return PTFPartition.this.getAt(i);
+ }
+ catch(HiveException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // Row 'amt' ahead of the cursor, clamped to the last row of the range.
+ @Override
+ public Object lead(int amt)
+ {
+ int i = idx + amt;
+ i = i >= end ? end - 1 : i;
+ return getAt(i);
+ }
+
+ // Row 'amt' behind the cursor, clamped to the first row of the range.
+ @Override
+ public Object lag(int amt)
+ {
+ int i = idx - amt;
+ i = i < start ? start : i;
+ return getAt(i);
+ }
+
+ // Reposition the cursor just past idx and return that row;
+ // null when idx falls outside this iterator's range.
+ @Override
+ public Object resetToIndex(int idx)
+ {
+ if ( idx < start || idx >= end )
+ {
+ return null;
+ }
+ Object o = getAt(idx);
+ this.idx = idx + 1;
+ return o;
+ }
+
+ @Override
+ public PTFPartition getPartition()
+ {
+ return PTFPartition.this;
+ }
+
+ @Override
+ public void reset()
+ {
+ idx = start;
+ }
+ };
+
+ /*
+ * provide an Iterator on the rows in a Partition.
+ * Iterator exposes the index of the next location.
+ * Client can invoke lead/lag relative to the next location.
+ */
+ public static interface PTFPartitionIterator<T> extends Iterator<T>
+ {
+ int getIndex();
+
+ T lead(int amt);
+
+ T lag(int amt);
+
+ /*
+ * after a lead and lag call, allow Object associated with SerDe and writable associated with partition to be reset
+ * to the value for the current Index.
+ */
+ Object resetToIndex(int idx);
+
+ PTFPartition getPartition();
+
+ void reset();
+ }
+
+
+}