You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2015/04/27 10:33:46 UTC

[2/7] phoenix git commit: PHOENIX-538 Support UDFs(Rajeshbabu Chintaguntla)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 5eb641e..291c84c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -22,6 +22,7 @@ import java.math.BigDecimal;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -87,11 +88,11 @@ public class ParseNodeFactory {
      * 
      * @since 0.1
      */
-    private static class BuiltInFunctionKey {
+    public static class BuiltInFunctionKey {
         private final String upperName;
         private final int argCount;
 
-        private BuiltInFunctionKey(String lowerName, int argCount) {
+        public BuiltInFunctionKey(String lowerName, int argCount) {
             this.upperName = lowerName;
             this.argCount = argCount;
         }
@@ -180,9 +181,6 @@ public class ParseNodeFactory {
     public static BuiltInFunctionInfo get(String normalizedName, List<ParseNode> children) {
         initBuiltInFunctionMap();
         BuiltInFunctionInfo info = BUILT_IN_FUNCTION_MAP.get(new BuiltInFunctionKey(normalizedName,children.size()));
-        if (info == null) {
-            throw new UnknownFunctionException(normalizedName);
-        }
         return info;
     }
 
@@ -288,8 +286,8 @@ public class ParseNodeFactory {
         return new CreateTableStatement(tableName, props, columns, pkConstraint, splits, tableType, ifNotExists, baseTableName, tableTypeIdNode, bindCount);
     }
 
-    public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits, ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType,boolean async, int bindCount) {
-        return new CreateIndexStatement(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, async, bindCount);
+    public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits, ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType,boolean async, int bindCount, Map<String, UDFParseNode> udfParseNodes) {
+        return new CreateIndexStatement(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, async, bindCount, udfParseNodes);
     }
 
     public CreateSequenceStatement createSequence(TableName tableName, ParseNode startsWith,
@@ -299,6 +297,14 @@ public class ParseNodeFactory {
                 maxValue, cycle, ifNotExits, bindCount);
     }
 
+    public CreateFunctionStatement createFunction(PFunction functionInfo, boolean temporary) {
+        return new CreateFunctionStatement(functionInfo, temporary);
+    }
+
+    public DropFunctionStatement dropFunction(String functionName, boolean ifExists) {
+        return new DropFunctionStatement(functionName, ifExists);
+    }
+
     public DropSequenceStatement dropSequence(TableName tableName, boolean ifExits, int bindCount){
         return new DropSequenceStatement(tableName, ifExits, bindCount);
     }
@@ -388,6 +394,9 @@ public class ParseNodeFactory {
 
     public FunctionParseNode function(String name, List<ParseNode> args) {
         BuiltInFunctionInfo info = getInfo(name, args);
+        if (info == null) {
+            return new UDFParseNode(name, args, info);
+        }
         Constructor<? extends FunctionParseNode> ctor = info.getNodeCtor();
         if (ctor == null) {
             return info.isAggregate()
@@ -411,6 +420,9 @@ public class ParseNodeFactory {
         args.addAll(valueNodes);
 
         BuiltInFunctionInfo info = getInfo(name, args);
+        if(info==null) {
+            return new UDFParseNode(name,args,info);
+        }
         Constructor<? extends FunctionParseNode> ctor = info.getNodeCtor();
         if (ctor == null) {
             return new AggregateFunctionWithinGroupParseNode(name, args, info);
@@ -657,100 +669,100 @@ public class ParseNodeFactory {
 
     public SelectStatement select(TableNode from, HintNode hint, boolean isDistinct, List<AliasedNode> select, ParseNode where,
             List<ParseNode> groupBy, ParseNode having, List<OrderByNode> orderBy, LimitNode limit, int bindCount, boolean isAggregate, 
-            boolean hasSequence, List<SelectStatement> selects) {
+            boolean hasSequence, List<SelectStatement> selects, Map<String, UDFParseNode> udfParseNodes) {
 
         return new SelectStatement(from, hint, isDistinct, select, where, groupBy == null ? Collections.<ParseNode>emptyList() : groupBy, having,
-                orderBy == null ? Collections.<OrderByNode>emptyList() : orderBy, limit, bindCount, isAggregate, hasSequence, selects == null ? Collections.<SelectStatement>emptyList() : selects);
+                orderBy == null ? Collections.<OrderByNode>emptyList() : orderBy, limit, bindCount, isAggregate, hasSequence, selects == null ? Collections.<SelectStatement>emptyList() : selects, udfParseNodes);
     } 
     
-    public UpsertStatement upsert(NamedTableNode table, HintNode hint, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount) {
-        return new UpsertStatement(table, hint, columns, values, select, bindCount);
+    public UpsertStatement upsert(NamedTableNode table, HintNode hint, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount, Map<String, UDFParseNode> udfParseNodes) {
+        return new UpsertStatement(table, hint, columns, values, select, bindCount, udfParseNodes);
     }
 
-    public DeleteStatement delete(NamedTableNode table, HintNode hint, ParseNode node, List<OrderByNode> orderBy, LimitNode limit, int bindCount) {
-        return new DeleteStatement(table, hint, node, orderBy, limit, bindCount);
+    public DeleteStatement delete(NamedTableNode table, HintNode hint, ParseNode node, List<OrderByNode> orderBy, LimitNode limit, int bindCount, Map<String, UDFParseNode> udfParseNodes) {
+        return new DeleteStatement(table, hint, node, orderBy, limit, bindCount, udfParseNodes);
     }
 
     public SelectStatement select(SelectStatement statement, ParseNode where) {
         return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(), where, statement.getGroupBy(), statement.getHaving(),
-                statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects());
+                statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
     }
 
     public SelectStatement select(SelectStatement statement, ParseNode where, ParseNode having) {
         return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(), where, statement.getGroupBy(), having,
-                statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects());
+                statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
     }
     
     public SelectStatement select(SelectStatement statement, List<AliasedNode> select, ParseNode where, List<ParseNode> groupBy, ParseNode having, List<OrderByNode> orderBy) {
         return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), 
-                select, where, groupBy, having, orderBy, statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects());
+                select, where, groupBy, having, orderBy, statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
     }
     
     public SelectStatement select(SelectStatement statement, TableNode table) {
         return select(table, statement.getHint(), statement.isDistinct(), statement.getSelect(), statement.getWhere(), statement.getGroupBy(),
                 statement.getHaving(), statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(),
-                statement.hasSequence(), statement.getSelects());
+                statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
     }
 
     public SelectStatement select(SelectStatement statement, TableNode table, ParseNode where) {
         return select(table, statement.getHint(), statement.isDistinct(), statement.getSelect(), where, statement.getGroupBy(),
                 statement.getHaving(), statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(),
-                statement.hasSequence(), statement.getSelects());
+                statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
     }
 
     public SelectStatement select(SelectStatement statement, boolean isDistinct, List<AliasedNode> select) {
         return select(statement.getFrom(), statement.getHint(), isDistinct, select, statement.getWhere(), statement.getGroupBy(),
                 statement.getHaving(), statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(),
-                statement.hasSequence(), statement.getSelects());
+                statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
     }
 
     public SelectStatement select(SelectStatement statement, boolean isDistinct, List<AliasedNode> select, ParseNode where) {
         return select(statement.getFrom(), statement.getHint(), isDistinct, select, where, statement.getGroupBy(),
                 statement.getHaving(), statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(),
-                statement.hasSequence(), statement.getSelects());
+                statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
     }
 
     public SelectStatement select(SelectStatement statement, boolean isDistinct, List<AliasedNode> select, ParseNode where, List<ParseNode> groupBy, boolean isAggregate) {
         return select(statement.getFrom(), statement.getHint(), isDistinct, select, where, groupBy,
                 statement.getHaving(), statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), isAggregate,
-                statement.hasSequence(), statement.getSelects());
+                statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
     }
 
     public SelectStatement select(SelectStatement statement, List<OrderByNode> orderBy) {
         return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(),
                 statement.getWhere(), statement.getGroupBy(), statement.getHaving(), orderBy, statement.getLimit(),
-                statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects());
+                statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
     }
 
     public SelectStatement select(SelectStatement statement, HintNode hint) {
         return hint == null || hint.isEmpty() ? statement : select(statement.getFrom(), hint, statement.isDistinct(), statement.getSelect(),
                 statement.getWhere(), statement.getGroupBy(), statement.getHaving(), statement.getOrderBy(), statement.getLimit(),
-                statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects());
+                statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
     }
 
     public SelectStatement select(SelectStatement statement, HintNode hint, ParseNode where) {
         return select(statement.getFrom(), hint, statement.isDistinct(), statement.getSelect(), where, statement.getGroupBy(),
                 statement.getHaving(), statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(),
-                statement.hasSequence(), statement.getSelects());
+                statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
     }
 
     public SelectStatement select(SelectStatement statement, List<OrderByNode> orderBy, LimitNode limit, int bindCount, boolean isAggregate) {
         return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(),
             statement.getWhere(), statement.getGroupBy(), statement.getHaving(), orderBy, limit,
-            bindCount, isAggregate || statement.isAggregate(), statement.hasSequence(), statement.getSelects());
+            bindCount, isAggregate || statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
 
     }
 
     public SelectStatement select(SelectStatement statement, LimitNode limit) {
         return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(),
             statement.getWhere(), statement.getGroupBy(), statement.getHaving(), statement.getOrderBy(), limit,
-            statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects());
+            statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
     }
 
     public SelectStatement select(SelectStatement statement, List<OrderByNode> orderBy, LimitNode limit) {
         return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(),
             statement.getWhere(), statement.getGroupBy(), statement.getHaving(), orderBy, limit,
-            statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects());
+            statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
     }
 
     public SelectStatement select(List<SelectStatement> statements, List<OrderByNode> orderBy, LimitNode limit, int bindCount, boolean isAggregate) {
@@ -762,8 +774,10 @@ public class ParseNodeFactory {
         // it will be done later at compile stage. Empty or different aliases 
         // are ignored, since they cannot be referred by outer queries.
         List<String> aliases = Lists.<String> newArrayList();
+        Map<String, UDFParseNode> udfParseNodes = new HashMap<String, UDFParseNode>(1);
         for (int i = 0; i < statements.size() && aliases.isEmpty(); i++) {
             SelectStatement subselect = statements.get(i);
+            udfParseNodes.putAll(subselect.getUdfParseNodes());
             if (!subselect.hasWildcard()) {
                 for (AliasedNode aliasedNode : subselect.getSelect()) {
                     String alias = aliasedNode.getAlias();
@@ -786,7 +800,7 @@ public class ParseNodeFactory {
         }
         
         return select(null, HintNode.EMPTY_HINT_NODE, false, aliasedNodes, 
-                null, null, null, orderBy, limit, bindCount, false, false, statements);
+                null, null, null, orderBy, limit, bindCount, false, false, statements, udfParseNodes);
     }
 
     public SubqueryParseNode subquery(SelectStatement select, boolean expectSingleRow) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
index 4ce893d..e48967b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
@@ -151,7 +151,7 @@ public class ParseNodeRewriter extends TraverseAllParseNodeVisitor<ParseNode> {
         return NODE_FACTORY.select(normFrom, statement.getHint(), statement.isDistinct(),
                 normSelectNodes, normWhere, normGroupByNodes, normHaving, normOrderByNodes,
                 statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(),
-                statement.getSelects());
+                statement.getSelects(), statement.getUdfParseNodes());
     }
 
     private Map<String, ParseNode> getAliasMap() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
index 44b24af..362e98d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
@@ -20,7 +20,9 @@ package org.apache.phoenix.parse;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.expression.function.CountAggregateFunction;
@@ -42,7 +44,7 @@ public class SelectStatement implements FilterableStatement {
                     Collections.<AliasedNode>singletonList(new AliasedNode(null, LiteralParseNode.ONE)),
                     null, Collections.<ParseNode>emptyList(),
                     null, Collections.<OrderByNode>emptyList(),
-                    null, 0, false, false, Collections.<SelectStatement>emptyList());
+                    null, 0, false, false, Collections.<SelectStatement>emptyList(), new HashMap<String, UDFParseNode>(1));
     public static final SelectStatement COUNT_ONE =
             new SelectStatement(
                     null, null, false,
@@ -54,14 +56,14 @@ public class SelectStatement implements FilterableStatement {
                                 new BuiltInFunctionInfo(CountAggregateFunction.class, CountAggregateFunction.class.getAnnotation(BuiltInFunction.class))))),
                     null, Collections.<ParseNode>emptyList(), 
                     null, Collections.<OrderByNode>emptyList(), 
-                    null, 0, true, false, Collections.<SelectStatement>emptyList());
+                    null, 0, true, false, Collections.<SelectStatement>emptyList(), new HashMap<String, UDFParseNode>(1));
     public static SelectStatement create(SelectStatement select, HintNode hint) {
         if (select.getHint() == hint || hint.isEmpty()) {
             return select;
         }
         return new SelectStatement(select.getFrom(), hint, select.isDistinct(), 
                 select.getSelect(), select.getWhere(), select.getGroupBy(), select.getHaving(), 
-                select.getOrderBy(), select.getLimit(), select.getBindCount(), select.isAggregate(), select.hasSequence(), select.getSelects());
+                select.getOrderBy(), select.getLimit(), select.getBindCount(), select.isAggregate(), select.hasSequence(), select.getSelects(), select.getUdfParseNodes());
     }
     
     public SelectStatement combine(ParseNode where) {
@@ -73,13 +75,13 @@ public class SelectStatement implements FilterableStatement {
         }
         return new SelectStatement(this.getFrom(), this.getHint(), this.isDistinct(), 
                 this.getSelect(), where, this.getGroupBy(), this.getHaving(), 
-                this.getOrderBy(), this.getLimit(), this.getBindCount(), this.isAggregate(), this.hasSequence(), this.selects);
+                this.getOrderBy(), this.getLimit(), this.getBindCount(), this.isAggregate(), this.hasSequence(), this.selects, this.udfParseNodes);
     }
     
     public static SelectStatement create(SelectStatement select, List<AliasedNode> selects) {
         return new SelectStatement(select.getFrom(), select.getHint(), select.isDistinct(), 
                 selects, select.getWhere(), select.getGroupBy(), select.getHaving(), 
-                select.getOrderBy(), select.getLimit(), select.getBindCount(), select.isAggregate(), select.hasSequence(), select.getSelects());
+                select.getOrderBy(), select.getLimit(), select.getBindCount(), select.isAggregate(), select.hasSequence(), select.getSelects(), select.getUdfParseNodes());
     }
     
     // Copy constructor for sub select statements in a union
@@ -87,7 +89,7 @@ public class SelectStatement implements FilterableStatement {
             List<OrderByNode> orderBy, LimitNode limit, boolean isAggregate) {
         return new SelectStatement(select.getFrom(), select.getHint(), select.isDistinct(), 
                 select.getSelect(), select.getWhere(), select.getGroupBy(), select.getHaving(), 
-                orderBy, limit, select.getBindCount(), isAggregate, select.hasSequence(), select.getSelects());
+                orderBy, limit, select.getBindCount(), isAggregate, select.hasSequence(), select.getSelects(), select.getUdfParseNodes());
     }
 
     private final TableNode fromTable;
@@ -104,6 +106,7 @@ public class SelectStatement implements FilterableStatement {
     private final boolean hasSequence;
     private final boolean hasWildcard;
     private final List<SelectStatement> selects = new ArrayList<SelectStatement>();
+    private final Map<String, UDFParseNode> udfParseNodes;
     
     @Override
     public final String toString() {
@@ -216,7 +219,7 @@ public class SelectStatement implements FilterableStatement {
     
     protected SelectStatement(TableNode from, HintNode hint, boolean isDistinct, List<AliasedNode> select,
             ParseNode where, List<ParseNode> groupBy, ParseNode having, List<OrderByNode> orderBy, LimitNode limit,
-            int bindCount, boolean isAggregate, boolean hasSequence, List<SelectStatement> selects) {
+            int bindCount, boolean isAggregate, boolean hasSequence, List<SelectStatement> selects, Map<String, UDFParseNode> udfParseNodes) {
         this.fromTable = from;
         this.hint = hint == null ? HintNode.EMPTY_HINT_NODE : hint;
         this.isDistinct = isDistinct;
@@ -241,6 +244,7 @@ public class SelectStatement implements FilterableStatement {
         if (!selects.isEmpty()) {
             this.selects.addAll(selects);
         }
+        this.udfParseNodes = udfParseNodes;
     }
     
     @Override
@@ -333,4 +337,8 @@ public class SelectStatement implements FilterableStatement {
     public boolean hasWildcard() {
         return hasWildcard;
     }
+
+    public Map<String, UDFParseNode> getUdfParseNodes() {
+        return udfParseNodes;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/parse/UDFParseNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/UDFParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/UDFParseNode.java
new file mode 100644
index 0000000..c0b972f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/UDFParseNode.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.util.List;
+
+public class UDFParseNode extends FunctionParseNode {
+
+    public UDFParseNode(String name, List<ParseNode> children, BuiltInFunctionInfo info) {
+        super(name, children, info);
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
index fc299d1..48698bd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
@@ -19,15 +19,18 @@ package org.apache.phoenix.parse;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
-public class UpsertStatement extends DMLStatement { 
+public class UpsertStatement extends DMLStatement {
     private final List<ColumnName> columns;
     private final List<ParseNode> values;
     private final SelectStatement select;
     private final HintNode hint;
 
-    public UpsertStatement(NamedTableNode table, HintNode hint, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount) {
-        super(table, bindCount);
+    public UpsertStatement(NamedTableNode table, HintNode hint, List<ColumnName> columns,
+            List<ParseNode> values, SelectStatement select, int bindCount,
+            Map<String, UDFParseNode> udfParseNodes) {
+        super(table, bindCount, udfParseNodes);
         this.columns = columns == null ? Collections.<ColumnName>emptyList() : columns;
         this.values = values;
         this.select = select;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/protobuf/ProtobufUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/protobuf/ProtobufUtil.java
index c12c64d..f4a60bc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/protobuf/ProtobufUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/protobuf/ProtobufUtil.java
@@ -71,6 +71,16 @@ public class ProtobufUtil {
         return getMutations(request.getTableMetadataMutationsList());
     }
 
+    public static List<Mutation> getMutations(MetaDataProtos.DropFunctionRequest request)
+            throws IOException {
+        return getMutations(request.getTableMetadataMutationsList());
+    }
+
+    public static List<Mutation> getMutations(MetaDataProtos.CreateFunctionRequest request)
+            throws IOException {
+        return getMutations(request.getTableMetadataMutationsList());
+    }
+
     public static List<Mutation> getMutations(MetaDataProtos.DropTableRequest request)
             throws IOException {
         return getMutations(request.getTableMetadataMutationsList());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index 09705c6..dc51b10 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -35,6 +35,7 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableType;
@@ -70,8 +71,10 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
     public PhoenixConnection connect(String url, Properties info) throws SQLException;
 
     public MetaDataMutationResult getTable(PName tenantId, byte[] schemaName, byte[] tableName, long tableTimestamp, long clientTimetamp) throws SQLException;
+    public MetaDataMutationResult getFunctions(PName tenantId, List<Pair<byte[], Long>> functionNameAndTimeStampPairs, long clientTimestamp) throws SQLException;
     public MetaDataMutationResult createTable(List<Mutation> tableMetaData, byte[] tableName, PTableType tableType, Map<String,Object> tableProps, List<Pair<byte[],Map<String,Object>>> families, byte[][] splits) throws SQLException;
     public MetaDataMutationResult dropTable(List<Mutation> tableMetadata, PTableType tableType, boolean cascade) throws SQLException;
+    public MetaDataMutationResult dropFunction(List<Mutation> tableMetadata, boolean ifExists) throws SQLException;
     public MetaDataMutationResult addColumn(List<Mutation> tableMetaData, PTable table, Map<String, List<Pair<String,Object>>> properties, Set<String> colFamiliesForPColumnsToBeAdded) throws SQLException;
     public MetaDataMutationResult dropColumn(List<Mutation> tableMetadata, PTableType tableType) throws SQLException;
     public MetaDataMutationResult updateIndexState(List<Mutation> tableMetadata, String parentTableName) throws SQLException;
@@ -93,6 +96,7 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
     long currentSequenceValue(SequenceKey sequenceKey, long timestamp) throws SQLException;
     void returnSequences(List<SequenceKey> sequenceKeys, long timestamp, SQLException[] exceptions) throws SQLException;
 
+    MetaDataMutationResult createFunction(List<Mutation> functionData, PFunction function, boolean temporary) throws SQLException;
     void addConnection(PhoenixConnection connection) throws SQLException;
     void removeConnection(PhoenixConnection connection) throws SQLException;
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 49c946a..30b43d5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -87,9 +87,12 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearTableFromCacheRequest;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearTableFromCacheResponse;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateFunctionRequest;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateTableRequest;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropColumnRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropFunctionRequest;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropTableRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetFunctionsRequest;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetTableRequest;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRequest;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionResponse;
@@ -110,9 +113,11 @@ import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.EmptySequenceCacheException;
+import org.apache.phoenix.schema.FunctionNotFoundException;
 import org.apache.phoenix.schema.MetaDataSplitPolicy;
 import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
 import org.apache.phoenix.schema.PColumn;
@@ -220,6 +225,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 QueryServicesOptions.DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE);
         return new PMetaDataImpl(INITIAL_META_DATA_TABLE_CAPACITY, maxSizeBytes);
     }
+    
     /**
      * Construct a ConnectionQueryServicesImpl that represents a connection to an HBase
      * cluster.
@@ -562,6 +568,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         if (metadata == null) {
             throwConnectionClosedException();
         }
+
         return new PhoenixConnection(this, url, info, metadata);
     }
 
@@ -691,12 +698,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
 
             // Setup split policy on Phoenix metadata table to ensure that the key values of a Phoenix table
             // stay on the same region.
-            if (SchemaUtil.isMetaTable(tableName)) {
+            if (SchemaUtil.isMetaTable(tableName) || SchemaUtil.isFunctionTable(tableName)) {
                 if (!descriptor.hasCoprocessor(MetaDataEndpointImpl.class.getName())) {
                     descriptor.addCoprocessor(MetaDataEndpointImpl.class.getName(), null, priority, null);
                 }
-                if (!descriptor.hasCoprocessor(MetaDataRegionObserver.class.getName())) {
-                    descriptor.addCoprocessor(MetaDataRegionObserver.class.getName(), null, priority + 1, null);
+                if(SchemaUtil.isMetaTable(tableName) ) {
+                    if (!descriptor.hasCoprocessor(MetaDataRegionObserver.class.getName())) {
+                        descriptor.addCoprocessor(MetaDataRegionObserver.class.getName(), null, priority + 1, null);
+                    }
                 }
             } else if (SchemaUtil.isSequenceTable(tableName)) {
                 if (!descriptor.hasCoprocessor(SequenceRegionObserver.class.getName())) {
@@ -992,23 +1001,33 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
      */
     private MetaDataMutationResult metaDataCoprocessorExec(byte[] tableKey,
             Batch.Call<MetaDataService, MetaDataResponse> callable) throws SQLException {
+        return metaDataCoprocessorExec(tableKey, callable, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+    }
+        /**
+         * Invoke meta data coprocessor with one retry if the key was found to not be in the regions
+         * (due to a table split)
+         */
+        private MetaDataMutationResult metaDataCoprocessorExec(byte[] tableKey,
+                Batch.Call<MetaDataService, MetaDataResponse> callable, byte[] tableName) throws SQLException {
+
         try {
             boolean retried = false;
             while (true) {
                 if (retried) {
                     connection.relocateRegion(
-                        TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES),
+                        TableName.valueOf(tableName),
                         tableKey);
                 }
 
-                HTableInterface ht = this.getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+                HTableInterface ht = this.getTable(tableName);
                 try {
                     final Map<byte[], MetaDataResponse> results =
                             ht.coprocessorService(MetaDataService.class, tableKey, tableKey, callable);
 
                     assert(results.size() == 1);
                     MetaDataResponse result = results.values().iterator().next();
-                    if (result.getReturnCode() == MetaDataProtos.MutationCode.TABLE_NOT_IN_REGION) {
+                    if (result.getReturnCode() == MetaDataProtos.MutationCode.TABLE_NOT_IN_REGION
+                            || result.getReturnCode() == MetaDataProtos.MutationCode.FUNCTION_NOT_IN_REGION) {
                         if (retried) return MetaDataMutationResult.constructFromProto(result);
                         retried = true;
                         continue;
@@ -1331,6 +1350,37 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
           return result;
     }
 
+    @Override
+    public MetaDataMutationResult dropFunction(final List<Mutation> functionData, final boolean ifExists) throws SQLException {
+        byte[][] rowKeyMetadata = new byte[2][];
+        byte[] key = functionData.get(0).getRow();
+        SchemaUtil.getVarChars(key, rowKeyMetadata);
+        byte[] tenantIdBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
+        byte[] functionBytes = rowKeyMetadata[PhoenixDatabaseMetaData.FUNTION_NAME_INDEX];
+        byte[] functionKey = SchemaUtil.getFunctionKey(tenantIdBytes, functionBytes);
+        
+        final MetaDataMutationResult result =  metaDataCoprocessorExec(functionKey,
+                new Batch.Call<MetaDataService, MetaDataResponse>() {
+                    @Override
+                    public MetaDataResponse call(MetaDataService instance) throws IOException {
+                        ServerRpcController controller = new ServerRpcController();
+                        BlockingRpcCallback<MetaDataResponse> rpcCallback =
+                                new BlockingRpcCallback<MetaDataResponse>();
+                        DropFunctionRequest.Builder builder = DropFunctionRequest.newBuilder();
+                        for (Mutation m : functionData) {
+                            MutationProto mp = ProtobufUtil.toProto(m);
+                            builder.addTableMetadataMutations(mp.toByteString());
+                        }
+                        builder.setIfExists(ifExists);
+                        instance.dropFunction(controller, builder.build(), rpcCallback);
+                        if(controller.getFailedOn() != null) {
+                            throw controller.getFailedOn();
+                        }
+                        return rpcCallback.get();
+                    }
+                }, PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES);
+        return result;
+    }
     private void invalidateTables(final List<byte[]> tableNamesToDelete) {
         if (tableNamesToDelete != null) {
             for ( byte[] tableName : tableNamesToDelete ) {
@@ -1944,6 +1994,13 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                         PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT + " "
                                                 + PLong.INSTANCE.getSqlTypeName());
                             }
+                            try {
+                                metaConnection.createStatement().executeUpdate(
+                                    QueryConstants.CREATE_FUNCTION_METADATA);
+                            } catch (NewerTableAlreadyExistsException e) {
+                            } catch (TableAlreadyExistsException e) {
+                            }
+
                         } catch (Exception e) {
                             if (e instanceof SQLException) {
                                 initializationException = (SQLException)e;
@@ -2540,4 +2597,96 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     public int getSequenceSaltBuckets() {
         return nSequenceSaltBuckets;
     }
+
+    @Override
+    public PMetaData addFunction(PFunction function) throws SQLException {
+        synchronized (latestMetaDataLock) {
+            try {
+                throwConnectionClosedIfNullMetaData();
+                // If existing table isn't older than new table, don't replace
+                // If a client opens a connection at an earlier timestamp, this can happen
+                PFunction existingFunction = latestMetaData.getFunction(new PTableKey(function.getTenantId(), function.getFunctionName()));
+                if (existingFunction.getTimeStamp() >= function.getTimeStamp()) {
+                    return latestMetaData;
+                }
+            } catch (FunctionNotFoundException e) {}
+            latestMetaData = latestMetaData.addFunction(function);
+            latestMetaDataLock.notifyAll();
+            return latestMetaData;
+        }
+    }
+
+    @Override
+    public PMetaData removeFunction(PName tenantId, String function, long functionTimeStamp)
+            throws SQLException {
+        synchronized (latestMetaDataLock) {
+            throwConnectionClosedIfNullMetaData();
+            latestMetaData = latestMetaData.removeFunction(tenantId, function, functionTimeStamp);
+            latestMetaDataLock.notifyAll();
+            return latestMetaData;
+        }
+    }
+
+    @Override
+    public MetaDataMutationResult getFunctions(PName tenantId, final List<Pair<byte[], Long>> functions,
+            final long clientTimestamp) throws SQLException {
+        final byte[] tenantIdBytes = tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes();
+        return metaDataCoprocessorExec(tenantIdBytes,
+            new Batch.Call<MetaDataService, MetaDataResponse>() {
+                @Override
+                public MetaDataResponse call(MetaDataService instance) throws IOException {
+                    ServerRpcController controller = new ServerRpcController();
+                    BlockingRpcCallback<MetaDataResponse> rpcCallback =
+                            new BlockingRpcCallback<MetaDataResponse>();
+                    GetFunctionsRequest.Builder builder = GetFunctionsRequest.newBuilder();
+                    builder.setTenantId(HBaseZeroCopyByteString.wrap(tenantIdBytes));
+                    for(Pair<byte[], Long> function: functions) {
+                        builder.addFunctionNames(HBaseZeroCopyByteString.wrap(function.getFirst()));
+                        builder.addFunctionTimestamps(function.getSecond().longValue());
+                    }
+                    builder.setClientTimestamp(clientTimestamp);
+
+                   instance.getFunctions(controller, builder.build(), rpcCallback);
+                   if(controller.getFailedOn() != null) {
+                       throw controller.getFailedOn();
+                   }
+                   return rpcCallback.get();
+                }
+            }, PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES);
+
+    }
+
+    // TODO the mutations should be added to System functions table.
+    @Override
+    public MetaDataMutationResult createFunction(final List<Mutation> functionData,
+            final PFunction function, final boolean temporary) throws SQLException {
+        byte[][] rowKeyMetadata = new byte[2][];
+        Mutation m = MetaDataUtil.getPutOnlyTableHeaderRow(functionData);
+        byte[] key = m.getRow();
+        SchemaUtil.getVarChars(key, rowKeyMetadata);
+        byte[] tenantIdBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
+        byte[] functionBytes = rowKeyMetadata[PhoenixDatabaseMetaData.FUNTION_NAME_INDEX];
+        byte[] functionKey = SchemaUtil.getFunctionKey(tenantIdBytes, functionBytes);
+        MetaDataMutationResult result = metaDataCoprocessorExec(functionKey,
+            new Batch.Call<MetaDataService, MetaDataResponse>() {
+                    @Override
+                    public MetaDataResponse call(MetaDataService instance) throws IOException {
+                        ServerRpcController controller = new ServerRpcController();
+                        BlockingRpcCallback<MetaDataResponse> rpcCallback =
+                                new BlockingRpcCallback<MetaDataResponse>();
+                        CreateFunctionRequest.Builder builder = CreateFunctionRequest.newBuilder();
+                        for (Mutation m : functionData) {
+                            MutationProto mp = ProtobufUtil.toProto(m);
+                            builder.addTableMetadataMutations(mp.toByteString());
+                        }
+                        builder.setTemporary(temporary);
+                        instance.createFunction(controller, builder.build(), rpcCallback);
+                        if(controller.getFailedOn() != null) {
+                            throw controller.getFailedOn();
+                        }
+                        return rpcCallback.get();
+                    }
+        }, PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES);
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 742c38e..4d582be 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.query;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE_BYTES;
 
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -51,6 +52,8 @@ import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.phoenix.parse.PFunction;
+import org.apache.phoenix.schema.FunctionNotFoundException;
 import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PIndexState;
@@ -104,6 +107,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
         super(queryServices);
         userName = connInfo.getPrincipal();
         metaData = newEmptyMetaData();
+        
         // Use KeyValueBuilder that builds real KeyValues, as our test utils require this
         this.kvBuilder = GenericKeyValueBuilder.INSTANCE;
     }
@@ -280,6 +284,11 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
                     // A TableAlreadyExistsException is not thrown, since the table only exists *after* this
                     // fixed timestamp.
                 }
+                
+                try {
+                   metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_FUNCTION_METADATA);
+                } catch (NewerTableAlreadyExistsException ignore) {
+                }
             } catch (SQLException e) {
                 sqlE = e;
             } finally {
@@ -479,5 +488,46 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
         return getProps().getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB,
                 QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
     }
- 
+
+    @Override
+    public MetaDataMutationResult createFunction(List<Mutation> functionData, PFunction function, boolean temporary)
+            throws SQLException {
+        return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, 0l, null);
+    }
+
+    @Override
+    public PMetaData addFunction(PFunction function) throws SQLException {
+        return metaData = this.metaData.addFunction(function);
+    }
+
+    @Override
+    public PMetaData removeFunction(PName tenantId, String function, long functionTimeStamp)
+            throws SQLException {
+        return metaData = this.metaData.removeFunction(tenantId, function, functionTimeStamp);
+    }
+
+    @Override
+    public MetaDataMutationResult getFunctions(PName tenantId,
+            List<Pair<byte[], Long>> functionNameAndTimeStampPairs, long clientTimestamp)
+            throws SQLException {
+        List<PFunction> functions = new ArrayList<PFunction>(functionNameAndTimeStampPairs.size());
+        for(Pair<byte[], Long> functionInfo: functionNameAndTimeStampPairs) {
+            try {
+                PFunction function2 = metaData.getFunction(new PTableKey(tenantId, Bytes.toString(functionInfo.getFirst())));
+                functions.add(function2);
+            } catch (FunctionNotFoundException e) {
+                return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, 0, null);
+            }
+        }
+        if(functions.isEmpty()) {
+            return null;
+        }
+        return new MetaDataMutationResult(MutationCode.FUNCTION_ALREADY_EXISTS, 0, functions, true);
+    }
+
+    @Override
+    public MetaDataMutationResult dropFunction(List<Mutation> tableMetadata, boolean ifExists)
+            throws SQLException {
+        return new MetaDataMutationResult(MutationCode.FUNCTION_ALREADY_EXISTS, 0, null);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index e2c9544..2a98cd5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -35,6 +35,7 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PMetaData;
 import org.apache.phoenix.schema.PName;
@@ -250,4 +251,34 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
     public int getSequenceSaltBuckets() {
         return getDelegate().getSequenceSaltBuckets();
     }
+
+    @Override
+    public MetaDataMutationResult createFunction(List<Mutation> functionData, PFunction function, boolean temporary)
+            throws SQLException {
+        return getDelegate().createFunction(functionData, function, temporary);
+    }
+
+    @Override
+    public PMetaData addFunction(PFunction function) throws SQLException {
+        return getDelegate().addFunction(function);
+    }
+
+    @Override
+    public PMetaData removeFunction(PName tenantId, String function, long functionTimeStamp)
+            throws SQLException {
+        return getDelegate().removeFunction(tenantId, function, functionTimeStamp);
+    }
+
+    @Override
+    public MetaDataMutationResult getFunctions(PName tenantId,
+            List<Pair<byte[], Long>> functionNameAndTimeStampPairs, long clientTimestamp)
+            throws SQLException {
+        return getDelegate().getFunctions(tenantId, functionNameAndTimeStampPairs, clientTimestamp);
+    }
+
+    @Override
+    public MetaDataMutationResult dropFunction(List<Mutation> tableMetadata, boolean ifExists)
+            throws SQLException {
+        return getDelegate().dropFunction(tableMetadata, ifExists);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java b/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java
index ae37ac6..76e7593 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.query;
 import java.sql.SQLException;
 import java.util.List;
 
+import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PMetaData;
 import org.apache.phoenix.schema.PName;
@@ -38,4 +39,6 @@ public interface MetaDataMutated {
     PMetaData removeTable(PName tenantId, String tableName, String parentTableName, long tableTimeStamp) throws SQLException;
     PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls) throws SQLException;
     PMetaData removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp, long tableSeqNum) throws SQLException;
+    PMetaData addFunction(PFunction function) throws SQLException;
+    PMetaData removeFunction(PName tenantId, String function, long functionTimeStamp) throws SQLException;
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 6470b72..73d1123 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -86,6 +86,17 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_SEQUENCE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARG_POSITION;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ARRAY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_CONSTANT;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT;
@@ -276,4 +287,29 @@ public interface QueryConstants {
             " CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + "," + SEQUENCE_SCHEMA + "," + SEQUENCE_NAME + "))\n" +
             HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
             HColumnDescriptor.KEEP_DELETED_CELLS + "="  + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + "\n";
+
+    public static final String CREATE_FUNCTION_METADATA =
+            "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_FUNCTION_TABLE + "\"(\n" +
+             // Pk columns
+            TENANT_ID + " VARCHAR NULL," +
+            FUNCTION_NAME + " VARCHAR NOT NULL, \n" +
+            NUM_ARGS + " INTEGER, \n" +
+            // Function metadata (will be null for argument row)
+            CLASS_NAME +  " VARCHAR, \n" +
+            JAR_PATH + "  VARCHAR, \n" +
+            RETURN_TYPE + " VARCHAR, \n" +
+            // Argument metadata (will be null for function row)
+            TYPE + " VARCHAR, \n" +
+            ARG_POSITION + " VARBINARY, \n" +
+            IS_ARRAY + " BOOLEAN, \n" +
+            IS_CONSTANT + " BOOLEAN, \n" +
+            DEFAULT_VALUE + " VARCHAR, \n" +
+            MIN_VALUE + " VARCHAR, \n" +
+            MAX_VALUE + " VARCHAR, \n" +
+            " CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + ", " + FUNCTION_NAME + ", " + TYPE + ", " + ARG_POSITION + "))\n" +
+            HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
+            HColumnDescriptor.KEEP_DELETED_CELLS + "="  + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + ",\n"+
+            // Install split policy to prevent a tenant's metadata from being split across regions.
+            HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "'\n";
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 4b793d1..9183a70 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -103,6 +103,7 @@ public interface QueryServices extends SQLCloseable {
     public static final String REGIONSERVER_INFO_PORT_ATTRIB = "hbase.regionserver.info.port";
     public static final String REGIONSERVER_LEASE_PERIOD_ATTRIB = "hbase.regionserver.lease.period";
     public static final String RPC_TIMEOUT_ATTRIB = "hbase.rpc.timeout";
+    public static final String DYNAMIC_JARS_DIR_KEY = "hbase.dynamic.jars.dir";
     public static final String ZOOKEEPER_QUARUM_ATTRIB = "hbase.zookeeper.quorum";
     public static final String ZOOKEEPER_PORT_ATTRIB = "hbase.zookeeper.property.clientPort";
     public static final String ZOOKEEPER_ROOT_NODE_ATTRIB = "zookeeper.znode.parent";
@@ -165,6 +166,7 @@ public interface QueryServices extends SQLCloseable {
     public static final String METADATA_HANDLER_COUNT_ATTRIB = "phoenix.rpc.metadata.handler.count";
     
     public static final String FORCE_ROW_KEY_ORDER_ATTRIB = "phoenix.query.force.rowkeyorder";
+    public static final String ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB = "phoenix.functions.allowUserDefinedFunctions";
     
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index b98c9ee..972bf26 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.query;
 
 import static org.apache.phoenix.query.QueryServices.ALLOW_ONLINE_TABLE_SCHEMA_UPDATE;
+import static org.apache.phoenix.query.QueryServices.ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME;
 import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
@@ -195,6 +196,7 @@ public class QueryServicesOptions {
 
     public static final boolean DEFAULT_USE_BYTE_BASED_REGEX = false;
     public static final boolean DEFAULT_FORCE_ROW_KEY_ORDER = false;
+    public static final boolean DEFAULT_ALLOW_USER_DEFINED_FUNCTIONS = false;
 
     private final Configuration config;
 
@@ -250,7 +252,7 @@ public class QueryServicesOptions {
             .setIfUnset(METRICS_ENABLED, DEFAULT_IS_METRICS_ENABLED)
             .setIfUnset(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY)
             .setIfUnset(USE_BYTE_BASED_REGEX_ATTRIB, DEFAULT_USE_BYTE_BASED_REGEX)
-            .setIfUnset(FORCE_ROW_KEY_ORDER_ATTRIB, DEFAULT_FORCE_ROW_KEY_ORDER)
+            .setIfUnset(FORCE_ROW_KEY_ORDER_ATTRIB, DEFAULT_FORCE_ROW_KEY_ORDER);
             ;
         // HBase sets this to 1, so we reset it to something more appropriate.
         // Hopefully HBase will change this, because we can't know if a user set

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionAlreadyExistsException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionAlreadyExistsException.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionAlreadyExistsException.java
new file mode 100644
index 0000000..91b9d07
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionAlreadyExistsException.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import java.sql.SQLException;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.parse.PFunction;
+
+public class FunctionAlreadyExistsException extends SQLException {
+    private static final long serialVersionUID = 1L;
+    private static SQLExceptionCode code = SQLExceptionCode.FUNCTION_ALREADY_EXIST;
+    private final PFunction function;
+    private final String functionName;
+
+    public FunctionAlreadyExistsException(String functionName) {
+        this(functionName, null, null);
+    }
+
+    public FunctionAlreadyExistsException(String functionName, String msg) {
+        this(functionName, msg, null);
+    }
+
+    public FunctionAlreadyExistsException(String functionName, PFunction function) {
+        this(functionName, null, function);
+    }
+
+    public FunctionAlreadyExistsException(String functionName, String msg, PFunction function) {
+        super(new SQLExceptionInfo.Builder(code).setFunctionName(functionName).setMessage(msg).build().toString(),
+                code.getSQLState(), code.getErrorCode());
+        this.functionName = functionName;
+        this.function = function;
+    }
+
+    public String getFunctionName() {
+        return functionName;
+    }
+    
+    public PFunction getFunction() {
+        return function;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionNotFoundException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionNotFoundException.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionNotFoundException.java
new file mode 100644
index 0000000..73e23be
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionNotFoundException.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+
+public class FunctionNotFoundException extends MetaDataEntityNotFoundException {
+    private static final long serialVersionUID = 1L;
+    private static SQLExceptionCode code = SQLExceptionCode.FUNCTION_UNDEFINED;
+    private final String functionName;
+    private final long timestamp;
+
+    public FunctionNotFoundException(FunctionNotFoundException e, long timestamp) {
+        this(e.functionName, timestamp);
+    }
+
+    public FunctionNotFoundException(String functionName) {
+        this(functionName, HConstants.LATEST_TIMESTAMP);
+    }
+    
+    public FunctionNotFoundException(String functionName, long timestamp) {
+        super(new SQLExceptionInfo.Builder(code).setFunctionName(functionName).build().toString(),
+                code.getSQLState(), code.getErrorCode(), null);
+        this.functionName = functionName;
+        this.timestamp = timestamp;
+    }
+
+    public String getFunctionName() {
+        return functionName;
+    }
+
+    public long getTimeStamp() {
+        return timestamp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 22208f1..fcdb651 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -32,15 +32,20 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ARRAY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_CONSTANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION;
@@ -62,10 +67,20 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARG_POSITION;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE;
 import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.sql.Array;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ParameterMetaData;
@@ -75,6 +90,7 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.sql.Types;
+import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
@@ -99,6 +115,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.DynamicClassLoader;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.ExplainPlan;
@@ -120,6 +137,8 @@ import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.expression.Determinism;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
+import org.apache.phoenix.expression.function.FunctionExpression;
+import org.apache.phoenix.expression.function.ScalarFunction;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -130,21 +149,27 @@ import org.apache.phoenix.parse.AddColumnStatement;
 import org.apache.phoenix.parse.AlterIndexStatement;
 import org.apache.phoenix.parse.ColumnDef;
 import org.apache.phoenix.parse.ColumnName;
+import org.apache.phoenix.parse.CreateFunctionStatement;
 import org.apache.phoenix.parse.CreateIndexStatement;
 import org.apache.phoenix.parse.CreateSequenceStatement;
 import org.apache.phoenix.parse.CreateTableStatement;
 import org.apache.phoenix.parse.DropColumnStatement;
+import org.apache.phoenix.parse.DropFunctionStatement;
 import org.apache.phoenix.parse.DropIndexStatement;
 import org.apache.phoenix.parse.DropSequenceStatement;
 import org.apache.phoenix.parse.DropTableStatement;
+import org.apache.phoenix.parse.FunctionParseNode;
 import org.apache.phoenix.parse.IndexKeyConstraint;
 import org.apache.phoenix.parse.NamedTableNode;
+import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.parse.PrimaryKeyConstraint;
 import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.parse.UpdateStatisticsStatement;
+import org.apache.phoenix.parse.PFunction.FunctionArgument;
 import org.apache.phoenix.query.ConnectionQueryServices.Feature;
+import org.apache.phoenix.query.HBaseFactoryProvider;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
@@ -280,6 +305,28 @@ public class MetaDataClient {
         COLUMN_FAMILY + "," +
         ORDINAL_POSITION +
         ") VALUES (?, ?, ?, ?, ?, ?)";
+    private static final String CREATE_FUNCTION =
+            "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_FUNCTION_TABLE + "\" ( " +
+            TENANT_ID +","+
+            FUNCTION_NAME + "," +
+            NUM_ARGS + "," +
+            CLASS_NAME + "," +
+            JAR_PATH + "," +
+            RETURN_TYPE +
+            ") VALUES (?, ?, ?, ?, ?, ?)";
+    private static final String INSERT_FUNCTION_ARGUMENT =
+            "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_FUNCTION_TABLE + "\" ( " +
+            TENANT_ID +","+
+            FUNCTION_NAME + "," +
+            TYPE + "," +
+            ARG_POSITION +","+
+            IS_ARRAY + "," +
+            IS_CONSTANT  + "," +
+            DEFAULT_VALUE + "," +
+            MIN_VALUE + "," +
+            MAX_VALUE +
+            ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
 
     private final PhoenixConnection connection;
 
@@ -315,6 +362,24 @@ public class MetaDataClient {
         return updateCache(tenantId, schemaName, tableName, false);
     }
 
+    /**
+     * Update the cache with the latest as of the connection scn.
+     * @param functioNames
+     * @return the timestamp from the server, negative if the function was added to the cache and positive otherwise
+     * @throws SQLException
+     */
+    public MetaDataMutationResult updateCache(List<String> functionNames) throws SQLException {
+        return updateCache(functionNames, false);
+    }
+
+    private MetaDataMutationResult updateCache(List<String> functionNames, boolean alwaysHitServer) throws SQLException {
+        return updateCache(connection.getTenantId(), functionNames, alwaysHitServer);
+    }
+
+    public MetaDataMutationResult updateCache(PName tenantId, List<String> functionNames) throws SQLException {
+        return updateCache(tenantId, functionNames, false);
+    }
+
     private long getClientTimeStamp() {
         Long scn = connection.getSCN();
         long clientTimeStamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
@@ -394,6 +459,77 @@ public class MetaDataClient {
 
         return result;
     }
+    
+    private MetaDataMutationResult updateCache(PName tenantId, List<String> functionNames,
+            boolean alwaysHitServer) throws SQLException { // TODO: pass byte[] herez
+        long clientTimeStamp = getClientTimeStamp();
+        List<PFunction> functions = new ArrayList<PFunction>(functionNames.size());
+        List<Long> functionTimeStamps = new ArrayList<Long>(functionNames.size());
+        Iterator<String> iterator = functionNames.iterator();
+        while (iterator.hasNext()) {
+            PFunction function = null;
+            try {
+                String functionName = iterator.next();
+                function =
+                        connection.getMetaDataCache().getFunction(
+                            new PTableKey(tenantId, functionName));
+                if (function != null && !alwaysHitServer
+                        && function.getTimeStamp() == clientTimeStamp - 1) {
+                    functions.add(function);
+                    iterator.remove();
+                    continue;
+                }
+                if (function != null && function.getTimeStamp() != clientTimeStamp - 1) {
+                    functionTimeStamps.add(function.getTimeStamp());
+                } else {
+                    functionTimeStamps.add(HConstants.LATEST_TIMESTAMP);
+                }
+            } catch (FunctionNotFoundException e) {
+                functionTimeStamps.add(HConstants.LATEST_TIMESTAMP);
+            }
+        }
+        // Don't bother with server call: we can't possibly find a newer function
+        if (functionNames.isEmpty()) {
+            return new MetaDataMutationResult(MutationCode.FUNCTION_ALREADY_EXISTS,QueryConstants.UNSET_TIMESTAMP,functions, true);
+        }
+
+        int maxTryCount = tenantId == null ? 1 : 2;
+        int tryCount = 0;
+        MetaDataMutationResult result;
+
+        do {
+            List<Pair<byte[], Long>> functionsToFecth = new ArrayList<Pair<byte[], Long>>(functionNames.size()); 
+            for(int i = 0; i< functionNames.size(); i++) {
+                functionsToFecth.add(new Pair<byte[], Long>(PVarchar.INSTANCE.toBytes(functionNames.get(i)), functionTimeStamps.get(i)));
+            }
+            result = connection.getQueryServices().getFunctions(tenantId, functionsToFecth, clientTimeStamp);
+
+            MutationCode code = result.getMutationCode();
+            // We found an updated table, so update our cache
+            if (result.getFunctions() != null && !result.getFunctions().isEmpty()) {
+                result.getFunctions().addAll(functions);
+                addFunctionToCache(result);
+                return result;
+            } else {
+                if (code == MutationCode.FUNCTION_ALREADY_EXISTS) {
+                    result.getFunctions().addAll(functions);
+                    addFunctionToCache(result);
+                    return result;
+                }
+                if (code == MutationCode.FUNCTION_NOT_FOUND && tryCount + 1 == maxTryCount) {
+                    for (Pair<byte[], Long> f : functionsToFecth) {
+                        connection.removeFunction(tenantId, Bytes.toString(f.getFirst()),
+                            f.getSecond());
+                    }
+                    // TODO removeFunctions all together from cache when 
+                    throw new FunctionNotFoundException(functionNames.toString() + " not found");
+                }
+            }
+            tenantId = null; // Try again with global tenantId
+        } while (++tryCount < maxTryCount);
+
+        return result;
+    }
 
     /**
      * Fault in the physical table to the cache and add any indexes it has to the indexes
@@ -540,6 +676,20 @@ public class MetaDataClient {
         colUpsert.execute();
     }
 
+    private void addFunctionArgMutation(String functionName, FunctionArgument arg, PreparedStatement argUpsert, int position) throws SQLException {
+        argUpsert.setString(1, connection.getTenantId() == null ? null : connection.getTenantId().getString());
+        argUpsert.setString(2, functionName);
+        argUpsert.setString(3, arg.getArgumentType());
+        byte[] bytes = Bytes.toBytes((short)position);
+        argUpsert.setBytes(4, bytes);
+        argUpsert.setBoolean(5, arg.isArrayType());
+        argUpsert.setBoolean(6, arg.isConstant());
+        argUpsert.setString(7, arg.getDefaultValue() == null? null: (String)arg.getDefaultValue().getValue());
+        argUpsert.setString(8, arg.getMinValue() == null? null: (String)arg.getMinValue().getValue());
+        argUpsert.setString(9, arg.getMaxValue() == null? null: (String)arg.getMaxValue().getValue());
+        argUpsert.execute();
+    }
+
     private PColumn newColumn(int position, ColumnDef def, PrimaryKeyConstraint pkConstraint, String defaultColumnFamily, boolean addingToPK) throws SQLException {
         try {
             ColumnName columnDefName = def.getColumnDefName();
@@ -954,7 +1104,7 @@ public class MetaDataClient {
         }
         while (true) {
             try {
-                ColumnResolver resolver = FromCompiler.getResolver(statement, connection);
+                ColumnResolver resolver = FromCompiler.getResolver(statement, connection, statement.getUdfParseNodes());
                 tableRef = resolver.getTables().get(0);
                 PTable dataTable = tableRef.getTable();
                 boolean isTenantConnection = connection.getTenantId() != null;
@@ -1198,6 +1348,57 @@ public class MetaDataClient {
         return new MutationState(1, connection);
     }
 
+    public MutationState createFunction(CreateFunctionStatement stmt) throws SQLException {
+        boolean wasAutoCommit = connection.getAutoCommit();
+        connection.rollback();
+        try {
+            PFunction function = new PFunction(stmt.getFunctionInfo(), stmt.isTemporary());
+            connection.setAutoCommit(false);
+            String tenantIdStr = connection.getTenantId() == null ? null : connection.getTenantId().getString();
+            List<Mutation> functionData = Lists.newArrayListWithExpectedSize(function.getFunctionArguments().size() + 1);
+
+            List<FunctionArgument> args = function.getFunctionArguments();
+            PreparedStatement argUpsert = connection.prepareStatement(INSERT_FUNCTION_ARGUMENT);
+
+            for (int i = 0; i < args.size(); i++) {
+                FunctionArgument arg = args.get(i);
+                addFunctionArgMutation(function.getFunctionName(), arg, argUpsert, i);
+            }
+            functionData.addAll(connection.getMutationState().toMutations().next().getSecond());
+            connection.rollback();
+
+            PreparedStatement functionUpsert = connection.prepareStatement(CREATE_FUNCTION);
+            functionUpsert.setString(1, tenantIdStr);
+            functionUpsert.setString(2, function.getFunctionName());
+            functionUpsert.setInt(3, function.getFunctionArguments().size());
+            functionUpsert.setString(4, function.getClassName());
+            functionUpsert.setString(5, function.getJarPath());
+            functionUpsert.setString(6, function.getReturnType());
+            functionUpsert.execute();
+            functionData.addAll(connection.getMutationState().toMutations().next().getSecond());
+            connection.rollback();
+            MetaDataMutationResult result = connection.getQueryServices().createFunction(functionData, function, stmt.isTemporary());
+            MutationCode code = result.getMutationCode();
+            switch(code) {
+            case FUNCTION_ALREADY_EXISTS:
+                throw new FunctionAlreadyExistsException(function.getFunctionName(), result
+                        .getFunctions().get(0));
+            case NEWER_FUNCTION_FOUND:
+                // Add function to ConnectionQueryServices so it's cached, but don't add
+                // it to this connection as we can't see it.
+                throw new NewerFunctionAlreadyExistsException(function.getFunctionName(), result.getFunctions().get(0));
+            default:
+                List<PFunction> functions = new ArrayList<PFunction>(1);
+                functions.add(function);
+                result = new MetaDataMutationResult(code, result.getMutationTime(), functions, true);
+                addFunctionToCache(result);
+            }
+        } finally {
+            connection.setAutoCommit(wasAutoCommit);
+        }
+        return new MutationState(1, connection);
+    }
+    
     private static ColumnDef findColumnDefOrNull(List<ColumnDef> colDefs, ColumnName colName) {
         for (ColumnDef colDef : colDefs) {
             if (colDef.getColumnDefName().getColumnName().equals(colName.getColumnName())) {
@@ -1807,6 +2008,10 @@ public class MetaDataClient {
         return dropTable(schemaName, tableName, null, statement.getTableType(), statement.ifExists(), statement.cascade());
     }
 
+    public MutationState dropFunction(DropFunctionStatement statement) throws SQLException {
+        return dropFunction(statement.getFunctionName(), statement.ifExists());
+    }
+
     public MutationState dropIndex(DropIndexStatement statement) throws SQLException {
         String schemaName = statement.getTableName().getSchemaName();
         String tableName = statement.getIndexName().getName();
@@ -1814,6 +2019,46 @@ public class MetaDataClient {
         return dropTable(schemaName, tableName, parentTableName, PTableType.INDEX, statement.ifExists(), false);
     }
 
+    private MutationState dropFunction(String functionName, 
+            boolean ifExists) throws SQLException {
+        connection.rollback();
+        boolean wasAutoCommit = connection.getAutoCommit();
+        try {
+            PName tenantId = connection.getTenantId();
+            byte[] key =
+                    SchemaUtil.getFunctionKey(tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY
+                            : tenantId.getBytes(), Bytes.toBytes(functionName));
+            Long scn = connection.getSCN();
+            long clientTimeStamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
+            try {
+                PFunction function = connection.getMetaDataCache().getFunction(new PTableKey(tenantId, functionName));
+                if (function.isTemporaryFunction()) {
+                    connection.removeFunction(tenantId, functionName, clientTimeStamp);
+                    return new MutationState(0, connection);
+                }
+            } catch(FunctionNotFoundException e) {
+                
+            }
+            List<Mutation> functionMetaData = Lists.newArrayListWithExpectedSize(2);
+            Delete functionDelete = new Delete(key, clientTimeStamp);
+            functionMetaData.add(functionDelete);
+            MetaDataMutationResult result = connection.getQueryServices().dropFunction(functionMetaData, ifExists);
+            MutationCode code = result.getMutationCode();
+            switch (code) {
+            case FUNCTION_NOT_FOUND:
+                if (!ifExists) {
+                    throw new FunctionNotFoundException(functionName);
+                }
+                break;
+            default:
+                connection.removeFunction(tenantId, functionName, result.getMutationTime());
+                break;
+            }
+            return new MutationState(0, connection);
+        } finally {
+            connection.setAutoCommit(wasAutoCommit);
+        }
+    }
     private MutationState dropTable(String schemaName, String tableName, String parentTableName, PTableType tableType,
             boolean ifExists, boolean cascade) throws SQLException {
         connection.rollback();
@@ -2658,7 +2903,14 @@ public class MetaDataClient {
         connection.addTable(table);
         return table;
     }
-    
+
+    private List<PFunction> addFunctionToCache(MetaDataMutationResult result) throws SQLException {
+        for(PFunction function: result.getFunctions()) {
+            connection.addFunction(function);
+        }
+        return result.getFunctions();
+    }
+
     private void throwIfAlteringViewPK(ColumnDef col, PTable table) throws SQLException {
         if (col != null && col.isPK() && table.getType() == PTableType.VIEW) {
             throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_MODIFY_VIEW_PK)