Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 23:15:59 UTC
[43/51] [partial] Initial commit of master branch from github
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateSequenceCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateSequenceCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateSequenceCompiler.java
new file mode 100644
index 0000000..1a22f11
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateSequenceCompiler.java
@@ -0,0 +1,213 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+import java.util.Collections;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.BindParseNode;
+import org.apache.phoenix.parse.CreateSequenceStatement;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PDatum;
+
+
+public class CreateSequenceCompiler {
+ private final PhoenixStatement statement;
+
+ public CreateSequenceCompiler(PhoenixStatement statement) {
+ this.statement = statement;
+ }
+
+ private static class LongDatum implements PDatum {
+
+ @Override
+ public boolean isNullable() {
+ return false;
+ }
+
+ @Override
+ public PDataType getDataType() {
+ return PDataType.LONG;
+ }
+
+ @Override
+ public Integer getByteSize() {
+ return PDataType.LONG.getByteSize();
+ }
+
+ @Override
+ public Integer getMaxLength() {
+ return null;
+ }
+
+ @Override
+ public Integer getScale() {
+ return null;
+ }
+
+ @Override
+ public ColumnModifier getColumnModifier() {
+ return null;
+ }
+
+ }
+ private static class IntegerDatum implements PDatum {
+
+ @Override
+ public boolean isNullable() {
+ return false;
+ }
+
+ @Override
+ public PDataType getDataType() {
+ return PDataType.INTEGER;
+ }
+
+ @Override
+ public Integer getByteSize() {
+ return PDataType.INTEGER.getByteSize();
+ }
+
+ @Override
+ public Integer getMaxLength() {
+ return null;
+ }
+
+ @Override
+ public Integer getScale() {
+ return null;
+ }
+
+ @Override
+ public ColumnModifier getColumnModifier() {
+ return null;
+ }
+
+ }
+ private static final PDatum LONG_DATUM = new LongDatum();
+ private static final PDatum INTEGER_DATUM = new IntegerDatum();
+
+ public MutationPlan compile(final CreateSequenceStatement sequence) throws SQLException {
+ ParseNode startsWithNode = sequence.getStartWith();
+ ParseNode incrementByNode = sequence.getIncrementBy();
+ if (!startsWithNode.isStateless()) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.STARTS_WITH_MUST_BE_CONSTANT)
+ .setSchemaName(sequence.getSequenceName().getSchemaName())
+ .setTableName(sequence.getSequenceName().getTableName()).build().buildException();
+ }
+ if (!incrementByNode.isStateless()) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.INCREMENT_BY_MUST_BE_CONSTANT)
+ .setSchemaName(sequence.getSequenceName().getSchemaName())
+ .setTableName(sequence.getSequenceName().getTableName()).build().buildException();
+ }
+ ParseNode cacheNode = sequence.getCacheSize();
+ if (cacheNode != null && !cacheNode.isStateless()) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.CACHE_MUST_BE_NON_NEGATIVE_CONSTANT)
+ .setSchemaName(sequence.getSequenceName().getSchemaName())
+ .setTableName(sequence.getSequenceName().getTableName()).build().buildException();
+ }
+
+ final PhoenixConnection connection = statement.getConnection();
+ final ColumnResolver resolver = FromCompiler.EMPTY_TABLE_RESOLVER;
+
+ final StatementContext context = new StatementContext(statement, resolver, statement.getParameters(), new Scan());
+ if (startsWithNode instanceof BindParseNode) {
+ context.getBindManager().addParamMetaData((BindParseNode)startsWithNode, LONG_DATUM);
+ }
+ if (incrementByNode instanceof BindParseNode) {
+ context.getBindManager().addParamMetaData((BindParseNode)incrementByNode, LONG_DATUM);
+ }
+ if (cacheNode instanceof BindParseNode) {
+ context.getBindManager().addParamMetaData((BindParseNode)cacheNode, INTEGER_DATUM);
+ }
+ ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);
+ Expression startsWithExpr = startsWithNode.accept(expressionCompiler);
+ ImmutableBytesWritable ptr = context.getTempPtr();
+ startsWithExpr.evaluate(null, ptr);
+ if (ptr.getLength() == 0 || !startsWithExpr.getDataType().isCoercibleTo(PDataType.LONG)) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.STARTS_WITH_MUST_BE_CONSTANT)
+ .setSchemaName(sequence.getSequenceName().getSchemaName())
+ .setTableName(sequence.getSequenceName().getTableName()).build().buildException();
+ }
+ final long startsWith = (Long)PDataType.LONG.toObject(ptr, startsWithExpr.getDataType());
+
+ Expression incrementByExpr = incrementByNode.accept(expressionCompiler);
+ incrementByExpr.evaluate(null, ptr);
+ if (ptr.getLength() == 0 || !incrementByExpr.getDataType().isCoercibleTo(PDataType.LONG)) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.INCREMENT_BY_MUST_BE_CONSTANT)
+ .setSchemaName(sequence.getSequenceName().getSchemaName())
+ .setTableName(sequence.getSequenceName().getTableName()).build().buildException();
+ }
+ final long incrementBy = (Long)PDataType.LONG.toObject(ptr, incrementByExpr.getDataType());
+
+ int cacheSizeValue = connection.getQueryServices().getProps().getInt(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_SEQUENCE_CACHE_SIZE);
+ if (cacheNode != null) {
+ Expression cacheSizeExpr = cacheNode.accept(expressionCompiler);
+ cacheSizeExpr.evaluate(null, ptr);
+ if (ptr.getLength() != 0 && (!cacheSizeExpr.getDataType().isCoercibleTo(PDataType.INTEGER) || (cacheSizeValue = (Integer)PDataType.INTEGER.toObject(ptr)) < 0)) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.CACHE_MUST_BE_NON_NEGATIVE_CONSTANT)
+ .setSchemaName(sequence.getSequenceName().getSchemaName())
+ .setTableName(sequence.getSequenceName().getTableName()).build().buildException();
+ }
+ }
+ final int cacheSize = Math.max(1, cacheSizeValue);
+
+
+ final MetaDataClient client = new MetaDataClient(connection);
+ return new MutationPlan() {
+
+ @Override
+ public MutationState execute() throws SQLException {
+ return client.createSequence(sequence, startsWith, incrementBy, cacheSize);
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ return new ExplainPlan(Collections.singletonList("CREATE SEQUENCE"));
+ }
+
+ @Override
+ public PhoenixConnection getConnection() {
+ return connection;
+ }
+
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return context.getBindManager().getParameterMetaData();
+ }
+
+ };
+ }
+}
\ No newline at end of file
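
For orientation, a minimal sketch of how this compiler gets exercised: CREATE SEQUENCE arrives through the Phoenix JDBC driver, and compile() checks that START WITH, INCREMENT BY, and CACHE are stateless constants before delegating to MetaDataClient.createSequence(). The connection URL and sequence name below are illustrative assumptions, not part of this commit:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class CreateSequenceExample {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Statement stmt = conn.createStatement()) {
                // Each clause must evaluate to a constant at compile time,
                // or the compiler throws (STARTS_WITH_MUST_BE_CONSTANT, etc.).
                stmt.executeUpdate(
                    "CREATE SEQUENCE my_schema.my_sequence START WITH 1 INCREMENT BY 1 CACHE 10");
            }
        }
    }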
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
new file mode 100644
index 0000000..12c60c8
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
@@ -0,0 +1,245 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.expression.AndExpression;
+import org.apache.phoenix.expression.ComparisonExpression;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.IsNullExpression;
+import org.apache.phoenix.expression.KeyValueColumnExpression;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.expression.RowKeyColumnExpression;
+import org.apache.phoenix.expression.visitor.TraverseNoExpressionVisitor;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.CreateTableStatement;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.parse.SQLParser;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.parse.TableName;
+import org.apache.phoenix.parse.WildcardParseNode;
+import org.apache.phoenix.query.DelegateConnectionQueryServices;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PMetaData;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.ViewType;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.ByteUtil;
+
+import com.google.common.collect.Iterators;
+
+
+public class CreateTableCompiler {
+ private static final String SELECT = "SELECT";
+ private static final String FROM = "FROM";
+ private static final String WHERE = "WHERE";
+
+ private final PhoenixStatement statement;
+
+ public CreateTableCompiler(PhoenixStatement statement) {
+ this.statement = statement;
+ }
+
+ public MutationPlan compile(final CreateTableStatement create) throws SQLException {
+ final PhoenixConnection connection = statement.getConnection();
+ ColumnResolver resolver = FromCompiler.getResolver(create, connection);
+ PTableType type = create.getTableType();
+ PhoenixConnection connectionToBe = connection;
+ PTable parentToBe = null;
+ ViewType viewTypeToBe = null;
+ Scan scan = new Scan();
+ final StatementContext context = new StatementContext(statement, resolver, statement.getParameters(), scan);
+ ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);
+ // TODO: support any statement for a VIEW instead of just a WHERE clause
+ ParseNode whereNode = create.getWhereClause();
+ String viewStatementToBe = null;
+ if (type == PTableType.VIEW) {
+ TableRef tableRef = resolver.getTables().get(0);
+ parentToBe = tableRef.getTable();
+ viewTypeToBe = parentToBe.getViewType() == ViewType.MAPPED ? ViewType.MAPPED : ViewType.UPDATABLE;
+ if (whereNode == null) {
+ viewStatementToBe = parentToBe.getViewStatement();
+ } else {
+ whereNode = StatementNormalizer.normalize(whereNode, resolver);
+ if (whereNode.isStateless()) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.VIEW_WHERE_IS_CONSTANT)
+ .build().buildException();
+ }
+ // If our parent has a VIEW statement, combine it with this one
+ if (parentToBe.getViewStatement() != null) {
+ SelectStatement select = new SQLParser(parentToBe.getViewStatement()).parseQuery().combine(whereNode);
+ whereNode = select.getWhere();
+ }
+ Expression where = whereNode.accept(expressionCompiler);
+ if (where != null && !LiteralExpression.isTrue(where)) {
+ TableName baseTableName = create.getBaseTableName();
+ String schemaName = baseTableName.getSchemaName();
+ // Only form we currently support for VIEWs: SELECT * FROM t WHERE ...
+ viewStatementToBe = SELECT + " " + WildcardParseNode.NAME + " " + FROM +
+ (schemaName == null ? "" : "\"" + schemaName + "\".") +
+ (" \"" + baseTableName.getTableName() + "\" ") +
+ (WHERE + " " + where.toString());
+ }
+ if (viewTypeToBe != ViewType.MAPPED) {
+ Long scn = connection.getSCN();
+ connectionToBe = scn != null ? connection :
+ // If we have no SCN on our connection, freeze the SCN at the time
+ // the base table was resolved to prevent any race condition on
+ // the error checking we do for the base table. The only potential
+ // issue is if the base table lives on a different region server
+ // than the new table will; in that case we're relying on the
+ // system clocks being in sync.
+ new PhoenixConnection(
+ // When the new table is created, we still want to cache it
+ // on our connection.
+ new DelegateConnectionQueryServices(connection.getQueryServices()) {
+ @Override
+ public PMetaData addTable(PTable table) throws SQLException {
+ return connection.addTable(table);
+ }
+ },
+ connection, tableRef.getTimeStamp());
+ ViewWhereExpressionVisitor visitor = new ViewWhereExpressionVisitor();
+ where.accept(visitor);
+ viewTypeToBe = visitor.isUpdatable() ? ViewType.UPDATABLE : ViewType.READ_ONLY;
+ }
+ }
+ }
+ final ViewType viewType = viewTypeToBe;
+ final String viewStatement = viewStatementToBe;
+ List<ParseNode> splitNodes = create.getSplitNodes();
+ final byte[][] splits = new byte[splitNodes.size()][];
+ ImmutableBytesWritable ptr = context.getTempPtr();
+ for (int i = 0; i < splits.length; i++) {
+ ParseNode node = splitNodes.get(i);
+ if (node.isStateless()) {
+ Expression expression = node.accept(expressionCompiler);
+ if (expression.evaluate(null, ptr)) {
+ splits[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
+ continue;
+ }
+ }
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.SPLIT_POINT_NOT_CONSTANT)
+ .setMessage("Node: " + node).build().buildException();
+ }
+ final MetaDataClient client = new MetaDataClient(connectionToBe);
+ final PTable parent = parentToBe;
+
+ return new MutationPlan() {
+
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return context.getBindManager().getParameterMetaData();
+ }
+
+ @Override
+ public MutationState execute() throws SQLException {
+ try {
+ return client.createTable(create, splits, parent, viewStatement, viewType);
+ } finally {
+ if (client.getConnection() != connection) {
+ client.getConnection().close();
+ }
+ }
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ return new ExplainPlan(Collections.singletonList("CREATE TABLE"));
+ }
+
+ @Override
+ public PhoenixConnection getConnection() {
+ return connection;
+ }
+
+ };
+ }
+
+ private static class ViewWhereExpressionVisitor extends TraverseNoExpressionVisitor<Boolean> {
+ private boolean isUpdatable = true;
+
+ public boolean isUpdatable() {
+ return isUpdatable;
+ }
+
+ @Override
+ public Boolean defaultReturn(Expression node, List<Boolean> l) {
+ // We only get here when traversal reaches an expression for which
+ // no visitLeave returns non-null, so the view can't be proven updatable
+ isUpdatable = false;
+ return null;
+ }
+
+ @Override
+ public Iterator<Expression> visitEnter(AndExpression node) {
+ return node.getChildren().iterator();
+ }
+
+ @Override
+ public Boolean visitLeave(AndExpression node, List<Boolean> l) {
+ return l.isEmpty() ? null : Boolean.TRUE;
+ }
+
+ @Override
+ public Iterator<Expression> visitEnter(ComparisonExpression node) {
+ return node.getFilterOp() == CompareOp.EQUAL && node.getChildren().get(1).isStateless() && node.getChildren().get(1).isDeterministic() ? Iterators.singletonIterator(node.getChildren().get(0)) : super.visitEnter(node);
+ }
+
+ @Override
+ public Boolean visitLeave(ComparisonExpression node, List<Boolean> l) {
+ return l.isEmpty() ? null : Boolean.TRUE;
+ }
+
+ @Override
+ public Iterator<Expression> visitEnter(IsNullExpression node) {
+ return node.isNegate() ? super.visitEnter(node) : node.getChildren().iterator();
+ }
+
+ @Override
+ public Boolean visitLeave(IsNullExpression node, List<Boolean> l) {
+ return l.isEmpty() ? null : Boolean.TRUE;
+ }
+
+ @Override
+ public Boolean visit(RowKeyColumnExpression node) {
+ return Boolean.TRUE;
+ }
+
+ @Override
+ public Boolean visit(KeyValueColumnExpression node) {
+ return Boolean.TRUE;
+ }
+
+ }
+}
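
A note on the updatable-view rule above: ViewWhereExpressionVisitor walks the compiled WHERE expression and keeps a view UPDATABLE only for ANDs of equality comparisons against stateless, deterministic values (or non-negated IS NULL) over plain columns; anything else trips defaultReturn() and the view is marked READ_ONLY. A minimal sketch of DDL that exercises this rule, assuming a Phoenix cluster at localhost (the table and view names are illustrative, not part of this commit):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class ViewRuleExample {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Statement stmt = conn.createStatement()) {
                stmt.executeUpdate(
                    "CREATE TABLE base_t (k VARCHAR PRIMARY KEY, kind VARCHAR, v BIGINT)");
                // AND of '=' against a constant: the visitor returns TRUE,
                // so the view stays UPDATABLE.
                stmt.executeUpdate(
                    "CREATE VIEW v_up AS SELECT * FROM base_t WHERE kind = 'a'");
                // A '>' comparison has no visitEnter/visitLeave pair returning
                // non-null, so defaultReturn() fires and the view is READ_ONLY.
                stmt.executeUpdate(
                    "CREATE VIEW v_ro AS SELECT * FROM base_t WHERE v > 10");
            }
        }
    }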
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
new file mode 100644
index 0000000..a73c111
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -0,0 +1,360 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.ParameterMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.index.IndexMetaDataCacheClient;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.optimize.QueryOptimizer;
+import org.apache.phoenix.parse.AliasedNode;
+import org.apache.phoenix.parse.DeleteStatement;
+import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.parse.HintNode.Hint;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.parse.ParseNodeFactory;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PRow;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.ReadOnlyTableException;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.IndexUtil;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class DeleteCompiler {
+ private static final ParseNodeFactory FACTORY = new ParseNodeFactory();
+
+ private final PhoenixStatement statement;
+
+ public DeleteCompiler(PhoenixStatement statement) {
+ this.statement = statement;
+ }
+
+ private static MutationState deleteRows(PhoenixStatement statement, TableRef tableRef, ResultIterator iterator, RowProjector projector) throws SQLException {
+ PhoenixConnection connection = statement.getConnection();
+ byte[] tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
+ final boolean isAutoCommit = connection.getAutoCommit();
+ ConnectionQueryServices services = connection.getQueryServices();
+ final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+ final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
+ Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations = Maps.newHashMapWithExpectedSize(batchSize);
+ try {
+ PTable table = tableRef.getTable();
+ List<PColumn> pkColumns = table.getPKColumns();
+ boolean isMultiTenant = table.isMultiTenant() && tenantId != null;
+ int offset = (table.getBucketNum() == null ? 0 : 1) + (isMultiTenant ? 1 : 0); // Take into account salting and multi-tenant
+ byte[][] values = new byte[pkColumns.size()][];
+ if (isMultiTenant) {
+ values[offset-1] = tenantId;
+ }
+ ResultSet rs = new PhoenixResultSet(iterator, projector, statement);
+ int rowCount = 0;
+ while (rs.next()) {
+ for (int i = offset; i < values.length; i++) {
+ byte[] byteValue = rs.getBytes(i+1-offset);
+ // The ResultSet.getBytes() call will have inverted it - we need to invert it back.
+ // TODO: consider going under the hood and just getting the bytes
+ if (pkColumns.get(i).getColumnModifier() == ColumnModifier.SORT_DESC) {
+ byte[] tempByteValue = Arrays.copyOf(byteValue, byteValue.length);
+ byteValue = ColumnModifier.SORT_DESC.apply(byteValue, 0, tempByteValue, 0, byteValue.length);
+ }
+ values[i] = byteValue;
+ }
+ ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+ table.newKey(ptr, values);
+ mutations.put(ptr, PRow.DELETE_MARKER);
+ if (mutations.size() > maxSize) {
+ throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize);
+ }
+ rowCount++;
+ // Commit a batch if auto commit is true and we're at our batch size
+ if (isAutoCommit && rowCount % batchSize == 0) {
+ MutationState state = new MutationState(tableRef, mutations, 0, maxSize, connection);
+ connection.getMutationState().join(state);
+ connection.commit();
+ mutations.clear();
+ }
+ }
+
+ // If auto commit is true, this last batch will be committed upon return
+ return new MutationState(tableRef, mutations, rowCount / batchSize * batchSize, maxSize, connection);
+ } finally {
+ iterator.close();
+ }
+ }
+
+ private static class DeletingParallelIteratorFactory extends MutatingParallelIteratorFactory {
+ private RowProjector projector;
+
+ private DeletingParallelIteratorFactory(PhoenixConnection connection, TableRef tableRef) {
+ super(connection, tableRef);
+ }
+
+ @Override
+ protected MutationState mutate(PhoenixConnection connection, ResultIterator iterator) throws SQLException {
+ PhoenixStatement statement = new PhoenixStatement(connection);
+ return deleteRows(statement, tableRef, iterator, projector);
+ }
+
+ public void setRowProjector(RowProjector projector) {
+ this.projector = projector;
+ }
+
+ }
+
+ private boolean hasImmutableIndex(TableRef tableRef) {
+ return tableRef.getTable().isImmutableRows() && !tableRef.getTable().getIndexes().isEmpty();
+ }
+
+ private boolean hasImmutableIndexWithKeyValueColumns(TableRef tableRef) {
+ if (!hasImmutableIndex(tableRef)) {
+ return false;
+ }
+ for (PTable index : tableRef.getTable().getIndexes()) {
+ for (PColumn column : index.getPKColumns()) {
+ if (!IndexUtil.isDataPKColumn(column)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ public MutationPlan compile(DeleteStatement delete) throws SQLException {
+ final PhoenixConnection connection = statement.getConnection();
+ final boolean isAutoCommit = connection.getAutoCommit();
+ final ConnectionQueryServices services = connection.getQueryServices();
+ final ColumnResolver resolver = FromCompiler.getResolver(delete, connection);
+ final TableRef tableRef = resolver.getTables().get(0);
+ PTable table = tableRef.getTable();
+ if (table.getType() == PTableType.VIEW && table.getViewType().isReadOnly()) {
+ throw new ReadOnlyTableException(table.getSchemaName().getString(),table.getTableName().getString());
+ }
+
+ final boolean hasLimit = delete.getLimit() != null;
+ boolean noQueryReqd = !hasLimit && !hasImmutableIndex(tableRef);
+ boolean runOnServer = isAutoCommit && noQueryReqd;
+ HintNode hint = delete.getHint();
+ if (runOnServer && !delete.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) {
+ hint = HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE);
+ }
+
+ List<AliasedNode> aliasedNodes = Lists.newArrayListWithExpectedSize(table.getPKColumns().size());
+ boolean isSalted = table.getBucketNum() != null;
+ boolean isMultiTenant = connection.getTenantId() != null && table.isMultiTenant();
+ for (int i = (isSalted ? 1 : 0) + (isMultiTenant ? 1 : 0); i < table.getPKColumns().size(); i++) {
+ PColumn column = table.getPKColumns().get(i);
+ aliasedNodes.add(FACTORY.aliasedNode(null, FACTORY.column(null, '"' + column.getName().getString() + '"', null)));
+ }
+ SelectStatement select = FACTORY.select(
+ Collections.singletonList(delete.getTable()),
+ hint, false, aliasedNodes, delete.getWhere(),
+ Collections.<ParseNode>emptyList(), null,
+ delete.getOrderBy(), delete.getLimit(),
+ delete.getBindCount(), false);
+ DeletingParallelIteratorFactory parallelIteratorFactory = hasLimit ? null : new DeletingParallelIteratorFactory(connection, tableRef);
+ final QueryPlan plan = new QueryOptimizer(services).optimize(select, statement, Collections.<PColumn>emptyList(), parallelIteratorFactory);
+ if (!plan.getTableRef().equals(tableRef)) {
+ runOnServer = false;
+ noQueryReqd = false;
+ }
+
+ final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+
+ if (hasImmutableIndexWithKeyValueColumns(tableRef)) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.NO_DELETE_IF_IMMUTABLE_INDEX).setSchemaName(tableRef.getTable().getSchemaName().getString())
+ .setTableName(tableRef.getTable().getTableName().getString()).build().buildException();
+ }
+
+ final StatementContext context = plan.getContext();
+ // If we're doing a query for a single row with no where clause, then we don't need to contact the server at all.
+ // A simple check for the non-existence of a where clause in the parse node is not sufficient, as the
+ // where clause may have been optimized out.
+ if (noQueryReqd && context.isSingleRowScan()) {
+ final ImmutableBytesPtr key = new ImmutableBytesPtr(context.getScan().getStartRow());
+ return new MutationPlan() {
+
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return context.getBindManager().getParameterMetaData();
+ }
+
+ @Override
+ public MutationState execute() {
+ Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation = Maps.newHashMapWithExpectedSize(1);
+ mutation.put(key, PRow.DELETE_MARKER);
+ return new MutationState(tableRef, mutation, 0, maxSize, connection);
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ return new ExplainPlan(Collections.singletonList("DELETE SINGLE ROW"));
+ }
+
+ @Override
+ public PhoenixConnection getConnection() {
+ return connection;
+ }
+ };
+ } else if (runOnServer) {
+ // TODO: better abstraction
+ Scan scan = context.getScan();
+ scan.setAttribute(UngroupedAggregateRegionObserver.DELETE_AGG, QueryConstants.TRUE);
+
+ // Build an ungrouped aggregate query: select COUNT(*) from <table> where <where>
+ // The coprocessor will delete each row returned from the scan
+ // Ignore ORDER BY: with auto commit on and no limit, it makes no difference
+ SelectStatement aggSelect = SelectStatement.create(SelectStatement.COUNT_ONE, delete.getHint());
+ final RowProjector projector = ProjectionCompiler.compile(context, aggSelect, GroupBy.EMPTY_GROUP_BY);
+ final QueryPlan aggPlan = new AggregatePlan(context, select, tableRef, projector, null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
+ return new MutationPlan() {
+
+ @Override
+ public PhoenixConnection getConnection() {
+ return connection;
+ }
+
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return context.getBindManager().getParameterMetaData();
+ }
+
+ @Override
+ public MutationState execute() throws SQLException {
+ // TODO: share this block of code with UPSERT SELECT
+ ImmutableBytesWritable ptr = context.getTempPtr();
+ tableRef.getTable().getIndexMaintainers(ptr);
+ ServerCache cache = null;
+ try {
+ if (ptr.getLength() > 0) {
+ IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
+ cache = client.addIndexMetadataCache(context.getScanRanges(), ptr);
+ byte[] uuidValue = cache.getId();
+ context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+ }
+ ResultIterator iterator = aggPlan.iterator();
+ try {
+ Tuple row = iterator.next();
+ final long mutationCount = (Long)projector.getColumnProjector(0).getValue(row, PDataType.LONG, ptr);
+ return new MutationState(maxSize, connection) {
+ @Override
+ public long getUpdateCount() {
+ return mutationCount;
+ }
+ };
+ } finally {
+ iterator.close();
+ }
+ } finally {
+ if (cache != null) {
+ cache.close();
+ }
+ }
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ List<String> queryPlanSteps = aggPlan.getExplainPlan().getPlanSteps();
+ List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+ planSteps.add("DELETE ROWS");
+ planSteps.addAll(queryPlanSteps);
+ return new ExplainPlan(planSteps);
+ }
+ };
+ } else {
+ if (parallelIteratorFactory != null) {
+ parallelIteratorFactory.setRowProjector(plan.getProjector());
+ }
+ return new MutationPlan() {
+
+ @Override
+ public PhoenixConnection getConnection() {
+ return connection;
+ }
+
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return context.getBindManager().getParameterMetaData();
+ }
+
+ @Override
+ public MutationState execute() throws SQLException {
+ ResultIterator iterator = plan.iterator();
+ if (!hasLimit) {
+ Tuple tuple;
+ long totalRowCount = 0;
+ while ((tuple=iterator.next()) != null) {// Runs query
+ KeyValue kv = tuple.getValue(0);
+ totalRowCount += PDataType.LONG.getCodec().decodeLong(kv.getBuffer(), kv.getValueOffset(), null);
+ }
+ // Return the total number of rows that have been deleted. When auto commit is off,
+ // the mutations will all be in the mutation state of the current connection.
+ return new MutationState(maxSize, connection, totalRowCount);
+ } else {
+ return deleteRows(statement, tableRef, iterator, plan.getProjector());
+ }
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ List<String> queryPlanSteps = plan.getExplainPlan().getPlanSteps();
+ List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+ planSteps.add("DELETE ROWS");
+ planSteps.addAll(queryPlanSteps);
+ return new ExplainPlan(planSteps);
+ }
+ };
+ }
+
+ }
+}
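
The compile() method above picks one of three plans; a hedged JDBC sketch makes them concrete. The table and predicates are illustrative assumptions (base_t as in the earlier view sketch):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class DeletePathsExample {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                conn.setAutoCommit(true);
                try (Statement stmt = conn.createStatement()) {
                    // Fully specified PK, no query required: the "DELETE SINGLE ROW"
                    // plan builds the delete mutation from the scan's start row,
                    // without contacting the server at compile time.
                    stmt.executeUpdate("DELETE FROM base_t WHERE k = 'row1'");
                    // Auto commit on, no LIMIT, no immutable index: runs server-side
                    // through UngroupedAggregateRegionObserver as a counting aggregate.
                    stmt.executeUpdate("DELETE FROM base_t WHERE v > 10");
                }
                // Otherwise the client scans the PK columns and buffers delete
                // mutations (deleteRows above), committing batches when auto commit is on.
                conn.setAutoCommit(false);
                try (Statement stmt = conn.createStatement()) {
                    stmt.executeUpdate("DELETE FROM base_t WHERE kind = 'a' LIMIT 100");
                    conn.commit();
                }
            }
        }
    }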
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/compile/DropSequenceCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DropSequenceCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DropSequenceCompiler.java
new file mode 100644
index 0000000..be53f6b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DropSequenceCompiler.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+import java.util.Collections;
+
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.DropSequenceStatement;
+import org.apache.phoenix.schema.MetaDataClient;
+
+
+public class DropSequenceCompiler {
+ private final PhoenixStatement statement;
+
+ public DropSequenceCompiler(PhoenixStatement statement) {
+ this.statement = statement;
+ }
+
+
+ public MutationPlan compile(final DropSequenceStatement sequence) throws SQLException {
+ final PhoenixConnection connection = statement.getConnection();
+ final MetaDataClient client = new MetaDataClient(connection);
+ return new MutationPlan() {
+
+ @Override
+ public MutationState execute() throws SQLException {
+ return client.dropSequence(sequence);
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ return new ExplainPlan(Collections.singletonList("DROP SEQUENCE"));
+ }
+
+ @Override
+ public PhoenixConnection getConnection() {
+ return connection;
+ }
+
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA;
+ }
+
+ };
+ }
+}
\ No newline at end of file
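
DropSequenceCompiler is a thin wrapper: the returned plan simply delegates to MetaDataClient.dropSequence(). A minimal sketch, with the same assumed connection URL and sequence name as above:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class DropSequenceExample {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Statement stmt = conn.createStatement()) {
                stmt.executeUpdate("DROP SEQUENCE my_schema.my_sequence");
            }
        }
    }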
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlan.java
new file mode 100644
index 0000000..e1049a0
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlan.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+public class ExplainPlan {
+ public static final ExplainPlan EMPTY_PLAN = new ExplainPlan(Collections.<String>emptyList());
+
+ private final List<String> planSteps;
+
+ public ExplainPlan(List<String> planSteps) {
+ this.planSteps = ImmutableList.copyOf(planSteps);
+ }
+
+ public List<String> getPlanSteps() {
+ return planSteps;
+ }
+}
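
The plan steps an ExplainPlan carries are what EXPLAIN surfaces to clients, one step per result row. A hedged sketch, assuming the base_t table from the earlier examples:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class ExplainExample {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery(
                     "EXPLAIN SELECT count(*) FROM base_t WHERE v > 10")) {
                while (rs.next()) {
                    // One plan step per row, e.g. the scan and aggregation steps
                    System.out.println(rs.getString(1));
                }
            }
        }
    }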