You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/04/02 16:16:37 UTC

svn commit: r1463556 [8/15] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ data/files/ ql/if/ ql/src/gen/thrift/gen-cpp/ ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/ ql/src/gen/thrift/gen-php/ ql/src/gen/thrift/gen...

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLeadLag.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLeadLag.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLeadLag.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLeadLag.java Tue Apr  2 14:16:34 2013
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
+import org.apache.hadoop.hive.ql.exec.PTFUtils;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+
+public abstract class GenericUDFLeadLag extends GenericUDF
+{
+	transient ExprNodeEvaluator exprEvaluator;
+	transient PTFPartitionIterator<Object> pItr;
+	ObjectInspector firstArgOI;
+
+	private PrimitiveObjectInspector amtOI;
+
+	static{
+		PTFUtils.makeTransient(GenericUDFLeadLag.class, "exprEvaluator");
+		PTFUtils.makeTransient(GenericUDFLeadLag.class, "pItr");
+	}
+
+	@Override
+	public Object evaluate(DeferredObject[] arguments) throws HiveException
+	{
+		DeferredObject amt = arguments[1];
+		int intAmt = 0;
+		try
+		{
+			intAmt = PrimitiveObjectInspectorUtils.getInt(amt.get(), amtOI);
+		}
+		catch (NullPointerException e)
+		{
+			intAmt = Integer.MAX_VALUE;
+		}
+		catch (NumberFormatException e)
+		{
+			intAmt = Integer.MAX_VALUE;
+		}
+
+		int idx = pItr.getIndex() - 1;
+		try
+		{
+			Object row = getRow(intAmt);
+			Object ret = exprEvaluator.evaluate(row);
+			ret = ObjectInspectorUtils.copyToStandardObject(ret, firstArgOI, ObjectInspectorCopyOption.WRITABLE);
+			return ret;
+		}
+		finally
+		{
+			Object currRow = pItr.resetToIndex(idx);
+			// reevaluate expression on current Row, to trigger the Lazy object
+			// caches to be reset to the current row.
+			exprEvaluator.evaluate(currRow);
+		}
+
+	}
+
+	@Override
+	public ObjectInspector initialize(ObjectInspector[] arguments)
+			throws UDFArgumentException
+	{
+		// index has to be a primitive
+		if (arguments[1] instanceof PrimitiveObjectInspector)
+		{
+			amtOI = (PrimitiveObjectInspector) arguments[1];
+		}
+		else
+		{
+			throw new UDFArgumentTypeException(1,
+					"Primitive Type is expected but "
+							+ arguments[1].getTypeName() + "\" is found");
+		}
+
+		firstArgOI = arguments[0];
+		return ObjectInspectorUtils.getStandardObjectInspector(firstArgOI,
+				ObjectInspectorCopyOption.WRITABLE);
+	}
+
+
+
+	public ExprNodeEvaluator getExprEvaluator()
+	{
+		return exprEvaluator;
+	}
+
+	public void setExprEvaluator(ExprNodeEvaluator exprEvaluator)
+	{
+		this.exprEvaluator = exprEvaluator;
+	}
+
+	public PTFPartitionIterator<Object> getpItr()
+	{
+		return pItr;
+	}
+
+	public void setpItr(PTFPartitionIterator<Object> pItr)
+	{
+		this.pItr = pItr;
+	}
+
+	@Override
+	public String getDisplayString(String[] children)
+	{
+		assert (children.length == 2);
+		StringBuilder sb = new StringBuilder();
+		sb.append(_getFnName());
+		sb.append("(");
+		sb.append(children[0]);
+		sb.append(", ");
+		sb.append(children[1]);
+		sb.append(")");
+		return sb.toString();
+	}
+
+	protected abstract String _getFnName();
+
+	protected abstract Object getRow(int amt);
+
+	public static class GenericUDFLead extends GenericUDFLeadLag
+	{
+
+		@Override
+		protected String _getFnName()
+		{
+			return "lead";
+		}
+
+		@Override
+		protected Object getRow(int amt)
+		{
+			return pItr.lead(amt - 1);
+		}
+
+	}
+
+	public static class GenericUDFLag extends GenericUDFLeadLag
+	{
+		@Override
+		protected String _getFnName()
+		{
+			return "lag";
+		}
+
+		@Override
+		protected Object getRow(int amt)
+		{
+			return pItr.lag(amt + 1);
+		}
+
+	}
+
+}
+

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NPath.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NPath.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NPath.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NPath.java Tue Apr  2 14:16:34 2013
@@ -0,0 +1,936 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.ptf;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
+import org.apache.hadoop.hive.ql.exec.PTFPartition;
+import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
+import org.apache.hadoop.hive.ql.exec.PTFUtils;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.PTFTranslator;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.TypeCheckCtx;
+import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowExpressionSpec;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFExpressionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFInputDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+
+/**
+ * return rows that meet a specified pattern. Use symbols to specify a list of expressions
+ * to match.
+ * Pattern is used to specify a Path. The results list can contain expressions based on
+ * the input columns and also the matched Path.
+ * <ol>
+ * <li><b>pattern:</b> pattern for the Path. Path is 'dot' separated list of symbols.
+ * Each element is treated as a symbol. Elements that end in '*' or '+' are interpreted with
+ * the usual meaning of zero or more, one or more respectively. For e.g.
+ * "LATE.EARLY*.ONTIMEOREARLY" implies a sequence of flights
+ * where the first occurrence was LATE, followed by zero or more EARLY flights,
+ * followed by a ONTIME or EARLY flight.
+ * <li><b>symbols</b> specify a list of name, expression pairs. For e.g.
+ * 'LATE', arrival_delay > 0, 'EARLY', arrival_delay < 0 , 'ONTIME', arrival_delay == 0.
+ * These symbols can be used in the Pattern defined above.
+ * <li><b>resultSelectList</b> specified as a select list.
+ * The expressions in the selectList are evaluated in the context where all the
+ * input columns are available, plus the attribute
+ * "tpath" is available. Path is a collection of rows that represents the matching Path.
+ * </ol>
+ */
+public class NPath extends TableFunctionEvaluator
+{
+  private transient String patternStr;
+  private transient SymbolsInfo symInfo;
+  private transient String resultExprStr;
+  private transient SymbolFunction syFn;
+  private ResultExprInfo resultExprInfo;
+  /*
+   * the names of the Columns of the input to NPath. Used to setup the tpath Struct column.
+   */
+  private ArrayList<String> inputColumnNames;
+  private ArrayList<String> selectListNames;
+
+  /**
+   * Walks the partition row by row; for every row at which the symbol pattern matches,
+   * evaluates the result expressions and appends the resulting row to the output partition.
+   *
+   * @param pItr iterator over the input partition
+   * @param outP partition that receives one row per match
+   */
+  @Override
+  public void execute(PTFPartitionIterator<Object> pItr, PTFPartition outP) throws HiveException
+  {
+    while (pItr.hasNext())
+    {
+      Object currentRow = pItr.next();
+      SymbolFunctionResult matchResult = SymbolFunction.match(syFn, currentRow, pItr);
+
+      if (!matchResult.matches)
+      {
+        continue;
+      }
+
+      // number of rows between the current row and the first row after the match
+      int currentRowIdx = pItr.getIndex() - 1;
+      int pathSize = matchResult.nextRow - currentRowIdx;
+      Object selectListInput = NPath.getSelectListInput(currentRow,
+          tDef.getInput().getOutputShape().getOI(), pItr, pathSize);
+
+      ArrayList<Object> outputRow = new ArrayList<Object>();
+      for (ExprNodeEvaluator resultEval : resultExprInfo.resultExprEvals)
+      {
+        Object value = resultEval.evaluate(selectListInput);
+        outputRow.add(value);
+      }
+      outP.append(outputRow);
+    }
+  }
+
+  /**
+   * Always throws: reports an invalid invocation together with the expected signature.
+   *
+   * @param message the specific problem, appended after the signature description
+   * @throws SemanticException on every call
+   */
+  static void throwErrorWithSignature(String message) throws SemanticException
+  {
+    String template = "NPath signature is: SymbolPattern, one or more SymbolName, " +
+        "expression pairs, the result expression as a select list. Error %s";
+    throw new SemanticException(String.format(template, message));
+  }
+
+  public ArrayList<String> getInputColumnNames() {
+    return inputColumnNames;
+  }
+
+  public void setInputColumnNames(ArrayList<String> inputColumnNames) {
+    this.inputColumnNames = inputColumnNames;
+  }
+
+  public ArrayList<String> getSelectListNames() {
+    return selectListNames;
+  }
+
+  public void setSelectListNames(ArrayList<String> selectListNames) {
+    this.selectListNames = selectListNames;
+  }
+
+  /**
+   * Resolver for {@link NPath}: validates the function arguments and sets up the
+   * evaluator's pattern, symbols, result expressions and output ObjectInspector.
+   */
+  public static class NPathResolver extends TableFunctionResolver
+  {
+    /*
+     * pattern, at least one (symbol name, symbol expression) pair, result expression.
+     */
+    private static final int MIN_ARGS = 4;
+
+    @Override
+    protected TableFunctionEvaluator createEvaluator(PTFDesc ptfDesc,
+        PartitionedTableFunctionDef tDef)
+    {
+
+      return new NPath();
+    }
+
+    /**
+     * <ul>
+     * <li> check structure of Arguments:
+     * <ol>
+     * <li> First arg should be a String
+     * <li> then there should be an even number of Arguments:
+     * String, expression; expression should be Convertible to Boolean.
+     * <li> finally there should be a String.
+     * </ol>
+     * <li> convert pattern into a SymbolFunction chain.
+     * <li> convert symbol args into a SymbolsInfo.
+     * <li> parse the result expression string; the expressions are translated against
+     * the RowResolver built by NPath.createSelectListRR.
+     * </ul>
+     */
+    @Override
+    public void setupOutputOI() throws SemanticException
+    {
+      NPath evaluator = (NPath) getEvaluator();
+      PartitionedTableFunctionDef tDef = evaluator.getTableDef();
+
+      validateAndSetupArgs(evaluator, tDef.getArgs());
+
+      /*
+       * setup OI for input to resultExpr select list
+       */
+      RowResolver selectListInputRR = NPath.createSelectListRR(evaluator, tDef.getInput());
+
+      /*
+       * parse ResultExpr Str and setup OI.
+       */
+      ResultExpressionParser resultExprParser =
+          new ResultExpressionParser(evaluator.resultExprStr, selectListInputRR);
+      try {
+        resultExprParser.translate();
+      }
+      catch(HiveException he) {
+        throw new SemanticException(he);
+      }
+      evaluator.resultExprInfo = resultExprParser.getResultExprInfo();
+      StructObjectInspector outputOI = evaluator.resultExprInfo.resultOI;
+      evaluator.selectListNames = new ArrayList<String>();
+      extractOIColumnNames(resultExprParser.selectListInputOI, evaluator.selectListNames);
+
+      setOutputOI(outputOI);
+    }
+
+    /*
+     * Shared by setupOutputOI and initializeOutputOI: check the argument count, then
+     * setup the pattern string, the symbols, the result expression string and the
+     * SymbolFunction chain on the evaluator.
+     */
+    private void validateAndSetupArgs(NPath evaluator,
+        ArrayList<PTFExpressionDef> args) throws SemanticException {
+      int argsNum = args == null ? 0 : args.size();
+
+      if ( argsNum < MIN_ARGS )
+      {
+        throwErrorWithSignature("at least 4 arguments required");
+      }
+
+      validateAndSetupPatternStr(evaluator, args);
+      validateAndSetupSymbolInfo(evaluator, args, argsNum);
+      validateAndSetupResultExprStr(evaluator, args, argsNum);
+      setupSymbolFunctionChain(evaluator);
+    }
+
+    /*
+     * true if the ObjectInspector is a constant, primitive ObjectInspector of category STRING.
+     */
+    private static boolean isConstantString(ObjectInspector oi) {
+      return ObjectInspectorUtils.isConstantObjectInspector(oi) &&
+          (oi.getCategory() == ObjectInspector.Category.PRIMITIVE) &&
+          ((PrimitiveObjectInspector)oi).getPrimitiveCategory() ==
+          PrimitiveObjectInspector.PrimitiveCategory.STRING;
+    }
+
+    /*
+     * the constant value as a String; the caller must have checked isConstantString.
+     */
+    private static String constantStringValue(ObjectInspector oi) {
+      return ((ConstantObjectInspector)oi).getWritableConstantValue().toString();
+    }
+
+    /*
+     * validate and setup patternStr
+     */
+    private void validateAndSetupPatternStr(NPath evaluator,
+        ArrayList<PTFExpressionDef> args) throws SemanticException {
+      PTFExpressionDef symbolPatternArg = args.get(0);
+      ObjectInspector symbolPatternArgOI = symbolPatternArg.getOI();
+
+      if ( !isConstantString(symbolPatternArgOI) )
+      {
+        throwErrorWithSignature("Currently the symbol Pattern must be a Constant String.");
+      }
+
+      evaluator.patternStr = constantStringValue(symbolPatternArgOI);
+    }
+
+    /*
+     * validate and setup SymbolInfo
+     */
+    private void validateAndSetupSymbolInfo(NPath evaluator,
+        ArrayList<PTFExpressionDef> args,
+        int argsNum) throws SemanticException {
+      // everything between the pattern (first) and the result expression (last)
+      int symbolArgsSz = argsNum - 2;
+      if ( symbolArgsSz % 2 != 0)
+      {
+        throwErrorWithSignature("Symbol Name, Expression need to be specified in pairs: " +
+            "there are odd number of symbol args");
+      }
+
+      evaluator.symInfo = new SymbolsInfo(symbolArgsSz/2);
+      for(int i=1; i <= symbolArgsSz; i += 2)
+      {
+        PTFExpressionDef symbolNameArg = args.get(i);
+        ObjectInspector symbolNameArgOI = symbolNameArg.getOI();
+
+        if ( !isConstantString(symbolNameArgOI) )
+        {
+          throwErrorWithSignature(
+              String.format("Currently a Symbol Name(%s) must be a Constant String",
+                  symbolNameArg.getExpressionTreeString()));
+        }
+        String symbolName = constantStringValue(symbolNameArgOI);
+
+        PTFExpressionDef symbolExprArg = args.get(i+1);
+        ObjectInspector symbolExprArgOI = symbolExprArg.getOI();
+        if ( (symbolExprArgOI.getCategory() != ObjectInspector.Category.PRIMITIVE) ||
+              ((PrimitiveObjectInspector)symbolExprArgOI).getPrimitiveCategory() !=
+              PrimitiveObjectInspector.PrimitiveCategory.BOOLEAN )
+        {
+          throwErrorWithSignature(String.format("Currently a Symbol Expression(%s) " +
+              "must be a boolean expression", symbolExprArg.getExpressionTreeString()));
+        }
+        evaluator.symInfo.add(symbolName, symbolExprArg);
+      }
+    }
+
+    /*
+     * validate and setup resultExprStr
+     */
+    private void validateAndSetupResultExprStr(NPath evaluator,
+        ArrayList<PTFExpressionDef> args,
+        int argsNum) throws SemanticException {
+      PTFExpressionDef resultExprArg = args.get(argsNum - 1);
+      ObjectInspector resultExprArgOI = resultExprArg.getOI();
+
+      if ( !isConstantString(resultExprArgOI) )
+      {
+        throwErrorWithSignature("Currently the result Expr parameter must be a Constant String.");
+      }
+
+      evaluator.resultExprStr = constantStringValue(resultExprArgOI);
+    }
+
+    /*
+     * setup SymbolFunction chain.
+     */
+    private void setupSymbolFunctionChain(NPath evaluator) throws SemanticException {
+      SymbolParser syP = new SymbolParser(evaluator.patternStr,
+          evaluator.symInfo.symbolExprsNames,
+          evaluator.symInfo.symbolExprsEvaluators, evaluator.symInfo.symbolExprsOIs);
+      syP.parse();
+      evaluator.syFn = syP.getSymbolFunction();
+    }
+
+    @Override
+    public boolean transformsRawInput()
+    {
+      return false;
+    }
+
+    /**
+     * Redo the argument setup on the evaluator and rebuild the evaluators and the output
+     * OI of the result expressions from the ExprNodeDescs held in the evaluator's
+     * ResultExprInfo. A SemanticException raised on the way is wrapped in a HiveException.
+     */
+    @Override
+    public void initializeOutputOI() throws HiveException {
+      try {
+        NPath evaluator = (NPath) getEvaluator();
+        PartitionedTableFunctionDef tDef = evaluator.getTableDef();
+
+        validateAndSetupArgs(evaluator, tDef.getArgs());
+
+        /*
+         * setup OI for input to resultExpr select list
+         */
+        StructObjectInspector selectListInputOI = NPath.createSelectListOI( evaluator,
+            tDef.getInput());
+        ResultExprInfo resultExprInfo = evaluator.resultExprInfo;
+        ArrayList<ObjectInspector> selectListExprOIs = new ArrayList<ObjectInspector>();
+        resultExprInfo.resultExprEvals = new ArrayList<ExprNodeEvaluator>();
+
+        for(int i=0 ; i < resultExprInfo.resultExprNodes.size(); i++) {
+          ExprNodeDesc selectColumnExprNode =resultExprInfo.resultExprNodes.get(i);
+          ExprNodeEvaluator selectColumnExprEval =
+              ExprNodeEvaluatorFactory.get(selectColumnExprNode);
+          ObjectInspector selectColumnOI = selectColumnExprEval.initialize(selectListInputOI);
+          resultExprInfo.resultExprEvals.add(selectColumnExprEval);
+          selectListExprOIs.add(selectColumnOI);
+        }
+
+        resultExprInfo.resultOI = ObjectInspectorFactory.getStandardStructObjectInspector(
+            resultExprInfo.resultExprNames, selectListExprOIs);
+        setOutputOI(resultExprInfo.resultOI);
+      }
+      catch(SemanticException se) {
+        throw new HiveException(se);
+      }
+    }
+
+    @Override
+    public ArrayList<String> getOutputColumnNames() {
+      NPath evaluator = (NPath) getEvaluator();
+      return evaluator.resultExprInfo.getResultExprNames();
+    }
+
+
+
+    /*
+     * append the field names of the struct OI to oiColumnNames.
+     */
+    private static void extractOIColumnNames(StructObjectInspector OI,
+        ArrayList<String> oiColumnNames) {
+      StructTypeInfo t = (StructTypeInfo) TypeInfoUtils.getTypeInfoFromObjectInspector(OI);
+      ArrayList<String> fnames = t.getAllStructFieldNames();
+      oiColumnNames.addAll(fnames);
+    }
+
+  }
+
+  public ResultExprInfo getResultExprInfo() {
+    return resultExprInfo;
+  }
+
+  public void setResultExprInfo(ResultExprInfo resultExprInfo) {
+    this.resultExprInfo = resultExprInfo;
+  }
+
+  static class SymbolsInfo {
+    int sz;
+    ArrayList<ExprNodeEvaluator> symbolExprsEvaluators;
+    ArrayList<ObjectInspector> symbolExprsOIs;
+    ArrayList<String> symbolExprsNames;
+
+    SymbolsInfo(int sz)
+    {
+      this.sz = sz;
+      symbolExprsEvaluators = new ArrayList<ExprNodeEvaluator>(sz);
+      symbolExprsOIs = new ArrayList<ObjectInspector>(sz);
+      symbolExprsNames = new ArrayList<String>(sz);
+    }
+
+    void add(String name, PTFExpressionDef arg)
+    {
+      symbolExprsNames.add(name);
+      symbolExprsEvaluators.add(arg.getExprEvaluator());
+      symbolExprsOIs.add(arg.getOI());
+    }
+  }
+
+  public static class ResultExprInfo {
+    ArrayList<String> resultExprNames;
+    ArrayList<ExprNodeDesc> resultExprNodes;
+    private transient ArrayList<ExprNodeEvaluator> resultExprEvals;
+    private transient StructObjectInspector resultOI;
+
+    public ArrayList<String> getResultExprNames() {
+      return resultExprNames;
+    }
+    public void setResultExprNames(ArrayList<String> resultExprNames) {
+      this.resultExprNames = resultExprNames;
+    }
+    public ArrayList<ExprNodeDesc> getResultExprNodes() {
+      return resultExprNodes;
+    }
+    public void setResultExprNodes(ArrayList<ExprNodeDesc> resultExprNodes) {
+      this.resultExprNodes = resultExprNodes;
+    }
+  }
+
+  /*
+   * Base class of the pattern elements. Each instance owns a single
+   * SymbolFunctionResult that its match method fills in and returns.
+   */
+  public static abstract class SymbolFunction
+  {
+    SymbolFunctionResult result;
+
+    public SymbolFunction()
+    {
+      this.result = new SymbolFunctionResult();
+    }
+
+    /*
+     * Entry point for matching: run syFn on the row, and whatever the outcome,
+     * reset the iterator to the index of the row that was current on entry.
+     */
+    public static SymbolFunctionResult match(SymbolFunction syFn, Object row,
+        PTFPartitionIterator<Object> pItr) throws HiveException
+    {
+      final int currentRowIdx = pItr.getIndex() - 1;
+      SymbolFunctionResult outcome;
+      try
+      {
+        outcome = syFn.match(row, pItr);
+      }
+      finally
+      {
+        pItr.resetToIndex(currentRowIdx);
+      }
+      return outcome;
+    }
+
+    /*
+     * match this element starting at the given row; may advance the iterator.
+     */
+    protected abstract SymbolFunctionResult match(Object row, PTFPartitionIterator<Object> pItr)
+        throws HiveException;
+
+    /*
+     * can this element be skipped when there are no more rows.
+     */
+    protected abstract boolean isOptional();
+  }
+
+  public static class Symbol extends SymbolFunction {
+    ExprNodeEvaluator symbolExprEval;
+    Converter converter;
+
+    public Symbol(ExprNodeEvaluator symbolExprEval, ObjectInspector symbolOI)
+    {
+      this.symbolExprEval = symbolExprEval;
+      converter = ObjectInspectorConverters.getConverter(
+          symbolOI,
+          PrimitiveObjectInspectorFactory.javaBooleanObjectInspector);
+    }
+
+    @Override
+    protected SymbolFunctionResult match(Object row, PTFPartitionIterator<Object> pItr)
+        throws HiveException
+    {
+      Object val = null;
+      val = symbolExprEval.evaluate(row);
+      val = converter.convert(val);
+      result.matches = ((Boolean) val).booleanValue();
+      result.nextRow = pItr.getIndex();
+
+      return result;
+    }
+
+    @Override
+    protected boolean isOptional()
+    {
+      return false;
+    }
+  }
+
+  /*
+   * Zero or more consecutive rows matching the wrapped SymbolFunction. Always matches;
+   * nextRow is the index of the first row that is not part of the matched run.
+   */
+  public static class Star extends SymbolFunction {
+    SymbolFunction symbolFn;
+
+    public Star(SymbolFunction symbolFn)
+    {
+      this.symbolFn = symbolFn;
+    }
+
+    @Override
+    protected SymbolFunctionResult match(Object row, PTFPartitionIterator<Object> pItr)
+        throws HiveException
+    {
+      // zero occurrences are fine, so the outcome is a match in every case
+      result.matches = true;
+      SymbolFunctionResult rowResult = symbolFn.match(row, pItr);
+
+      while (rowResult.matches && pItr.hasNext())
+      {
+        row = pItr.next();
+        rowResult = symbolFn.match(row, pItr);
+      }
+
+      /*
+       * - if the last row read did not match, it is the row beyond the run: its index
+       * is getIndex() - 1.
+       * - if it did match, the rows ran out while still matching; that row belongs to
+       * the run and the row beyond it is at getIndex().
+       */
+      result.nextRow = rowResult.matches ? pItr.getIndex() : pItr.getIndex() - 1;
+      return result;
+    }
+
+    @Override
+    protected boolean isOptional()
+    {
+      return true;
+    }
+  }
+
+  /*
+   * One or more consecutive rows matching the wrapped SymbolFunction. Matches only if
+   * the first row matches; nextRow is the index of the first row that is not part of
+   * the matched run.
+   */
+  public static class Plus extends SymbolFunction {
+    SymbolFunction symbolFn;
+
+    public Plus(SymbolFunction symbolFn)
+    {
+      this.symbolFn = symbolFn;
+    }
+
+    @Override
+    protected SymbolFunctionResult match(Object row, PTFPartitionIterator<Object> pItr)
+        throws HiveException
+    {
+      SymbolFunctionResult rowResult = symbolFn.match(row, pItr);
+
+      // at least one occurrence is required
+      if (!rowResult.matches)
+      {
+        result.matches = false;
+        result.nextRow = pItr.getIndex() - 1;
+        return result;
+      }
+
+      result.matches = true;
+      while (rowResult.matches && pItr.hasNext())
+      {
+        row = pItr.next();
+        rowResult = symbolFn.match(row, pItr);
+      }
+
+      /*
+       * - if the last row read did not match, it is the row beyond the run: its index
+       * is getIndex() - 1.
+       * - if it did match, the rows ran out while still matching; that row belongs to
+       * the run and the row beyond it is at getIndex().
+       */
+      result.nextRow = rowResult.matches ? pItr.getIndex() : pItr.getIndex() - 1;
+      return result;
+    }
+
+    @Override
+    protected boolean isOptional()
+    {
+      return false;
+    }
+  }
+
+  /*
+   * A sequence of SymbolFunctions that must match one after the other.
+   */
+  public static class Chain extends SymbolFunction
+  {
+    ArrayList<SymbolFunction> components;
+
+    public Chain(ArrayList<SymbolFunction> components)
+    {
+      this.components = components;
+    }
+
+    /*
+     * Walk the components in order:
+     * - while there is a row (row != null): match the component against it; on failure
+     * the chain fails, otherwise continue from the row at the component's nextRow.
+     * - once the rows have run out: optional components (star patterns) are skipped,
+     * a non optional component fails the chain.
+     * - if no component failed, the chain matches.
+     * The nextRow reported is always the one of the last component that was matched
+     * against a row; this assumes row is not null on entry and that there is at
+     * least one component.
+     */
+    @Override
+    protected SymbolFunctionResult match(Object row, PTFPartitionIterator<Object> pItr)
+        throws HiveException
+    {
+      SymbolFunctionResult lastResult = null;
+
+      for (int i = 0; i < components.size(); i++)
+      {
+        SymbolFunction component = components.get(i);
+
+        if (row == null)
+        {
+          // out of rows: only optional components may remain
+          if (component.isOptional())
+          {
+            continue;
+          }
+          result.matches = false;
+          result.nextRow = lastResult.nextRow;
+          return result;
+        }
+
+        lastResult = component.match(row, pItr);
+        if (!lastResult.matches)
+        {
+          result.matches = false;
+          result.nextRow = lastResult.nextRow;
+          return result;
+        }
+        row = pItr.resetToIndex(lastResult.nextRow);
+      }
+
+      result.matches = true;
+      result.nextRow = lastResult.nextRow;
+      return result;
+    }
+
+    @Override
+    protected boolean isOptional()
+    {
+      return false;
+    }
+  }
+
+
+  /**
+   * Outcome of matching a SymbolFunction. Instances are mutable: each SymbolFunction
+   * creates one in its constructor and overwrites its fields on every match call.
+   */
+  public static class SymbolFunctionResult
+  {
+    /**
+     * does the row match the pattern represented by this SymbolFunction
+     */
+    public boolean matches;
+    /**
+     * what is the index of the row beyond the set of rows that match this pattern.
+     */
+    public int nextRow;
+  }
+
+  /*
+   * Turns the pattern string into a Chain of SymbolFunctions. Symbols are looked up
+   * by their lower cased name.
+   */
+  public static class SymbolParser
+  {
+    String patternStr;
+    String[] symbols;
+    HashMap<String, Object[]> symbolExprEvalMap;
+    ArrayList<SymbolFunction> symbolFunctions;
+    Chain symbolFnChain;
+
+
+    /*
+     * The three lists are parallel: position i holds the name, evaluator and OI of
+     * the i-th symbol.
+     */
+    public SymbolParser(String patternStr, ArrayList<String> symbolNames,
+        ArrayList<ExprNodeEvaluator> symbolExprEvals, ArrayList<ObjectInspector> symbolExprOIs)
+    {
+      this.patternStr = patternStr;
+      this.symbolExprEvalMap = new HashMap<String, Object[]>();
+
+      final int numSymbols = symbolNames.size();
+      for (int idx = 0; idx < numSymbols; idx++)
+      {
+        String key = symbolNames.get(idx).toLowerCase();
+        // slot 0: the evaluator, slot 1: the ObjectInspector
+        Object[] details = new Object[] {symbolExprEvals.get(idx), symbolExprOIs.get(idx)};
+        symbolExprEvalMap.put(key, details);
+      }
+    }
+
+    public SymbolFunction getSymbolFunction()
+    {
+      return symbolFnChain;
+    }
+
+    /*
+     * Split the pattern on '.'; each element is a symbol name, optionally followed by
+     * '*' (wrapped in a Star) or '+' (wrapped in a Plus). An element that names no
+     * known symbol is an error.
+     */
+    public void parse() throws SemanticException
+    {
+      symbols = patternStr.split("\\.");
+      symbolFunctions = new ArrayList<SymbolFunction>();
+
+      for (int idx = 0; idx < symbols.length; idx++)
+      {
+        String element = symbols[idx];
+        boolean zeroOrMore = element.endsWith("*");
+        boolean oneOrMore = element.endsWith("+");
+
+        String symbolName = element;
+        if (zeroOrMore || oneOrMore)
+        {
+          // drop the trailing quantifier
+          symbolName = element.substring(0, element.length() - 1);
+        }
+
+        Object[] details = symbolExprEvalMap.get(symbolName.toLowerCase());
+        if (details == null)
+        {
+          throw new SemanticException(String.format("Unknown Symbol %s", symbolName));
+        }
+
+        ExprNodeEvaluator symbolEval = (ExprNodeEvaluator) details[0];
+        ObjectInspector symbolOI = (ObjectInspector) details[1];
+        SymbolFunction fn = new Symbol(symbolEval, symbolOI);
+
+        if (zeroOrMore)
+        {
+          fn = new Star(fn);
+        }
+        else if (oneOrMore)
+        {
+          fn = new Plus(fn);
+        }
+        symbolFunctions.add(fn);
+      }
+
+      symbolFnChain = new Chain(symbolFunctions);
+    }
+  }
+
+  /*
+   * ResultExpression is a Select List with the following variation:
+   * - the select keyword is optional. The parser checks if the expression doesn't start with
+   * select; if not it prefixes it.
+   * - Window Fn clauses are not permitted.
+   * - expressions can operate on the input columns plus the pseudo column 'path'
+   * which is array of
+   * structs. The shape of the struct is
+   * the same as the input.
+   */
+  public static class ResultExpressionParser {
+    String resultExprString;
+
+    RowResolver selectListInputRowResolver;
+    TypeCheckCtx selectListInputTypeCheckCtx;
+    StructObjectInspector selectListInputOI;
+
+    ArrayList<WindowExpressionSpec> selectSpec;
+
+    ResultExprInfo resultExprInfo;
+
+    /**
+     * @param resultExprString the result expression, as given to the function
+     * @param selectListInputRowResolver resolver used to translate the result expressions
+     */
+    public ResultExpressionParser(String resultExprString,
+        RowResolver selectListInputRowResolver)
+    {
+      this.selectListInputRowResolver = selectListInputRowResolver;
+      this.resultExprString = resultExprString;
+    }
+
+    /**
+     * Runs the translation steps in order; afterwards the outcome is available from
+     * getResultExprInfo().
+     */
+    public void translate() throws SemanticException, HiveException
+    {
+      // TypeCheckCtx and struct OI for the select list input
+      setupSelectListInputInfo();
+      // trim the expression and prefix it with "select " if it doesn't start with select
+      fixResultExprString();
+      // NOTE(review): parse() and validateSelectExpr() are not visible here; presumably
+      // they fill selectSpec and reject unsupported expressions - confirm.
+      parse();
+      validateSelectExpr();
+      // evaluators, names, ExprNodeDescs and output OI for each select expression
+      buildSelectListEvaluators();
+    }
+
+    public ResultExprInfo getResultExprInfo() {
+      return resultExprInfo;
+    }
+
+    /*
+     * For each expression in selectSpec: build its ExprNodeDesc, create and initialize
+     * its evaluator against the select list input OI, and work out its column name.
+     * The results are collected in a new ResultExprInfo, whose resultOI is a standard
+     * struct OI over the column names and the OIs of the expressions.
+     */
+    private void buildSelectListEvaluators() throws SemanticException, HiveException
+    {
+      resultExprInfo = new ResultExprInfo();
+      resultExprInfo.resultExprEvals = new ArrayList<ExprNodeEvaluator>();
+      resultExprInfo.resultExprNames = new ArrayList<String>();
+      resultExprInfo.resultExprNodes = new ArrayList<ExprNodeDesc>();
+      ArrayList<ObjectInspector> columnOIs = new ArrayList<ObjectInspector>();
+
+      for (int pos = 0; pos < selectSpec.size(); pos++)
+      {
+        WindowExpressionSpec spec = selectSpec.get(pos);
+        String alias = spec.getAlias();
+        ASTNode exprAST = spec.getExpression();
+
+        ExprNodeDesc exprNode =
+            ResultExpressionParser.buildExprNode(exprAST, selectListInputTypeCheckCtx);
+        ExprNodeEvaluator exprEval = ExprNodeEvaluatorFactory.get(exprNode);
+        ObjectInspector exprOI = exprEval.initialize(selectListInputOI);
+
+        // the position is passed along with the alias and the expression
+        String columnName = getColumnName(alias, exprNode, pos);
+
+        resultExprInfo.resultExprEvals.add(exprEval);
+        columnOIs.add(exprOI);
+        resultExprInfo.resultExprNodes.add(exprNode);
+        resultExprInfo.resultExprNames.add(columnName);
+      }
+
+      resultExprInfo.resultOI = ObjectInspectorFactory.getStandardStructObjectInspector(
+          resultExprInfo.resultExprNames, columnOIs);
+    }
+
+    private void setupSelectListInputInfo() throws SemanticException
+    {
+      selectListInputTypeCheckCtx = new TypeCheckCtx(selectListInputRowResolver);
+      selectListInputTypeCheckCtx.setUnparseTranslator(null);
+      /*
+       * create SelectListOI
+       */
+      selectListInputOI = (StructObjectInspector)
+          PTFTranslator.getStandardStructOI(selectListInputRowResolver);
+    }
+
+    private void fixResultExprString()
+    {
+      String r = resultExprString.trim();
+      String prefix = r.substring(0, 6);
+      if (!prefix.toLowerCase().equals("select"))
+      {
+        r = "select " + r;
+      }
+      resultExprString = r;
+    }
+
+    private void parse() throws SemanticException
+    {
+      selectSpec = SemanticAnalyzer.parseSelect(resultExprString);
+    }
+
+    private void validateSelectExpr() throws SemanticException
+    {
+      for (WindowExpressionSpec expr : selectSpec)
+      {
+        PTFTranslator.validateNoLeadLagInValueBoundarySpec(expr.getExpression());
+      }
+    }
+
+    private String getColumnName(String alias, ExprNodeDesc exprNode, int colIdx)
+    {
+      if (alias != null)
+      {
+        return alias;
+      }
+      else if (exprNode instanceof ExprNodeColumnDesc)
+      {
+        ExprNodeColumnDesc colDesc = (ExprNodeColumnDesc) exprNode;
+        return colDesc.getColumn();
+      }
+      return "npath_col_" + colIdx;
+    }
+
+    public static ExprNodeDesc buildExprNode(ASTNode expr,
+        TypeCheckCtx typeCheckCtx) throws SemanticException
+    {
+      // todo: use SemanticAnalyzer::genExprNodeDesc
+      // currently SA not available to PTFTranslator.
+      Map<ASTNode, ExprNodeDesc> map = TypeCheckProcFactory
+          .genExprNode(expr, typeCheckCtx);
+      ExprNodeDesc desc = map.get(expr);
+      if (desc == null) {
+        String errMsg = typeCheckCtx.getError();
+        if ( errMsg == null) {
+          errMsg = "Error in parsing ";
+        }
+        throw new SemanticException(errMsg);
+      }
+      return desc;
+    }
+  }
+
+  public static final String PATHATTR_NAME = "tpath";
+
+  /*
+   * Builds the RowResolver for the select list: every column of the input's RowResolver
+   * plus one extra column named PATHATTR_NAME whose type is array<struct>, the struct
+   * having one field per input column.
+   *
+   * Side effect: when evaluator.inputColumnNames is null on entry it is created here and
+   * filled with one name per input column; when it is already set it is used as is.
+   */
+  protected static RowResolver createSelectListRR(NPath evaluator,
+      PTFInputDef inpDef) throws SemanticException {
+    RowResolver rr = new RowResolver();
+    RowResolver inputRR = inpDef.getOutputShape().getRr();
+    boolean inputColNamesKnown = evaluator.inputColumnNames != null;
+
+    if ( !inputColNamesKnown ) {
+      evaluator.inputColumnNames = new ArrayList<String>();
+    }
+
+    ArrayList<ObjectInspector> inpColOIs = new ArrayList<ObjectInspector>();
+
+    for (ColumnInfo inpCInfo : inputRR.getColumnInfos()) {
+      // work on a copy so the input resolver's ColumnInfo is not shared with rr
+      ColumnInfo cInfo = new ColumnInfo(inpCInfo);
+      String colAlias = cInfo.getAlias();
+
+      // prefer the column alias the input resolver has registered for the internal name
+      String[] tabColAlias = inputRR.reverseLookup(inpCInfo.getInternalName());
+      if (tabColAlias != null) {
+        colAlias = tabColAlias[1];
+      }
+      ASTNode inExpr = null;
+      inExpr = PTFTranslator.getASTNode(inpCInfo, inputRR);
+      if ( inExpr != null ) {
+        // NOTE(review): colAlias is not defaulted to the internal name in this branch, so
+        // a null alias could be added to inputColumnNames below - confirm whether that
+        // can occur.
+        rr.putExpression(inExpr, cInfo);
+      }
+      else {
+        // no alias available: fall back to the internal name
+        colAlias = colAlias == null ? cInfo.getInternalName() : colAlias;
+        rr.put(cInfo.getTabAlias(), colAlias, cInfo);
+      }
+
+      if ( !inputColNamesKnown ) {
+        evaluator.inputColumnNames.add(colAlias);
+      }
+      inpColOIs.add(cInfo.getObjectInspector());
+    }
+
+    // OI of the path column: list of structs built from the input column names and OIs
+    StandardListObjectInspector pathAttrOI =
+        ObjectInspectorFactory.getStandardListObjectInspector(
+        ObjectInspectorFactory.getStandardStructObjectInspector(evaluator.inputColumnNames,
+            inpColOIs));
+
+    // the path column is registered without a table alias
+    ColumnInfo pathColumn = new ColumnInfo(PATHATTR_NAME,
+        TypeInfoUtils.getTypeInfoFromObjectInspector(pathAttrOI),
+        null,
+        false, false);
+    rr.put(null, PATHATTR_NAME, pathColumn);
+
+    return rr;
+  }
+
+  /*
+   * Builds the ObjectInspector of a select list input row: the field OIs of the input,
+   * followed by one list-of-struct OI for the path column. The struct inside the list is
+   * named by evaluator.inputColumnNames; the resulting row is named by
+   * evaluator.selectListNames.
+   */
+  protected static StructObjectInspector createSelectListOI(NPath evaluator, PTFInputDef inpDef) {
+    StructObjectInspector inputOI = inpDef.getOutputShape().getOI();
+
+    ArrayList<ObjectInspector> inputFieldOIs = new ArrayList<ObjectInspector>();
+    for (StructField field : inputOI.getAllStructFieldRefs()) {
+      inputFieldOIs.add(field.getFieldObjectInspector());
+    }
+
+    StructObjectInspector pathElementOI =
+        ObjectInspectorFactory.getStandardStructObjectInspector(
+            evaluator.inputColumnNames, inputFieldOIs);
+    StandardListObjectInspector pathOI =
+        ObjectInspectorFactory.getStandardListObjectInspector(pathElementOI);
+
+    // input columns first, the path column last
+    ArrayList<ObjectInspector> selectFieldOIs = new ArrayList<ObjectInspector>();
+    selectFieldOIs.addAll(inputFieldOIs);
+    selectFieldOIs.add(pathOI);
+
+    return ObjectInspectorFactory.getStandardStructObjectInspector(
+        evaluator.selectListNames, selectFieldOIs);
+  }
+
+  /*
+   * Builds one select list input row: the standard-object copy of currRow's fields
+   * followed by the path (see getPath) as the last element.
+   */
+  public static Object getSelectListInput(Object currRow, ObjectInspector rowOI,
+      PTFPartitionIterator<Object> pItr, int sz) {
+    List<?> stdRow = (List<?>) ObjectInspectorUtils.copyToStandardObject(currRow, rowOI);
+
+    ArrayList<Object> selectListRow = new ArrayList<Object>();
+    selectListRow.addAll(stdRow);
+    selectListRow.add(getPath(currRow, rowOI, pItr, sz));
+    return selectListRow;
+  }
+
+  /*
+   * Collects up to sz rows as standard objects: currRow first, then rows pulled from pItr
+   * while it has more. Before returning, the iterator is reset to (index on entry - 1).
+   * NOTE(review): presumably that index is the position of currRow, so the caller's
+   * iteration is undisturbed - confirm against PTFPartitionIterator.
+   */
+  public static ArrayList<Object> getPath(Object currRow, ObjectInspector rowOI,
+      PTFPartitionIterator<Object> pItr, int sz) {
+    final int resetIdx = pItr.getIndex() - 1;
+
+    ArrayList<Object> path = new ArrayList<Object>();
+    path.add(ObjectInspectorUtils.copyToStandardObject(currRow, rowOI));
+
+    for (int collected = 1; collected < sz && pItr.hasNext(); collected++) {
+      Object nextRow = pItr.next();
+      path.add(ObjectInspectorUtils.copyToStandardObject(nextRow, rowOI));
+    }
+
+    pItr.resetToIndex(resetIdx);
+    return path;
+  }
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/Noop.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/Noop.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/Noop.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/Noop.java Tue Apr  2 14:16:34 2013
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.ptf;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.hive.ql.exec.PTFPartition;
+import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.PTFDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+/**
+ * A table function that hands its input partition back unchanged.
+ */
+public class Noop extends TableFunctionEvaluator {
+
+  /**
+   * @return the very same partition that was passed in.
+   */
+  @Override
+  public PTFPartition execute(PTFPartition iPart) throws HiveException {
+    return iPart;
+  }
+
+  /**
+   * Not supported: the partition-level execute above never delegates to this method.
+   */
+  @Override
+  protected void execute(PTFPartitionIterator<Object> pItr, PTFPartition oPart) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Resolver for {@link Noop}: the output shape is the input shape.
+   */
+  public static class NoopResolver extends TableFunctionResolver {
+
+    @Override
+    protected TableFunctionEvaluator createEvaluator(PTFDesc ptfDesc, PartitionedTableFunctionDef tDef) {
+      return new Noop();
+    }
+
+    /**
+     * Uses the OI of the function's input as the output OI.
+     */
+    @Override
+    public void setupOutputOI() throws SemanticException {
+      TableFunctionEvaluator eval = getEvaluator();
+      StructObjectInspector inputShapeOI = eval.getTableDef().getInput().getOutputShape().getOI();
+      setOutputOI(inputShapeOI);
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.hadoop.hive.ql.udf.ptf.TableFunctionResolver#carryForwardNames()
+     * Returning true is correct only for special internal functions.
+     */
+    @Override
+    public boolean carryForwardNames() {
+      return true;
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.hadoop.hive.ql.udf.ptf.TableFunctionResolver#getOutputColumnNames()
+     * Null is returned only because carryForwardNames is true.
+     */
+    @Override
+    public ArrayList<String> getOutputColumnNames() {
+      return null;
+    }
+
+    @Override
+    public boolean transformsRawInput() {
+      return false;
+    }
+
+    /**
+     * Same setup as {@link #setupOutputOI()}.
+     */
+    @Override
+    public void initializeOutputOI() throws HiveException {
+      setupOutputOI();
+    }
+
+  }
+
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopWithMap.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopWithMap.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopWithMap.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopWithMap.java Tue Apr  2 14:16:34 2013
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.ptf;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.hive.ql.exec.PTFPartition;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.PTFDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+/**
+ * Like {@link Noop}, but also declares that it transforms the raw input; both the raw
+ * input transformation and the execute step return the partition unchanged.
+ */
+public class NoopWithMap extends Noop {
+
+  /**
+   * @return the very same partition that was passed in.
+   */
+  @Override
+  public PTFPartition execute(PTFPartition iPart) throws HiveException {
+    return iPart;
+  }
+
+  /**
+   * @return the very same partition that was passed in.
+   */
+  @Override
+  protected PTFPartition _transformRawInput(PTFPartition iPart) throws HiveException {
+    return iPart;
+  }
+
+  /**
+   * Resolver for {@link NoopWithMap}: raw input shape and output shape are both the
+   * input shape.
+   */
+  public static class NoopWithMapResolver extends TableFunctionResolver {
+
+    @Override
+    protected TableFunctionEvaluator createEvaluator(PTFDesc ptfDesc, PartitionedTableFunctionDef tDef) {
+      return new NoopWithMap();
+    }
+
+    /**
+     * Uses the OI of the function's input as the output OI.
+     */
+    @Override
+    public void setupOutputOI() throws SemanticException {
+      TableFunctionEvaluator eval = getEvaluator();
+      StructObjectInspector inputShapeOI = eval.getTableDef().getInput().getOutputShape().getOI();
+      setOutputOI(inputShapeOI);
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.hadoop.hive.ql.udf.ptf.TableFunctionResolver#carryForwardNames()
+     * Returning true is correct only for special internal functions.
+     */
+    @Override
+    public boolean carryForwardNames() {
+      return true;
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.hadoop.hive.ql.udf.ptf.TableFunctionResolver#getOutputColumnNames()
+     * Null is returned only because carryForwardNames is true.
+     */
+    @Override
+    public ArrayList<String> getOutputColumnNames() {
+      return null;
+    }
+
+    /**
+     * Uses the OI of the function's input as the raw input OI.
+     */
+    @Override
+    public void setupRawInputOI() throws SemanticException {
+      TableFunctionEvaluator eval = getEvaluator();
+      StructObjectInspector inputShapeOI = eval.getTableDef().getInput().getOutputShape().getOI();
+      setRawInputOI(inputShapeOI);
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.hadoop.hive.ql.udf.ptf.TableFunctionResolver#getRawInputColumnNames()
+     * This resolver supplies no names for the raw input.
+     */
+    @Override
+    public ArrayList<String> getRawInputColumnNames() throws SemanticException {
+      return null;
+    }
+
+    @Override
+    public boolean transformsRawInput() {
+      return true;
+    }
+
+    /**
+     * Same setup as {@link #setupOutputOI()}.
+     */
+    @Override
+    public void initializeOutputOI() throws HiveException {
+      setupOutputOI();
+    }
+
+    /**
+     * Same setup as {@link #setupRawInputOI()}.
+     */
+    @Override
+    public void initializeRawInputOI() throws HiveException {
+      setupRawInputOI();
+    }
+
+  }
+
+
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java Tue Apr  2 14:16:34 2013
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.ptf;
+
+import org.apache.hadoop.hive.ql.exec.PTFOperator;
+import org.apache.hadoop.hive.ql.exec.PTFPartition;
+import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
+import org.apache.hadoop.hive.ql.exec.PTFUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.PTFDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+/**
+ * Based on Hive {@link GenericUDAFEvaluator}. Breaks up the responsibility of the old
+ * AbstractTableFunction class into a Resolver and an Evaluator.
+ * <p>
+ * The Evaluator also holds onto the {@link PartitionedTableFunctionDef}. This provides information
+ * about the arguments to the function, the shape of the Input partition and the Partitioning details.
+ * The Evaluator is responsible for providing the 2 execute methods:
+ * <ol>
+ * <li><b>execute:</b> which is invoked after the input is partitioned; the contract
+ * is, it is given an input Partition and must return an output Partition. The shape of the output
+ * Partition is obtained from the getOutputOI call.
+ * <li><b>transformRawInput:</b> In the case where this function indicates that it will transform the raw input
+ * before it is fed through the partitioning mechanics, this function is called. Again the contract is
+ * it is given an input Partition and must return a Partition. The shape of the output Partition is
+ * obtained from the getRawInputOI() call.
+ * </ol>
+ *
+ */
+public abstract class TableFunctionEvaluator
+{
+  /*
+   * how is this different from the OutputShape set on the TableDef.
+   * This is the OI of the object coming out of the PTF.
+   * It is put in an output Partition whose Serde is usually LazyBinarySerde.
+   * So the next PTF (or Operator) in the chain gets a LazyBinaryStruct.
+   */
+  transient protected StructObjectInspector OI;
+  /*
+   * same comment as OI applies here.
+   */
+  transient protected StructObjectInspector rawInputOI;
+  protected PartitionedTableFunctionDef tDef;
+  protected PTFDesc ptfDesc;
+  String partitionClass;
+  int partitionMemSize;
+  boolean transformsRawInput;
+  /*
+   * created on the first call to execute(PTFPartition) and reset and reused afterwards.
+   */
+  transient protected PTFPartition outputPartition;
+
+  // NOTE(review): presumably keeps these fields out of the serialized plan - confirm
+  // against PTFUtils.makeTransient.
+  static{
+    PTFUtils.makeTransient(TableFunctionEvaluator.class, "OI");
+    PTFUtils.makeTransient(TableFunctionEvaluator.class, "rawInputOI");
+    PTFUtils.makeTransient(TableFunctionEvaluator.class, "outputPartition");
+  }
+
+
+  public StructObjectInspector getOutputOI()
+  {
+    return OI;
+  }
+
+  protected void setOutputOI(StructObjectInspector outputOI)
+  {
+    OI = outputOI;
+  }
+
+  public PartitionedTableFunctionDef getTableDef()
+  {
+    return tDef;
+  }
+
+  public void setTableDef(PartitionedTableFunctionDef tDef)
+  {
+    this.tDef = tDef;
+  }
+
+  protected PTFDesc getQueryDef()
+  {
+    return ptfDesc;
+  }
+
+  protected void setQueryDef(PTFDesc ptfDesc)
+  {
+    this.ptfDesc = ptfDesc;
+  }
+
+  public String getPartitionClass()
+  {
+    return partitionClass;
+  }
+
+  public void setPartitionClass(String partitionClass)
+  {
+    this.partitionClass = partitionClass;
+  }
+
+  public int getPartitionMemSize()
+  {
+    return partitionMemSize;
+  }
+
+  public void setPartitionMemSize(int partitionMemSize)
+  {
+    this.partitionMemSize = partitionMemSize;
+  }
+
+  public StructObjectInspector getRawInputOI()
+  {
+    return rawInputOI;
+  }
+
+  protected void setRawInputOI(StructObjectInspector rawInputOI)
+  {
+    this.rawInputOI = rawInputOI;
+  }
+
+  public boolean isTransformsRawInput() {
+    return transformsRawInput;
+  }
+
+  public void setTransformsRawInput(boolean transformsRawInput) {
+    this.transformsRawInput = transformsRawInput;
+  }
+
+  /**
+   * Runs the function over one input partition.
+   * <p>
+   * Connects the lead/lag functions of the PTFDesc to the partition's iterator, prepares
+   * the output partition (created on first use, reset on later calls) and delegates to
+   * {@link #execute(PTFPartitionIterator, PTFPartition)}.
+   *
+   * @param iPart the input partition.
+   * @return the output partition; the same instance is returned by every call on this
+   * evaluator, so a later call overwrites the result of an earlier one.
+   * @throws HiveException if the function fails.
+   */
+  public PTFPartition execute(PTFPartition iPart)
+      throws HiveException
+  {
+    PTFPartitionIterator<Object> pItr = iPart.iterator();
+    PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, pItr);
+
+    if ( outputPartition == null ) {
+      outputPartition = new PTFPartition(getPartitionClass(),
+          getPartitionMemSize(), tDef.getOutputShape().getSerde(), OI);
+    }
+    else {
+      outputPartition.reset();
+    }
+
+    execute(pItr, outputPartition);
+    return outputPartition;
+  }
+
+  /**
+   * Function specific logic: read rows from {@code pItr} and fill {@code oPart}.
+   *
+   * @param pItr iterator over the input partition.
+   * @param oPart the output partition to populate.
+   * @throws HiveException if the function fails.
+   */
+  protected abstract void execute(PTFPartitionIterator<Object> pItr, PTFPartition oPart) throws HiveException;
+
+  /**
+   * Transforms the raw input before it is fed through the partitioning mechanics.
+   *
+   * @param iPart the raw input partition.
+   * @return the result of {@link #_transformRawInput(PTFPartition)}.
+   * @throws HiveException if this function was not set up as transforming its raw input.
+   */
+  public PTFPartition transformRawInput(PTFPartition iPart) throws HiveException
+  {
+    if ( !isTransformsRawInput())
+    {
+      throw new HiveException(String.format(
+          "Internal Error: transformRawInput called on function (%s) that has no Map Phase",
+          tDef.getName()));
+    }
+    return _transformRawInput(iPart);
+  }
+
+  /**
+   * Hook for functions that transform their raw input; this default returns null.
+   */
+  protected PTFPartition _transformRawInput(PTFPartition iPart) throws HiveException
+  {
+    return null;
+  }
+}
+

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionResolver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionResolver.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionResolver.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionResolver.java Tue Apr  2 14:16:34 2013
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.ptf;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+/**
+ * Based on Hive {@link GenericUDAFResolver}. Breaks up the responsibility of the
+ * old AbstractTableFunction class into a Resolver and an Evaluator.
+ * The Resolver is responsible for:
+ * <ol>
+ * <li> setting up the {@link TableFunctionEvaluator}
+ * <li> Setting up the raw and output ObjectInspectors of the Evaluator.
+ * <li> The Evaluator also holds onto the {@link PartitionedTableFunctionDef}. This provides information
+ * about the arguments to the function, the shape of the Input partition and the Partitioning details.
+ * </ol>
+ * The Resolver for a function is obtained from the {@link FunctionRegistry}. The Resolver is initialized
+ * by the following 4 step process:
+ * <ol>
+ * <li> The initialize method is called; which is passed the {@link PTFDesc} and the
+ * {@link PartitionedTableFunctionDef}.
+ * <li> The resolver is then asked to setup the Raw ObjectInspector. This is only required if the Function reshapes
+ * the raw input.
+ * <li> Once the Resolver has had a chance to compute the shape of the Raw Input that is fed to the partitioning
+ * machinery; the translator sets up the partitioning details on the tableFuncDef.
+ * <li> finally the resolver is asked to setup the output ObjectInspector.
+ * </ol>
+ */
+@SuppressWarnings("deprecation")
+public abstract class TableFunctionResolver
+{
+  TableFunctionEvaluator evaluator;
+  PTFDesc ptfDesc;
+
+  /*
+   * - called during translation.
+   * - invokes createEvaluator which must be implemented by a subclass
+   * - sets up the evaluator with references to the TableDef, the PTFDesc, PartitionClass,
+   *   PartitionMemSize and the transformsRawInput boolean.
+   * - PartitionClass and PartitionMemSize are read from the HiveConf.
+   */
+  public void initialize(HiveConf cfg, PTFDesc ptfDesc, PartitionedTableFunctionDef tDef)
+      throws SemanticException
+  {
+    this.ptfDesc = ptfDesc;
+    String partitionClass = HiveConf.getVar(cfg, ConfVars.HIVE_PTF_PARTITION_PERSISTENCE_CLASS);
+    int partitionMemSize = HiveConf.getIntVar(cfg, ConfVars.HIVE_PTF_PARTITION_PERSISTENT_SIZE);
+
+    evaluator = createEvaluator(ptfDesc, tDef);
+    evaluator.setTransformsRawInput(transformsRawInput());
+    evaluator.setTableDef(tDef);
+    evaluator.setQueryDef(ptfDesc);
+    evaluator.setPartitionClass(partitionClass);
+    evaluator.setPartitionMemSize(partitionMemSize);
+
+  }
+
+  /*
+   * called during deserialization of a QueryDef during runtime.
+   * The evaluator is supplied by the caller; only its TableDef and PTFDesc are set here.
+   */
+  public void initialize(PTFDesc ptfDesc, PartitionedTableFunctionDef tDef, TableFunctionEvaluator evaluator)
+      throws HiveException
+  {
+    this.evaluator = evaluator;
+    this.ptfDesc = ptfDesc;
+    evaluator.setTableDef(tDef);
+    evaluator.setQueryDef(ptfDesc);
+  }
+
+  public TableFunctionEvaluator getEvaluator()
+  {
+    return evaluator;
+  }
+
+  /*
+   * - a subclass must provide this method.
+   * - this method is invoked during translation and also when the Operator is initialized during runtime.
+   * - a subclass must use this call to setup the shape of its output.
+   * - subsequent to this call, a call to getOutputOI on the TableFunctionEvaluator must return the OI
+   * of the output of this function.
+   */
+  public abstract void setupOutputOI() throws SemanticException;
+
+  /*
+   * A PTF Function must provide the 'external' names of the columns in its Output.
+   *
+   */
+  public abstract ArrayList<String> getOutputColumnNames() throws SemanticException;
+
+
+  /**
+   * This method is invoked during runtime (during deserialization of the QueryDef).
+   * At this point the TableFunction can assume that the {@link ExprNodeDesc Expression Nodes}
+   * exist for all the Def (ArgDef, ColumnDef, WindowDef..). It is the responsibility of
+   * the TableFunction to construct the {@link ExprNodeEvaluator evaluators} and setup the OI.
+   *
+   * @throws HiveException
+   */
+  public abstract void initializeOutputOI() throws HiveException;
+
+  /*
+   * - Called on functions that transform the raw input.
+   * - this method is invoked during translation and also when the Operator is initialized during runtime.
+   * - a subclass must use this call to setup the shape of the raw input, that is fed to the partitioning mechanics.
+   * - subsequent to this call, a call to getRawInputOI on the TableFunctionEvaluator must return the OI
+   *   of the transformed raw input.
+   * - this default does nothing for functions that do not transform the raw input, and fails
+   *   for those that do: such functions must override it.
+   */
+  public void setupRawInputOI() throws SemanticException
+  {
+    if (!transformsRawInput())
+    {
+      return;
+    }
+    throw new SemanticException(
+        "Function has map phase, must extend setupRawInputOI");
+  }
+
+  /*
+   * A PTF Function must provide the 'external' names of the columns in the transformed Raw Input.
+   * This default returns null for functions that do not transform the raw input, and fails
+   * for those that do: such functions must override it.
+   */
+  public ArrayList<String> getRawInputColumnNames() throws SemanticException {
+    if (!transformsRawInput())
+    {
+      return null;
+    }
+    throw new SemanticException(
+        "Function transforms Raw Input; must extend getRawInputColumnNames");
+  }
+
+  /*
+   * Same responsibility as initializeOutputOI, but for the RawInput.
+   * This default does nothing for functions that do not transform the raw input, and fails
+   * for those that do: such functions must override it.
+   */
+  public void initializeRawInputOI() throws HiveException
+  {
+    if (!transformsRawInput())
+    {
+      return;
+    }
+    throw new HiveException(
+        "Function has map phase, must extend initializeRawInputOI");
+  }
+
+  /*
+   * callback method used by subclasses to set the RawInputOI on the Evaluator.
+   */
+  protected void setRawInputOI(StructObjectInspector rawInputOI)
+  {
+    evaluator.setRawInputOI(rawInputOI);
+  }
+
+  /*
+   * callback method used by subclasses to set the OutputOI on the Evaluator.
+   */
+  protected void setOutputOI(StructObjectInspector outputOI)
+  {
+    evaluator.setOutputOI(outputOI);
+  }
+
+  public PTFDesc getPtfDesc()
+  {
+    return ptfDesc;
+  }
+
+  /*
+   * This is used during translation to decide if the internalName -> alias mapping from the Input to the PTF is carried
+   * forward when building the Output RR for this PTF.
+   * This is used by internal PTFs: NOOP, WindowingTableFunction to make names in its input available in the Output.
+   * In general this should be false; and the names used for the Output Columns must be provided by the PTF Writer in the
+   * function getOutputColumnNames.
+   */
+  public boolean carryForwardNames() {
+    return false;
+  }
+
+  /*
+   * a subclass must indicate whether it will transform the raw input before it is fed through the
+   * partitioning mechanics.
+   */
+  public abstract boolean transformsRawInput();
+
+  /*
+   * a subclass must provide the TableFunctionEvaluator instance.
+   */
+  protected abstract TableFunctionEvaluator createEvaluator(PTFDesc ptfDesc, PartitionedTableFunctionDef tDef);
+}