Posted to commits@phoenix.apache.org by ra...@apache.org on 2015/04/27 10:33:50 UTC

[6/7] phoenix git commit: PHOENIX-538 Support UDFs (Rajeshbabu Chintaguntla)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index af6c712..78f54e8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -63,6 +63,7 @@ import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.parse.TableNode;
 import org.apache.phoenix.parse.TableNodeVisitor;
 import org.apache.phoenix.parse.TableWildcardParseNode;
+import org.apache.phoenix.parse.UDFParseNode;
 import org.apache.phoenix.parse.WildcardParseNode;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.ColumnRef;
@@ -688,7 +689,7 @@ public class JoinCompiler {
             if (isSubselect())
                 return SubselectRewriter.applyOrderBy(SubselectRewriter.applyPostFilters(subselect, preFilters, tableNode.getAlias()), orderBy, tableNode.getAlias());
 
-            return NODE_FACTORY.select(tableNode, select.getHint(), false, selectNodes, getPreFiltersCombined(), null, null, orderBy, null, 0, false, select.hasSequence(), Collections.<SelectStatement>emptyList());
+            return NODE_FACTORY.select(tableNode, select.getHint(), false, selectNodes, getPreFiltersCombined(), null, null, orderBy, null, 0, false, select.hasSequence(), Collections.<SelectStatement>emptyList(), select.getUdfParseNodes());
         }
 
         public boolean hasFilters() {
@@ -1177,7 +1178,7 @@ public class JoinCompiler {
             TableRef tableRef = table.getTableRef();
             List<ParseNode> groupBy = tableRef.equals(groupByTableRef) ? select.getGroupBy() : null;
             List<OrderByNode> orderBy = tableRef.equals(orderByTableRef) ? select.getOrderBy() : null;
-            SelectStatement stmt = getSubqueryForOptimizedPlan(select.getHint(), table.getDynamicColumns(), tableRef, join.getColumnRefs(), table.getPreFiltersCombined(), groupBy, orderBy, table.isWildCardSelect(), select.hasSequence());
+            SelectStatement stmt = getSubqueryForOptimizedPlan(select.getHint(), table.getDynamicColumns(), tableRef, join.getColumnRefs(), table.getPreFiltersCombined(), groupBy, orderBy, table.isWildCardSelect(), select.hasSequence(), select.getUdfParseNodes());
             QueryPlan plan = statement.getConnection().getQueryServices().getOptimizer().optimize(statement, stmt);
             if (!plan.getTableRef().equals(tableRef)) {
                 replacement.put(tableRef, plan.getTableRef());
@@ -1247,7 +1248,7 @@ public class JoinCompiler {
     }
 
     private static SelectStatement getSubqueryForOptimizedPlan(HintNode hintNode, List<ColumnDef> dynamicCols, TableRef tableRef, Map<ColumnRef, ColumnRefType> columnRefs, ParseNode where, List<ParseNode> groupBy,
-            List<OrderByNode> orderBy, boolean isWildCardSelect, boolean hasSequence) {
+            List<OrderByNode> orderBy, boolean isWildCardSelect, boolean hasSequence, Map<String, UDFParseNode> udfParseNodes) {
         String schemaName = tableRef.getTable().getSchemaName().getString();
         TableName tName = TableName.create(schemaName.length() == 0 ? null : schemaName, tableRef.getTable().getTableName().getString());
         List<AliasedNode> selectList = new ArrayList<AliasedNode>();
@@ -1267,7 +1268,7 @@ public class JoinCompiler {
         String tableAlias = tableRef.getTableAlias();
         TableNode from = NODE_FACTORY.namedTable(tableAlias == null ? null : '"' + tableAlias + '"', tName, dynamicCols);
 
-        return NODE_FACTORY.select(from, hintNode, false, selectList, where, groupBy, null, orderBy, null, 0, groupBy != null, hasSequence, Collections.<SelectStatement>emptyList());
+        return NODE_FACTORY.select(from, hintNode, false, selectList, where, groupBy, null, orderBy, null, 0, groupBy != null, hasSequence, Collections.<SelectStatement>emptyList(), udfParseNodes);
     }
 
     public static PTable joinProjectedTables(PTable left, PTable right, JoinType type) throws SQLException {
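
The JoinCompiler hunks above all make the same change: each NODE_FACTORY.select(...) call gains a trailing Map<String, UDFParseNode> argument, so statements rebuilt during join compilation keep the UDF references of the original query. A minimal, self-contained sketch of why a statement rewriter has to carry that map forward (stand-in types, not the Phoenix API):

    import java.util.Collections;
    import java.util.Map;

    // MiniSelect stands in for SelectStatement; the String values stand in for
    // UDFParseNode. A rewrite that rebuilt the statement without the map would
    // lose the function metadata needed later at compile time.
    final class MiniSelect {
        final String from;
        final Map<String, String> udfParseNodes;

        MiniSelect(String from, Map<String, String> udfParseNodes) {
            this.from = from;
            this.udfParseNodes = Collections.unmodifiableMap(udfParseNodes);
        }

        // Rebuild with a new FROM clause, preserving the UDF map: the same
        // discipline the patched select(...) calls follow.
        MiniSelect withFrom(String newFrom) {
            return new MiniSelect(newFrom, this.udfParseNodes);
        }
    }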

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index 0c586f0..fcbeb7e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -34,6 +34,7 @@ import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.AmbiguousColumnException;
@@ -123,6 +124,11 @@ public class PostDDLCompiler {
                             public List<TableRef> getTables() {
                                 return Collections.singletonList(tableRef);
                             }
+                            
+                            public List<PFunction> getFunctions() {
+                                return Collections.emptyList();
+                            }
+                            
                             @Override
                             public TableRef resolveTable(String schemaName, String tableName)
                                     throws SQLException {
@@ -135,6 +141,14 @@ public class PostDDLCompiler {
                                         : tableRef.getTable().getColumn(colName);
                                 return new ColumnRef(tableRef, column.getPosition());
                             }
+                            
+                            public PFunction resolveFunction(String functionName) throws SQLException {
+                                throw new UnsupportedOperationException();
+                            }
+
+                            public boolean hasUDFs() {
+                                return false;
+                            }
                         };
                         PhoenixStatement statement = new PhoenixStatement(connection);
                         StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
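
The anonymous resolver above picks up the three UDF-related methods added to the ColumnResolver interface in this commit; post-DDL maintenance statements never reference UDFs, so all three are stubs. A self-contained sketch of that contract, with the interface reduced to the new methods and PFunction replaced by a marker class:

    import java.sql.SQLException;
    import java.util.Collections;
    import java.util.List;

    final class FunctionStub {} // stand-in for org.apache.phoenix.parse.PFunction

    interface UdfAwareResolver {
        List<FunctionStub> getFunctions();
        FunctionStub resolveFunction(String functionName) throws SQLException;
        boolean hasUDFs();
    }

    final class NoUdfResolver implements UdfAwareResolver {
        @Override
        public List<FunctionStub> getFunctions() {
            return Collections.emptyList(); // nothing to expose
        }

        @Override
        public FunctionStub resolveFunction(String functionName) {
            // this code path can never be asked to resolve a UDF
            throw new UnsupportedOperationException();
        }

        @Override
        public boolean hasUDFs() {
            return false;
        }
    }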

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
index e84ca2a..c39db09 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
@@ -458,7 +458,7 @@ public class ProjectionCompiler {
                 projectColumnFamily(table, scan, family);
             }
         }
-        return new RowProjector(projectedColumns, estimatedByteSize, isProjectEmptyKeyValue);
+        return new RowProjector(projectedColumns, estimatedByteSize, isProjectEmptyKeyValue, resolver.hasUDFs());
     }
 
     private static void projectAllColumnFamilies(PTable table, Scan scan) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 3100664..e877e03 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -21,6 +21,7 @@ import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Set;
 
@@ -62,6 +63,7 @@ import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.parse.SubqueryParseNode;
 import org.apache.phoenix.parse.TableNode;
+import org.apache.phoenix.parse.UDFParseNode;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.AmbiguousColumnException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
@@ -240,13 +242,13 @@ public class QueryCompiler {
                 context.setCurrentTable(table.getTableRef());
                 PTable projectedTable = table.createProjectedTable(!projectPKColumns, context);
                 TupleProjector.serializeProjectorIntoScan(context.getScan(), new TupleProjector(projectedTable));
-                context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable));
+                context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), subquery.getUdfParseNodes()));
                 table.projectColumns(context.getScan());
                 return compileSingleQuery(context, subquery, binds, asSubquery, !asSubquery);
             }
             QueryPlan plan = compileSubquery(subquery, false);
             PTable projectedTable = table.createProjectedTable(plan.getProjector());
-            context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable));
+            context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), subquery.getUdfParseNodes()));
             return new TupleProjectionPlan(plan, new TupleProjector(plan.getProjector()), table.compilePostFilterExpression(context));
         }
 
@@ -295,7 +297,7 @@ public class QueryCompiler {
                 } else {
                     tables[i] = null;
                 }
-                context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable));
+                context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), query.getUdfParseNodes()));
                 joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // place-holder
                 Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContext, true);
                 joinExpressions[i] = joinConditions.getFirst();
@@ -354,7 +356,7 @@ public class QueryCompiler {
                 tupleProjector = new TupleProjector(plan.getProjector());
             }
             context.setCurrentTable(rhsTableRef);
-            context.setResolver(FromCompiler.getResolverForProjectedTable(rhsProjTable));
+            context.setResolver(FromCompiler.getResolverForProjectedTable(rhsProjTable, context.getConnection(), rhs.getUdfParseNodes()));
             ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[] {new ImmutableBytesPtr(emptyByteArray)};
             Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(lhsCtx, context, true);
             List<Expression> joinExpressions = joinConditions.getSecond();
@@ -364,7 +366,7 @@ public class QueryCompiler {
             int fieldPosition = needsMerge ? rhsProjTable.getColumns().size() - rhsProjTable.getPKColumns().size() : 0;
             PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(rhsProjTable, lhsTable, type == JoinType.Right ? JoinType.Left : type) : rhsProjTable;
             TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
-            context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable));
+            context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), rhs.getUdfParseNodes()));
             QueryPlan rhsPlan = compileSingleQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right);
             Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable);
             Integer limit = null;
@@ -413,7 +415,7 @@ public class QueryCompiler {
         int fieldPosition = needsMerge ? lhsProjTable.getColumns().size() - lhsProjTable.getPKColumns().size() : 0;
         PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(lhsProjTable, rhsProjTable, type == JoinType.Right ? JoinType.Left : type) : lhsProjTable;
 
-        ColumnResolver resolver = FromCompiler.getResolverForProjectedTable(projectedTable);
+        ColumnResolver resolver = FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), new HashMap<String,UDFParseNode>(1));
         TableRef tableRef = resolver.getTables().get(0);
         StatementContext subCtx = new StatementContext(statement, resolver, ScanUtil.newScan(originalScan), new SequenceManager(statement));
         subCtx.setCurrentTable(tableRef);
@@ -422,7 +424,7 @@ public class QueryCompiler {
         context.setResolver(resolver);
         TableNode from = NODE_FACTORY.namedTable(tableRef.getTableAlias(), NODE_FACTORY.table(tableRef.getTable().getSchemaName().getString(), tableRef.getTable().getTableName().getString()));
         ParseNode where = joinTable.getPostFiltersCombined();
-        SelectStatement select = asSubquery ? NODE_FACTORY.select(from, joinTable.getStatement().getHint(), false, Collections.<AliasedNode> emptyList(), where, null, null, orderBy, null, 0, false, joinTable.getStatement().hasSequence(), Collections.<SelectStatement>emptyList())
+        SelectStatement select = asSubquery ? NODE_FACTORY.select(from, joinTable.getStatement().getHint(), false, Collections.<AliasedNode> emptyList(), where, null, null, orderBy, null, 0, false, joinTable.getStatement().hasSequence(), Collections.<SelectStatement>emptyList(), joinTable.getStatement().getUdfParseNodes())
                 : NODE_FACTORY.select(joinTable.getStatement(), from, where);
         
         return compileSingleFlatQuery(context, select, binds, asSubquery, false, innerPlan, null, isInRowKeyOrder);
@@ -505,7 +507,7 @@ public class QueryCompiler {
         if (this.projectTuples) {
             projectedTable = TupleProjectionCompiler.createProjectedTable(select, context);
             if (projectedTable != null) {
-                context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable));
+                context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), select.getUdfParseNodes()));
             }
         }
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java
index 1b35e92..c60933e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java
@@ -52,9 +52,10 @@ public class RowProjector {
     private final int estimatedSize;
     private final boolean isProjectEmptyKeyValue;
     private final boolean cloneRequired;
+    private final boolean hasUDFs;
     
     public RowProjector(RowProjector projector, boolean isProjectEmptyKeyValue) {
-        this(projector.getColumnProjectors(), projector.getEstimatedRowByteSize(), isProjectEmptyKeyValue);
+        this(projector.getColumnProjectors(), projector.getEstimatedRowByteSize(), isProjectEmptyKeyValue, projector.hasUDFs);
     }
     /**
      * Construct RowProjector based on a list of ColumnProjectors.
@@ -64,6 +65,18 @@ public class RowProjector {
      * @param estimatedRowSize 
      */
     public RowProjector(List<? extends ColumnProjector> columnProjectors, int estimatedRowSize, boolean isProjectEmptyKeyValue) {
+        this(columnProjectors, estimatedRowSize, isProjectEmptyKeyValue, false);
+    }
+    /**
+     * Construct RowProjector based on a list of ColumnProjectors.
+     * @param columnProjectors ordered list of ColumnProjectors corresponding to projected columns in SELECT clause
+     * aggregating coprocessor. Only required in the case of an aggregate query with a limit clause and otherwise may
+     * be null.
+     * @param estimatedRowSize 
+     * @param isProjectEmptyKeyValue
+     * @param hasUDFs
+     */
+    public RowProjector(List<? extends ColumnProjector> columnProjectors, int estimatedRowSize, boolean isProjectEmptyKeyValue, boolean hasUDFs) {
         this.columnProjectors = Collections.unmodifiableList(columnProjectors);
         int position = columnProjectors.size();
         reverseIndex = ArrayListMultimap.<String, Integer>create();
@@ -82,15 +95,18 @@ public class RowProjector {
         this.someCaseSensitive = someCaseSensitive;
         this.estimatedSize = estimatedRowSize;
         this.isProjectEmptyKeyValue = isProjectEmptyKeyValue;
+        this.hasUDFs = hasUDFs;
         boolean hasPerInvocationExpression = false;
-        for (int i = 0; i < this.columnProjectors.size(); i++) {
-            Expression expression = this.columnProjectors.get(i).getExpression();
-            if (expression.getDeterminism() == Determinism.PER_INVOCATION) {
-                hasPerInvocationExpression = true;
-                break;
+        if (!hasUDFs) {
+            for (int i = 0; i < this.columnProjectors.size(); i++) {
+                Expression expression = this.columnProjectors.get(i).getExpression();
+                if (expression.getDeterminism() == Determinism.PER_INVOCATION) {
+                    hasPerInvocationExpression = true;
+                    break;
+                }
             }
         }
-        this.cloneRequired = hasPerInvocationExpression;
+        this.cloneRequired = hasPerInvocationExpression || hasUDFs;
     }
 
     public RowProjector cloneIfNecessary() {
@@ -114,7 +130,7 @@ public class RowProjector {
         }
         return new RowProjector(clonedColProjectors, 
                 this.getEstimatedRowByteSize(),
-                this.isProjectEmptyKeyValue());
+                this.isProjectEmptyKeyValue(), this.hasUDFs);
     }
 
     public boolean isProjectEmptyKeyValue() {
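
The net effect of the new flag: a projector is cloned per invocation either when some projected expression is PER_INVOCATION-deterministic or, unconditionally, when UDFs are projected, since a user-supplied function's determinism cannot be trusted. A runnable sketch of just that decision (Determinism and Expression reduced to what the check needs):

    import java.util.List;

    enum Determinism { ALWAYS, PER_INVOCATION }

    interface Expr {
        Determinism getDeterminism();
    }

    final class CloneCheck {
        static boolean cloneRequired(List<Expr> projected, boolean hasUDFs) {
            if (hasUDFs) {
                return true; // UDF present: clone, and skip the scan below
            }
            for (Expr e : projected) {
                if (e.getDeterminism() == Determinism.PER_INVOCATION) {
                    return true;
                }
            }
            return false;
        }
    }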

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
index b9897b1..9b54c86 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
@@ -30,6 +30,7 @@ import org.apache.phoenix.parse.ComparisonParseNode;
 import org.apache.phoenix.parse.DerivedTableNode;
 import org.apache.phoenix.parse.FamilyWildcardParseNode;
 import org.apache.phoenix.parse.JoinTableNode;
+import org.apache.phoenix.parse.NamedNode;
 import org.apache.phoenix.parse.JoinTableNode.JoinType;
 import org.apache.phoenix.parse.LessThanOrEqualParseNode;
 import org.apache.phoenix.parse.NamedTableNode;
@@ -99,7 +100,7 @@ public class StatementNormalizer extends ParseNodeRewriter {
             if (selectNodes != normSelectNodes) {
                 statement = NODE_FACTORY.select(statement.getFrom(), statement.getHint(), statement.isDistinct(),
                         normSelectNodes, statement.getWhere(), statement.getGroupBy(), statement.getHaving(), statement.getOrderBy(),
-                        statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects());
+                        statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
             }
         }
         
@@ -151,7 +152,7 @@ public class StatementNormalizer extends ParseNodeRewriter {
          }
          return super.visitLeave(node, nodes);
     }
-    
+
     @Override
     public ParseNode visitLeave(final BetweenParseNode node, List<ParseNode> nodes) throws SQLException {
        

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
index 1746d8a..123cb6a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
@@ -341,7 +341,7 @@ public class SubqueryRewriter extends ParseNodeRewriter {
                 groupbyNodes.set(i - 1, aliasedNode.getNode());
             }
             SelectStatement derivedTableStmt = NODE_FACTORY.select(subquery, subquery.isDistinct(), derivedTableSelect, where, derivedTableGroupBy, true);
-            subquery = NODE_FACTORY.select(NODE_FACTORY.derivedTable(derivedTableAlias, derivedTableStmt), subquery.getHint(), false, selectNodes, null, groupbyNodes, null, Collections.<OrderByNode> emptyList(), null, subquery.getBindCount(), true, false, Collections.<SelectStatement>emptyList());
+            subquery = NODE_FACTORY.select(NODE_FACTORY.derivedTable(derivedTableAlias, derivedTableStmt), subquery.getHint(), false, selectNodes, null, groupbyNodes, null, Collections.<OrderByNode> emptyList(), null, subquery.getBindCount(), true, false, Collections.<SelectStatement>emptyList(), subquery.getUdfParseNodes());
         }
         
         ParseNode onNode = conditionExtractor.getJoinCondition();
@@ -364,7 +364,7 @@ public class SubqueryRewriter extends ParseNodeRewriter {
             return select;
         
         // Wrap as a derived table.
-        return NODE_FACTORY.select(NODE_FACTORY.derivedTable(ParseNodeFactory.createTempAlias(), select), HintNode.EMPTY_HINT_NODE, false, select.getSelect(), null, null, null, null, null, select.getBindCount(), false, false, Collections.<SelectStatement> emptyList());
+        return NODE_FACTORY.select(NODE_FACTORY.derivedTable(ParseNodeFactory.createTempAlias(), select), HintNode.EMPTY_HINT_NODE, false, select.getSelect(), null, null, null, null, null, select.getBindCount(), false, false, Collections.<SelectStatement> emptyList(), select.getUdfParseNodes());
     }
     
     private List<AliasedNode> fixAliasedNodes(List<AliasedNode> nodes, boolean addSelectOne) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
index 6862802..5a91a17 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
@@ -205,7 +205,7 @@ public class SubselectRewriter extends ParseNodeRewriter {
         }
         
         return NODE_FACTORY.select(subselect.getFrom(), hintRewrite, isDistinctRewrite, selectNodesRewrite, whereRewrite, groupByRewrite, 
-            havingRewrite, orderByRewrite, limitRewrite, select.getBindCount(), isAggregateRewrite, select.hasSequence(), select.getSelects());
+            havingRewrite, orderByRewrite, limitRewrite, select.getBindCount(), isAggregateRewrite, select.hasSequence(), select.getSelects(), select.getUdfParseNodes());
     }
     
     private SelectStatement applyPostFilters(SelectStatement statement, List<ParseNode> postFilters) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index fab1ad0..cd10007 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -51,6 +51,17 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT_BYTE
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ARRAY_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_CONSTANT_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE_BYTES;
 import static org.apache.phoenix.schema.PTableType.INDEX;
 import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
 import static org.apache.phoenix.util.SchemaUtil.getVarChars;
@@ -61,6 +72,8 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
@@ -91,22 +104,29 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.GlobalCache.FunctionBytesPtr;
+import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.AddColumnRequest;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearTableFromCacheRequest;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearTableFromCacheResponse;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateFunctionRequest;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateTableRequest;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropColumnRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropFunctionRequest;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropTableRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetFunctionsRequest;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetTableRequest;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRequest;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
+import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -115,7 +135,10 @@ import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.metrics.Metrics;
+import org.apache.phoenix.parse.PFunction;
+import org.apache.phoenix.parse.PFunction.FunctionArgument;
 import org.apache.phoenix.protobuf.ProtobufUtil;
+import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.AmbiguousColumnException;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
@@ -123,6 +146,7 @@ import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PColumnImpl;
+import org.apache.phoenix.schema.PMetaDataEntity;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
@@ -149,6 +173,7 @@ import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.slf4j.Logger;
@@ -156,6 +181,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.cache.Cache;
 import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
@@ -220,6 +246,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     static {
         Collections.sort(TABLE_KV_COLUMNS, KeyValue.COMPARATOR);
     }
+
     private static final int TABLE_TYPE_INDEX = TABLE_KV_COLUMNS.indexOf(TABLE_TYPE_KV);
     private static final int TABLE_SEQ_NUM_INDEX = TABLE_KV_COLUMNS.indexOf(TABLE_SEQ_NUM_KV);
     private static final int COLUMN_COUNT_INDEX = TABLE_KV_COLUMNS.indexOf(COLUMN_COUNT_KV);
@@ -277,6 +304,52 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     
     private static final int LINK_TYPE_INDEX = 0;
 
+    private static final KeyValue CLASS_NAME_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, CLASS_NAME_BYTES);
+    private static final KeyValue JAR_PATH_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, JAR_PATH_BYTES);
+    private static final KeyValue RETURN_TYPE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, RETURN_TYPE_BYTES);
+    private static final KeyValue NUM_ARGS_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, NUM_ARGS_BYTES);
+    private static final KeyValue TYPE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TYPE_BYTES);
+    private static final KeyValue IS_CONSTANT_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_CONSTANT_BYTES);
+    private static final KeyValue DEFAULT_VALUE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DEFAULT_VALUE_BYTES);
+    private static final KeyValue MIN_VALUE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, MIN_VALUE_BYTES);
+    private static final KeyValue MAX_VALUE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, MAX_VALUE_BYTES);
+    private static final KeyValue IS_ARRAY_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_ARRAY_BYTES);
+
+    private static final List<KeyValue> FUNCTION_KV_COLUMNS = Arrays.<KeyValue>asList(
+        EMPTY_KEYVALUE_KV,
+        CLASS_NAME_KV,
+        JAR_PATH_KV,
+        RETURN_TYPE_KV,
+        NUM_ARGS_KV
+        );
+    static {
+        Collections.sort(FUNCTION_KV_COLUMNS, KeyValue.COMPARATOR);
+    }
+    
+    private static final int CLASS_NAME_INDEX = FUNCTION_KV_COLUMNS.indexOf(CLASS_NAME_KV);
+    private static final int JAR_PATH_INDEX = FUNCTION_KV_COLUMNS.indexOf(JAR_PATH_KV);
+    private static final int RETURN_TYPE_INDEX = FUNCTION_KV_COLUMNS.indexOf(RETURN_TYPE_KV);
+    private static final int NUM_ARGS_INDEX = FUNCTION_KV_COLUMNS.indexOf(NUM_ARGS_KV);
+
+    private static final List<KeyValue> FUNCTION_ARG_KV_COLUMNS = Arrays.<KeyValue>asList(
+        TYPE_KV,
+        IS_ARRAY_KV,
+        IS_CONSTANT_KV,
+        DEFAULT_VALUE_KV,
+        MIN_VALUE_KV,
+        MAX_VALUE_KV
+        );
+    static {
+        Collections.sort(FUNCTION_ARG_KV_COLUMNS, KeyValue.COMPARATOR);
+    }
+    
+    private static final int TYPE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(TYPE_KV);
+    private static final int IS_ARRAY_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(IS_ARRAY_KV);
+    private static final int IS_CONSTANT_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(IS_CONSTANT_KV);
+    private static final int DEFAULT_VALUE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(DEFAULT_VALUE_KV);
+    private static final int MIN_VALUE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(MIN_VALUE_KV);
+    private static final int MAX_VALUE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(MAX_VALUE_KV);
+    
     private static PName newPName(byte[] keyBuffer, int keyOffset, int keyLength) {
         if (keyLength <= 0) {
             return null;
@@ -368,9 +441,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
         RegionScanner scanner = region.getScanner(scan);
 
-        Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+        Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
         try {
-            PTable oldTable = metaDataCache.getIfPresent(cacheKey);
+            PTable oldTable = (PTable)metaDataCache.getIfPresent(cacheKey);
             long tableTimeStamp = oldTable == null ? MIN_TABLE_TIMESTAMP-1 : oldTable.getTimeStamp();
             PTable newTable;
             newTable = getTable(scanner, clientTimeStamp, tableTimeStamp);
@@ -393,6 +466,48 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         }
     }
 
+    private List<PFunction> buildFunctions(List<byte[]> keys, HRegion region,
+            long clientTimeStamp) throws IOException, SQLException {
+        List<KeyRange> keyRanges = Lists.newArrayListWithExpectedSize(keys.size());
+        for (byte[] key : keys) {
+            byte[] stopKey = ByteUtil.concat(key, QueryConstants.SEPARATOR_BYTE_ARRAY);
+            ByteUtil.nextKey(stopKey, stopKey.length);
+            keyRanges.add(PVarbinary.INSTANCE.getKeyRange(key, true, stopKey, false));
+        }
+        Scan scan = new Scan();
+        scan.setTimeRange(MIN_TABLE_TIMESTAMP, clientTimeStamp);
+        ScanRanges scanRanges =
+                ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA,
+                    Collections.singletonList(keyRanges), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
+        scanRanges.initializeScan(scan);
+        scan.setFilter(scanRanges.getSkipScanFilter());
+
+        RegionScanner scanner = region.getScanner(scan);
+
+        Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+        List<PFunction> functions = new ArrayList<PFunction>();
+        PFunction function = null;
+        try {
+            for (int i = 0; i < keys.size(); i++) {
+                // each iteration consumes one function's rows from the scanner
+                function = getFunction(scanner);
+                if (function == null) {
+                    return null;
+                }
+                byte[] functionKey =
+                        SchemaUtil.getFunctionKey(
+                            function.getTenantId() == null ? ByteUtil.EMPTY_BYTE_ARRAY : function
+                                    .getTenantId().getBytes(), Bytes.toBytes(function
+                                    .getFunctionName()));
+                metaDataCache.put(new FunctionBytesPtr(functionKey), function);
+                functions.add(function);
+            }
+            return functions;
+        } finally {
+            scanner.close();
+        }
+    }
+
     private void addIndexToTable(PName tenantId, PName schemaName, PName indexName, PName tableName, long clientTimeStamp, List<PTable> indexes) throws IOException, SQLException {
         byte[] key = SchemaUtil.getTableKey(tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes(), schemaName.getBytes(), indexName.getBytes());
         PTable indexTable = doGetTable(key, clientTimeStamp);
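
buildFunctions above turns each function key into a skip-scan range covering the function header row plus all of its argument rows: append the separator byte, then increment to get an exclusive upper bound. A self-contained sketch of that stop-key construction, with ByteUtil.nextKey approximated inline:

    import java.util.Arrays;

    final class StopKeys {
        private static final byte SEPARATOR = 0;

        // [functionKey, stopKeyFor(functionKey)) spans every row whose key
        // starts with functionKey + SEPARATOR, i.e. the function header row
        // and its argument rows.
        static byte[] stopKeyFor(byte[] functionKey) {
            byte[] stopKey = Arrays.copyOf(functionKey, functionKey.length + 1);
            stopKey[functionKey.length] = SEPARATOR;
            // increment the trailing byte, carrying left on overflow,
            // mirroring what ByteUtil.nextKey does to the suffix
            for (int i = stopKey.length - 1; i >= 0; i--) {
                if (++stopKey[i] != 0) {
                    break;
                }
            }
            return stopKey;
        }
    }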
@@ -473,6 +588,61 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr);
         columns.add(column);
     }
+    
+    private void addArgumentToFunction(List<Cell> results, PName functionName, PName type,
+        Cell[] functionKeyValues, List<FunctionArgument> arguments, short argPosition) {
+        int i = 0;
+        int j = 0;
+        while (i < results.size() && j < FUNCTION_ARG_KV_COLUMNS.size()) {
+            Cell kv = results.get(i);
+            Cell searchKv = FUNCTION_ARG_KV_COLUMNS.get(j);
+            int cmp =
+                    Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(),
+                        kv.getQualifierLength(), searchKv.getQualifierArray(),
+                        searchKv.getQualifierOffset(), searchKv.getQualifierLength());
+            if (cmp == 0) {
+                functionKeyValues[j++] = kv;
+                i++;
+            } else if (cmp > 0) {
+                functionKeyValues[j++] = null;
+            } else {
+                i++; // shouldn't happen - means unexpected KV in function argument row
+            }
+        }
+
+        Cell isArrayKv = functionKeyValues[IS_ARRAY_INDEX];
+        boolean isArrayType =
+                isArrayKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(
+                    isArrayKv.getValueArray(), isArrayKv.getValueOffset(),
+                    isArrayKv.getValueLength()));
+        Cell isConstantKv = functionKeyValues[IS_CONSTANT_INDEX];
+        boolean isConstant =
+                isConstantKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(
+                    isConstantKv.getValueArray(), isConstantKv.getValueOffset(),
+                    isConstantKv.getValueLength()));
+        Cell defaultValueKv = functionKeyValues[DEFAULT_VALUE_INDEX];
+        String defaultValue =
+                defaultValueKv == null ? null : (String) PVarchar.INSTANCE.toObject(
+                    defaultValueKv.getValueArray(), defaultValueKv.getValueOffset(),
+                    defaultValueKv.getValueLength());
+        Cell minValueKv = functionKeyValues[MIN_VALUE_INDEX];
+        String minValue =
+                minValueKv == null ? null : (String) PVarchar.INSTANCE.toObject(
+                    minValueKv.getValueArray(), minValueKv.getValueOffset(),
+                    minValueKv.getValueLength());
+        Cell maxValueKv = functionKeyValues[MAX_VALUE_INDEX];
+        String maxValue =
+                maxValueKv == null ? null : (String) PVarchar.INSTANCE.toObject(
+                    maxValueKv.getValueArray(), maxValueKv.getValueOffset(),
+                    maxValueKv.getValueLength());
+        FunctionArgument arg =
+                new FunctionArgument(type.getString(), isArrayType, isConstant,
+                        defaultValue == null ? null : LiteralExpression.newConstant(defaultValue),
+                        minValue == null ? null : LiteralExpression.newConstant(minValue),
+                        maxValue == null ? null : LiteralExpression.newConstant(maxValue),
+                        argPosition);
+        arguments.add(arg);
+    }
 
     private PTable getTable(RegionScanner scanner, long clientTimeStamp, long tableTimeStamp)
         throws IOException, SQLException {
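
addArgumentToFunction (and getFunction below) lines the cells of one row up against an expected, KeyValue-sorted column list in a single two-pointer pass: equal qualifiers match, a greater result qualifier means the expected column is absent, and a smaller one is an unexpected cell to skip. The same merge, self-contained, with strings standing in for cell qualifiers:

    import java.util.List;

    final class SortedMerge {
        // Both lists must be sorted; out[j] is null when expected.get(j)
        // has no matching result cell.
        static String[] align(List<String> results, List<String> expected) {
            String[] out = new String[expected.size()];
            int i = 0, j = 0;
            while (i < results.size() && j < expected.size()) {
                int cmp = results.get(i).compareTo(expected.get(j));
                if (cmp == 0) {
                    out[j++] = results.get(i++); // expected column present
                } else if (cmp > 0) {
                    out[j++] = null;             // expected column missing
                } else {
                    i++;                         // unexpected cell; skip it
                }
            }
            return out;
        }
    }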
@@ -646,6 +816,106 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, stats);
     }
 
+    private PFunction getFunction(RegionScanner scanner)
+            throws IOException, SQLException {
+        List<Cell> results = Lists.newArrayList();
+        scanner.next(results);
+        if (results.isEmpty()) {
+            return null;
+        }
+        Cell[] functionKeyValues = new Cell[FUNCTION_KV_COLUMNS.size()];
+        Cell[] functionArgKeyValues = new Cell[FUNCTION_ARG_KV_COLUMNS.size()];
+
+        // Create PFunction based on KeyValues from scan
+        Cell keyValue = results.get(0);
+        byte[] keyBuffer = keyValue.getRowArray();
+        int keyLength = keyValue.getRowLength();
+        int keyOffset = keyValue.getRowOffset();
+        PName tenantId = newPName(keyBuffer, keyOffset, keyLength);
+        int tenantIdLength = (tenantId == null) ? 0 : tenantId.getBytes().length;
+        if (tenantIdLength == 0) {
+            tenantId = null;
+        }
+        PName functionName =
+                newPName(keyBuffer, keyOffset + tenantIdLength + 1, keyLength - tenantIdLength - 1);
+        int functionNameLength = functionName.getBytes().length + 1;
+        int offset = tenantIdLength + functionNameLength + 1;
+
+        long timeStamp = keyValue.getTimestamp();
+
+        int i = 0;
+        int j = 0;
+        while (i < results.size() && j < FUNCTION_KV_COLUMNS.size()) {
+            Cell kv = results.get(i);
+            Cell searchKv = FUNCTION_KV_COLUMNS.get(j);
+            int cmp =
+                    Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(),
+                        kv.getQualifierLength(), searchKv.getQualifierArray(),
+                        searchKv.getQualifierOffset(), searchKv.getQualifierLength());
+            if (cmp == 0) {
+                timeStamp = Math.max(timeStamp, kv.getTimestamp()); // Find max timestamp of function
+                                                                    // header row
+                functionKeyValues[j++] = kv;
+                i++;
+            } else if (cmp > 0) {
+                timeStamp = Math.max(timeStamp, kv.getTimestamp());
+                functionKeyValues[j++] = null;
+            } else {
+                i++; // shouldn't happen - means unexpected KV in function header row
+            }
+        }
+        // CLASS_NAME and NUM_ARGS are required; JAR_PATH may be null.
+        if (functionKeyValues[CLASS_NAME_INDEX] == null || functionKeyValues[NUM_ARGS_INDEX] == null) {
+            throw new IllegalStateException(
+                    "Didn't find expected key values for function row in metadata row");
+        }
+
+        Cell classNameKv = functionKeyValues[CLASS_NAME_INDEX];
+        PName className = newPName(classNameKv.getValueArray(), classNameKv.getValueOffset(),
+            classNameKv.getValueLength());
+        Cell jarPathKv = functionKeyValues[JAR_PATH_INDEX];
+        PName jarPath = null;
+        if (jarPathKv != null) {
+            jarPath = newPName(jarPathKv.getValueArray(), jarPathKv.getValueOffset(),
+                jarPathKv.getValueLength());
+        }
+        Cell numArgsKv = functionKeyValues[NUM_ARGS_INDEX];
+        int numArgs =
+                PInteger.INSTANCE.getCodec().decodeInt(numArgsKv.getValueArray(),
+                    numArgsKv.getValueOffset(), SortOrder.getDefault());
+        Cell returnTypeKv = functionKeyValues[RETURN_TYPE_INDEX];
+        PName returnType =
+                returnTypeKv == null ? null : newPName(returnTypeKv.getValueArray(),
+                    returnTypeKv.getValueOffset(), returnTypeKv.getValueLength());
+
+        List<FunctionArgument> arguments = Lists.newArrayListWithExpectedSize(numArgs);
+        for (int k = 0; k < numArgs; k++) {
+            results.clear();
+            scanner.next(results);
+            if (results.isEmpty()) {
+                break;
+            }
+            Cell typeKv = results.get(0);
+            int typeKeyLength = typeKv.getRowLength();
+            PName typeName =
+                    newPName(typeKv.getRowArray(), typeKv.getRowOffset() + offset, typeKeyLength
+                            - offset - 3);
+            
+            int argPositionOffset =  offset + typeName.getBytes().length + 1;
+            short argPosition = Bytes.toShort(typeKv.getRowArray(), typeKv.getRowOffset() + argPositionOffset, typeKeyLength
+                - argPositionOffset);
+            addArgumentToFunction(results, functionName, typeName, functionArgKeyValues, arguments, argPosition);
+        }
+        Collections.sort(arguments, new Comparator<FunctionArgument>() {
+            @Override
+            public int compare(FunctionArgument o1, FunctionArgument o2) {
+                return o1.getArgPosition() - o2.getArgPosition();
+            }
+        });
+        return new PFunction(tenantId, functionName.getString(), arguments, returnType.getString(),
+                className.getString(), jarPath == null ? null : jarPath.getString(), timeStamp);
+    }
+    
     private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region,
         long clientTimeStamp) throws IOException {
         if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
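
getFunction above decodes composite row keys assumed to be laid out as tenantId 0x00 functionName for the header row, with argument rows appending argType 0x00 argPosition (a two-byte short); the "- 3" and "+ 1" offset arithmetic walks exactly that layout. A sketch of the tail decoding under that assumption (stand-in code, not the Phoenix utilities):

    import java.nio.charset.StandardCharsets;

    final class ArgKeyTail {
        // offset must already span tenantId, functionName and their
        // separators, matching the `offset` computed in getFunction.
        static String argType(byte[] rowKey, int offset) {
            int typeLength = rowKey.length - offset - 3; // strip sep + short
            return new String(rowKey, offset, typeLength, StandardCharsets.UTF_8);
        }

        // the last two bytes hold the argument position, big-endian,
        // as Bytes.toShort reads it
        static short argPosition(byte[] rowKey) {
            int posOffset = rowKey.length - 2;
            return (short) (((rowKey[posOffset] & 0xFF) << 8)
                    | (rowKey[posOffset + 1] & 0xFF));
        }
    }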
@@ -663,7 +933,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         if (!results.isEmpty() && results.get(0).getTimestamp() > clientTimeStamp) {
             Cell kv = results.get(0);
             if (kv.getTypeByte() == Type.Delete.getCode()) {
-                Cache<ImmutableBytesPtr, PTable> metaDataCache =
+                Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
                         GlobalCache.getInstance(this.env).getMetaDataCache();
                 PTable table = newDeletedTableMarker(kv.getTimestamp());
                 metaDataCache.put(cacheKey, table);
@@ -673,20 +943,57 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         return null;
     }
 
+    
+    private PFunction buildDeletedFunction(byte[] key, ImmutableBytesPtr cacheKey, HRegion region,
+        long clientTimeStamp) throws IOException {
+        if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
+            return null;
+        }
+
+        Scan scan = MetaDataUtil.newTableRowsScan(key, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
+        scan.setFilter(new FirstKeyOnlyFilter());
+        scan.setRaw(true);
+        List<Cell> results = Lists.<Cell> newArrayList();
+        try (RegionScanner scanner = region.getScanner(scan);) {
+          scanner.next(results);
+        }
+        // HBase ignores the time range on a raw scan (HBASE-7362)
+        if (!results.isEmpty() && results.get(0).getTimestamp() > clientTimeStamp) {
+            Cell kv = results.get(0);
+            if (kv.getTypeByte() == Type.Delete.getCode()) {
+                Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
+                        GlobalCache.getInstance(this.env).getMetaDataCache();
+                PFunction function = newDeletedFunctionMarker(kv.getTimestamp());
+                metaDataCache.put(cacheKey, function);
+                return function;
+            }
+        }
+        return null;
+    }
+
+
     private static PTable newDeletedTableMarker(long timestamp) {
         return new PTableImpl(timestamp);
     }
 
+    private static PFunction newDeletedFunctionMarker(long timestamp) {
+        return new PFunction(timestamp);
+    }
+
     private static boolean isTableDeleted(PTable table) {
         return table.getName() == null;
     }
 
+    private static boolean isFunctionDeleted(PFunction function) {
+        return function.getFunctionName() == null;
+    }
+
     private PTable loadTable(RegionCoprocessorEnvironment env, byte[] key,
         ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp)
         throws IOException, SQLException {
         HRegion region = env.getRegion();
-        Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
-        PTable table = metaDataCache.getIfPresent(cacheKey);
+        Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+        PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
         // We always cache the latest version - fault in if not in cache
         if (table != null || (table = buildTable(key, cacheKey, region, asOfTimeStamp)) != null) {
             return table;
@@ -700,6 +1007,29 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         return null;
     }
 
+    private PFunction loadFunction(RegionCoprocessorEnvironment env, byte[] key,
+            ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp)
+            throws IOException, SQLException {
+            HRegion region = env.getRegion();
+            Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+            PFunction function = (PFunction)metaDataCache.getIfPresent(cacheKey);
+            // We always cache the latest version - fault in if not in cache
+            if (function != null) {
+                return function;
+            }
+            ArrayList<byte[]> arrayList = new ArrayList<byte[]>(1);
+            arrayList.add(key);
+            List<PFunction> functions = buildFunctions(arrayList, region, asOfTimeStamp);
+            if (functions != null) return functions.get(0);
+            // if not found, check whether a newer function exists and add a
+            // delete marker for the timestamp found
+            if (function == null
+                    && (function = buildDeletedFunction(key, cacheKey, region, clientTimeStamp)) != null) {
+                return function;
+            }
+            return null;
+        }
+
 
     @Override
     public void createTable(RpcController controller, CreateTableRequest request,
@@ -801,7 +1131,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
 
                 // Invalidate the cache - the next getTable call will add it
                 // TODO: consider loading the table that was just created here, patching up the parent table, and updating the cache
-                Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+                Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
                 if (parentCacheKey != null) {
                     metaDataCache.invalidate(parentCacheKey);
                 }
@@ -950,7 +1280,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     done.run(MetaDataMutationResult.toProto(result));
                     return;
                 }
-                Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+                Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
                 // Commit the list of deletion.
                 region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
                 long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
@@ -984,8 +1314,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         HRegion region = env.getRegion();
         ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
 
-        Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
-        PTable table = metaDataCache.getIfPresent(cacheKey);
+        Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+        PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
 
         // We always cache the latest version - fault in if not in cache
         if (table != null
@@ -1132,8 +1462,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
                 List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>();
                 invalidateList.add(cacheKey);
-                Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
-                PTable table = metaDataCache.getIfPresent(cacheKey);
+                Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+                PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
                 if (logger.isDebugEnabled()) {
                     if (table == null) {
                         logger.debug("Table " + Bytes.toStringBinary(key)
@@ -1299,9 +1629,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
 
     private PTable doGetTable(byte[] key, long clientTimeStamp, RowLock rowLock) throws IOException, SQLException {
         ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
-        Cache<ImmutableBytesPtr, PTable> metaDataCache =
+        Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
                 GlobalCache.getInstance(this.env).getMetaDataCache();
-        PTable table = metaDataCache.getIfPresent(cacheKey);
+        PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
         // We only cache the latest, so we'll end up building the table with every call if the
         // client connection has specified an SCN.
         // TODO: If we indicate to the client that we're returning an older version, but there's a
@@ -1333,7 +1663,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         }
         try {
             // Try cache again in case we were waiting on a lock
-            table = metaDataCache.getIfPresent(cacheKey);
+            table = (PTable)metaDataCache.getIfPresent(cacheKey);
             // We only cache the latest, so we'll end up building the table with every call if the
             // client connection has specified an SCN.
             // TODO: If we indicate to the client that we're returning an older version, but there's
@@ -1358,6 +1688,64 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         }
     }
 
+    private List<PFunction> doGetFunctions(List<byte[]> keys, long clientTimeStamp) throws IOException, SQLException {
+        Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
+                GlobalCache.getInstance(this.env).getMetaDataCache();
+        HRegion region = env.getRegion();
+        Collections.sort(keys, new Comparator<byte[]>() {
+            @Override
+            public int compare(byte[] o1, byte[] o2) {
+                return Bytes.compareTo(o1, o2);
+            }
+        });
+        /*
+         * Lock directly on the function keys. This will just prevent a function
+         * from getting rebuilt too often.
+         */
+        List<RowLock> rowLocks = new ArrayList<HRegion.RowLock>(keys.size());
+        try {
+            for (int i = 0; i < keys.size(); i++) {
+                HRegion.RowLock rowLock = region.getRowLock(keys.get(i));
+                if (rowLock == null) {
+                    throw new IOException("Failed to acquire lock on "
+                            + Bytes.toStringBinary(keys.get(i)));
+                }
+                rowLocks.add(rowLock);
+            }
+
+            List<PFunction> functionsAvailable = new ArrayList<PFunction>(keys.size());
+            int numFunctions = keys.size();
+            Iterator<byte[]> iterator = keys.iterator();
+            while (iterator.hasNext()) {
+                byte[] key = iterator.next();
+                PFunction function = (PFunction)metaDataCache.getIfPresent(new FunctionBytesPtr(key));
+                if (function != null && function.getTimeStamp() < clientTimeStamp) {
+                    if (isFunctionDeleted(function)) {
+                        return null;
+                    }
+                    functionsAvailable.add(function);
+                    iterator.remove();
+                }
+            }
+            if (functionsAvailable.size() == numFunctions) return functionsAvailable;
+
+            // Query for the latest functions, since they're not cached
+            List<PFunction> buildFunctions = buildFunctions(keys, region, clientTimeStamp);
+            if (buildFunctions == null || buildFunctions.isEmpty()) {
+                return null;
+            }
+            functionsAvailable.addAll(buildFunctions);
+            if (functionsAvailable.size() == numFunctions) return functionsAvailable;
+            return null;
+        } finally {
+            for (HRegion.RowLock lock : rowLocks) {
+                lock.release();
+            }
+            rowLocks.clear();
+        }
+    }
+
     @Override
     public void dropColumn(RpcController controller, DropColumnRequest request,
             RpcCallback<MetaDataResponse> done) {
@@ -1478,7 +1866,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     public void clearCache(RpcController controller, ClearCacheRequest request,
             RpcCallback<ClearCacheResponse> done) {
         GlobalCache cache = GlobalCache.getInstance(this.env);
-        Cache<ImmutableBytesPtr, PTable> metaDataCache =
+        Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
                 GlobalCache.getInstance(this.env).getMetaDataCache();
         metaDataCache.invalidateAll();
         cache.clearTenantCache();
@@ -1635,7 +2023,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     }
                     region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
                     // Invalidate from cache
-                    Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+                    Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
                     metaDataCache.invalidate(cacheKey);
                     if(dataTableKey != null) {
                         metaDataCache.invalidate(new ImmutableBytesPtr(dataTableKey));
@@ -1670,6 +2058,18 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 EnvironmentEdgeManager.currentTimeMillis(), null);
     }
 
+    private static MetaDataMutationResult checkFunctionKeyInRegion(byte[] key, HRegion region) {
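+        // Returns null when the key falls inside this region's [startKey, endKey)
+        // range (an empty endKey marks the last region); otherwise the client is
+        // told to retry against the correct region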
+        byte[] startKey = region.getStartKey();
+        byte[] endKey = region.getEndKey();
+        if (Bytes.compareTo(startKey, key) <= 0
+                && (Bytes.compareTo(HConstants.LAST_ROW, endKey) == 0 || Bytes.compareTo(key,
+                    endKey) < 0)) {
+            return null; // normal case
+        }
+        return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_IN_REGION,
+                EnvironmentEdgeManager.currentTimeMillis(), null);
+    }
+
     /**
      * Certain operations, such as DROP TABLE are not allowed if there a table has child views. This class wraps the
      * Results of a scanning the Phoenix Metadata for child views for a specific table and stores an additional flag for
@@ -1720,7 +2120,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             byte[] tenantId = request.getTenantId().toByteArray();
             byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
             ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
-            Cache<ImmutableBytesPtr, PTable> metaDataCache =
+            Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
                     GlobalCache.getInstance(this.env).getMetaDataCache();
             metaDataCache.invalidate(cacheKey);
         } catch (Throwable t) {
@@ -1729,5 +2129,222 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
         }
     }
+    @Override
+    public void getFunctions(RpcController controller, GetFunctionsRequest request,
+            RpcCallback<MetaDataResponse> done) {
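+        // Resolves each requested function for the tenant; if any of them cannot be
+        // served at the client timestamp, the whole call returns FUNCTION_NOT_FOUND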
+        MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
+        byte[] tenantId = request.getTenantId().toByteArray();
+        List<String> functionNames = new ArrayList<>(request.getFunctionNamesCount());
+        try {
+            HRegion region = env.getRegion();
+            List<ByteString> functionNamesList = request.getFunctionNamesList();
+            List<Long> functionTimestampsList = request.getFunctionTimestampsList();
+            List<byte[]> keys = new ArrayList<byte[]>(request.getFunctionNamesCount());
+            List<Pair<byte[], Long>> functions = new ArrayList<Pair<byte[], Long>>(request.getFunctionNamesCount());
+            for (int i = 0; i < functionNamesList.size(); i++) {
+                byte[] functionName = functionNamesList.get(i).toByteArray();
+                functionNames.add(Bytes.toString(functionName));
+                byte[] key = SchemaUtil.getFunctionKey(tenantId, functionName);
+                MetaDataMutationResult result = checkFunctionKeyInRegion(key, region);
+                if (result != null) {
+                    done.run(MetaDataMutationResult.toProto(result));
+                    return;
+                }
+                functions.add(new Pair<byte[], Long>(functionName, functionTimestampsList.get(i)));
+                keys.add(key);
+            }
+
+            long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+            List<PFunction> functionsAvailable = doGetFunctions(keys, request.getClientTimestamp());
+            if (functionsAvailable == null) {
+                builder.setReturnCode(MetaDataProtos.MutationCode.FUNCTION_NOT_FOUND);
+                builder.setMutationTime(currentTime);
+                done.run(builder.build());
+                return;
+            }
+            builder.setReturnCode(MetaDataProtos.MutationCode.FUNCTION_ALREADY_EXISTS);
+            builder.setMutationTime(currentTime);
+
+            for (PFunction function : functionsAvailable) {
+                builder.addFunction(PFunction.toProto(function));
+            }
+            done.run(builder.build());
+            return;
+        } catch (Throwable t) {
+            logger.error("getFunctions failed", t);
+            ProtobufUtil.setControllerException(controller,
+                ServerUtil.createIOException(functionNames.toString(), t));
+        }
+    }
 
+    @Override
+    public void createFunction(RpcController controller, CreateFunctionRequest request,
+            RpcCallback<MetaDataResponse> done) {
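+        // Registers a new function: bails out with NEWER_FUNCTION_FOUND or
+        // FUNCTION_ALREADY_EXISTS when appropriate, otherwise persists the metadata
+        // rows (skipped for temporary functions) and invalidates any stale cache entry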
+        MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
+        byte[][] rowKeyMetaData = new byte[2][];
+        byte[] functionName = null;
+        try {
+            List<Mutation> functionMetaData = ProtobufUtil.getMutations(request);
+            boolean temporaryFunction = request.getTemporary();
+            MetaDataUtil.getTenantIdAndFunctionName(functionMetaData, rowKeyMetaData);
+            byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
+            functionName = rowKeyMetaData[PhoenixDatabaseMetaData.FUNTION_NAME_INDEX];
+            byte[] lockKey = SchemaUtil.getFunctionKey(tenantIdBytes, functionName);
+            HRegion region = env.getRegion();
+            MetaDataMutationResult result = checkFunctionKeyInRegion(lockKey, region);
+            if (result != null) {
+                done.run(MetaDataMutationResult.toProto(result));
+                return;
+            }
+            List<RowLock> locks = Lists.newArrayList();
+            long clientTimeStamp = MetaDataUtil.getClientTimeStamp(functionMetaData);
+            try {
+                acquireLock(region, lockKey, locks);
+                // Get as of latest timestamp so we can detect if we have a newer function that already
+                // exists without making an additional query
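+                // FunctionBytesPtr keeps function entries distinct from table entries
+                // in the shared metadata cache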
+                ImmutableBytesPtr cacheKey = new FunctionBytesPtr(lockKey);
+                PFunction function =
+                        loadFunction(env, lockKey, cacheKey, clientTimeStamp, clientTimeStamp);
+                if (function != null) {
+                    if (function.getTimeStamp() < clientTimeStamp) {
+                        // If the function is older than the client timestamp and has
+                        // been deleted, fall through so it can be recreated
+                        if (!isFunctionDeleted(function)) {
+                            builder.setReturnCode(MetaDataProtos.MutationCode.FUNCTION_ALREADY_EXISTS);
+                            builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                            builder.addFunction(PFunction.toProto(function));
+                            done.run(builder.build());
+                            return;
+                        }
+                    } else {
+                        builder.setReturnCode(MetaDataProtos.MutationCode.NEWER_FUNCTION_FOUND);
+                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                        builder.addFunction(PFunction.toProto(function));
+                        done.run(builder.build());
+                        return;
+                    }
+                }
+                // Don't store function info for temporary functions.
+                if (!temporaryFunction) {
+                    region.mutateRowsWithLocks(functionMetaData, Collections.<byte[]> emptySet());
+                }
+
+                // Invalidate the cache - the next getFunctions call will add it back
+                // TODO: consider loading the function that was just created here and updating the cache
+                Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+                metaDataCache.invalidate(cacheKey);
+                // Get timeStamp from mutations - the above method sets it if it's unset
+                long currentTimeStamp = MetaDataUtil.getClientTimeStamp(functionMetaData);
+                builder.setReturnCode(MetaDataProtos.MutationCode.FUNCTION_NOT_FOUND);
+                builder.setMutationTime(currentTimeStamp);
+                done.run(builder.build());
+                return;
+            } finally {
+                region.releaseRowLocks(locks);
+            }
+        } catch (Throwable t) {
+          logger.error("createFunction failed", t);
+            ProtobufUtil.setControllerException(controller,
+                ServerUtil.createIOException(Bytes.toString(functionName), t));
+        }
+    }
+
+    @Override
+    public void dropFunction(RpcController controller, DropFunctionRequest request,
+            RpcCallback<MetaDataResponse> done) {
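+        // Drops the function under a row lock: queues Delete mutations for all of its
+        // metadata rows, applies them, and caches a deleted-function marker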
+        byte[][] rowKeyMetaData = new byte[2][];
+        byte[] functionName = null;
+        try {
+            List<Mutation> functionMetaData = ProtobufUtil.getMutations(request);
+            MetaDataUtil.getTenantIdAndFunctionName(functionMetaData, rowKeyMetaData);
+            byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
+            functionName = rowKeyMetaData[PhoenixDatabaseMetaData.FUNTION_NAME_INDEX];
+            byte[] lockKey = SchemaUtil.getFunctionKey(tenantIdBytes, functionName);
+            HRegion region = env.getRegion();
+            MetaDataMutationResult result = checkFunctionKeyInRegion(lockKey, region);
+            if (result != null) {
+                done.run(MetaDataMutationResult.toProto(result));
+                return;
+            }
+            List<RowLock> locks = Lists.newArrayList();
+            long clientTimeStamp = MetaDataUtil.getClientTimeStamp(functionMetaData);
+            try {
+                acquireLock(region, lockKey, locks);
+                ImmutableBytesPtr cacheKey = new FunctionBytesPtr(lockKey);
+                List<byte[]> keys = new ArrayList<byte[]>(1);
+                keys.add(lockKey);
+                List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>();
+
+                result = doDropFunction(clientTimeStamp, keys, functionMetaData, invalidateList);
+                if (result.getMutationCode() != MutationCode.FUNCTION_ALREADY_EXISTS) {
+                    done.run(MetaDataMutationResult.toProto(result));
+                    return;
+                }
+                region.mutateRowsWithLocks(functionMetaData, Collections.<byte[]> emptySet());
+
+                Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+                long currentTime = MetaDataUtil.getClientTimeStamp(functionMetaData);
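+                // Replace each invalidated entry with a deleted-function marker so
+                // subsequent lookups at this timestamp still observe the drop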
+                for (ImmutableBytesPtr ptr : invalidateList) {
+                    metaDataCache.invalidate(ptr);
+                    metaDataCache.put(ptr, newDeletedFunctionMarker(currentTime));
+                }
+
+                done.run(MetaDataMutationResult.toProto(result));
+                return;
+            } finally {
+                region.releaseRowLocks(locks);
+            }
+        } catch (Throwable t) {
+          logger.error("dropFunction failed", t);
+            ProtobufUtil.setControllerException(controller,
+                ServerUtil.createIOException(Bytes.toString(functionName), t));
+        }
+    }
+
+    private MetaDataMutationResult doDropFunction(long clientTimeStamp, List<byte[]> keys, List<Mutation> functionMetaData, List<ImmutableBytesPtr> invalidateList)
+            throws IOException, SQLException {
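+        // Looks up the function as of the client timestamp and, when it exists,
+        // queues Delete mutations for its rows; otherwise reports FUNCTION_NOT_FOUND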
+        List<byte[]> keysClone = new ArrayList<byte[]>(keys);
+        List<PFunction> functions = doGetFunctions(keysClone, clientTimeStamp);
+        // We didn't find a function at the latest timestamp, so either there is no
+        // function or there was one, but it's been deleted. In either case we want to return.
+        if (functions == null || functions.isEmpty()) {
+            if (buildDeletedFunction(keys.get(0), new FunctionBytesPtr(keys.get(0)), env.getRegion(), clientTimeStamp) != null) {
+                return new MetaDataMutationResult(MutationCode.FUNCTION_ALREADY_EXISTS, EnvironmentEdgeManager.currentTimeMillis(), null);
+            }
+            return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+        }
+
+        if (functions != null && !functions.isEmpty()) {
+            if (functions.get(0).getTimeStamp() < clientTimeStamp) {
+                // If the function is older than the client timestamp but has already
+                // been deleted, report FUNCTION_NOT_FOUND
+                if (isFunctionDeleted(functions.get(0))) {
+                    return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND,
+                            EnvironmentEdgeManager.currentTimeMillis(), null);
+                }
+                invalidateList.add(new FunctionBytesPtr(keys.get(0)));
+                HRegion region = env.getRegion();
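+                // Scan all rows for this function (the header row plus its argument
+                // rows) and queue a Delete for each so the drop removes them all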
+                Scan scan = MetaDataUtil.newTableRowsScan(keys.get(0), MIN_TABLE_TIMESTAMP, clientTimeStamp);
+                List<Cell> results = Lists.newArrayList();
+                try (RegionScanner scanner = region.getScanner(scan)) {
+                    scanner.next(results);
+                    if (results.isEmpty()) { // Should not be possible
+                        return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+                    }
+                    do {
+                        Cell kv = results.get(0);
+                        Delete delete = new Delete(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), clientTimeStamp);
+                        functionMetaData.add(delete);
+                        results.clear();
+                        scanner.next(results);
+                    } while (!results.isEmpty());
+                }
+                return new MetaDataMutationResult(MutationCode.FUNCTION_ALREADY_EXISTS,
+                        EnvironmentEdgeManager.currentTimeMillis(), functions, true);
+            }
+        }
+        return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND,
+                EnvironmentEdgeManager.currentTimeMillis(), null);
+    }
 }
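
For reference, a minimal client-side sketch of the DDL that exercises the
createFunction/dropFunction endpoints above (the class name and jar path are
hypothetical, and the UDF jar is assumed to be visible to the region servers):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class UdfDdlExample {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                    Statement stmt = conn.createStatement()) {
                // CREATE FUNCTION is served by MetaDataEndpointImpl.createFunction
                stmt.execute("CREATE FUNCTION my_reverse(varchar) RETURNS varchar"
                        + " AS 'com.example.MyReverseFunction' USING JAR 'hdfs:/hbase/lib/myudf.jar'");
                // DROP FUNCTION is served by MetaDataEndpointImpl.dropFunction
                stmt.execute("DROP FUNCTION my_reverse");
            }
        }
    }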

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 3ef6e80..2cca4bc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -17,13 +17,16 @@
  */
 package org.apache.phoenix.coprocessor;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
+import org.apache.phoenix.coprocessor.generated.PFunctionProtos;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
+import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableImpl;
@@ -87,6 +90,10 @@ public abstract class MetaDataProtocol extends MetaDataService {
         UNALLOWED_TABLE_MUTATION,
         NO_PK_COLUMNS,
         PARENT_TABLE_NOT_FOUND,
+        FUNCTION_ALREADY_EXISTS,
+        FUNCTION_NOT_FOUND,
+        NEWER_FUNCTION_FOUND,
+        FUNCTION_NOT_IN_REGION,
         NO_OP
     };
 
@@ -98,6 +105,7 @@ public abstract class MetaDataProtocol extends MetaDataService {
         private byte[] columnName;
         private byte[] familyName;
         private boolean wasUpdated;
+        private List<PFunction> functions = new ArrayList<PFunction>(1);
 
         public MetaDataMutationResult() {
         }
@@ -114,12 +122,19 @@ public abstract class MetaDataProtocol extends MetaDataService {
            this(returnCode, currentTime, table, Collections.<byte[]> emptyList());
         }
 
+        public MetaDataMutationResult(MutationCode returnCode, long currentTime, List<PFunction> functions, boolean wasUpdated) {
+            this.returnCode = returnCode;
+            this.mutationTime = currentTime;
+            this.functions = functions;
+            this.wasUpdated = wasUpdated;
+        }
+
         // For testing, so that connectionless can set wasUpdated so ColumnResolver doesn't complain
         public MetaDataMutationResult(MutationCode returnCode, long currentTime, PTable table, boolean wasUpdated) {
             this(returnCode, currentTime, table, Collections.<byte[]> emptyList());
             this.wasUpdated = wasUpdated;
          }
-
+
         public MetaDataMutationResult(MutationCode returnCode, long currentTime, PTable table, List<byte[]> tableNamesToDelete) {
             this.returnCode = returnCode;
             this.mutationTime = currentTime;
@@ -146,6 +161,10 @@ public abstract class MetaDataProtocol extends MetaDataService {
         public void setTable(PTable table) {
             this.table = table;
         }
+
+        public void setFunction(PFunction function) {
+            this.functions.add(function);
+        }
 
         public List<byte[]> getTableNamesToDelete() {
             return tableNamesToDelete;
@@ -159,6 +178,10 @@ public abstract class MetaDataProtocol extends MetaDataService {
             return familyName;
         }
 
+        public List<PFunction> getFunctions() {
+            return functions;
+        }
+
         public static MetaDataMutationResult constructFromProto(MetaDataResponse proto) {
           MetaDataMutationResult result = new MetaDataMutationResult();
           result.returnCode = MutationCode.values()[proto.getReturnCode().ordinal()];
@@ -167,6 +190,11 @@ public abstract class MetaDataProtocol extends MetaDataService {
             result.wasUpdated = true;
             result.table = PTableImpl.createFromProto(proto.getTable());
           }
+          if (proto.getFunctionCount() > 0) {
+              result.wasUpdated = true;
+              for (PFunctionProtos.PFunction function : proto.getFunctionList()) {
+                  result.functions.add(PFunction.createFromProto(function));
+              }
+          }
           if (proto.getTablesToDeleteCount() > 0) {
             result.tableNamesToDelete =
                 Lists.newArrayListWithExpectedSize(proto.getTablesToDeleteCount());