You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 20:23:36 UTC
[35/51] [partial] Initial commit
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
new file mode 100644
index 0000000..29c558f
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -0,0 +1,312 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.common.collect.Lists;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.iterate.OrderedResultIterator;
+import org.apache.phoenix.iterate.RegionScannerResultIterator;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.ServerUtil;
+
+
+/**
+ *
+ * Wraps the scan performing a non aggregate query to prevent needless retries
+ * if a Phoenix bug is encountered from our custom filter expression evaluation.
+ * Unfortunately, until HBASE-7481 gets fixed, there's no way to do this from our
+ * custom filters.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ScanRegionObserver extends BaseScannerRegionObserver {
+ public static final String NON_AGGREGATE_QUERY = "NonAggregateQuery";
+ private static final String TOPN = "TopN";
+
+ public static void serializeIntoScan(Scan scan, int thresholdBytes, int limit, List<OrderByExpression> orderByExpressions, int estimatedRowSize) {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream(); // TODO: size?
+ try {
+ DataOutputStream output = new DataOutputStream(stream);
+ WritableUtils.writeVInt(output, thresholdBytes);
+ WritableUtils.writeVInt(output, limit);
+ WritableUtils.writeVInt(output, estimatedRowSize);
+ WritableUtils.writeVInt(output, orderByExpressions.size());
+ for (OrderByExpression orderingCol : orderByExpressions) {
+ orderingCol.write(output);
+ }
+ scan.setAttribute(TOPN, stream.toByteArray());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+  * Rebuilds the TopN iterator described by the {@code TopN} scan attribute.
+  * Fields are read in the order in which
+  * {@link #serializeIntoScan(Scan, int, int, List, int)} writes them.
+  *
+  * @param scan scan that may carry the {@code TopN} attribute
+  * @param s region scanner that supplies the rows to be ordered
+  * @return an OrderedResultIterator reading from {@code s}, or {@code null} when the
+  *         attribute is not set on the scan
+  * @throws RuntimeException wrapping any IOException raised while decoding
+  */
+ public static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s) {
+     final byte[] encoded = scan.getAttribute(TOPN);
+     if (encoded == null) {
+         return null;
+     }
+     ByteArrayInputStream bytes = new ByteArrayInputStream(encoded); // TODO: size?
+     try {
+         DataInputStream in = new DataInputStream(bytes);
+         final int thresholdBytes = WritableUtils.readVInt(in);
+         final int rawLimit = WritableUtils.readVInt(in);
+         final int estimatedRowSize = WritableUtils.readVInt(in);
+         final int count = WritableUtils.readVInt(in);
+         List<OrderByExpression> ordering = Lists.newArrayListWithExpectedSize(count);
+         while (ordering.size() < count) {
+             OrderByExpression expression = new OrderByExpression();
+             expression.readFields(in);
+             ordering.add(expression);
+         }
+         // A negative limit on the wire means that no limit applies.
+         Integer limit = null;
+         if (rawLimit >= 0) {
+             limit = rawLimit;
+         }
+         ResultIterator rows = new RegionScannerResultIterator(s);
+         return new OrderedResultIterator(rows, ordering, thresholdBytes, limit, estimatedRowSize);
+     } catch (IOException ioe) {
+         throw new RuntimeException(ioe);
+     } finally {
+         try {
+             bytes.close();
+         } catch (IOException ioe) {
+             throw new RuntimeException(ioe);
+         }
+     }
+ }
+
+ /**
+  * Wraps the region scanner of a non aggregate query.
+  * <p>
+  * The scanner is returned untouched unless the {@code NonAggregateQuery} attribute is
+  * set to something other than {@code PDataType.FALSE_BYTES}. Otherwise, if the scan
+  * carries a projector and/or hash join information, the scanner is first wrapped in a
+  * HashJoinRegionScanner. The result is then either wrapped to translate unexpected
+  * exceptions, or, when the scan carries TopN information, replaced by a TopN scanner.
+  *
+  * @param c the observer context supplying the region environment
+  * @param scan the scan being opened
+  * @param s the scanner HBase created for the scan
+  * @return the scanner to hand back to HBase
+  * @throws Throwable anything raised while building the wrapping scanners
+  */
+ @Override
+ protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws Throwable {
+     byte[] isScanQuery = scan.getAttribute(NON_AGGREGATE_QUERY);
+
+     if (isScanQuery == null || Bytes.compareTo(PDataType.FALSE_BYTES, isScanQuery) == 0) {
+         return s;
+     }
+
+     final ScanProjector p = ScanProjector.deserializeProjectorFromScan(scan);
+     final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
+     final ImmutableBytesWritable tenantId = ScanUtil.getTenantId(scan);
+
+     RegionScanner innerScanner = s;
+     if (p != null || j != null) {
+         innerScanner = new HashJoinRegionScanner(s, p, j, tenantId, c.getEnvironment());
+     }
+
+     // The TopN iterator must read from innerScanner rather than from the raw scanner:
+     // otherwise the rows it orders would skip the projection / hash join applied above.
+     final OrderedResultIterator iterator = deserializeFromScan(scan, innerScanner);
+     if (iterator == null) {
+         return getWrappedScanner(c, innerScanner);
+     }
+
+     return getTopNScanner(c, innerScanner, iterator, tenantId);
+ }
+
+ /**
+  * Return region scanner that does TopN.
+  * We only need to call startRegionOperation and closeRegionOperation when
+  * getting the first Tuple (which forces running through the entire region)
+  * since after this everything is held in memory
+  * <p>
+  * A memory chunk sized from the iterator's estimate is reserved before the region is
+  * read and resized to the real size afterwards. The chunk is released when the
+  * returned scanner is closed, or right away if reading the first tuple fails, so a
+  * failed query does not keep the reservation.
+  *
+  * @param c the observer context supplying the region and the tenant cache
+  * @param s the underlying scanner; used for region info and closed with the result
+  * @param iterator the ordered iterator the returned scanner reads its rows from
+  * @param tenantId tenant whose memory manager accounts for the cached rows
+  * @return a scanner that returns one ordered tuple per call to {@code next}
+  * @throws Throwable if the region operation cannot be started or the first tuple
+  *         cannot be read
+  */
+ private RegionScanner getTopNScanner(final ObserverContext<RegionCoprocessorEnvironment> c, final RegionScanner s, final OrderedResultIterator iterator, ImmutableBytesWritable tenantId) throws Throwable {
+     final Tuple firstTuple;
+     TenantCache tenantCache = GlobalCache.getTenantCache(c.getEnvironment(), tenantId);
+     long estSize = iterator.getEstimatedByteSize();
+     final HRegion region = c.getEnvironment().getRegion();
+     final MemoryChunk chunk = tenantCache.getMemoryManager().allocate(estSize);
+     // Stays false until the rows are cached, so every failure path releases the chunk.
+     boolean cached = false;
+     try {
+         region.startRegionOperation();
+         try {
+             // Once we return from the first call to next, we've run through and cached
+             // the topN rows, so we no longer need to start/stop a region operation.
+             firstTuple = iterator.next();
+             // Now that the topN are cached, we can resize based on the real size
+             long actualSize = iterator.getByteSize();
+             chunk.resize(actualSize);
+             cached = true;
+         } catch (Throwable t) {
+             ServerUtil.throwIOException(region.getRegionNameAsString(), t);
+             return null;
+         } finally {
+             region.closeRegionOperation();
+         }
+     } finally {
+         if (!cached) {
+             // Nobody will ever call close() on a scanner we failed to create.
+             chunk.close();
+         }
+     }
+     return new BaseRegionScanner() {
+         // The tuple handed out by the next call to next(); null once exhausted.
+         private Tuple tuple = firstTuple;
+
+         @Override
+         public boolean isFilterDone() {
+             return tuple == null;
+         }
+
+         @Override
+         public HRegionInfo getRegionInfo() {
+             return s.getRegionInfo();
+         }
+
+         @Override
+         public boolean next(List<KeyValue> results) throws IOException {
+             try {
+                 if (isFilterDone()) {
+                     return false;
+                 }
+
+                 for (int i = 0; i < tuple.size(); i++) {
+                     results.add(tuple.getValue(i));
+                 }
+
+                 // Advance now so the return value tells whether another row follows.
+                 tuple = iterator.next();
+                 return !isFilterDone();
+             } catch (Throwable t) {
+                 ServerUtil.throwIOException(region.getRegionNameAsString(), t);
+                 return false;
+             }
+         }
+
+         @Override
+         public void close() throws IOException {
+             try {
+                 s.close();
+             } finally {
+                 chunk.close();
+             }
+         }
+     };
+ }
+
+ /**
+ * Return wrapped scanner that catches unexpected exceptions (i.e. Phoenix bugs) and
+ * re-throws as DoNotRetryIOException to prevent needless retrying hanging the query
+ * for 30 seconds. Unfortunately, until HBASE-7481 gets fixed, there's no way to do
+ * the same from a custom filter.
+ */
+ private RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, final RegionScanner s) {
+ return new RegionScanner() {
+
+ @Override
+ public boolean next(List<KeyValue> results) throws IOException {
+ try {
+ return s.next(results);
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+ return false; // impossible
+ }
+ }
+
+ @Override
+ public boolean next(List<KeyValue> results, String metric) throws IOException {
+ try {
+ return s.next(results, metric);
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+ return false; // impossible
+ }
+ }
+
+ @Override
+ public boolean next(List<KeyValue> result, int limit) throws IOException {
+ try {
+ return s.next(result, limit);
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+ return false; // impossible
+ }
+ }
+
+ @Override
+ public boolean next(List<KeyValue> result, int limit, String metric) throws IOException {
+ try {
+ return s.next(result, limit, metric);
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+ return false; // impossible
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ s.close();
+ }
+
+ @Override
+ public HRegionInfo getRegionInfo() {
+ return s.getRegionInfo();
+ }
+
+ @Override
+ public boolean isFilterDone() {
+ return s.isFilterDone();
+ }
+
+ @Override
+ public boolean reseek(byte[] row) throws IOException {
+ return s.reseek(row);
+ }
+
+ @Override
+ public long getMvccReadPoint() {
+ return s.getMvccReadPoint();
+ }
+
+ @Override
+ public boolean nextRaw(List<KeyValue> result, String metric) throws IOException {
+ try {
+ return s.nextRaw(result, metric);
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+ return false; // impossible
+ }
+ }
+
+ @Override
+ public boolean nextRaw(List<KeyValue> result, int limit, String metric) throws IOException {
+ try {
+ return s.nextRaw(result, limit, metric);
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+ return false; // impossible
+ }
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java b/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
new file mode 100644
index 0000000..ec0c06e
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.TenantCache;
+
+
+
+
+/**
+ *
+ * Server-side implementation of {@link ServerCachingProtocol}. Both operations resolve
+ * the cache of the requesting tenant through {@link GlobalCache} and then add or
+ * remove the entry identified by the cache id.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ServerCachingEndpointImpl extends BaseEndpointCoprocessor implements ServerCachingProtocol {
+
+    /**
+     * Looks up the cache of the given tenant in this endpoint's region environment.
+     * A null tenantId is passed on as a null tenant key.
+     */
+    private TenantCache tenantCacheFor(byte[] tenantId) throws SQLException {
+        RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) this.getEnvironment();
+        ImmutableBytesPtr tenantKey = null;
+        if (tenantId != null) {
+            tenantKey = new ImmutableBytesPtr(tenantId);
+        }
+        return GlobalCache.getTenantCache(env, tenantKey);
+    }
+
+    @Override
+    public boolean addServerCache(byte[] tenantId, byte[] cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException {
+        TenantCache cache = tenantCacheFor(tenantId);
+        ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(cacheId);
+        cache.addServerCache(cacheKey, cachePtr, cacheFactory);
+        return true;
+    }
+
+    @Override
+    public boolean removeServerCache(byte[] tenantId, byte[] cacheId) throws SQLException {
+        TenantCache cache = tenantCacheFor(tenantId);
+        ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(cacheId);
+        cache.removeServerCache(cacheKey);
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java b/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
new file mode 100644
index 0000000..abda834
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.Closeable;
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.io.Writable;
+
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+
+/**
+ *
+ * EndPoint coprocessor to send a cache to a region server.
+ * Used for:
+ * a) hash joins, to send the smaller side of the join to each region server
+ * b) secondary indexes, to send the necessary meta data to each region server
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface ServerCachingProtocol extends CoprocessorProtocol {
+    /**
+     * Writable factory that turns the transferred bytes into the server-side cache object.
+     */
+    interface ServerCacheFactory extends Writable {
+        /**
+         * Creates the cache object.
+         * @param cachePtr pointer to the byte array of the cache
+         * @param chunk memory chunk handed to the new cache
+         * @return the cache, as a Closeable
+         * @throws SQLException on failure
+         */
+        Closeable newCache(ImmutableBytesWritable cachePtr, MemoryChunk chunk) throws SQLException;
+    }
+
+    /**
+     * Add the cache to the region server cache.
+     * @param tenantId the tenantId or null if not applicable
+     * @param cacheId unique identifier of the cache
+     * @param cachePtr pointer to the byte array of the cache
+     * @param cacheFactory factory that converts from byte array to object representation on the server side
+     * @return true on success and otherwise throws
+     * @throws SQLException on failure
+     */
+    boolean addServerCache(byte[] tenantId, byte[] cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException;
+
+    /**
+     * Remove the cache from the region server cache. Called upon completion of
+     * the operation when cache is no longer needed.
+     * @param tenantId the tenantId or null if not applicable
+     * @param cacheId unique identifier of the cache
+     * @return true on success and otherwise throws
+     * @throws SQLException on failure
+     */
+    boolean removeServerCache(byte[] tenantId, byte[] cacheId) throws SQLException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
new file mode 100644
index 0000000..e36ce13
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -0,0 +1,437 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.WritableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.phoenix.exception.ValueTypeIncompatibleException;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.ConstraintViolationException;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PRow;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+
+/**
+ * Region observer that aggregates ungrouped rows(i.e. SQL query with aggregation function and no GROUP BY).
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver {
+ private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class);
+ // TODO: move all constants into a single class
+ public static final String UNGROUPED_AGG = "UngroupedAgg";
+ public static final String DELETE_AGG = "DeleteAgg";
+ public static final String UPSERT_SELECT_TABLE = "UpsertSelectTable";
+ public static final String UPSERT_SELECT_EXPRS = "UpsertSelectExprs";
+ public static final String DELETE_CQ = "DeleteCQ";
+ public static final String DELETE_CF = "DeleteCF";
+ public static final String EMPTY_CF = "EmptyCF";
+
+ private static void commitBatch(HRegion region, List<Pair<Mutation,Integer>> mutations, byte[] indexUUID) throws IOException {
+ if (indexUUID != null) {
+ for (Pair<Mutation,Integer> pair : mutations) {
+ pair.getFirst().setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
+ }
+ }
+ @SuppressWarnings("unchecked")
+ Pair<Mutation,Integer>[] mutationArray = new Pair[mutations.size()];
+ // TODO: should we use the one that is all or none?
+ region.batchMutate(mutations.toArray(mutationArray));
+ }
+
+ /**
+  * Marks the scan as an ungrouped aggregation by setting the {@code UngroupedAgg}
+  * attribute; {@code doPostScannerOpen} only acts on scans that carry it.
+  *
+  * @param scan the scan to mark
+  */
+ public static void serializeIntoScan(Scan scan) {
+     final byte[] marker = QueryConstants.TRUE;
+     scan.setAttribute(UNGROUPED_AGG, marker);
+ }
+
+ /**
+  * Runs an ungrouped aggregation over the whole region while the scanner is opened
+  * and returns a scanner that yields the aggregated result.
+  * <p>
+  * Steps, in order:
+  * <ol>
+  * <li>Scans without the {@code UngroupedAgg} attribute get {@code s} back unchanged.</li>
+  * <li>Scans carrying {@code SchemaUtil.UPGRADE_TO_2_0} trigger the 2.0 data upgrade and
+  *     get back a scanner that returns no rows.</li>
+  * <li>If the scan carries a projector and/or hash join information, {@code s} is wrapped
+  *     in a HashJoinRegionScanner.</li>
+  * <li>Every row is read and fed to the aggregators. Depending on the scan attributes a
+  *     mutation is also queued per row: a row delete ({@code DeleteAgg}), an upserted row
+  *     ({@code UpsertSelectTable}/{@code UpsertSelectExprs}), a column delete
+  *     ({@code DeleteCF}/{@code DeleteCQ}) and/or an empty key value ({@code EmptyCF}).
+  *     Queued mutations are committed in batches of the configured mutate batch size,
+  *     with the remainder committed after the scan.</li>
+  * <li>The returned scanner produces a single KeyValue holding the aggregated bytes, or
+  *     nothing when no row was aggregated.</li>
+  * </ol>
+  * Rows whose upsert raises a ConstraintViolationException are logged and left out of
+  * the aggregation.
+  *
+  * @param c the observer context supplying the region and its configuration
+  * @param scan the scan being opened
+  * @param s the scanner HBase created for the scan
+  * @return the scanner to hand back to HBase
+  * @throws IOException if scanning or committing mutations fails
+  */
+ @Override
+ protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException {
+     byte[] isUngroupedAgg = scan.getAttribute(UNGROUPED_AGG);
+     if (isUngroupedAgg == null) {
+         return s;
+     }
+     byte[] upgradeTo20 = scan.getAttribute(SchemaUtil.UPGRADE_TO_2_0);
+     /* Hack to upgrade data to new 2.0 format */
+     if (upgradeTo20 != null) {
+         int nColumns = Bytes.toInt(upgradeTo20);
+         SchemaUtil.upgradeTo2IfNecessary(c.getEnvironment().getRegion(), nColumns);
+         // The upgrade is the whole point of this scan: report no rows.
+         return new BaseRegionScanner() {
+             @Override
+             public HRegionInfo getRegionInfo() {
+                 return s.getRegionInfo();
+             }
+             @Override
+             public boolean isFilterDone() {
+                 return true;
+             }
+             @Override
+             public void close() throws IOException {
+                 s.close();
+             }
+             @Override
+             public boolean next(List<KeyValue> results) throws IOException {
+                 return false;
+             }
+         };
+     }
+
+     final ScanProjector p = ScanProjector.deserializeProjectorFromScan(scan);
+     final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
+     RegionScanner theScanner = s;
+     // Either piece of information is enough to require the wrapper; this matches the
+     // condition ScanRegionObserver uses for non aggregate queries.
+     if (p != null || j != null) {
+         theScanner = new HashJoinRegionScanner(s, p, j, ScanUtil.getTenantId(scan), c.getEnvironment());
+     }
+     final RegionScanner innerScanner = theScanner;
+
+     byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
+     PTable projectedTable = null;
+     List<Expression> selectExpressions = null;
+     byte[] upsertSelectTable = scan.getAttribute(UPSERT_SELECT_TABLE);
+     boolean isUpsert = false;
+     boolean isDelete = false;
+     byte[] deleteCQ = null;
+     byte[] deleteCF = null;
+     byte[][] values = null;
+     byte[] emptyCF = null;
+     ImmutableBytesWritable ptr = null;
+     if (upsertSelectTable != null) {
+         isUpsert = true;
+         projectedTable = deserializeTable(upsertSelectTable);
+         selectExpressions = deserializeExpressions(scan.getAttribute(UPSERT_SELECT_EXPRS));
+         values = new byte[projectedTable.getPKColumns().size()][];
+         ptr = new ImmutableBytesWritable();
+     } else {
+         byte[] isDeleteAgg = scan.getAttribute(DELETE_AGG);
+         isDelete = isDeleteAgg != null && Bytes.compareTo(PDataType.TRUE_BYTES, isDeleteAgg) == 0;
+         if (!isDelete) {
+             deleteCF = scan.getAttribute(DELETE_CF);
+             deleteCQ = scan.getAttribute(DELETE_CQ);
+         }
+         emptyCF = scan.getAttribute(EMPTY_CF);
+     }
+
+     int batchSize = 0;
+     long ts = scan.getTimeRange().getMax();
+     HRegion region = c.getEnvironment().getRegion();
+     // Stays the immutable empty list unless this scan can actually produce mutations.
+     List<Pair<Mutation,Integer>> mutations = Collections.emptyList();
+     if (isDelete || isUpsert || (deleteCQ != null && deleteCF != null) || emptyCF != null) {
+         // TODO: size better
+         mutations = Lists.newArrayListWithExpectedSize(1024);
+         batchSize = c.getEnvironment().getConfiguration().getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+     }
+     Aggregators aggregators = ServerAggregators.deserialize(
+             scan.getAttribute(GroupedAggregateRegionObserver.AGGREGATORS), c.getEnvironment().getConfiguration());
+     Aggregator[] rowAggregators = aggregators.getAggregators();
+     boolean hasMore;
+     boolean hasAny = false;
+     MultiKeyValueTuple result = new MultiKeyValueTuple();
+     if (logger.isInfoEnabled()) {
+         logger.info("Starting ungrouped coprocessor scan " + scan);
+     }
+     long rowCount = 0;
+     MultiVersionConsistencyControl.setThreadReadPoint(innerScanner.getMvccReadPoint());
+     region.startRegionOperation();
+     try {
+         do {
+             List<KeyValue> results = new ArrayList<KeyValue>();
+             // Results are potentially returned even when the return value of s.next is false
+             // since this is an indication of whether or not there are more values after the
+             // ones returned
+             hasMore = innerScanner.nextRaw(results, null);
+             if (!results.isEmpty()) {
+                 rowCount++;
+                 result.setKeyValues(results);
+                 try {
+                     if (isDelete) {
+                         @SuppressWarnings("deprecation") // FIXME: Remove when unintentionally deprecated method is fixed (HBASE-7870).
+                         // FIXME: the version of the Delete constructor without the lock args was introduced
+                         // in 0.94.4, thus if we try to use it here we can no longer use the 0.94.2 version
+                         // of the client.
+                         Delete delete = new Delete(results.get(0).getRow(),ts,null);
+                         mutations.add(new Pair<Mutation,Integer>(delete,null));
+                     } else if (isUpsert) {
+                         Arrays.fill(values, null);
+                         int i = 0;
+                         List<PColumn> projectedColumns = projectedTable.getColumns();
+                         // First the primary key columns, which form the row key ...
+                         for (; i < projectedTable.getPKColumns().size(); i++) {
+                             Expression expression = selectExpressions.get(i);
+                             if (expression.evaluate(result, ptr)) {
+                                 values[i] = ptr.copyBytes();
+                                 // If ColumnModifier from expression in SELECT doesn't match the
+                                 // column being projected into then invert the bits.
+                                 if (expression.getColumnModifier() != projectedColumns.get(i).getColumnModifier()) {
+                                     ColumnModifier.SORT_DESC.apply(values[i], 0, values[i], 0, values[i].length);
+                                 }
+                             }
+                         }
+                         projectedTable.newKey(ptr, values);
+                         PRow row = projectedTable.newRow(ts, ptr);
+                         // ... then the remaining columns, continuing from the same index.
+                         for (; i < projectedColumns.size(); i++) {
+                             Expression expression = selectExpressions.get(i);
+                             if (expression.evaluate(result, ptr)) {
+                                 PColumn column = projectedColumns.get(i);
+                                 byte[] bytes = ptr.copyBytes();
+                                 Object value = expression.getDataType().toObject(bytes, column.getColumnModifier());
+                                 // If ColumnModifier from expression in SELECT doesn't match the
+                                 // column being projected into then invert the bits.
+                                 if (expression.getColumnModifier() != column.getColumnModifier()) {
+                                     ColumnModifier.SORT_DESC.apply(bytes, 0, bytes, 0, bytes.length);
+                                 }
+                                 // We are guaranteed that the two column will have the same type.
+                                 if (!column.getDataType().isSizeCompatible(column.getDataType(),
+                                         value, bytes,
+                                         expression.getMaxLength(), column.getMaxLength(),
+                                         expression.getScale(), column.getScale())) {
+                                     throw new ValueTypeIncompatibleException(column.getDataType(),
+                                             column.getMaxLength(), column.getScale());
+                                 }
+                                 bytes = column.getDataType().coerceBytes(bytes, value, expression.getDataType(),
+                                         expression.getMaxLength(), expression.getScale(), column.getMaxLength(), column.getScale());
+                                 row.setValue(column, bytes);
+                             }
+                         }
+                         for (Mutation mutation : row.toRowMutations()) {
+                             mutations.add(new Pair<Mutation,Integer>(mutation,null));
+                         }
+                     } else if (deleteCF != null && deleteCQ != null) {
+                         // No need to search for delete column, since we project only it
+                         // if no empty key value is being set
+                         if (emptyCF == null || result.getValue(deleteCF, deleteCQ) != null) {
+                             Delete delete = new Delete(results.get(0).getRow());
+                             delete.deleteColumns(deleteCF, deleteCQ, ts);
+                             mutations.add(new Pair<Mutation,Integer>(delete,null));
+                         }
+                     }
+                     if (emptyCF != null) {
+                         /*
+                          * If we've specified an emptyCF, then we need to insert an empty
+                          * key value "retroactively" for any key value that is visible at
+                          * the timestamp that the DDL was issued. Key values that are not
+                          * visible at this timestamp will not ever be projected up to
+                          * scans past this timestamp, so don't need to be considered.
+                          * We insert one empty key value per row per timestamp.
+                          */
+                         Set<Long> timeStamps = Sets.newHashSetWithExpectedSize(results.size());
+                         for (KeyValue kv : results) {
+                             long kvts = kv.getTimestamp();
+                             // add() returns false for a timestamp already seen in this row,
+                             // which is what limits us to one Put per row per timestamp.
+                             if (timeStamps.add(kvts)) {
+                                 Put put = new Put(kv.getRow());
+                                 put.add(emptyCF, QueryConstants.EMPTY_COLUMN_BYTES, kvts, ByteUtil.EMPTY_BYTE_ARRAY);
+                                 mutations.add(new Pair<Mutation,Integer>(put,null));
+                             }
+                         }
+                     }
+                     // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config
+                     if (!mutations.isEmpty() && batchSize > 0 && mutations.size() % batchSize == 0) {
+                         commitBatch(region,mutations, indexUUID);
+                         mutations.clear();
+                     }
+                 } catch (ConstraintViolationException e) {
+                     // Log and ignore in count
+                     logger.error("Failed to create row in " + region.getRegionNameAsString() + " with values " + SchemaUtil.toString(values), e);
+                     continue;
+                 }
+                 aggregators.aggregate(rowAggregators, result);
+                 hasAny = true;
+             }
+         } while (hasMore);
+     } finally {
+         innerScanner.close();
+         region.closeRegionOperation();
+     }
+
+     if (logger.isInfoEnabled()) {
+         logger.info("Finished scanning " + rowCount + " rows for ungrouped coprocessor scan " + scan);
+     }
+
+     // Flush whatever is left over from the last, partially filled batch.
+     if (!mutations.isEmpty()) {
+         commitBatch(region,mutations, indexUUID);
+     }
+
+     final boolean hadAny = hasAny;
+     KeyValue keyValue = null;
+     if (hadAny) {
+         byte[] value = aggregators.toBytes(rowAggregators);
+         keyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length);
+     }
+     final KeyValue aggKeyValue = keyValue;
+
+     RegionScanner scanner = new BaseRegionScanner() {
+         // True once the single aggregate row has been handed out (or if there is none).
+         private boolean done = !hadAny;
+
+         @Override
+         public HRegionInfo getRegionInfo() {
+             return innerScanner.getRegionInfo();
+         }
+
+         @Override
+         public boolean isFilterDone() {
+             return done;
+         }
+
+         @Override
+         public void close() throws IOException {
+             // NOTE(review): innerScanner was already closed once after the scan above,
+             // so this relies on close() tolerating a second call - confirm.
+             innerScanner.close();
+         }
+
+         @Override
+         public boolean next(List<KeyValue> results) throws IOException {
+             if (done) return false;
+             done = true;
+             results.add(aggKeyValue);
+             return false;
+         }
+     };
+     return scanner;
+ }
+
+ /**
+  * Rebuilds a PTable from the bytes produced by {@link #serialize(PTable)}.
+  *
+  * @param b the serialized table
+  * @return a new PTableImpl populated through {@code readFields}
+  * @throws RuntimeException wrapping any IOException raised while decoding
+  */
+ private static PTable deserializeTable(byte[] b) {
+     ByteArrayInputStream bytes = new ByteArrayInputStream(b);
+     try {
+         PTable decoded = new PTableImpl();
+         decoded.readFields(new DataInputStream(bytes));
+         return decoded;
+     } catch (IOException ioe) {
+         throw new RuntimeException(ioe);
+     } finally {
+         try {
+             bytes.close();
+         } catch (IOException ioe) {
+             throw new RuntimeException(ioe);
+         }
+     }
+ }
+
+ private static List<Expression> deserializeExpressions(byte[] b) {
+ ByteArrayInputStream stream = new ByteArrayInputStream(b);
+ try {
+ DataInputStream input = new DataInputStream(stream);
+ int size = WritableUtils.readVInt(input);
+ List<Expression> selectExpressions = Lists.newArrayListWithExpectedSize(size);
+ for (int i = 0; i < size; i++) {
+ ExpressionType type = ExpressionType.values()[WritableUtils.readVInt(input)];
+ Expression selectExpression = type.newInstance();
+ selectExpression.readFields(input);
+ selectExpressions.add(selectExpression);
+ }
+ return selectExpressions;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public static byte[] serialize(PTable projectedTable) {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ try {
+ DataOutputStream output = new DataOutputStream(stream);
+ projectedTable.write(output);
+ return stream.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public static byte[] serialize(List<Expression> selectExpressions) {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ try {
+ DataOutputStream output = new DataOutputStream(stream);
+ WritableUtils.writeVInt(output, selectExpressions.size());
+ for (int i = 0; i < selectExpressions.size(); i++) {
+ Expression expression = selectExpressions.get(i);
+ WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal());
+ expression.write(output);
+ }
+ return stream.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/exception/PhoenixIOException.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/exception/PhoenixIOException.java b/src/main/java/org/apache/phoenix/exception/PhoenixIOException.java
new file mode 100644
index 0000000..d1b0b18
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/exception/PhoenixIOException.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.exception;
+
+import java.sql.SQLException;
+
+
+/**
+ * SQLException that wraps an underlying failure using the SQL state and vendor
+ * error code of {@link SQLExceptionCode#IO_EXCEPTION}.
+ */
+public class PhoenixIOException extends SQLException {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param e the underlying failure; its message is reused as this exception's
+     *          message and it is kept as the cause
+     */
+    public PhoenixIOException(Throwable e) {
+        super(e.getMessage(),
+                SQLExceptionCode.IO_EXCEPTION.getSQLState(),
+                SQLExceptionCode.IO_EXCEPTION.getErrorCode(),
+                e);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/exception/PhoenixParserException.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/exception/PhoenixParserException.java b/src/main/java/org/apache/phoenix/exception/PhoenixParserException.java
new file mode 100644
index 0000000..75a4091
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/exception/PhoenixParserException.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.exception;
+
+import java.sql.SQLSyntaxErrorException;
+
+import org.antlr.runtime.*;
+
+import org.apache.phoenix.parse.PhoenixSQLParser;
+
+
+/**
+ * SQLSyntaxErrorException built from an exception raised while parsing a statement.
+ * The message, SQL state and vendor error code are derived from the concrete type
+ * of the wrapped exception, which is also kept as the cause.
+ */
+public class PhoenixParserException extends SQLSyntaxErrorException {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param e the exception raised during parsing
+     * @param parser the parser that was running; used to translate token types to names
+     */
+    public PhoenixParserException(Exception e, PhoenixSQLParser parser) {
+        super(new SQLExceptionInfo.Builder(getErrorCode(e)).setRootCause(e)
+                .setMessage(getErrorMessage(e, parser)).build().toString(),
+                getErrorCode(e).getSQLState(), getErrorCode(e).getErrorCode(), e);
+    }
+
+    /**
+     * Returns the line of the error as a string. Uses the offending token when
+     * one is attached and otherwise falls back to the position recorded on the
+     * exception itself, so that an exception without a token does not cause a
+     * NullPointerException while the error is being reported.
+     */
+    public static String getLine(RecognitionException e) {
+        int line = (e.token != null) ? e.token.getLine() : e.line;
+        return Integer.toString(line);
+    }
+
+    /**
+     * Returns the 1-based column of the error as a string, with the same
+     * token-or-exception fallback as {@link #getLine(RecognitionException)}.
+     */
+    public static String getColumn(RecognitionException e) {
+        int position = (e.token != null) ? e.token.getCharPositionInLine() : e.charPositionInLine;
+        return Integer.toString(position + 1);
+    }
+
+    /** Returns the error position formatted as "line L, column C.". */
+    public static String getTokenLocation(RecognitionException e) {
+        return "line " + getLine(e) + ", column " + getColumn(e) + ".";
+    }
+
+    /**
+     * Builds the human readable description for a parse failure.
+     *
+     * @param e the exception raised during parsing
+     * @param parser the parser whose token names are used to describe expected tokens
+     * @return a description specific to the exception type, or {@code e.getMessage()}
+     *         for exception types that are not handled explicitly
+     */
+    public static String getErrorMessage(Exception e, PhoenixSQLParser parser) {
+        String[] tokenNames = parser.getTokenNames();
+        String msg;
+        // MissingTokenException and UnwantedTokenException must be tested before
+        // MismatchedTokenException, and all three before RecognitionException,
+        // because each later test would also match the earlier, more specific types.
+        if (e instanceof MissingTokenException) {
+            MissingTokenException mte = (MissingTokenException)e;
+            msg = "Missing \"" + getExpectedTokenName(mte.expecting, tokenNames) + "\" at " + getTokenLocation(mte);
+        } else if (e instanceof UnwantedTokenException) {
+            UnwantedTokenException ute = (UnwantedTokenException)e;
+            msg = "Unexpected input. Expecting \"" + getExpectedTokenName(ute.expecting, tokenNames)
+                    + "\", got \"" + ute.getUnexpectedToken().getText()
+                    + "\" at " + getTokenLocation(ute);
+        } else if (e instanceof MismatchedTokenException) {
+            MismatchedTokenException mte = (MismatchedTokenException)e;
+            msg = "Mismatched input. Expecting \"" + getExpectedTokenName(mte.expecting, tokenNames)
+                    + "\", got \"" + getOffendingText(mte)
+                    + "\" at " + getTokenLocation(mte);
+        } else if (e instanceof RecognitionException){
+            RecognitionException re = (RecognitionException) e;
+            msg = "Encountered \"" + getOffendingText(re) + "\" at " + getTokenLocation(re);
+        } else if (e instanceof UnknownFunctionException) {
+            UnknownFunctionException ufe = (UnknownFunctionException) e;
+            msg = "Unknown function: \"" + ufe.getFuncName() + "\".";
+        } else {
+            msg = e.getMessage();
+        }
+        return msg;
+    }
+
+    /**
+     * Maps the exception type to the SQLExceptionCode reported to the client;
+     * anything not recognised is reported as a generic parser error.
+     */
+    public static SQLExceptionCode getErrorCode(Exception e) {
+        if (e instanceof MissingTokenException) {
+            return SQLExceptionCode.MISSING_TOKEN;
+        } else if (e instanceof UnwantedTokenException) {
+            return SQLExceptionCode.UNWANTED_TOKEN;
+        } else if (e instanceof MismatchedTokenException) {
+            return SQLExceptionCode.MISMATCHED_TOKEN;
+        } else if (e instanceof UnknownFunctionException) {
+            return SQLExceptionCode.UNKNOWN_FUNCTION;
+        } else {
+            return SQLExceptionCode.PARSER_ERROR;
+        }
+    }
+
+    /**
+     * Translates an expected token type into a display name: "EOF" for end of
+     * input, the parser's token name when the type indexes the name table, and
+     * the raw numeric type otherwise (instead of failing on the array access).
+     */
+    private static String getExpectedTokenName(int expecting, String[] tokenNames) {
+        if (expecting == Token.EOF) {
+            return "EOF";
+        }
+        if (expecting < 0 || expecting >= tokenNames.length) {
+            return Integer.toString(expecting);
+        }
+        return tokenNames[expecting];
+    }
+
+    /**
+     * Returns the text of the offending token, or, when the exception carries
+     * no token, the offending character recorded on the exception.
+     */
+    private static String getOffendingText(RecognitionException e) {
+        if (e.token != null) {
+            return e.token.getText();
+        }
+        return (e.c == Token.EOF) ? "EOF" : String.valueOf((char) e.c);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
new file mode 100644
index 0000000..c6e664a
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -0,0 +1,195 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.exception;
+
+import org.apache.hbase.index.util.IndexManagementUtil;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.MetaDataUtil;
+
+
+/**
+ * Various SQLException Information. Including a vendor-specific errorcode and a standard SQLState.
+ *
+ * Every constant carries its own error code so that a caller inspecting
+ * {@link java.sql.SQLException#getErrorCode()} can tell the conditions apart.
+ *
+ * @author zhuang
+ * @since 1.0
+ */
+public enum SQLExceptionCode {
+
+    /**
+     * Connection Exception (errorcode 01, sqlstate 08)
+     */
+    IO_EXCEPTION(101, "08000", "Unexpected IO exception."),
+    MALFORMED_CONNECTION_URL(102, "08001", "Malformed connection url."),
+    CANNOT_ESTABLISH_CONNECTION(103, "08004", "Unable to establish connection."),
+
+    /**
+     * Data Exception (errorcode 02, sqlstate 22)
+     */
+    ILLEGAL_DATA(201, "22000", "Illegal data."),
+    DIVIDE_BY_ZERO(202, "22012", "Divide by zero."),
+    TYPE_MISMATCH(203, "22005", "Type mismatch."),
+    VALUE_IN_UPSERT_NOT_CONSTANT(204, "22008", "Values in UPSERT must evaluate to a constant."),
+    MALFORMED_URL(205, "22009", "Malformed URL."),
+    DATA_INCOMPATIBLE_WITH_TYPE(206, "22003", "The value is outside the range for the data type."),
+    MISSING_CHAR_LENGTH(207, "22003", "Missing length for CHAR."),
+    NONPOSITIVE_CHAR_LENGTH(208, "22003", "CHAR or VARCHAR must have a positive length."),
+    DECIMAL_PRECISION_OUT_OF_RANGE(209, "22003", "Decimal precision outside of range. Should be within 1 and " + PDataType.MAX_PRECISION + "."),
+    MISSING_BINARY_LENGTH(210, "22003", "Missing length for BINARY."),
+    NONPOSITIVE_BINARY_LENGTH(211, "22003", "BINARY must have a positive length."),
+    SERVER_ARITHMETIC_ERROR(212, "22012", "Arithmetic error on server."),
+    VALUE_OUTSIDE_RANGE(213,"22003","Value outside range."),
+    VALUE_IN_LIST_NOT_CONSTANT(214, "22008", "Values in IN must evaluate to a constant."),
+
+    /**
+     * Constraint Violation (errorcode 03, sqlstate 23)
+     */
+    CONCURRENT_TABLE_MUTATION(301, "23000", "Concurrent modification to table."),
+    // Was 201, which collided with ILLEGAL_DATA and did not belong to the 03 range.
+    CANNOT_INDEX_COLUMN_ON_TYPE(302, "23100", "The column cannot be index due to its type."),
+
+    /**
+     * Invalid Cursor State (errorcode 04, sqlstate 24)
+     */
+    CURSOR_BEFORE_FIRST_ROW(401, "24015","Cursor before first row."),
+    // Was 401, which collided with CURSOR_BEFORE_FIRST_ROW.
+    CURSOR_PAST_LAST_ROW(402, "24016", "Cursor past last row."),
+
+    /**
+     * Syntax Error or Access Rule Violation (errorcode 05, sqlstate 42)
+     */
+    AMBIGUOUS_TABLE(501, "42000", "Table name exists in more than one table schema and is used without being qualified."),
+    READ_ONLY_TABLE(505, "42000", "Table is read only."),
+    INDEX_MISSING_PK_COLUMNS(513, "42602", "Index table missing PK Columns."),
+    AMBIGUOUS_COLUMN(502, "42702", "Column reference ambiguous or duplicate names."),
+    COLUMN_NOT_FOUND(504, "42703", "Undefined column."),
+    COLUMN_EXIST_IN_DEF(503, "42711", "A duplicate column name was detected in the object definition or ALTER TABLE statement."),
+    CANNOT_DROP_PK(506, "42817", "Primary key column may not be dropped."),
+    CANNOT_CONVERT_TYPE(507, "42846", "Cannot convert type."),
+    UNSUPPORTED_ORDER_BY_QUERY(508, "42878", "ORDER BY only allowed for limited or aggregate queries"),
+    PRIMARY_KEY_MISSING(509, "42888", "The table does not have a primary key."),
+    PRIMARY_KEY_ALREADY_EXISTS(510, "42889", "The table already has a primary key."),
+    ORDER_BY_NOT_IN_SELECT_DISTINCT(511, "42890", "All ORDER BY expressions must appear in SELECT DISTINCT:"),
+    INVALID_PRIMARY_KEY_CONSTRAINT(512, "42891", "Invalid column reference in primary key constraint"),
+
+    /**
+     * HBase and Phoenix specific implementation defined sub-classes.
+     * Column family related exceptions.
+     *
+     * For the following exceptions, use errorcode 10.
+     */
+    COLUMN_FAMILY_NOT_FOUND(1001, "42I01", "Undefined column family."),
+    PROPERTIES_FOR_FAMILY(1002, "42I02","Properties may not be defined for an unused family name."),
+    // Primary/row key related exceptions.
+    PRIMARY_KEY_WITH_FAMILY_NAME(1003, "42J01", "Primary key columns must not have a family name."),
+    PRIMARY_KEY_OUT_OF_ORDER(1004, "42J02", "Order of columns in primary key constraint must match the order in which they're declared."),
+    VARBINARY_IN_ROW_KEY(1005, "42J03", "The VARBINARY type can only be used as the last part of a multi-part row key."),
+    NOT_NULLABLE_COLUMN_IN_ROW_KEY(1006, "42J04", "Only nullable columns may be added to a multi-part row key."),
+    // Was 1022, which collided with NO_SPLITS_ON_SALTED_TABLE.
+    VARBINARY_LAST_PK(1031, "42J04", "Cannot add column to table when the last PK column is of type VARBINARY."),
+    // Was 1023, which collided with INDEX_ALREADY_EXIST.
+    NULLABLE_FIXED_WIDTH_LAST_PK(1032, "42J04", "Cannot add column to table when the last PK column is nullable and fixed width."),
+    // Key/value column related errors
+    KEY_VALUE_NOT_NULL(1007, "42K01", "A key/value column may not be declared as not null."),
+    // View related errors.
+    VIEW_WITH_TABLE_CONFIG(1008, "42L01", "A view may not contain table configuration properties."),
+    VIEW_WITH_PROPERTIES(1009, "42L02", "Properties may not be defined for a view."),
+    // Table related errors that are not in standard code.
+    CANNOT_MUTATE_TABLE(1010, "42M01", "Not allowed to mutate table."),
+    UNEXPECTED_MUTATION_CODE(1011, "42M02", "Unexpected mutation code."),
+    TABLE_UNDEFINED(1012, "42M03", "Table undefined."),
+    TABLE_ALREADY_EXIST(1013, "42M04", "Table already exists."),
+    // Index related errors
+    INDEX_ALREADY_EXIST(1023, "42N01", "Index already exists."),
+    // Was 1024, which collided with SALT_ONLY_ON_CREATE_TABLE.
+    CANNOT_MUTATE_INDEX(1033, "42N02", "Cannot mutate existing index."),
+    // Syntax error
+    TYPE_NOT_SUPPORTED_FOR_OPERATOR(1014, "42Y01", "The operator does not support the operand type."),
+    SCHEMA_NOT_FOUND(1015, "42Y07", "Schema not found."),
+    AGGREGATE_IN_GROUP_BY(1016, "42Y26", "Aggregate expressions may not be used in GROUP BY."),
+    AGGREGATE_IN_WHERE(1017, "42Y26", "Aggregate may not be used in WHERE."),
+    AGGREGATE_WITH_NOT_GROUP_BY_COLUMN(1018, "42Y27", "Aggregate may not contain columns not in GROUP BY."),
+    ONLY_AGGREGATE_IN_HAVING_CLAUSE(1019, "42Y26", "Only aggregate maybe used in the HAVING clause."),
+    UPSERT_COLUMN_NUMBERS_MISMATCH(1020, "42Y60", "Number of columns upserting must match number of values."),
+    // Table properties exception.
+    INVALID_BUCKET_NUM(1021, "42Y80", "Salt bucket numbers should be with 1 and 256."),
+    NO_SPLITS_ON_SALTED_TABLE(1022, "42Y81", "Should not specify split points on salted table with default row key order."),
+    SALT_ONLY_ON_CREATE_TABLE(1024, "42Y83", "Salt bucket number may only be specified when creating a table."),
+    SET_UNSUPPORTED_PROP_ON_ALTER_TABLE(1025, "42Y84", "Unsupported property set in ALTER TABLE command."),
+    CANNOT_ADD_NOT_NULLABLE_COLUMN(1030, "42Y84", "Only nullable columns may be added for a pre-existing table."),
+    NO_MUTABLE_INDEXES(1026, "42Y85", "Mutable secondary indexes are only supported for HBase version " + MetaDataUtil.decodeHBaseVersionAsString(PhoenixDatabaseMetaData.MUTABLE_SI_VERSION_THRESHOLD) + " and above."),
+    NO_DELETE_IF_IMMUTABLE_INDEX(1027, "42Y86", "Delete not allowed on a table with IMMUTABLE_ROW with non PK column in index."),
+    INVALID_INDEX_STATE_TRANSITION(1028, "42Y87", "Invalid index state transition."),
+    INVALID_MUTABLE_INDEX_CONFIG(1029, "42Y88", "Mutable secondary indexes must have the "
+            + IndexManagementUtil.WAL_EDIT_CODEC_CLASS_KEY + " property set to "
+            + IndexManagementUtil.INDEX_WAL_EDIT_CODEC_CLASS_NAME + " in the hbase-sites.xml of every region server"),
+
+    /** Parser error. (errorcode 06, sqlState 42P) */
+    PARSER_ERROR(601, "42P00", "Syntax error."),
+    MISSING_TOKEN(602, "42P00", "Syntax error."),
+    UNWANTED_TOKEN(603, "42P00", "Syntax error."),
+    // Was 603, which collided with UNWANTED_TOKEN; 604 is already taken by UNKNOWN_FUNCTION.
+    MISMATCHED_TOKEN(605, "42P00", "Syntax error."),
+    UNKNOWN_FUNCTION(604, "42P00", "Syntax error."),
+
+    /**
+     * Implementation defined class. Execution exceptions (errorcode 11, sqlstate XCL).
+     */
+    RESULTSET_CLOSED(1101, "XCL01", "ResultSet is closed."),
+    GET_TABLE_REGIONS_FAIL(1102, "XCL02", "Cannot get all table regions"),
+    EXECUTE_QUERY_NOT_APPLICABLE(1103, "XCL03", "executeQuery may not be used."),
+    EXECUTE_UPDATE_NOT_APPLICABLE(1104, "XCL03", "executeUpdate may not be used."),
+    SPLIT_POINT_NOT_CONSTANT(1105, "XCL04", "Split points must be constants."),
+
+    /**
+     * Implementation defined class. Phoenix internal error. (errorcode 20, sqlstate INT).
+     */
+    CANNOT_CALL_METHOD_ON_TYPE(2001, "INT01", "Cannot call method on the argument type."),
+    CLASS_NOT_UNWRAPPABLE(2002, "INT03", "Class not unwrappable"),
+    PARAM_INDEX_OUT_OF_BOUND(2003, "INT04", "Parameter position is out of range."),
+    PARAM_VALUE_UNBOUND(2004, "INT05", "Parameter value unbound"),
+    INTERRUPTED_EXCEPTION(2005, "INT07", "Interrupted exception."),
+    INCOMPATIBLE_CLIENT_SERVER_JAR(2006, "INT08", "Incompatible jars detected between client and server."),
+    OUTDATED_JARS(2007, "INT09", "Outdated jars."),
+    INDEX_METADATA_NOT_FOUND(2008, "INT10", "Unable to find cached index metadata. "),
+    ;
+
+    private final int errorCode;
+    private final String sqlState;
+    private final String message;
+
+    private SQLExceptionCode(int errorCode, String sqlState, String message) {
+        this.errorCode = errorCode;
+        this.sqlState = sqlState;
+        this.message = message;
+    }
+
+    /** Returns the SQLState string associated with this code. */
+    public String getSQLState() {
+        return sqlState;
+    }
+
+    /** Returns the fixed message text associated with this code. */
+    public String getMessage() {
+        return message;
+    }
+
+    /** Returns the vendor-specific numeric error code. */
+    public int getErrorCode() {
+        return errorCode;
+    }
+
+    /** Formats this code as "ERROR &lt;errorCode&gt; (&lt;sqlState&gt;): &lt;message&gt;". */
+    @Override
+    public String toString() {
+        return "ERROR " + errorCode + " (" + sqlState + "): " + message;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java b/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java
new file mode 100644
index 0000000..a74bffc
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.exception;
+
+import java.sql.SQLException;
+
+import org.apache.phoenix.util.SchemaUtil;
+
+
+/**
+ * Object serves as a closure of all coordinate information for SQLException messages.
+ *
+ * @author zhuang
+ * @since 1.0
+ */
+public class SQLExceptionInfo {
+
+ /**
+ * Constants used in naming exception location.
+ */
+ public static final String SCHEMA_NAME = "schemaName";
+ public static final String TABLE_NAME = "tableName";
+ public static final String FAMILY_NAME = "familyName";
+ public static final String COLUMN_NAME = "columnName";
+
+ private final Throwable rootCause;
+ private final SQLExceptionCode code; // Should always have one.
+ private final String message;
+ private final String schemaName;
+ private final String tableName;
+ private final String familyName;
+ private final String columnName;
+
+ public static class Builder {
+
+ private Throwable rootCause;
+ private SQLExceptionCode code; // Should always have one.
+ private String message;
+ private String schemaName;
+ private String tableName;
+ private String familyName;
+ private String columnName;
+
+ public Builder(SQLExceptionCode code) {
+ this.code = code;
+ }
+
+ public Builder setRootCause(Throwable t) {
+ this.rootCause = t;
+ return this;
+ }
+
+ public Builder setMessage(String message) {
+ this.message = message;
+ return this;
+ }
+
+ public Builder setSchemaName(String schemaName) {
+ this.schemaName = schemaName;
+ return this;
+ }
+
+ public Builder setTableName(String tableName) {
+ this.tableName = tableName;
+ return this;
+ }
+
+ public Builder setFamilyName(String familyName) {
+ this.familyName = familyName;
+ return this;
+ }
+
+ public Builder setColumnName(String columnName) {
+ this.columnName = columnName;
+ return this;
+ }
+
+ public SQLExceptionInfo build() {
+ return new SQLExceptionInfo(this);
+ }
+
+ @Override
+ public String toString() {
+ return code.toString();
+ }
+ }
+
+ private SQLExceptionInfo(Builder builder) {
+ code = builder.code;
+ rootCause = builder.rootCause;
+ message = builder.message;
+ schemaName = builder.schemaName;
+ tableName = builder.tableName;
+ familyName = builder.familyName;
+ columnName = builder.columnName;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder(code.toString());
+ if (message != null) {
+ builder.append(" ").append(message);
+ }
+ String columnDisplayName = SchemaUtil.getMetaDataEntityName(schemaName, tableName, familyName, columnName);
+ if (columnName != null) {
+ builder.append(" ").append(COLUMN_NAME).append("=").append(columnDisplayName);
+ } else if (familyName != null) {
+ builder.append(" ").append(FAMILY_NAME).append("=").append(columnDisplayName);
+ } else if (tableName != null) {
+ builder.append(" ").append(TABLE_NAME).append("=").append(columnDisplayName);
+ } else if (schemaName != null) {
+ builder.append(" ").append(SCHEMA_NAME).append("=").append(columnDisplayName);
+ }
+ return builder.toString();
+ }
+
+ public SQLException buildException() {
+ if (rootCause != null) {
+ return new SQLException(toString(), code.getSQLState(), code.getErrorCode(), rootCause);
+ } else {
+ return new SQLException(toString(), code.getSQLState(), code.getErrorCode(), null);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/exception/UnknownFunctionException.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/exception/UnknownFunctionException.java b/src/main/java/org/apache/phoenix/exception/UnknownFunctionException.java
new file mode 100644
index 0000000..1f66992
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/exception/UnknownFunctionException.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.exception;
+
+/**
+ * Thrown by ParseNodeFactory when it could not identify a node as a valid function.
+ * The exception message names the function so that logs and stack traces show
+ * it without the caller having to call {@link #getFuncName()}.
+ */
+public class UnknownFunctionException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+    private final String funcName;
+
+    /**
+     * @param funcName the name of the function that could not be resolved
+     */
+    public UnknownFunctionException(String funcName) {
+        super("Unknown function: \"" + funcName + "\".");
+        this.funcName = funcName;
+    }
+
+    /**
+     * @return the function name passed to the constructor
+     */
+    public String getFuncName() {
+        return funcName;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/exception/ValueTypeIncompatibleException.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/exception/ValueTypeIncompatibleException.java b/src/main/java/org/apache/phoenix/exception/ValueTypeIncompatibleException.java
new file mode 100644
index 0000000..403c009
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/exception/ValueTypeIncompatibleException.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.exception;
+
+import org.apache.phoenix.schema.IllegalDataException;
+import org.apache.phoenix.schema.PDataType;
+
+
+/**
+ * IllegalDataException whose message combines
+ * {@link SQLExceptionCode#DATA_INCOMPATIBLE_WITH_TYPE} with the offending type
+ * rendered as {@code TYPE(precision,scale)}.
+ */
+public class ValueTypeIncompatibleException extends IllegalDataException {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param type the data type the value does not fit
+     * @param precision precision shown in the message; printed as "null" when absent
+     * @param scale scale shown in the message; printed as "null" when absent
+     */
+    public ValueTypeIncompatibleException(PDataType type, Integer precision, Integer scale) {
+        super(buildMessage(type, precision, scale));
+    }
+
+    // Renders the type as TYPE(precision,scale) and prefixes it with the error code text.
+    private static String buildMessage(PDataType type, Integer precision, Integer scale) {
+        StringBuilder display = new StringBuilder(type.toString());
+        display.append("(").append(precision).append(",").append(scale).append(")");
+        SQLExceptionInfo info = new SQLExceptionInfo.Builder(SQLExceptionCode.DATA_INCOMPATIBLE_WITH_TYPE)
+                .setMessage(display.toString())
+                .build();
+        return info.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
new file mode 100644
index 0000000..aba97e3
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.expression.RowKeyExpression;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.iterate.AggregatingResultIterator;
+import org.apache.phoenix.iterate.ConcatResultIterator;
+import org.apache.phoenix.iterate.DistinctAggregatingResultIterator;
+import org.apache.phoenix.iterate.FilterAggregatingResultIterator;
+import org.apache.phoenix.iterate.GroupedAggregatingResultIterator;
+import org.apache.phoenix.iterate.LimitingResultIterator;
+import org.apache.phoenix.iterate.MergeSortRowKeyResultIterator;
+import org.apache.phoenix.iterate.OrderedAggregatingResultIterator;
+import org.apache.phoenix.iterate.OrderedResultIterator;
+import org.apache.phoenix.iterate.ParallelIterators;
+import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
+import org.apache.phoenix.iterate.PeekingResultIterator;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.SpoolingResultIterator;
+import org.apache.phoenix.iterate.UngroupedAggregatingResultIterator;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.query.Scanner;
+import org.apache.phoenix.query.WrappedScanner;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.SchemaUtil;
+
+
+
+/**
+ *
+ * Query plan for aggregating queries. Builds the client-side iterator chain
+ * that combines the per-split results, then applies the HAVING filter,
+ * distinct handling and ordering/limit as required by the statement.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class AggregatePlan extends BasicQueryPlan {
+    private final Aggregators aggregators;
+    private final Expression having;
+    // Assigned by newScanner(); null until a scanner has been created.
+    private List<KeyRange> splits;
+
+    public AggregatePlan(
+            StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector,
+            Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy,
+            Expression having) {
+        super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, groupBy, parallelIteratorFactory);
+        this.having = having;
+        this.aggregators = context.getAggregationManager().getAggregators();
+    }
+
+    @Override
+    public List<KeyRange> getSplits() {
+        return splits;
+    }
+
+    /** Factory whose iterators order the scanned rows by row key. */
+    private static class OrderingResultIteratorFactory implements ParallelIteratorFactory {
+        private final QueryServices services;
+
+        public OrderingResultIteratorFactory(QueryServices services) {
+            this.services = services;
+        }
+
+        @Override
+        public PeekingResultIterator newIterator(ResultIterator scanner) throws SQLException {
+            OrderByExpression rowKeyOrder = new OrderByExpression(RowKeyExpression.INSTANCE, false, true);
+            int spoolThreshold = services.getProps().getInt(
+                    QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB,
+                    QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
+            List<OrderByExpression> ordering = Collections.<OrderByExpression>singletonList(rowKeyOrder);
+            return new OrderedResultIterator(scanner, ordering, spoolThreshold);
+        }
+    }
+
+    /** Factory that feeds the iterator of an inner factory into an outer factory. */
+    private static class WrappingResultIteratorFactory implements ParallelIteratorFactory {
+        private final ParallelIteratorFactory innerFactory;
+        private final ParallelIteratorFactory outerFactory;
+
+        public WrappingResultIteratorFactory(ParallelIteratorFactory innerFactory, ParallelIteratorFactory outerFactory) {
+            this.innerFactory = innerFactory;
+            this.outerFactory = outerFactory;
+        }
+
+        @Override
+        public PeekingResultIterator newIterator(ResultIterator scanner) throws SQLException {
+            return outerFactory.newIterator(innerFactory.newIterator(scanner));
+        }
+    }
+
+    /**
+     * Chooses the per-split iterator factory: spooling when there is no GROUP BY
+     * or the GROUP BY is order preserving, row-key ordering otherwise. A factory
+     * supplied to the plan, if any, is layered on top of it.
+     */
+    private ParallelIteratorFactory wrapParallelIteratorFactory () {
+        QueryServices services = context.getConnection().getQueryServices();
+        boolean spoolOnly = groupBy.isEmpty() || groupBy.isOrderPreserving();
+        ParallelIteratorFactory innerFactory = spoolOnly
+                ? new SpoolingResultIterator.SpoolingResultIteratorFactory(services)
+                : new OrderingResultIteratorFactory(services);
+        // wrap any existing parallelIteratorFactory
+        return parallelIteratorFactory == null
+                ? innerFactory
+                : new WrappingResultIteratorFactory(innerFactory, parallelIteratorFactory);
+    }
+
+    @Override
+    protected Scanner newScanner(ConnectionQueryServices services) throws SQLException {
+        // Hack to set state on scan to make upgrade happen
+        int columnsToUpgrade = SchemaUtil.upgradeColumnCount(context.getConnection().getURL(),context.getConnection().getClientInfo());
+        if (columnsToUpgrade > 0) {
+            context.getScan().setAttribute(SchemaUtil.UPGRADE_TO_2_0, Bytes.toBytes(columnsToUpgrade));
+        }
+        if (groupBy.isEmpty()) {
+            UngroupedAggregateRegionObserver.serializeIntoScan(context.getScan());
+        }
+        ParallelIterators iterators = new ParallelIterators(context, tableRef, statement, projection, groupBy, null, wrapParallelIteratorFactory());
+        splits = iterators.getSplits();
+
+        // No need to merge sort for ungrouped aggregation
+        AggregatingResultIterator aggregating = groupBy.isEmpty()
+                ? new UngroupedAggregatingResultIterator(new ConcatResultIterator(iterators), aggregators)
+                : new GroupedAggregatingResultIterator(new MergeSortRowKeyResultIterator(iterators), aggregators);
+
+        if (having != null) {
+            aggregating = new FilterAggregatingResultIterator(aggregating, having);
+        }
+
+        // Dedup on client if select distinct and aggregation
+        if (statement.isDistinct() && statement.isAggregate()) {
+            aggregating = new DistinctAggregatingResultIterator(aggregating, getProjector());
+        }
+
+        final ResultIterator resultScanner;
+        if (!orderBy.getOrderByExpressions().isEmpty()) {
+            int thresholdBytes = services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB,
+                    QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
+            resultScanner = new OrderedAggregatingResultIterator(aggregating, orderBy.getOrderByExpressions(), thresholdBytes, limit);
+        } else if (limit != null) {
+            resultScanner = new LimitingResultIterator(aggregating, limit);
+        } else {
+            resultScanner = aggregating;
+        }
+
+        return new WrappedScanner(resultScanner, getProjector());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java b/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java
new file mode 100644
index 0000000..4ecba32
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java
@@ -0,0 +1,188 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.DegenerateScanner;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.Scanner;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+
+
+/**
+ *
+ * Query plan that has no child plans
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class BasicQueryPlan implements QueryPlan {
+ protected final TableRef tableRef;
+ protected final StatementContext context;
+ protected final FilterableStatement statement;
+ protected final RowProjector projection;
+ protected final ParameterMetaData paramMetaData;
+ protected final Integer limit;
+ protected final OrderBy orderBy;
+ protected final GroupBy groupBy;
+ protected final ParallelIteratorFactory parallelIteratorFactory;
+
+ private Scanner scanner;
+
+ protected BasicQueryPlan(
+ StatementContext context, FilterableStatement statement, TableRef table,
+ RowProjector projection, ParameterMetaData paramMetaData, Integer limit, OrderBy orderBy,
+ GroupBy groupBy, ParallelIteratorFactory parallelIteratorFactory) {
+ this.context = context;
+ this.statement = statement;
+ this.tableRef = table;
+ this.projection = projection;
+ this.paramMetaData = paramMetaData;
+ this.limit = limit;
+ this.orderBy = orderBy;
+ this.groupBy = groupBy;
+ this.parallelIteratorFactory = parallelIteratorFactory;
+ }
+
+ @Override
+ public GroupBy getGroupBy() {
+ return groupBy;
+ }
+
+
+ @Override
+ public OrderBy getOrderBy() {
+ return orderBy;
+ }
+
+ @Override
+ public TableRef getTableRef() {
+ return tableRef;
+ }
+
+ @Override
+ public Integer getLimit() {
+ return limit;
+ }
+
+ @Override
+ public RowProjector getProjector() {
+ return projection;
+ }
+
+ private ConnectionQueryServices getConnectionQueryServices(ConnectionQueryServices services) {
+ byte[] tenantId = context.getConnection().getTenantId();
+ // Get child services associated with tenantId of query.
+ ConnectionQueryServices childServices = tenantId == null ? services : services.getChildQueryServices(new ImmutableBytesWritable(tenantId));
+ return childServices;
+ }
+
+ protected void projectEmptyKeyValue() {
+ Scan scan = context.getScan();
+ PTable table = tableRef.getTable();
+ if (!projection.isProjectEmptyKeyValue() && table.getType() != PTableType.VIEW) {
+ scan.addColumn(SchemaUtil.getEmptyColumnFamily(table.getColumnFamilies()), QueryConstants.EMPTY_COLUMN_BYTES);
+ }
+ }
+// /**
+// * Sets up an id used to do round robin queue processing on the server
+// * @param scan
+// */
+// private void setProducer(Scan scan) {
+// byte[] producer = Bytes.toBytes(UUID.randomUUID().toString());
+// scan.setAttribute(HBaseServer.CALL_QUEUE_PRODUCER_ATTRIB_NAME, producer);
+// }
+
+ @Override
+ public final Scanner getScanner() throws SQLException {
+ if (scanner != null) {
+ return scanner;
+ }
+ Scan scan = context.getScan();
+ // Set producer on scan so HBase server does round robin processing
+ //setProducer(scan);
+ // Set the time range on the scan so we don't get back rows newer than when the statement was compiled
+ // The time stamp comes from the server at compile time when the meta data
+ // is resolved.
+ // TODO: include time range in explain plan?
+ PhoenixConnection connection = context.getConnection();
+ Long scn = connection.getSCN();
+ ScanUtil.setTimeRange(scan, scn == null ? context.getCurrentTime() : scn);
+ ScanUtil.setTenantId(scan, connection.getTenantId());
+ scanner = newScanner();
+ return scanner;
+ }
+
+ abstract protected Scanner newScanner(ConnectionQueryServices services) throws SQLException;
+
+ private Scanner newScanner() throws SQLException {
+ ConnectionQueryServices services = getConnectionQueryServices(context.getConnection().getQueryServices());
+ if (context.getScanRanges() == ScanRanges.NOTHING) { // is degenerate
+ scanner = new DegenerateScanner(tableRef, getProjector());
+ } else {
+ scanner = newScanner(services);
+ }
+ return scanner;
+ }
+
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return paramMetaData;
+ }
+
+ @Override
+ public FilterableStatement getStatement() {
+ return statement;
+ }
+
+ @Override
+ public StatementContext getContext() {
+ return context;
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ if (scanner == null) {
+ scanner = newScanner();
+ }
+ return scanner.getExplainPlan();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/execute/CommitException.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/execute/CommitException.java b/src/main/java/org/apache/phoenix/execute/CommitException.java
new file mode 100644
index 0000000..4ca47ec
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/execute/CommitException.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.sql.SQLException;
+
+/**
+ * {@link SQLException} that wraps an underlying failure as its cause and carries two
+ * {@link MutationState} references supplied by the thrower: the state that was not
+ * committed and the state that was.
+ */
+public class CommitException extends SQLException {
+    private static final long serialVersionUID = 1L;
+
+    private final MutationState uncommittedState;
+    private final MutationState committedState;
+
+    /**
+     * @param e underlying exception, recorded as the cause of this exception
+     * @param uncommittedState state returned by {@link #getUncommittedState()}
+     * @param committedState state returned by {@link #getCommittedState()}
+     */
+    public CommitException(Exception e, MutationState uncommittedState, MutationState committedState) {
+        super(e);
+        this.committedState = committedState;
+        this.uncommittedState = uncommittedState;
+    }
+
+    /** @return the committed state given at construction */
+    public MutationState getCommittedState() {
+        return committedState;
+    }
+
+    /** @return the uncommitted state given at construction */
+    public MutationState getUncommittedState() {
+        return uncommittedState;
+    }
+}