You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2015/04/27 10:33:50 UTC
[6/7] phoenix git commit: PHOENIX-538 Support UDFs(Rajeshbabu
Chintaguntla)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index af6c712..78f54e8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -63,6 +63,7 @@ import org.apache.phoenix.parse.TableName;
import org.apache.phoenix.parse.TableNode;
import org.apache.phoenix.parse.TableNodeVisitor;
import org.apache.phoenix.parse.TableWildcardParseNode;
+import org.apache.phoenix.parse.UDFParseNode;
import org.apache.phoenix.parse.WildcardParseNode;
import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.ColumnRef;
@@ -688,7 +689,7 @@ public class JoinCompiler {
if (isSubselect())
return SubselectRewriter.applyOrderBy(SubselectRewriter.applyPostFilters(subselect, preFilters, tableNode.getAlias()), orderBy, tableNode.getAlias());
- return NODE_FACTORY.select(tableNode, select.getHint(), false, selectNodes, getPreFiltersCombined(), null, null, orderBy, null, 0, false, select.hasSequence(), Collections.<SelectStatement>emptyList());
+ return NODE_FACTORY.select(tableNode, select.getHint(), false, selectNodes, getPreFiltersCombined(), null, null, orderBy, null, 0, false, select.hasSequence(), Collections.<SelectStatement>emptyList(), select.getUdfParseNodes());
}
public boolean hasFilters() {
@@ -1177,7 +1178,7 @@ public class JoinCompiler {
TableRef tableRef = table.getTableRef();
List<ParseNode> groupBy = tableRef.equals(groupByTableRef) ? select.getGroupBy() : null;
List<OrderByNode> orderBy = tableRef.equals(orderByTableRef) ? select.getOrderBy() : null;
- SelectStatement stmt = getSubqueryForOptimizedPlan(select.getHint(), table.getDynamicColumns(), tableRef, join.getColumnRefs(), table.getPreFiltersCombined(), groupBy, orderBy, table.isWildCardSelect(), select.hasSequence());
+ SelectStatement stmt = getSubqueryForOptimizedPlan(select.getHint(), table.getDynamicColumns(), tableRef, join.getColumnRefs(), table.getPreFiltersCombined(), groupBy, orderBy, table.isWildCardSelect(), select.hasSequence(), select.getUdfParseNodes());
QueryPlan plan = statement.getConnection().getQueryServices().getOptimizer().optimize(statement, stmt);
if (!plan.getTableRef().equals(tableRef)) {
replacement.put(tableRef, plan.getTableRef());
@@ -1247,7 +1248,7 @@ public class JoinCompiler {
}
private static SelectStatement getSubqueryForOptimizedPlan(HintNode hintNode, List<ColumnDef> dynamicCols, TableRef tableRef, Map<ColumnRef, ColumnRefType> columnRefs, ParseNode where, List<ParseNode> groupBy,
- List<OrderByNode> orderBy, boolean isWildCardSelect, boolean hasSequence) {
+ List<OrderByNode> orderBy, boolean isWildCardSelect, boolean hasSequence, Map<String, UDFParseNode> udfParseNodes) {
String schemaName = tableRef.getTable().getSchemaName().getString();
TableName tName = TableName.create(schemaName.length() == 0 ? null : schemaName, tableRef.getTable().getTableName().getString());
List<AliasedNode> selectList = new ArrayList<AliasedNode>();
@@ -1267,7 +1268,7 @@ public class JoinCompiler {
String tableAlias = tableRef.getTableAlias();
TableNode from = NODE_FACTORY.namedTable(tableAlias == null ? null : '"' + tableAlias + '"', tName, dynamicCols);
- return NODE_FACTORY.select(from, hintNode, false, selectList, where, groupBy, null, orderBy, null, 0, groupBy != null, hasSequence, Collections.<SelectStatement>emptyList());
+ return NODE_FACTORY.select(from, hintNode, false, selectList, where, groupBy, null, orderBy, null, 0, groupBy != null, hasSequence, Collections.<SelectStatement>emptyList(), udfParseNodes);
}
public static PTable joinProjectedTables(PTable left, PTable right, JoinType type) throws SQLException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index 0c586f0..fcbeb7e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -34,6 +34,7 @@ import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.AmbiguousColumnException;
@@ -123,6 +124,11 @@ public class PostDDLCompiler {
public List<TableRef> getTables() {
return Collections.singletonList(tableRef);
}
+
+ public java.util.List<PFunction> getFunctions() {
+ return Collections.emptyList();
+ };
+
@Override
public TableRef resolveTable(String schemaName, String tableName)
throws SQLException {
@@ -135,6 +141,14 @@ public class PostDDLCompiler {
: tableRef.getTable().getColumn(colName);
return new ColumnRef(tableRef, column.getPosition());
}
+
+ public PFunction resolveFunction(String functionName) throws SQLException {
+ throw new UnsupportedOperationException();
+ };
+
+ public boolean hasUDFs() {
+ return false;
+ };
};
PhoenixStatement statement = new PhoenixStatement(connection);
StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
index e84ca2a..c39db09 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
@@ -458,7 +458,7 @@ public class ProjectionCompiler {
projectColumnFamily(table, scan, family);
}
}
- return new RowProjector(projectedColumns, estimatedByteSize, isProjectEmptyKeyValue);
+ return new RowProjector(projectedColumns, estimatedByteSize, isProjectEmptyKeyValue, resolver.hasUDFs());
}
private static void projectAllColumnFamilies(PTable table, Scan scan) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 3100664..e877e03 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -21,6 +21,7 @@ import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Set;
@@ -62,6 +63,7 @@ import org.apache.phoenix.parse.SQLParser;
import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.parse.SubqueryParseNode;
import org.apache.phoenix.parse.TableNode;
+import org.apache.phoenix.parse.UDFParseNode;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.AmbiguousColumnException;
import org.apache.phoenix.schema.ColumnNotFoundException;
@@ -240,13 +242,13 @@ public class QueryCompiler {
context.setCurrentTable(table.getTableRef());
PTable projectedTable = table.createProjectedTable(!projectPKColumns, context);
TupleProjector.serializeProjectorIntoScan(context.getScan(), new TupleProjector(projectedTable));
- context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable));
+ context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), subquery.getUdfParseNodes()));
table.projectColumns(context.getScan());
return compileSingleQuery(context, subquery, binds, asSubquery, !asSubquery);
}
QueryPlan plan = compileSubquery(subquery, false);
PTable projectedTable = table.createProjectedTable(plan.getProjector());
- context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable));
+ context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), subquery.getUdfParseNodes()));
return new TupleProjectionPlan(plan, new TupleProjector(plan.getProjector()), table.compilePostFilterExpression(context));
}
@@ -295,7 +297,7 @@ public class QueryCompiler {
} else {
tables[i] = null;
}
- context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable));
+ context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), query.getUdfParseNodes()));
joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // place-holder
Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContext, true);
joinExpressions[i] = joinConditions.getFirst();
@@ -354,7 +356,7 @@ public class QueryCompiler {
tupleProjector = new TupleProjector(plan.getProjector());
}
context.setCurrentTable(rhsTableRef);
- context.setResolver(FromCompiler.getResolverForProjectedTable(rhsProjTable));
+ context.setResolver(FromCompiler.getResolverForProjectedTable(rhsProjTable, context.getConnection(), rhs.getUdfParseNodes()));
ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[] {new ImmutableBytesPtr(emptyByteArray)};
Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(lhsCtx, context, true);
List<Expression> joinExpressions = joinConditions.getSecond();
@@ -364,7 +366,7 @@ public class QueryCompiler {
int fieldPosition = needsMerge ? rhsProjTable.getColumns().size() - rhsProjTable.getPKColumns().size() : 0;
PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(rhsProjTable, lhsTable, type == JoinType.Right ? JoinType.Left : type) : rhsProjTable;
TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
- context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable));
+ context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), rhs.getUdfParseNodes()));
QueryPlan rhsPlan = compileSingleQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right);
Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable);
Integer limit = null;
@@ -413,7 +415,7 @@ public class QueryCompiler {
int fieldPosition = needsMerge ? lhsProjTable.getColumns().size() - lhsProjTable.getPKColumns().size() : 0;
PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(lhsProjTable, rhsProjTable, type == JoinType.Right ? JoinType.Left : type) : lhsProjTable;
- ColumnResolver resolver = FromCompiler.getResolverForProjectedTable(projectedTable);
+ ColumnResolver resolver = FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), new HashMap<String,UDFParseNode>(1));
TableRef tableRef = resolver.getTables().get(0);
StatementContext subCtx = new StatementContext(statement, resolver, ScanUtil.newScan(originalScan), new SequenceManager(statement));
subCtx.setCurrentTable(tableRef);
@@ -422,7 +424,7 @@ public class QueryCompiler {
context.setResolver(resolver);
TableNode from = NODE_FACTORY.namedTable(tableRef.getTableAlias(), NODE_FACTORY.table(tableRef.getTable().getSchemaName().getString(), tableRef.getTable().getTableName().getString()));
ParseNode where = joinTable.getPostFiltersCombined();
- SelectStatement select = asSubquery ? NODE_FACTORY.select(from, joinTable.getStatement().getHint(), false, Collections.<AliasedNode> emptyList(), where, null, null, orderBy, null, 0, false, joinTable.getStatement().hasSequence(), Collections.<SelectStatement>emptyList())
+ SelectStatement select = asSubquery ? NODE_FACTORY.select(from, joinTable.getStatement().getHint(), false, Collections.<AliasedNode> emptyList(), where, null, null, orderBy, null, 0, false, joinTable.getStatement().hasSequence(), Collections.<SelectStatement>emptyList(), joinTable.getStatement().getUdfParseNodes())
: NODE_FACTORY.select(joinTable.getStatement(), from, where);
return compileSingleFlatQuery(context, select, binds, asSubquery, false, innerPlan, null, isInRowKeyOrder);
@@ -505,7 +507,7 @@ public class QueryCompiler {
if (this.projectTuples) {
projectedTable = TupleProjectionCompiler.createProjectedTable(select, context);
if (projectedTable != null) {
- context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable));
+ context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), select.getUdfParseNodes()));
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java
index 1b35e92..c60933e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java
@@ -52,9 +52,10 @@ public class RowProjector {
private final int estimatedSize;
private final boolean isProjectEmptyKeyValue;
private final boolean cloneRequired;
+ private final boolean hasUDFs;
public RowProjector(RowProjector projector, boolean isProjectEmptyKeyValue) {
- this(projector.getColumnProjectors(), projector.getEstimatedRowByteSize(), isProjectEmptyKeyValue);
+ this(projector.getColumnProjectors(), projector.getEstimatedRowByteSize(), isProjectEmptyKeyValue, projector.hasUDFs);
}
/**
* Construct RowProjector based on a list of ColumnProjectors.
@@ -64,6 +65,18 @@ public class RowProjector {
* @param estimatedRowSize
*/
public RowProjector(List<? extends ColumnProjector> columnProjectors, int estimatedRowSize, boolean isProjectEmptyKeyValue) {
+ this(columnProjectors, estimatedRowSize, isProjectEmptyKeyValue, false);
+ }
+ /**
+ * Construct RowProjector based on a list of ColumnProjectors.
+ * @param columnProjectors ordered list of ColumnProjectors corresponding to projected columns in SELECT clause
+ * aggregating coprocessor. Only required in the case of an aggregate query with a limit clause and otherwise may
+ * be null.
+ * @param estimatedRowSize
+ * @param isProjectEmptyKeyValue
+ * @param hasUDFs
+ */
+ public RowProjector(List<? extends ColumnProjector> columnProjectors, int estimatedRowSize, boolean isProjectEmptyKeyValue, boolean hasUDFs) {
this.columnProjectors = Collections.unmodifiableList(columnProjectors);
int position = columnProjectors.size();
reverseIndex = ArrayListMultimap.<String, Integer>create();
@@ -82,15 +95,18 @@ public class RowProjector {
this.someCaseSensitive = someCaseSensitive;
this.estimatedSize = estimatedRowSize;
this.isProjectEmptyKeyValue = isProjectEmptyKeyValue;
+ this.hasUDFs = hasUDFs;
boolean hasPerInvocationExpression = false;
- for (int i = 0; i < this.columnProjectors.size(); i++) {
- Expression expression = this.columnProjectors.get(i).getExpression();
- if (expression.getDeterminism() == Determinism.PER_INVOCATION) {
- hasPerInvocationExpression = true;
- break;
+ if (!hasUDFs) {
+ for (int i = 0; i < this.columnProjectors.size(); i++) {
+ Expression expression = this.columnProjectors.get(i).getExpression();
+ if (expression.getDeterminism() == Determinism.PER_INVOCATION) {
+ hasPerInvocationExpression = true;
+ break;
+ }
}
}
- this.cloneRequired = hasPerInvocationExpression;
+ this.cloneRequired = hasPerInvocationExpression || hasUDFs;
}
public RowProjector cloneIfNecessary() {
@@ -114,7 +130,7 @@ public class RowProjector {
}
return new RowProjector(clonedColProjectors,
this.getEstimatedRowByteSize(),
- this.isProjectEmptyKeyValue());
+ this.isProjectEmptyKeyValue(), this.hasUDFs);
}
public boolean isProjectEmptyKeyValue() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
index b9897b1..9b54c86 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
@@ -30,6 +30,7 @@ import org.apache.phoenix.parse.ComparisonParseNode;
import org.apache.phoenix.parse.DerivedTableNode;
import org.apache.phoenix.parse.FamilyWildcardParseNode;
import org.apache.phoenix.parse.JoinTableNode;
+import org.apache.phoenix.parse.NamedNode;
import org.apache.phoenix.parse.JoinTableNode.JoinType;
import org.apache.phoenix.parse.LessThanOrEqualParseNode;
import org.apache.phoenix.parse.NamedTableNode;
@@ -99,7 +100,7 @@ public class StatementNormalizer extends ParseNodeRewriter {
if (selectNodes != normSelectNodes) {
statement = NODE_FACTORY.select(statement.getFrom(), statement.getHint(), statement.isDistinct(),
normSelectNodes, statement.getWhere(), statement.getGroupBy(), statement.getHaving(), statement.getOrderBy(),
- statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects());
+ statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
}
}
@@ -151,7 +152,7 @@ public class StatementNormalizer extends ParseNodeRewriter {
}
return super.visitLeave(node, nodes);
}
-
+
@Override
public ParseNode visitLeave(final BetweenParseNode node, List<ParseNode> nodes) throws SQLException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
index 1746d8a..123cb6a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
@@ -341,7 +341,7 @@ public class SubqueryRewriter extends ParseNodeRewriter {
groupbyNodes.set(i - 1, aliasedNode.getNode());
}
SelectStatement derivedTableStmt = NODE_FACTORY.select(subquery, subquery.isDistinct(), derivedTableSelect, where, derivedTableGroupBy, true);
- subquery = NODE_FACTORY.select(NODE_FACTORY.derivedTable(derivedTableAlias, derivedTableStmt), subquery.getHint(), false, selectNodes, null, groupbyNodes, null, Collections.<OrderByNode> emptyList(), null, subquery.getBindCount(), true, false, Collections.<SelectStatement>emptyList());
+ subquery = NODE_FACTORY.select(NODE_FACTORY.derivedTable(derivedTableAlias, derivedTableStmt), subquery.getHint(), false, selectNodes, null, groupbyNodes, null, Collections.<OrderByNode> emptyList(), null, subquery.getBindCount(), true, false, Collections.<SelectStatement>emptyList(), subquery.getUdfParseNodes());
}
ParseNode onNode = conditionExtractor.getJoinCondition();
@@ -364,7 +364,7 @@ public class SubqueryRewriter extends ParseNodeRewriter {
return select;
// Wrap as a derived table.
- return NODE_FACTORY.select(NODE_FACTORY.derivedTable(ParseNodeFactory.createTempAlias(), select), HintNode.EMPTY_HINT_NODE, false, select.getSelect(), null, null, null, null, null, select.getBindCount(), false, false, Collections.<SelectStatement> emptyList());
+ return NODE_FACTORY.select(NODE_FACTORY.derivedTable(ParseNodeFactory.createTempAlias(), select), HintNode.EMPTY_HINT_NODE, false, select.getSelect(), null, null, null, null, null, select.getBindCount(), false, false, Collections.<SelectStatement> emptyList(), select.getUdfParseNodes());
}
private List<AliasedNode> fixAliasedNodes(List<AliasedNode> nodes, boolean addSelectOne) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
index 6862802..5a91a17 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
@@ -205,7 +205,7 @@ public class SubselectRewriter extends ParseNodeRewriter {
}
return NODE_FACTORY.select(subselect.getFrom(), hintRewrite, isDistinctRewrite, selectNodesRewrite, whereRewrite, groupByRewrite,
- havingRewrite, orderByRewrite, limitRewrite, select.getBindCount(), isAggregateRewrite, select.hasSequence(), select.getSelects());
+ havingRewrite, orderByRewrite, limitRewrite, select.getBindCount(), isAggregateRewrite, select.hasSequence(), select.getSelects(), select.getUdfParseNodes());
}
private SelectStatement applyPostFilters(SelectStatement statement, List<ParseNode> postFilters) throws SQLException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index fab1ad0..cd10007 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -51,6 +51,17 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT_BYTE
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ARRAY_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_CONSTANT_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE_BYTES;
import static org.apache.phoenix.schema.PTableType.INDEX;
import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
import static org.apache.phoenix.util.SchemaUtil.getVarChars;
@@ -61,6 +72,8 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -91,22 +104,29 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.GlobalCache.FunctionBytesPtr;
+import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.AddColumnRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearTableFromCacheRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearTableFromCacheResponse;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateFunctionRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateTableRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropColumnRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropFunctionRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropTableRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetFunctionsRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetTableRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
+import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -115,7 +135,10 @@ import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.metrics.Metrics;
+import org.apache.phoenix.parse.PFunction;
+import org.apache.phoenix.parse.PFunction.FunctionArgument;
import org.apache.phoenix.protobuf.ProtobufUtil;
+import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.AmbiguousColumnException;
import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
@@ -123,6 +146,7 @@ import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnFamily;
import org.apache.phoenix.schema.PColumnImpl;
+import org.apache.phoenix.schema.PMetaDataEntity;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
@@ -149,6 +173,7 @@ import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.KeyValueUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
import org.slf4j.Logger;
@@ -156,6 +181,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.cache.Cache;
import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
@@ -220,6 +246,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
static {
Collections.sort(TABLE_KV_COLUMNS, KeyValue.COMPARATOR);
}
+
private static final int TABLE_TYPE_INDEX = TABLE_KV_COLUMNS.indexOf(TABLE_TYPE_KV);
private static final int TABLE_SEQ_NUM_INDEX = TABLE_KV_COLUMNS.indexOf(TABLE_SEQ_NUM_KV);
private static final int COLUMN_COUNT_INDEX = TABLE_KV_COLUMNS.indexOf(COLUMN_COUNT_KV);
@@ -277,6 +304,52 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
private static final int LINK_TYPE_INDEX = 0;
+ private static final KeyValue CLASS_NAME_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, CLASS_NAME_BYTES);
+ private static final KeyValue JAR_PATH_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, JAR_PATH_BYTES);
+ private static final KeyValue RETURN_TYPE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, RETURN_TYPE_BYTES);
+ private static final KeyValue NUM_ARGS_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, NUM_ARGS_BYTES);
+ private static final KeyValue TYPE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TYPE_BYTES);
+ private static final KeyValue IS_CONSTANT_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_CONSTANT_BYTES);
+ private static final KeyValue DEFAULT_VALUE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DEFAULT_VALUE_BYTES);
+ private static final KeyValue MIN_VALUE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, MIN_VALUE_BYTES);
+ private static final KeyValue MAX_VALUE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, MAX_VALUE_BYTES);
+ private static final KeyValue IS_ARRAY_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_ARRAY_BYTES);
+
+ private static final List<KeyValue> FUNCTION_KV_COLUMNS = Arrays.<KeyValue>asList(
+ EMPTY_KEYVALUE_KV,
+ CLASS_NAME_KV,
+ JAR_PATH_KV,
+ RETURN_TYPE_KV,
+ NUM_ARGS_KV
+ );
+ static {
+ Collections.sort(FUNCTION_KV_COLUMNS, KeyValue.COMPARATOR);
+ }
+
+ private static final int CLASS_NAME_INDEX = FUNCTION_KV_COLUMNS.indexOf(CLASS_NAME_KV);
+ private static final int JAR_PATH_INDEX = FUNCTION_KV_COLUMNS.indexOf(JAR_PATH_KV);
+ private static final int RETURN_TYPE_INDEX = FUNCTION_KV_COLUMNS.indexOf(RETURN_TYPE_KV);
+ private static final int NUM_ARGS_INDEX = FUNCTION_KV_COLUMNS.indexOf(NUM_ARGS_KV);
+
+ private static final List<KeyValue> FUNCTION_ARG_KV_COLUMNS = Arrays.<KeyValue>asList(
+ TYPE_KV,
+ IS_ARRAY_KV,
+ IS_CONSTANT_KV,
+ DEFAULT_VALUE_KV,
+ MIN_VALUE_KV,
+ MAX_VALUE_KV
+ );
+ static {
+ Collections.sort(FUNCTION_ARG_KV_COLUMNS, KeyValue.COMPARATOR);
+ }
+
+ private static final int TYPE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(TYPE_KV);
+ private static final int IS_ARRAY_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(IS_ARRAY_KV);
+ private static final int IS_CONSTANT_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(IS_CONSTANT_KV);
+ private static final int DEFAULT_VALUE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(DEFAULT_VALUE_KV);
+ private static final int MIN_VALUE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(MIN_VALUE_KV);
+ private static final int MAX_VALUE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(MAX_VALUE_KV);
+
private static PName newPName(byte[] keyBuffer, int keyOffset, int keyLength) {
if (keyLength <= 0) {
return null;
@@ -368,9 +441,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
RegionScanner scanner = region.getScanner(scan);
- Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+ Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
try {
- PTable oldTable = metaDataCache.getIfPresent(cacheKey);
+ PTable oldTable = (PTable)metaDataCache.getIfPresent(cacheKey);
long tableTimeStamp = oldTable == null ? MIN_TABLE_TIMESTAMP-1 : oldTable.getTimeStamp();
PTable newTable;
newTable = getTable(scanner, clientTimeStamp, tableTimeStamp);
@@ -393,6 +466,48 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
}
+ private List<PFunction> buildFunctions(List<byte[]> keys, HRegion region,
+ long clientTimeStamp) throws IOException, SQLException {
+ List<KeyRange> keyRanges = Lists.newArrayListWithExpectedSize(keys.size());
+ for (byte[] key : keys) {
+ byte[] stopKey = ByteUtil.concat(key, QueryConstants.SEPARATOR_BYTE_ARRAY);
+ ByteUtil.nextKey(stopKey, stopKey.length);
+ keyRanges.add(PVarbinary.INSTANCE.getKeyRange(key, true, stopKey, false));
+ }
+ Scan scan = new Scan();
+ scan.setTimeRange(MIN_TABLE_TIMESTAMP, clientTimeStamp);
+ ScanRanges scanRanges =
+ ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA,
+ Collections.singletonList(keyRanges), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
+ scanRanges.initializeScan(scan);
+ scan.setFilter(scanRanges.getSkipScanFilter());
+
+ RegionScanner scanner = region.getScanner(scan);
+
+ Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+ List<PFunction> functions = new ArrayList<PFunction>();
+ PFunction function = null;
+ try {
+ for(int i = 0; i< keys.size(); i++) {
+ function = null;
+ function = getFunction(scanner);
+ if (function == null) {
+ return null;
+ }
+ byte[] functionKey =
+ SchemaUtil.getFunctionKey(
+ function.getTenantId() == null ? ByteUtil.EMPTY_BYTE_ARRAY : function
+ .getTenantId().getBytes(), Bytes.toBytes(function
+ .getFunctionName()));
+ metaDataCache.put(new FunctionBytesPtr(functionKey), function);
+ functions.add(function);
+ }
+ return functions;
+ } finally {
+ scanner.close();
+ }
+ }
+
private void addIndexToTable(PName tenantId, PName schemaName, PName indexName, PName tableName, long clientTimeStamp, List<PTable> indexes) throws IOException, SQLException {
byte[] key = SchemaUtil.getTableKey(tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes(), schemaName.getBytes(), indexName.getBytes());
PTable indexTable = doGetTable(key, clientTimeStamp);
@@ -473,6 +588,61 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr);
columns.add(column);
}
+
+ private void addArgumentToFunction(List<Cell> results, PName functionName, PName type,
+ Cell[] functionKeyValues, List<FunctionArgument> arguments, short argPosition) {
+ int i = 0;
+ int j = 0;
+ while (i < results.size() && j < FUNCTION_ARG_KV_COLUMNS.size()) {
+ Cell kv = results.get(i);
+ Cell searchKv = FUNCTION_ARG_KV_COLUMNS.get(j);
+ int cmp =
+ Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(),
+ kv.getQualifierLength(), searchKv.getQualifierArray(),
+ searchKv.getQualifierOffset(), searchKv.getQualifierLength());
+ if (cmp == 0) {
+ functionKeyValues[j++] = kv;
+ i++;
+ } else if (cmp > 0) {
+ functionKeyValues[j++] = null;
+ } else {
+ i++; // shouldn't happen - means unexpected KV in system table column row
+ }
+ }
+
+ Cell isArrayKv = functionKeyValues[IS_ARRAY_INDEX];
+ boolean isArrayType =
+ isArrayKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(
+ isArrayKv.getValueArray(), isArrayKv.getValueOffset(),
+ isArrayKv.getValueLength()));
+ Cell isConstantKv = functionKeyValues[IS_CONSTANT_INDEX];
+ boolean isConstant =
+ isConstantKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(
+ isConstantKv.getValueArray(), isConstantKv.getValueOffset(),
+ isConstantKv.getValueLength()));
+ Cell defaultValueKv = functionKeyValues[DEFAULT_VALUE_INDEX];
+ String defaultValue =
+ defaultValueKv == null ? null : (String) PVarchar.INSTANCE.toObject(
+ defaultValueKv.getValueArray(), defaultValueKv.getValueOffset(),
+ defaultValueKv.getValueLength());
+ Cell minValueKv = functionKeyValues[MIN_VALUE_INDEX];
+ String minValue =
+ minValueKv == null ? null : (String) PVarchar.INSTANCE.toObject(
+ minValueKv.getValueArray(), minValueKv.getValueOffset(),
+ minValueKv.getValueLength());
+ Cell maxValueKv = functionKeyValues[MAX_VALUE_INDEX];
+ String maxValue =
+ maxValueKv == null ? null : (String) PVarchar.INSTANCE.toObject(
+ maxValueKv.getValueArray(), maxValueKv.getValueOffset(),
+ maxValueKv.getValueLength());
+ FunctionArgument arg =
+ new FunctionArgument(type.getString(), isArrayType, isConstant,
+ defaultValue == null ? null : LiteralExpression.newConstant(defaultValue),
+ minValue == null ? null : LiteralExpression.newConstant(minValue),
+ maxValue == null ? null : LiteralExpression.newConstant(maxValue),
+ argPosition);
+ arguments.add(arg);
+ }
private PTable getTable(RegionScanner scanner, long clientTimeStamp, long tableTimeStamp)
throws IOException, SQLException {
@@ -646,6 +816,106 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, stats);
}
+ private PFunction getFunction(RegionScanner scanner)
+ throws IOException, SQLException {
+ List<Cell> results = Lists.newArrayList();
+ scanner.next(results);
+ if (results.isEmpty()) {
+ return null;
+ }
+ Cell[] functionKeyValues = new Cell[FUNCTION_KV_COLUMNS.size()];
+ Cell[] functionArgKeyValues = new Cell[FUNCTION_ARG_KV_COLUMNS.size()];
+
+ // Create PFunction based on KeyValues from scan
+ Cell keyValue = results.get(0);
+ byte[] keyBuffer = keyValue.getRowArray();
+ int keyLength = keyValue.getRowLength();
+ int keyOffset = keyValue.getRowOffset();
+ PName tenantId = newPName(keyBuffer, keyOffset, keyLength);
+ int tenantIdLength = (tenantId == null) ? 0 : tenantId.getBytes().length;
+ if (tenantIdLength == 0) {
+ tenantId = null;
+ }
+ PName functionName =
+ newPName(keyBuffer, keyOffset + tenantIdLength + 1, keyLength - tenantIdLength - 1);
+ int functionNameLength = functionName.getBytes().length+1;
+ int offset = tenantIdLength + functionNameLength + 1;
+
+ long timeStamp = keyValue.getTimestamp();
+
+ int i = 0;
+ int j = 0;
+ while (i < results.size() && j < FUNCTION_KV_COLUMNS.size()) {
+ Cell kv = results.get(i);
+ Cell searchKv = FUNCTION_KV_COLUMNS.get(j);
+ int cmp =
+ Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(),
+ kv.getQualifierLength(), searchKv.getQualifierArray(),
+ searchKv.getQualifierOffset(), searchKv.getQualifierLength());
+ if (cmp == 0) {
+ timeStamp = Math.max(timeStamp, kv.getTimestamp()); // Find max timestamp of table
+ // header row
+ functionKeyValues[j++] = kv;
+ i++;
+ } else if (cmp > 0) {
+ timeStamp = Math.max(timeStamp, kv.getTimestamp());
+ functionKeyValues[j++] = null;
+ } else {
+ i++; // shouldn't happen - means unexpected KV in system table header row
+ }
+ }
+ // CLASS_NAME,NUM_ARGS and JAR_PATH are required.
+ if (functionKeyValues[CLASS_NAME_INDEX] == null || functionKeyValues[NUM_ARGS_INDEX] == null) {
+ throw new IllegalStateException(
+ "Didn't find expected key values for function row in metadata row");
+ }
+
+ Cell classNameKv = functionKeyValues[CLASS_NAME_INDEX];
+ PName className = newPName(classNameKv.getValueArray(), classNameKv.getValueOffset(),
+ classNameKv.getValueLength());
+ Cell jarPathKv = functionKeyValues[JAR_PATH_INDEX];
+ PName jarPath = null;
+ if(jarPathKv != null) {
+ jarPath = newPName(jarPathKv.getValueArray(), jarPathKv.getValueOffset(),
+ jarPathKv.getValueLength());
+ }
+ Cell numArgsKv = functionKeyValues[NUM_ARGS_INDEX];
+ int numArgs =
+ PInteger.INSTANCE.getCodec().decodeInt(numArgsKv.getValueArray(),
+ numArgsKv.getValueOffset(), SortOrder.getDefault());
+ Cell returnTypeKv = functionKeyValues[RETURN_TYPE_INDEX];
+ PName returnType =
+ returnTypeKv == null ? null : newPName(returnTypeKv.getValueArray(),
+ returnTypeKv.getValueOffset(), returnTypeKv.getValueLength());
+
+ List<FunctionArgument> arguments = Lists.newArrayListWithExpectedSize(numArgs);
+ for (int k = 0; k < numArgs; k++) {
+ results.clear();
+ scanner.next(results);
+ if (results.isEmpty()) {
+ break;
+ }
+ Cell typeKv = results.get(0);
+ int typeKeyLength = typeKv.getRowLength();
+ PName typeName =
+ newPName(typeKv.getRowArray(), typeKv.getRowOffset() + offset, typeKeyLength
+ - offset - 3);
+
+ int argPositionOffset = offset + typeName.getBytes().length + 1;
+ short argPosition = Bytes.toShort(typeKv.getRowArray(), typeKv.getRowOffset() + argPositionOffset, typeKeyLength
+ - argPositionOffset);
+ addArgumentToFunction(results, functionName, typeName, functionArgKeyValues, arguments, argPosition);
+ }
+ Collections.sort(arguments, new Comparator<FunctionArgument>() {
+ @Override
+ public int compare(FunctionArgument o1, FunctionArgument o2) {
+ return o1.getArgPosition() - o2.getArgPosition();
+ }
+ });
+ return new PFunction(tenantId, functionName.getString(), arguments, returnType.getString(),
+ className.getString(), jarPath == null ? null : jarPath.getString(), timeStamp);
+ }
+
private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region,
long clientTimeStamp) throws IOException {
if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
@@ -663,7 +933,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
if (!results.isEmpty() && results.get(0).getTimestamp() > clientTimeStamp) {
Cell kv = results.get(0);
if (kv.getTypeByte() == Type.Delete.getCode()) {
- Cache<ImmutableBytesPtr, PTable> metaDataCache =
+ Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
GlobalCache.getInstance(this.env).getMetaDataCache();
PTable table = newDeletedTableMarker(kv.getTimestamp());
metaDataCache.put(cacheKey, table);
@@ -673,20 +943,57 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
return null;
}
+
+ private PFunction buildDeletedFunction(byte[] key, ImmutableBytesPtr cacheKey, HRegion region,
+ long clientTimeStamp) throws IOException {
+ if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
+ return null;
+ }
+
+ Scan scan = MetaDataUtil.newTableRowsScan(key, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
+ scan.setFilter(new FirstKeyOnlyFilter());
+ scan.setRaw(true);
+ List<Cell> results = Lists.<Cell> newArrayList();
+ try (RegionScanner scanner = region.getScanner(scan);) {
+ scanner.next(results);
+ }
+ // HBase ignores the time range on a raw scan (HBASE-7362)
+ if (!results.isEmpty() && results.get(0).getTimestamp() > clientTimeStamp) {
+ Cell kv = results.get(0);
+ if (kv.getTypeByte() == Type.Delete.getCode()) {
+ Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
+ GlobalCache.getInstance(this.env).getMetaDataCache();
+ PFunction function = newDeletedFunctionMarker(kv.getTimestamp());
+ metaDataCache.put(cacheKey, function);
+ return function;
+ }
+ }
+ return null;
+ }
+
+
private static PTable newDeletedTableMarker(long timestamp) {
return new PTableImpl(timestamp);
}
+ private static PFunction newDeletedFunctionMarker(long timestamp) {
+ return new PFunction(timestamp);
+ }
+
private static boolean isTableDeleted(PTable table) {
return table.getName() == null;
}
+ private static boolean isFunctionDeleted(PFunction function) {
+ return function.getFunctionName() == null;
+ }
+
private PTable loadTable(RegionCoprocessorEnvironment env, byte[] key,
ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp)
throws IOException, SQLException {
HRegion region = env.getRegion();
- Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
- PTable table = metaDataCache.getIfPresent(cacheKey);
+ Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+ PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
// We always cache the latest version - fault in if not in cache
if (table != null || (table = buildTable(key, cacheKey, region, asOfTimeStamp)) != null) {
return table;
@@ -700,6 +1007,29 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
return null;
}
+ private PFunction loadFunction(RegionCoprocessorEnvironment env, byte[] key,
+ ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp)
+ throws IOException, SQLException {
+ HRegion region = env.getRegion();
+ Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+ PFunction function = (PFunction)metaDataCache.getIfPresent(cacheKey);
+ // We always cache the latest version - fault in if not in cache
+ if (function != null) {
+ return function;
+ }
+ ArrayList<byte[]> arrayList = new ArrayList<byte[]>(1);
+ arrayList.add(key);
+ List<PFunction> functions = buildFunctions(arrayList, region, asOfTimeStamp);
+ if(functions != null) return functions.get(0);
+ // if not found then check if newer table already exists and add delete marker for timestamp
+ // found
+ if (function == null
+ && (function = buildDeletedFunction(key, cacheKey, region, clientTimeStamp)) != null) {
+ return function;
+ }
+ return null;
+ }
+
@Override
public void createTable(RpcController controller, CreateTableRequest request,
@@ -801,7 +1131,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
// Invalidate the cache - the next getTable call will add it
// TODO: consider loading the table that was just created here, patching up the parent table, and updating the cache
- Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+ Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
if (parentCacheKey != null) {
metaDataCache.invalidate(parentCacheKey);
}
@@ -950,7 +1280,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
done.run(MetaDataMutationResult.toProto(result));
return;
}
- Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+ Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
// Commit the list of deletion.
region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
@@ -984,8 +1314,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
HRegion region = env.getRegion();
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
- Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
- PTable table = metaDataCache.getIfPresent(cacheKey);
+ Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+ PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
// We always cache the latest version - fault in if not in cache
if (table != null
@@ -1132,8 +1462,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>();
invalidateList.add(cacheKey);
- Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
- PTable table = metaDataCache.getIfPresent(cacheKey);
+ Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+ PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
if (logger.isDebugEnabled()) {
if (table == null) {
logger.debug("Table " + Bytes.toStringBinary(key)
@@ -1299,9 +1629,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
private PTable doGetTable(byte[] key, long clientTimeStamp, RowLock rowLock) throws IOException, SQLException {
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
- Cache<ImmutableBytesPtr, PTable> metaDataCache =
+ Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
GlobalCache.getInstance(this.env).getMetaDataCache();
- PTable table = metaDataCache.getIfPresent(cacheKey);
+ PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
// We only cache the latest, so we'll end up building the table with every call if the
// client connection has specified an SCN.
// TODO: If we indicate to the client that we're returning an older version, but there's a
@@ -1333,7 +1663,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
try {
// Try cache again in case we were waiting on a lock
- table = metaDataCache.getIfPresent(cacheKey);
+ table = (PTable)metaDataCache.getIfPresent(cacheKey);
// We only cache the latest, so we'll end up building the table with every call if the
// client connection has specified an SCN.
// TODO: If we indicate to the client that we're returning an older version, but there's
@@ -1358,6 +1688,64 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
}
+ private List<PFunction> doGetFunctions(List<byte[]> keys, long clientTimeStamp) throws IOException, SQLException {
+ Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
+ GlobalCache.getInstance(this.env).getMetaDataCache();
+ HRegion region = env.getRegion();
+ Collections.sort(keys, new Comparator<byte[]>() {
+ @Override
+ public int compare(byte[] o1, byte[] o2) {
+ return Bytes.compareTo(o1, o2);
+ }
+ });
+ /*
+ * Lock directly on key, though it may be an index table. This will just prevent a table
+ * from getting rebuilt too often.
+ */
+ List<RowLock> rowLocks = new ArrayList<HRegion.RowLock>(keys.size());;
+ try {
+ rowLocks = new ArrayList<HRegion.RowLock>(keys.size());
+ for (int i = 0; i < keys.size(); i++) {
+ HRegion.RowLock rowLock = region.getRowLock(keys.get(i));
+ if (rowLock == null) {
+ throw new IOException("Failed to acquire lock on "
+ + Bytes.toStringBinary(keys.get(i)));
+ }
+ rowLocks.add(rowLock);
+ }
+
+ List<PFunction> functionsAvailable = new ArrayList<PFunction>(keys.size());
+ int numFunctions = keys.size();
+ Iterator<byte[]> iterator = keys.iterator();
+ while(iterator.hasNext()) {
+ byte[] key = iterator.next();
+ PFunction function = (PFunction)metaDataCache.getIfPresent(new FunctionBytesPtr(key));
+ if (function != null && function.getTimeStamp() < clientTimeStamp) {
+ if (isFunctionDeleted(function)) {
+ return null;
+ }
+ functionsAvailable.add(function);
+ iterator.remove();
+ }
+ }
+ if(functionsAvailable.size() == numFunctions) return functionsAvailable;
+
+ // Query for the latest table first, since it's not cached
+ List<PFunction> buildFunctions = buildFunctions(keys, region, clientTimeStamp);
+ if(buildFunctions == null || buildFunctions.isEmpty()) {
+ return null;
+ }
+ functionsAvailable.addAll(buildFunctions);
+ if(functionsAvailable.size() == numFunctions) return functionsAvailable;
+ return null;
+ } finally {
+ for (HRegion.RowLock lock : rowLocks) {
+ lock.release();
+ }
+ rowLocks.clear();
+ }
+ }
+
@Override
public void dropColumn(RpcController controller, DropColumnRequest request,
RpcCallback<MetaDataResponse> done) {
@@ -1478,7 +1866,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
public void clearCache(RpcController controller, ClearCacheRequest request,
RpcCallback<ClearCacheResponse> done) {
GlobalCache cache = GlobalCache.getInstance(this.env);
- Cache<ImmutableBytesPtr, PTable> metaDataCache =
+ Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
GlobalCache.getInstance(this.env).getMetaDataCache();
metaDataCache.invalidateAll();
cache.clearTenantCache();
@@ -1635,7 +2023,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
// Invalidate from cache
- Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+ Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
metaDataCache.invalidate(cacheKey);
if(dataTableKey != null) {
metaDataCache.invalidate(new ImmutableBytesPtr(dataTableKey));
@@ -1670,6 +2058,18 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
EnvironmentEdgeManager.currentTimeMillis(), null);
}
+ private static MetaDataMutationResult checkFunctionKeyInRegion(byte[] key, HRegion region) {
+ byte[] startKey = region.getStartKey();
+ byte[] endKey = region.getEndKey();
+ if (Bytes.compareTo(startKey, key) <= 0
+ && (Bytes.compareTo(HConstants.LAST_ROW, endKey) == 0 || Bytes.compareTo(key,
+ endKey) < 0)) {
+ return null; // normal case;
+ }
+ return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_IN_REGION,
+ EnvironmentEdgeManager.currentTimeMillis(), null);
+ }
+
/**
* Certain operations, such as DROP TABLE are not allowed if there a table has child views. This class wraps the
* Results of a scanning the Phoenix Metadata for child views for a specific table and stores an additional flag for
@@ -1720,7 +2120,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
byte[] tenantId = request.getTenantId().toByteArray();
byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
- Cache<ImmutableBytesPtr, PTable> metaDataCache =
+ Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
GlobalCache.getInstance(this.env).getMetaDataCache();
metaDataCache.invalidate(cacheKey);
} catch (Throwable t) {
@@ -1729,5 +2129,222 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
}
}
+ @Override
+ public void getFunctions(RpcController controller, GetFunctionsRequest request,
+ RpcCallback<MetaDataResponse> done) {
+ MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
+ byte[] tenantId = request.getTenantId().toByteArray();
+ List<String> functionNames = new ArrayList<>(request.getFunctionNamesCount());
+ try {
+ HRegion region = env.getRegion();
+ List<ByteString> functionNamesList = request.getFunctionNamesList();
+ List<Long> functionTimestampsList = request.getFunctionTimestampsList();
+ List<byte[]> keys = new ArrayList<byte[]>(request.getFunctionNamesCount());
+ List<Pair<byte[], Long>> functions = new ArrayList<Pair<byte[], Long>>(request.getFunctionNamesCount());
+ for(int i = 0; i< functionNamesList.size();i++) {
+ byte[] functionName = functionNamesList.get(i).toByteArray();
+ functionNames.add(Bytes.toString(functionName));
+ byte[] key = SchemaUtil.getFunctionKey(tenantId, functionName);
+ MetaDataMutationResult result = checkFunctionKeyInRegion(key, region);
+ if (result != null) {
+ done.run(MetaDataMutationResult.toProto(result));
+ return;
+ }
+ functions.add(new Pair<byte[], Long>(functionName,functionTimestampsList.get(i)));
+ keys.add(key);
+ }
+
+ long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+ List<PFunction> functionsAvailable = doGetFunctions(keys, request.getClientTimestamp());
+ if (functionsAvailable == null) {
+ builder.setReturnCode(MetaDataProtos.MutationCode.FUNCTION_NOT_FOUND);
+ builder.setMutationTime(currentTime);
+ done.run(builder.build());
+ return;
+ }
+ builder.setReturnCode(MetaDataProtos.MutationCode.FUNCTION_ALREADY_EXISTS);
+ builder.setMutationTime(currentTime);
+
+ for (PFunction function : functionsAvailable) {
+ builder.addFunction(PFunction.toProto(function));
+ }
+ done.run(builder.build());
+ return;
+ } catch (Throwable t) {
+ logger.error("getFunctions failed", t);
+ ProtobufUtil.setControllerException(controller,
+ ServerUtil.createIOException(functionNames.toString(), t));
+ }
+ }
+ @Override
+ public void createFunction(RpcController controller, CreateFunctionRequest request,
+ RpcCallback<MetaDataResponse> done) {
+ MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
+ byte[][] rowKeyMetaData = new byte[2][];
+ byte[] functionName = null;
+ try {
+ List<Mutation> functionMetaData = ProtobufUtil.getMutations(request);
+ boolean temporaryFunction = request.getTemporary();
+ MetaDataUtil.getTenantIdAndFunctionName(functionMetaData, rowKeyMetaData);
+ byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
+ functionName = rowKeyMetaData[PhoenixDatabaseMetaData.FUNTION_NAME_INDEX];
+ byte[] lockKey = SchemaUtil.getFunctionKey(tenantIdBytes, functionName);
+ HRegion region = env.getRegion();
+ MetaDataMutationResult result = checkFunctionKeyInRegion(lockKey, region);
+ if (result != null) {
+ done.run(MetaDataMutationResult.toProto(result));
+ return;
+ }
+ List<RowLock> locks = Lists.newArrayList();
+ long clientTimeStamp = MetaDataUtil.getClientTimeStamp(functionMetaData);
+ try {
+ acquireLock(region, lockKey, locks);
+ // Get as of latest timestamp so we can detect if we have a newer function that already
+ // exists without making an additional query
+ ImmutableBytesPtr cacheKey = new FunctionBytesPtr(lockKey);
+ PFunction function =
+ loadFunction(env, lockKey, cacheKey, clientTimeStamp, clientTimeStamp);
+ if (function != null) {
+ if (function.getTimeStamp() < clientTimeStamp) {
+ // If the function is older than the client time stamp and it's deleted,
+ // continue
+ if (!isFunctionDeleted(function)) {
+ builder.setReturnCode(MetaDataProtos.MutationCode.FUNCTION_ALREADY_EXISTS);
+ builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+ builder.addFunction(PFunction.toProto(function));
+ done.run(builder.build());
+ return;
+ }
+ } else {
+ builder.setReturnCode(MetaDataProtos.MutationCode.NEWER_FUNCTION_FOUND);
+ builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+ builder.addFunction(PFunction.toProto(function));
+ done.run(builder.build());
+ return;
+ }
+ }
+ // Don't store function info for temporary functions.
+ if(!temporaryFunction) {
+ region.mutateRowsWithLocks(functionMetaData, Collections.<byte[]> emptySet());
+ }
+
+ // Invalidate the cache - the next getFunction call will add it
+ // TODO: consider loading the function that was just created here, patching up the parent function, and updating the cache
+ Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+ metaDataCache.invalidate(cacheKey);
+ // Get timeStamp from mutations - the above method sets it if it's unset
+ long currentTimeStamp = MetaDataUtil.getClientTimeStamp(functionMetaData);
+ builder.setReturnCode(MetaDataProtos.MutationCode.FUNCTION_NOT_FOUND);
+ builder.setMutationTime(currentTimeStamp);
+ done.run(builder.build());
+ return;
+ } finally {
+ region.releaseRowLocks(locks);
+ }
+ } catch (Throwable t) {
+ logger.error("createFunction failed", t);
+ ProtobufUtil.setControllerException(controller,
+ ServerUtil.createIOException(Bytes.toString(functionName), t));
+ }
+ }
+
+ @Override
+ public void dropFunction(RpcController controller, DropFunctionRequest request,
+ RpcCallback<MetaDataResponse> done) {
+ byte[][] rowKeyMetaData = new byte[2][];
+ byte[] functionName = null;
+ try {
+ List<Mutation> functionMetaData = ProtobufUtil.getMutations(request);
+ MetaDataUtil.getTenantIdAndFunctionName(functionMetaData, rowKeyMetaData);
+ byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
+ functionName = rowKeyMetaData[PhoenixDatabaseMetaData.FUNTION_NAME_INDEX];
+ byte[] lockKey = SchemaUtil.getFunctionKey(tenantIdBytes, functionName);
+ HRegion region = env.getRegion();
+ MetaDataMutationResult result = checkFunctionKeyInRegion(lockKey, region);
+ if (result != null) {
+ done.run(MetaDataMutationResult.toProto(result));
+ return;
+ }
+ List<RowLock> locks = Lists.newArrayList();
+ long clientTimeStamp = MetaDataUtil.getClientTimeStamp(functionMetaData);
+ try {
+ acquireLock(region, lockKey, locks);
+ ImmutableBytesPtr cacheKey = new FunctionBytesPtr(lockKey);
+ List<byte[]> keys = new ArrayList<byte[]>(1);
+ keys.add(lockKey);
+ List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>();
+
+ result = doDropFunction(clientTimeStamp, keys, functionMetaData, invalidateList);
+ if (result.getMutationCode() != MutationCode.FUNCTION_ALREADY_EXISTS) {
+ done.run(MetaDataMutationResult.toProto(result));
+ return;
+ }
+ region.mutateRowsWithLocks(functionMetaData, Collections.<byte[]> emptySet());
+
+ Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+ long currentTime = MetaDataUtil.getClientTimeStamp(functionMetaData);
+ for(ImmutableBytesPtr ptr: invalidateList) {
+ metaDataCache.invalidate(ptr);
+ metaDataCache.put(ptr, newDeletedFunctionMarker(currentTime));
+
+ }
+
+ done.run(MetaDataMutationResult.toProto(result));
+ return;
+ } finally {
+ region.releaseRowLocks(locks);
+ }
+ } catch (Throwable t) {
+ logger.error("dropFunction failed", t);
+ ProtobufUtil.setControllerException(controller,
+ ServerUtil.createIOException(Bytes.toString(functionName), t));
+ }
+ }
+
+ private MetaDataMutationResult doDropFunction(long clientTimeStamp, List<byte[]> keys, List<Mutation> functionMetaData, List<ImmutableBytesPtr> invalidateList)
+ throws IOException, SQLException {
+ List<byte[]> keysClone = new ArrayList<byte[]>(keys);
+ List<PFunction> functions = doGetFunctions(keysClone, clientTimeStamp);
+ // We didn't find a table at the latest timestamp, so either there is no table or
+ // there was a table, but it's been deleted. In either case we want to return.
+ if (functions == null || functions.isEmpty()) {
+ if (buildDeletedFunction(keys.get(0), new FunctionBytesPtr(keys.get(0)), env.getRegion(), clientTimeStamp) != null) {
+ return new MetaDataMutationResult(MutationCode.FUNCTION_ALREADY_EXISTS, EnvironmentEdgeManager.currentTimeMillis(), null);
+ }
+ return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+ }
+
+ if (functions != null && !functions.isEmpty()) {
+ if (functions.get(0).getTimeStamp() < clientTimeStamp) {
+ // If the function is older than the client time stamp and it's deleted,
+ // continue
+ if (isFunctionDeleted(functions.get(0))) {
+ return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND,
+ EnvironmentEdgeManager.currentTimeMillis(), null);
+ }
+ invalidateList.add(new FunctionBytesPtr(keys.get(0)));
+ HRegion region = env.getRegion();
+ Scan scan = MetaDataUtil.newTableRowsScan(keys.get(0), MIN_TABLE_TIMESTAMP, clientTimeStamp);
+ List<Cell> results = Lists.newArrayList();
+ try (RegionScanner scanner = region.getScanner(scan);) {
+ scanner.next(results);
+ if (results.isEmpty()) { // Should not be possible
+ return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+ }
+ do {
+ Cell kv = results.get(0);
+ Delete delete = new Delete(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), clientTimeStamp);
+ functionMetaData.add(delete);
+ results.clear();
+ scanner.next(results);
+ } while (!results.isEmpty());
+ }
+ return new MetaDataMutationResult(MutationCode.FUNCTION_ALREADY_EXISTS,
+ EnvironmentEdgeManager.currentTimeMillis(), functions, true);
+ }
+ }
+ return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND,
+ EnvironmentEdgeManager.currentTimeMillis(), null);
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 3ef6e80..2cca4bc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -17,13 +17,16 @@
*/
package org.apache.phoenix.coprocessor;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
+import org.apache.phoenix.coprocessor.generated.PFunctionProtos;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
import org.apache.phoenix.hbase.index.util.VersionUtil;
+import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableImpl;
@@ -87,6 +90,10 @@ public abstract class MetaDataProtocol extends MetaDataService {
UNALLOWED_TABLE_MUTATION,
NO_PK_COLUMNS,
PARENT_TABLE_NOT_FOUND,
+ FUNCTION_ALREADY_EXISTS,
+ FUNCTION_NOT_FOUND,
+ NEWER_FUNCTION_FOUND,
+ FUNCTION_NOT_IN_REGION,
NO_OP
};
@@ -98,6 +105,7 @@ public abstract class MetaDataProtocol extends MetaDataService {
private byte[] columnName;
private byte[] familyName;
private boolean wasUpdated;
+ private List<PFunction> functions = new ArrayList<PFunction>(1);
public MetaDataMutationResult() {
}
@@ -114,12 +122,19 @@ public abstract class MetaDataProtocol extends MetaDataService {
this(returnCode, currentTime, table, Collections.<byte[]> emptyList());
}
+ public MetaDataMutationResult(MutationCode returnCode, long currentTime, List<PFunction> functions, boolean wasUpdated) {
+ this.returnCode = returnCode;
+ this.mutationTime = currentTime;
+ this.functions = functions;
+ this.wasUpdated = wasUpdated;
+ }
+
// For testing, so that connectionless can set wasUpdated so ColumnResolver doesn't complain
public MetaDataMutationResult(MutationCode returnCode, long currentTime, PTable table, boolean wasUpdated) {
this(returnCode, currentTime, table, Collections.<byte[]> emptyList());
this.wasUpdated = wasUpdated;
}
-
+
public MetaDataMutationResult(MutationCode returnCode, long currentTime, PTable table, List<byte[]> tableNamesToDelete) {
this.returnCode = returnCode;
this.mutationTime = currentTime;
@@ -146,6 +161,10 @@ public abstract class MetaDataProtocol extends MetaDataService {
public void setTable(PTable table) {
this.table = table;
}
+
+ public void setFunction(PFunction function) {
+ this.functions.add(function);
+ }
public List<byte[]> getTableNamesToDelete() {
return tableNamesToDelete;
@@ -159,6 +178,10 @@ public abstract class MetaDataProtocol extends MetaDataService {
return familyName;
}
+ public List<PFunction> getFunctions() {
+ return functions;
+ }
+
public static MetaDataMutationResult constructFromProto(MetaDataResponse proto) {
MetaDataMutationResult result = new MetaDataMutationResult();
result.returnCode = MutationCode.values()[proto.getReturnCode().ordinal()];
@@ -167,6 +190,11 @@ public abstract class MetaDataProtocol extends MetaDataService {
result.wasUpdated = true;
result.table = PTableImpl.createFromProto(proto.getTable());
}
+ if (proto.getFunctionCount() > 0) {
+ result.wasUpdated = true;
+ for(PFunctionProtos.PFunction function: proto.getFunctionList())
+ result.functions.add(PFunction.createFromProto(function));
+ }
if (proto.getTablesToDeleteCount() > 0) {
result.tableNamesToDelete =
Lists.newArrayListWithExpectedSize(proto.getTablesToDeleteCount());