Posted to commits@phoenix.apache.org by ra...@apache.org on 2015/04/27 10:33:51 UTC

[7/7] phoenix git commit: PHOENIX-538 Support UDFs (Rajeshbabu Chintaguntla)

PHOENIX-538 Support UDFs (Rajeshbabu Chintaguntla)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/66bd3e35
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/66bd3e35
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/66bd3e35

Branch: refs/heads/master
Commit: 66bd3e35c0d2105dcc393116f8bb5851ce1f5ec4
Parents: cd29be2
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
Authored: Mon Apr 27 14:03:44 2015 +0530
Committer: Rajeshbabu Chintaguntla <ra...@apache.org>
Committed: Mon Apr 27 14:03:44 2015 +0530
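
In short, this change adds CREATE [TEMPORARY] FUNCTION and DROP FUNCTION statements whose metadata lives in a new SYSTEM."FUNCTION" table, with the implementing classes loaded from jars kept under hbase.dynamic.jars.dir. A minimal client-side sketch of the new surface, assuming a UDF class like the MyReverse test class generated in UserDefinedFunctionsIT below (the connection URL, table, and jar location are illustrative):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.Properties;

    import org.apache.phoenix.query.QueryServices;

    public class UdfQuickstart {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // UDFs are disabled by default; the IT below enables them the same way.
            props.setProperty(QueryServices.ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB, "true");
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props);
                 Statement stmt = conn.createStatement()) {
                // Register a UDF whose implementation is read from a jar on the
                // filesystem configured via hbase.dynamic.jars.dir.
                stmt.execute("CREATE FUNCTION myreverse(VARCHAR) RETURNS VARCHAR"
                        + " AS 'org.apache.phoenix.end2end.MyReverse'"
                        + " USING JAR 'hdfs:/hbase/tmpjars/myjar1.jar'");
                // Assumes a table T with a VARCHAR column FIRSTNAME, as in the IT below.
                try (ResultSet rs = stmt.executeQuery("SELECT myreverse(firstname) FROM t")) {
                    while (rs.next()) {
                        System.out.println(rs.getString(1));
                    }
                }
                stmt.execute("DROP FUNCTION IF EXISTS myreverse");
            }
        }
    }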

----------------------------------------------------------------------
 bin/phoenix_utils.py                            |    9 +
 bin/sqlline.py                                  |    2 +-
 .../end2end/QueryDatabaseMetaDataIT.java        |    5 +
 .../end2end/TenantSpecificTablesDDLIT.java      |    5 +
 .../phoenix/end2end/UserDefinedFunctionsIT.java |  605 +++
 phoenix-core/src/main/antlr3/PhoenixSQL.g       |   77 +-
 .../org/apache/phoenix/cache/GlobalCache.java   |   30 +-
 .../apache/phoenix/compile/ColumnResolver.java  |   17 +
 .../phoenix/compile/CreateFunctionCompiler.java |   80 +
 .../phoenix/compile/CreateIndexCompiler.java    |    2 +-
 .../apache/phoenix/compile/DeleteCompiler.java  |    2 +-
 .../phoenix/compile/ExpressionCompiler.java     |   17 +-
 .../apache/phoenix/compile/FromCompiler.java    |  199 +-
 .../apache/phoenix/compile/JoinCompiler.java    |    9 +-
 .../apache/phoenix/compile/PostDDLCompiler.java |   14 +
 .../phoenix/compile/ProjectionCompiler.java     |    2 +-
 .../apache/phoenix/compile/QueryCompiler.java   |   18 +-
 .../apache/phoenix/compile/RowProjector.java    |   32 +-
 .../phoenix/compile/StatementNormalizer.java    |    5 +-
 .../phoenix/compile/SubqueryRewriter.java       |    4 +-
 .../phoenix/compile/SubselectRewriter.java      |    2 +-
 .../coprocessor/MetaDataEndpointImpl.java       |  651 ++-
 .../phoenix/coprocessor/MetaDataProtocol.java   |   30 +-
 .../coprocessor/generated/MetaDataProtos.java   | 4274 +++++++++++++++---
 .../coprocessor/generated/PFunctionProtos.java  | 2942 ++++++++++++
 .../phoenix/exception/SQLExceptionCode.java     |   20 +-
 .../phoenix/exception/SQLExceptionInfo.java     |   16 +
 .../phoenix/expression/ExpressionType.java      |    4 +-
 .../expression/function/ScalarFunction.java     |    2 +-
 .../expression/function/UDFExpression.java      |  220 +
 .../visitor/CloneExpressionVisitor.java         |    6 +
 .../apache/phoenix/index/IndexMaintainer.java   |   50 +-
 .../apache/phoenix/jdbc/PhoenixConnection.java  |   35 +-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |   30 +
 .../apache/phoenix/jdbc/PhoenixStatement.java   |  132 +-
 .../apache/phoenix/optimize/QueryOptimizer.java |    4 +-
 .../phoenix/parse/CreateFunctionStatement.java  |   42 +
 .../phoenix/parse/CreateIndexStatement.java     |    8 +-
 .../org/apache/phoenix/parse/DMLStatement.java  |   11 +-
 .../apache/phoenix/parse/DeleteStatement.java   |    5 +-
 .../phoenix/parse/DropFunctionStatement.java    |   41 +
 .../apache/phoenix/parse/FunctionParseNode.java |   75 +-
 .../parse/IndexExpressionParseNodeRewriter.java |    4 +-
 .../org/apache/phoenix/parse/NamedNode.java     |    2 +-
 .../org/apache/phoenix/parse/PFunction.java     |  255 ++
 .../apache/phoenix/parse/ParseNodeFactory.java  |   70 +-
 .../apache/phoenix/parse/ParseNodeRewriter.java |    2 +-
 .../apache/phoenix/parse/SelectStatement.java   |   22 +-
 .../org/apache/phoenix/parse/UDFParseNode.java  |   27 +
 .../apache/phoenix/parse/UpsertStatement.java   |    9 +-
 .../apache/phoenix/protobuf/ProtobufUtil.java   |   10 +
 .../phoenix/query/ConnectionQueryServices.java  |    4 +
 .../query/ConnectionQueryServicesImpl.java      |  161 +-
 .../query/ConnectionlessQueryServicesImpl.java  |   52 +-
 .../query/DelegateConnectionQueryServices.java  |   31 +
 .../apache/phoenix/query/MetaDataMutated.java   |    3 +
 .../apache/phoenix/query/QueryConstants.java    |   36 +
 .../org/apache/phoenix/query/QueryServices.java |    2 +
 .../phoenix/query/QueryServicesOptions.java     |    4 +-
 .../schema/FunctionAlreadyExistsException.java  |   58 +
 .../schema/FunctionNotFoundException.java       |   52 +
 .../apache/phoenix/schema/MetaDataClient.java   |  256 +-
 .../NewerFunctionAlreadyExistsException.java    |   39 +
 .../org/apache/phoenix/schema/PMetaData.java    |    6 +-
 .../apache/phoenix/schema/PMetaDataEntity.java  |   22 +
 .../apache/phoenix/schema/PMetaDataImpl.java    |  118 +-
 .../java/org/apache/phoenix/schema/PTable.java  |    3 +-
 .../org/apache/phoenix/util/MetaDataUtil.java   |    7 +-
 .../org/apache/phoenix/util/SchemaUtil.java     |   17 +-
 .../apache/phoenix/parse/QueryParserTest.java   |   18 -
 .../query/ParallelIteratorsSplitTest.java       |   15 +
 phoenix-protocol/src/main/MetaDataService.proto |   37 +-
 phoenix-protocol/src/main/PFunction.proto       |   45 +
 73 files changed, 10222 insertions(+), 899 deletions(-)
----------------------------------------------------------------------
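
At a glance, the feature spans four areas: the grammar (PhoenixSQL.g) gains create_function_node and drop_function_node productions plus UDF tracking in the parser; the new PFunction.proto/MetaDataService.proto messages and the MetaDataEndpointImpl changes persist and look up function metadata in the SYSTEM."FUNCTION" table; FromCompiler and ColumnResolver resolve UDFParseNodes against that metadata at compile time; and the new UDFExpression loads and invokes the implementing ScalarFunction class from hbase.dynamic.jars.dir at execution time.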


http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/bin/phoenix_utils.py
----------------------------------------------------------------------
diff --git a/bin/phoenix_utils.py b/bin/phoenix_utils.py
index 36c7b82..2cf7db7 100755
--- a/bin/phoenix_utils.py
+++ b/bin/phoenix_utils.py
@@ -64,6 +64,15 @@ def setPath():
  phoenix_client_jar = find("phoenix-*-client.jar", phoenix_jar_path)
  global phoenix_test_jar_path
  phoenix_test_jar_path = os.path.join(current_dir, "..", "phoenix-core", "target","*")
+ global hadoop_common_jar_path
+ hadoop_common_jar_path = os.path.join(current_dir, "..", "phoenix-assembly", "target","*")
+ global hadoop_common_jar
+ hadoop_common_jar = find("hadoop-common*.jar", hadoop_common_jar_path)
+ global hadoop_hdfs_jar_path
+ hadoop_hdfs_jar_path = os.path.join(current_dir, "..", "phoenix-assembly", "target","*")
+ global hadoop_hdfs_jar
+ hadoop_hdfs_jar = find("hadoop-hdfs*.jar", hadoop_hdfs_jar_path)
+
  global hbase_conf_dir
  hbase_conf_dir = os.getenv('HBASE_CONF_DIR', os.getenv('HBASE_CONF_PATH', '.'))
  global hbase_conf_path # keep conf_path around for backward compatibility
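
(The hadoop-common and hadoop-hdfs jars located here are appended to the sqlline classpath in the sqlline.py change below, presumably so the client can fetch UDF jars from the HDFS directory configured via hbase.dynamic.jars.dir.)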

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/bin/sqlline.py
----------------------------------------------------------------------
diff --git a/bin/sqlline.py b/bin/sqlline.py
index 6e5b5fa..80b5ff7 100755
--- a/bin/sqlline.py
+++ b/bin/sqlline.py
@@ -53,7 +53,7 @@ colorSetting = "true"
 if os.name == 'nt':
     colorSetting = "false"
 
-java_cmd = 'java -cp "' + phoenix_utils.hbase_conf_dir + os.pathsep + phoenix_utils.phoenix_client_jar + \
+java_cmd = 'java -cp "' + phoenix_utils.hbase_conf_dir + os.pathsep + phoenix_utils.phoenix_client_jar + os.pathsep + phoenix_utils.hadoop_common_jar + os.pathsep + phoenix_utils.hadoop_hdfs_jar + \
     '" -Dlog4j.configuration=file:' + \
     os.path.join(phoenix_utils.current_dir, "log4j.properties") + \
     " sqlline.SqlLine -d org.apache.phoenix.jdbc.PhoenixDriver \

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
index c9ec0ce..61459a5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.end2end;
 
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_SEQUENCE;
 import static org.apache.phoenix.util.TestUtil.ATABLE_NAME;
 import static org.apache.phoenix.util.TestUtil.ATABLE_SCHEMA_NAME;
@@ -125,6 +126,10 @@ public class QueryDatabaseMetaDataIT extends BaseClientManagedTimeIT {
         assertEquals(PTableType.SYSTEM.toString(), rs.getString("TABLE_TYPE"));
         assertTrue(rs.next());
         assertEquals(rs.getString("TABLE_SCHEM"),SYSTEM_CATALOG_SCHEMA);
+        assertEquals(rs.getString("TABLE_NAME"),SYSTEM_FUNCTION_TABLE);
+        assertEquals(PTableType.SYSTEM.toString(), rs.getString("TABLE_TYPE"));
+        assertTrue(rs.next());
+        assertEquals(rs.getString("TABLE_SCHEM"),SYSTEM_CATALOG_SCHEMA);
         assertEquals(rs.getString("TABLE_NAME"),TYPE_SEQUENCE);
         assertEquals(PTableType.SYSTEM.toString(), rs.getString("TABLE_TYPE"));
         assertTrue(rs.next());
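
(These metadata assertions encode the expected sort order of the system tables: the new SYSTEM."FUNCTION" table lands between SYSTEM.CATALOG and SYSTEM.SEQUENCE in the getTables() results, both here and in the tenant-specific test below.)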

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
index 4d0b45d..a7c7291 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
@@ -26,6 +26,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_SEQUENCE;
 import static org.apache.phoenix.schema.PTableType.SYSTEM;
 import static org.apache.phoenix.schema.PTableType.TABLE;
@@ -473,6 +474,8 @@ public class TenantSpecificTablesDDLIT extends BaseTenantSpecificTablesIT {
             assertTrue(rs.next());
             assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, SYSTEM_CATALOG_TABLE, SYSTEM);
             assertTrue(rs.next());
+            assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, SYSTEM_FUNCTION_TABLE, SYSTEM);
+            assertTrue(rs.next());
             assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, TYPE_SEQUENCE, SYSTEM);
             assertTrue(rs.next());
             assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE, SYSTEM);
@@ -539,6 +542,8 @@ public class TenantSpecificTablesDDLIT extends BaseTenantSpecificTablesIT {
             assertTrue(rs.next());
             assertTableMetaData(rs, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE, PTableType.SYSTEM);
             assertTrue(rs.next());
+            assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, SYSTEM_FUNCTION_TABLE, SYSTEM);
+            assertTrue(rs.next());
             assertTableMetaData(rs, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.TYPE_SEQUENCE, PTableType.SYSTEM);
             assertTrue(rs.next());
             assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE, PTableType.SYSTEM);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java
new file mode 100644
index 0000000..d56004b
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java
@@ -0,0 +1,605 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE;
+import static org.apache.phoenix.query.QueryServices.DYNAMIC_JARS_DIR_KEY;
+import static org.apache.phoenix.util.TestUtil.LOCALHOST;
+import static org.junit.Assert.*;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Properties;
+import java.util.jar.Attributes;
+import java.util.jar.JarEntry;
+import java.util.jar.JarOutputStream;
+import java.util.jar.Manifest;
+
+import javax.tools.JavaCompiler;
+import javax.tools.ToolProvider;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.phoenix.expression.function.UDFExpression;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.FunctionAlreadyExistsException;
+import org.apache.phoenix.schema.FunctionNotFoundException;
+import org.apache.phoenix.schema.ValueRangeExcpetion;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class UserDefinedFunctionsIT extends BaseTest{
+    
+    protected static final String TENANT_ID = "ZZTop";
+    private static String url;
+    private static PhoenixTestDriver driver;
+    private static HBaseTestingUtility util;
+
+    private static String STRING_REVERSE_EVALUATE_METHOD =
+            new StringBuffer()
+                    .append("    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {\n")
+                    .append("        Expression arg = getChildren().get(0);\n")
+                    .append("        if (!arg.evaluate(tuple, ptr)) {\n")
+                    .append("           return false;\n")
+                    .append("       }\n")
+                    .append("       int targetOffset = ptr.getLength();\n")
+                    .append("       if (targetOffset == 0) {\n")
+                    .append("            return true;\n")
+                    .append("        }\n")
+                    .append("        byte[] source = ptr.get();\n")
+                    .append("        byte[] target = new byte[targetOffset];\n")
+                    .append("        int sourceOffset = ptr.getOffset(); \n")
+                    .append("        int endOffset = sourceOffset + ptr.getLength();\n")
+                    .append("        SortOrder sortOrder = arg.getSortOrder();\n")
+                    .append("        while (sourceOffset < endOffset) {\n")
+                    .append("            int nBytes = StringUtil.getBytesInChar(source[sourceOffset], sortOrder);\n")
+                    .append("            targetOffset -= nBytes;\n")
+                    .append("            System.arraycopy(source, sourceOffset, target, targetOffset, nBytes);\n")
+                    .append("            sourceOffset += nBytes;\n")
+                    .append("        }\n")
+                    .append("        ptr.set(target);\n")
+                    .append("        return true;\n")
+                    .append("    }\n").toString();
+
+    private static String SUM_COLUMN_VALUES_EVALUATE_METHOD =
+            new StringBuffer()
+                    .append("    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {\n")
+                    .append("        int[] array = new int[getChildren().size()];\n")
+                    .append("        int i = 0;\n")
+                    .append("        for(Expression child:getChildren()) {\n")
+                    .append("            if (!child.evaluate(tuple, ptr)) {\n")
+                    .append("                return false;\n")
+                    .append("            }\n")
+                    .append("            int targetOffset = ptr.getLength();\n")
+                    .append("            if (targetOffset == 0) {\n")
+                    .append("                return true;\n")
+                    .append("            }\n")
+                    .append("            array[i++] = (Integer) PInteger.INSTANCE.toObject(ptr);\n")
+                    .append("        }\n")
+                    .append("        int sum = 0;\n")
+                    .append("        for(i=0;i<getChildren().size();i++) {\n")
+                    .append("            sum+=array[i];\n")
+                    .append("        }\n")
+                    .append("        ptr.set(PInteger.INSTANCE.toBytes((Integer)sum));\n")
+                    .append("        return true;\n")
+                    .append("    }\n").toString();
+
+    private static String MY_REVERSE_CLASS_NAME = "MyReverse";
+    private static String MY_SUM_CLASS_NAME = "MySum";
+    private static String MY_REVERSE_PROGRAM = getProgram(MY_REVERSE_CLASS_NAME, STRING_REVERSE_EVALUATE_METHOD, "PVarchar");
+    private static String MY_SUM_PROGRAM = getProgram(MY_SUM_CLASS_NAME, SUM_COLUMN_VALUES_EVALUATE_METHOD, "PInteger");
+    private static Properties EMPTY_PROPS = new Properties();
+    
+
+    private static String getProgram(String className, String evaluateMethod, String returnType) {
+        return new StringBuffer()
+                .append("package org.apache.phoenix.end2end;\n")
+                .append("import java.sql.SQLException;\n")
+                .append("import java.sql.SQLException;\n")
+                .append("import java.util.List;\n")
+                .append("import org.apache.hadoop.hbase.io.ImmutableBytesWritable;\n")
+                .append("import org.apache.phoenix.expression.Expression;\n")
+                .append("import org.apache.phoenix.expression.function.ScalarFunction;\n")
+                .append("import org.apache.phoenix.schema.SortOrder;\n")
+                .append("import org.apache.phoenix.schema.tuple.Tuple;\n")
+                .append("import org.apache.phoenix.schema.types.PDataType;\n")
+                .append("import org.apache.phoenix.schema.types.PInteger;\n")
+                .append("import org.apache.phoenix.schema.types.PVarchar;\n")
+                .append("import org.apache.phoenix.util.StringUtil;\n")
+                .append("public class "+className+" extends ScalarFunction{\n")
+                .append("    public static final String NAME = \"MY_REVERSE\";\n")
+                .append("    public "+className+"() {\n")
+                .append("    }\n")
+                .append("    public "+className+"(List<Expression> children) throws SQLException {\n")
+                .append("        super(children);\n")
+                .append("    }\n")
+                .append("    @Override\n")
+                .append(evaluateMethod)
+                .append("    @Override\n")
+                .append("    public SortOrder getSortOrder() {\n")
+                .append("        return getChildren().get(0).getSortOrder();\n")
+                .append("    }\n")
+                .append("  @Override\n")
+                .append("   public PDataType getDataType() {\n")
+                .append("       return "+returnType+".INSTANCE;\n")
+                .append("    }\n")
+                .append("    @Override\n")
+                .append("    public String getName() {\n")
+                .append("        return NAME;\n")
+                .append("    }\n")
+                .append("}\n").toString();
+    }
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Configuration conf = HBaseConfiguration.create();
+        setUpConfigForMiniCluster(conf);
+        util = new HBaseTestingUtility(conf);
+        util.startMiniDFSCluster(1);
+        util.startMiniZKCluster(1);
+        String defaultFs = util.getConfiguration().get("fs.defaultFS");
+        conf.set(DYNAMIC_JARS_DIR_KEY, defaultFs + "/hbase/tmpjars");
+        util.startMiniHBaseCluster(1, 1);
+        UDFExpression.setConfig(conf);
+        compileTestClass(MY_REVERSE_CLASS_NAME, MY_REVERSE_PROGRAM, 1);
+        compileTestClass(MY_SUM_CLASS_NAME, MY_SUM_PROGRAM, 2);
+        String clientPort = util.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
+        url =
+                JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR
+                        + clientPort + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(QueryServices.ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB, "true");
+        driver = initAndRegisterDriver(url, new ReadOnlyProps(props.entrySet().iterator()));
+    }
+    
+    @Test
+    public void testCreateFunction() throws Exception {
+        Connection conn = driver.connect(url, EMPTY_PROPS);
+        Statement stmt = conn.createStatement();
+        conn.createStatement().execute("create table t(k integer primary key, firstname varchar, lastname varchar)");
+        stmt.execute("upsert into t values(1,'foo','jock')");
+        conn.commit();
+        stmt.execute("create function myreverse(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"' using jar "
+                + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'");
+        ResultSet rs = stmt.executeQuery("select myreverse(firstname) from t");
+        assertTrue(rs.next());
+        assertEquals("oof", rs.getString(1));
+        assertFalse(rs.next());
+        rs = stmt.executeQuery("select * from t where myreverse(firstname)='oof'");
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        assertEquals("foo", rs.getString(2));        
+        assertEquals("jock", rs.getString(3));
+        assertFalse(rs.next());
+        
+        try {
+            stmt.execute("create function myreverse(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"' using jar "
+                    + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'");
+            fail("Duplicate function should not be created.");
+        } catch(FunctionAlreadyExistsException e) {
+        }
+        // Without specifying a jar, the class should be picked up from the path given by the hbase.dynamic.jars.dir configuration.
+        stmt.execute("create function myreverse2(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"'");
+        rs = stmt.executeQuery("select myreverse2(firstname) from t");
+        assertTrue(rs.next());
+        assertEquals("oof", rs.getString(1));        
+        assertFalse(rs.next());
+        rs = stmt.executeQuery("select * from t where myreverse2(firstname)='oof'");
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        assertEquals("foo", rs.getString(2));        
+        assertEquals("jock", rs.getString(3));
+        assertFalse(rs.next());
+        conn.createStatement().execute("create table t3(tenant_id varchar not null, k integer not null, firstname varchar, lastname varchar constraint pk primary key(tenant_id,k)) MULTI_TENANT=true");
+        // Function created with global id should be accessible.
+        Connection conn2 = driver.connect(url+";"+PhoenixRuntime.TENANT_ID_ATTRIB+"="+TENANT_ID, EMPTY_PROPS);
+        try {
+            conn2.createStatement().execute("upsert into t3 values(1,'foo','jock')");
+            conn2.commit();
+            conn2.createStatement().execute("create function myreverse(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"' using jar "
+                    + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'");
+            rs = conn2.createStatement().executeQuery("select myreverse(firstname) from t3");
+            assertTrue(rs.next());
+            assertEquals("oof", rs.getString(1)); 
+        } catch(FunctionAlreadyExistsException e) {
+            fail("FunctionAlreadyExistsException should not be thrown");
+        }
+        // Calling a global UDF on a tenant-specific connection.
+        rs = conn2.createStatement().executeQuery("select myreverse2(firstname) from t3");
+        assertTrue(rs.next());
+        assertEquals("oof", rs.getString(1));
+        try {
+            conn2.createStatement().execute("drop function myreverse2");
+            fail("FunctionNotFoundException should be thrown");
+        } catch(FunctionNotFoundException e){
+            
+        }
+        conn.createStatement().execute("drop function myreverse2");
+        try {
+            rs = conn2.createStatement().executeQuery("select myreverse2(firstname) from t3");
+            fail("FunctionNotFoundException should be thrown.");
+        } catch(FunctionNotFoundException e){
+            
+        }
+        try{
+            rs = conn2.createStatement().executeQuery("select unknownFunction(firstname) from t3");
+            fail("FunctionNotFoundException should be thrown.");
+        } catch(FunctionNotFoundException e) {
+            
+        }
+    }
+
+    @Test
+    public void testSameUDFWithDifferentImplementationsInDifferentTenantConnections() throws Exception {
+        Connection nonTenantConn = driver.connect(url, EMPTY_PROPS);
+        nonTenantConn.createStatement().execute("create function myfunction(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"' using jar "
+                + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'");
+        try {
+            nonTenantConn.createStatement().execute("create function myfunction(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end.UnknownClass' using jar "
+                    + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'");
+            fail("FunctionAlreadyExistsException should be thrown.");
+        } catch(FunctionAlreadyExistsException e) {
+            
+        }
+        String tenantId1="tenId1";
+        String tenantId2="tenId2";
+        nonTenantConn.createStatement().execute("create table t7(tenant_id varchar not null, k integer not null, k1 integer, name varchar constraint pk primary key(tenant_id, k)) multi_tenant=true");
+        Connection tenant1Conn = driver.connect(url+";"+PhoenixRuntime.TENANT_ID_ATTRIB+"="+tenantId1, EMPTY_PROPS);
+        Connection tenant2Conn = driver.connect(url+";"+PhoenixRuntime.TENANT_ID_ATTRIB+"="+tenantId2, EMPTY_PROPS);
+        tenant1Conn.createStatement().execute("upsert into t7 values(1,1,'jock')");
+        tenant1Conn.commit();
+        tenant2Conn.createStatement().execute("upsert into t7 values(1,2,'jock')");
+        tenant2Conn.commit();
+        tenant1Conn.createStatement().execute("create function myfunction(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"' using jar "
+                + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'");
+        try {
+            tenant1Conn.createStatement().execute("create function myfunction(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end.UnknownClass' using jar "
+                    + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'");
+            fail("FunctionAlreadyExistsException should be thrown.");
+        } catch(FunctionAlreadyExistsException e) {
+            
+        }
+
+        tenant2Conn.createStatement().execute("create function myfunction(INTEGER, INTEGER CONSTANT defaultValue='10' minvalue='1' maxvalue='15' ) returns INTEGER as 'org.apache.phoenix.end2end."+MY_SUM_CLASS_NAME+"' using jar "
+                + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar2.jar"+"'");
+        try {
+            tenant2Conn.createStatement().execute("create function myfunction(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end.UnknownClass' using jar "
+                    + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/unknown.jar"+"'");
+            fail("FunctionAlreadyExistsException should be thrown.");
+        } catch(FunctionAlreadyExistsException e) {
+            
+        }
+
+        ResultSet rs = tenant1Conn.createStatement().executeQuery("select MYFUNCTION(name) from t7");
+        assertTrue(rs.next());
+        assertEquals("kcoj", rs.getString(1));
+        assertFalse(rs.next());
+        rs = tenant1Conn.createStatement().executeQuery("select * from t7 where MYFUNCTION(name)='kcoj'");
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        assertEquals(1, rs.getInt(2));        
+        assertEquals("jock", rs.getString(3));
+        assertFalse(rs.next());
+
+        rs = tenant2Conn.createStatement().executeQuery("select MYFUNCTION(k) from t7");
+        assertTrue(rs.next());
+        assertEquals(11, rs.getInt(1));
+        assertFalse(rs.next());
+        rs = tenant2Conn.createStatement().executeQuery("select * from t7 where MYFUNCTION(k1)=12");
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        assertEquals(2, rs.getInt(2));        
+        assertEquals("jock", rs.getString(3));
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void testUDFsWithMultipleConnections() throws Exception {
+        Connection conn1 = driver.connect(url, EMPTY_PROPS);
+        conn1.createStatement().execute("create function myfunction(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"' using jar "
+                + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'");
+        Connection conn2 = driver.connect(url, EMPTY_PROPS);
+        try{
+            conn2.createStatement().execute("create function myfunction(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"' using jar "
+                    + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'");
+            fail("FunctionAlreadyExistsException should be thrown.");
+        } catch(FunctionAlreadyExistsException e) {
+            
+        }
+        conn2.createStatement().execute("create table t8(k integer not null primary key, k1 integer, name varchar)");
+        conn2.createStatement().execute("upsert into t8 values(1,1,'jock')");
+        conn2.commit();
+        ResultSet rs = conn2.createStatement().executeQuery("select MYFUNCTION(name) from t8");
+        assertTrue(rs.next());
+        assertEquals("kcoj", rs.getString(1));
+        assertFalse(rs.next());
+        rs = conn2.createStatement().executeQuery("select * from t8 where MYFUNCTION(name)='kcoj'");
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        assertEquals(1, rs.getInt(2));
+        assertEquals("jock", rs.getString(3));
+        assertFalse(rs.next());
+        conn2.createStatement().execute("drop function MYFUNCTION");
+        try {
+            rs = conn1.createStatement().executeQuery("select MYFUNCTION(name) from t8");
+            fail("FunctionNotFoundException should be thrown");
+        } catch(FunctionNotFoundException e) {
+            
+        }
+    }
+    @Test
+    public void testUsingUDFFunctionInDifferentQueries() throws Exception {
+        Connection conn = driver.connect(url, EMPTY_PROPS);
+        Statement stmt = conn.createStatement();
+        conn.createStatement().execute("create table t1(k integer primary key, firstname varchar, lastname varchar)");
+        stmt.execute("upsert into t1 values(1,'foo','jock')");
+        conn.commit();
+        conn.createStatement().execute("create table t2(k integer primary key, k1 integer, lastname_reverse varchar)");
+        conn.commit();
+        stmt.execute("create function mysum3(INTEGER, INTEGER CONSTANT defaultValue='10' minvalue='1' maxvalue='15' ) returns INTEGER as 'org.apache.phoenix.end2end."+MY_SUM_CLASS_NAME+"' using jar "
+                + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar2.jar"+"'");
+        stmt.execute("create function myreverse3(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"' using jar "
+                + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'");
+        stmt.execute("upsert into t2(k,k1,lastname_reverse) select mysum3(k),mysum3(k,11),myreverse3(lastname) from t1");
+        conn.commit();
+        ResultSet rs = stmt.executeQuery("select * from t2");
+        assertTrue(rs.next());
+        assertEquals(11, rs.getInt(1));
+        assertEquals(12, rs.getInt(2));
+        assertEquals("kcoj", rs.getString(3));
+        assertFalse(rs.next());
+        stmt.execute("delete from t2 where myreverse3(lastname_reverse)='jock' and mysum3(k)=21");
+        conn.commit();
+        rs = stmt.executeQuery("select * from t2");
+        assertFalse(rs.next());
+        stmt.execute("create function myreverse4(VARCHAR CONSTANT defaultValue='null') returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"'");
+        stmt.execute("upsert into t2 values(11,12,myreverse4('jock'))");
+        conn.commit();
+        rs = stmt.executeQuery("select * from t2");
+        assertTrue(rs.next());
+        assertEquals(11, rs.getInt(1));
+        assertEquals(12, rs.getInt(2));
+        assertEquals("kcoj", rs.getString(3));
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void testVerifyCreateFunctionArguments() throws Exception {
+        Connection conn = driver.connect(url, EMPTY_PROPS);
+        Statement stmt = conn.createStatement();
+        conn.createStatement().execute("create table t4(k integer primary key, k1 integer, lastname varchar)");
+        stmt.execute("upsert into t4 values(1,1,'jock')");
+        conn.commit();
+        stmt.execute("create function mysum(INTEGER, INTEGER CONSTANT defaultValue='10' minvalue='1' maxvalue='15' ) returns INTEGER as 'org.apache.phoenix.end2end."+MY_SUM_CLASS_NAME+"' using jar "
+                + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar2.jar"+"'");
+        ResultSet rs = stmt.executeQuery("select mysum(k,12) from t4");
+        assertTrue(rs.next());
+        assertEquals(13, rs.getInt(1));
+        rs = stmt.executeQuery("select mysum(k) from t4");
+        assertTrue(rs.next());
+        assertEquals(11, rs.getInt(1));
+        try {
+            stmt.executeQuery("select mysum(k,20) from t4");
+            fail("Value Range Exception should be thrown.");
+        } catch(ValueRangeExcpetion e) {
+            
+        }
+    }
+
+    @Test
+    public void testTemporaryFunctions() throws Exception {
+        Connection conn = driver.connect(url, EMPTY_PROPS);
+        Statement stmt = conn.createStatement();
+        conn.createStatement().execute("create table t9(k integer primary key, k1 integer, lastname varchar)");
+        stmt.execute("upsert into t9 values(1,1,'jock')");
+        conn.commit();
+        stmt.execute("create temporary function mysum9(INTEGER, INTEGER CONSTANT defaultValue='10' minvalue='1' maxvalue='15' ) returns INTEGER as 'org.apache.phoenix.end2end."+MY_SUM_CLASS_NAME+"' using jar "
+                + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar2.jar"+"'");
+        ResultSet rs = stmt.executeQuery("select mysum9(k,12) from t9");
+        assertTrue(rs.next());
+        assertEquals(13, rs.getInt(1));
+        rs = stmt.executeQuery("select mysum9(k) from t9");
+        assertTrue(rs.next());
+        assertEquals(11, rs.getInt(1));
+        rs = stmt.executeQuery("select k from t9 where mysum9(k)=11");
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        stmt.execute("drop function mysum9");
+        try {
+            rs = stmt.executeQuery("select k from t9 where mysum9(k)=11");
+            fail("FunctionNotFoundException should be thrown");
+        } catch(FunctionNotFoundException e){
+            
+        }
+    }
+
+    @Test
+    public void testDropFunction() throws Exception {
+        Connection conn = driver.connect(url, EMPTY_PROPS);
+        Statement stmt = conn.createStatement();
+        String query = "select count(*) from "+ SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_FUNCTION_TABLE + "\"";
+        ResultSet rs = stmt.executeQuery(query);
+        rs.next();
+        int numRowsBefore = rs.getInt(1);
+        stmt.execute("create function mysum6(INTEGER, INTEGER CONSTANT defaultValue='10' minvalue='1' maxvalue='15' ) returns INTEGER as 'org.apache.phoenix.end2end."+MY_SUM_CLASS_NAME+"' using jar "
+                + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar2.jar"+"'");
+        rs = stmt.executeQuery(query);
+        rs.next();
+        int numRowsAfter= rs.getInt(1);
+        assertEquals(3, numRowsAfter - numRowsBefore);
+        stmt.execute("drop function mysum6");
+        rs = stmt.executeQuery(query);
+        rs.next();
+        assertEquals(numRowsBefore, rs.getInt(1));
+        conn.createStatement().execute("create table t6(k integer primary key, k1 integer, lastname varchar)");
+        try {
+            rs = stmt.executeQuery("select mysum6(k1) from t6");
+            fail("FunctionNotFoundException should be thrown");
+        } catch(FunctionNotFoundException e) {
+            
+        }
+        try {
+            stmt.execute("drop function mysum6");
+            fail("FunctionNotFoundException should be thrown");
+        } catch(FunctionNotFoundException e) {
+            
+        }
+        try {
+            stmt.execute("drop function if exists mysum6");
+        } catch(FunctionNotFoundException e) {
+            fail("FunctionNotFoundException should not be thrown");
+        }
+        stmt.execute("create function mysum6(INTEGER, INTEGER CONSTANT defaultValue='10' minvalue='1' maxvalue='15' ) returns INTEGER as 'org.apache.phoenix.end2end."+MY_SUM_CLASS_NAME+"' using jar "
+                + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar2.jar"+"'");
+        try {
+            rs = stmt.executeQuery("select mysum6(k1) from t6");
+        } catch(FunctionNotFoundException e) {
+            fail("FunctionNotFoundException should not be thrown");
+        }
+    }
+
+    @Test
+    public void testFunctionalIndexesWithUDFFunction() throws Exception {
+        Connection conn = driver.connect(url, EMPTY_PROPS);
+        Statement stmt = conn.createStatement();
+        stmt.execute("create table t5(k integer primary key, k1 integer, lastname_reverse varchar)");
+        stmt.execute("create function myreverse5(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"'");
+        stmt.execute("upsert into t5 values(1,1,'jock')");
+        conn.commit();
+        stmt.execute("create index idx on t5(myreverse5(lastname_reverse))");
+        String query = "select myreverse5(lastname_reverse) from t5";
+        ResultSet rs = stmt.executeQuery("explain " + query);
+        assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER IDX\n"
+                + "    SERVER FILTER BY FIRST KEY ONLY", QueryUtil.getExplainPlan(rs));
+        rs = stmt.executeQuery(query);
+        assertTrue(rs.next());
+        assertEquals("kcoj", rs.getString(1));
+        assertFalse(rs.next());
+        stmt.execute("create local index idx2 on t5(myreverse5(lastname_reverse))");
+        query = "select k,k1,myreverse5(lastname_reverse) from t5 where myreverse5(lastname_reverse)='kcoj'";
+        rs = stmt.executeQuery("explain " + query);
+        assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_T5 [-32768,'kcoj']\n"
+                + "    SERVER FILTER BY FIRST KEY ONLY\n"
+                +"CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
+        rs = stmt.executeQuery(query);
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        assertEquals(1, rs.getInt(2));
+        assertEquals("kcoj", rs.getString(3));
+        assertFalse(rs.next());
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        try {
+            destroyDriver(driver);
+        } finally {
+            util.shutdownMiniCluster();
+        }
+    }
+
+    /**
+     * Compiles the given source into a .class file, packages it into a jar, and copies the jar to the dynamic jars directory on the mini DFS cluster.
+     */
+    private static void compileTestClass(String className, String program, int counter) throws Exception {
+        String javaFileName = className+".java";
+        File javaFile = new File(javaFileName);
+        String classFileName = className+".class";
+        File classFile = new File(classFileName);
+        String jarName = "myjar"+counter+".jar";
+        String jarPath = "." + File.separator + jarName;
+        File jarFile = new File(jarPath);
+        try {
+            String packageName = "org.apache.phoenix.end2end";
+            FileOutputStream fos = new FileOutputStream(javaFileName);
+            fos.write(program.getBytes());
+            fos.close();
+            
+            JavaCompiler jc = ToolProvider.getSystemJavaCompiler();
+            int result = jc.run(null, null, null, javaFileName);
+            assertEquals(0, result);
+            
+            Manifest manifest = new Manifest();
+            manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
+            FileOutputStream jarFos = new FileOutputStream(jarPath);
+            JarOutputStream jarOutputStream = new JarOutputStream(jarFos, manifest);
+            String pathToAdd =packageName.replace('.', File.separatorChar)
+                    + File.separator;
+            jarOutputStream.putNextEntry(new JarEntry(pathToAdd));
+            jarOutputStream.closeEntry();
+            jarOutputStream.putNextEntry(new JarEntry(pathToAdd + classFile.getName()));
+            byte[] allBytes = new byte[(int) classFile.length()];
+            FileInputStream fis = new FileInputStream(classFile);
+            fis.read(allBytes);
+            fis.close();
+            jarOutputStream.write(allBytes);
+            jarOutputStream.closeEntry();
+            jarOutputStream.close();
+            jarFos.close();
+            
+            assertTrue(jarFile.exists());
+            
+            InputStream inputStream = new BufferedInputStream(new FileInputStream(jarPath));
+            FileSystem fs = util.getDefaultRootDirPath().getFileSystem(util.getConfiguration());
+            Path jarsLocation = new Path(util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY));
+            Path myJarPath;
+            if (jarsLocation.toString().endsWith("/")) {
+                myJarPath = new Path(jarsLocation.toString() + jarName);
+            } else {
+                myJarPath = new Path(jarsLocation.toString() + "/" + jarName);
+            }
+            OutputStream outputStream = fs.create(myJarPath);
+            try {
+                IOUtils.copyBytes(inputStream, outputStream, 4096, false);
+            } finally {
+                IOUtils.closeStream(inputStream);
+                IOUtils.closeStream(outputStream);
+            }
+        } finally {
+            if (javaFile != null) javaFile.delete();
+            if (classFile != null) classFile.delete();
+            if (jarFile != null) jarFile.delete();
+        }
+    }
+
+}
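
Pieced together, the MyReverse source that getProgram assembles above works out to the following (duplicate and unused imports trimmed; note that getProgram hardcodes NAME = "MY_REVERSE" for both generated classes, which is harmless here since the functions are invoked under the names given in CREATE FUNCTION):

    package org.apache.phoenix.end2end;

    import java.sql.SQLException;
    import java.util.List;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.phoenix.expression.Expression;
    import org.apache.phoenix.expression.function.ScalarFunction;
    import org.apache.phoenix.schema.SortOrder;
    import org.apache.phoenix.schema.tuple.Tuple;
    import org.apache.phoenix.schema.types.PDataType;
    import org.apache.phoenix.schema.types.PVarchar;
    import org.apache.phoenix.util.StringUtil;

    public class MyReverse extends ScalarFunction {
        public static final String NAME = "MY_REVERSE";

        public MyReverse() {
        }

        public MyReverse(List<Expression> children) throws SQLException {
            super(children);
        }

        @Override
        public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
            Expression arg = getChildren().get(0);
            if (!arg.evaluate(tuple, ptr)) {
                return false;
            }
            int targetOffset = ptr.getLength();
            if (targetOffset == 0) {
                return true;
            }
            byte[] source = ptr.get();
            byte[] target = new byte[targetOffset];
            int sourceOffset = ptr.getOffset();
            int endOffset = sourceOffset + ptr.getLength();
            SortOrder sortOrder = arg.getSortOrder();
            // Walk the source left to right, copying each (possibly multi-byte)
            // character to its mirrored position in the target buffer.
            while (sourceOffset < endOffset) {
                int nBytes = StringUtil.getBytesInChar(source[sourceOffset], sortOrder);
                targetOffset -= nBytes;
                System.arraycopy(source, sourceOffset, target, targetOffset, nBytes);
                sourceOffset += nBytes;
            }
            ptr.set(target);
            return true;
        }

        @Override
        public SortOrder getSortOrder() {
            return getChildren().get(0).getSortOrder();
        }

        @Override
        public PDataType getDataType() {
            return PVarchar.INSTANCE;
        }

        @Override
        public String getName() {
            return NAME;
        }
    }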

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/antlr3/PhoenixSQL.g
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index f57c5cc..d2bb241 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -114,6 +114,14 @@ tokens
     ASYNC='async';
     SAMPLING='sampling';
     UNION='union';
+    FUNCTION='function';
+    AS='as';
+    TEMPORARY='temporary';
+    RETURNS='returns';
+    USING='using';
+    JAR='jar';
+    DEFAULTVALUE='defaultvalue';
+    CONSTANT = 'constant';
 }
 
 
@@ -144,13 +152,18 @@ import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import java.lang.Boolean;
 import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Stack;
 import java.sql.SQLException;
 import org.apache.phoenix.expression.function.CountAggregateFunction;
+import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.parse.PFunction;
+import org.apache.phoenix.parse.PFunction.FunctionArgument;
+import org.apache.phoenix.parse.UDFParseNode;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.IllegalDataException;
@@ -206,6 +219,7 @@ package org.apache.phoenix.parse;
     private int anonBindNum;
     private ParseNodeFactory factory;
     private ParseContext.Stack contextStack = new ParseContext.Stack();
+    private Map<String, UDFParseNode> udfParseNodes = new HashMap<String, UDFParseNode>(1);
 
     public void setParseNodeFactory(ParseNodeFactory factory) {
         this.factory = factory;
@@ -341,13 +355,25 @@ package org.apache.phoenix.parse;
// Used to incrementally parse a series of semicolon-terminated SQL statements.
// Note that, unlike the rule below, an EOF is not expected at the end.
 nextStatement returns [BindableStatement ret]
-    :  s=oneStatement {$ret = s;} SEMICOLON
+    :  s=oneStatement {
+    		try {
+    			$ret = s;
+    		} finally {
+    			udfParseNodes.clear();
+    		}
+    	} SEMICOLON
     |  EOF
     ;
 
 // Parses a single SQL statement (expects an EOF after the select statement).
 statement returns [BindableStatement ret]
-    :   s=oneStatement {$ret = s;} EOF
+    :   s=oneStatement {
+        		try {
+    			$ret = s;
+    		} finally {
+    			udfParseNodes.clear();
+    		}
+    	} EOF
     ;
 
 // Parses a select statement which must be the only statement (expects an EOF after the statement).
@@ -369,6 +395,8 @@ oneStatement returns [BindableStatement ret]
     |   s=alter_index_node
     |   s=alter_table_node
     |   s=trace_node
+    |   s=create_function_node
+    |   s=drop_function_node
     |   s=alter_session_node
     |	s=create_sequence_node
     |	s=drop_sequence_node
@@ -409,7 +437,7 @@ create_index_node returns [CreateIndexStatement ret]
         (async=ASYNC)?
         (p=fam_properties)?
         (SPLIT ON v=value_expression_list)?
-        {ret = factory.createIndex(i, factory.namedTable(null,t), ik, icrefs, v, p, ex!=null, l==null ? IndexType.getDefault() : IndexType.LOCAL, async != null, getBindCount()); }
+        {ret = factory.createIndex(i, factory.namedTable(null,t), ik, icrefs, v, p, ex!=null, l==null ? IndexType.getDefault() : IndexType.LOCAL, async != null, getBindCount(), new HashMap<String, UDFParseNode>(udfParseNodes)); }
     ;
 
 // Parse a create sequence statement.
@@ -510,6 +538,25 @@ trace_node returns [TraceStatement ret]
        {ret = factory.trace(Tracing.isTraceOn(flag.getText()), s == null ? Tracing.isTraceOn(flag.getText()) ? 1.0 : 0.0 : (((BigDecimal)s.getValue())).doubleValue());}
     ;
 
+// Parse a create function statement.
+create_function_node returns [CreateFunctionStatement ret]
+    :   CREATE (temp=TEMPORARY)? FUNCTION function=identifier 
+       (LPAREN args=zero_or_more_data_types RPAREN)
+       RETURNS r=identifier AS (className= jar_path)
+       (USING JAR (jarPath = jar_path))?
+        {
+            $ret = factory.createFunction(new PFunction(SchemaUtil.normalizeIdentifier(function), args,r,(String)className.getValue(), jarPath == null ? null : (String)jarPath.getValue()), temp!=null);
+        } 
+    ;
+
+jar_path returns [LiteralParseNode ret]
+    : l=literal { $ret = l; }
+    ;
+
+drop_function_node returns [DropFunctionStatement ret]
+    : DROP FUNCTION (IF ex=EXISTS)? function=identifier {$ret = factory.dropFunction(SchemaUtil.normalizeIdentifier(function), ex!=null);}
+    ;
+
 // Parse an alter session statement.
 alter_session_node returns [AlterSessionStatement ret]
     :   ALTER SESSION (SET p=properties)
@@ -586,7 +633,7 @@ single_select returns [SelectStatement ret]
         (WHERE where=expression)?
         (GROUP BY group=group_by)?
         (HAVING having=expression)?
-        { ParseContext context = contextStack.peek(); $ret = factory.select(from, h, d!=null, sel, where, group, having, null, null, getBindCount(), context.isAggregate(), context.hasSequences(), null); }
+        { ParseContext context = contextStack.peek(); $ret = factory.select(from, h, d!=null, sel, where, group, having, null, null, getBindCount(), context.isAggregate(), context.hasSequences(), null, new HashMap<String,UDFParseNode>(udfParseNodes)); }
     ;
 finally{ contextStack.pop(); }
 
@@ -610,7 +657,7 @@ upsert_node returns [UpsertStatement ret]
     :   UPSERT (hint=hintClause)? INTO t=from_table_name
         (LPAREN p=upsert_column_refs RPAREN)?
         ((VALUES LPAREN v=one_or_more_expressions RPAREN) | s=select_node)
-        {ret = factory.upsert(factory.namedTable(null,t,p == null ? null : p.getFirst()), hint, p == null ? null : p.getSecond(), v, s, getBindCount()); }
+        {ret = factory.upsert(factory.namedTable(null,t,p == null ? null : p.getFirst()), hint, p == null ? null : p.getSecond(), v, s, getBindCount(), new HashMap<String, UDFParseNode>(udfParseNodes)); }
     ;
 
 upsert_column_refs returns [Pair<List<ColumnDef>,List<ColumnName>> ret]
@@ -625,7 +672,7 @@ delete_node returns [DeleteStatement ret]
         (WHERE v=expression)?
         (ORDER BY order=order_by)?
         (LIMIT l=limit)?
-        {ret = factory.delete(factory.namedTable(null,t), hint, v, order, l, getBindCount()); }
+        {ret = factory.delete(factory.namedTable(null,t), hint, v, order, l, getBindCount(), new HashMap<String, UDFParseNode>(udfParseNodes)); }
     ;
 
 limit returns [LimitNode ret]
@@ -813,17 +860,19 @@ term returns [ParseNode ret]
             if (!contextStack.isEmpty()) {
             	contextStack.peek().setAggregate(f.isAggregate());
             }
+            if(f instanceof UDFParseNode) udfParseNodes.put(f.getName(),(UDFParseNode)f);
             $ret = f;
         } 
     |   field=identifier LPAREN t=ASTERISK RPAREN 
         {
             if (!isCountFunction(field)) {
-                throwRecognitionException(t); 
+                throwRecognitionException(t);
             }
             FunctionParseNode f = factory.function(field, LiteralParseNode.STAR);
             if (!contextStack.isEmpty()) {
             	contextStack.peek().setAggregate(f.isAggregate());
             }
+            if(f instanceof UDFParseNode) udfParseNodes.put(f.getName(),(UDFParseNode)f);
             $ret = f;
         } 
     |   field=identifier LPAREN t=DISTINCT l=zero_or_more_expressions RPAREN 
@@ -832,6 +881,7 @@ term returns [ParseNode ret]
             if (!contextStack.isEmpty()) {
             	contextStack.peek().setAggregate(f.isAggregate());
             }
+            if(f instanceof UDFParseNode) udfParseNodes.put(f.getName(),(UDFParseNode)f);
             $ret = f;
         }
     |   e=case_statement { $ret = e; }
@@ -865,6 +915,19 @@ zero_or_more_expressions returns [List<ParseNode> ret]
     :  (v = expression {$ret.add(v);})?  (COMMA v = expression {$ret.add(v);} )*
 ;
 
+zero_or_more_data_types returns [List<FunctionArgument> ret]
+@init{ret = new ArrayList<FunctionArgument>(); }
+    : (dt = identifier (LPAREN l=NUMBER (COMMA s=NUMBER)? RPAREN)? ar=ARRAY? (lsq=LSQUARE (a=NUMBER)? RSQUARE)? (c = CONSTANT)? (DEFAULTVALUE EQ dv = value_expression)? (MINVALUE EQ minv = value_expression)?  (MAXVALUE EQ maxv = value_expression)? 
+    {$ret.add(new FunctionArgument(dt,  ar != null || lsq != null, c!=null, 
+    dv == null ? null : LiteralExpression.newConstant(((LiteralParseNode)dv).getValue()), 
+    minv == null ? null : LiteralExpression.newConstant(((LiteralParseNode)minv).getValue()), 
+    maxv == null ? null : LiteralExpression.newConstant(((LiteralParseNode)maxv).getValue())));})? (COMMA (dt = identifier (LPAREN l=NUMBER (COMMA s=NUMBER)? RPAREN)? ar=ARRAY? (lsq=LSQUARE (a=NUMBER)? RSQUARE)? (c = CONSTANT)? (DEFAULTVALUE EQ dv = value_expression)? (MINVALUE EQ minv = value_expression)?  (MAXVALUE EQ maxv = value_expression)?
+    {$ret.add(new FunctionArgument(dt,  ar != null || lsq != null, c!=null, 
+    dv == null ? null : LiteralExpression.newConstant(((LiteralParseNode)dv).getValue()), 
+    minv == null ? null : LiteralExpression.newConstant(((LiteralParseNode)minv).getValue()), 
+    maxv == null ? null : LiteralExpression.newConstant(((LiteralParseNode)maxv).getValue())));} ))*
+;
+
 value_expression_list returns [List<ParseNode> ret]
 @init{ret = new ArrayList<ParseNode>(); }
     :  LPAREN e = value_expression {$ret.add(e);}  (COMMA e = value_expression {$ret.add(e);} )* RPAREN
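
For reference, these are the shapes the new productions accept, taken from the statements exercised in UserDefinedFunctionsIT (a sketch against a java.sql.Statement; the jar path is illustrative). Arguments may carry CONSTANT plus defaultValue/minvalue/maxvalue attributes (zero_or_more_data_types), and USING JAR is optional; without it the class is looked up under hbase.dynamic.jars.dir:

    import java.sql.SQLException;
    import java.sql.Statement;

    class CreateFunctionExamples {
        static void run(Statement stmt) throws SQLException {
            // Simple scalar UDF with an explicit jar location.
            stmt.execute("CREATE FUNCTION myreverse(VARCHAR) RETURNS VARCHAR"
                    + " AS 'org.apache.phoenix.end2end.MyReverse'"
                    + " USING JAR 'hdfs:/hbase/tmpjars/myjar1.jar'");
            // Session-scoped function with a CONSTANT argument carrying
            // defaultValue/minvalue/maxvalue attributes.
            stmt.execute("CREATE TEMPORARY FUNCTION mysum9(INTEGER,"
                    + " INTEGER CONSTANT defaultValue='10' minvalue='1' maxvalue='15')"
                    + " RETURNS INTEGER AS 'org.apache.phoenix.end2end.MySum'");
            stmt.execute("DROP FUNCTION IF EXISTS mysum9");
            stmt.execute("DROP FUNCTION myreverse");
        }
    }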

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
index fcef0ec..643112d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
@@ -32,8 +32,10 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.memory.ChildMemoryManager;
 import org.apache.phoenix.memory.GlobalMemoryManager;
+import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PMetaDataEntity;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.SizedUtil;
 
@@ -57,17 +59,17 @@ public class GlobalCache extends TenantCacheImpl {
     // TODO: Use Guava cache with auto removal after lack of access 
     private final ConcurrentMap<ImmutableBytesWritable,TenantCache> perTenantCacheMap = new ConcurrentHashMap<ImmutableBytesWritable,TenantCache>();
 // Cache for the latest PTable for a given Phoenix table
-    private Cache<ImmutableBytesPtr,PTable> metaDataCache;
+    private Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache;
     
     public void clearTenantCache() {
         perTenantCacheMap.clear();
     }
     
-    public Cache<ImmutableBytesPtr,PTable> getMetaDataCache() {
+    public Cache<ImmutableBytesPtr,PMetaDataEntity> getMetaDataCache() {
         // Lazy initialize QueryServices so that we only attempt to create an HBase Configuration
         // object upon the first attempt to connect to any cluster. Otherwise, an attempt will be
         // made at driver initialization time which is too early for some systems.
-        Cache<ImmutableBytesPtr,PTable> result = metaDataCache;
+        Cache<ImmutableBytesPtr,PMetaDataEntity> result = metaDataCache;
         if (result == null) {
             synchronized(this) {
                 result = metaDataCache;
@@ -82,9 +84,9 @@ public class GlobalCache extends TenantCacheImpl {
                     metaDataCache = result = CacheBuilder.newBuilder()
                             .maximumWeight(maxSize)
                             .expireAfterAccess(maxTTL, TimeUnit.MILLISECONDS)
-                            .weigher(new Weigher<ImmutableBytesPtr, PTable>() {
+                            .weigher(new Weigher<ImmutableBytesPtr, PMetaDataEntity>() {
                                 @Override
-                                public int weigh(ImmutableBytesPtr key, PTable table) {
+                                public int weigh(ImmutableBytesPtr key, PMetaDataEntity table) {
                                     return SizedUtil.IMMUTABLE_BYTES_PTR_SIZE + key.getLength() + table.getEstimatedSize();
                                 }
                             })
@@ -157,4 +159,22 @@ public class GlobalCache extends TenantCacheImpl {
         }
         return tenantCache;
     }
+
+    public static class FunctionBytesPtr extends ImmutableBytesPtr {
+
+        public FunctionBytesPtr(byte[] key) {
+            super(key);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if(obj instanceof FunctionBytesPtr) return super.equals(obj);
+            return false;
+        }
+
+        @Override
+        public int hashCode() {
+            return super.hashCode();
+        }
+    }
 }

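Tables and functions now share one server-side metadata cache, keyed by ImmutableBytesPtr and valued by PMetaDataEntity, so FunctionBytesPtr exists to keep a function entry from ever matching a table entry whose key happens to hold the same bytes. A minimal sketch of the intended behavior (Bytes comes from hbase-common; the demo class is invented):

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.cache.GlobalCache.FunctionBytesPtr;
    import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;

    public class FunctionKeyDemo {
        public static void main(String[] args) {
            byte[] key = Bytes.toBytes("MY_NAME");
            ImmutableBytesPtr tableKey = new ImmutableBytesPtr(key);
            FunctionBytesPtr functionKey = new FunctionBytesPtr(key);
            // Same bytes hash to the same bucket...
            System.out.println(functionKey.hashCode() == tableKey.hashCode()); // true
            // ...but a function key only equals another FunctionBytesPtr.
            System.out.println(functionKey.equals(tableKey)); // false
        }
    }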
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnResolver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnResolver.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnResolver.java
index 7bb210b..55253ea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnResolver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnResolver.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.compile;
 import java.sql.SQLException;
 import java.util.List;
 
+import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.TableRef;
 
@@ -41,6 +42,11 @@ public interface ColumnResolver {
     public List<TableRef> getTables();
     
     /**
+     * Returns the collection of resolved functions.
+     */
+    public List<PFunction> getFunctions();
+
+    /**
      * Resolves table using name or alias.
      * @param schemaName the schema name
      * @param tableName the table name or table alias
@@ -60,4 +66,15 @@ public interface ColumnResolver {
      * @throws AmbiguousColumnException if the column name is ambiguous
      */
     public ColumnRef resolveColumn(String schemaName, String tableName, String colName) throws SQLException;
+        
+    /**
+     * Resolves function using functionName.
+     * @param functionName the name of the function to resolve
+     * @return the resolved PFunction
+     * @throws FunctionNotFoundException if the function could not be resolved
+     */
+    public PFunction resolveFunction(String functionName) throws SQLException;
+
+    public boolean hasUDFs();
 }

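Compilation code consults the resolver for UDFs much as it does for columns and tables. A hedged caller-side sketch (the helper method is invented; only the two interface methods are from the patch):

    import java.sql.SQLException;

    import org.apache.phoenix.compile.ColumnResolver;
    import org.apache.phoenix.parse.PFunction;

    public class ResolverUsageSketch {
        // Returns the function's metadata, or null when the statement uses no UDFs.
        static PFunction lookupUdf(ColumnResolver resolver, String name) throws SQLException {
            if (!resolver.hasUDFs()) {
                return null;
            }
            // Throws FunctionNotFoundException when the name is not a known UDF.
            return resolver.resolveFunction(name);
        }
    }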
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateFunctionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateFunctionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateFunctionCompiler.java
new file mode 100644
index 0000000..2e3a873
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateFunctionCompiler.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.Collections;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.CreateFunctionStatement;
+import org.apache.phoenix.schema.MetaDataClient;
+
+public class CreateFunctionCompiler {
+
+    private final PhoenixStatement statement;
+    
+    public CreateFunctionCompiler(PhoenixStatement statement) {
+        this.statement = statement;
+    }
+
+    public MutationPlan compile(final CreateFunctionStatement create) throws SQLException {
+        final PhoenixConnection connection = statement.getConnection();
+        PhoenixConnection connectionToBe = connection;
+        final StatementContext context = new StatementContext(statement);
+        final MetaDataClient client = new MetaDataClient(connectionToBe);
+        
+        return new MutationPlan() {
+
+            @Override
+            public ParameterMetaData getParameterMetaData() {
+                return context.getBindManager().getParameterMetaData();
+            }
+
+            @Override
+            public MutationState execute() throws SQLException {
+                try {
+                    return client.createFunction(create);
+                } finally {
+                    if (client.getConnection() != connection) {
+                        client.getConnection().close();
+                    }
+                }
+            }
+
+            @Override
+            public ExplainPlan getExplainPlan() throws SQLException {
+                return new ExplainPlan(Collections.singletonList("CREATE FUNCTION"));
+            }
+
+            @Override
+            public PhoenixConnection getConnection() {
+                return connection;
+            }
+            
+            @Override
+            public StatementContext getContext() {
+                return context;
+            }
+        };
+    }
+}

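The compiler hands back a MutationPlan whose execute() simply delegates to MetaDataClient.createFunction. A sketch of the flow PhoenixStatement drives (the wrapper method is invented; stmt and create are assumed to be already parsed):

    import java.sql.SQLException;

    import org.apache.phoenix.compile.CreateFunctionCompiler;
    import org.apache.phoenix.compile.MutationPlan;
    import org.apache.phoenix.execute.MutationState;
    import org.apache.phoenix.jdbc.PhoenixStatement;
    import org.apache.phoenix.parse.CreateFunctionStatement;

    public class CreateFunctionFlowSketch {
        static MutationState run(PhoenixStatement stmt, CreateFunctionStatement create)
                throws SQLException {
            MutationPlan plan = new CreateFunctionCompiler(stmt).compile(create);
            // getExplainPlan() reports the single step "CREATE FUNCTION".
            return plan.execute(); // writes the function's metadata row
        }
    }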
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
index 07d9f56..f1937a5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
@@ -46,7 +46,7 @@ public class CreateIndexCompiler {
 
     public MutationPlan compile(final CreateIndexStatement create) throws SQLException {
         final PhoenixConnection connection = statement.getConnection();
-        final ColumnResolver resolver = FromCompiler.getResolver(create, connection);
+        final ColumnResolver resolver = FromCompiler.getResolver(create, connection, create.getUdfParseNodes());
         Scan scan = new Scan();
         final StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
         ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);

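Threading create.getUdfParseNodes() into the resolver is what allows a functional index expression to reference a UDF. An illustrative statement (index, table, and function names are assumed, with MY_REVERSE created as in the earlier sketch):

    import java.sql.Connection;
    import java.sql.DriverManager;

    public class UdfIndexSketch {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                // Functional index whose indexed expression invokes a UDF.
                conn.createStatement().execute(
                        "CREATE INDEX name_rev_idx ON my_table (MY_REVERSE(name))");
            }
        }
    }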
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 4f6a719..575f0f3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -323,7 +323,7 @@ public class DeleteCompiler {
                         hint, false, aliasedNodes, delete.getWhere(), 
                         Collections.<ParseNode>emptyList(), null, 
                         delete.getOrderBy(), delete.getLimit(),
-                        delete.getBindCount(), false, false, Collections.<SelectStatement>emptyList());
+                        delete.getBindCount(), false, false, Collections.<SelectStatement>emptyList(), delete.getUdfParseNodes());
                 select = StatementNormalizer.normalize(select, resolver);
                 SelectStatement transformedSelect = SubqueryRewriter.transform(select, resolver, connection);
                 if (transformedSelect != select) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
index ab6b851..92899a6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.compile;
 
+
 import java.math.BigDecimal;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
@@ -70,6 +71,7 @@ import org.apache.phoenix.expression.function.ArrayAnyComparisonExpression;
 import org.apache.phoenix.expression.function.ArrayElemRefExpression;
 import org.apache.phoenix.expression.function.RoundDecimalExpression;
 import org.apache.phoenix.expression.function.RoundTimestampExpression;
+import org.apache.phoenix.expression.function.UDFExpression;
 import org.apache.phoenix.parse.AddParseNode;
 import org.apache.phoenix.parse.AndParseNode;
 import org.apache.phoenix.parse.ArithmeticParseNode;
@@ -95,12 +97,14 @@ import org.apache.phoenix.parse.ModulusParseNode;
 import org.apache.phoenix.parse.MultiplyParseNode;
 import org.apache.phoenix.parse.NotParseNode;
 import org.apache.phoenix.parse.OrParseNode;
+import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.RowValueConstructorParseNode;
 import org.apache.phoenix.parse.SequenceValueParseNode;
 import org.apache.phoenix.parse.StringConcatParseNode;
 import org.apache.phoenix.parse.SubqueryParseNode;
 import org.apache.phoenix.parse.SubtractParseNode;
+import org.apache.phoenix.parse.UDFParseNode;
 import org.apache.phoenix.parse.UnsupportedAllParseNodeVisitor;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
@@ -313,8 +317,19 @@ public class ExpressionCompiler extends UnsupportedAllParseNodeVisitor<Expressio
      * @param children the child expression arguments to the function expression node.
      */
     public Expression visitLeave(FunctionParseNode node, List<Expression> children) throws SQLException {
+        PFunction function = null;
+        if(node instanceof UDFParseNode) {
+            function = context.getResolver().resolveFunction(node.getName());
+            BuiltInFunctionInfo info = new BuiltInFunctionInfo(function);
+            node = new UDFParseNode(node.getName(), node.getChildren(), info);
+        }
         children = node.validate(children, context);
-        Expression expression = node.create(children, context);
+        Expression expression = null;
+        if (function == null) {
+            expression = node.create(children, context);
+        } else {
+            expression = node.create(children, function, context);
+        }
         ImmutableBytesWritable ptr = context.getTempPtr();
         BuiltInFunctionInfo info = node.getInfo();
         for (int i = 0; i < info.getRequiredArgCount(); i++) { 

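When visitLeave sees a UDFParseNode, it resolves the PFunction through the resolver and builds the expression from that function's metadata rather than from the built-in registry. For orientation, a minimal sketch of the user-side class such a UDF can load, assuming the model of extending ScalarFunction (the name, null handling, and omission of sort-order handling are illustrative, not the full contract):

    import java.util.List;

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.phoenix.expression.Expression;
    import org.apache.phoenix.expression.function.ScalarFunction;
    import org.apache.phoenix.schema.tuple.Tuple;
    import org.apache.phoenix.schema.types.PDataType;
    import org.apache.phoenix.schema.types.PVarchar;

    public class MyReverse extends ScalarFunction {
        public static final String NAME = "MY_REVERSE";

        public MyReverse() {
        }

        public MyReverse(List<Expression> children) {
            super(children);
        }

        @Override
        public String getName() {
            return NAME;
        }

        @Override
        public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
            Expression arg = getChildren().get(0);
            if (!arg.evaluate(tuple, ptr)) {
                return false; // argument not available for this row yet
            }
            if (ptr.getLength() == 0) {
                return true; // null in, null out
            }
            String value = (String) PVarchar.INSTANCE.toObject(ptr);
            ptr.set(PVarchar.INSTANCE.toBytes(new StringBuilder(value).reverse().toString()));
            return true;
        }

        @Override
        public PDataType getDataType() {
            return PVarchar.INSTANCE;
        }
    }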
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index da78b24..5fe0e6f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -22,9 +22,11 @@ import java.sql.SQLFeatureNotSupportedException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -35,12 +37,15 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.parse.AliasedNode;
 import org.apache.phoenix.parse.BindTableNode;
 import org.apache.phoenix.parse.ColumnDef;
+import org.apache.phoenix.parse.CreateFunctionStatement;
 import org.apache.phoenix.parse.CreateTableStatement;
 import org.apache.phoenix.parse.DMLStatement;
 import org.apache.phoenix.parse.DerivedTableNode;
 import org.apache.phoenix.parse.FamilyWildcardParseNode;
 import org.apache.phoenix.parse.JoinTableNode;
+import org.apache.phoenix.parse.NamedNode;
 import org.apache.phoenix.parse.NamedTableNode;
+import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.parse.SelectStatement;
@@ -49,6 +54,7 @@ import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.parse.TableNode;
 import org.apache.phoenix.parse.TableNodeVisitor;
 import org.apache.phoenix.parse.TableWildcardParseNode;
+import org.apache.phoenix.parse.UDFParseNode;
 import org.apache.phoenix.parse.WildcardParseNode;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryConstants;
@@ -57,6 +63,7 @@ import org.apache.phoenix.schema.AmbiguousTableException;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.ColumnRef;
+import org.apache.phoenix.schema.FunctionNotFoundException;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnFamily;
@@ -103,6 +110,11 @@ public class FromCompiler {
         }
 
         @Override
+        public List<PFunction> getFunctions() {
+            return Collections.emptyList();
+        }
+
+        @Override
         public TableRef resolveTable(String schemaName, String tableName)
                 throws SQLException {
             throw new UnsupportedOperationException();
@@ -112,6 +124,14 @@ public class FromCompiler {
         public ColumnRef resolveColumn(String schemaName, String tableName, String colName) throws SQLException {
             throw new UnsupportedOperationException();
         }
+        
+        @Override
+        public PFunction resolveFunction(String functionName) throws SQLException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean hasUDFs() {
+            return false;
+        }
     };
 
     public static ColumnResolver getResolverForCreation(final CreateTableStatement statement, final PhoenixConnection connection)
@@ -141,7 +161,7 @@ public class FromCompiler {
                     if (htable != null) Closeables.closeQuietly(htable);
                 }
                 tableNode = NamedTableNode.create(null, baseTable, statement.getColumnDefs());
-                return new SingleTableColumnResolver(connection, tableNode, e.getTimeStamp());
+                return new SingleTableColumnResolver(connection, tableNode, e.getTimeStamp(), new HashMap<String, UDFParseNode>(1));
             }
             throw e;
         }
@@ -166,9 +186,9 @@ public class FromCompiler {
     	if (fromNode == null)
     	    return EMPTY_TABLE_RESOLVER;
         if (fromNode instanceof NamedTableNode)
-            return new SingleTableColumnResolver(connection, (NamedTableNode) fromNode, true, 1);
+            return new SingleTableColumnResolver(connection, (NamedTableNode) fromNode, true, 1, statement.getUdfParseNodes());
 
-        MultiTableColumnResolver visitor = new MultiTableColumnResolver(connection, 1);
+        MultiTableColumnResolver visitor = new MultiTableColumnResolver(connection, 1, statement.getUdfParseNodes());
         fromNode.accept(visitor);
         return visitor;
     }
@@ -178,12 +198,24 @@ public class FromCompiler {
         return visitor;
     }
 
+    public static ColumnResolver getResolver(NamedTableNode tableNode, PhoenixConnection connection, Map<String, UDFParseNode> udfParseNodes) throws SQLException {
+        SingleTableColumnResolver visitor =
+                new SingleTableColumnResolver(connection, tableNode, true, 0, udfParseNodes);
+        return visitor;
+    }
+
     public static ColumnResolver getResolver(SingleTableStatement statement, PhoenixConnection connection)
             throws SQLException {
         SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), true);
         return visitor;
     }
 
+    public static ColumnResolver getResolver(SingleTableStatement statement, PhoenixConnection connection, Map<String, UDFParseNode> udfParseNodes)
+            throws SQLException {
+        SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), true, 0, udfParseNodes);
+        return visitor;
+    }
+
     public static ColumnResolver getResolverForCompiledDerivedTable(PhoenixConnection connection, TableRef tableRef, RowProjector projector)
             throws SQLException {
         List<PColumn> projectedColumns = new ArrayList<PColumn>();
@@ -205,26 +237,32 @@ public class FromCompiler {
         return visitor;
     }
 
+    public static ColumnResolver getResolver(PhoenixConnection connection, TableRef tableRef, Map<String, UDFParseNode> udfParseNodes)
+            throws SQLException {
+        SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, tableRef, udfParseNodes);
+        return visitor;
+    }
+
     public static ColumnResolver getResolverForMutation(DMLStatement statement, PhoenixConnection connection)
             throws SQLException {
         /*
          * We validate the meta data at commit time for mutations, as this allows us to do many UPSERT VALUES calls
          * without hitting the server each time to check if the meta data is up-to-date.
          */
-        SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), false);
+        SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), false, 0,  statement.getUdfParseNodes());
         return visitor;
     }
     
-    public static ColumnResolver getResolverForProjectedTable(PTable projectedTable) {
-        return new ProjectedTableColumnResolver(projectedTable);
+    public static ColumnResolver getResolverForProjectedTable(PTable projectedTable, PhoenixConnection connection, Map<String, UDFParseNode> udfParseNodes) throws SQLException {
+        return new ProjectedTableColumnResolver(projectedTable, connection, udfParseNodes);
     }
 
     private static class SingleTableColumnResolver extends BaseColumnResolver {
     	private final List<TableRef> tableRefs;
     	private final String alias;
 
-       public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode table, long timeStamp) throws SQLException  {
-           super(connection, 0);
+       public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode table, long timeStamp, Map<String, UDFParseNode> udfParseNodes) throws SQLException  {
+           super(connection, 0, false, udfParseNodes);
            List<PColumnFamily> families = Lists.newArrayListWithExpectedSize(table.getDynamicColumns().size());
            for (ColumnDef def : table.getDynamicColumns()) {
                if (def.getColumnDefName().getFamilyName() != null) {
@@ -239,11 +277,13 @@ public class FromCompiler {
        }
 
         public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode tableNode, boolean updateCacheImmediately) throws SQLException {
-            this(connection, tableNode, updateCacheImmediately, 0);
+            this(connection, tableNode, updateCacheImmediately, 0, new HashMap<String,UDFParseNode>(1));
         }
 
-        public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode tableNode, boolean updateCacheImmediately, int tsAddition) throws SQLException {
-            super(connection, tsAddition);
+        public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode tableNode,
+                boolean updateCacheImmediately, int tsAddition,
+                Map<String, UDFParseNode> udfParseNodes) throws SQLException {
+            super(connection, tsAddition, updateCacheImmediately, udfParseNodes);
             alias = tableNode.getAlias();
             TableRef tableRef = createTableRef(tableNode, updateCacheImmediately);
             tableRefs = ImmutableList.of(tableRef);
@@ -255,6 +295,12 @@ public class FromCompiler {
             tableRefs = ImmutableList.of(tableRef);
         }
 
+        public SingleTableColumnResolver(PhoenixConnection connection, TableRef tableRef, Map<String, UDFParseNode> udfParseNodes) throws SQLException {
+            super(connection, 0, false, udfParseNodes);
+            alias = tableRef.getTableAlias();
+            tableRefs = ImmutableList.of(tableRef);
+        }
+
         public SingleTableColumnResolver(TableRef tableRef) throws SQLException {
             super(null, 0);
             alias = tableRef.getTableAlias();
@@ -267,6 +313,11 @@ public class FromCompiler {
 		}
 
         @Override
+        public List<PFunction> getFunctions() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
         public TableRef resolveTable(String schemaName, String tableName)
                 throws SQLException {
             TableRef tableRef = tableRefs.get(0);
@@ -316,7 +367,6 @@ public class FromCompiler {
         			: tableRef.getTable().getColumn(colName);
             return new ColumnRef(tableRef, column.getPosition());
 		}
-
     }
 
     private static abstract class BaseColumnResolver implements ColumnResolver {
@@ -326,11 +376,30 @@ public class FromCompiler {
         // on Windows because the millis timestamp granularity is so bad we sometimes won't
         // get the data back that we just upserted.
         private final int tsAddition;
+        protected final Map<String, PFunction> functionMap;
+        protected List<PFunction> functions;
 
         private BaseColumnResolver(PhoenixConnection connection, int tsAddition) {
+            this.connection = connection;
+            this.client = connection == null ? null : new MetaDataClient(connection);
+            this.tsAddition = tsAddition;
+            functionMap = new HashMap<String, PFunction>(1);
+            this.functions = Collections.<PFunction>emptyList();
+        }
+
+        private BaseColumnResolver(PhoenixConnection connection, int tsAddition, boolean updateCacheImmediately, Map<String, UDFParseNode> udfParseNodes) throws SQLException {
         	this.connection = connection;
             this.client = connection == null ? null : new MetaDataClient(connection);
             this.tsAddition = tsAddition;
+            functionMap = new HashMap<String, PFunction>(1);
+            if (udfParseNodes.isEmpty()) {
+                functions = Collections.<PFunction> emptyList();
+            } else {
+                functions = createFunctionRef(new ArrayList<String>(udfParseNodes.keySet()), updateCacheImmediately);
+                for (PFunction function : functions) {
+                    functionMap.put(function.getFunctionName(), function);
+                }
+            }
         }
 
         protected TableRef createTableRef(NamedTableNode tableNode, boolean updateCacheImmediately) throws SQLException {
@@ -383,6 +452,85 @@ public class FromCompiler {
             return tableRef;
         }
 
+        @Override
+        public List<PFunction> getFunctions() {
+            return functions;
+        }
+
+        private List<PFunction> createFunctionRef(List<String> functionNames, boolean updateCacheImmediately) throws SQLException {
+            long timeStamp = QueryConstants.UNSET_TIMESTAMP;
+            int numFunctions = functionNames.size();
+            List<PFunction> functionsFound = new ArrayList<PFunction>(functionNames.size());
+            if (updateCacheImmediately || connection.getAutoCommit()) {
+                getFunctionFromCache(functionNames, functionsFound, true);
+                if(functionNames.isEmpty()) {
+                    return functionsFound;
+                }
+                MetaDataMutationResult result = client.updateCache(functionNames);
+                timeStamp = result.getMutationTime();
+                functionsFound = result.getFunctions();
+                if(functionNames.size() != functionsFound.size()){
+                    throw new FunctionNotFoundException("Some of the functions in " + functionNames.toString() + " could not be found");
+                }
+            } else {
+                getFunctionFromCache(functionNames, functionsFound, false);
+                // We always attempt to update the cache in the event of a FunctionNotFoundException
+                MetaDataMutationResult result = null;
+                if (!functionNames.isEmpty()) {
+                    result = client.updateCache(functionNames);
+                }
+                if(result!=null) {
+                    if (!result.getFunctions().isEmpty()) {
+                        functionsFound.addAll(result.getFunctions());
+                    }
+                    if(result.wasUpdated()) {
+                        timeStamp = result.getMutationTime();
+                    }
+                }
+                if (functionsFound.size()!=numFunctions) {
+                    throw new FunctionNotFoundException("Some of the functions in " + functionNames.toString() + " could not be found", timeStamp);
+                }
+            }
+            if (timeStamp != QueryConstants.UNSET_TIMESTAMP) {
+                timeStamp += tsAddition;
+            }
+            
+            if (logger.isDebugEnabled() && timeStamp != QueryConstants.UNSET_TIMESTAMP) {
+                logger.debug(LogUtil.addCustomAnnotations("Re-resolved stale functions " + functionNames.toString() + " at timestamp " + timeStamp, connection));
+            }
+            return functionsFound;
+        }
+
+        private void getFunctionFromCache(List<String> functionNames,
+                List<PFunction> functionsFound, boolean getOnlyTemporaryFunctions) {
+            Iterator<String> iterator = functionNames.iterator();
+            while(iterator.hasNext()) {
+                PFunction function = null;
+                String functionName = iterator.next();
+                try {
+                    function = connection.getMetaDataCache().getFunction(new PTableKey(connection.getTenantId(), functionName));
+                } catch (FunctionNotFoundException e1) {
+                    if (connection.getTenantId() != null) { // Check with null tenantId next
+                        try {
+                            function = connection.getMetaDataCache().getFunction(new PTableKey(null, functionName));
+                        } catch (FunctionNotFoundException e2) {
+                        }
+                    }
+                }
+                if (function != null) {
+                    if (getOnlyTemporaryFunctions) {
+                        if (function.isTemporaryFunction()) {
+                            functionsFound.add(function);
+                            iterator.remove();
+                        }
+                    } else {
+                        functionsFound.add(function);
+                        iterator.remove();
+                    }
+                }
+            }
+        }
+
         protected PTable addDynamicColumns(List<ColumnDef> dynColumns, PTable theTable)
                 throws SQLException {
             if (!dynColumns.isEmpty()) {
@@ -409,6 +557,20 @@ public class FromCompiler {
             }
             return theTable;
         }
+        
+        @Override
+        public PFunction resolveFunction(String functionName) throws SQLException {
+            PFunction function = functionMap.get(functionName);
+            if(function == null) {
+                throw new FunctionNotFoundException(functionName);
+            }
+            return function;
+        }
+
+        @Override
+        public boolean hasUDFs() {
+            return !functions.isEmpty();
+        }
     }
 
     private static class MultiTableColumnResolver extends BaseColumnResolver implements TableNodeVisitor<Void> {
@@ -421,6 +583,12 @@ public class FromCompiler {
             tables = Lists.newArrayList();
         }
 
+        private MultiTableColumnResolver(PhoenixConnection connection, int tsAddition, Map<String, UDFParseNode> udfParseNodes) throws SQLException {
+            super(connection, tsAddition, false, udfParseNodes);
+            tableMap = ArrayListMultimap.<String, TableRef> create();
+            tables = Lists.newArrayList();
+        }
+
         @Override
         public List<TableRef> getTables() {
             return tables;
@@ -580,16 +748,14 @@ public class FromCompiler {
                 }
             }
         }
-
     }
     
     private static class ProjectedTableColumnResolver extends MultiTableColumnResolver {
         private final boolean isLocalIndex;
         private final List<TableRef> theTableRefs;
         private final Map<ColumnRef, Integer> columnRefMap;
-        
-        private ProjectedTableColumnResolver(PTable projectedTable) {
-            super(null, 0);
+        private ProjectedTableColumnResolver(PTable projectedTable, PhoenixConnection conn, Map<String, UDFParseNode> udfParseNodes) throws SQLException {
+            super(conn, 0, udfParseNodes);
             Preconditions.checkArgument(projectedTable.getType() == PTableType.PROJECTED);
             this.isLocalIndex = projectedTable.getIndexType() == IndexType.LOCAL;
             this.columnRefMap = new HashMap<ColumnRef, Integer>();
@@ -615,6 +781,7 @@ public class FromCompiler {
                 this.columnRefMap.put(new ColumnRef(tableRef, colRef.getColumnPosition()), column.getPosition());
             }
             this.theTableRefs = ImmutableList.of(new TableRef(ParseNodeFactory.createTempAlias(), projectedTable, ts, false));
+            
         }
         
         @Override