Posted to commits@hive.apache.org by ha...@apache.org on 2013/04/02 16:16:37 UTC

svn commit: r1463556 [3/15] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ data/files/ ql/if/ ql/src/gen/thrift/gen-cpp/ ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/ ql/src/gen/thrift/gen-php/ ql/src/gen/thrift/gen...

Added: hive/trunk/data/files/part.rc
URL: http://svn.apache.org/viewvc/hive/trunk/data/files/part.rc?rev=1463556&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hive/trunk/data/files/part.rc
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hive/trunk/data/files/part.seq
URL: http://svn.apache.org/viewvc/hive/trunk/data/files/part.seq?rev=1463556&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hive/trunk/data/files/part.seq
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hive/trunk/data/files/part_tiny.txt
URL: http://svn.apache.org/viewvc/hive/trunk/data/files/part_tiny.txt?rev=1463556&view=auto
==============================================================================
--- hive/trunk/data/files/part_tiny.txt (added)
+++ hive/trunk/data/files/part_tiny.txt Tue Apr  2 14:16:34 2013
@@ -0,0 +1,26 @@
+121152almond antique burnished rose metallicManufacturer#1Brand#14PROMO PLATED TIN2JUMBO BOX1173.15e pinto beans h
+121152almond antique burnished rose metallicManufacturer#1Brand#14PROMO PLATED TIN2JUMBO BOX1173.15e pinto beans h
+85768almond antique chartreuse lavender yellowManufacturer#1Brand#12LARGE BRUSHED STEEL34SM BAG1753.76refull
+110592almond antique salmon chartreuse burlywoodManufacturer#1Brand#15PROMO BURNISHED NICKEL6JUMBO PKG1602.59 to the furiously
+86428almond aquamarine burnished black steelManufacturer#1Brand#12STANDARD ANODIZED STEEL28WRAP BAG1414.42arefully 
+65667almond aquamarine pink moccasin thistleManufacturer#1Brand#12LARGE BURNISHED STEEL42JUMBO CASE1632.66e across the expr
+105685almond antique violet chocolate turquoiseManufacturer#2Brand#22MEDIUM ANODIZED COPPER14MED CAN1690.68ly pending requ
+191709almond antique violet turquoise frostedManufacturer#2Brand#22ECONOMY POLISHED STEEL40MED BOX1800.7 haggle
+146985almond aquamarine midnight light salmonManufacturer#2Brand#23MEDIUM BURNISHED COPPER2SM CASE2031.98s cajole caref
+132666almond aquamarine rose maroon antiqueManufacturer#2Brand#24SMALL POLISHED NICKEL25MED BOX1698.66even 
+195606almond aquamarine sandy cyan gainsboroManufacturer#2Brand#25STANDARD PLATED TIN18SM PKG1701.6ic de
+90681almond antique chartreuse khaki whiteManufacturer#3Brand#31MEDIUM BURNISHED TIN17SM CASE1671.68are slyly after the sl
+17273almond antique forest lavender goldenrodManufacturer#3Brand#35PROMO ANODIZED TIN14JUMBO CASE1190.27along the
+112398almond antique metallic orange dimManufacturer#3Brand#32MEDIUM BURNISHED BRASS19JUMBO JAR1410.39ole car
+40982almond antique misty red oliveManufacturer#3Brand#32ECONOMY PLATED COPPER1LG PKG1922.98c foxes can s
+144293almond antique olive coral navajoManufacturer#3Brand#34STANDARD POLISHED STEEL45JUMBO CAN1337.29ag furiously about 
+49671almond antique gainsboro frosted violetManufacturer#4Brand#41SMALL BRUSHED BRASS10SM BOX1620.67ccounts run quick
+48427almond antique violet mint lemonManufacturer#4Brand#42PROMO POLISHED STEEL39SM CASE1375.42hely ironic i
+45261almond aquamarine floral ivory bisqueManufacturer#4Brand#42SMALL PLATED STEEL27WRAP CASE1206.26careful
+17927almond aquamarine yellow dodger mintManufacturer#4Brand#41ECONOMY BRUSHED COPPER7SM PKG1844.92ites. eve
+33357almond azure aquamarine papaya violetManufacturer#4Brand#41STANDARD ANODIZED TIN12WRAP CASE1290.35reful
+192697almond antique blue firebrick mintManufacturer#5Brand#52MEDIUM BURNISHED TIN31LG DRUM1789.69ickly ir
+42669almond antique medium spring khakiManufacturer#5Brand#51STANDARD BURNISHED TIN6MED CAN1611.66sits haggl
+155733almond antique sky peru orangeManufacturer#5Brand#53SMALL PLATED BRASS2WRAP DRUM1788.73furiously. bra
+15103almond aquamarine dodger light gainsboroManufacturer#5Brand#53ECONOMY BURNISHED STEEL46LG PACK1018.1packages hinder carefu
+78486almond azure blanched chiffon midnightManufacturer#5Brand#52LARGE BRUSHED BRASS23MED BAG1464.48hely blith

Modified: hive/trunk/ql/if/queryplan.thrift
URL: http://svn.apache.org/viewvc/hive/trunk/ql/if/queryplan.thrift?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/if/queryplan.thrift (original)
+++ hive/trunk/ql/if/queryplan.thrift Tue Apr  2 14:16:34 2013
@@ -53,6 +53,7 @@ enum OperatorType {
   LATERALVIEWFORWARD,
   HASHTABLESINK,
   HASHTABLEDUMMY,
+  PTF,
 }
 
 struct Operator {

Modified: hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp (original)
+++ hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp Tue Apr  2 14:16:34 2013
@@ -48,7 +48,8 @@ int _kOperatorTypeValues[] = {
   OperatorType::LATERALVIEWJOIN,
   OperatorType::LATERALVIEWFORWARD,
   OperatorType::HASHTABLESINK,
-  OperatorType::HASHTABLEDUMMY
+  OperatorType::HASHTABLEDUMMY,
+  OperatorType::PTF
 };
 const char* _kOperatorTypeNames[] = {
   "JOIN",
@@ -68,9 +69,10 @@ const char* _kOperatorTypeNames[] = {
   "LATERALVIEWJOIN",
   "LATERALVIEWFORWARD",
   "HASHTABLESINK",
-  "HASHTABLEDUMMY"
+  "HASHTABLEDUMMY",
+  "PTF"
 };
-const std::map<int, const char*> _OperatorType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(18, _kOperatorTypeValues, _kOperatorTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+const std::map<int, const char*> _OperatorType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(19, _kOperatorTypeValues, _kOperatorTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
 
 int _kTaskTypeValues[] = {
   TaskType::MAP,

Modified: hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h (original)
+++ hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h Tue Apr  2 14:16:34 2013
@@ -53,7 +53,8 @@ struct OperatorType {
     LATERALVIEWJOIN = 14,
     LATERALVIEWFORWARD = 15,
     HASHTABLESINK = 16,
-    HASHTABLEDUMMY = 17
+    HASHTABLEDUMMY = 17,
+    PTF = 18
   };
 };
 

Modified: hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Operator.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Operator.java (original)
+++ hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Operator.java Tue Apr  2 14:16:34 2013
@@ -810,7 +810,7 @@ public class Operator implements org.apa
                 for (int _i25 = 0; _i25 < _map24.size; ++_i25)
                 {
                   String _key26; // required
-                  String _val27; // optional
+                  String _val27; // required
                   _key26 = iprot.readString();
                   _val27 = iprot.readString();
                   struct.operatorAttributes.put(_key26, _val27);
@@ -830,7 +830,7 @@ public class Operator implements org.apa
                 for (int _i29 = 0; _i29 < _map28.size; ++_i29)
                 {
                   String _key30; // required
-                  long _val31; // optional
+                  long _val31; // required
                   _key30 = iprot.readString();
                   _val31 = iprot.readI64();
                   struct.operatorCounters.put(_key30, _val31);
@@ -1003,7 +1003,7 @@ public class Operator implements org.apa
           for (int _i37 = 0; _i37 < _map36.size; ++_i37)
           {
             String _key38; // required
-            String _val39; // optional
+            String _val39; // required
             _key38 = iprot.readString();
             _val39 = iprot.readString();
             struct.operatorAttributes.put(_key38, _val39);
@@ -1018,7 +1018,7 @@ public class Operator implements org.apa
           for (int _i41 = 0; _i41 < _map40.size; ++_i41)
           {
             String _key42; // required
-            long _val43; // optional
+            long _val43; // required
             _key42 = iprot.readString();
             _val43 = iprot.readI64();
             struct.operatorCounters.put(_key42, _val43);

Modified: hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java (original)
+++ hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java Tue Apr  2 14:16:34 2013
@@ -29,7 +29,8 @@ public enum OperatorType implements org.
   LATERALVIEWJOIN(14),
   LATERALVIEWFORWARD(15),
   HASHTABLESINK(16),
-  HASHTABLEDUMMY(17);
+  HASHTABLEDUMMY(17),
+  PTF(18);
 
   private final int value;
 
@@ -86,6 +87,8 @@ public enum OperatorType implements org.
         return HASHTABLESINK;
       case 17:
         return HASHTABLEDUMMY;
+      case 18:
+        return PTF;
       default:
         return null;
     }
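
As a quick illustration (a sketch, not from this commit): the new enum value
round-trips through the generated Java binding above via the findByValue
switch shown in this hunk, which is the lookup method standard Thrift codegen
produces for an enum.

    import org.apache.hadoop.hive.ql.plan.api.OperatorType;

    public class PtfEnumCheck {
      public static void main(String[] args) {
        // PTF was appended with explicit value 18 in queryplan.thrift,
        // so the regenerated binding resolves 18 back to it
        OperatorType t = OperatorType.findByValue(18);
        System.out.println(t);                      // PTF
        System.out.println(t == OperatorType.PTF);  // true
      }
    }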

Modified: hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Query.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Query.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Query.java (original)
+++ hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Query.java Tue Apr  2 14:16:34 2013
@@ -983,7 +983,7 @@ public class Query implements org.apache
                 for (int _i101 = 0; _i101 < _map100.size; ++_i101)
                 {
                   String _key102; // required
-                  String _val103; // optional
+                  String _val103; // required
                   _key102 = iprot.readString();
                   _val103 = iprot.readString();
                   struct.queryAttributes.put(_key102, _val103);
@@ -1003,7 +1003,7 @@ public class Query implements org.apache
                 for (int _i105 = 0; _i105 < _map104.size; ++_i105)
                 {
                   String _key106; // required
-                  long _val107; // optional
+                  long _val107; // required
                   _key106 = iprot.readString();
                   _val107 = iprot.readI64();
                   struct.queryCounters.put(_key106, _val107);
@@ -1239,7 +1239,7 @@ public class Query implements org.apache
           for (int _i118 = 0; _i118 < _map117.size; ++_i118)
           {
             String _key119; // required
-            String _val120; // optional
+            String _val120; // required
             _key119 = iprot.readString();
             _val120 = iprot.readString();
             struct.queryAttributes.put(_key119, _val120);
@@ -1254,7 +1254,7 @@ public class Query implements org.apache
           for (int _i122 = 0; _i122 < _map121.size; ++_i122)
           {
             String _key123; // required
-            long _val124; // optional
+            long _val124; // required
             _key123 = iprot.readString();
             _val124 = iprot.readI64();
             struct.queryCounters.put(_key123, _val124);

Modified: hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Stage.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Stage.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Stage.java (original)
+++ hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Stage.java Tue Apr  2 14:16:34 2013
@@ -911,7 +911,7 @@ public class Stage implements org.apache
                 for (int _i73 = 0; _i73 < _map72.size; ++_i73)
                 {
                   String _key74; // required
-                  String _val75; // optional
+                  String _val75; // required
                   _key74 = iprot.readString();
                   _val75 = iprot.readString();
                   struct.stageAttributes.put(_key74, _val75);
@@ -931,7 +931,7 @@ public class Stage implements org.apache
                 for (int _i77 = 0; _i77 < _map76.size; ++_i77)
                 {
                   String _key78; // required
-                  long _val79; // optional
+                  long _val79; // required
                   _key78 = iprot.readString();
                   _val79 = iprot.readI64();
                   struct.stageCounters.put(_key78, _val79);
@@ -1147,7 +1147,7 @@ public class Stage implements org.apache
           for (int _i90 = 0; _i90 < _map89.size; ++_i90)
           {
             String _key91; // required
-            String _val92; // optional
+            String _val92; // required
             _key91 = iprot.readString();
             _val92 = iprot.readString();
             struct.stageAttributes.put(_key91, _val92);
@@ -1162,7 +1162,7 @@ public class Stage implements org.apache
           for (int _i94 = 0; _i94 < _map93.size; ++_i94)
           {
             String _key95; // required
-            long _val96; // optional
+            long _val96; // required
             _key95 = iprot.readString();
             _val96 = iprot.readI64();
             struct.stageCounters.put(_key95, _val96);

Modified: hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Task.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Task.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Task.java (original)
+++ hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/Task.java Tue Apr  2 14:16:34 2013
@@ -996,7 +996,7 @@ public class Task implements org.apache.
                 for (int _i45 = 0; _i45 < _map44.size; ++_i45)
                 {
                   String _key46; // required
-                  String _val47; // optional
+                  String _val47; // required
                   _key46 = iprot.readString();
                   _val47 = iprot.readString();
                   struct.taskAttributes.put(_key46, _val47);
@@ -1016,7 +1016,7 @@ public class Task implements org.apache.
                 for (int _i49 = 0; _i49 < _map48.size; ++_i49)
                 {
                   String _key50; // required
-                  long _val51; // optional
+                  long _val51; // required
                   _key50 = iprot.readString();
                   _val51 = iprot.readI64();
                   struct.taskCounters.put(_key50, _val51);
@@ -1256,7 +1256,7 @@ public class Task implements org.apache.
           for (int _i62 = 0; _i62 < _map61.size; ++_i62)
           {
             String _key63; // required
-            String _val64; // optional
+            String _val64; // required
             _key63 = iprot.readString();
             _val64 = iprot.readString();
             struct.taskAttributes.put(_key63, _val64);
@@ -1271,7 +1271,7 @@ public class Task implements org.apache.
           for (int _i66 = 0; _i66 < _map65.size; ++_i66)
           {
             String _key67; // required
-            long _val68; // optional
+            long _val68; // required
             _key67 = iprot.readString();
             _val68 = iprot.readI64();
             struct.taskCounters.put(_key67, _val68);

Modified: hive/trunk/ql/src/gen/thrift/gen-php/Types.php
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-php/Types.php?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-php/Types.php (original)
+++ hive/trunk/ql/src/gen/thrift/gen-php/Types.php Tue Apr  2 14:16:34 2013
@@ -53,6 +53,7 @@ final class OperatorType {
   const LATERALVIEWFORWARD = 15;
   const HASHTABLESINK = 16;
   const HASHTABLEDUMMY = 17;
+  const PTF = 18;
   static public $__names = array(
     0 => 'JOIN',
     1 => 'MAPJOIN',
@@ -72,6 +73,7 @@ final class OperatorType {
     15 => 'LATERALVIEWFORWARD',
     16 => 'HASHTABLESINK',
     17 => 'HASHTABLEDUMMY',
+    18 => 'PTF',
   );
 }
 

Modified: hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py (original)
+++ hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py Tue Apr  2 14:16:34 2013
@@ -63,6 +63,7 @@ class OperatorType:
   LATERALVIEWFORWARD = 15
   HASHTABLESINK = 16
   HASHTABLEDUMMY = 17
+  PTF = 18
 
   _VALUES_TO_NAMES = {
     0: "JOIN",
@@ -83,6 +84,7 @@ class OperatorType:
     15: "LATERALVIEWFORWARD",
     16: "HASHTABLESINK",
     17: "HASHTABLEDUMMY",
+    18: "PTF",
   }
 
   _NAMES_TO_VALUES = {
@@ -104,6 +106,7 @@ class OperatorType:
     "LATERALVIEWFORWARD": 15,
     "HASHTABLESINK": 16,
     "HASHTABLEDUMMY": 17,
+    "PTF": 18,
   }
 
 class TaskType:

Modified: hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb (original)
+++ hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb Tue Apr  2 14:16:34 2013
@@ -39,8 +39,9 @@ module OperatorType
   LATERALVIEWFORWARD = 15
   HASHTABLESINK = 16
   HASHTABLEDUMMY = 17
-  VALUE_MAP = {0 => "JOIN", 1 => "MAPJOIN", 2 => "EXTRACT", 3 => "FILTER", 4 => "FORWARD", 5 => "GROUPBY", 6 => "LIMIT", 7 => "SCRIPT", 8 => "SELECT", 9 => "TABLESCAN", 10 => "FILESINK", 11 => "REDUCESINK", 12 => "UNION", 13 => "UDTF", 14 => "LATERALVIEWJOIN", 15 => "LATERALVIEWFORWARD", 16 => "HASHTABLESINK", 17 => "HASHTABLEDUMMY"}
-  VALID_VALUES = Set.new([JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF, LATERALVIEWJOIN, LATERALVIEWFORWARD, HASHTABLESINK, HASHTABLEDUMMY]).freeze
+  PTF = 18
+  VALUE_MAP = {0 => "JOIN", 1 => "MAPJOIN", 2 => "EXTRACT", 3 => "FILTER", 4 => "FORWARD", 5 => "GROUPBY", 6 => "LIMIT", 7 => "SCRIPT", 8 => "SELECT", 9 => "TABLESCAN", 10 => "FILESINK", 11 => "REDUCESINK", 12 => "UNION", 13 => "UDTF", 14 => "LATERALVIEWJOIN", 15 => "LATERALVIEWFORWARD", 16 => "HASHTABLESINK", 17 => "HASHTABLEDUMMY", 18 => "PTF"}
+  VALID_VALUES = Set.new([JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF, LATERALVIEWJOIN, LATERALVIEWFORWARD, HASHTABLESINK, HASHTABLEDUMMY, PTF]).freeze
 end
 
 module TaskType

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java Tue Apr  2 14:16:34 2013
@@ -37,6 +37,8 @@ public class QueryProperties {
   boolean hasOrderBy = false;
   boolean hasSortBy = false;
   boolean hasJoinFollowedByGroupBy = false;
+  boolean hasPTF = false;
+  boolean hasWindowing = false;
 
   // does the query have a using clause
   boolean usesScript = false;
@@ -109,6 +111,22 @@ public class QueryProperties {
     this.hasClusterBy = hasClusterBy;
   }
 
+  public boolean hasPTF() {
+    return hasPTF;
+  }
+
+  public void setHasPTF(boolean hasPTF) {
+    this.hasPTF = hasPTF;
+  }
+
+  public boolean hasWindowing() {
+    return hasWindowing;
+  }
+
+  public void setHasWindowing(boolean hasWindowing) {
+    this.hasWindowing = hasWindowing;
+  }
+
   public boolean isMapJoinRemoved() {
     return mapJoinRemoved;
   }
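
The two new flags follow the same getter/setter pattern as the existing ones.
A hedged sketch of how analysis code might drive them (the QueryProperties
calls are from this diff; the wrapper class, method, and hasWindowingSpec
argument are hypothetical):

    import org.apache.hadoop.hive.ql.QueryProperties;

    class PtfFlagsDemo {
      // hypothetical caller; in this patch the semantic analyzer would own this logic
      static void recordPTFUsage(QueryProperties props, boolean hasWindowingSpec) {
        props.setHasPTF(true);                    // the query invokes a PTF
        props.setHasWindowing(hasWindowingSpec);  // OVER(...) clauses present?
        if (props.hasPTF() && !props.hasWindowing()) {
          // e.g. a bare table function such as npath(...) with no windowing
        }
      }
    }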

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Tue Apr  2 14:16:34 2013
@@ -135,15 +135,25 @@ import org.apache.hadoop.hive.ql.udf.gen
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCovariance;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCovarianceSample;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCumeDist;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFDenseRank;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEWAHBitmap;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFFirstValue;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFHistogramNumeric;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFLastValue;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMin;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFNTile;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFPercentRank;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFPercentileApprox;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFRank;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFLead;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFLag;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFRowNumber;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStd;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStdSample;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum;
@@ -172,6 +182,8 @@ import org.apache.hadoop.hive.ql.udf.gen
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInFile;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFIndex;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInstr;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag.GenericUDFLag;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag.GenericUDFLead;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLocate;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMap;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMapKeys;
@@ -201,8 +213,8 @@ import org.apache.hadoop.hive.ql.udf.gen
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFStruct;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFTimestamp;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToBinary;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUnixTimeStamp;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDecimal;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUnixTimeStamp;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUtcTimestamp;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFTranslate;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUnion;
@@ -215,6 +227,11 @@ import org.apache.hadoop.hive.ql.udf.gen
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDTFParseUrlTuple;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDTFStack;
 import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
+import org.apache.hadoop.hive.ql.udf.ptf.NPath.NPathResolver;
+import org.apache.hadoop.hive.ql.udf.ptf.Noop.NoopResolver;
+import org.apache.hadoop.hive.ql.udf.ptf.NoopWithMap.NoopWithMapResolver;
+import org.apache.hadoop.hive.ql.udf.ptf.TableFunctionResolver;
+import org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction.WindowingTableFunctionResolver;
 import org.apache.hadoop.hive.ql.udf.xml.GenericUDFXPath;
 import org.apache.hadoop.hive.ql.udf.xml.UDFXPathBoolean;
 import org.apache.hadoop.hive.ql.udf.xml.UDFXPathDouble;
@@ -237,6 +254,7 @@ import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.NodeList;
 
+
 /**
  * FunctionRegistry.
  */
@@ -248,6 +266,26 @@ public final class FunctionRegistry {
    * The mapping from expression function names to expression classes.
    */
   static Map<String, FunctionInfo> mFunctions = Collections.synchronizedMap(new LinkedHashMap<String, FunctionInfo>());
+
+  /*
+   * PTF variables
+   */
+
+  public static final String LEAD_FUNC_NAME = "lead";
+  public static final String LAG_FUNC_NAME = "lag";
+
+  public static final String WINDOWING_TABLE_FUNCTION = "windowingtablefunction";
+  public static final String NOOP_TABLE_FUNCTION = "noop";
+  public static final String NOOP_MAP_TABLE_FUNCTION = "noopwithmap";
+
+  static Map<String, PTFFunctionInfo> tableFunctions = Collections.synchronizedMap(new LinkedHashMap<String, PTFFunctionInfo>());
+  static Map<String, WindowFunctionInfo> windowFunctions = Collections.synchronizedMap(new LinkedHashMap<String, WindowFunctionInfo>());
+
+  /*
+   * UDAFS that only work when the input rows have an order.
+   */
+  public static final HashSet<String> UDAFS_IMPLY_ORDER = new HashSet<String>();
+
   static {
     registerUDF("concat", UDFConcat.class, false);
     registerUDF("substr", UDFSubstr.class, false);
@@ -482,6 +520,36 @@ public final class FunctionRegistry {
     registerGenericUDTF("json_tuple", GenericUDTFJSONTuple.class);
     registerGenericUDTF("parse_url_tuple", GenericUDTFParseUrlTuple.class);
     registerGenericUDTF("stack", GenericUDTFStack.class);
+
+    //PTF declarations
+    registerGenericUDF(true, LEAD_FUNC_NAME, GenericUDFLead.class);
+    registerGenericUDF(true, LAG_FUNC_NAME, GenericUDFLag.class);
+
+    registerHiveUDAFsAsWindowFunctions();
+    registerWindowFunction("row_number", new GenericUDAFRowNumber());
+    registerWindowFunction("rank", new GenericUDAFRank());
+    registerWindowFunction("dense_rank", new GenericUDAFDenseRank());
+    registerWindowFunction("percent_rank", new GenericUDAFPercentRank());
+    registerWindowFunction("cume_dist", new GenericUDAFCumeDist());
+    registerWindowFunction("ntile", new GenericUDAFNTile());
+    registerWindowFunction("first_value", new GenericUDAFFirstValue());
+    registerWindowFunction("last_value", new GenericUDAFLastValue());
+    registerWindowFunction(LEAD_FUNC_NAME, new GenericUDAFLead(), false);
+    registerWindowFunction(LAG_FUNC_NAME, new GenericUDAFLag(), false);
+
+    UDAFS_IMPLY_ORDER.add("rank");
+    UDAFS_IMPLY_ORDER.add("dense_rank");
+    UDAFS_IMPLY_ORDER.add("percent_rank");
+    UDAFS_IMPLY_ORDER.add("cume_dist");
+    UDAFS_IMPLY_ORDER.add(LEAD_FUNC_NAME);
+    UDAFS_IMPLY_ORDER.add(LAG_FUNC_NAME);
+    UDAFS_IMPLY_ORDER.add("first_value");
+    UDAFS_IMPLY_ORDER.add("last_value");
+
+    registerTableFunction(NOOP_TABLE_FUNCTION, NoopResolver.class);
+    registerTableFunction(NOOP_MAP_TABLE_FUNCTION, NoopWithMapResolver.class);
+    registerTableFunction(WINDOWING_TABLE_FUNCTION,  WindowingTableFunctionResolver.class);
+    registerTableFunction("npath", NPathResolver.class);
   }
 
   public static void registerTemporaryUDF(String functionName,
@@ -795,6 +863,26 @@ public final class FunctionRegistry {
     return udafEvaluator;
   }
 
+  @SuppressWarnings("deprecation")
+  public static GenericUDAFEvaluator getGenericWindowingEvaluator(String name,
+      List<ObjectInspector> argumentOIs, boolean isDistinct,
+      boolean isAllColumns) throws SemanticException {
+
+    WindowFunctionInfo finfo = windowFunctions.get(name.toLowerCase());
+    if (finfo == null) { return null; }
+    if ( !name.toLowerCase().equals(LEAD_FUNC_NAME) &&
+         !name.toLowerCase().equals(LAG_FUNC_NAME) ) {
+      return getGenericUDAFEvaluator(name, argumentOIs, isDistinct, isAllColumns);
+    }
+
+    // this must be lead/lag UDAF
+    ObjectInspector args[] = new ObjectInspector[argumentOIs.size()];
+    GenericUDAFResolver udafResolver = finfo.getfInfo().getGenericUDAFResolver();
+    GenericUDAFParameterInfo paramInfo = new SimpleGenericUDAFParameterInfo(
+        argumentOIs.toArray(args), isDistinct, isAllColumns);
+    return ((GenericUDAFResolver2) udafResolver).getEvaluator(paramInfo);
+  }
+
   /**
    * This method is shared between UDFRegistry and UDAFRegistry. methodName will
    * be "evaluate" for UDFRegistry, and "aggregate"/"evaluate"/"evaluatePartial"
@@ -1033,36 +1121,36 @@ public final class FunctionRegistry {
     }
     if (udfMethods.size() > 1) {
 
-      // if the only difference is numeric types, pick the method 
+      // if the only difference is numeric types, pick the method
       // with the smallest overall numeric type.
       int lowestNumericType = Integer.MAX_VALUE;
       boolean multiple = true;
       Method candidate = null;
       List<TypeInfo> referenceArguments = null;
-      
+
       for (Method m: udfMethods) {
         int maxNumericType = 0;
-        
+
         List<TypeInfo> argumentsAccepted = TypeInfoUtils.getParameterTypeInfos(m, argumentsPassed.size());
-        
+
         if (referenceArguments == null) {
-          // keep the arguments for reference - we want all the non-numeric 
+          // keep the arguments for reference - we want all the non-numeric
           // arguments to be the same
           referenceArguments = argumentsAccepted;
         }
-        
+
         Iterator<TypeInfo> referenceIterator = referenceArguments.iterator();
-        
+
         for (TypeInfo accepted: argumentsAccepted) {
           TypeInfo reference = referenceIterator.next();
-          
+
           if (numericTypes.containsKey(accepted)) {
             // We're looking for the udf with the smallest maximum numeric type.
             int typeValue = numericTypes.get(accepted);
             maxNumericType = typeValue > maxNumericType ? typeValue : maxNumericType;
           } else if (!accepted.equals(reference)) {
             // There are non-numeric arguments that don't match from one UDF to
-            // another. We give up at this point. 
+            // another. We give up at this point.
             throw new AmbiguousMethodException(udfClass, argumentsPassed, mlist);
           }
         }
@@ -1351,4 +1439,93 @@ public final class FunctionRegistry {
   private FunctionRegistry() {
     // prevent instantiation
   }
+
+
+  //---------PTF functions------------
+
+  public static void registerWindowFunction(String name, GenericUDAFResolver wFn)
+  {
+    registerWindowFunction(name, wFn, true);
+  }
+
+  /**
+   * Typically a WindowFunction is the same as a UDAF. The only exceptions are the Lead & Lag
+   * UDAFs, which are not registered as regular UDAFs: we plan to support Lead & Lag as UDFs
+   * (usable only within argument expressions of UDAFs when windowing is involved), and since
+   * mFunctions holds both UDFs and UDAFs we cannot add both FunctionInfos to mFunctions.
+   * We choose to register only the UDFs in mFunctions. The implication is that the Lead/Lag
+   * UDAFs are only usable when windowing is involved.
+   *
+   * @param name
+   * @param wFn
+   * @param registerAsUDAF
+   */
+  public static void registerWindowFunction(String name, GenericUDAFResolver wFn, boolean registerAsUDAF)
+  {
+    FunctionInfo fInfo = null;
+    if (registerAsUDAF) {
+      registerGenericUDAF(true, name, wFn);
+      fInfo = getFunctionInfo(name);
+    }
+    else {
+      fInfo = new FunctionInfo(true,
+          name.toLowerCase(), wFn);
+    }
+
+    WindowFunctionInfo wInfo = new WindowFunctionInfo(fInfo);
+    windowFunctions.put(name.toLowerCase(), wInfo);
+  }
+
+  public static WindowFunctionInfo getWindowFunctionInfo(String name)
+  {
+    return windowFunctions.get(name.toLowerCase());
+  }
+
+  public static boolean impliesOrder(String functionName) {
+    return functionName == null ? false : UDAFS_IMPLY_ORDER.contains(functionName.toLowerCase());
+  }
+
+  static void registerHiveUDAFsAsWindowFunctions()
+  {
+    Set<String> fNames = getFunctionNames();
+    for(String fName : fNames)
+    {
+      FunctionInfo fInfo = getFunctionInfo(fName);
+      if ( fInfo.isGenericUDAF())
+      {
+        WindowFunctionInfo wInfo = new WindowFunctionInfo(fInfo);
+        windowFunctions.put(fName, wInfo);
+      }
+    }
+  }
+
+  public static boolean isTableFunction(String name)
+  {
+    PTFFunctionInfo tFInfo = tableFunctions.get(name.toLowerCase());
+    return tFInfo != null && !tFInfo.isInternal();
+  }
+
+  public static TableFunctionResolver getTableFunctionResolver(String name)
+  {
+    PTFFunctionInfo tfInfo = tableFunctions.get(name.toLowerCase());
+    return (TableFunctionResolver) ReflectionUtils.newInstance(tfInfo.getFunctionResolver(), null);
+  }
+
+  public static TableFunctionResolver getWindowingTableFunction()
+  {
+    return getTableFunctionResolver(WINDOWING_TABLE_FUNCTION);
+  }
+
+  public static TableFunctionResolver getNoopTableFunction()
+  {
+    return getTableFunctionResolver(NOOP_TABLE_FUNCTION);
+  }
+
+  public static void registerTableFunction(String name, Class<? extends TableFunctionResolver> tFnCls)
+  {
+    PTFFunctionInfo tInfo = new PTFFunctionInfo(name, tFnCls);
+    tableFunctions.put(name.toLowerCase(), tInfo);
+  }
+
 }
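
Taken together, the new entry points give PTFs the same register/lookup shape
that UDFs and UDAFs already have. A minimal sketch against the methods added
above; the resolver class passed in is hypothetical, everything else is from
this diff:

    import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
    import org.apache.hadoop.hive.ql.udf.ptf.TableFunctionResolver;

    class PtfRegistryDemo {
      static void demo(Class<? extends TableFunctionResolver> myPathResolver) {
        // register a custom table function; the registry stores names lower-cased
        FunctionRegistry.registerTableFunction("mypath", myPathResolver);

        // lookups mirror the existing UDF/UDAF registry and are case-insensitive
        boolean known = FunctionRegistry.isTableFunction("MyPath");
        TableFunctionResolver r = FunctionRegistry.getTableFunctionResolver("mypath");

        // rank/lead/lag and friends only make sense over ordered rows
        boolean needsOrder = FunctionRegistry.impliesOrder("dense_rank"); // true
      }
    }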

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Tue Apr  2 14:16:34 2013
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.ql.plan.La
 import org.apache.hadoop.hive.ql.plan.LimitDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
 import org.apache.hadoop.hive.ql.plan.ScriptDesc;
@@ -74,6 +75,7 @@ public final class OperatorFactory {
     opvec.add(new OpTuple<FileSinkDesc>(FileSinkDesc.class, FileSinkOperator.class));
     opvec.add(new OpTuple<CollectDesc>(CollectDesc.class, CollectOperator.class));
     opvec.add(new OpTuple<ScriptDesc>(ScriptDesc.class, ScriptOperator.class));
+    opvec.add(new OpTuple<PTFDesc>(PTFDesc.class, PTFOperator.class));
     opvec.add(new OpTuple<ReduceSinkDesc>(ReduceSinkDesc.class, ReduceSinkOperator.class));
     opvec.add(new OpTuple<ExtractDesc>(ExtractDesc.class, ExtractOperator.class));
     opvec.add(new OpTuple<GroupByDesc>(GroupByDesc.class, GroupByOperator.class));
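
The single OpTuple registration is what lets the planner materialize a
PTFOperator from its descriptor. A hedged sketch, assuming the usual
OperatorFactory.get(desc) entry point that scans opvec for a matching
descriptor class (the PTFDesc/PTFOperator pairing itself is from this diff):

    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.exec.OperatorFactory;
    import org.apache.hadoop.hive.ql.plan.PTFDesc;

    class PtfFactoryDemo {
      static Operator<PTFDesc> demo(PTFDesc desc) {
        // opvec now maps PTFDesc.class -> PTFOperator.class,
        // so the factory hands back a PTFOperator for this descriptor
        return OperatorFactory.get(desc);
      }
    }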

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFFunctionInfo.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFFunctionInfo.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFFunctionInfo.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFFunctionInfo.java Tue Apr  2 14:16:34 2013
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.exec.PartitionTableFunctionDescription;
+import org.apache.hadoop.hive.ql.udf.ptf.TableFunctionResolver;
+
+class PTFFunctionInfo
+{
+	String displayName;
+	Class<? extends TableFunctionResolver>  functionResolver;
+	boolean isInternal;
+
+	public PTFFunctionInfo(String displayName, Class<? extends TableFunctionResolver> tFnCls)
+	{
+		super();
+		this.displayName = displayName;
+		this.functionResolver = tFnCls;
+		isInternal = false;
+		PartitionTableFunctionDescription def = functionResolver.getAnnotation(PartitionTableFunctionDescription.class);
+		if ( def != null)
+		{
+			isInternal = def.isInternal();
+		}
+	}
+
+	public String getDisplayName()
+	{
+		return displayName;
+	}
+
+	public Class<? extends TableFunctionResolver> getFunctionResolver()
+	{
+		return functionResolver;
+	}
+
+	public boolean isInternal()
+	{
+		return isInternal;
+	}
+
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Tue Apr  2 14:16:34 2013
@@ -0,0 +1,432 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFExpressionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFInputDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDeserializer;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag;
+import org.apache.hadoop.hive.ql.udf.ptf.TableFunctionEvaluator;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+public class PTFOperator extends Operator<PTFDesc> implements Serializable
+{
+
+	private static final long serialVersionUID = 1L;
+	PTFPartition inputPart;
+	boolean isMapOperator;
+
+	transient KeyWrapperFactory keyWrapperFactory;
+	protected transient KeyWrapper currentKeys;
+	protected transient KeyWrapper newKeys;
+	transient HiveConf hiveConf;
+
+
+	/*
+	 * 1. Find out if the operator is invoked at Map-Side or Reduce-side
+	 * 2. Get the deserialized QueryDef
+	 * 3. Reconstruct the transient variables in QueryDef
+	 * 4. Create input partition to store rows coming from previous operator
+	 */
+	@Override
+	protected void initializeOp(Configuration jobConf) throws HiveException
+	{
+		hiveConf = new HiveConf(jobConf, PTFOperator.class);
+		// if the parent is ExtractOperator, this invocation is from reduce-side
+		Operator<? extends OperatorDesc> parentOp = getParentOperators().get(0);
+		if (parentOp instanceof ExtractOperator)
+		{
+			isMapOperator = false;
+		}
+		else
+		{
+			isMapOperator = true;
+		}
+
+		reconstructQueryDef(hiveConf);
+    inputPart = createFirstPartitionForChain(
+        inputObjInspectors[0], hiveConf, isMapOperator);
+
+		if (isMapOperator)
+		{
+			PartitionedTableFunctionDef tDef = conf.getStartOfChain();
+			outputObjInspector = tDef.getRawInputShape().getOI();
+		}
+		else
+		{
+			outputObjInspector = conf.getFuncDef().getOutputShape().getOI();
+		}
+
+		setupKeysWrapper(inputObjInspectors[0]);
+
+		super.initializeOp(jobConf);
+	}
+
+	@Override
+	protected void closeOp(boolean abort) throws HiveException
+	{
+		super.closeOp(abort);
+    if(inputPart.size() != 0){
+      if (isMapOperator)
+      {
+        processMapFunction();
+      }
+      else
+      {
+        processInputPartition();
+      }
+    }
+	}
+
+	@Override
+	public void processOp(Object row, int tag) throws HiveException
+	{
+	  if (!isMapOperator )
+    {
+      /*
+       * check if the current row belongs to the current accumulated Partition:
+       * - If not:
+       *  - process the current Partition
+       *  - reset input Partition
+       * - set currentKey to the newKey if it is null or has changed.
+       */
+      newKeys.getNewKey(row, inputPart.getOI());
+      boolean keysAreEqual = (currentKeys != null && newKeys != null)?
+              newKeys.equals(currentKeys) : false;
+
+      if (currentKeys != null && !keysAreEqual)
+      {
+        processInputPartition();
+        inputPart.reset();
+      }
+
+      if (currentKeys == null || !keysAreEqual)
+      {
+        if (currentKeys == null)
+        {
+          currentKeys = newKeys.copyKey();
+        }
+        else
+        {
+          currentKeys.copyKey(newKeys);
+        }
+      }
+    }
+
+    // add row to current Partition.
+    inputPart.append(row);
+	}
+
+	/**
+	 * Initialize the visitor to use the QueryDefDeserializer. Use the order
+	 * defined in QueryDefWalker to visit the QueryDef.
+	 *
+	 * @param hiveConf
+	 * @throws HiveException
+	 */
+	protected void reconstructQueryDef(HiveConf hiveConf) throws HiveException
+	{
+
+	  PTFDeserializer dS =
+	      new PTFDeserializer(conf, (StructObjectInspector)inputObjInspectors[0], hiveConf);
+	  dS.initializePTFChain(conf.getFuncDef());
+	}
+
+	protected void setupKeysWrapper(ObjectInspector inputOI) throws HiveException
+	{
+		PartitionDef pDef = conf.getStartOfChain().getPartition();
+		ArrayList<PTFExpressionDef> exprs = pDef.getExpressions();
+		int numExprs = exprs.size();
+		ExprNodeEvaluator[] keyFields = new ExprNodeEvaluator[numExprs];
+		ObjectInspector[] keyOIs = new ObjectInspector[numExprs];
+		ObjectInspector[] currentKeyOIs = new ObjectInspector[numExprs];
+
+		for(int i=0; i<numExprs; i++)
+		{
+		  PTFExpressionDef exprDef = exprs.get(i);
+			/*
+			 * Why can't we just use the ExprNodeEvaluator on the column?
+			 * - because on the reduce-side it is initialized based on the rowOI of the HiveTable
+			 *   and not the OI of the ExtractOp ( the parent of this Operator on the reduce-side)
+			 */
+			keyFields[i] = ExprNodeEvaluatorFactory.get(exprDef.getExprNode());
+			keyOIs[i] = keyFields[i].initialize(inputOI);
+			currentKeyOIs[i] =
+			    ObjectInspectorUtils.getStandardObjectInspector(keyOIs[i],
+			        ObjectInspectorCopyOption.WRITABLE);
+		}
+
+		keyWrapperFactory = new KeyWrapperFactory(keyFields, keyOIs, currentKeyOIs);
+	  newKeys = keyWrapperFactory.getKeyWrapper();
+	}
+
+	protected void processInputPartition() throws HiveException
+	{
+	  PTFPartition outPart = executeChain(inputPart);
+	  if ( conf.forWindowing() ) {
+	    executeWindowExprs(outPart);
+	  }
+	  else {
+	    PTFPartitionIterator<Object> pItr = outPart.iterator();
+	    while (pItr.hasNext())
+	    {
+	      Object oRow = pItr.next();
+	      forward(oRow, outputObjInspector);
+	    }
+	  }
+	}
+
+	protected void processMapFunction() throws HiveException
+	{
+	  PartitionedTableFunctionDef tDef = conf.getStartOfChain();
+    PTFPartition outPart = tDef.getTFunction().transformRawInput(inputPart);
+    PTFPartitionIterator<Object> pItr = outPart.iterator();
+    while (pItr.hasNext())
+    {
+      Object oRow = pItr.next();
+      forward(oRow, outputObjInspector);
+    }
+	}
+
+	/**
+	 * @return the name of the operator
+	 */
+  @Override
+  public String getName() {
+    return getOperatorName();
+  }
+
+  static public String getOperatorName() {
+    return "PTF";
+  }
+
+
+	@Override
+	public OperatorType getType()
+	{
+		return OperatorType.PTF;
+	}
+
+	 /**
+   * For all the table functions to be applied to the input
+   * hive table or query, push them on a stack.
+   * For each table function popped out of the stack,
+   * execute the function on the input partition
+   * and return an output partition.
+   * @param part
+   * @return
+   * @throws HiveException
+   */
+  private PTFPartition executeChain(PTFPartition part)
+      throws HiveException
+  {
+    Stack<PartitionedTableFunctionDef> fnDefs = new Stack<PartitionedTableFunctionDef>();
+    PTFInputDef iDef = conf.getFuncDef();
+    while (true)
+    {
+      if (iDef instanceof PartitionedTableFunctionDef)
+      {
+        fnDefs.push((PartitionedTableFunctionDef) iDef);
+        iDef = ((PartitionedTableFunctionDef) iDef).getInput();
+      }
+      else
+      {
+        break;
+      }
+    }
+
+    PartitionedTableFunctionDef currFnDef;
+    while (!fnDefs.isEmpty())
+    {
+      currFnDef = fnDefs.pop();
+      part = currFnDef.getTFunction().execute(part);
+    }
+    return part;
+  }
+
+  /**
+   * If the WindowingSpec contains any Windowing Expressions or has a
+   * Having condition, then these are processed and forwarded on.
+   * When there is no having clause and no WdwExprs, the rows in the
+   * output Partition are just forwarded.
+   *
+   * For example, consider the following query:
+   * <pre>
+   * {@code
+   *  select rank(), lead(rank(),1),...
+   *  from xyz
+   *  ...
+   *  having rank() > 1
+   *  }
+   * </pre>
+   * rank() gets processed as a WdwFn; it is in the oPart (output partition)
+   * argument to executeWindowExprs. Here we first evaluate the having expression,
+   * so the first row of oPart gets filtered out.
+   * Next we evaluate lead(rank()), which is held as a WindowExpression, and add it to the output.
+   *
+   * @param ptfDesc
+   * @param oPart output partition after Window Fns are processed.
+   * @param op
+   * @throws HiveException
+   */
+  private void executeWindowExprs(PTFPartition oPart)
+      throws HiveException
+  {
+    WindowTableFunctionDef wTFnDef = (WindowTableFunctionDef) conf.getFuncDef();
+    /*
+     * inputOI represents the row with WindowFn results present.
+     * So in the e.g. above it will have a column for 'rank()'
+     */
+    StructObjectInspector inputOI = wTFnDef.getOutputFromWdwFnProcessing().getOI();
+    /*
+     * outputOI represents the final row with the Windowing Expressions added.
+     * So in the e.g. above it will have a column for 'lead(rank())' in addition to
+     * all columns in inputOI.
+     */
+    StructObjectInspector outputOI = wTFnDef.getOutputShape().getOI();
+    int numCols = outputOI.getAllStructFieldRefs().size();
+    ArrayList<WindowExpressionDef> wdwExprs = wTFnDef.getWindowExpressions();
+    int numWdwExprs = wdwExprs == null ? 0 : wdwExprs.size();
+    Object[] output = new Object[numCols];
+
+    /*
+     * If this Windowing invocation has no Window Expressions and doesn't need to be filtered,
+     * we can just forward the row in the oPart partition.
+     */
+    boolean forwardRowsUntouched = (wdwExprs == null || wdwExprs.size() == 0 );
+
+    PTFPartitionIterator<Object> pItr = oPart.iterator();
+    PTFOperator.connectLeadLagFunctionsToPartition(conf, pItr);
+    while (pItr.hasNext())
+    {
+      int colCnt = 0;
+      Object oRow = pItr.next();
+
+      /*
+       * when there are no Windowing expressions and no having clause,
+       * just forward the Object coming out of the Partition.
+       */
+      if ( forwardRowsUntouched ) {
+        forward(oRow, outputObjInspector);
+        continue;
+      }
+
+      /*
+       * Setup the output row columns in the following order
+       * - the columns in the SelectList processed by the PTF
+       * (ie the Select Exprs that have navigation expressions)
+       * - the columns from the final PTF.
+       */
+
+      if ( wdwExprs != null ) {
+        for (WindowExpressionDef wdwExpr : wdwExprs)
+        {
+          Object newCol = wdwExpr.getExprEvaluator().evaluate(oRow);
+          output[colCnt++] = newCol;
+        }
+      }
+
+      for(; colCnt < numCols; ) {
+        StructField field = inputOI.getAllStructFieldRefs().get(colCnt - numWdwExprs);
+        output[colCnt++] =
+            ObjectInspectorUtils.copyToStandardObject(inputOI.getStructFieldData(oRow, field),
+            field.getFieldObjectInspector());
+      }
+
+      forward(output, outputObjInspector);
+    }
+  }
+
+  /**
+   * Create a new Partition.
+   * A partition has 2 OIs: the OI for the rows being put in and the OI for the rows
+   * coming out. You specify the output OI by giving the Serde used to serialize.
+   * Typically these 2 OIs are the same, but not always. For the
+   * first PTF in a chain the OI of the incoming rows is dictated by the Parent Op
+   * to this PTFOp. The output OI from the Partition is typically LazyBinaryStruct, but
+   * not always. In the case of Noop/NoopMap we keep the structure the same as
+   * what is given to us.
+   * <p>
+   * The Partition we want to create here is for feeding the first table function in the chain.
+   * So for map-side processing use the Serde from the output Shape of its InputDef.
+   * For reduce-side processing use the Serde from its RawInputShape (the shape
+   * after map-side processing).
+   * @param oi
+   * @param hiveConf
+   * @param isMapSide
+   * @return
+   * @throws HiveException
+   */
+  public PTFPartition createFirstPartitionForChain(ObjectInspector oi,
+      HiveConf hiveConf, boolean isMapSide) throws HiveException
+  {
+    PartitionedTableFunctionDef tabDef = conf.getStartOfChain();
+    TableFunctionEvaluator tEval = tabDef.getTFunction();
+    String partClassName = tEval.getPartitionClass();
+    int partMemSize = tEval.getPartitionMemSize();
+
+    PTFPartition part = null;
+    SerDe serde = isMapSide ? tabDef.getInput().getOutputShape().getSerde() :
+      tabDef.getRawInputShape().getSerde();
+    part = new PTFPartition(partClassName, partMemSize, serde,
+        (StructObjectInspector) oi);
+    return part;
+
+  }
+
+  public static void connectLeadLagFunctionsToPartition(PTFDesc ptfDesc,
+      PTFPartitionIterator<Object> pItr) throws HiveException
+  {
+    List<ExprNodeGenericFuncDesc> llFnDescs = ptfDesc.getLlInfo().getLeadLagExprs();
+    if (llFnDescs == null) {
+      return;
+    }
+    for (ExprNodeGenericFuncDesc llFnDesc : llFnDescs)
+    {
+      GenericUDFLeadLag llFn = (GenericUDFLeadLag) llFnDesc
+          .getGenericUDF();
+      llFn.setpItr(pItr);
+    }
+  }
+
+
+
+}
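
The reduce-side logic in processOp above is the classic "detect a key change,
flush the accumulated partition" pattern. A self-contained sketch of the same
control flow in plain Java; the class, flush(), and the key argument are
hypothetical stand-ins for PTFPartition, processInputPartition(), and the
KeyWrapper machinery:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Objects;

    class PartitionBoundaryDemo {
      private Object currentKey;                                  // plays the role of currentKeys
      private final List<Object> part = new ArrayList<Object>();  // plays the role of inputPart

      // reduce-side processOp: flush the partition whenever the grouping key changes
      void process(Object row, Object newKey) {
        boolean keysAreEqual = currentKey != null && Objects.equals(newKey, currentKey);
        if (currentKey != null && !keysAreEqual) {
          flush();              // processInputPartition()
          part.clear();         // inputPart.reset()
        }
        if (currentKey == null || !keysAreEqual) {
          currentKey = newKey;  // copyKey()
        }
        part.add(row);          // inputPart.append(row)
      }

      // closeOp: the last partition sees no key change after it, so flush on close
      void close() {
        if (!part.isEmpty()) {
          flush();
        }
      }

      private void flush() {
        // run the PTF chain over 'part' and forward the resulting rows
      }
    }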

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java Tue Apr  2 14:16:34 2013
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.PTFPersistence.ByteBasedList;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Writable;
+
+/*
+ * represents a collection of rows that is acted upon by a TableFunction or a WindowFunction.
+ */
+public class PTFPartition
+{
+  SerDe serDe;
+  StructObjectInspector OI;
+  private ByteBasedList elems;
+  private Writable wRow;
+  private int sz;
+
+  public PTFPartition(HiveConf cfg, SerDe serDe, StructObjectInspector oI) throws HiveException
+  {
+    String partitionClass = HiveConf.getVar(cfg, ConfVars.HIVE_PTF_PARTITION_PERSISTENCE_CLASS);
+    int partitionMemSize = HiveConf.getIntVar(cfg, ConfVars.HIVE_PTF_PARTITION_PERSISTENT_SIZE);
+    init(partitionClass, partitionMemSize, serDe, oI);
+  }
+
+  public PTFPartition(String partitionClass, int partitionMemSize, SerDe serDe, StructObjectInspector oI) throws HiveException
+  {
+    init(partitionClass, partitionMemSize, serDe, oI);
+  }
+
+  private void init(String partitionClass, int partitionMemSize, SerDe serDe, StructObjectInspector oI) throws HiveException
+  {
+    this.serDe = serDe;
+    OI = oI;
+    elems = PTFPersistence.createList(partitionClass, partitionMemSize);
+    sz = 0;
+    wRow = createWritable();
+  }
+
+  public void reset() throws HiveException {
+    sz = 0;
+    elems.reset(0);
+  }
+
+  public SerDe getSerDe()
+  {
+    return serDe;
+  }
+  public void setSerDe(SerDe serDe)
+  {
+    this.serDe = serDe;
+  }
+  public StructObjectInspector getOI()
+  {
+    return OI;
+  }
+  public void setOI(StructObjectInspector oI)
+  {
+    OI = oI;
+  }
+
+  private Writable createWritable() throws HiveException
+  {
+    try
+    {
+      return serDe.getSerializedClass().newInstance();
+    }
+    catch(Throwable t)
+    {
+      throw new HiveException(t);
+    }
+  }
+
+  public Object getAt(int i) throws HiveException
+  {
+    try
+    {
+      elems.get(i, wRow);
+      Object o = serDe.deserialize(wRow);
+      return o;
+    }
+    catch(SerDeException  se)
+    {
+      throw new HiveException(se);
+    }
+  }
+
+  public Object getWritableAt(int i) throws HiveException
+  {
+    elems.get(i, wRow);
+    return wRow;
+  }
+
+  public void append(Writable o) throws HiveException
+  {
+    elems.append(o);
+    sz++;
+  }
+
+  public void append(Object o) throws HiveException
+  {
+    try
+    {
+      append(serDe.serialize(o, OI));
+    }
+    catch(SerDeException e)
+    {
+      throw new HiveException(e);
+    }
+  }
+
+  public int size()
+  {
+    return sz;
+  }
+
+  public PTFPartitionIterator<Object> iterator()
+  {
+    return new PItr(0, size());
+  }
+
+  public PTFPartitionIterator<Object> range(int start, int end)
+  {
+    assert(start >= 0);
+    assert(end <= size());
+    assert(start <= end);
+    return new PItr(start, end);
+  }
+
+  class PItr implements PTFPartitionIterator<Object>
+  {
+    int idx;
+    final int start;
+    final int end;
+    final int createTimeSz;
+
+    PItr(int start, int end)
+    {
+      this.idx = start;
+      this.start = start;
+      this.end = end;
+      createTimeSz = PTFPartition.this.size();
+    }
+
+    public boolean hasNext()
+    {
+      checkForComodification();
+      return idx < end;
+    }
+
+    public Object next()
+    {
+      checkForComodification();
+      try
+      {
+        return PTFPartition.this.getAt(idx++);
+      }
+      catch(HiveException e)
+      {
+        throw new RuntimeException(e);
+      }
+    }
+
+    public void remove()
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    final void checkForComodification()
+    {
+      if (createTimeSz != PTFPartition.this.size()) {
+        throw new ConcurrentModificationException();
+      }
+    }
+
+    @Override
+    public int getIndex()
+    {
+      return idx;
+    }
+
+    private Object getAt(int i)
+    {
+      try
+      {
+        return PTFPartition.this.getAt(i);
+      }
+      catch(HiveException e)
+      {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public Object lead(int amt)
+    {
+      int i = idx + amt;
+      i = i >= end ? end - 1 : i;
+      return getAt(i);
+    }
+
+    @Override
+    public Object lag(int amt)
+    {
+      int i = idx - amt;
+      i = i < start ? start : i;
+      return getAt(i);
+    }
+
+    @Override
+    public Object resetToIndex(int idx)
+    {
+      if ( idx < start || idx >= end )
+      {
+        return null;
+      }
+      Object o = getAt(idx);
+      this.idx = idx + 1;
+      return o;
+    }
+
+    @Override
+    public PTFPartition getPartition()
+    {
+      return PTFPartition.this;
+    }
+
+    @Override
+    public void reset()
+    {
+      idx = start;
+    }
+  };
+
+  /*
+   * Provides an Iterator on the rows in a Partition.
+   * Iterator exposes the index of the next location.
+   * Client can invoke lead/lag relative to the next location.
+   */
+  public static interface PTFPartitionIterator<T> extends Iterator<T>
+  {
+    int getIndex();
+
+    T lead(int amt);
+
+    T lag(int amt);
+
+    /*
+     * After a lead or lag call, allows the Object associated with the SerDe and the Writable
+     * associated with the partition to be reset to the value for the current index.
+     */
+    Object resetToIndex(int idx);
+
+    PTFPartition getPartition();
+
+    void reset();
+  }
+
+
+}
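
The iterator above clamps lead and lag at the partition edges (lead never runs
past end - 1, lag never before start), so a boundary row comes back instead of
an error. A hedged usage sketch against the PTFPartitionIterator interface;
the populated partition passed in is assumed:

    import org.apache.hadoop.hive.ql.exec.PTFPartition;
    import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;

    class LeadLagDemo {
      static void scan(PTFPartition part) {  // 'part' assumed already populated
        PTFPartitionIterator<Object> it = part.iterator();
        while (it.hasNext()) {
          Object row = it.next();
          // lead/lag are relative to the iterator's *next* index (see the class
          // comment) and are clamped at the partition edges
          Object ahead  = it.lead(1);
          Object behind = it.lag(2);
          // feed row/ahead/behind into a windowing computation...
        }
      }
    }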