You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/04/02 16:16:37 UTC
svn commit: r1463556 [8/15] - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/ data/files/ ql/if/
ql/src/gen/thrift/gen-cpp/
ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/
ql/src/gen/thrift/gen-php/ ql/src/gen/thrift/gen...
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLeadLag.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLeadLag.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLeadLag.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLeadLag.java Tue Apr 2 14:16:34 2013
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
+import org.apache.hadoop.hive.ql.exec.PTFUtils;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+
+public abstract class GenericUDFLeadLag extends GenericUDF
+{
+ transient ExprNodeEvaluator exprEvaluator;
+ transient PTFPartitionIterator<Object> pItr;
+ ObjectInspector firstArgOI;
+
+ private PrimitiveObjectInspector amtOI;
+
+ static{
+ PTFUtils.makeTransient(GenericUDFLeadLag.class, "exprEvaluator");
+ PTFUtils.makeTransient(GenericUDFLeadLag.class, "pItr");
+ }
+
+ @Override
+ public Object evaluate(DeferredObject[] arguments) throws HiveException
+ {
+ DeferredObject amt = arguments[1];
+ int intAmt = 0;
+ try
+ {
+ intAmt = PrimitiveObjectInspectorUtils.getInt(amt.get(), amtOI);
+ }
+ catch (NullPointerException e)
+ {
+ intAmt = Integer.MAX_VALUE;
+ }
+ catch (NumberFormatException e)
+ {
+ intAmt = Integer.MAX_VALUE;
+ }
+
+ int idx = pItr.getIndex() - 1;
+ try
+ {
+ Object row = getRow(intAmt);
+ Object ret = exprEvaluator.evaluate(row);
+ ret = ObjectInspectorUtils.copyToStandardObject(ret, firstArgOI, ObjectInspectorCopyOption.WRITABLE);
+ return ret;
+ }
+ finally
+ {
+ Object currRow = pItr.resetToIndex(idx);
+ // reevaluate expression on current Row, to trigger the Lazy object
+ // caches to be reset to the current row.
+ exprEvaluator.evaluate(currRow);
+ }
+
+ }
+
+ @Override
+ public ObjectInspector initialize(ObjectInspector[] arguments)
+ throws UDFArgumentException
+ {
+ // index has to be a primitive
+ if (arguments[1] instanceof PrimitiveObjectInspector)
+ {
+ amtOI = (PrimitiveObjectInspector) arguments[1];
+ }
+ else
+ {
+ throw new UDFArgumentTypeException(1,
+ "Primitive Type is expected but "
+ + arguments[1].getTypeName() + "\" is found");
+ }
+
+ firstArgOI = arguments[0];
+ return ObjectInspectorUtils.getStandardObjectInspector(firstArgOI,
+ ObjectInspectorCopyOption.WRITABLE);
+ }
+
+
+
+ public ExprNodeEvaluator getExprEvaluator()
+ {
+ return exprEvaluator;
+ }
+
+ public void setExprEvaluator(ExprNodeEvaluator exprEvaluator)
+ {
+ this.exprEvaluator = exprEvaluator;
+ }
+
+ public PTFPartitionIterator<Object> getpItr()
+ {
+ return pItr;
+ }
+
+ public void setpItr(PTFPartitionIterator<Object> pItr)
+ {
+ this.pItr = pItr;
+ }
+
+ @Override
+ public String getDisplayString(String[] children)
+ {
+ assert (children.length == 2);
+ StringBuilder sb = new StringBuilder();
+ sb.append(_getFnName());
+ sb.append("(");
+ sb.append(children[0]);
+ sb.append(", ");
+ sb.append(children[1]);
+ sb.append(")");
+ return sb.toString();
+ }
+
+ protected abstract String _getFnName();
+
+ protected abstract Object getRow(int amt);
+
+ public static class GenericUDFLead extends GenericUDFLeadLag
+ {
+
+ @Override
+ protected String _getFnName()
+ {
+ return "lead";
+ }
+
+ @Override
+ protected Object getRow(int amt)
+ {
+ return pItr.lead(amt - 1);
+ }
+
+ }
+
+ public static class GenericUDFLag extends GenericUDFLeadLag
+ {
+ @Override
+ protected String _getFnName()
+ {
+ return "lag";
+ }
+
+ @Override
+ protected Object getRow(int amt)
+ {
+ return pItr.lag(amt + 1);
+ }
+
+ }
+
+}
+
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NPath.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NPath.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NPath.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NPath.java Tue Apr 2 14:16:34 2013
@@ -0,0 +1,936 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.ptf;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
+import org.apache.hadoop.hive.ql.exec.PTFPartition;
+import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
+import org.apache.hadoop.hive.ql.exec.PTFUtils;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.PTFTranslator;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.TypeCheckCtx;
+import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowExpressionSpec;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFExpressionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFInputDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+
+/**
+ * return rows that meet a specified pattern. Use symbols to specify a list of expressions
+ * to match.
+ * Pattern is used to specify a Path. The results list can contain expressions based on
+ * the input columns and also the matched Path.
+ * <ol>
+ * <li><b>pattern:</b> pattern for the Path. Path is 'dot' separated list of symbols.
+ * Each element is treated as a symbol. Elements that end in '*' or '+' are interpreted with
+ * the usual meaning of zero or more, one or more respectively. For e.g.
+ * "LATE.EARLY*.ONTIMEOREARLY" implies a sequence of flights
+ * where the first occurrence was LATE, followed by zero or more EARLY flights,
+ * followed by a ONTIME or EARLY flight.
+ * <li><b>symbols</b> specify a list of name, expression pairs. For e.g.
+ * 'LATE', arrival_delay > 0, 'EARLY', arrival_delay < 0 , 'ONTIME', arrival_delay == 0.
+ * These symbols can be used in the Pattern defined above.
+ * <li><b>resultSelectList</b> specified as a select list.
+ * The expressions in the selectList are evaluated in the context where all the
+ * input columns are available, plus the attribute
+ * "tpath" is available. Path is a collection of rows that represents the matching Path.
+ * </ol>
+ */
+public class NPath extends TableFunctionEvaluator
+{
+ private transient String patternStr;
+ private transient SymbolsInfo symInfo;
+ private transient String resultExprStr;
+ private transient SymbolFunction syFn;
+ private ResultExprInfo resultExprInfo;
+ /*
+ * the names of the Columns of the input to NPath. Used to setup the tpath Struct column.
+ */
+ private ArrayList<String> inputColumnNames;
+ private ArrayList<String> selectListNames;
+
+ @Override
+ public void execute(PTFPartitionIterator<Object> pItr, PTFPartition outP) throws HiveException
+ {
+ while (pItr.hasNext())
+ {
+ Object iRow = pItr.next();
+
+ SymbolFunctionResult syFnRes = SymbolFunction.match(syFn, iRow, pItr);
+ if (syFnRes.matches )
+ {
+ int sz = syFnRes.nextRow - (pItr.getIndex() - 1);
+ Object selectListInput = NPath.getSelectListInput(iRow,
+ tDef.getInput().getOutputShape().getOI(), pItr, sz);
+ ArrayList<Object> oRow = new ArrayList<Object>();
+ for(ExprNodeEvaluator resExprEval : resultExprInfo.resultExprEvals)
+ {
+ oRow.add(resExprEval.evaluate(selectListInput));
+ }
+ outP.append(oRow);
+ }
+ }
+ }
+
+ static void throwErrorWithSignature(String message) throws SemanticException
+ {
+ throw new SemanticException(String.format(
+ "NPath signature is: SymbolPattern, one or more SymbolName, " +
+ "expression pairs, the result expression as a select list. Error %s",
+ message));
+ }
+
+ public ArrayList<String> getInputColumnNames() {
+ return inputColumnNames;
+ }
+
+ public void setInputColumnNames(ArrayList<String> inputColumnNames) {
+ this.inputColumnNames = inputColumnNames;
+ }
+
+ public ArrayList<String> getSelectListNames() {
+ return selectListNames;
+ }
+
+ public void setSelectListNames(ArrayList<String> selectListNames) {
+ this.selectListNames = selectListNames;
+ }
+
+ public static class NPathResolver extends TableFunctionResolver
+ {
+
+ @Override
+ protected TableFunctionEvaluator createEvaluator(PTFDesc ptfDesc,
+ PartitionedTableFunctionDef tDef)
+ {
+
+ return new NPath();
+ }
+
+ /**
+ * <ul>
+ * <li> check structure of Arguments:
+ * <ol>
+ * <li> First arg should be a String
+ * <li> then there should be an even number of Arguments:
+ * String, expression; expression should be Convertible to Boolean.
+ * <li> finally there should be a String.
+ * </ol>
+ * <li> convert pattern into a NNode chain.
+ * <li> convert symbol args into a Symbol Map.
+ * <li> parse selectList into SelectList struct. The inputOI used to translate
+ * these expressions should be based on the
+ * columns in the Input, the 'path.attr'
+ * </ul>
+ */
+ @Override
+ public void setupOutputOI() throws SemanticException
+ {
+ NPath evaluator = (NPath) getEvaluator();
+ PartitionedTableFunctionDef tDef = evaluator.getTableDef();
+
+ ArrayList<PTFExpressionDef> args = tDef.getArgs();
+ int argsNum = args == null ? 0 : args.size();
+
+ if ( argsNum < 4 )
+ {
+ throwErrorWithSignature("at least 4 arguments required");
+ }
+
+ validateAndSetupPatternStr(evaluator, args);
+ validateAndSetupSymbolInfo(evaluator, args, argsNum);
+ validateAndSetupResultExprStr(evaluator, args, argsNum);
+ setupSymbolFunctionChain(evaluator);
+
+ /*
+ * setup OI for input to resultExpr select list
+ */
+ RowResolver selectListInputRR = NPath.createSelectListRR(evaluator, tDef.getInput());
+
+ /*
+ * parse ResultExpr Str and setup OI.
+ */
+ ResultExpressionParser resultExprParser =
+ new ResultExpressionParser(evaluator.resultExprStr, selectListInputRR);
+ try {
+ resultExprParser.translate();
+ }
+ catch(HiveException he) {
+ throw new SemanticException(he);
+ }
+ evaluator.resultExprInfo = resultExprParser.getResultExprInfo();
+ StructObjectInspector OI = evaluator.resultExprInfo.resultOI;
+ evaluator.selectListNames = new ArrayList<String>();
+ extractOIColumnNames(resultExprParser.selectListInputOI, evaluator.selectListNames);
+
+ setOutputOI(OI);
+ }
+ /*
+ * validate and setup patternStr
+ */
+ private void validateAndSetupPatternStr(NPath evaluator,
+ ArrayList<PTFExpressionDef> args) throws SemanticException {
+ PTFExpressionDef symboPatternArg = args.get(0);
+ ObjectInspector symbolPatternArgOI = symboPatternArg.getOI();
+
+ if ( !ObjectInspectorUtils.isConstantObjectInspector(symbolPatternArgOI) ||
+ (symbolPatternArgOI.getCategory() != ObjectInspector.Category.PRIMITIVE) ||
+ ((PrimitiveObjectInspector)symbolPatternArgOI).getPrimitiveCategory() !=
+ PrimitiveObjectInspector.PrimitiveCategory.STRING )
+ {
+ throwErrorWithSignature("Currently the symbol Pattern must be a Constant String.");
+ }
+
+ evaluator.patternStr = ((ConstantObjectInspector)symbolPatternArgOI).
+ getWritableConstantValue().toString();
+ }
+
+ /*
+ * validate and setup SymbolInfo
+ */
+ private void validateAndSetupSymbolInfo(NPath evaluator,
+ ArrayList<PTFExpressionDef> args,
+ int argsNum) throws SemanticException {
+ int symbolArgsSz = argsNum - 2;
+ if ( symbolArgsSz % 2 != 0)
+ {
+ throwErrorWithSignature("Symbol Name, Expression need to be specified in pairs: " +
+ "there are odd number of symbol args");
+ }
+
+ evaluator.symInfo = new SymbolsInfo(symbolArgsSz/2);
+ for(int i=1; i <= symbolArgsSz; i += 2)
+ {
+ PTFExpressionDef symbolNameArg = args.get(i);
+ ObjectInspector symbolNameArgOI = symbolNameArg.getOI();
+
+ if ( !ObjectInspectorUtils.isConstantObjectInspector(symbolNameArgOI) ||
+ (symbolNameArgOI.getCategory() != ObjectInspector.Category.PRIMITIVE) ||
+ ((PrimitiveObjectInspector)symbolNameArgOI).getPrimitiveCategory() !=
+ PrimitiveObjectInspector.PrimitiveCategory.STRING )
+ {
+ throwErrorWithSignature(
+ String.format("Currently a Symbol Name(%s) must be a Constant String",
+ symbolNameArg.getExpressionTreeString()));
+ }
+ String symbolName = ((ConstantObjectInspector)symbolNameArgOI).
+ getWritableConstantValue().toString();
+
+ PTFExpressionDef symolExprArg = args.get(i+1);
+ ObjectInspector symolExprArgOI = symolExprArg.getOI();
+ if ( (symolExprArgOI.getCategory() != ObjectInspector.Category.PRIMITIVE) ||
+ ((PrimitiveObjectInspector)symolExprArgOI).getPrimitiveCategory() !=
+ PrimitiveObjectInspector.PrimitiveCategory.BOOLEAN )
+ {
+ throwErrorWithSignature(String.format("Currently a Symbol Expression(%s) " +
+ "must be a boolean expression", symolExprArg.getExpressionTreeString()));
+ }
+ evaluator.symInfo.add(symbolName, symolExprArg);
+ }
+ }
+
+ /*
+ * validate and setup resultExprStr
+ */
+ private void validateAndSetupResultExprStr(NPath evaluator,
+ ArrayList<PTFExpressionDef> args,
+ int argsNum) throws SemanticException {
+ PTFExpressionDef resultExprArg = args.get(argsNum - 1);
+ ObjectInspector resultExprArgOI = resultExprArg.getOI();
+
+ if ( !ObjectInspectorUtils.isConstantObjectInspector(resultExprArgOI) ||
+ (resultExprArgOI.getCategory() != ObjectInspector.Category.PRIMITIVE) ||
+ ((PrimitiveObjectInspector)resultExprArgOI).getPrimitiveCategory() !=
+ PrimitiveObjectInspector.PrimitiveCategory.STRING )
+ {
+ throwErrorWithSignature("Currently the result Expr parameter must be a Constant String.");
+ }
+
+ evaluator.resultExprStr = ((ConstantObjectInspector)resultExprArgOI).
+ getWritableConstantValue().toString();
+ }
+
+ /*
+ * setup SymbolFunction chain.
+ */
+ private void setupSymbolFunctionChain(NPath evaluator) throws SemanticException {
+ SymbolParser syP = new SymbolParser(evaluator.patternStr,
+ evaluator.symInfo.symbolExprsNames,
+ evaluator.symInfo.symbolExprsEvaluators, evaluator.symInfo.symbolExprsOIs);
+ syP.parse();
+ evaluator.syFn = syP.getSymbolFunction();
+ }
+
+ @Override
+ public boolean transformsRawInput()
+ {
+ return false;
+ }
+
+ @Override
+ public void initializeOutputOI() throws HiveException {
+ try {
+ NPath evaluator = (NPath) getEvaluator();
+ PartitionedTableFunctionDef tDef = evaluator.getTableDef();
+
+ ArrayList<PTFExpressionDef> args = tDef.getArgs();
+ int argsNum = args.size();
+
+ validateAndSetupPatternStr(evaluator, args);
+ validateAndSetupSymbolInfo(evaluator, args, argsNum);
+ validateAndSetupResultExprStr(evaluator, args, argsNum);
+ setupSymbolFunctionChain(evaluator);
+
+ /*
+ * setup OI for input to resultExpr select list
+ */
+ StructObjectInspector selectListInputOI = NPath.createSelectListOI( evaluator,
+ tDef.getInput());
+ ResultExprInfo resultExprInfo = evaluator.resultExprInfo;
+ ArrayList<ObjectInspector> selectListExprOIs = new ArrayList<ObjectInspector>();
+ resultExprInfo.resultExprEvals = new ArrayList<ExprNodeEvaluator>();
+
+ for(int i=0 ; i < resultExprInfo.resultExprNodes.size(); i++) {
+ ExprNodeDesc selectColumnExprNode =resultExprInfo.resultExprNodes.get(i);
+ ExprNodeEvaluator selectColumnExprEval =
+ ExprNodeEvaluatorFactory.get(selectColumnExprNode);
+ ObjectInspector selectColumnOI = selectColumnExprEval.initialize(selectListInputOI);
+ resultExprInfo.resultExprEvals.add(selectColumnExprEval);
+ selectListExprOIs.add(selectColumnOI);
+ }
+
+ resultExprInfo.resultOI = ObjectInspectorFactory.getStandardStructObjectInspector(
+ resultExprInfo.resultExprNames, selectListExprOIs);
+ setOutputOI(resultExprInfo.resultOI);
+ }
+ catch(SemanticException se) {
+ throw new HiveException(se);
+ }
+ }
+
+ @Override
+ public ArrayList<String> getOutputColumnNames() {
+ NPath evaluator = (NPath) getEvaluator();
+ return evaluator.resultExprInfo.getResultExprNames();
+ }
+
+
+
+ private static void extractOIColumnNames(StructObjectInspector OI,
+ ArrayList<String> oiColumnNames) {
+ StructTypeInfo t = (StructTypeInfo) TypeInfoUtils.getTypeInfoFromObjectInspector(OI);
+ ArrayList<String> fnames = t.getAllStructFieldNames();
+ oiColumnNames.addAll(fnames);
+ }
+
+ }
+
+ public ResultExprInfo getResultExprInfo() {
+ return resultExprInfo;
+ }
+
+ public void setResultExprInfo(ResultExprInfo resultExprInfo) {
+ this.resultExprInfo = resultExprInfo;
+ }
+
+ static class SymbolsInfo {
+ int sz;
+ ArrayList<ExprNodeEvaluator> symbolExprsEvaluators;
+ ArrayList<ObjectInspector> symbolExprsOIs;
+ ArrayList<String> symbolExprsNames;
+
+ SymbolsInfo(int sz)
+ {
+ this.sz = sz;
+ symbolExprsEvaluators = new ArrayList<ExprNodeEvaluator>(sz);
+ symbolExprsOIs = new ArrayList<ObjectInspector>(sz);
+ symbolExprsNames = new ArrayList<String>(sz);
+ }
+
+ void add(String name, PTFExpressionDef arg)
+ {
+ symbolExprsNames.add(name);
+ symbolExprsEvaluators.add(arg.getExprEvaluator());
+ symbolExprsOIs.add(arg.getOI());
+ }
+ }
+
+ public static class ResultExprInfo {
+ ArrayList<String> resultExprNames;
+ ArrayList<ExprNodeDesc> resultExprNodes;
+ private transient ArrayList<ExprNodeEvaluator> resultExprEvals;
+ private transient StructObjectInspector resultOI;
+
+ public ArrayList<String> getResultExprNames() {
+ return resultExprNames;
+ }
+ public void setResultExprNames(ArrayList<String> resultExprNames) {
+ this.resultExprNames = resultExprNames;
+ }
+ public ArrayList<ExprNodeDesc> getResultExprNodes() {
+ return resultExprNodes;
+ }
+ public void setResultExprNodes(ArrayList<ExprNodeDesc> resultExprNodes) {
+ this.resultExprNodes = resultExprNodes;
+ }
+ }
+
+ public static abstract class SymbolFunction
+ {
+ SymbolFunctionResult result;
+
+ public SymbolFunction()
+ {
+ result = new SymbolFunctionResult();
+ }
+
+ public static SymbolFunctionResult match(SymbolFunction syFn, Object row,
+ PTFPartitionIterator<Object> pItr) throws HiveException
+ {
+ int resetToIdx = pItr.getIndex() - 1;
+ try
+ {
+ return syFn.match(row, pItr);
+ } finally
+ {
+ pItr.resetToIndex(resetToIdx);
+ }
+ }
+
+ protected abstract SymbolFunctionResult match(Object row, PTFPartitionIterator<Object> pItr)
+ throws HiveException;
+
+ protected abstract boolean isOptional();
+ }
+
+ public static class Symbol extends SymbolFunction {
+ ExprNodeEvaluator symbolExprEval;
+ Converter converter;
+
+ public Symbol(ExprNodeEvaluator symbolExprEval, ObjectInspector symbolOI)
+ {
+ this.symbolExprEval = symbolExprEval;
+ converter = ObjectInspectorConverters.getConverter(
+ symbolOI,
+ PrimitiveObjectInspectorFactory.javaBooleanObjectInspector);
+ }
+
+ @Override
+ protected SymbolFunctionResult match(Object row, PTFPartitionIterator<Object> pItr)
+ throws HiveException
+ {
+ Object val = null;
+ val = symbolExprEval.evaluate(row);
+ val = converter.convert(val);
+ result.matches = ((Boolean) val).booleanValue();
+ result.nextRow = pItr.getIndex();
+
+ return result;
+ }
+
+ @Override
+ protected boolean isOptional()
+ {
+ return false;
+ }
+ }
+
+ public static class Star extends SymbolFunction {
+ SymbolFunction symbolFn;
+
+ public Star(SymbolFunction symbolFn)
+ {
+ this.symbolFn = symbolFn;
+ }
+
+ @Override
+ protected SymbolFunctionResult match(Object row, PTFPartitionIterator<Object> pItr)
+ throws HiveException
+ {
+ result.matches = true;
+ SymbolFunctionResult rowResult = symbolFn.match(row, pItr);
+
+ while (rowResult.matches && pItr.hasNext())
+ {
+ row = pItr.next();
+ rowResult = symbolFn.match(row, pItr);
+ }
+
+ result.nextRow = pItr.getIndex() - 1;
+ return result;
+ }
+
+ @Override
+ protected boolean isOptional()
+ {
+ return true;
+ }
+ }
+
+ public static class Plus extends SymbolFunction {
+ SymbolFunction symbolFn;
+
+ public Plus(SymbolFunction symbolFn)
+ {
+ this.symbolFn = symbolFn;
+ }
+
+ @Override
+ protected SymbolFunctionResult match(Object row, PTFPartitionIterator<Object> pItr)
+ throws HiveException
+ {
+ SymbolFunctionResult rowResult = symbolFn.match(row, pItr);
+
+ if (!rowResult.matches)
+ {
+ result.matches = false;
+ result.nextRow = pItr.getIndex() - 1;
+ return result;
+ }
+
+ result.matches = true;
+ while (rowResult.matches && pItr.hasNext())
+ {
+ row = pItr.next();
+ rowResult = symbolFn.match(row, pItr);
+ }
+
+ result.nextRow = pItr.getIndex() - 1;
+ return result;
+ }
+
+ @Override
+ protected boolean isOptional()
+ {
+ return false;
+ }
+ }
+
+ public static class Chain extends SymbolFunction
+ {
+ ArrayList<SymbolFunction> components;
+
+ public Chain(ArrayList<SymbolFunction> components)
+ {
+ this.components = components;
+ }
+
+ /*
+ * Iterate over the Symbol Functions in the Chain:
+ * - If we are not at the end of the Iterator (i.e. row != null )
+ * - match the current componentFn
+ * - if it returns false, then return false
+ * - otherwise set row to the next row from the Iterator.
+ * - if we are at the end of the Iterator
+ * - skip any optional Symbol Fns (star patterns) at the end.
+ * - but if we come to a non optional Symbol Fn, return false.
+ * - if we match all Fns in the chain return true.
+ */
+ @Override
+ protected SymbolFunctionResult match(Object row, PTFPartitionIterator<Object> pItr)
+ throws HiveException
+ {
+ SymbolFunctionResult componentResult = null;
+ for (SymbolFunction sFn : components)
+ {
+ if (row != null)
+ {
+ componentResult = sFn.match(row, pItr);
+ if (!componentResult.matches)
+ {
+ result.matches = false;
+ result.nextRow = componentResult.nextRow;
+ return result;
+ }
+ row = pItr.resetToIndex(componentResult.nextRow);
+ }
+ else
+ {
+ if (!sFn.isOptional())
+ {
+ result.matches = false;
+ result.nextRow = componentResult.nextRow;
+ return result;
+ }
+ }
+ }
+
+ result.matches = true;
+ result.nextRow = componentResult.nextRow;
+ return result;
+ }
+
+ @Override
+ protected boolean isOptional()
+ {
+ return false;
+ }
+ }
+
+
+ public static class SymbolFunctionResult
+ {
+ /*
+ * does the row match the pattern represented by this SymbolFunction
+ */
+ public boolean matches;
+ /*
+ * what is the index of the row beyond the set of rows that match this pattern.
+ */
+ public int nextRow;
+ }
+
+ public static class SymbolParser
+ {
+ String patternStr;
+ String[] symbols;
+ HashMap<String, Object[]> symbolExprEvalMap;
+ ArrayList<SymbolFunction> symbolFunctions;
+ Chain symbolFnChain;
+
+
+ public SymbolParser(String patternStr, ArrayList<String> symbolNames,
+ ArrayList<ExprNodeEvaluator> symbolExprEvals, ArrayList<ObjectInspector> symbolExprOIs)
+ {
+ super();
+ this.patternStr = patternStr;
+ symbolExprEvalMap = new HashMap<String, Object[]>();
+ int sz = symbolNames.size();
+ for(int i=0; i < sz; i++)
+ {
+ String symbolName = symbolNames.get(i);
+ ExprNodeEvaluator symbolExprEval = symbolExprEvals.get(i);
+ ObjectInspector symbolExprOI = symbolExprOIs.get(i);
+ symbolExprEvalMap.put(symbolName.toLowerCase(),
+ new Object[] {symbolExprEval, symbolExprOI});
+ }
+ }
+
+ public SymbolFunction getSymbolFunction()
+ {
+ return symbolFnChain;
+ }
+
+ public void parse() throws SemanticException
+ {
+ symbols = patternStr.split("\\.");
+ symbolFunctions = new ArrayList<SymbolFunction>();
+
+ for(String symbol : symbols)
+ {
+ boolean isStar = symbol.endsWith("*");
+ boolean isPlus = symbol.endsWith("+");
+
+ symbol = (isStar || isPlus) ? symbol.substring(0, symbol.length() - 1) : symbol;
+ Object[] symbolDetails = symbolExprEvalMap.get(symbol.toLowerCase());
+ if ( symbolDetails == null )
+ {
+ throw new SemanticException(String.format("Unknown Symbol %s", symbol));
+ }
+
+ ExprNodeEvaluator symbolExprEval = (ExprNodeEvaluator) symbolDetails[0];
+ ObjectInspector symbolExprOI = (ObjectInspector) symbolDetails[1];
+ SymbolFunction sFn = new Symbol(symbolExprEval, symbolExprOI);
+
+ if ( isStar )
+ {
+ sFn = new Star(sFn);
+ }
+ else if ( isPlus )
+ {
+ sFn = new Plus(sFn);
+ }
+ symbolFunctions.add(sFn);
+ }
+ symbolFnChain = new Chain(symbolFunctions);
+ }
+ }
+
+ /*
+ * ResultExpression is a Select List with the following variation:
+ * - the select keyword is optional. The parser checks if the expression doesn't start with
+ * select; if not it prefixes it.
+ * - Window Fn clauses are not permitted.
+ * - expressions can operate on the input columns plus the psuedo column 'path'
+ * which is array of
+ * structs. The shape of the struct is
+ * the same as the input.
+ */
+ public static class ResultExpressionParser {
+ String resultExprString;
+
+ RowResolver selectListInputRowResolver;
+ TypeCheckCtx selectListInputTypeCheckCtx;
+ StructObjectInspector selectListInputOI;
+
+ ArrayList<WindowExpressionSpec> selectSpec;
+
+ ResultExprInfo resultExprInfo;
+
+ public ResultExpressionParser(String resultExprString,
+ RowResolver selectListInputRowResolver)
+ {
+ this.resultExprString = resultExprString;
+ this.selectListInputRowResolver = selectListInputRowResolver;
+ }
+
+ public void translate() throws SemanticException, HiveException
+ {
+ setupSelectListInputInfo();
+ fixResultExprString();
+ parse();
+ validateSelectExpr();
+ buildSelectListEvaluators();
+ }
+
+ public ResultExprInfo getResultExprInfo() {
+ return resultExprInfo;
+ }
+
+ private void buildSelectListEvaluators() throws SemanticException, HiveException
+ {
+ resultExprInfo = new ResultExprInfo();
+ resultExprInfo.resultExprEvals = new ArrayList<ExprNodeEvaluator>();
+ resultExprInfo.resultExprNames = new ArrayList<String>();
+ resultExprInfo.resultExprNodes = new ArrayList<ExprNodeDesc>();
+ //result
+ ArrayList<ObjectInspector> selectListExprOIs = new ArrayList<ObjectInspector>();
+ int i = 0;
+ for(WindowExpressionSpec expr : selectSpec)
+ {
+ String selectColName = expr.getAlias();
+ ASTNode selectColumnNode = expr.getExpression();
+ ExprNodeDesc selectColumnExprNode =
+ ResultExpressionParser.buildExprNode(selectColumnNode,
+ selectListInputTypeCheckCtx);
+ ExprNodeEvaluator selectColumnExprEval =
+ ExprNodeEvaluatorFactory.get(selectColumnExprNode);
+ ObjectInspector selectColumnOI = null;
+ selectColumnOI = selectColumnExprEval.initialize(selectListInputOI);
+
+ selectColName = getColumnName(selectColName, selectColumnExprNode, i);
+
+ resultExprInfo.resultExprEvals.add(selectColumnExprEval);
+ selectListExprOIs.add(selectColumnOI);
+ resultExprInfo.resultExprNodes.add(selectColumnExprNode);
+ resultExprInfo.resultExprNames.add(selectColName);
+ i++;
+ }
+
+ resultExprInfo.resultOI = ObjectInspectorFactory.getStandardStructObjectInspector(
+ resultExprInfo.resultExprNames, selectListExprOIs);
+ }
+
+ private void setupSelectListInputInfo() throws SemanticException
+ {
+ selectListInputTypeCheckCtx = new TypeCheckCtx(selectListInputRowResolver);
+ selectListInputTypeCheckCtx.setUnparseTranslator(null);
+ /*
+ * create SelectListOI
+ */
+ selectListInputOI = (StructObjectInspector)
+ PTFTranslator.getStandardStructOI(selectListInputRowResolver);
+ }
+
+ private void fixResultExprString()
+ {
+ String r = resultExprString.trim();
+ String prefix = r.substring(0, 6);
+ if (!prefix.toLowerCase().equals("select"))
+ {
+ r = "select " + r;
+ }
+ resultExprString = r;
+ }
+
+ private void parse() throws SemanticException
+ {
+ selectSpec = SemanticAnalyzer.parseSelect(resultExprString);
+ }
+
+ private void validateSelectExpr() throws SemanticException
+ {
+ for (WindowExpressionSpec expr : selectSpec)
+ {
+ PTFTranslator.validateNoLeadLagInValueBoundarySpec(expr.getExpression());
+ }
+ }
+
+ private String getColumnName(String alias, ExprNodeDesc exprNode, int colIdx)
+ {
+ if (alias != null)
+ {
+ return alias;
+ }
+ else if (exprNode instanceof ExprNodeColumnDesc)
+ {
+ ExprNodeColumnDesc colDesc = (ExprNodeColumnDesc) exprNode;
+ return colDesc.getColumn();
+ }
+ return "npath_col_" + colIdx;
+ }
+
+ public static ExprNodeDesc buildExprNode(ASTNode expr,
+ TypeCheckCtx typeCheckCtx) throws SemanticException
+ {
+ // todo: use SemanticAnalyzer::genExprNodeDesc
+ // currently SA not available to PTFTranslator.
+ Map<ASTNode, ExprNodeDesc> map = TypeCheckProcFactory
+ .genExprNode(expr, typeCheckCtx);
+ ExprNodeDesc desc = map.get(expr);
+ if (desc == null) {
+ String errMsg = typeCheckCtx.getError();
+ if ( errMsg == null) {
+ errMsg = "Error in parsing ";
+ }
+ throw new SemanticException(errMsg);
+ }
+ return desc;
+ }
+ }
+
+ public static final String PATHATTR_NAME = "tpath";
+
+ /*
+ * add array<struct> to the list of columns
+ */
+ protected static RowResolver createSelectListRR(NPath evaluator,
+ PTFInputDef inpDef) throws SemanticException {
+ RowResolver rr = new RowResolver();
+ RowResolver inputRR = inpDef.getOutputShape().getRr();
+ boolean inputColNamesKnown = evaluator.inputColumnNames != null;
+
+ if ( !inputColNamesKnown ) {
+ evaluator.inputColumnNames = new ArrayList<String>();
+ }
+
+ ArrayList<ObjectInspector> inpColOIs = new ArrayList<ObjectInspector>();
+
+ for (ColumnInfo inpCInfo : inputRR.getColumnInfos()) {
+ ColumnInfo cInfo = new ColumnInfo(inpCInfo);
+ String colAlias = cInfo.getAlias();
+
+ String[] tabColAlias = inputRR.reverseLookup(inpCInfo.getInternalName());
+ if (tabColAlias != null) {
+ colAlias = tabColAlias[1];
+ }
+ ASTNode inExpr = null;
+ inExpr = PTFTranslator.getASTNode(inpCInfo, inputRR);
+ if ( inExpr != null ) {
+ rr.putExpression(inExpr, cInfo);
+ }
+ else {
+ colAlias = colAlias == null ? cInfo.getInternalName() : colAlias;
+ rr.put(cInfo.getTabAlias(), colAlias, cInfo);
+ }
+
+ if ( !inputColNamesKnown ) {
+ evaluator.inputColumnNames.add(colAlias);
+ }
+ inpColOIs.add(cInfo.getObjectInspector());
+ }
+
+ StandardListObjectInspector pathAttrOI =
+ ObjectInspectorFactory.getStandardListObjectInspector(
+ ObjectInspectorFactory.getStandardStructObjectInspector(evaluator.inputColumnNames,
+ inpColOIs));
+
+ ColumnInfo pathColumn = new ColumnInfo(PATHATTR_NAME,
+ TypeInfoUtils.getTypeInfoFromObjectInspector(pathAttrOI),
+ null,
+ false, false);
+ rr.put(null, PATHATTR_NAME, pathColumn);
+
+ return rr;
+ }
+
+ protected static StructObjectInspector createSelectListOI(NPath evaluator, PTFInputDef inpDef) {
+ StructObjectInspector inOI = inpDef.getOutputShape().getOI();
+ ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
+ for(StructField f : inOI.getAllStructFieldRefs()) {
+ fieldOIs.add(f.getFieldObjectInspector());
+ }
+
+ StandardListObjectInspector pathAttrOI =
+ ObjectInspectorFactory.getStandardListObjectInspector(
+ ObjectInspectorFactory.getStandardStructObjectInspector(evaluator.inputColumnNames,
+ fieldOIs));
+
+ ArrayList<ObjectInspector> selectFieldOIs = new ArrayList<ObjectInspector>();
+ selectFieldOIs.addAll(fieldOIs);
+ selectFieldOIs.add(pathAttrOI);
+ return ObjectInspectorFactory.getStandardStructObjectInspector(
+ evaluator.selectListNames, selectFieldOIs);
+ }
+
+ public static Object getSelectListInput(Object currRow, ObjectInspector rowOI,
+ PTFPartitionIterator<Object> pItr, int sz) {
+ ArrayList<Object> oRow = new ArrayList<Object>();
+ List<?> currRowAsStdObject = (List<?>) ObjectInspectorUtils
+ .copyToStandardObject(currRow, rowOI);
+ oRow.addAll(currRowAsStdObject);
+ oRow.add(getPath(currRow, rowOI, pItr, sz));
+ return oRow;
+ }
+
+ public static ArrayList<Object> getPath(Object currRow, ObjectInspector rowOI,
+ PTFPartitionIterator<Object> pItr, int sz) {
+ int idx = pItr.getIndex() - 1;
+ ArrayList<Object> path = new ArrayList<Object>();
+ path.add(ObjectInspectorUtils.copyToStandardObject(currRow, rowOI));
+ int pSz = 1;
+
+ while (pSz < sz && pItr.hasNext())
+ {
+ currRow = pItr.next();
+ path.add(ObjectInspectorUtils.copyToStandardObject(currRow, rowOI));
+ pSz++;
+ }
+ pItr.resetToIndex(idx);
+ return path;
+ }
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/Noop.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/Noop.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/Noop.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/Noop.java Tue Apr 2 14:16:34 2013
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.ptf;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.hive.ql.exec.PTFPartition;
+import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.PTFDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+public class Noop extends TableFunctionEvaluator
+{
+
+ @Override
+ public PTFPartition execute(PTFPartition iPart) throws HiveException
+ {
+ return iPart;
+ }
+
+ @Override
+ protected void execute(PTFPartitionIterator<Object> pItr, PTFPartition oPart)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public static class NoopResolver extends TableFunctionResolver
+ {
+
+ @Override
+ protected TableFunctionEvaluator createEvaluator(PTFDesc ptfDesc, PartitionedTableFunctionDef tDef)
+ {
+ return new Noop();
+ }
+
+ @Override
+ public void setupOutputOI() throws SemanticException
+ {
+ StructObjectInspector OI = getEvaluator().getTableDef().getInput().getOutputShape().getOI();
+ setOutputOI(OI);
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hadoop.hive.ql.udf.ptf.TableFunctionResolver#carryForwardNames()
+ * Setting to true is correct only for special internal Functions.
+ */
+ @Override
+ public boolean carryForwardNames() {
+ return true;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hadoop.hive.ql.udf.ptf.TableFunctionResolver#getOutputNames()
+ * Set to null only because carryForwardNames is true.
+ */
+ @Override
+ public ArrayList<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
+ public boolean transformsRawInput()
+ {
+ return false;
+ }
+
+ @Override
+ public void initializeOutputOI() throws HiveException {
+ setupOutputOI();
+
+ }
+
+ }
+
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopWithMap.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopWithMap.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopWithMap.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopWithMap.java Tue Apr 2 14:16:34 2013
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.ptf;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.hive.ql.exec.PTFPartition;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.PTFDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+public class NoopWithMap extends Noop
+{
+ @Override
+ public PTFPartition execute(PTFPartition iPart) throws HiveException
+ {
+ return iPart;
+ }
+
+ @Override
+ protected PTFPartition _transformRawInput(PTFPartition iPart) throws HiveException
+ {
+ return iPart;
+ }
+
+ public static class NoopWithMapResolver extends TableFunctionResolver
+ {
+
+ @Override
+ protected TableFunctionEvaluator createEvaluator(PTFDesc ptfDesc, PartitionedTableFunctionDef tDef)
+ {
+ return new NoopWithMap();
+ }
+
+ @Override
+ public void setupOutputOI() throws SemanticException
+ {
+ StructObjectInspector OI = getEvaluator().getTableDef().getInput().getOutputShape().getOI();
+ setOutputOI(OI);
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hadoop.hive.ql.udf.ptf.TableFunctionResolver#carryForwardNames()
+ * Setting to true is correct only for special internal Functions.
+ */
+ @Override
+ public boolean carryForwardNames() {
+ return true;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hadoop.hive.ql.udf.ptf.TableFunctionResolver#getOutputNames()
+ * Set to null only because carryForwardNames is true.
+ */
+ @Override
+ public ArrayList<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
+ public void setupRawInputOI() throws SemanticException
+ {
+ StructObjectInspector OI = getEvaluator().getTableDef().getInput().getOutputShape().getOI();
+ setRawInputOI(OI);
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hadoop.hive.ql.udf.ptf.TableFunctionResolver#getOutputNames()
+ * Set to null only because carryForwardNames is true.
+ */
+ @Override
+ public ArrayList<String> getRawInputColumnNames() throws SemanticException {
+ return null;
+ }
+
+ @Override
+ public boolean transformsRawInput()
+ {
+ return true;
+ }
+
+ @Override
+ public void initializeOutputOI() throws HiveException {
+ setupOutputOI();
+ }
+
+ @Override
+ public void initializeRawInputOI() throws HiveException {
+ setupRawInputOI();
+ }
+
+ }
+
+
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java Tue Apr 2 14:16:34 2013
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.ptf;
+
+import org.apache.hadoop.hive.ql.exec.PTFOperator;
+import org.apache.hadoop.hive.ql.exec.PTFPartition;
+import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
+import org.apache.hadoop.hive.ql.exec.PTFUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.PTFDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+/**
+ * Based on Hive {@link GenericUDAFEvaluator}. Break up the responsibility of the old AsbtractTableFunction
+ * class into a Resolver and Evaluator.
+ * <p>
+ * The Evaluator also holds onto the {@link TableFunctionDef}. This provides information
+ * about the arguments to the function, the shape of the Input partition and the Partitioning details.
+ * The Evaluator is responsible for providing the 2 execute methods:
+ * <ol>
+ * <li><b>execute:</b> which is invoked after the input is partitioned; the contract
+ * is, it is given an input Partition and must return an output Partition. The shape of the output
+ * Partition is obtained from the getOutputOI call.
+ * <li><b>transformRawInput:</b> In the case where this function indicates that it will transform the raw input
+ * before it is fed through the partitioning mechanics, this function is called. Again the contract is
+ * t is given an input Partition and must return an Partition. The shape of the output Partition is
+ * obtained from getRawInputOI() call.
+ * </ol>
+ *
+ */
+public abstract class TableFunctionEvaluator
+{
+ /*
+ * how is this different from the OutpuShape set on the TableDef.
+ * This is the OI of the object coming out of the PTF.
+ * It is put in an output Partition whose Serde is usually LazyBinarySerde.
+ * So the next PTF (or Operator) in the chain gets a LazyBinaryStruct.
+ */
+ transient protected StructObjectInspector OI;
+ /*
+ * same comment as OI applies here.
+ */
+ transient protected StructObjectInspector rawInputOI;
+ protected PartitionedTableFunctionDef tDef;
+ protected PTFDesc ptfDesc;
+ String partitionClass;
+ int partitionMemSize;
+ boolean transformsRawInput;
+ transient protected PTFPartition outputPartition;
+
+ static{
+ PTFUtils.makeTransient(TableFunctionEvaluator.class, "OI");
+ PTFUtils.makeTransient(TableFunctionEvaluator.class, "rawInputOI");
+ PTFUtils.makeTransient(TableFunctionEvaluator.class, "outputPartition");
+ }
+
+
+ public StructObjectInspector getOutputOI()
+ {
+ return OI;
+ }
+
+ protected void setOutputOI(StructObjectInspector outputOI)
+ {
+ OI = outputOI;
+ }
+
+ public PartitionedTableFunctionDef getTableDef()
+ {
+ return tDef;
+ }
+
+ public void setTableDef(PartitionedTableFunctionDef tDef)
+ {
+ this.tDef = tDef;
+ }
+
+ protected PTFDesc getQueryDef()
+ {
+ return ptfDesc;
+ }
+
+ protected void setQueryDef(PTFDesc ptfDesc)
+ {
+ this.ptfDesc = ptfDesc;
+ }
+
+ public String getPartitionClass()
+ {
+ return partitionClass;
+ }
+
+ public void setPartitionClass(String partitionClass)
+ {
+ this.partitionClass = partitionClass;
+ }
+
+ public int getPartitionMemSize()
+ {
+ return partitionMemSize;
+ }
+
+ public void setPartitionMemSize(int partitionMemSize)
+ {
+ this.partitionMemSize = partitionMemSize;
+ }
+
+ public StructObjectInspector getRawInputOI()
+ {
+ return rawInputOI;
+ }
+
+ protected void setRawInputOI(StructObjectInspector rawInputOI)
+ {
+ this.rawInputOI = rawInputOI;
+ }
+
+ public boolean isTransformsRawInput() {
+ return transformsRawInput;
+ }
+
+ public void setTransformsRawInput(boolean transformsRawInput) {
+ this.transformsRawInput = transformsRawInput;
+ }
+
+ public PTFPartition execute(PTFPartition iPart)
+ throws HiveException
+ {
+ PTFPartitionIterator<Object> pItr = iPart.iterator();
+ PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, pItr);
+
+ if ( outputPartition == null ) {
+ outputPartition = new PTFPartition(getPartitionClass(),
+ getPartitionMemSize(), tDef.getOutputShape().getSerde(), OI);
+ }
+ else {
+ outputPartition.reset();
+ }
+
+ execute(pItr, outputPartition);
+ return outputPartition;
+ }
+
+ protected abstract void execute(PTFPartitionIterator<Object> pItr, PTFPartition oPart) throws HiveException;
+
+ public PTFPartition transformRawInput(PTFPartition iPart) throws HiveException
+ {
+ if ( !isTransformsRawInput())
+ {
+ throw new HiveException(String.format("Internal Error: mapExecute called on function (%s)that has no Map Phase", tDef.getName()));
+ }
+ return _transformRawInput(iPart);
+ }
+
+ protected PTFPartition _transformRawInput(PTFPartition iPart) throws HiveException
+ {
+ return null;
+ }
+}
+
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionResolver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionResolver.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionResolver.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionResolver.java Tue Apr 2 14:16:34 2013
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.ptf;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+/**
+ * Based on Hive {@link GenericUDAFResolver}. Break up the responsibility of the
+ * old AbstractTableFunction class into a Resolver and Evaluator.
+ * The Resolver is responsible for:
+ * <ol>
+ * <li> setting up the {@link tableFunctionEvaluator}
+ * <li> Setting up the The raw and output ObjectInspectors of the Evaluator.
+ * <li> The Evaluator also holds onto the {@link TableFunctionDef}. This provides information
+ * about the arguments to the function, the shape of the Input partition and the Partitioning details.
+ * </ol>
+ * The Resolver for a function is obtained from the {@link FunctionRegistry}. The Resolver is initialized
+ * by the following 4 step process:
+ * <ol>
+ * <li> The initialize method is called; which is passed the {@link PTFDesc} and the {@link TableFunctionDef}.
+ * <li> The resolver is then asked to setup the Raw ObjectInspector. This is only required if the Function reshapes
+ * the raw input.
+ * <li> Once the Resolver has had a chance to compute the shape of the Raw Input that is fed to the partitioning
+ * machinery; the translator sets up the partitioning details on the tableFuncDef.
+ * <li> finally the resolver is asked to setup the output ObjectInspector.
+ * </ol>
+ */
+@SuppressWarnings("deprecation")
+public abstract class TableFunctionResolver
+{
+ TableFunctionEvaluator evaluator;
+ PTFDesc ptfDesc;
+
+ /*
+ * - called during translation.
+ * - invokes createEvaluator which must be implemented by a subclass
+ * - sets up the evaluator with references to the TableDef, PartitionClass, PartitonMemsize and
+ * the transformsRawInput boolean.
+ */
+ public void initialize(HiveConf cfg, PTFDesc ptfDesc, PartitionedTableFunctionDef tDef)
+ throws SemanticException
+ {
+ this.ptfDesc = ptfDesc;
+ String partitionClass = HiveConf.getVar(cfg, ConfVars.HIVE_PTF_PARTITION_PERSISTENCE_CLASS);
+ int partitionMemSize = HiveConf.getIntVar(cfg, ConfVars.HIVE_PTF_PARTITION_PERSISTENT_SIZE);
+
+ evaluator = createEvaluator(ptfDesc, tDef);
+ evaluator.setTransformsRawInput(transformsRawInput());
+ evaluator.setTableDef(tDef);
+ evaluator.setQueryDef(ptfDesc);
+ evaluator.setPartitionClass(partitionClass);
+ evaluator.setPartitionMemSize(partitionMemSize);
+
+ }
+
+ /*
+ * called during deserialization of a QueryDef during runtime.
+ */
+ public void initialize(PTFDesc ptfDesc, PartitionedTableFunctionDef tDef, TableFunctionEvaluator evaluator)
+ throws HiveException
+ {
+ this.evaluator = evaluator;
+ this.ptfDesc = ptfDesc;
+ evaluator.setTableDef(tDef);
+ evaluator.setQueryDef(ptfDesc);
+ }
+
+ public TableFunctionEvaluator getEvaluator()
+ {
+ return evaluator;
+ }
+
+ /*
+ * - a subclass must provide this method.
+ * - this method is invoked during translation and also when the Operator is initialized during runtime.
+ * - a subclass must use this call to setup the shape of its output.
+ * - subsequent to this call, a call to getOutputOI call on the {@link TableFunctionEvaluator} must return the OI
+ * of the output of this function.
+ */
+ public abstract void setupOutputOI() throws SemanticException;
+
+ /*
+ * A PTF Function must provide the 'external' names of the columns in its Output.
+ *
+ */
+ public abstract ArrayList<String> getOutputColumnNames() throws SemanticException;
+
+
+ /**
+ * This method is invoked during runtime(during deserialization of theQueryDef).
+ * At this point the TableFunction can assume that the {@link ExprNodeDesc Expression Nodes}
+ * exist for all the Def (ArgDef, ColumnDef, WindowDef..). It is the responsibility of
+ * the TableFunction to construct the {@link ExprNodeEvaluator evaluators} and setup the OI.
+ *
+ * @param tblFuncDef
+ * @param ptfDesc
+ * @throws HiveException
+ */
+ public abstract void initializeOutputOI() throws HiveException;
+
+ /*
+ * - Called on functions that transform the raw input.
+ * - this method is invoked during translation and also when the Operator is initialized during runtime.
+ * - a subclass must use this call to setup the shape of the raw input, that is fed to the partitioning mechanics.
+ * - subsequent to this call, a call to getRawInputOI call on the {@link TableFunctionEvaluator} must return the OI
+ * of the output of this function.
+ */
+ public void setupRawInputOI() throws SemanticException
+ {
+ if (!transformsRawInput())
+ {
+ return;
+ }
+ throw new SemanticException(
+ "Function has map phase, must extend setupMapOI");
+ }
+
+ /*
+ * A PTF Function must provide the 'external' names of the columns in the transformed Raw Input.
+ *
+ */
+ public ArrayList<String> getRawInputColumnNames() throws SemanticException {
+ if (!transformsRawInput())
+ {
+ return null;
+ }
+ throw new SemanticException(
+ "Function transforms Raw Input; must extend getRawColumnInputNames");
+ }
+
+ /*
+ * Same responsibility as initializeOI, but for the RawInput.
+ */
+ public void initializeRawInputOI() throws HiveException
+ {
+ if (!transformsRawInput())
+ {
+ return;
+ }
+ throw new HiveException(
+ "Function has map phase, must extend initializeRawInputOI");
+ }
+
+ /*
+ * callback method used by subclasses to set the RawInputOI on the Evaluator.
+ */
+ protected void setRawInputOI(StructObjectInspector rawInputOI)
+ {
+ evaluator.setRawInputOI(rawInputOI);
+ }
+
+ /*
+ * callback method used by subclasses to set the OutputOI on the Evaluator.
+ */
+ protected void setOutputOI(StructObjectInspector outputOI)
+ {
+ evaluator.setOutputOI(outputOI);
+ }
+
+ public PTFDesc getPtfDesc()
+ {
+ return ptfDesc;
+ }
+
+ /*
+ * This is used during translation to decide if the internalName -> alias mapping from the Input to the PTF is carried
+ * forward when building the Output RR for this PTF.
+ * This is used by internal PTFs: NOOP, WindowingTableFunction to make names in its input available in the Output.
+ * In general this should be false; and the names used for the Output Columns must be provided by the PTF Writer in the
+ * function getOutputNames.
+ */
+ public boolean carryForwardNames() {
+ return false;
+ }
+
+ /*
+ * a subclass must indicate whether it will transform the raw input before it is fed through the
+ * partitioning mechanics.
+ */
+ public abstract boolean transformsRawInput();
+
+ /*
+ * a subclass must provide the {@link TableFunctionEvaluator} instance.
+ */
+ protected abstract TableFunctionEvaluator createEvaluator(PTFDesc ptfDesc, PartitionedTableFunctionDef tDef);
+}