You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2015/04/27 10:33:51 UTC
[7/7] phoenix git commit: PHOENIX-538 Support UDFs(Rajeshbabu
Chintaguntla)
PHOENIX-538 Support UDFs(Rajeshbabu Chintaguntla)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/66bd3e35
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/66bd3e35
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/66bd3e35
Branch: refs/heads/master
Commit: 66bd3e35c0d2105dcc393116f8bb5851ce1f5ec4
Parents: cd29be2
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
Authored: Mon Apr 27 14:03:44 2015 +0530
Committer: Rajeshbabu Chintaguntla <ra...@apache.org>
Committed: Mon Apr 27 14:03:44 2015 +0530
----------------------------------------------------------------------
bin/phoenix_utils.py | 9 +
bin/sqlline.py | 2 +-
.../end2end/QueryDatabaseMetaDataIT.java | 5 +
.../end2end/TenantSpecificTablesDDLIT.java | 5 +
.../phoenix/end2end/UserDefinedFunctionsIT.java | 605 +++
phoenix-core/src/main/antlr3/PhoenixSQL.g | 77 +-
.../org/apache/phoenix/cache/GlobalCache.java | 30 +-
.../apache/phoenix/compile/ColumnResolver.java | 17 +
.../phoenix/compile/CreateFunctionCompiler.java | 80 +
.../phoenix/compile/CreateIndexCompiler.java | 2 +-
.../apache/phoenix/compile/DeleteCompiler.java | 2 +-
.../phoenix/compile/ExpressionCompiler.java | 17 +-
.../apache/phoenix/compile/FromCompiler.java | 199 +-
.../apache/phoenix/compile/JoinCompiler.java | 9 +-
.../apache/phoenix/compile/PostDDLCompiler.java | 14 +
.../phoenix/compile/ProjectionCompiler.java | 2 +-
.../apache/phoenix/compile/QueryCompiler.java | 18 +-
.../apache/phoenix/compile/RowProjector.java | 32 +-
.../phoenix/compile/StatementNormalizer.java | 5 +-
.../phoenix/compile/SubqueryRewriter.java | 4 +-
.../phoenix/compile/SubselectRewriter.java | 2 +-
.../coprocessor/MetaDataEndpointImpl.java | 651 ++-
.../phoenix/coprocessor/MetaDataProtocol.java | 30 +-
.../coprocessor/generated/MetaDataProtos.java | 4274 +++++++++++++++---
.../coprocessor/generated/PFunctionProtos.java | 2942 ++++++++++++
.../phoenix/exception/SQLExceptionCode.java | 20 +-
.../phoenix/exception/SQLExceptionInfo.java | 16 +
.../phoenix/expression/ExpressionType.java | 4 +-
.../expression/function/ScalarFunction.java | 2 +-
.../expression/function/UDFExpression.java | 220 +
.../visitor/CloneExpressionVisitor.java | 6 +
.../apache/phoenix/index/IndexMaintainer.java | 50 +-
.../apache/phoenix/jdbc/PhoenixConnection.java | 35 +-
.../phoenix/jdbc/PhoenixDatabaseMetaData.java | 30 +
.../apache/phoenix/jdbc/PhoenixStatement.java | 132 +-
.../apache/phoenix/optimize/QueryOptimizer.java | 4 +-
.../phoenix/parse/CreateFunctionStatement.java | 42 +
.../phoenix/parse/CreateIndexStatement.java | 8 +-
.../org/apache/phoenix/parse/DMLStatement.java | 11 +-
.../apache/phoenix/parse/DeleteStatement.java | 5 +-
.../phoenix/parse/DropFunctionStatement.java | 41 +
.../apache/phoenix/parse/FunctionParseNode.java | 75 +-
.../parse/IndexExpressionParseNodeRewriter.java | 4 +-
.../org/apache/phoenix/parse/NamedNode.java | 2 +-
.../org/apache/phoenix/parse/PFunction.java | 255 ++
.../apache/phoenix/parse/ParseNodeFactory.java | 70 +-
.../apache/phoenix/parse/ParseNodeRewriter.java | 2 +-
.../apache/phoenix/parse/SelectStatement.java | 22 +-
.../org/apache/phoenix/parse/UDFParseNode.java | 27 +
.../apache/phoenix/parse/UpsertStatement.java | 9 +-
.../apache/phoenix/protobuf/ProtobufUtil.java | 10 +
.../phoenix/query/ConnectionQueryServices.java | 4 +
.../query/ConnectionQueryServicesImpl.java | 161 +-
.../query/ConnectionlessQueryServicesImpl.java | 52 +-
.../query/DelegateConnectionQueryServices.java | 31 +
.../apache/phoenix/query/MetaDataMutated.java | 3 +
.../apache/phoenix/query/QueryConstants.java | 36 +
.../org/apache/phoenix/query/QueryServices.java | 2 +
.../phoenix/query/QueryServicesOptions.java | 4 +-
.../schema/FunctionAlreadyExistsException.java | 58 +
.../schema/FunctionNotFoundException.java | 52 +
.../apache/phoenix/schema/MetaDataClient.java | 256 +-
.../NewerFunctionAlreadyExistsException.java | 39 +
.../org/apache/phoenix/schema/PMetaData.java | 6 +-
.../apache/phoenix/schema/PMetaDataEntity.java | 22 +
.../apache/phoenix/schema/PMetaDataImpl.java | 118 +-
.../java/org/apache/phoenix/schema/PTable.java | 3 +-
.../org/apache/phoenix/util/MetaDataUtil.java | 7 +-
.../org/apache/phoenix/util/SchemaUtil.java | 17 +-
.../apache/phoenix/parse/QueryParserTest.java | 18 -
.../query/ParallelIteratorsSplitTest.java | 15 +
phoenix-protocol/src/main/MetaDataService.proto | 37 +-
phoenix-protocol/src/main/PFunction.proto | 45 +
73 files changed, 10222 insertions(+), 899 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/bin/phoenix_utils.py
----------------------------------------------------------------------
diff --git a/bin/phoenix_utils.py b/bin/phoenix_utils.py
index 36c7b82..2cf7db7 100755
--- a/bin/phoenix_utils.py
+++ b/bin/phoenix_utils.py
@@ -64,6 +64,15 @@ def setPath():
phoenix_client_jar = find("phoenix-*-client.jar", phoenix_jar_path)
global phoenix_test_jar_path
phoenix_test_jar_path = os.path.join(current_dir, "..", "phoenix-core", "target","*")
+ global hadoop_common_jar_path
+ hadoop_common_jar_path = os.path.join(current_dir, "..", "phoenix-assembly", "target","*")
+ global hadoop_common_jar
+ hadoop_common_jar = find("hadoop-common*.jar", hadoop_common_jar_path)
+ global hadoop_hdfs_jar_path
+ hadoop_hdfs_jar_path = os.path.join(current_dir, "..", "phoenix-assembly", "target","*")
+ global hadoop_hdfs_jar
+ hadoop_hdfs_jar = find("hadoop-hdfs*.jar", hadoop_hdfs_jar_path)
+
global hbase_conf_dir
hbase_conf_dir = os.getenv('HBASE_CONF_DIR', os.getenv('HBASE_CONF_PATH', '.'))
global hbase_conf_path # keep conf_path around for backward compatibility
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/bin/sqlline.py
----------------------------------------------------------------------
diff --git a/bin/sqlline.py b/bin/sqlline.py
index 6e5b5fa..80b5ff7 100755
--- a/bin/sqlline.py
+++ b/bin/sqlline.py
@@ -53,7 +53,7 @@ colorSetting = "true"
if os.name == 'nt':
colorSetting = "false"
-java_cmd = 'java -cp "' + phoenix_utils.hbase_conf_dir + os.pathsep + phoenix_utils.phoenix_client_jar + \
+java_cmd = 'java -cp "' + phoenix_utils.hbase_conf_dir + os.pathsep + phoenix_utils.phoenix_client_jar + os.pathsep + phoenix_utils.hadoop_common_jar + os.pathsep + phoenix_utils.hadoop_hdfs_jar + \
'" -Dlog4j.configuration=file:' + \
os.path.join(phoenix_utils.current_dir, "log4j.properties") + \
" sqlline.SqlLine -d org.apache.phoenix.jdbc.PhoenixDriver \
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
index c9ec0ce..61459a5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.end2end;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_SEQUENCE;
import static org.apache.phoenix.util.TestUtil.ATABLE_NAME;
import static org.apache.phoenix.util.TestUtil.ATABLE_SCHEMA_NAME;
@@ -125,6 +126,10 @@ public class QueryDatabaseMetaDataIT extends BaseClientManagedTimeIT {
assertEquals(PTableType.SYSTEM.toString(), rs.getString("TABLE_TYPE"));
assertTrue(rs.next());
assertEquals(rs.getString("TABLE_SCHEM"),SYSTEM_CATALOG_SCHEMA);
+ assertEquals(rs.getString("TABLE_NAME"),SYSTEM_FUNCTION_TABLE);
+ assertEquals(PTableType.SYSTEM.toString(), rs.getString("TABLE_TYPE"));
+ assertTrue(rs.next());
+ assertEquals(rs.getString("TABLE_SCHEM"),SYSTEM_CATALOG_SCHEMA);
assertEquals(rs.getString("TABLE_NAME"),TYPE_SEQUENCE);
assertEquals(PTableType.SYSTEM.toString(), rs.getString("TABLE_TYPE"));
assertTrue(rs.next());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
index 4d0b45d..a7c7291 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
@@ -26,6 +26,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_SEQUENCE;
import static org.apache.phoenix.schema.PTableType.SYSTEM;
import static org.apache.phoenix.schema.PTableType.TABLE;
@@ -473,6 +474,8 @@ public class TenantSpecificTablesDDLIT extends BaseTenantSpecificTablesIT {
assertTrue(rs.next());
assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, SYSTEM_CATALOG_TABLE, SYSTEM);
assertTrue(rs.next());
+ assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, SYSTEM_FUNCTION_TABLE, SYSTEM);
+ assertTrue(rs.next());
assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, TYPE_SEQUENCE, SYSTEM);
assertTrue(rs.next());
assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE, SYSTEM);
@@ -539,6 +542,8 @@ public class TenantSpecificTablesDDLIT extends BaseTenantSpecificTablesIT {
assertTrue(rs.next());
assertTableMetaData(rs, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE, PTableType.SYSTEM);
assertTrue(rs.next());
+ assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, SYSTEM_FUNCTION_TABLE, SYSTEM);
+ assertTrue(rs.next());
assertTableMetaData(rs, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.TYPE_SEQUENCE, PTableType.SYSTEM);
assertTrue(rs.next());
assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE, PTableType.SYSTEM);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java
new file mode 100644
index 0000000..d56004b
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java
@@ -0,0 +1,605 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE;
+import static org.apache.phoenix.query.QueryServices.DYNAMIC_JARS_DIR_KEY;
+import static org.apache.phoenix.util.TestUtil.LOCALHOST;
+import static org.junit.Assert.*;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Properties;
+import java.util.jar.Attributes;
+import java.util.jar.JarEntry;
+import java.util.jar.JarOutputStream;
+import java.util.jar.Manifest;
+
+import javax.tools.JavaCompiler;
+import javax.tools.ToolProvider;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.phoenix.expression.function.UDFExpression;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.FunctionAlreadyExistsException;
+import org.apache.phoenix.schema.FunctionNotFoundException;
+import org.apache.phoenix.schema.ValueRangeExcpetion;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class UserDefinedFunctionsIT extends BaseTest{
+
+ // Tenant id appended to the JDBC URL when a test opens a tenant-specific connection.
+ protected static final String TENANT_ID = "ZZTop";
+ // JDBC URL of the mini cluster; assigned in doSetup().
+ private static String url;
+ // Test driver registered for the URL above; assigned in doSetup().
+ private static PhoenixTestDriver driver;
+ // Testing utility on which doSetup() starts the mini DFS, ZooKeeper and HBase clusters.
+ private static HBaseTestingUtility util;
+
+ private static String STRING_REVERSE_EVALUATE_METHOD =
+ new StringBuffer()
+ .append(" public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {\n")
+ .append(" Expression arg = getChildren().get(0);\n")
+ .append(" if (!arg.evaluate(tuple, ptr)) {\n")
+ .append(" return false;\n")
+ .append(" }\n")
+ .append(" int targetOffset = ptr.getLength();\n")
+ .append(" if (targetOffset == 0) {\n")
+ .append(" return true;\n")
+ .append(" }\n")
+ .append(" byte[] source = ptr.get();\n")
+ .append(" byte[] target = new byte[targetOffset];\n")
+ .append(" int sourceOffset = ptr.getOffset(); \n")
+ .append(" int endOffset = sourceOffset + ptr.getLength();\n")
+ .append(" SortOrder sortOrder = arg.getSortOrder();\n")
+ .append(" while (sourceOffset < endOffset) {\n")
+ .append(" int nBytes = StringUtil.getBytesInChar(source[sourceOffset], sortOrder);\n")
+ .append(" targetOffset -= nBytes;\n")
+ .append(" System.arraycopy(source, sourceOffset, target, targetOffset, nBytes);\n")
+ .append(" sourceOffset += nBytes;\n")
+ .append(" }\n")
+ .append(" ptr.set(target);\n")
+ .append(" return true;\n")
+ .append(" }\n").toString();
+
+ private static String SUM_COLUMN_VALUES_EVALUATE_METHOD =
+ new StringBuffer()
+ .append(" public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {\n")
+ .append(" int[] array = new int[getChildren().size()];\n")
+ .append(" int i = 0;\n")
+ .append(" for(Expression child:getChildren()) {\n")
+ .append(" if (!child.evaluate(tuple, ptr)) {\n")
+ .append(" return false;\n")
+ .append(" }\n")
+ .append(" int targetOffset = ptr.getLength();\n")
+ .append(" if (targetOffset == 0) {\n")
+ .append(" return true;\n")
+ .append(" }\n")
+ .append(" array[i++] = (Integer) PInteger.INSTANCE.toObject(ptr);\n")
+ .append(" }\n")
+ .append(" int sum = 0;\n")
+ .append(" for(i=0;i<getChildren().size();i++) {\n")
+ .append(" sum+=array[i];\n")
+ .append(" }\n")
+ .append(" ptr.set(PInteger.INSTANCE.toBytes((Integer)sum));\n")
+ .append(" return true;\n")
+ .append(" }\n").toString();
+
+ private static String MY_REVERSE_CLASS_NAME = "MyReverse";
+ private static String MY_SUM_CLASS_NAME = "MySum";
+ private static String MY_REVERSE_PROGRAM = getProgram(MY_REVERSE_CLASS_NAME, STRING_REVERSE_EVALUATE_METHOD, "PVarchar");
+ private static String MY_SUM_PROGRAM = getProgram(MY_SUM_CLASS_NAME, SUM_COLUMN_VALUES_EVALUATE_METHOD, "PInteger");
+ private static Properties EMPTY_PROPS = new Properties();
+
+
+ /**
+  * Builds the Java source of a {@code ScalarFunction} subclass in package
+  * {@code org.apache.phoenix.end2end}.
+  *
+  * @param className simple name of the generated class
+  * @param evaluateMethod source text of the {@code evaluate} method, placed after an
+  *        {@code @Override} annotation
+  * @param returnType simple name of the {@code PDataType} subclass whose {@code INSTANCE}
+  *        the generated {@code getDataType()} returns (for example {@code PVarchar})
+  * @return the complete compilation unit as a string
+  */
+ private static String getProgram(String className, String evaluateMethod, String returnType) {
+     // StringBuilder suffices: the source is assembled on a single thread.
+     return new StringBuilder()
+         .append("package org.apache.phoenix.end2end;\n")
+         // The original emitted this import twice; the duplicate was redundant.
+         .append("import java.sql.SQLException;\n")
+         .append("import java.util.List;\n")
+         .append("import org.apache.hadoop.hbase.io.ImmutableBytesWritable;\n")
+         .append("import org.apache.phoenix.expression.Expression;\n")
+         .append("import org.apache.phoenix.expression.function.ScalarFunction;\n")
+         .append("import org.apache.phoenix.schema.SortOrder;\n")
+         .append("import org.apache.phoenix.schema.tuple.Tuple;\n")
+         .append("import org.apache.phoenix.schema.types.PDataType;\n")
+         .append("import org.apache.phoenix.schema.types.PInteger;\n")
+         .append("import org.apache.phoenix.schema.types.PVarchar;\n")
+         .append("import org.apache.phoenix.util.StringUtil;\n")
+         .append("public class "+className+" extends ScalarFunction{\n")
+         // NOTE(review): NAME is "MY_REVERSE" for every generated class, including the sum
+         // UDF; left unchanged here - confirm whether anything relies on getName().
+         .append(" public static final String NAME = \"MY_REVERSE\";\n")
+         .append(" public "+className+"() {\n")
+         .append(" }\n")
+         .append(" public "+className+"(List<Expression> children) throws SQLException {\n")
+         .append(" super(children);\n")
+         .append(" }\n")
+         .append(" @Override\n")
+         .append(evaluateMethod)
+         .append(" @Override\n")
+         .append(" public SortOrder getSortOrder() {\n")
+         .append(" return getChildren().get(0).getSortOrder();\n")
+         .append(" }\n")
+         .append(" @Override\n")
+         .append(" public PDataType getDataType() {\n")
+         .append(" return "+returnType+".INSTANCE;\n")
+         .append(" }\n")
+         .append(" @Override\n")
+         .append(" public String getName() {\n")
+         .append(" return NAME;\n")
+         .append(" }\n")
+         .append("}\n").toString();
+ }
+
+ /**
+  * One-time fixture: starts mini DFS, ZooKeeper and HBase clusters, points the dynamic-jars
+  * directory at the mini DFS, compiles the two test UDF classes, and registers a test driver
+  * with user-defined functions enabled.
+  */
+ @BeforeClass
+ public static void doSetup() throws Exception {
+     Configuration conf = HBaseConfiguration.create();
+     setUpConfigForMiniCluster(conf);
+     util = new HBaseTestingUtility(conf);
+     util.startMiniDFSCluster(1);
+     util.startMiniZKCluster(1);
+     // Read only after the mini DFS cluster has been started.
+     String defaultFs = util.getConfiguration().get("fs.defaultFS");
+     conf.set(DYNAMIC_JARS_DIR_KEY, defaultFs+"/hbase/tmpjars");
+     util.startMiniHBaseCluster(1, 1);
+     UDFExpression.setConfig(conf);
+     // Presumably the trailing counter selects the jar name (myjar1.jar / myjar2.jar) that
+     // the tests reference - verify against compileTestClass.
+     compileTestClass(MY_REVERSE_CLASS_NAME, MY_REVERSE_PROGRAM, 1);
+     compileTestClass(MY_SUM_CLASS_NAME, MY_SUM_PROGRAM, 2);
+     String zkClientPort = util.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
+     url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR
+             + zkClientPort + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
+     Map<String, String> driverProps = Maps.newHashMapWithExpectedSize(1);
+     driverProps.put(QueryServices.ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB, "true");
+     ReadOnlyProps overrides = new ReadOnlyProps(driverProps.entrySet().iterator());
+     driver = initAndRegisterDriver(url, overrides);
+ }
+
+ /**
+  * Creates a scalar UDF with and without an explicit jar, uses it in projections and filters,
+  * and checks create/drop visibility between a global and a tenant-specific connection.
+  * Both connections are closed in {@code finally} blocks so a failing assertion does not leak
+  * them.
+  */
+ @Test
+ public void testCreateFunction() throws Exception {
+     Connection conn = driver.connect(url, EMPTY_PROPS);
+     try {
+         Statement stmt = conn.createStatement();
+         conn.createStatement().execute("create table t(k integer primary key, firstname varchar, lastname varchar)");
+         stmt.execute("upsert into t values(1,'foo','jock')");
+         conn.commit();
+         stmt.execute("create function myreverse(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"' using jar "
+                 + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'");
+         ResultSet rs = stmt.executeQuery("select myreverse(firstname) from t");
+         assertTrue(rs.next());
+         assertEquals("oof", rs.getString(1));
+         assertFalse(rs.next());
+         rs = stmt.executeQuery("select * from t where myreverse(firstname)='oof'");
+         assertTrue(rs.next());
+         assertEquals(1, rs.getInt(1));
+         assertEquals("foo", rs.getString(2));
+         assertEquals("jock", rs.getString(3));
+         assertFalse(rs.next());
+
+         try {
+             stmt.execute("create function myreverse(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"' using jar "
+                     + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'");
+             fail("Duplicate function should not be created.");
+         } catch(FunctionAlreadyExistsException expected) {
+             // expected: the function already exists on this connection
+         }
+         // When no jar is specified, the class should be picked up from the directory
+         // configured as hbase.dynamic.jars.dir.
+         stmt.execute("create function myreverse2(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"'");
+         rs = stmt.executeQuery("select myreverse2(firstname) from t");
+         assertTrue(rs.next());
+         assertEquals("oof", rs.getString(1));
+         assertFalse(rs.next());
+         rs = stmt.executeQuery("select * from t where myreverse2(firstname)='oof'");
+         assertTrue(rs.next());
+         assertEquals(1, rs.getInt(1));
+         assertEquals("foo", rs.getString(2));
+         assertEquals("jock", rs.getString(3));
+         assertFalse(rs.next());
+         conn.createStatement().execute("create table t3(tenant_id varchar not null, k integer not null, firstname varchar, lastname varchar constraint pk primary key(tenant_id,k)) MULTI_TENANT=true");
+         // Function created with global id should be accessible.
+         Connection conn2 = driver.connect(url+";"+PhoenixRuntime.TENANT_ID_ATTRIB+"="+TENANT_ID, EMPTY_PROPS);
+         try {
+             try {
+                 conn2.createStatement().execute("upsert into t3 values(1,'foo','jock')");
+                 conn2.commit();
+                 conn2.createStatement().execute("create function myreverse(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"' using jar "
+                         + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'");
+                 rs = conn2.createStatement().executeQuery("select myreverse(firstname) from t3");
+                 assertTrue(rs.next());
+                 assertEquals("oof", rs.getString(1));
+             } catch(FunctionAlreadyExistsException e) {
+                 fail("FunctionAlreadyExistsException should not be thrown");
+             }
+             // Calling a global UDF on a tenant-specific connection.
+             rs = conn2.createStatement().executeQuery("select myreverse2(firstname) from t3");
+             assertTrue(rs.next());
+             assertEquals("oof", rs.getString(1));
+             try {
+                 conn2.createStatement().execute("drop function myreverse2");
+                 fail("FunctionNotFoundException should be thrown");
+             } catch(FunctionNotFoundException expected){
+                 // expected: the tenant connection cannot drop the global function
+             }
+             conn.createStatement().execute("drop function myreverse2");
+             try {
+                 rs = conn2.createStatement().executeQuery("select myreverse2(firstname) from t3");
+                 fail("FunctionNotFoundException should be thrown.");
+             } catch(FunctionNotFoundException expected){
+                 // expected: the global function has been dropped
+             }
+             try{
+                 rs = conn2.createStatement().executeQuery("select unknownFunction(firstname) from t3");
+                 fail("FunctionNotFoundException should be thrown.");
+             } catch(FunctionNotFoundException expected) {
+                 // expected: the function was never created
+             }
+         } finally {
+             conn2.close();
+         }
+     } finally {
+         conn.close();
+     }
+ }
+
+ /**
+  * Registers a function named {@code myfunction} on a global connection and on two tenant
+  * connections, with a different implementation for the second tenant, and checks that each
+  * tenant connection resolves its own implementation. All three connections are closed in
+  * {@code finally} blocks so a failing assertion does not leak them.
+  */
+ @Test
+ public void testSameUDFWithDifferentImplementationsInDifferentTenantConnections() throws Exception {
+     Connection nonTenantConn = driver.connect(url, EMPTY_PROPS);
+     try {
+         nonTenantConn.createStatement().execute("create function myfunction(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"' using jar "
+                 + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'");
+         try {
+             nonTenantConn.createStatement().execute("create function myfunction(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end.UnknownClass' using jar "
+                     + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'");
+             fail("FunctionAlreadyExistsException should be thrown.");
+         } catch(FunctionAlreadyExistsException expected) {
+             // expected: duplicate on the global connection
+         }
+         String tenantId1="tenId1";
+         String tenantId2="tenId2";
+         nonTenantConn.createStatement().execute("create table t7(tenant_id varchar not null, k integer not null, k1 integer, name varchar constraint pk primary key(tenant_id, k)) multi_tenant=true");
+         Connection tenant1Conn = driver.connect(url+";"+PhoenixRuntime.TENANT_ID_ATTRIB+"="+tenantId1, EMPTY_PROPS);
+         try {
+             Connection tenant2Conn = driver.connect(url+";"+PhoenixRuntime.TENANT_ID_ATTRIB+"="+tenantId2, EMPTY_PROPS);
+             try {
+                 tenant1Conn.createStatement().execute("upsert into t7 values(1,1,'jock')");
+                 tenant1Conn.commit();
+                 tenant2Conn.createStatement().execute("upsert into t7 values(1,2,'jock')");
+                 tenant2Conn.commit();
+                 tenant1Conn.createStatement().execute("create function myfunction(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"' using jar "
+                         + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'");
+                 try {
+                     tenant1Conn.createStatement().execute("create function myfunction(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end.UnknownClass' using jar "
+                             + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'");
+                     fail("FunctionAlreadyExistsException should be thrown.");
+                 } catch(FunctionAlreadyExistsException expected) {
+                     // expected: duplicate for tenant 1
+                 }
+
+                 tenant2Conn.createStatement().execute("create function myfunction(INTEGER, INTEGER CONSTANT defaultValue='10' minvalue='1' maxvalue='15' ) returns INTEGER as 'org.apache.phoenix.end2end."+MY_SUM_CLASS_NAME+"' using jar "
+                         + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar2.jar"+"'");
+                 try {
+                     tenant2Conn.createStatement().execute("create function myfunction(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end.UnknownClass' using jar "
+                             + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/unknown.jar"+"'");
+                     fail("FunctionAlreadyExistsException should be thrown.");
+                 } catch(FunctionAlreadyExistsException expected) {
+                     // expected: duplicate for tenant 2
+                 }
+
+                 ResultSet rs = tenant1Conn.createStatement().executeQuery("select MYFUNCTION(name) from t7");
+                 assertTrue(rs.next());
+                 assertEquals("kcoj", rs.getString(1));
+                 assertFalse(rs.next());
+                 rs = tenant1Conn.createStatement().executeQuery("select * from t7 where MYFUNCTION(name)='kcoj'");
+                 assertTrue(rs.next());
+                 assertEquals(1, rs.getInt(1));
+                 assertEquals(1, rs.getInt(2));
+                 assertEquals("jock", rs.getString(3));
+                 assertFalse(rs.next());
+
+                 rs = tenant2Conn.createStatement().executeQuery("select MYFUNCTION(k) from t7");
+                 assertTrue(rs.next());
+                 assertEquals(11, rs.getInt(1));
+                 assertFalse(rs.next());
+                 rs = tenant2Conn.createStatement().executeQuery("select * from t7 where MYFUNCTION(k1)=12");
+                 assertTrue(rs.next());
+                 assertEquals(1, rs.getInt(1));
+                 assertEquals(2, rs.getInt(2));
+                 assertEquals("jock", rs.getString(3));
+                 assertFalse(rs.next());
+             } finally {
+                 tenant2Conn.close();
+             }
+         } finally {
+             tenant1Conn.close();
+         }
+     } finally {
+         nonTenantConn.close();
+     }
+ }
+
+ /**
+  * Checks that a function created on one global connection is visible to (and cannot be
+  * re-created from) a second one, and that dropping it from the second connection makes it
+  * unavailable on the first. Both connections are closed in {@code finally} blocks so a
+  * failing assertion does not leak them.
+  */
+ @Test
+ public void testUDFsWithMultipleConnections() throws Exception {
+     Connection conn1 = driver.connect(url, EMPTY_PROPS);
+     try {
+         conn1.createStatement().execute("create function myfunction(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"' using jar "
+                 + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'");
+         Connection conn2 = driver.connect(url, EMPTY_PROPS);
+         try {
+             try{
+                 conn2.createStatement().execute("create function myfunction(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"' using jar "
+                         + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'");
+                 fail("FunctionAlreadyExistsException should be thrown.");
+             } catch(FunctionAlreadyExistsException expected) {
+                 // expected: already created through conn1
+             }
+             conn2.createStatement().execute("create table t8(k integer not null primary key, k1 integer, name varchar)");
+             conn2.createStatement().execute("upsert into t8 values(1,1,'jock')");
+             conn2.commit();
+             ResultSet rs = conn2.createStatement().executeQuery("select MYFUNCTION(name) from t8");
+             assertTrue(rs.next());
+             assertEquals("kcoj", rs.getString(1));
+             assertFalse(rs.next());
+             rs = conn2.createStatement().executeQuery("select * from t8 where MYFUNCTION(name)='kcoj'");
+             assertTrue(rs.next());
+             assertEquals(1, rs.getInt(1));
+             assertEquals(1, rs.getInt(2));
+             assertEquals("jock", rs.getString(3));
+             assertFalse(rs.next());
+             conn2.createStatement().execute("drop function MYFUNCTION");
+             try {
+                 rs = conn1.createStatement().executeQuery("select MYFUNCTION(name) from t8");
+                 fail("FunctionNotFoundException should be thrown");
+             } catch(FunctionNotFoundException expected) {
+                 // expected: dropped through conn2
+             }
+         } finally {
+             conn2.close();
+         }
+     } finally {
+         conn1.close();
+     }
+ }
+ /**
+  * Uses UDFs inside UPSERT SELECT, DELETE and UPSERT VALUES statements and verifies the
+  * resulting rows. The connection is closed in a {@code finally} block so a failing assertion
+  * does not leak it.
+  */
+ @Test
+ public void testUsingUDFFunctionInDifferentQueries() throws Exception {
+     Connection conn = driver.connect(url, EMPTY_PROPS);
+     try {
+         Statement stmt = conn.createStatement();
+         conn.createStatement().execute("create table t1(k integer primary key, firstname varchar, lastname varchar)");
+         stmt.execute("upsert into t1 values(1,'foo','jock')");
+         conn.commit();
+         conn.createStatement().execute("create table t2(k integer primary key, k1 integer, lastname_reverse varchar)");
+         conn.commit();
+         stmt.execute("create function mysum3(INTEGER, INTEGER CONSTANT defaultValue='10' minvalue='1' maxvalue='15' ) returns INTEGER as 'org.apache.phoenix.end2end."+MY_SUM_CLASS_NAME+"' using jar "
+                 + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar2.jar"+"'");
+         stmt.execute("create function myreverse3(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"' using jar "
+                 + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'");
+         stmt.execute("upsert into t2(k,k1,lastname_reverse) select mysum3(k),mysum3(k,11),myreverse3(lastname) from t1");
+         conn.commit();
+         ResultSet rs = stmt.executeQuery("select * from t2");
+         assertTrue(rs.next());
+         assertEquals(11, rs.getInt(1));
+         assertEquals(12, rs.getInt(2));
+         assertEquals("kcoj", rs.getString(3));
+         assertFalse(rs.next());
+         // The only row has k = 11, so mysum3(k) with the default second argument is 21.
+         stmt.execute("delete from t2 where myreverse3(lastname_reverse)='jock' and mysum3(k)=21");
+         conn.commit();
+         rs = stmt.executeQuery("select * from t2");
+         assertFalse(rs.next());
+         stmt.execute("create function myreverse4(VARCHAR CONSTANT defaultValue='null') returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"'");
+         stmt.execute("upsert into t2 values(11,12,myreverse4('jock'))");
+         conn.commit();
+         rs = stmt.executeQuery("select * from t2");
+         assertTrue(rs.next());
+         assertEquals(11, rs.getInt(1));
+         assertEquals(12, rs.getInt(2));
+         assertEquals("kcoj", rs.getString(3));
+         assertFalse(rs.next());
+     } finally {
+         conn.close();
+     }
+ }
+
+ /**
+  * Verifies the argument metadata declared in CREATE FUNCTION: the default value is applied
+  * when the second argument is omitted, and a constant outside the declared min/max range is
+  * rejected. The connection is closed in a {@code finally} block so a failing assertion does
+  * not leak it.
+  */
+ @Test
+ public void testVerifyCreateFunctionArguments() throws Exception {
+     Connection conn = driver.connect(url, EMPTY_PROPS);
+     try {
+         Statement stmt = conn.createStatement();
+         conn.createStatement().execute("create table t4(k integer primary key, k1 integer, lastname varchar)");
+         stmt.execute("upsert into t4 values(1,1,'jock')");
+         conn.commit();
+         stmt.execute("create function mysum(INTEGER, INTEGER CONSTANT defaultValue='10' minvalue='1' maxvalue='15' ) returns INTEGER as 'org.apache.phoenix.end2end."+MY_SUM_CLASS_NAME+"' using jar "
+                 + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar2.jar"+"'");
+         ResultSet rs = stmt.executeQuery("select mysum(k,12) from t4");
+         assertTrue(rs.next());
+         assertEquals(13, rs.getInt(1));
+         rs = stmt.executeQuery("select mysum(k) from t4");
+         assertTrue(rs.next());
+         assertEquals(11, rs.getInt(1));
+         try {
+             stmt.executeQuery("select mysum(k,20) from t4");
+             fail("Value Range Exception should be thrown.");
+         } catch(ValueRangeExcpetion expected) {
+             // expected: 20 is above the declared maxvalue of 15
+         }
+     } finally {
+         conn.close();
+     }
+ }
+
+ /**
+  * Creates a temporary function, uses it in projections and a filter, then drops it and
+  * checks that it can no longer be resolved. The connection is closed in a {@code finally}
+  * block so a failing assertion does not leak it.
+  */
+ @Test
+ public void testTemporaryFunctions() throws Exception {
+     Connection conn = driver.connect(url, EMPTY_PROPS);
+     try {
+         Statement stmt = conn.createStatement();
+         conn.createStatement().execute("create table t9(k integer primary key, k1 integer, lastname varchar)");
+         stmt.execute("upsert into t9 values(1,1,'jock')");
+         conn.commit();
+         stmt.execute("create temporary function mysum9(INTEGER, INTEGER CONSTANT defaultValue='10' minvalue='1' maxvalue='15' ) returns INTEGER as 'org.apache.phoenix.end2end."+MY_SUM_CLASS_NAME+"' using jar "
+                 + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar2.jar"+"'");
+         ResultSet rs = stmt.executeQuery("select mysum9(k,12) from t9");
+         assertTrue(rs.next());
+         assertEquals(13, rs.getInt(1));
+         rs = stmt.executeQuery("select mysum9(k) from t9");
+         assertTrue(rs.next());
+         assertEquals(11, rs.getInt(1));
+         rs = stmt.executeQuery("select k from t9 where mysum9(k)=11");
+         assertTrue(rs.next());
+         assertEquals(1, rs.getInt(1));
+         stmt.execute("drop function mysum9");
+         try {
+             rs = stmt.executeQuery("select k from t9 where mysum9(k)=11");
+             fail("FunctionNotFoundException should be thrown");
+         } catch(FunctionNotFoundException expected){
+             // expected: the temporary function has been dropped
+         }
+     } finally {
+         conn.close();
+     }
+ }
+
+ /**
+  * Verifies DROP FUNCTION: the rows added to the system function table by CREATE FUNCTION are
+  * removed again, a dropped function can be neither used nor dropped a second time, DROP
+  * FUNCTION IF EXISTS tolerates a missing function, and the function can be re-created and
+  * used afterwards.
+  */
+ @Test
+ public void testDropFunction() throws Exception {
+     Connection conn = driver.connect(url, EMPTY_PROPS);
+     try {
+         Statement stmt = conn.createStatement();
+         String query = "select count(*) from "+ SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_FUNCTION_TABLE + "\"";
+         ResultSet rs = stmt.executeQuery(query);
+         assertTrue(rs.next());
+         int numRowsBefore = rs.getInt(1);
+         stmt.execute("create function mysum6(INTEGER, INTEGER CONSTANT defaultValue='10' minvalue='1' maxvalue='15' ) returns INTEGER as 'org.apache.phoenix.end2end."+MY_SUM_CLASS_NAME+"' using jar "
+                 + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar2.jar"+"'");
+         rs = stmt.executeQuery(query);
+         assertTrue(rs.next());
+         int numRowsAfter = rs.getInt(1);
+         // NOTE(review): three new rows - presumably one for the function plus one per
+         // declared argument; confirm against the metadata layout.
+         assertEquals(3, numRowsAfter - numRowsBefore);
+         stmt.execute("drop function mysum6");
+         rs = stmt.executeQuery(query);
+         assertTrue(rs.next());
+         assertEquals(numRowsBefore, rs.getInt(1));
+         stmt.execute("create table t6(k integer primary key, k1 integer, lastname varchar)");
+         try {
+             rs = stmt.executeQuery("select mysum6(k1) from t6");
+             fail("FunctionNotFoundException should be thrown");
+         } catch(FunctionNotFoundException e) {
+             // Expected: the function was dropped above.
+         }
+         try {
+             stmt.execute("drop function mysum6");
+             fail("FunctionNotFoundException should be thrown");
+         } catch(FunctionNotFoundException e) {
+             // Expected: dropping a function that does not exist is an error without IF EXISTS.
+         }
+         try {
+             stmt.execute("drop function if exists mysum6");
+         } catch(FunctionNotFoundException e) {
+             fail("FunctionNotFoundException should not be thrown");
+         }
+         stmt.execute("create function mysum6(INTEGER, INTEGER CONSTANT defaultValue='10' minvalue='1' maxvalue='15' ) returns INTEGER as 'org.apache.phoenix.end2end."+MY_SUM_CLASS_NAME+"' using jar "
+                 + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar2.jar"+"'");
+         try {
+             rs = stmt.executeQuery("select mysum6(k1) from t6");
+         } catch(FunctionNotFoundException e) {
+             fail("FunctionNotFoundException should not be thrown");
+         }
+     } finally {
+         // Release the connection even when an assertion above fails.
+         conn.close();
+     }
+ }
+
+ /**
+  * Verifies that a user-defined function can be used as the indexed expression of both a
+  * global and a local functional index, and that queries using the same expression are
+  * planned against the index (checked through the EXPLAIN output) and return the expected row.
+  */
+ @Test
+ public void testFunctionalIndexesWithUDFFunction() throws Exception {
+     Connection conn = driver.connect(url, EMPTY_PROPS);
+     try {
+         Statement stmt = conn.createStatement();
+         stmt.execute("create table t5(k integer primary key, k1 integer, lastname_reverse varchar)");
+         stmt.execute("create function myreverse5(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"'");
+         stmt.execute("upsert into t5 values(1,1,'jock')");
+         conn.commit();
+
+         // Global functional index on the UDF expression.
+         stmt.execute("create index idx on t5(myreverse5(lastname_reverse))");
+         String query = "select myreverse5(lastname_reverse) from t5";
+         ResultSet rs = stmt.executeQuery("explain " + query);
+         assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER IDX\n"
+                 + " SERVER FILTER BY FIRST KEY ONLY", QueryUtil.getExplainPlan(rs));
+         rs = stmt.executeQuery(query);
+         assertTrue(rs.next());
+         assertEquals("kcoj", rs.getString(1));
+         assertFalse(rs.next());
+
+         // Local functional index on the same expression.
+         stmt.execute("create local index idx2 on t5(myreverse5(lastname_reverse))");
+         query = "select k,k1,myreverse5(lastname_reverse) from t5 where myreverse5(lastname_reverse)='kcoj'";
+         rs = stmt.executeQuery("explain " + query);
+         assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_T5 [-32768,'kcoj']\n"
+                 + " SERVER FILTER BY FIRST KEY ONLY\n"
+                 +"CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
+         rs = stmt.executeQuery(query);
+         assertTrue(rs.next());
+         assertEquals(1, rs.getInt(1));
+         assertEquals(1, rs.getInt(2));
+         assertEquals("kcoj", rs.getString(3));
+         assertFalse(rs.next());
+     } finally {
+         // Release the connection even when an assertion above fails.
+         conn.close();
+     }
+ }
+
+ /**
+ * Class-level teardown: destroys the driver used by the tests and then shuts down the mini
+ * cluster. The shutdown sits in a finally block so that it still runs when destroying the
+ * driver throws.
+ */
+ @AfterClass
+ public static void tearDown() throws Exception {
+ try {
+ destroyDriver(driver);
+ } finally {
+ util.shutdownMiniCluster();
+ }
+ }
+
+ /**
+  * Compiles the given source into a class file, packages that class into
+  * {@code myjar<counter>.jar} under the {@code org/apache/phoenix/end2end/} entry path, and
+  * copies the jar into the directory configured by {@code DYNAMIC_JARS_DIR_KEY}.
+  * The intermediate .java, .class and .jar files are written to the current working directory
+  * and are deleted again before returning, whether or not the method succeeds.
+  *
+  * @param className simple name of the class declared by {@code program}
+  * @param program   complete Java source of the class
+  * @param counter   suffix used to build the jar file name
+  */
+ private static void compileTestClass(String className, String program, int counter) throws Exception {
+     String javaFileName = className+".java";
+     File javaFile = new File(javaFileName);
+     String classFileName = className+".class";
+     File classFile = new File(classFileName);
+     String jarName = "myjar"+counter+".jar";
+     String jarPath = "." + File.separator + jarName;
+     File jarFile = new File(jarPath);
+     try {
+         String packageName = "org.apache.phoenix.end2end";
+         FileOutputStream fos = new FileOutputStream(javaFileName);
+         try {
+             fos.write(program.getBytes());
+         } finally {
+             fos.close();
+         }
+
+         // getSystemJavaCompiler() returns null when running on a JRE without a compiler.
+         JavaCompiler jc = ToolProvider.getSystemJavaCompiler();
+         assertTrue("No system Java compiler available; this test must run on a JDK", jc != null);
+         int result = jc.run(null, null, null, javaFileName);
+         assertEquals(0, result);
+
+         // Read the whole class file; a single read() call is not guaranteed to fill the buffer.
+         byte[] allBytes = new byte[(int) classFile.length()];
+         FileInputStream fis = new FileInputStream(classFile);
+         try {
+             int offset = 0;
+             while (offset < allBytes.length) {
+                 int count = fis.read(allBytes, offset, allBytes.length - offset);
+                 assertTrue("Unexpected end of file while reading " + classFileName, count >= 0);
+                 offset += count;
+             }
+         } finally {
+             fis.close();
+         }
+
+         Manifest manifest = new Manifest();
+         manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
+         // Jar entry names always use '/' as the separator, independent of the platform.
+         String pathToAdd = packageName.replace('.', '/') + "/";
+         JarOutputStream jarOutputStream = new JarOutputStream(new FileOutputStream(jarPath), manifest);
+         try {
+             jarOutputStream.putNextEntry(new JarEntry(pathToAdd));
+             jarOutputStream.closeEntry();
+             jarOutputStream.putNextEntry(new JarEntry(pathToAdd + classFile.getName()));
+             jarOutputStream.write(allBytes);
+             jarOutputStream.closeEntry();
+         } finally {
+             // Closing the jar stream also closes the underlying file stream.
+             jarOutputStream.close();
+         }
+
+         assertTrue(jarFile.exists());
+
+         FileSystem fs = util.getDefaultRootDirPath().getFileSystem(util.getConfiguration());
+         Path jarsLocation = new Path(util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY));
+         String jarsDir = jarsLocation.toString();
+         Path myJarPath = new Path(jarsDir.endsWith("/") ? jarsDir + jarName : jarsDir + "/" + jarName);
+         InputStream inputStream = null;
+         OutputStream outputStream = null;
+         try {
+             inputStream = new BufferedInputStream(new FileInputStream(jarPath));
+             outputStream = fs.create(myJarPath);
+             IOUtils.copyBytes(inputStream, outputStream, 4096, false);
+         } finally {
+             IOUtils.closeStream(inputStream);
+             IOUtils.closeStream(outputStream);
+         }
+     } finally {
+         // Best-effort cleanup of the local temporary files.
+         javaFile.delete();
+         classFile.delete();
+         jarFile.delete();
+     }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/antlr3/PhoenixSQL.g
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index f57c5cc..d2bb241 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -114,6 +114,14 @@ tokens
ASYNC='async';
SAMPLING='sampling';
UNION='union';
+ FUNCTION='function';
+ AS='as';
+ TEMPORARY='temporary';
+ RETURNS='returns';
+ USING='using';
+ JAR='jar';
+ DEFAULTVALUE='defaultvalue';
+ CONSTANT = 'constant';
}
@@ -144,13 +152,18 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import java.lang.Boolean;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Collections;
import java.util.Stack;
import java.sql.SQLException;
import org.apache.phoenix.expression.function.CountAggregateFunction;
+import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.parse.PFunction;
+import org.apache.phoenix.parse.PFunction.FunctionArgument;
+import org.apache.phoenix.parse.UDFParseNode;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.IllegalDataException;
@@ -206,6 +219,7 @@ package org.apache.phoenix.parse;
private int anonBindNum;
private ParseNodeFactory factory;
private ParseContext.Stack contextStack = new ParseContext.Stack();
+ private Map<String, UDFParseNode> udfParseNodes = new HashMap<String, UDFParseNode>(1);
public void setParseNodeFactory(ParseNodeFactory factory) {
this.factory = factory;
@@ -341,13 +355,25 @@ package org.apache.phoenix.parse;
// Used to incrementally parse a series of semicolon-terminated SQL statement
// Note than unlike the rule below an EOF is not expected at the end.
nextStatement returns [BindableStatement ret]
- : s=oneStatement {$ret = s;} SEMICOLON
+ : s=oneStatement {
+ try {
+ $ret = s;
+ } finally {
+ udfParseNodes.clear();
+ }
+ } SEMICOLON
| EOF
;
// Parses a single SQL statement (expects an EOF after the select statement).
statement returns [BindableStatement ret]
- : s=oneStatement {$ret = s;} EOF
+ : s=oneStatement {
+ try {
+ $ret = s;
+ } finally {
+ udfParseNodes.clear();
+ }
+ } EOF
;
// Parses a select statement which must be the only statement (expects an EOF after the statement).
@@ -369,6 +395,8 @@ oneStatement returns [BindableStatement ret]
| s=alter_index_node
| s=alter_table_node
| s=trace_node
+ | s=create_function_node
+ | s=drop_function_node
| s=alter_session_node
| s=create_sequence_node
| s=drop_sequence_node
@@ -409,7 +437,7 @@ create_index_node returns [CreateIndexStatement ret]
(async=ASYNC)?
(p=fam_properties)?
(SPLIT ON v=value_expression_list)?
- {ret = factory.createIndex(i, factory.namedTable(null,t), ik, icrefs, v, p, ex!=null, l==null ? IndexType.getDefault() : IndexType.LOCAL, async != null, getBindCount()); }
+ {ret = factory.createIndex(i, factory.namedTable(null,t), ik, icrefs, v, p, ex!=null, l==null ? IndexType.getDefault() : IndexType.LOCAL, async != null, getBindCount(), new HashMap<String, UDFParseNode>(udfParseNodes)); }
;
// Parse a create sequence statement.
@@ -510,6 +538,25 @@ trace_node returns [TraceStatement ret]
{ret = factory.trace(Tracing.isTraceOn(flag.getText()), s == null ? Tracing.isTraceOn(flag.getText()) ? 1.0 : 0.0 : (((BigDecimal)s.getValue())).doubleValue());}
;
+// Parse a create function statement:
+//   CREATE [TEMPORARY] FUNCTION name(arg types) RETURNS type AS 'class name' [USING JAR 'jar path']
+// The class name is parsed with the jar_path rule because both are plain literals.
+create_function_node returns [CreateFunctionStatement ret]
+    :   CREATE (temp=TEMPORARY)? FUNCTION function=identifier
+        (LPAREN args=zero_or_more_data_types RPAREN)
+        RETURNS r=identifier AS (className= jar_path)
+        (USING JAR (jarPath = jar_path))?
+        {
+            $ret = factory.createFunction(new PFunction(SchemaUtil.normalizeIdentifier(function), args,r,(String)className.getValue(), jarPath == null ? null : (String)jarPath.getValue()), temp!=null);
+        }
+    ;
+
+// Parse a literal and return it unchanged. Used by create_function_node for both the
+// implementing class name and the optional jar path.
+jar_path returns [LiteralParseNode ret]
+ : l=literal { $ret = l; }
+ ;
+
+// Parse a drop function statement: DROP FUNCTION [IF EXISTS] name.
+// The function name is normalized and the presence of IF EXISTS is passed on as a flag.
+drop_function_node returns [DropFunctionStatement ret]
+ : DROP FUNCTION (IF ex=EXISTS)? function=identifier {$ret = factory.dropFunction(SchemaUtil.normalizeIdentifier(function), ex!=null);}
+ ;
+
// Parse an alter session statement.
alter_session_node returns [AlterSessionStatement ret]
: ALTER SESSION (SET p=properties)
@@ -586,7 +633,7 @@ single_select returns [SelectStatement ret]
(WHERE where=expression)?
(GROUP BY group=group_by)?
(HAVING having=expression)?
- { ParseContext context = contextStack.peek(); $ret = factory.select(from, h, d!=null, sel, where, group, having, null, null, getBindCount(), context.isAggregate(), context.hasSequences(), null); }
+ { ParseContext context = contextStack.peek(); $ret = factory.select(from, h, d!=null, sel, where, group, having, null, null, getBindCount(), context.isAggregate(), context.hasSequences(), null, new HashMap<String,UDFParseNode>(udfParseNodes)); }
;
finally{ contextStack.pop(); }
@@ -610,7 +657,7 @@ upsert_node returns [UpsertStatement ret]
: UPSERT (hint=hintClause)? INTO t=from_table_name
(LPAREN p=upsert_column_refs RPAREN)?
((VALUES LPAREN v=one_or_more_expressions RPAREN) | s=select_node)
- {ret = factory.upsert(factory.namedTable(null,t,p == null ? null : p.getFirst()), hint, p == null ? null : p.getSecond(), v, s, getBindCount()); }
+ {ret = factory.upsert(factory.namedTable(null,t,p == null ? null : p.getFirst()), hint, p == null ? null : p.getSecond(), v, s, getBindCount(), new HashMap<String, UDFParseNode>(udfParseNodes)); }
;
upsert_column_refs returns [Pair<List<ColumnDef>,List<ColumnName>> ret]
@@ -625,7 +672,7 @@ delete_node returns [DeleteStatement ret]
(WHERE v=expression)?
(ORDER BY order=order_by)?
(LIMIT l=limit)?
- {ret = factory.delete(factory.namedTable(null,t), hint, v, order, l, getBindCount()); }
+ {ret = factory.delete(factory.namedTable(null,t), hint, v, order, l, getBindCount(), new HashMap<String, UDFParseNode>(udfParseNodes)); }
;
limit returns [LimitNode ret]
@@ -813,17 +860,19 @@ term returns [ParseNode ret]
if (!contextStack.isEmpty()) {
contextStack.peek().setAggregate(f.isAggregate());
}
+ if(f instanceof UDFParseNode) udfParseNodes.put(f.getName(),(UDFParseNode)f);
$ret = f;
}
| field=identifier LPAREN t=ASTERISK RPAREN
{
if (!isCountFunction(field)) {
- throwRecognitionException(t);
+ throwRecognitionException(t);
}
FunctionParseNode f = factory.function(field, LiteralParseNode.STAR);
if (!contextStack.isEmpty()) {
contextStack.peek().setAggregate(f.isAggregate());
}
+ if(f instanceof UDFParseNode) udfParseNodes.put(f.getName(),(UDFParseNode)f);
$ret = f;
}
| field=identifier LPAREN t=DISTINCT l=zero_or_more_expressions RPAREN
@@ -832,6 +881,7 @@ term returns [ParseNode ret]
if (!contextStack.isEmpty()) {
contextStack.peek().setAggregate(f.isAggregate());
}
+ if(f instanceof UDFParseNode) udfParseNodes.put(f.getName(),(UDFParseNode)f);
$ret = f;
}
| e=case_statement { $ret = e; }
@@ -865,6 +915,19 @@ zero_or_more_expressions returns [List<ParseNode> ret]
: (v = expression {$ret.add(v);})? (COMMA v = expression {$ret.add(v);} )*
;
+// Parse the (possibly empty) comma-separated list of argument declarations of a
+// CREATE FUNCTION statement.
+zero_or_more_data_types returns [List<FunctionArgument> ret]
+@init{ret = new ArrayList<FunctionArgument>(); }
+    :   (arg = function_argument {$ret.add(arg);})? (COMMA arg = function_argument {$ret.add(arg);})*
+    ;
+
+// Parse a single argument declaration: a type name with optional precision/scale, optional
+// array marker, optional CONSTANT flag and optional default/min/max values.
+// This is a rule of its own so that every argument starts with fresh (null) labels: labels are
+// scoped to the rule invocation, so when all arguments were matched inside one rule, a
+// CONSTANT, array marker or default/min/max value seen on one argument was still set when
+// the following arguments were built.
+function_argument returns [FunctionArgument ret]
+    :   dt = identifier (LPAREN l=NUMBER (COMMA s=NUMBER)? RPAREN)? ar=ARRAY? (lsq=LSQUARE (a=NUMBER)? RSQUARE)? (c = CONSTANT)? (DEFAULTVALUE EQ dv = value_expression)? (MINVALUE EQ minv = value_expression)? (MAXVALUE EQ maxv = value_expression)?
+        {$ret = new FunctionArgument(dt, ar != null || lsq != null, c!=null,
+            dv == null ? null : LiteralExpression.newConstant(((LiteralParseNode)dv).getValue()),
+            minv == null ? null : LiteralExpression.newConstant(((LiteralParseNode)minv).getValue()),
+            maxv == null ? null : LiteralExpression.newConstant(((LiteralParseNode)maxv).getValue()));}
+    ;
+
value_expression_list returns [List<ParseNode> ret]
@init{ret = new ArrayList<ParseNode>(); }
: LPAREN e = value_expression {$ret.add(e);} (COMMA e = value_expression {$ret.add(e);} )* RPAREN
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
index fcef0ec..643112d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
@@ -32,8 +32,10 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.memory.ChildMemoryManager;
import org.apache.phoenix.memory.GlobalMemoryManager;
+import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PMetaDataEntity;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.SizedUtil;
@@ -57,17 +59,17 @@ public class GlobalCache extends TenantCacheImpl {
// TODO: Use Guava cache with auto removal after lack of access
private final ConcurrentMap<ImmutableBytesWritable,TenantCache> perTenantCacheMap = new ConcurrentHashMap<ImmutableBytesWritable,TenantCache>();
// Cache for lastest PTable for a given Phoenix table
- private Cache<ImmutableBytesPtr,PTable> metaDataCache;
+ private Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache;
public void clearTenantCache() {
perTenantCacheMap.clear();
}
- public Cache<ImmutableBytesPtr,PTable> getMetaDataCache() {
+ public Cache<ImmutableBytesPtr,PMetaDataEntity> getMetaDataCache() {
// Lazy initialize QueryServices so that we only attempt to create an HBase Configuration
// object upon the first attempt to connect to any cluster. Otherwise, an attempt will be
// made at driver initialization time which is too early for some systems.
- Cache<ImmutableBytesPtr,PTable> result = metaDataCache;
+ Cache<ImmutableBytesPtr,PMetaDataEntity> result = metaDataCache;
if (result == null) {
synchronized(this) {
result = metaDataCache;
@@ -82,9 +84,9 @@ public class GlobalCache extends TenantCacheImpl {
metaDataCache = result = CacheBuilder.newBuilder()
.maximumWeight(maxSize)
.expireAfterAccess(maxTTL, TimeUnit.MILLISECONDS)
- .weigher(new Weigher<ImmutableBytesPtr, PTable>() {
+ .weigher(new Weigher<ImmutableBytesPtr, PMetaDataEntity>() {
@Override
- public int weigh(ImmutableBytesPtr key, PTable table) {
+ public int weigh(ImmutableBytesPtr key, PMetaDataEntity table) {
return SizedUtil.IMMUTABLE_BYTES_PTR_SIZE + key.getLength() + table.getEstimatedSize();
}
})
@@ -157,4 +159,22 @@ public class GlobalCache extends TenantCacheImpl {
}
return tenantCache;
}
+
+ /**
+  * Byte-array key type that is only ever equal to another {@code FunctionBytesPtr}; the byte
+  * comparison and the hash code are inherited from {@link ImmutableBytesPtr}.
+  * NOTE(review): presumably used to keep function entries apart from table entries that share
+  * the metadata cache - confirm against the callers.
+  */
+ public static class FunctionBytesPtr extends ImmutableBytesPtr {
+
+     public FunctionBytesPtr(byte[] key) {
+         super(key);
+     }
+
+     @Override
+     public boolean equals(Object obj) {
+         // Any key of another type is unequal, even when its bytes match.
+         return obj instanceof FunctionBytesPtr && super.equals(obj);
+     }
+
+     @Override
+     public int hashCode() {
+         // Unchanged from the parent; overridden alongside equals.
+         return super.hashCode();
+     }
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnResolver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnResolver.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnResolver.java
index 7bb210b..55253ea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnResolver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnResolver.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.compile;
import java.sql.SQLException;
import java.util.List;
+import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.schema.ColumnRef;
import org.apache.phoenix.schema.TableRef;
@@ -41,6 +42,11 @@ public interface ColumnResolver {
public List<TableRef> getTables();
/**
+ * Returns the collection of resolved functions.
+ */
+ public List<PFunction> getFunctions();
+
+ /**
* Resolves table using name or alias.
* @param schemaName the schema name
* @param tableName the table name or table alias
@@ -60,4 +66,15 @@ public interface ColumnResolver {
* @throws AmbiguousColumnException if the column name is ambiguous
*/
public ColumnRef resolveColumn(String schemaName, String tableName, String colName) throws SQLException;
+
+ /**
+ * Resolves function using functionName.
+ * @param functionName the name of the function to resolve
+ * @return the resolved PFunction
+ * @throws SQLException if the function could not be resolved; presumably a
+ * FunctionNotFoundException - confirm against the implementations
+ */
+ public PFunction resolveFunction(String functionName) throws SQLException;
+
+ /**
+ * Reports whether this resolver has user-defined functions.
+ * NOTE(review): presumably true when the statement being resolved references at least one
+ * UDF - confirm against the implementations.
+ */
+ public boolean hasUDFs();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateFunctionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateFunctionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateFunctionCompiler.java
new file mode 100644
index 0000000..2e3a873
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateFunctionCompiler.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.Collections;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.CreateFunctionStatement;
+import org.apache.phoenix.schema.MetaDataClient;
+
+ /**
+  * Compiles a CREATE FUNCTION statement into a {@link MutationPlan} whose execution hands the
+  * statement to {@link MetaDataClient#createFunction}.
+  */
+ public class CreateFunctionCompiler {
+
+     private final PhoenixStatement statement;
+
+     public CreateFunctionCompiler(PhoenixStatement statement) {
+         this.statement = statement;
+     }
+
+     /**
+      * Builds the plan for the given statement. Nothing is executed until
+      * {@link MutationPlan#execute()} is called on the returned plan.
+      */
+     public MutationPlan compile(final CreateFunctionStatement create) throws SQLException {
+         final PhoenixConnection connection = statement.getConnection();
+         final StatementContext context = new StatementContext(statement);
+         final MetaDataClient client = new MetaDataClient(connection);
+
+         return new MutationPlan() {
+
+             @Override
+             public StatementContext getContext() {
+                 return context;
+             }
+
+             @Override
+             public PhoenixConnection getConnection() {
+                 return connection;
+             }
+
+             @Override
+             public ParameterMetaData getParameterMetaData() {
+                 return context.getBindManager().getParameterMetaData();
+             }
+
+             @Override
+             public ExplainPlan getExplainPlan() throws SQLException {
+                 return new ExplainPlan(Collections.singletonList("CREATE FUNCTION"));
+             }
+
+             @Override
+             public MutationState execute() throws SQLException {
+                 try {
+                     return client.createFunction(create);
+                 } finally {
+                     // Close the client's connection only when it is not the statement's own.
+                     // NOTE(review): the client is built from that same connection above, so
+                     // this guard presumably never fires - confirm against MetaDataClient.
+                     PhoenixConnection clientConnection = client.getConnection();
+                     if (clientConnection != connection) {
+                         clientConnection.close();
+                     }
+                 }
+             }
+         };
+     }
+ }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
index 07d9f56..f1937a5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
@@ -46,7 +46,7 @@ public class CreateIndexCompiler {
public MutationPlan compile(final CreateIndexStatement create) throws SQLException {
final PhoenixConnection connection = statement.getConnection();
- final ColumnResolver resolver = FromCompiler.getResolver(create, connection);
+ final ColumnResolver resolver = FromCompiler.getResolver(create, connection, create.getUdfParseNodes());
Scan scan = new Scan();
final StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 4f6a719..575f0f3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -323,7 +323,7 @@ public class DeleteCompiler {
hint, false, aliasedNodes, delete.getWhere(),
Collections.<ParseNode>emptyList(), null,
delete.getOrderBy(), delete.getLimit(),
- delete.getBindCount(), false, false, Collections.<SelectStatement>emptyList());
+ delete.getBindCount(), false, false, Collections.<SelectStatement>emptyList(), delete.getUdfParseNodes());
select = StatementNormalizer.normalize(select, resolver);
SelectStatement transformedSelect = SubqueryRewriter.transform(select, resolver, connection);
if (transformedSelect != select) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
index ab6b851..92899a6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.compile;
+
import java.math.BigDecimal;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
@@ -70,6 +71,7 @@ import org.apache.phoenix.expression.function.ArrayAnyComparisonExpression;
import org.apache.phoenix.expression.function.ArrayElemRefExpression;
import org.apache.phoenix.expression.function.RoundDecimalExpression;
import org.apache.phoenix.expression.function.RoundTimestampExpression;
+import org.apache.phoenix.expression.function.UDFExpression;
import org.apache.phoenix.parse.AddParseNode;
import org.apache.phoenix.parse.AndParseNode;
import org.apache.phoenix.parse.ArithmeticParseNode;
@@ -95,12 +97,14 @@ import org.apache.phoenix.parse.ModulusParseNode;
import org.apache.phoenix.parse.MultiplyParseNode;
import org.apache.phoenix.parse.NotParseNode;
import org.apache.phoenix.parse.OrParseNode;
+import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.RowValueConstructorParseNode;
import org.apache.phoenix.parse.SequenceValueParseNode;
import org.apache.phoenix.parse.StringConcatParseNode;
import org.apache.phoenix.parse.SubqueryParseNode;
import org.apache.phoenix.parse.SubtractParseNode;
+import org.apache.phoenix.parse.UDFParseNode;
import org.apache.phoenix.parse.UnsupportedAllParseNodeVisitor;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
@@ -313,8 +317,19 @@ public class ExpressionCompiler extends UnsupportedAllParseNodeVisitor<Expressio
* @param children the child expression arguments to the function expression node.
*/
public Expression visitLeave(FunctionParseNode node, List<Expression> children) throws SQLException {
+ PFunction function = null;
+ if(node instanceof UDFParseNode) {
+ function = context.getResolver().resolveFunction(node.getName());
+ BuiltInFunctionInfo info = new BuiltInFunctionInfo(function);
+ node = new UDFParseNode(node.getName(), node.getChildren(), info);
+ }
children = node.validate(children, context);
- Expression expression = node.create(children, context);
+ Expression expression = null;
+ if (function == null) {
+ expression = node.create(children, context);
+ } else {
+ expression = node.create(children, function, context);
+ }
ImmutableBytesWritable ptr = context.getTempPtr();
BuiltInFunctionInfo info = node.getInfo();
for (int i = 0; i < info.getRequiredArgCount(); i++) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index da78b24..5fe0e6f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -22,9 +22,11 @@ import java.sql.SQLFeatureNotSupportedException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTableInterface;
@@ -35,12 +37,15 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.parse.AliasedNode;
import org.apache.phoenix.parse.BindTableNode;
import org.apache.phoenix.parse.ColumnDef;
+import org.apache.phoenix.parse.CreateFunctionStatement;
import org.apache.phoenix.parse.CreateTableStatement;
import org.apache.phoenix.parse.DMLStatement;
import org.apache.phoenix.parse.DerivedTableNode;
import org.apache.phoenix.parse.FamilyWildcardParseNode;
import org.apache.phoenix.parse.JoinTableNode;
+import org.apache.phoenix.parse.NamedNode;
import org.apache.phoenix.parse.NamedTableNode;
+import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.ParseNodeFactory;
import org.apache.phoenix.parse.SelectStatement;
@@ -49,6 +54,7 @@ import org.apache.phoenix.parse.TableName;
import org.apache.phoenix.parse.TableNode;
import org.apache.phoenix.parse.TableNodeVisitor;
import org.apache.phoenix.parse.TableWildcardParseNode;
+import org.apache.phoenix.parse.UDFParseNode;
import org.apache.phoenix.parse.WildcardParseNode;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryConstants;
@@ -57,6 +63,7 @@ import org.apache.phoenix.schema.AmbiguousTableException;
import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.ColumnRef;
+import org.apache.phoenix.schema.FunctionNotFoundException;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnFamily;
@@ -103,6 +110,11 @@ public class FromCompiler {
}
@Override
+ public List<PFunction> getFunctions() {
+ return Collections.emptyList();
+ }
+
+ @Override
public TableRef resolveTable(String schemaName, String tableName)
throws SQLException {
throw new UnsupportedOperationException();
@@ -112,6 +124,14 @@ public class FromCompiler {
public ColumnRef resolveColumn(String schemaName, String tableName, String colName) throws SQLException {
throw new UnsupportedOperationException();
}
+
+ /**
+  * Function lookup is not supported by this resolver.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public PFunction resolveFunction(String functionName) throws SQLException {
+     throw new UnsupportedOperationException();
+ }
+
+ /**
+  * This resolver never carries user-defined functions.
+  *
+  * @return always {@code false}
+  */
+ @Override
+ public boolean hasUDFs() {
+     return false;
+ }
};
public static ColumnResolver getResolverForCreation(final CreateTableStatement statement, final PhoenixConnection connection)
@@ -141,7 +161,7 @@ public class FromCompiler {
if (htable != null) Closeables.closeQuietly(htable);
}
tableNode = NamedTableNode.create(null, baseTable, statement.getColumnDefs());
- return new SingleTableColumnResolver(connection, tableNode, e.getTimeStamp());
+ return new SingleTableColumnResolver(connection, tableNode, e.getTimeStamp(), new HashMap<String, UDFParseNode>(1));
}
throw e;
}
@@ -166,9 +186,9 @@ public class FromCompiler {
if (fromNode == null)
return EMPTY_TABLE_RESOLVER;
if (fromNode instanceof NamedTableNode)
- return new SingleTableColumnResolver(connection, (NamedTableNode) fromNode, true, 1);
+ return new SingleTableColumnResolver(connection, (NamedTableNode) fromNode, true, 1, statement.getUdfParseNodes());
- MultiTableColumnResolver visitor = new MultiTableColumnResolver(connection, 1);
+ MultiTableColumnResolver visitor = new MultiTableColumnResolver(connection, 1, statement.getUdfParseNodes());
fromNode.accept(visitor);
return visitor;
}
@@ -178,12 +198,24 @@ public class FromCompiler {
return visitor;
}
+ /**
+  * Builds a single-table resolver for {@code tableNode} and hands it the given UDF parse nodes.
+  * The resolver is created with {@code updateCacheImmediately = true} and a timestamp addition of 0.
+  *
+  * @param tableNode the table to resolve against
+  * @param connection the connection used for metadata lookups
+  * @param udfParseNodes UDF parse nodes keyed by name, passed through to the resolver
+  * @return the resolver for the table
+  * @throws SQLException if constructing the resolver fails
+  */
+ public static ColumnResolver getResolver(NamedTableNode tableNode, PhoenixConnection connection, Map<String, UDFParseNode> udfParseNodes) throws SQLException {
+     return new SingleTableColumnResolver(connection, tableNode, true, 0, udfParseNodes);
+ }
+
public static ColumnResolver getResolver(SingleTableStatement statement, PhoenixConnection connection)
throws SQLException {
SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), true);
return visitor;
}
+ /**
+  * Builds a single-table resolver for the table of {@code statement} and hands it the given
+  * UDF parse nodes. The resolver is created with {@code updateCacheImmediately = true} and a
+  * timestamp addition of 0.
+  *
+  * @param statement the statement whose table is resolved
+  * @param connection the connection used for metadata lookups
+  * @param udfParseNodes UDF parse nodes keyed by name, passed through to the resolver
+  * @return the resolver for the statement's table
+  * @throws SQLException if constructing the resolver fails
+  */
+ public static ColumnResolver getResolver(SingleTableStatement statement, PhoenixConnection connection, Map<String, UDFParseNode> udfParseNodes)
+         throws SQLException {
+     return new SingleTableColumnResolver(connection, statement.getTable(), true, 0, udfParseNodes);
+ }
+
public static ColumnResolver getResolverForCompiledDerivedTable(PhoenixConnection connection, TableRef tableRef, RowProjector projector)
throws SQLException {
List<PColumn> projectedColumns = new ArrayList<PColumn>();
@@ -205,26 +237,32 @@ public class FromCompiler {
return visitor;
}
+ /**
+  * Wraps an already-built {@code tableRef} in a single-table resolver and hands it the given
+  * UDF parse nodes.
+  *
+  * @param connection the connection used for metadata lookups
+  * @param tableRef the table reference the resolver exposes
+  * @param udfParseNodes UDF parse nodes keyed by name, passed through to the resolver
+  * @return the resolver for {@code tableRef}
+  * @throws SQLException if constructing the resolver fails
+  */
+ public static ColumnResolver getResolver(PhoenixConnection connection, TableRef tableRef, Map<String, UDFParseNode> udfParseNodes)
+         throws SQLException {
+     return new SingleTableColumnResolver(connection, tableRef, udfParseNodes);
+ }
+
public static ColumnResolver getResolverForMutation(DMLStatement statement, PhoenixConnection connection)
throws SQLException {
/*
* We validate the meta data at commit time for mutations, as this allows us to do many UPSERT VALUES calls
* without hitting the server each time to check if the meta data is up-to-date.
*/
- SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), false);
+ SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), false, 0, statement.getUdfParseNodes());
return visitor;
}
- public static ColumnResolver getResolverForProjectedTable(PTable projectedTable) {
- return new ProjectedTableColumnResolver(projectedTable);
+ public static ColumnResolver getResolverForProjectedTable(PTable projectedTable, PhoenixConnection connection, Map<String, UDFParseNode> udfParseNodes) throws SQLException {
+ return new ProjectedTableColumnResolver(projectedTable, connection, udfParseNodes);
}
private static class SingleTableColumnResolver extends BaseColumnResolver {
private final List<TableRef> tableRefs;
private final String alias;
- public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode table, long timeStamp) throws SQLException {
- super(connection, 0);
+ public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode table, long timeStamp, Map<String, UDFParseNode> udfParseNodes) throws SQLException {
+ super(connection, 0, false, udfParseNodes);
List<PColumnFamily> families = Lists.newArrayListWithExpectedSize(table.getDynamicColumns().size());
for (ColumnDef def : table.getDynamicColumns()) {
if (def.getColumnDefName().getFamilyName() != null) {
@@ -239,11 +277,13 @@ public class FromCompiler {
}
public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode tableNode, boolean updateCacheImmediately) throws SQLException {
- this(connection, tableNode, updateCacheImmediately, 0);
+ this(connection, tableNode, updateCacheImmediately, 0, new HashMap<String,UDFParseNode>(1));
}
- public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode tableNode, boolean updateCacheImmediately, int tsAddition) throws SQLException {
- super(connection, tsAddition);
+ public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode tableNode,
+ boolean updateCacheImmediately, int tsAddition,
+ Map<String, UDFParseNode> udfParseNodes) throws SQLException {
+ super(connection, tsAddition, updateCacheImmediately, udfParseNodes);
alias = tableNode.getAlias();
TableRef tableRef = createTableRef(tableNode, updateCacheImmediately);
tableRefs = ImmutableList.of(tableRef);
@@ -255,6 +295,12 @@ public class FromCompiler {
tableRefs = ImmutableList.of(tableRef);
}
+ /**
+  * Creates a resolver around an existing {@code tableRef}. The UDF parse nodes are passed to the
+  * base class with {@code updateCacheImmediately = false} and a timestamp addition of 0.
+  *
+  * @param connection the connection used for metadata lookups
+  * @param tableRef the single table reference this resolver exposes; its alias becomes the resolver alias
+  * @param udfParseNodes UDF parse nodes keyed by name
+  * @throws SQLException if the base class fails to resolve the functions
+  */
+ public SingleTableColumnResolver(PhoenixConnection connection, TableRef tableRef, Map<String, UDFParseNode> udfParseNodes) throws SQLException {
+     super(connection, 0, false, udfParseNodes);
+     this.tableRefs = ImmutableList.of(tableRef);
+     this.alias = tableRef.getTableAlias();
+ }
+
public SingleTableColumnResolver(TableRef tableRef) throws SQLException {
super(null, 0);
alias = tableRef.getTableAlias();
@@ -267,6 +313,11 @@ public class FromCompiler {
}
@Override
+ public List<PFunction> getFunctions() {
+ throw new UnsupportedOperationException(); // NOTE(review): BaseColumnResolver.getFunctions() returns the resolved functions; confirm that throwing here is intended
+ }
+
+ @Override
public TableRef resolveTable(String schemaName, String tableName)
throws SQLException {
TableRef tableRef = tableRefs.get(0);
@@ -316,7 +367,6 @@ public class FromCompiler {
: tableRef.getTable().getColumn(colName);
return new ColumnRef(tableRef, column.getPosition());
}
-
}
private static abstract class BaseColumnResolver implements ColumnResolver {
@@ -326,11 +376,30 @@ public class FromCompiler {
// on Windows because the millis timestamp granularity is so bad we sometimes won't
// get the data back that we just upsert.
private final int tsAddition;
+ protected final Map<String, PFunction> functionMap;
+ protected List<PFunction> functions;
private BaseColumnResolver(PhoenixConnection connection, int tsAddition) {
+ this.connection = connection;
+ this.client = connection == null ? null : new MetaDataClient(connection); // no metadata client without a connection
+ this.tsAddition = tsAddition;
+ functionMap = new HashMap<String, PFunction>(1); // this constructor takes no UDF parse nodes: map and list start empty
+ this.functions = Collections.<PFunction>emptyList();
+ }
+
+ /**
+  * Creates a resolver and resolves the functions named by the keys of {@code udfParseNodes}.
+  * When the map is empty no lookup is performed and the function list is empty.
+  *
+  * @param connection the connection used for metadata lookups; may be null, in which case no
+  *        metadata client is created
+  * @param tsAddition value added to the timestamp of a function re-resolved through the cache update
+  * @param updateCacheImmediately forwarded to {@code createFunctionRef}
+  * @param udfParseNodes UDF parse nodes keyed by function name
+  * @throws SQLException if resolving the functions fails
+  */
+ private BaseColumnResolver(PhoenixConnection connection, int tsAddition, boolean updateCacheImmediately, Map<String, UDFParseNode> udfParseNodes) throws SQLException {
this.connection = connection;
this.client = connection == null ? null : new MetaDataClient(connection);
this.tsAddition = tsAddition;
+     this.functionMap = new HashMap<String, PFunction>(1);
+     if (!udfParseNodes.isEmpty()) {
+         List<String> udfNames = new ArrayList<String>(udfParseNodes.keySet());
+         this.functions = createFunctionRef(udfNames, updateCacheImmediately);
+         // Index the resolved functions by name for resolveFunction().
+         for (PFunction udf : this.functions) {
+             this.functionMap.put(udf.getFunctionName(), udf);
+         }
+     } else {
+         this.functions = Collections.<PFunction> emptyList();
+     }
}
protected TableRef createTableRef(NamedTableNode tableNode, boolean updateCacheImmediately) throws SQLException {
@@ -383,6 +452,85 @@ public class FromCompiler {
return tableRef;
}
+ @Override
+ public List<PFunction> getFunctions() { // the function list as assigned by the constructors (empty when no UDF parse nodes were given)
+ return functions;
+ }
+
+ /**
+  * Resolves the given function names to {@link PFunction} instances, first from the
+  * connection's metadata cache and then, for names not satisfied there, through
+  * {@code client.updateCache}.
+  * <p>
+  * Names satisfied from the cache are removed from {@code functionNames}, so the list passed in
+  * is modified by this call.
+  *
+  * @param functionNames names to resolve; entries found in the cache are removed
+  * @param updateCacheImmediately when true, or when the connection is in auto-commit mode, only
+  *        temporary functions are accepted from the cache and every other name is fetched
+  *        through {@code client.updateCache}; otherwise any cached function is accepted
+  * @return the resolved functions, cached ones first
+  * @throws FunctionNotFoundException if the number of resolved functions differs from the number requested
+  * @throws SQLException if the metadata lookup fails
+  */
+ private List<PFunction> createFunctionRef(List<String> functionNames, boolean updateCacheImmediately) throws SQLException {
+     long timeStamp = QueryConstants.UNSET_TIMESTAMP;
+     int numFunctions = functionNames.size();
+     List<PFunction> functionsFound = new ArrayList<PFunction>(numFunctions);
+     if (updateCacheImmediately || connection.getAutoCommit()) {
+         getFunctionFromCache(functionNames, functionsFound, true);
+         if (functionNames.isEmpty()) {
+             return functionsFound;
+         }
+         MetaDataMutationResult result = client.updateCache(functionNames);
+         timeStamp = result.getMutationTime();
+         // Append rather than replace: functionsFound may already hold temporary functions
+         // taken from the cache above, and replacing the list would silently drop them.
+         functionsFound.addAll(result.getFunctions());
+         if (functionsFound.size() != numFunctions) {
+             throw new FunctionNotFoundException("Some of the functions in "+functionNames.toString()+" are not found");
+         }
+     } else {
+         getFunctionFromCache(functionNames, functionsFound, false);
+         // Whatever the cache could not satisfy is looked up through the cache update.
+         if (!functionNames.isEmpty()) {
+             MetaDataMutationResult result = client.updateCache(functionNames);
+             functionsFound.addAll(result.getFunctions());
+             if (result.wasUpdated()) {
+                 timeStamp = result.getMutationTime();
+             }
+         }
+         if (functionsFound.size() != numFunctions) {
+             throw new FunctionNotFoundException("Some of the functions in "+functionNames.toString()+" are not found", timeStamp);
+         }
+     }
+     if (timeStamp != QueryConstants.UNSET_TIMESTAMP) {
+         timeStamp += tsAddition;
+     }
+
+     if (logger.isDebugEnabled() && timeStamp != QueryConstants.UNSET_TIMESTAMP) {
+         logger.debug(LogUtil.addCustomAnnotations("Re-resolved stale function " + functionNames.toString() + " at timestamp " + timeStamp, connection));
+     }
+     return functionsFound;
+ }
+
+ /**
+  * Moves every function name that the connection's metadata cache can satisfy out of
+  * {@code functionNames} and adds the cached {@link PFunction} to {@code functionsFound}.
+  * The cache is probed with the connection's tenant id first; if that probe fails and the
+  * tenant id is non-null, it is probed again with a null tenant id.
+  *
+  * @param functionNames names to look up; names satisfied from the cache are removed
+  * @param functionsFound receives the functions found in the cache
+  * @param getOnlyTemporyFunctions when true, a cached function is accepted only if
+  *        {@code isTemporaryFunction()} returns true
+  */
+ private void getFunctionFromCache(List<String> functionNames,
+         List<PFunction> functionsFound, boolean getOnlyTemporyFunctions) {
+     for (Iterator<String> names = functionNames.iterator(); names.hasNext();) {
+         String name = names.next();
+         PFunction cached = null;
+         try {
+             cached = connection.getMetaDataCache().getFunction(new PTableKey(connection.getTenantId(), name));
+         } catch (FunctionNotFoundException e1) {
+             if (connection.getTenantId() != null) { // Check with null tenantId next
+                 try {
+                     cached = connection.getMetaDataCache().getFunction(new PTableKey(null, name));
+                 } catch (FunctionNotFoundException e2) {
+                     // Not cached under either key: the name stays in functionNames.
+                 }
+             }
+         }
+         boolean accepted = cached != null
+                 && (!getOnlyTemporyFunctions || cached.isTemporaryFunction());
+         if (accepted) {
+             functionsFound.add(cached);
+             names.remove();
+         }
+     }
+ }
+
protected PTable addDynamicColumns(List<ColumnDef> dynColumns, PTable theTable)
throws SQLException {
if (!dynColumns.isEmpty()) {
@@ -409,6 +557,20 @@ public class FromCompiler {
}
return theTable;
}
+
+ /**
+  * Looks up a function in this resolver's function map.
+  *
+  * @param functionName the name the function was registered under in the map
+  * @return the matching function
+  * @throws FunctionNotFoundException if the map has no entry for {@code functionName}
+  */
+ @Override
+ public PFunction resolveFunction(String functionName) throws SQLException {
+     PFunction resolved = functionMap.get(functionName);
+     if (resolved != null) {
+         return resolved;
+     }
+     throw new FunctionNotFoundException(functionName);
+ }
+
+ @Override
+ public boolean hasUDFs() { // true when the function list holds at least one entry
+ return !functions.isEmpty();
+ }
}
private static class MultiTableColumnResolver extends BaseColumnResolver implements TableNodeVisitor<Void> {
@@ -421,6 +583,12 @@ public class FromCompiler {
tables = Lists.newArrayList();
}
+ /**
+  * Creates a multi-table resolver with empty table collections. The UDF parse nodes are passed
+  * to the base class with {@code updateCacheImmediately = false}.
+  *
+  * @param connection the connection used for metadata lookups
+  * @param tsAddition timestamp addition forwarded to the base class
+  * @param udfParseNodes UDF parse nodes keyed by name
+  * @throws SQLException if the base class fails to resolve the functions
+  */
+ private MultiTableColumnResolver(PhoenixConnection connection, int tsAddition, Map<String, UDFParseNode> udfParseNodes) throws SQLException {
+     super(connection, tsAddition, false, udfParseNodes);
+     this.tables = Lists.newArrayList();
+     this.tableMap = ArrayListMultimap.<String, TableRef> create();
+ }
+
@Override
public List<TableRef> getTables() {
return tables;
@@ -580,16 +748,14 @@ public class FromCompiler {
}
}
}
-
}
private static class ProjectedTableColumnResolver extends MultiTableColumnResolver {
private final boolean isLocalIndex;
private final List<TableRef> theTableRefs;
private final Map<ColumnRef, Integer> columnRefMap;
-
- private ProjectedTableColumnResolver(PTable projectedTable) {
- super(null, 0);
+ private ProjectedTableColumnResolver(PTable projectedTable, PhoenixConnection conn, Map<String, UDFParseNode> udfParseNodes) throws SQLException {
+ super(conn, 0, udfParseNodes);
Preconditions.checkArgument(projectedTable.getType() == PTableType.PROJECTED);
this.isLocalIndex = projectedTable.getIndexType() == IndexType.LOCAL;
this.columnRefMap = new HashMap<ColumnRef, Integer>();
@@ -615,6 +781,7 @@ public class FromCompiler {
this.columnRefMap.put(new ColumnRef(tableRef, colRef.getColumnPosition()), column.getPosition());
}
this.theTableRefs = ImmutableList.of(new TableRef(ParseNodeFactory.createTempAlias(), projectedTable, ts, false));
+
}
@Override