You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2015/04/27 10:33:46 UTC
[2/7] phoenix git commit: PHOENIX-538 Support UDFs(Rajeshbabu
Chintaguntla)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 5eb641e..291c84c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -22,6 +22,7 @@ import java.math.BigDecimal;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@@ -87,11 +88,11 @@ public class ParseNodeFactory {
*
* @since 0.1
*/
- private static class BuiltInFunctionKey {
+ public static class BuiltInFunctionKey {
private final String upperName;
private final int argCount;
- private BuiltInFunctionKey(String lowerName, int argCount) {
+ public BuiltInFunctionKey(String lowerName, int argCount) {
this.upperName = lowerName;
this.argCount = argCount;
}
@@ -180,9 +181,6 @@ public class ParseNodeFactory {
public static BuiltInFunctionInfo get(String normalizedName, List<ParseNode> children) {
initBuiltInFunctionMap();
BuiltInFunctionInfo info = BUILT_IN_FUNCTION_MAP.get(new BuiltInFunctionKey(normalizedName,children.size()));
- if (info == null) {
- throw new UnknownFunctionException(normalizedName);
- }
return info;
}
@@ -288,8 +286,8 @@ public class ParseNodeFactory {
return new CreateTableStatement(tableName, props, columns, pkConstraint, splits, tableType, ifNotExists, baseTableName, tableTypeIdNode, bindCount);
}
- public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits, ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType,boolean async, int bindCount) {
- return new CreateIndexStatement(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, async, bindCount);
+ public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits, ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType,boolean async, int bindCount, Map<String, UDFParseNode> udfParseNodes) {
+ return new CreateIndexStatement(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, async, bindCount, udfParseNodes);
}
public CreateSequenceStatement createSequence(TableName tableName, ParseNode startsWith,
@@ -299,6 +297,14 @@ public class ParseNodeFactory {
maxValue, cycle, ifNotExits, bindCount);
}
+ public CreateFunctionStatement createFunction(PFunction functionInfo, boolean temporary) {
+ return new CreateFunctionStatement(functionInfo, temporary);
+ }
+
+ public DropFunctionStatement dropFunction(String functionName, boolean ifExists) {
+ return new DropFunctionStatement(functionName, ifExists);
+ }
+
public DropSequenceStatement dropSequence(TableName tableName, boolean ifExits, int bindCount){
return new DropSequenceStatement(tableName, ifExits, bindCount);
}
@@ -388,6 +394,9 @@ public class ParseNodeFactory {
public FunctionParseNode function(String name, List<ParseNode> args) {
BuiltInFunctionInfo info = getInfo(name, args);
+ if (info == null) {
+ return new UDFParseNode(name, args, info);
+ }
Constructor<? extends FunctionParseNode> ctor = info.getNodeCtor();
if (ctor == null) {
return info.isAggregate()
@@ -411,6 +420,9 @@ public class ParseNodeFactory {
args.addAll(valueNodes);
BuiltInFunctionInfo info = getInfo(name, args);
+ if(info==null) {
+ return new UDFParseNode(name,args,info);
+ }
Constructor<? extends FunctionParseNode> ctor = info.getNodeCtor();
if (ctor == null) {
return new AggregateFunctionWithinGroupParseNode(name, args, info);
@@ -657,100 +669,100 @@ public class ParseNodeFactory {
public SelectStatement select(TableNode from, HintNode hint, boolean isDistinct, List<AliasedNode> select, ParseNode where,
List<ParseNode> groupBy, ParseNode having, List<OrderByNode> orderBy, LimitNode limit, int bindCount, boolean isAggregate,
- boolean hasSequence, List<SelectStatement> selects) {
+ boolean hasSequence, List<SelectStatement> selects, Map<String, UDFParseNode> udfParseNodes) {
return new SelectStatement(from, hint, isDistinct, select, where, groupBy == null ? Collections.<ParseNode>emptyList() : groupBy, having,
- orderBy == null ? Collections.<OrderByNode>emptyList() : orderBy, limit, bindCount, isAggregate, hasSequence, selects == null ? Collections.<SelectStatement>emptyList() : selects);
+ orderBy == null ? Collections.<OrderByNode>emptyList() : orderBy, limit, bindCount, isAggregate, hasSequence, selects == null ? Collections.<SelectStatement>emptyList() : selects, udfParseNodes);
}
- public UpsertStatement upsert(NamedTableNode table, HintNode hint, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount) {
- return new UpsertStatement(table, hint, columns, values, select, bindCount);
+ public UpsertStatement upsert(NamedTableNode table, HintNode hint, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount, Map<String, UDFParseNode> udfParseNodes) {
+ return new UpsertStatement(table, hint, columns, values, select, bindCount, udfParseNodes);
}
- public DeleteStatement delete(NamedTableNode table, HintNode hint, ParseNode node, List<OrderByNode> orderBy, LimitNode limit, int bindCount) {
- return new DeleteStatement(table, hint, node, orderBy, limit, bindCount);
+ public DeleteStatement delete(NamedTableNode table, HintNode hint, ParseNode node, List<OrderByNode> orderBy, LimitNode limit, int bindCount, Map<String, UDFParseNode> udfParseNodes) {
+ return new DeleteStatement(table, hint, node, orderBy, limit, bindCount, udfParseNodes);
}
public SelectStatement select(SelectStatement statement, ParseNode where) {
return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(), where, statement.getGroupBy(), statement.getHaving(),
- statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects());
+ statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
}
public SelectStatement select(SelectStatement statement, ParseNode where, ParseNode having) {
return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(), where, statement.getGroupBy(), having,
- statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects());
+ statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
}
public SelectStatement select(SelectStatement statement, List<AliasedNode> select, ParseNode where, List<ParseNode> groupBy, ParseNode having, List<OrderByNode> orderBy) {
return select(statement.getFrom(), statement.getHint(), statement.isDistinct(),
- select, where, groupBy, having, orderBy, statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects());
+ select, where, groupBy, having, orderBy, statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
}
public SelectStatement select(SelectStatement statement, TableNode table) {
return select(table, statement.getHint(), statement.isDistinct(), statement.getSelect(), statement.getWhere(), statement.getGroupBy(),
statement.getHaving(), statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(),
- statement.hasSequence(), statement.getSelects());
+ statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
}
public SelectStatement select(SelectStatement statement, TableNode table, ParseNode where) {
return select(table, statement.getHint(), statement.isDistinct(), statement.getSelect(), where, statement.getGroupBy(),
statement.getHaving(), statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(),
- statement.hasSequence(), statement.getSelects());
+ statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
}
public SelectStatement select(SelectStatement statement, boolean isDistinct, List<AliasedNode> select) {
return select(statement.getFrom(), statement.getHint(), isDistinct, select, statement.getWhere(), statement.getGroupBy(),
statement.getHaving(), statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(),
- statement.hasSequence(), statement.getSelects());
+ statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
}
public SelectStatement select(SelectStatement statement, boolean isDistinct, List<AliasedNode> select, ParseNode where) {
return select(statement.getFrom(), statement.getHint(), isDistinct, select, where, statement.getGroupBy(),
statement.getHaving(), statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(),
- statement.hasSequence(), statement.getSelects());
+ statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
}
public SelectStatement select(SelectStatement statement, boolean isDistinct, List<AliasedNode> select, ParseNode where, List<ParseNode> groupBy, boolean isAggregate) {
return select(statement.getFrom(), statement.getHint(), isDistinct, select, where, groupBy,
statement.getHaving(), statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), isAggregate,
- statement.hasSequence(), statement.getSelects());
+ statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
}
public SelectStatement select(SelectStatement statement, List<OrderByNode> orderBy) {
return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(),
statement.getWhere(), statement.getGroupBy(), statement.getHaving(), orderBy, statement.getLimit(),
- statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects());
+ statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
}
public SelectStatement select(SelectStatement statement, HintNode hint) {
return hint == null || hint.isEmpty() ? statement : select(statement.getFrom(), hint, statement.isDistinct(), statement.getSelect(),
statement.getWhere(), statement.getGroupBy(), statement.getHaving(), statement.getOrderBy(), statement.getLimit(),
- statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects());
+ statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
}
public SelectStatement select(SelectStatement statement, HintNode hint, ParseNode where) {
return select(statement.getFrom(), hint, statement.isDistinct(), statement.getSelect(), where, statement.getGroupBy(),
statement.getHaving(), statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(),
- statement.hasSequence(), statement.getSelects());
+ statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
}
public SelectStatement select(SelectStatement statement, List<OrderByNode> orderBy, LimitNode limit, int bindCount, boolean isAggregate) {
return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(),
statement.getWhere(), statement.getGroupBy(), statement.getHaving(), orderBy, limit,
- bindCount, isAggregate || statement.isAggregate(), statement.hasSequence(), statement.getSelects());
+ bindCount, isAggregate || statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
}
public SelectStatement select(SelectStatement statement, LimitNode limit) {
return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(),
statement.getWhere(), statement.getGroupBy(), statement.getHaving(), statement.getOrderBy(), limit,
- statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects());
+ statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
}
public SelectStatement select(SelectStatement statement, List<OrderByNode> orderBy, LimitNode limit) {
return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(),
statement.getWhere(), statement.getGroupBy(), statement.getHaving(), orderBy, limit,
- statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects());
+ statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
}
public SelectStatement select(List<SelectStatement> statements, List<OrderByNode> orderBy, LimitNode limit, int bindCount, boolean isAggregate) {
@@ -762,8 +774,10 @@ public class ParseNodeFactory {
// it will be done later at compile stage. Empty or different aliases
// are ignored, since they cannot be referred by outer queries.
List<String> aliases = Lists.<String> newArrayList();
+ Map<String, UDFParseNode> udfParseNodes = new HashMap<String, UDFParseNode>(1);
for (int i = 0; i < statements.size() && aliases.isEmpty(); i++) {
SelectStatement subselect = statements.get(i);
+ udfParseNodes.putAll(subselect.getUdfParseNodes());
if (!subselect.hasWildcard()) {
for (AliasedNode aliasedNode : subselect.getSelect()) {
String alias = aliasedNode.getAlias();
@@ -786,7 +800,7 @@ public class ParseNodeFactory {
}
return select(null, HintNode.EMPTY_HINT_NODE, false, aliasedNodes,
- null, null, null, orderBy, limit, bindCount, false, false, statements);
+ null, null, null, orderBy, limit, bindCount, false, false, statements, udfParseNodes);
}
public SubqueryParseNode subquery(SelectStatement select, boolean expectSingleRow) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
index 4ce893d..e48967b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
@@ -151,7 +151,7 @@ public class ParseNodeRewriter extends TraverseAllParseNodeVisitor<ParseNode> {
return NODE_FACTORY.select(normFrom, statement.getHint(), statement.isDistinct(),
normSelectNodes, normWhere, normGroupByNodes, normHaving, normOrderByNodes,
statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(),
- statement.getSelects());
+ statement.getSelects(), statement.getUdfParseNodes());
}
private Map<String, ParseNode> getAliasMap() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
index 44b24af..362e98d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
@@ -20,7 +20,9 @@ package org.apache.phoenix.parse;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.phoenix.compile.ColumnResolver;
import org.apache.phoenix.expression.function.CountAggregateFunction;
@@ -42,7 +44,7 @@ public class SelectStatement implements FilterableStatement {
Collections.<AliasedNode>singletonList(new AliasedNode(null, LiteralParseNode.ONE)),
null, Collections.<ParseNode>emptyList(),
null, Collections.<OrderByNode>emptyList(),
- null, 0, false, false, Collections.<SelectStatement>emptyList());
+ null, 0, false, false, Collections.<SelectStatement>emptyList(), new HashMap<String, UDFParseNode>(1));
public static final SelectStatement COUNT_ONE =
new SelectStatement(
null, null, false,
@@ -54,14 +56,14 @@ public class SelectStatement implements FilterableStatement {
new BuiltInFunctionInfo(CountAggregateFunction.class, CountAggregateFunction.class.getAnnotation(BuiltInFunction.class))))),
null, Collections.<ParseNode>emptyList(),
null, Collections.<OrderByNode>emptyList(),
- null, 0, true, false, Collections.<SelectStatement>emptyList());
+ null, 0, true, false, Collections.<SelectStatement>emptyList(), new HashMap<String, UDFParseNode>(1));
public static SelectStatement create(SelectStatement select, HintNode hint) {
if (select.getHint() == hint || hint.isEmpty()) {
return select;
}
return new SelectStatement(select.getFrom(), hint, select.isDistinct(),
select.getSelect(), select.getWhere(), select.getGroupBy(), select.getHaving(),
- select.getOrderBy(), select.getLimit(), select.getBindCount(), select.isAggregate(), select.hasSequence(), select.getSelects());
+ select.getOrderBy(), select.getLimit(), select.getBindCount(), select.isAggregate(), select.hasSequence(), select.getSelects(), select.getUdfParseNodes());
}
public SelectStatement combine(ParseNode where) {
@@ -73,13 +75,13 @@ public class SelectStatement implements FilterableStatement {
}
return new SelectStatement(this.getFrom(), this.getHint(), this.isDistinct(),
this.getSelect(), where, this.getGroupBy(), this.getHaving(),
- this.getOrderBy(), this.getLimit(), this.getBindCount(), this.isAggregate(), this.hasSequence(), this.selects);
+ this.getOrderBy(), this.getLimit(), this.getBindCount(), this.isAggregate(), this.hasSequence(), this.selects, this.udfParseNodes);
}
public static SelectStatement create(SelectStatement select, List<AliasedNode> selects) {
return new SelectStatement(select.getFrom(), select.getHint(), select.isDistinct(),
selects, select.getWhere(), select.getGroupBy(), select.getHaving(),
- select.getOrderBy(), select.getLimit(), select.getBindCount(), select.isAggregate(), select.hasSequence(), select.getSelects());
+ select.getOrderBy(), select.getLimit(), select.getBindCount(), select.isAggregate(), select.hasSequence(), select.getSelects(), select.getUdfParseNodes());
}
// Copy constructor for sub select statements in a union
@@ -87,7 +89,7 @@ public class SelectStatement implements FilterableStatement {
List<OrderByNode> orderBy, LimitNode limit, boolean isAggregate) {
return new SelectStatement(select.getFrom(), select.getHint(), select.isDistinct(),
select.getSelect(), select.getWhere(), select.getGroupBy(), select.getHaving(),
- orderBy, limit, select.getBindCount(), isAggregate, select.hasSequence(), select.getSelects());
+ orderBy, limit, select.getBindCount(), isAggregate, select.hasSequence(), select.getSelects(), select.getUdfParseNodes());
}
private final TableNode fromTable;
@@ -104,6 +106,7 @@ public class SelectStatement implements FilterableStatement {
private final boolean hasSequence;
private final boolean hasWildcard;
private final List<SelectStatement> selects = new ArrayList<SelectStatement>();
+ private final Map<String, UDFParseNode> udfParseNodes;
@Override
public final String toString() {
@@ -216,7 +219,7 @@ public class SelectStatement implements FilterableStatement {
protected SelectStatement(TableNode from, HintNode hint, boolean isDistinct, List<AliasedNode> select,
ParseNode where, List<ParseNode> groupBy, ParseNode having, List<OrderByNode> orderBy, LimitNode limit,
- int bindCount, boolean isAggregate, boolean hasSequence, List<SelectStatement> selects) {
+ int bindCount, boolean isAggregate, boolean hasSequence, List<SelectStatement> selects, Map<String, UDFParseNode> udfParseNodes) {
this.fromTable = from;
this.hint = hint == null ? HintNode.EMPTY_HINT_NODE : hint;
this.isDistinct = isDistinct;
@@ -241,6 +244,7 @@ public class SelectStatement implements FilterableStatement {
if (!selects.isEmpty()) {
this.selects.addAll(selects);
}
+ this.udfParseNodes = udfParseNodes;
}
@Override
@@ -333,4 +337,8 @@ public class SelectStatement implements FilterableStatement {
public boolean hasWildcard() {
return hasWildcard;
}
+
+ public Map<String, UDFParseNode> getUdfParseNodes() {
+ return udfParseNodes;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/parse/UDFParseNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/UDFParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/UDFParseNode.java
new file mode 100644
index 0000000..c0b972f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/UDFParseNode.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.util.List;
+
+public class UDFParseNode extends FunctionParseNode {
+
+ public UDFParseNode(String name, List<ParseNode> children, BuiltInFunctionInfo info) {
+ super(name, children, info);
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
index fc299d1..48698bd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
@@ -19,15 +19,18 @@ package org.apache.phoenix.parse;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
-public class UpsertStatement extends DMLStatement {
+public class UpsertStatement extends DMLStatement {
private final List<ColumnName> columns;
private final List<ParseNode> values;
private final SelectStatement select;
private final HintNode hint;
- public UpsertStatement(NamedTableNode table, HintNode hint, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount) {
- super(table, bindCount);
+ public UpsertStatement(NamedTableNode table, HintNode hint, List<ColumnName> columns,
+ List<ParseNode> values, SelectStatement select, int bindCount,
+ Map<String, UDFParseNode> udfParseNodes) {
+ super(table, bindCount, udfParseNodes);
this.columns = columns == null ? Collections.<ColumnName>emptyList() : columns;
this.values = values;
this.select = select;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/protobuf/ProtobufUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/protobuf/ProtobufUtil.java
index c12c64d..f4a60bc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/protobuf/ProtobufUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/protobuf/ProtobufUtil.java
@@ -71,6 +71,16 @@ public class ProtobufUtil {
return getMutations(request.getTableMetadataMutationsList());
}
+ public static List<Mutation> getMutations(MetaDataProtos.DropFunctionRequest request)
+ throws IOException {
+ return getMutations(request.getTableMetadataMutationsList());
+ }
+
+ public static List<Mutation> getMutations(MetaDataProtos.CreateFunctionRequest request)
+ throws IOException {
+ return getMutations(request.getTableMetadataMutationsList());
+ }
+
public static List<Mutation> getMutations(MetaDataProtos.DropTableRequest request)
throws IOException {
return getMutations(request.getTableMetadataMutationsList());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index 09705c6..dc51b10 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -35,6 +35,7 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableType;
@@ -70,8 +71,10 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
public PhoenixConnection connect(String url, Properties info) throws SQLException;
public MetaDataMutationResult getTable(PName tenantId, byte[] schemaName, byte[] tableName, long tableTimestamp, long clientTimetamp) throws SQLException;
+ public MetaDataMutationResult getFunctions(PName tenantId, List<Pair<byte[], Long>> functionNameAndTimeStampPairs, long clientTimestamp) throws SQLException;
public MetaDataMutationResult createTable(List<Mutation> tableMetaData, byte[] tableName, PTableType tableType, Map<String,Object> tableProps, List<Pair<byte[],Map<String,Object>>> families, byte[][] splits) throws SQLException;
public MetaDataMutationResult dropTable(List<Mutation> tableMetadata, PTableType tableType, boolean cascade) throws SQLException;
+ public MetaDataMutationResult dropFunction(List<Mutation> tableMetadata, boolean ifExists) throws SQLException;
public MetaDataMutationResult addColumn(List<Mutation> tableMetaData, PTable table, Map<String, List<Pair<String,Object>>> properties, Set<String> colFamiliesForPColumnsToBeAdded) throws SQLException;
public MetaDataMutationResult dropColumn(List<Mutation> tableMetadata, PTableType tableType) throws SQLException;
public MetaDataMutationResult updateIndexState(List<Mutation> tableMetadata, String parentTableName) throws SQLException;
@@ -93,6 +96,7 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
long currentSequenceValue(SequenceKey sequenceKey, long timestamp) throws SQLException;
void returnSequences(List<SequenceKey> sequenceKeys, long timestamp, SQLException[] exceptions) throws SQLException;
+ MetaDataMutationResult createFunction(List<Mutation> functionData, PFunction function, boolean temporary) throws SQLException;
void addConnection(PhoenixConnection connection) throws SQLException;
void removeConnection(PhoenixConnection connection) throws SQLException;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 49c946a..30b43d5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -87,9 +87,12 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearTableFromCacheRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearTableFromCacheResponse;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateFunctionRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateTableRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropColumnRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropFunctionRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropTableRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetFunctionsRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetTableRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionResponse;
@@ -110,9 +113,11 @@ import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.protobuf.ProtobufUtil;
import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
import org.apache.phoenix.schema.EmptySequenceCacheException;
+import org.apache.phoenix.schema.FunctionNotFoundException;
import org.apache.phoenix.schema.MetaDataSplitPolicy;
import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
import org.apache.phoenix.schema.PColumn;
@@ -220,6 +225,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
QueryServicesOptions.DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE);
return new PMetaDataImpl(INITIAL_META_DATA_TABLE_CAPACITY, maxSizeBytes);
}
+
/**
* Construct a ConnectionQueryServicesImpl that represents a connection to an HBase
* cluster.
@@ -562,6 +568,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (metadata == null) {
throwConnectionClosedException();
}
+
return new PhoenixConnection(this, url, info, metadata);
}
@@ -691,12 +698,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// Setup split policy on Phoenix metadata table to ensure that the key values of a Phoenix table
// stay on the same region.
- if (SchemaUtil.isMetaTable(tableName)) {
+ if (SchemaUtil.isMetaTable(tableName) || SchemaUtil.isFunctionTable(tableName)) {
if (!descriptor.hasCoprocessor(MetaDataEndpointImpl.class.getName())) {
descriptor.addCoprocessor(MetaDataEndpointImpl.class.getName(), null, priority, null);
}
- if (!descriptor.hasCoprocessor(MetaDataRegionObserver.class.getName())) {
- descriptor.addCoprocessor(MetaDataRegionObserver.class.getName(), null, priority + 1, null);
+ if(SchemaUtil.isMetaTable(tableName) ) {
+ if (!descriptor.hasCoprocessor(MetaDataRegionObserver.class.getName())) {
+ descriptor.addCoprocessor(MetaDataRegionObserver.class.getName(), null, priority + 1, null);
+ }
}
} else if (SchemaUtil.isSequenceTable(tableName)) {
if (!descriptor.hasCoprocessor(SequenceRegionObserver.class.getName())) {
@@ -992,23 +1001,33 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
*/
private MetaDataMutationResult metaDataCoprocessorExec(byte[] tableKey,
Batch.Call<MetaDataService, MetaDataResponse> callable) throws SQLException {
+ return metaDataCoprocessorExec(tableKey, callable, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+ }
+ /**
+ * Invoke meta data coprocessor with one retry if the key was found to not be in the regions
+ * (due to a table split)
+ */
+ private MetaDataMutationResult metaDataCoprocessorExec(byte[] tableKey,
+ Batch.Call<MetaDataService, MetaDataResponse> callable, byte[] tableName) throws SQLException {
+
try {
boolean retried = false;
while (true) {
if (retried) {
connection.relocateRegion(
- TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES),
+ TableName.valueOf(tableName),
tableKey);
}
- HTableInterface ht = this.getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+ HTableInterface ht = this.getTable(tableName);
try {
final Map<byte[], MetaDataResponse> results =
ht.coprocessorService(MetaDataService.class, tableKey, tableKey, callable);
assert(results.size() == 1);
MetaDataResponse result = results.values().iterator().next();
- if (result.getReturnCode() == MetaDataProtos.MutationCode.TABLE_NOT_IN_REGION) {
+ if (result.getReturnCode() == MetaDataProtos.MutationCode.TABLE_NOT_IN_REGION
+ || result.getReturnCode() == MetaDataProtos.MutationCode.FUNCTION_NOT_IN_REGION) {
if (retried) return MetaDataMutationResult.constructFromProto(result);
retried = true;
continue;
@@ -1331,6 +1350,37 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return result;
}
+ /**
+ * Drops a UDF definition from SYSTEM.FUNCTION by invoking the dropFunction
+ * endpoint on the MetaData coprocessor that hosts the function's row.
+ * The (tenantId, functionName) pair is decoded from the row key of the first
+ * mutation so the RPC can be routed to the owning region.
+ */
+ @Override
+ public MetaDataMutationResult dropFunction(final List<Mutation> functionData, final boolean ifExists) throws SQLException {
+ byte[][] rowKeyMetadata = new byte[2][];
+ // All mutations target the same function; the first row key carries the
+ // null-separated VARCHAR components (tenantId, functionName).
+ byte[] key = functionData.get(0).getRow();
+ SchemaUtil.getVarChars(key, rowKeyMetadata);
+ byte[] tenantIdBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
+ // NOTE(review): FUNTION_NAME_INDEX (sic) is the constant's actual spelling in PhoenixDatabaseMetaData.
+ byte[] functionBytes = rowKeyMetadata[PhoenixDatabaseMetaData.FUNTION_NAME_INDEX];
+ byte[] functionKey = SchemaUtil.getFunctionKey(tenantIdBytes, functionBytes);
+
+ // Route the coprocessor exec against SYSTEM.FUNCTION instead of the default SYSTEM.CATALOG.
+ final MetaDataMutationResult result = metaDataCoprocessorExec(functionKey,
+ new Batch.Call<MetaDataService, MetaDataResponse>() {
+ @Override
+ public MetaDataResponse call(MetaDataService instance) throws IOException {
+ ServerRpcController controller = new ServerRpcController();
+ BlockingRpcCallback<MetaDataResponse> rpcCallback =
+ new BlockingRpcCallback<MetaDataResponse>();
+ // Ship the client-built mutations to the server as serialized protos.
+ DropFunctionRequest.Builder builder = DropFunctionRequest.newBuilder();
+ for (Mutation m : functionData) {
+ MutationProto mp = ProtobufUtil.toProto(m);
+ builder.addTableMetadataMutations(mp.toByteString());
+ }
+ builder.setIfExists(ifExists);
+ instance.dropFunction(controller, builder.build(), rpcCallback);
+ // Re-throw any server-side failure captured by the controller.
+ if(controller.getFailedOn() != null) {
+ throw controller.getFailedOn();
+ }
+ return rpcCallback.get();
+ }
+ }, PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES);
+ return result;
+ }
private void invalidateTables(final List<byte[]> tableNamesToDelete) {
if (tableNamesToDelete != null) {
for ( byte[] tableName : tableNamesToDelete ) {
@@ -1944,6 +1994,13 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT + " "
+ PLong.INSTANCE.getSqlTypeName());
}
+ try {
+ metaConnection.createStatement().executeUpdate(
+ QueryConstants.CREATE_FUNCTION_METADATA);
+ } catch (NewerTableAlreadyExistsException e) {
+ } catch (TableAlreadyExistsException e) {
+ }
+
} catch (Exception e) {
if (e instanceof SQLException) {
initializationException = (SQLException)e;
@@ -2540,4 +2597,96 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
public int getSequenceSaltBuckets() {
return nSequenceSaltBuckets;
}
+
+ /**
+ * Caches the given UDF in the latest client-side metadata, unless an equally
+ * new or newer definition is already cached. Waiters on latestMetaDataLock
+ * are notified of the update.
+ */
+ @Override
+ public PMetaData addFunction(PFunction function) throws SQLException {
+ synchronized (latestMetaDataLock) {
+ try {
+ throwConnectionClosedIfNullMetaData();
+ // If existing table isn't older than new table, don't replace
+ // If a client opens a connection at an earlier timestamp, this can happen
+ PFunction existingFunction = latestMetaData.getFunction(new PTableKey(function.getTenantId(), function.getFunctionName()));
+ if (existingFunction.getTimeStamp() >= function.getTimeStamp()) {
+ return latestMetaData;
+ }
+ } catch (FunctionNotFoundException e) {}
+ // Not cached yet (FunctionNotFoundException swallowed above on purpose): add it.
+ latestMetaData = latestMetaData.addFunction(function);
+ latestMetaDataLock.notifyAll();
+ return latestMetaData;
+ }
+ }
+
+ /**
+ * Evicts the named UDF for the given tenant from the latest client-side
+ * metadata and notifies waiters on latestMetaDataLock.
+ */
+ @Override
+ public PMetaData removeFunction(PName tenantId, String function, long functionTimeStamp)
+ throws SQLException {
+ synchronized (latestMetaDataLock) {
+ throwConnectionClosedIfNullMetaData();
+ latestMetaData = latestMetaData.removeFunction(tenantId, function, functionTimeStamp);
+ latestMetaDataLock.notifyAll();
+ return latestMetaData;
+ }
+ }
+
+ /**
+ * Resolves UDF definitions from the server via the getFunctions coprocessor
+ * endpoint on SYSTEM.FUNCTION. Each requested function is sent as a
+ * (name, cached-timestamp) pair so the server can skip unchanged entries.
+ */
+ @Override
+ public MetaDataMutationResult getFunctions(PName tenantId, final List<Pair<byte[], Long>> functions,
+ final long clientTimestamp) throws SQLException {
+ // Global (non-tenant) connections use the empty byte array as tenant id.
+ final byte[] tenantIdBytes = tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes();
+ return metaDataCoprocessorExec(tenantIdBytes,
+ new Batch.Call<MetaDataService, MetaDataResponse>() {
+ @Override
+ public MetaDataResponse call(MetaDataService instance) throws IOException {
+ ServerRpcController controller = new ServerRpcController();
+ BlockingRpcCallback<MetaDataResponse> rpcCallback =
+ new BlockingRpcCallback<MetaDataResponse>();
+ GetFunctionsRequest.Builder builder = GetFunctionsRequest.newBuilder();
+ builder.setTenantId(HBaseZeroCopyByteString.wrap(tenantIdBytes));
+ // Names and timestamps are parallel repeated fields in the request proto.
+ for(Pair<byte[], Long> function: functions) {
+ builder.addFunctionNames(HBaseZeroCopyByteString.wrap(function.getFirst()));
+ builder.addFunctionTimestamps(function.getSecond().longValue());
+ }
+ builder.setClientTimestamp(clientTimestamp);
+
+ instance.getFunctions(controller, builder.build(), rpcCallback);
+ // Re-throw any server-side failure captured by the controller.
+ if(controller.getFailedOn() != null) {
+ throw controller.getFailedOn();
+ }
+ return rpcCallback.get();
+ }
+ }, PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES);
+
+ }
+
+ // TODO the mutations should be added to System functions table.
+ /**
+ * Creates a UDF by invoking the createFunction endpoint on the MetaData
+ * coprocessor hosting the function's row in SYSTEM.FUNCTION. The routing key
+ * is decoded from the header (Put-only) row of the supplied mutations.
+ */
+ @Override
+ public MetaDataMutationResult createFunction(final List<Mutation> functionData,
+ final PFunction function, final boolean temporary) throws SQLException {
+ byte[][] rowKeyMetadata = new byte[2][];
+ // The header row's key carries the null-separated (tenantId, functionName) pair.
+ Mutation m = MetaDataUtil.getPutOnlyTableHeaderRow(functionData);
+ byte[] key = m.getRow();
+ SchemaUtil.getVarChars(key, rowKeyMetadata);
+ byte[] tenantIdBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
+ // NOTE(review): FUNTION_NAME_INDEX (sic) is the constant's actual spelling in PhoenixDatabaseMetaData.
+ byte[] functionBytes = rowKeyMetadata[PhoenixDatabaseMetaData.FUNTION_NAME_INDEX];
+ byte[] functionKey = SchemaUtil.getFunctionKey(tenantIdBytes, functionBytes);
+ MetaDataMutationResult result = metaDataCoprocessorExec(functionKey,
+ new Batch.Call<MetaDataService, MetaDataResponse>() {
+ @Override
+ public MetaDataResponse call(MetaDataService instance) throws IOException {
+ ServerRpcController controller = new ServerRpcController();
+ BlockingRpcCallback<MetaDataResponse> rpcCallback =
+ new BlockingRpcCallback<MetaDataResponse>();
+ CreateFunctionRequest.Builder builder = CreateFunctionRequest.newBuilder();
+ // This loop's 'm' legally shadows the enclosing method's 'm' (anonymous class scope).
+ for (Mutation m : functionData) {
+ MutationProto mp = ProtobufUtil.toProto(m);
+ builder.addTableMetadataMutations(mp.toByteString());
+ }
+ builder.setTemporary(temporary);
+ instance.createFunction(controller, builder.build(), rpcCallback);
+ // Re-throw any server-side failure captured by the controller.
+ if(controller.getFailedOn() != null) {
+ throw controller.getFailedOn();
+ }
+ return rpcCallback.get();
+ }
+ }, PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES);
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 742c38e..4d582be 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.query;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE_BYTES;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -51,6 +52,8 @@ import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.phoenix.parse.PFunction;
+import org.apache.phoenix.schema.FunctionNotFoundException;
import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PIndexState;
@@ -104,6 +107,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
super(queryServices);
userName = connInfo.getPrincipal();
metaData = newEmptyMetaData();
+
// Use KeyValueBuilder that builds real KeyValues, as our test utils require this
this.kvBuilder = GenericKeyValueBuilder.INSTANCE;
}
@@ -280,6 +284,11 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
// A TableAlreadyExistsException is not thrown, since the table only exists *after* this
// fixed timestamp.
}
+
+ try {
+ metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_FUNCTION_METADATA);
+ } catch (NewerTableAlreadyExistsException ignore) {
+ }
} catch (SQLException e) {
sqlE = e;
} finally {
@@ -479,5 +488,46 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
return getProps().getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB,
QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
}
-
+
+ /**
+ * Connectionless mode has no server to persist to; FUNCTION_NOT_FOUND here
+ * presumably signals "no conflicting prior function" so the client proceeds
+ * with its local metadata — TODO confirm against MetaDataClient's handling.
+ * NOTE(review): lowercase long suffix '0l' is easily misread as '01'; '0L' preferred.
+ */
+ @Override
+ public MetaDataMutationResult createFunction(List<Mutation> functionData, PFunction function, boolean temporary)
+ throws SQLException {
+ return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, 0l, null);
+ }
+
+ /** Adds the UDF to the purely in-memory metadata and returns the updated snapshot. */
+ @Override
+ public PMetaData addFunction(PFunction function) throws SQLException {
+ return metaData = this.metaData.addFunction(function);
+ }
+
+ /** Removes the named UDF from the purely in-memory metadata and returns the updated snapshot. */
+ @Override
+ public PMetaData removeFunction(PName tenantId, String function, long functionTimeStamp)
+ throws SQLException {
+ return metaData = this.metaData.removeFunction(tenantId, function, functionTimeStamp);
+ }
+
+ /**
+ * Resolves the requested UDFs from the local in-memory metadata only
+ * (no server exists in connectionless mode). All-or-nothing: if any name is
+ * missing, FUNCTION_NOT_FOUND is returned and no partial list is built.
+ * NOTE(review): returns null (not a result object) for an empty request list —
+ * callers must tolerate that; confirm this matches server-backed behavior.
+ */
+ @Override
+ public MetaDataMutationResult getFunctions(PName tenantId,
+ List<Pair<byte[], Long>> functionNameAndTimeStampPairs, long clientTimestamp)
+ throws SQLException {
+ List<PFunction> functions = new ArrayList<PFunction>(functionNameAndTimeStampPairs.size());
+ for(Pair<byte[], Long> functionInfo: functionNameAndTimeStampPairs) {
+ try {
+ PFunction function2 = metaData.getFunction(new PTableKey(tenantId, Bytes.toString(functionInfo.getFirst())));
+ functions.add(function2);
+ } catch (FunctionNotFoundException e) {
+ return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, 0, null);
+ }
+ }
+ if(functions.isEmpty()) {
+ return null;
+ }
+ // FUNCTION_ALREADY_EXISTS doubles as the "found" code, carrying the resolved functions.
+ return new MetaDataMutationResult(MutationCode.FUNCTION_ALREADY_EXISTS, 0, functions, true);
+ }
+
+ /**
+ * No-op in connectionless mode: FUNCTION_ALREADY_EXISTS is presumably the
+ * "drop may proceed" acknowledgment here — TODO confirm against the caller.
+ */
+ @Override
+ public MetaDataMutationResult dropFunction(List<Mutation> tableMetadata, boolean ifExists)
+ throws SQLException {
+ return new MetaDataMutationResult(MutationCode.FUNCTION_ALREADY_EXISTS, 0, null);
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index e2c9544..2a98cd5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -35,6 +35,7 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PMetaData;
import org.apache.phoenix.schema.PName;
@@ -250,4 +251,34 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
public int getSequenceSaltBuckets() {
return getDelegate().getSequenceSaltBuckets();
}
+
+ /** Delegates UDF creation to the wrapped ConnectionQueryServices. */
+ @Override
+ public MetaDataMutationResult createFunction(List<Mutation> functionData, PFunction function, boolean temporary)
+ throws SQLException {
+ return getDelegate().createFunction(functionData, function, temporary);
+ }
+
+ /** Delegates UDF metadata caching to the wrapped ConnectionQueryServices. */
+ @Override
+ public PMetaData addFunction(PFunction function) throws SQLException {
+ return getDelegate().addFunction(function);
+ }
+
+ /** Delegates UDF metadata eviction to the wrapped ConnectionQueryServices. */
+ @Override
+ public PMetaData removeFunction(PName tenantId, String function, long functionTimeStamp)
+ throws SQLException {
+ return getDelegate().removeFunction(tenantId, function, functionTimeStamp);
+ }
+
+ /** Delegates UDF resolution to the wrapped ConnectionQueryServices. */
+ @Override
+ public MetaDataMutationResult getFunctions(PName tenantId,
+ List<Pair<byte[], Long>> functionNameAndTimeStampPairs, long clientTimestamp)
+ throws SQLException {
+ return getDelegate().getFunctions(tenantId, functionNameAndTimeStampPairs, clientTimestamp);
+ }
+
+ /** Delegates UDF removal to the wrapped ConnectionQueryServices. */
+ @Override
+ public MetaDataMutationResult dropFunction(List<Mutation> tableMetadata, boolean ifExists)
+ throws SQLException {
+ return getDelegate().dropFunction(tableMetadata, ifExists);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java b/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java
index ae37ac6..76e7593 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.query;
import java.sql.SQLException;
import java.util.List;
+import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PMetaData;
import org.apache.phoenix.schema.PName;
@@ -38,4 +39,6 @@ public interface MetaDataMutated {
PMetaData removeTable(PName tenantId, String tableName, String parentTableName, long tableTimeStamp) throws SQLException;
PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls) throws SQLException;
PMetaData removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp, long tableSeqNum) throws SQLException;
+ /** Adds or refreshes the given UDF in the metadata; returns the resulting metadata. */
+ PMetaData addFunction(PFunction function) throws SQLException;
+ /** Removes the named UDF for the tenant at the given timestamp; returns the resulting metadata. */
+ PMetaData removeFunction(PName tenantId, String function, long functionTimeStamp) throws SQLException;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 6470b72..73d1123 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -86,6 +86,17 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_SEQUENCE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARG_POSITION;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ARRAY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_CONSTANT;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT;
@@ -276,4 +287,29 @@ public interface QueryConstants {
" CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + "," + SEQUENCE_SCHEMA + "," + SEQUENCE_NAME + "))\n" +
HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
HColumnDescriptor.KEEP_DELETED_CELLS + "=" + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + "\n";
+
+ // DDL for the SYSTEM.FUNCTION table that backs UDF metadata. A function is
+ // stored as one header row (CLASS_NAME/JAR_PATH/RETURN_TYPE set) plus one row
+ // per argument (TYPE/ARG_POSITION/etc. set), sharing the same name prefix.
+ // NOTE(review): ARG_POSITION is declared VARBINARY and participates in the PK;
+ // header rows therefore rely on nullable trailing PK columns — confirm intended.
+ public static final String CREATE_FUNCTION_METADATA =
+ "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_FUNCTION_TABLE + "\"(\n" +
+ // Pk columns
+ TENANT_ID + " VARCHAR NULL," +
+ FUNCTION_NAME + " VARCHAR NOT NULL, \n" +
+ NUM_ARGS + " INTEGER, \n" +
+ // Function metadata (will be null for argument row)
+ CLASS_NAME + " VARCHAR, \n" +
+ JAR_PATH + " VARCHAR, \n" +
+ RETURN_TYPE + " VARCHAR, \n" +
+ // Argument metadata (will be null for function row)
+ TYPE + " VARCHAR, \n" +
+ ARG_POSITION + " VARBINARY, \n" +
+ IS_ARRAY + " BOOLEAN, \n" +
+ IS_CONSTANT + " BOOLEAN, \n" +
+ DEFAULT_VALUE + " VARCHAR, \n" +
+ // NOTE(review): MIN_VALUE/MAX_VALUE are not in the static imports added by this
+ // patch's visible hunk — presumably imported elsewhere in this file; verify.
+ MIN_VALUE + " VARCHAR, \n" +
+ MAX_VALUE + " VARCHAR, \n" +
+ " CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + ", " + FUNCTION_NAME + ", " + TYPE + ", " + ARG_POSITION + "))\n" +
+ HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
+ HColumnDescriptor.KEEP_DELETED_CELLS + "=" + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + ",\n"+
+ // Install split policy to prevent a tenant's metadata from being split across regions.
+ HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "'\n";
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 4b793d1..9183a70 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -103,6 +103,7 @@ public interface QueryServices extends SQLCloseable {
public static final String REGIONSERVER_INFO_PORT_ATTRIB = "hbase.regionserver.info.port";
public static final String REGIONSERVER_LEASE_PERIOD_ATTRIB = "hbase.regionserver.lease.period";
public static final String RPC_TIMEOUT_ATTRIB = "hbase.rpc.timeout";
+ public static final String DYNAMIC_JARS_DIR_KEY = "hbase.dynamic.jars.dir";
public static final String ZOOKEEPER_QUARUM_ATTRIB = "hbase.zookeeper.quorum";
public static final String ZOOKEEPER_PORT_ATTRIB = "hbase.zookeeper.property.clientPort";
public static final String ZOOKEEPER_ROOT_NODE_ATTRIB = "zookeeper.znode.parent";
@@ -165,6 +166,7 @@ public interface QueryServices extends SQLCloseable {
public static final String METADATA_HANDLER_COUNT_ATTRIB = "phoenix.rpc.metadata.handler.count";
public static final String FORCE_ROW_KEY_ORDER_ATTRIB = "phoenix.query.force.rowkeyorder";
+ public static final String ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB = "phoenix.functions.allowUserDefinedFunctions";
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index b98c9ee..972bf26 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.query;
import static org.apache.phoenix.query.QueryServices.ALLOW_ONLINE_TABLE_SCHEMA_UPDATE;
+import static org.apache.phoenix.query.QueryServices.ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB;
import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME;
import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB;
import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
@@ -195,6 +196,7 @@ public class QueryServicesOptions {
public static final boolean DEFAULT_USE_BYTE_BASED_REGEX = false;
public static final boolean DEFAULT_FORCE_ROW_KEY_ORDER = false;
+ public static final boolean DEFAULT_ALLOW_USER_DEFINED_FUNCTIONS = false;
private final Configuration config;
@@ -250,7 +252,7 @@ public class QueryServicesOptions {
.setIfUnset(METRICS_ENABLED, DEFAULT_IS_METRICS_ENABLED)
.setIfUnset(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY)
.setIfUnset(USE_BYTE_BASED_REGEX_ATTRIB, DEFAULT_USE_BYTE_BASED_REGEX)
- .setIfUnset(FORCE_ROW_KEY_ORDER_ATTRIB, DEFAULT_FORCE_ROW_KEY_ORDER)
+ .setIfUnset(FORCE_ROW_KEY_ORDER_ATTRIB, DEFAULT_FORCE_ROW_KEY_ORDER);
;
// HBase sets this to 1, so we reset it to something more appropriate.
// Hopefully HBase will change this, because we can't know if a user set
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionAlreadyExistsException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionAlreadyExistsException.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionAlreadyExistsException.java
new file mode 100644
index 0000000..91b9d07
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionAlreadyExistsException.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import java.sql.SQLException;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.parse.PFunction;
+
+/**
+ * Thrown when a CREATE FUNCTION targets a UDF name that already exists.
+ * Optionally carries the pre-existing {@link PFunction} so callers can reuse it.
+ */
+public class FunctionAlreadyExistsException extends SQLException {
+ private static final long serialVersionUID = 1L;
+ // NOTE(review): should be 'private static final' — it is never reassigned.
+ private static SQLExceptionCode code = SQLExceptionCode.FUNCTION_ALREADY_EXIST;
+ // The existing function definition, if known to the thrower; may be null.
+ private final PFunction function;
+ private final String functionName;
+
+ public FunctionAlreadyExistsException(String functionName) {
+ this(functionName, null, null);
+ }
+
+ public FunctionAlreadyExistsException(String functionName, String msg) {
+ this(functionName, msg, null);
+ }
+
+ public FunctionAlreadyExistsException(String functionName, PFunction function) {
+ this(functionName, null, function);
+ }
+
+ // Canonical constructor: builds the SQLException message/state/code from the
+ // shared SQLExceptionInfo builder so all overloads report consistently.
+ public FunctionAlreadyExistsException(String functionName, String msg, PFunction function) {
+ super(new SQLExceptionInfo.Builder(code).setFunctionName(functionName).setMessage(msg).build().toString(),
+ code.getSQLState(), code.getErrorCode());
+ this.functionName = functionName;
+ this.function = function;
+ }
+
+ public String getFunctionName() {
+ return functionName;
+ }
+
+ /** Returns the pre-existing function definition, or null if not supplied. */
+ public PFunction getFunction() {
+ return function;
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionNotFoundException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionNotFoundException.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionNotFoundException.java
new file mode 100644
index 0000000..73e23be
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionNotFoundException.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+
+/**
+ * Thrown when a referenced UDF cannot be resolved. Records the timestamp at
+ * which resolution was attempted (LATEST_TIMESTAMP when unspecified) so
+ * callers can retry metadata resolution at the right point in time.
+ */
+public class FunctionNotFoundException extends MetaDataEntityNotFoundException {
+ private static final long serialVersionUID = 1L;
+ // NOTE(review): should be 'private static final' — it is never reassigned.
+ private static SQLExceptionCode code = SQLExceptionCode.FUNCTION_UNDEFINED;
+ private final String functionName;
+ private final long timestamp;
+
+ // Copy constructor that rebinds an earlier failure to a new timestamp.
+ public FunctionNotFoundException(FunctionNotFoundException e, long timestamp) {
+ this(e.functionName, timestamp);
+ }
+
+ public FunctionNotFoundException(String functionName) {
+ this(functionName, HConstants.LATEST_TIMESTAMP);
+ }
+
+ public FunctionNotFoundException(String functionName, long timestamp) {
+ super(new SQLExceptionInfo.Builder(code).setFunctionName(functionName).build().toString(),
+ code.getSQLState(), code.getErrorCode(), null);
+ this.functionName = functionName;
+ this.timestamp = timestamp;
+ }
+
+ public String getFunctionName() {
+ return functionName;
+ }
+
+ /** Returns the timestamp at which the lookup failed. */
+ public long getTimeStamp() {
+ return timestamp;
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 22208f1..fcdb651 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -32,15 +32,20 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ARRAY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_CONSTANT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION;
@@ -62,10 +67,20 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARG_POSITION;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE;
import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.sql.Array;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ParameterMetaData;
@@ -75,6 +90,7 @@ import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Types;
+import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
@@ -99,6 +115,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.DynamicClassLoader;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.ColumnResolver;
import org.apache.phoenix.compile.ExplainPlan;
@@ -120,6 +137,8 @@ import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.expression.Determinism;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.RowKeyColumnExpression;
+import org.apache.phoenix.expression.function.FunctionExpression;
+import org.apache.phoenix.expression.function.ScalarFunction;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -130,21 +149,27 @@ import org.apache.phoenix.parse.AddColumnStatement;
import org.apache.phoenix.parse.AlterIndexStatement;
import org.apache.phoenix.parse.ColumnDef;
import org.apache.phoenix.parse.ColumnName;
+import org.apache.phoenix.parse.CreateFunctionStatement;
import org.apache.phoenix.parse.CreateIndexStatement;
import org.apache.phoenix.parse.CreateSequenceStatement;
import org.apache.phoenix.parse.CreateTableStatement;
import org.apache.phoenix.parse.DropColumnStatement;
+import org.apache.phoenix.parse.DropFunctionStatement;
import org.apache.phoenix.parse.DropIndexStatement;
import org.apache.phoenix.parse.DropSequenceStatement;
import org.apache.phoenix.parse.DropTableStatement;
+import org.apache.phoenix.parse.FunctionParseNode;
import org.apache.phoenix.parse.IndexKeyConstraint;
import org.apache.phoenix.parse.NamedTableNode;
+import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.ParseNodeFactory;
import org.apache.phoenix.parse.PrimaryKeyConstraint;
import org.apache.phoenix.parse.TableName;
import org.apache.phoenix.parse.UpdateStatisticsStatement;
+import org.apache.phoenix.parse.PFunction.FunctionArgument;
import org.apache.phoenix.query.ConnectionQueryServices.Feature;
+import org.apache.phoenix.query.HBaseFactoryProvider;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
@@ -280,6 +305,28 @@ public class MetaDataClient {
COLUMN_FAMILY + "," +
ORDINAL_POSITION +
") VALUES (?, ?, ?, ?, ?, ?)";
+ // Upsert for the function "header" row in SYSTEM.FUNCTION: one row per
+ // function carrying its class name, jar location, arg count and return type.
+ private static final String CREATE_FUNCTION =
+ "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_FUNCTION_TABLE + "\" ( " +
+ TENANT_ID +","+
+ FUNCTION_NAME + "," +
+ NUM_ARGS + "," +
+ CLASS_NAME + "," +
+ JAR_PATH + "," +
+ RETURN_TYPE +
+ ") VALUES (?, ?, ?, ?, ?, ?)";
+ // Upsert for one argument row of a function in SYSTEM.FUNCTION: one row per
+ // argument position, keyed by (tenant, function name, type, position).
+ private static final String INSERT_FUNCTION_ARGUMENT =
+ "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_FUNCTION_TABLE + "\" ( " +
+ TENANT_ID +","+
+ FUNCTION_NAME + "," +
+ TYPE + "," +
+ ARG_POSITION +","+
+ IS_ARRAY + "," +
+ IS_CONSTANT + "," +
+ DEFAULT_VALUE + "," +
+ MIN_VALUE + "," +
+ MAX_VALUE +
+ ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
private final PhoenixConnection connection;
@@ -315,6 +362,24 @@ public class MetaDataClient {
return updateCache(tenantId, schemaName, tableName, false);
}
+ /**
+ * Update the cache with the latest functions as of the connection scn.
+ * @param functionNames names of the functions to refresh in the cache
+ * @return the timestamp from the server, negative if the function was added to the cache and positive otherwise
+ * @throws SQLException
+ */
+ public MetaDataMutationResult updateCache(List<String> functionNames) throws SQLException {
+ return updateCache(functionNames, false);
+ }
+
+ // Convenience overload: resolves against this connection's tenant id.
+ private MetaDataMutationResult updateCache(List<String> functionNames, boolean alwaysHitServer) throws SQLException {
+ return updateCache(connection.getTenantId(), functionNames, alwaysHitServer);
+ }
+
+ // Convenience overload: trusts the client cache when entries are current.
+ public MetaDataMutationResult updateCache(PName tenantId, List<String> functionNames) throws SQLException {
+ return updateCache(tenantId, functionNames, false);
+ }
+
+
private long getClientTimeStamp() {
Long scn = connection.getSCN();
long clientTimeStamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
@@ -394,6 +459,77 @@ public class MetaDataClient {
return result;
}
+
+ /**
+ * Resolve the given functions, preferring the client-side metadata cache and
+ * falling back to a server call for any that are missing or stale.
+ * NOTE(review): this mutates the caller-supplied functionNames list (removes
+ * entries satisfied from the cache) — callers must pass a mutable list they
+ * do not need afterwards.
+ * NOTE(review): "functionsToFecth" below is a typo for "functionsToFetch";
+ * rename in a follow-up change.
+ */
+ private MetaDataMutationResult updateCache(PName tenantId, List<String> functionNames,
+ boolean alwaysHitServer) throws SQLException { // TODO: pass byte[] here
+ long clientTimeStamp = getClientTimeStamp();
+ List<PFunction> functions = new ArrayList<PFunction>(functionNames.size());
+ List<Long> functionTimeStamps = new ArrayList<Long>(functionNames.size());
+ Iterator<String> iterator = functionNames.iterator();
+ while (iterator.hasNext()) {
+ PFunction function = null;
+ try {
+ String functionName = iterator.next();
+ function =
+ connection.getMetaDataCache().getFunction(
+ new PTableKey(tenantId, functionName));
+ // Cached entry is exactly as of the connection scn: use it and drop
+ // the name from the server-fetch list.
+ if (function != null && !alwaysHitServer
+ && function.getTimeStamp() == clientTimeStamp - 1) {
+ functions.add(function);
+ iterator.remove();
+ continue;
+ }
+ // Stale cache entry: remember its timestamp so the server only
+ // returns the function if it has changed since.
+ if (function != null && function.getTimeStamp() != clientTimeStamp - 1) {
+ functionTimeStamps.add(function.getTimeStamp());
+ } else {
+ functionTimeStamps.add(HConstants.LATEST_TIMESTAMP);
+ }
+ } catch (FunctionNotFoundException e) {
+ // Not cached at all: ask the server unconditionally.
+ functionTimeStamps.add(HConstants.LATEST_TIMESTAMP);
+ }
+ }
+ // Don't bother with server call: we can't possibly find a newer function
+ if (functionNames.isEmpty()) {
+ return new MetaDataMutationResult(MutationCode.FUNCTION_ALREADY_EXISTS,QueryConstants.UNSET_TIMESTAMP,functions, true);
+ }
+
+ // Tenant-specific connections retry once more against the global tenant.
+ int maxTryCount = tenantId == null ? 1 : 2;
+ int tryCount = 0;
+ MetaDataMutationResult result;
+
+ do {
+ List<Pair<byte[], Long>> functionsToFecth = new ArrayList<Pair<byte[], Long>>(functionNames.size());
+ for(int i = 0; i< functionNames.size(); i++) {
+ functionsToFecth.add(new Pair<byte[], Long>(PVarchar.INSTANCE.toBytes(functionNames.get(i)), functionTimeStamps.get(i)));
+ }
+ result = connection.getQueryServices().getFunctions(tenantId, functionsToFecth, clientTimeStamp);
+
+ MutationCode code = result.getMutationCode();
+ // We found an updated function, so update our cache
+ if (result.getFunctions() != null && !result.getFunctions().isEmpty()) {
+ result.getFunctions().addAll(functions);
+ addFunctionToCache(result);
+ return result;
+ } else {
+ if (code == MutationCode.FUNCTION_ALREADY_EXISTS) {
+ result.getFunctions().addAll(functions);
+ addFunctionToCache(result);
+ return result;
+ }
+ // Exhausted retries without finding the function: evict any stale
+ // cache entries and surface the failure to the caller.
+ if (code == MutationCode.FUNCTION_NOT_FOUND && tryCount + 1 == maxTryCount) {
+ for (Pair<byte[], Long> f : functionsToFecth) {
+ connection.removeFunction(tenantId, Bytes.toString(f.getFirst()),
+ f.getSecond());
+ }
+ // TODO removeFunctions all together from cache when
+ throw new FunctionNotFoundException(functionNames.toString() + " not found");
+ }
+ }
+ tenantId = null; // Try again with global tenantId
+ } while (++tryCount < maxTryCount);
+
+ return result;
+ }
/**
* Fault in the physical table to the cache and add any indexes it has to the indexes
@@ -540,6 +676,20 @@ public class MetaDataClient {
colUpsert.execute();
}
+ /**
+ * Bind one function-argument row to the INSERT_FUNCTION_ARGUMENT upsert and
+ * execute it: (tenant, function name, arg type, position, array/constant
+ * flags, default/min/max values).
+ */
+ private void addFunctionArgMutation(String functionName, FunctionArgument arg, PreparedStatement argUpsert, int position) throws SQLException {
+ argUpsert.setString(1, connection.getTenantId() == null ? null : connection.getTenantId().getString());
+ argUpsert.setString(2, functionName);
+ argUpsert.setString(3, arg.getArgumentType());
+ // ARG_POSITION is stored as the raw 2-byte big-endian encoding of the
+ // position — presumably to match the column's SMALLINT key encoding;
+ // TODO confirm against the SYSTEM.FUNCTION schema.
+ byte[] bytes = Bytes.toBytes((short)position);
+ argUpsert.setBytes(4, bytes);
+ argUpsert.setBoolean(5, arg.isArrayType());
+ argUpsert.setBoolean(6, arg.isConstant());
+ // Default/min/max are optional; their wrapped values are stored as strings.
+ argUpsert.setString(7, arg.getDefaultValue() == null? null: (String)arg.getDefaultValue().getValue());
+ argUpsert.setString(8, arg.getMinValue() == null? null: (String)arg.getMinValue().getValue());
+ argUpsert.setString(9, arg.getMaxValue() == null? null: (String)arg.getMaxValue().getValue());
+ argUpsert.execute();
+ }
+
+
private PColumn newColumn(int position, ColumnDef def, PrimaryKeyConstraint pkConstraint, String defaultColumnFamily, boolean addingToPK) throws SQLException {
try {
ColumnName columnDefName = def.getColumnDefName();
@@ -954,7 +1104,7 @@ public class MetaDataClient {
}
while (true) {
try {
- ColumnResolver resolver = FromCompiler.getResolver(statement, connection);
+ ColumnResolver resolver = FromCompiler.getResolver(statement, connection, statement.getUdfParseNodes());
tableRef = resolver.getTables().get(0);
PTable dataTable = tableRef.getTable();
boolean isTenantConnection = connection.getTenantId() != null;
@@ -1198,6 +1348,57 @@ public class MetaDataClient {
return new MutationState(1, connection);
}
+ /**
+ * Execute a CREATE FUNCTION statement: stage the argument rows and the
+ * function header row as mutations (rolled back locally after capture), send
+ * them to the server via createFunction, and cache the new function on
+ * success.
+ * @throws FunctionAlreadyExistsException if the function already exists
+ * @throws NewerFunctionAlreadyExistsException if a newer version exists on
+ *         the server (it is cached but not visible to this connection)
+ */
+ public MutationState createFunction(CreateFunctionStatement stmt) throws SQLException {
+ boolean wasAutoCommit = connection.getAutoCommit();
+ connection.rollback();
+ try {
+ PFunction function = new PFunction(stmt.getFunctionInfo(), stmt.isTemporary());
+ connection.setAutoCommit(false);
+ String tenantIdStr = connection.getTenantId() == null ? null : connection.getTenantId().getString();
+ List<Mutation> functionData = Lists.newArrayListWithExpectedSize(function.getFunctionArguments().size() + 1);
+
+ // One argument row per position, captured from the connection's
+ // mutation state and then rolled back so nothing commits client-side.
+ List<FunctionArgument> args = function.getFunctionArguments();
+ PreparedStatement argUpsert = connection.prepareStatement(INSERT_FUNCTION_ARGUMENT);
+
+ for (int i = 0; i < args.size(); i++) {
+ FunctionArgument arg = args.get(i);
+ addFunctionArgMutation(function.getFunctionName(), arg, argUpsert, i);
+ }
+ functionData.addAll(connection.getMutationState().toMutations().next().getSecond());
+ connection.rollback();
+
+ // Header row for the function itself, captured the same way.
+ PreparedStatement functionUpsert = connection.prepareStatement(CREATE_FUNCTION);
+ functionUpsert.setString(1, tenantIdStr);
+ functionUpsert.setString(2, function.getFunctionName());
+ functionUpsert.setInt(3, function.getFunctionArguments().size());
+ functionUpsert.setString(4, function.getClassName());
+ functionUpsert.setString(5, function.getJarPath());
+ functionUpsert.setString(6, function.getReturnType());
+ functionUpsert.execute();
+ functionData.addAll(connection.getMutationState().toMutations().next().getSecond());
+ connection.rollback();
+ MetaDataMutationResult result = connection.getQueryServices().createFunction(functionData, function, stmt.isTemporary());
+ MutationCode code = result.getMutationCode();
+ switch(code) {
+ case FUNCTION_ALREADY_EXISTS:
+ throw new FunctionAlreadyExistsException(function.getFunctionName(), result
+ .getFunctions().get(0));
+ case NEWER_FUNCTION_FOUND:
+ // Add function to ConnectionQueryServices so it's cached, but don't add
+ // it to this connection as we can't see it.
+ throw new NewerFunctionAlreadyExistsException(function.getFunctionName(), result.getFunctions().get(0));
+ default:
+ // Success: cache the function with the server-assigned timestamp.
+ List<PFunction> functions = new ArrayList<PFunction>(1);
+ functions.add(function);
+ result = new MetaDataMutationResult(code, result.getMutationTime(), functions, true);
+ addFunctionToCache(result);
+ }
+ } finally {
+ connection.setAutoCommit(wasAutoCommit);
+ }
+ return new MutationState(1, connection);
+ }
+
+
private static ColumnDef findColumnDefOrNull(List<ColumnDef> colDefs, ColumnName colName) {
for (ColumnDef colDef : colDefs) {
if (colDef.getColumnDefName().getColumnName().equals(colName.getColumnName())) {
@@ -1807,6 +2008,10 @@ public class MetaDataClient {
return dropTable(schemaName, tableName, null, statement.getTableType(), statement.ifExists(), statement.cascade());
}
+ // Entry point for DROP FUNCTION: delegates to the private overload.
+ public MutationState dropFunction(DropFunctionStatement statement) throws SQLException {
+ return dropFunction(statement.getFunctionName(), statement.ifExists());
+ }
+
+
public MutationState dropIndex(DropIndexStatement statement) throws SQLException {
String schemaName = statement.getTableName().getSchemaName();
String tableName = statement.getIndexName().getName();
@@ -1814,6 +2019,46 @@ public class MetaDataClient {
return dropTable(schemaName, tableName, parentTableName, PTableType.INDEX, statement.ifExists(), false);
}
+ /**
+ * Drop a function. Temporary functions are removed from the client cache
+ * only; persistent ones are deleted from SYSTEM.FUNCTION via a server call
+ * and then evicted from the cache.
+ * @throws FunctionNotFoundException if the function does not exist and
+ *         ifExists is false
+ */
+ private MutationState dropFunction(String functionName,
+ boolean ifExists) throws SQLException {
+ connection.rollback();
+ boolean wasAutoCommit = connection.getAutoCommit();
+ try {
+ PName tenantId = connection.getTenantId();
+ byte[] key =
+ SchemaUtil.getFunctionKey(tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY
+ : tenantId.getBytes(), Bytes.toBytes(functionName));
+ Long scn = connection.getSCN();
+ long clientTimeStamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
+ try {
+ // Temporary functions live only in the client cache: remove locally
+ // and skip the server round trip entirely.
+ PFunction function = connection.getMetaDataCache().getFunction(new PTableKey(tenantId, functionName));
+ if (function.isTemporaryFunction()) {
+ connection.removeFunction(tenantId, functionName, clientTimeStamp);
+ return new MutationState(0, connection);
+ }
+ } catch(FunctionNotFoundException e) {
+ // Not cached client-side; fall through and let the server-side drop
+ // (and its FUNCTION_NOT_FOUND handling below) decide the outcome.
+ }
+ List<Mutation> functionMetaData = Lists.newArrayListWithExpectedSize(2);
+ Delete functionDelete = new Delete(key, clientTimeStamp);
+ functionMetaData.add(functionDelete);
+ MetaDataMutationResult result = connection.getQueryServices().dropFunction(functionMetaData, ifExists);
+ MutationCode code = result.getMutationCode();
+ switch (code) {
+ case FUNCTION_NOT_FOUND:
+ if (!ifExists) {
+ throw new FunctionNotFoundException(functionName);
+ }
+ break;
+ default:
+ // Dropped on the server: evict from the client cache as well.
+ connection.removeFunction(tenantId, functionName, result.getMutationTime());
+ break;
+ }
+ return new MutationState(0, connection);
+ } finally {
+ connection.setAutoCommit(wasAutoCommit);
+ }
+ }
private MutationState dropTable(String schemaName, String tableName, String parentTableName, PTableType tableType,
boolean ifExists, boolean cascade) throws SQLException {
connection.rollback();
@@ -2658,7 +2903,14 @@ public class MetaDataClient {
connection.addTable(table);
return table;
}
-
+
+ // Add every function in the server result to this connection's metadata
+ // cache and return them for convenience.
+ private List<PFunction> addFunctionToCache(MetaDataMutationResult result) throws SQLException {
+ for(PFunction function: result.getFunctions()) {
+ connection.addFunction(function);
+ }
+ return result.getFunctions();
+ }
+
+
private void throwIfAlteringViewPK(ColumnDef col, PTable table) throws SQLException {
if (col != null && col.isPK() && table.getType() == PTableType.VIEW) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_MODIFY_VIEW_PK)