Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 20:23:36 UTC

[35/51] [partial] Initial commit

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
new file mode 100644
index 0000000..29c558f
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -0,0 +1,312 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.common.collect.Lists;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.iterate.OrderedResultIterator;
+import org.apache.phoenix.iterate.RegionScannerResultIterator;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.ServerUtil;
+
+
+/**
+ * 
+ * Wraps the scan performing a non-aggregate query to prevent needless retries
+ * if a Phoenix bug is encountered during our custom filter expression evaluation.
+ * Unfortunately, until HBASE-7481 gets fixed, there's no way to do this from our
+ * custom filters.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ScanRegionObserver extends BaseScannerRegionObserver {
+    public static final String NON_AGGREGATE_QUERY = "NonAggregateQuery";
+    private static final String TOPN = "TopN";
+
+    public static void serializeIntoScan(Scan scan, int thresholdBytes, int limit, List<OrderByExpression> orderByExpressions, int estimatedRowSize) {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream(); // TODO: size?
+        try {
+            DataOutputStream output = new DataOutputStream(stream);
+            WritableUtils.writeVInt(output, thresholdBytes);
+            WritableUtils.writeVInt(output, limit);
+            WritableUtils.writeVInt(output, estimatedRowSize);
+            WritableUtils.writeVInt(output, orderByExpressions.size());
+            for (OrderByExpression orderingCol : orderByExpressions) {
+                orderingCol.write(output);
+            }
+            scan.setAttribute(TOPN, stream.toByteArray());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+    
+    public static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s) {
+        byte[] topN = scan.getAttribute(TOPN);
+        if (topN == null) {
+            return null;
+        }
+        ByteArrayInputStream stream = new ByteArrayInputStream(topN);
+        try {
+            DataInputStream input = new DataInputStream(stream);
+            int thresholdBytes = WritableUtils.readVInt(input);
+            int limit = WritableUtils.readVInt(input);
+            int estimatedRowSize = WritableUtils.readVInt(input);
+            int size = WritableUtils.readVInt(input);
+            List<OrderByExpression> orderByExpressions = Lists.newArrayListWithExpectedSize(size);           
+            for (int i = 0; i < size; i++) {
+                OrderByExpression orderByExpression = new OrderByExpression();
+                orderByExpression.readFields(input);
+                orderByExpressions.add(orderByExpression);
+            }
+            ResultIterator inner = new RegionScannerResultIterator(s);
+            return new OrderedResultIterator(inner, orderByExpressions, thresholdBytes, limit >= 0 ? limit : null, estimatedRowSize);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Override
+    protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws Throwable {
+        byte[] isScanQuery = scan.getAttribute(NON_AGGREGATE_QUERY);
+
+        if (isScanQuery == null || Bytes.compareTo(PDataType.FALSE_BYTES, isScanQuery) == 0) {
+            return s;
+        }
+        
+        final ScanProjector p = ScanProjector.deserializeProjectorFromScan(scan);
+        final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
+        final OrderedResultIterator iterator = deserializeFromScan(scan,s);
+        final ImmutableBytesWritable tenantId = ScanUtil.getTenantId(scan);
+        
+        RegionScanner innerScanner = s;
+        if (p != null || j != null) {
+            innerScanner = new HashJoinRegionScanner(s, p, j, tenantId, c.getEnvironment());
+        }
+        
+        if (iterator == null) {
+            return getWrappedScanner(c, innerScanner);
+        }
+        
+        return getTopNScanner(c, innerScanner, iterator, tenantId);
+    }
+    
+    /**
+     *  Return a region scanner that does TopN.
+     *  We only need to call startRegionOperation and closeRegionOperation when
+     *  getting the first Tuple (which forces running through the entire region),
+     *  since after this everything is held in memory.
+     */
+    private RegionScanner getTopNScanner(final ObserverContext<RegionCoprocessorEnvironment> c, final RegionScanner s, final OrderedResultIterator iterator, ImmutableBytesWritable tenantId) throws Throwable {
+        final Tuple firstTuple;
+        TenantCache tenantCache = GlobalCache.getTenantCache(c.getEnvironment(), tenantId);
+        long estSize = iterator.getEstimatedByteSize();
+        final MemoryChunk chunk = tenantCache.getMemoryManager().allocate(estSize);
+        final HRegion region = c.getEnvironment().getRegion();
+        region.startRegionOperation();
+        try {
+            // Once we return from the first call to next, we've run through and cached
+            // the topN rows, so we no longer need to start/stop a region operation.
+            firstTuple = iterator.next();
+            // Now that the topN are cached, we can resize based on the real size
+            long actualSize = iterator.getByteSize();
+            chunk.resize(actualSize);
+        } catch (Throwable t) {
+            ServerUtil.throwIOException(region.getRegionNameAsString(), t);
+            return null;
+        } finally {
+            region.closeRegionOperation();
+        }
+        return new BaseRegionScanner() {
+            private Tuple tuple = firstTuple;
+            
+            @Override
+            public boolean isFilterDone() {
+                return tuple == null; 
+            }
+
+            @Override
+            public HRegionInfo getRegionInfo() {
+                return s.getRegionInfo();
+            }
+
+            @Override
+            public boolean next(List<KeyValue> results) throws IOException {
+                try {
+                    if (isFilterDone()) {
+                        return false;
+                    }
+                    
+                    for (int i = 0; i < tuple.size(); i++) {
+                        results.add(tuple.getValue(i));
+                    }
+                    
+                    tuple = iterator.next();
+                    return !isFilterDone();
+                } catch (Throwable t) {
+                    ServerUtil.throwIOException(region.getRegionNameAsString(), t);
+                    return false;
+                }
+            }
+
+            @Override
+            public void close() throws IOException {
+                try {
+                    s.close();
+                } finally {
+                    chunk.close();
+                }
+            }
+        };
+    }
+        
+    /**
+     * Return a wrapped scanner that catches unexpected exceptions (i.e. Phoenix bugs) and
+     * re-throws them as a DoNotRetryIOException to prevent needless retrying from hanging
+     * the query for 30 seconds. Unfortunately, until HBASE-7481 gets fixed, there's no way
+     * to do the same from a custom filter.
+     */
+    private RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, final RegionScanner s) {
+        return new RegionScanner() {
+
+            @Override
+            public boolean next(List<KeyValue> results) throws IOException {
+                try {
+                    return s.next(results);
+                } catch (Throwable t) {
+                    ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+                    return false; // impossible
+                }
+            }
+
+            @Override
+            public boolean next(List<KeyValue> results, String metric) throws IOException {
+                try {
+                    return s.next(results, metric);
+                } catch (Throwable t) {
+                    ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+                    return false; // impossible
+                }
+            }
+
+            @Override
+            public boolean next(List<KeyValue> result, int limit) throws IOException {
+                try {
+                    return s.next(result, limit);
+                } catch (Throwable t) {
+                    ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+                    return false; // impossible
+                }
+            }
+
+            @Override
+            public boolean next(List<KeyValue> result, int limit, String metric) throws IOException {
+                try {
+                    return s.next(result, limit, metric);
+                } catch (Throwable t) {
+                    ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+                    return false; // impossible
+                }
+            }
+
+            @Override
+            public void close() throws IOException {
+                s.close();
+            }
+
+            @Override
+            public HRegionInfo getRegionInfo() {
+                return s.getRegionInfo();
+            }
+
+            @Override
+            public boolean isFilterDone() {
+                return s.isFilterDone();
+            }
+
+            @Override
+            public boolean reseek(byte[] row) throws IOException {
+                return s.reseek(row);
+            }
+            
+            @Override
+            public long getMvccReadPoint() {
+                return s.getMvccReadPoint();
+            }
+
+            @Override
+            public boolean nextRaw(List<KeyValue> result, String metric) throws IOException {
+                try {
+                    return s.nextRaw(result, metric);
+                } catch (Throwable t) {
+                    ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+                    return false; // impossible
+                }
+            }
+
+            @Override
+            public boolean nextRaw(List<KeyValue> result, int limit, String metric) throws IOException {
+                try {
+                    return s.nextRaw(result, limit, metric);
+                } catch (Throwable t) {
+                    ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+                    return false; // impossible
+                }
+            }
+        };
+    }
+
+}
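
A minimal client-side sketch of how the attributes above might be wired onto a Scan. This is illustrative only (not part of this diff): the spooling threshold, limit, and estimated row size are placeholder values, and the order-by list would normally be produced by the Phoenix query compiler.

    import java.util.Collections;
    import java.util.List;

    import org.apache.hadoop.hbase.client.Scan;

    import org.apache.phoenix.coprocessor.ScanRegionObserver;
    import org.apache.phoenix.expression.OrderByExpression;
    import org.apache.phoenix.schema.PDataType;

    public class TopNScanExample {

        // Build a scan that ScanRegionObserver.doPostScannerOpen will wrap with
        // a TopN scanner on the region server.
        public static Scan buildTopNScan(List<OrderByExpression> orderBy) {
            Scan scan = new Scan();
            // Mark the scan as a non-aggregate query so the observer processes it.
            scan.setAttribute(ScanRegionObserver.NON_AGGREGATE_QUERY, PDataType.TRUE_BYTES);
            // Serialize the spooling threshold (1MB), limit (100), order-by list and
            // estimated row size (100 bytes) into the "TopN" scan attribute.
            ScanRegionObserver.serializeIntoScan(scan, 1024 * 1024, 100, orderBy, 100);
            return scan;
        }

        public static void main(String[] args) {
            Scan scan = buildTopNScan(Collections.<OrderByExpression>emptyList());
            System.out.println(scan.getAttribute("TopN") != null); // true
        }
    }

On the server, deserializeFromScan reads these values back in the same order and feeds them into an OrderedResultIterator over the region scanner.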

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java b/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
new file mode 100644
index 0000000..ec0c06e
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.TenantCache;
+
+
+
+
+/**
+ * 
+ * Server-side implementation of {@link ServerCachingProtocol}
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ServerCachingEndpointImpl extends BaseEndpointCoprocessor implements ServerCachingProtocol {
+
+    @Override
+    public boolean addServerCache(byte[] tenantId, byte[] cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException {
+        TenantCache tenantCache = GlobalCache.getTenantCache((RegionCoprocessorEnvironment)this.getEnvironment(), tenantId == null ? null : new ImmutableBytesPtr(tenantId));
+        tenantCache.addServerCache(new ImmutableBytesPtr(cacheId), cachePtr, cacheFactory);
+        return true;
+    }
+
+    @Override
+    public boolean removeServerCache(byte[] tenantId, byte[] cacheId) throws SQLException {
+        TenantCache tenantCache = GlobalCache.getTenantCache((RegionCoprocessorEnvironment)this.getEnvironment(), tenantId == null ? null : new ImmutableBytesPtr(tenantId));
+        tenantCache.removeServerCache(new ImmutableBytesPtr(cacheId));
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java b/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
new file mode 100644
index 0000000..abda834
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.Closeable;
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.io.Writable;
+
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+
+/**
+ * 
+ * Endpoint coprocessor to send a cache to a region server.
+ * Used for:
+ * a) hash joins, to send the smaller side of the join to each region server;
+ * b) secondary indexes, to send the necessary metadata to each region server.
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface ServerCachingProtocol extends CoprocessorProtocol {
+    public static interface ServerCacheFactory extends Writable {
+        public Closeable newCache(ImmutableBytesWritable cachePtr, MemoryChunk chunk) throws SQLException;
+    }
+    /**
+     * Add the cache to the region server cache.  
+     * @param tenantId the tenantId or null if not applicable
+     * @param cacheId unique identifier of the cache
+     * @param cachePtr pointer to the byte array of the cache
+     * @param cacheFactory factory that converts from byte array to object representation on the server side
+     * @return true on success and otherwise throws
+     * @throws SQLException 
+     */
+    public boolean addServerCache(byte[] tenantId, byte[] cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException;
+    /**
+     * Remove the cache from the region server cache.  Called upon completion of
+     * the operation when the cache is no longer needed.
+     * @param tenantId the tenantId or null if not applicable
+     * @param cacheId unique identifier of the cache
+     * @return true on success and otherwise throws
+     * @throws SQLException 
+     */
+    public boolean removeServerCache(byte[] tenantId, byte[] cacheId) throws SQLException;
+}
\ No newline at end of file
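
A rough sketch of how a client might invoke this endpoint. It assumes the HBase 0.94-era HTableInterface.coprocessorProxy() call used for CoprocessorProtocol-based endpoints (that API is not part of this commit), and the table, row key, cache bytes, and factory are all placeholders supplied by the caller.

    import java.sql.SQLException;

    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

    import org.apache.phoenix.coprocessor.ServerCachingProtocol;
    import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;

    public class ServerCacheClientSketch {

        // Push serialized cache bytes to the region server hosting rowKey, run the
        // operation that needs the cache, then remove the cache when done.
        public static void sendAndRemove(HTableInterface table, byte[] rowKey,
                byte[] tenantId, byte[] cacheId, byte[] cacheBytes,
                ServerCacheFactory cacheFactory) throws SQLException {
            // coprocessorProxy targets the region that contains rowKey.
            ServerCachingProtocol endpoint =
                    table.coprocessorProxy(ServerCachingProtocol.class, rowKey);
            endpoint.addServerCache(tenantId, cacheId,
                    new ImmutableBytesWritable(cacheBytes), cacheFactory);
            try {
                // ... run the hash join or index maintenance that reads the cache ...
            } finally {
                endpoint.removeServerCache(tenantId, cacheId);
            }
        }
    }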

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
new file mode 100644
index 0000000..e36ce13
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -0,0 +1,437 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.WritableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.phoenix.exception.ValueTypeIncompatibleException;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.ConstraintViolationException;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PRow;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+
+/**
+ * Region observer that aggregates ungrouped rows (i.e. a SQL query with an aggregation function and no GROUP BY).
+ * 
+ * @author jtaylor
+ * @since 0.1
+ */
+public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver {
+    private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class);
+    // TODO: move all constants into a single class
+    public static final String UNGROUPED_AGG = "UngroupedAgg";
+    public static final String DELETE_AGG = "DeleteAgg";
+    public static final String UPSERT_SELECT_TABLE = "UpsertSelectTable";
+    public static final String UPSERT_SELECT_EXPRS = "UpsertSelectExprs";
+    public static final String DELETE_CQ = "DeleteCQ";
+    public static final String DELETE_CF = "DeleteCF";
+    public static final String EMPTY_CF = "EmptyCF";
+    
+    private static void commitBatch(HRegion region, List<Pair<Mutation,Integer>> mutations, byte[] indexUUID) throws IOException {
+        if (indexUUID != null) {
+            for (Pair<Mutation,Integer> pair : mutations) {
+                pair.getFirst().setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
+            }
+        }
+        @SuppressWarnings("unchecked")
+        Pair<Mutation,Integer>[] mutationArray = new Pair[mutations.size()];
+        // TODO: should we use the one that is all or none?
+        region.batchMutate(mutations.toArray(mutationArray));
+    }
+    
+    public static void serializeIntoScan(Scan scan) {
+        scan.setAttribute(UNGROUPED_AGG, QueryConstants.TRUE);
+    }
+
+    @Override
+    protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException {
+        byte[] isUngroupedAgg = scan.getAttribute(UNGROUPED_AGG);
+        if (isUngroupedAgg == null) {
+            return s;
+        }
+        byte[] upgradeTo20 = scan.getAttribute(SchemaUtil.UPGRADE_TO_2_0);
+        /* Hack to upgrade data to new 2.0 format */
+        if (upgradeTo20 != null) {
+            int nColumns = Bytes.toInt(upgradeTo20);
+            SchemaUtil.upgradeTo2IfNecessary(c.getEnvironment().getRegion(), nColumns);
+            return new BaseRegionScanner() {
+                @Override
+                public HRegionInfo getRegionInfo() {
+                    return s.getRegionInfo();
+                }
+                @Override
+                public boolean isFilterDone() {
+                    return true;
+                }
+                @Override
+                public void close() throws IOException {
+                    s.close();
+                }
+                @Override
+                public boolean next(List<KeyValue> results) throws IOException {
+                    return false;
+                }
+            };
+        }
+        
+        final ScanProjector p = ScanProjector.deserializeProjectorFromScan(scan);
+        final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
+        RegionScanner theScanner = s;
+        if (p != null && j != null)  {
+            theScanner = new HashJoinRegionScanner(s, p, j, ScanUtil.getTenantId(scan), c.getEnvironment());
+        }
+        final RegionScanner innerScanner = theScanner;
+        
+        byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
+        PTable projectedTable = null;
+        List<Expression> selectExpressions = null;
+        byte[] upsertSelectTable = scan.getAttribute(UPSERT_SELECT_TABLE);
+        boolean isUpsert = false;
+        boolean isDelete = false;
+        byte[] deleteCQ = null;
+        byte[] deleteCF = null;
+        byte[][] values = null;
+        byte[] emptyCF = null;
+        ImmutableBytesWritable ptr = null;
+        if (upsertSelectTable != null) {
+            isUpsert = true;
+            projectedTable = deserializeTable(upsertSelectTable);
+            selectExpressions = deserializeExpressions(scan.getAttribute(UPSERT_SELECT_EXPRS));
+            values = new byte[projectedTable.getPKColumns().size()][];
+            ptr = new ImmutableBytesWritable();
+        } else {
+            byte[] isDeleteAgg = scan.getAttribute(DELETE_AGG);
+            isDelete = isDeleteAgg != null && Bytes.compareTo(PDataType.TRUE_BYTES, isDeleteAgg) == 0;
+            if (!isDelete) {
+                deleteCF = scan.getAttribute(DELETE_CF);
+                deleteCQ = scan.getAttribute(DELETE_CQ);
+            }
+            emptyCF = scan.getAttribute(EMPTY_CF);
+        }
+        
+        int batchSize = 0;
+        long ts = scan.getTimeRange().getMax();
+        HRegion region = c.getEnvironment().getRegion();
+        List<Pair<Mutation,Integer>> mutations = Collections.emptyList();
+        if (isDelete || isUpsert || (deleteCQ != null && deleteCF != null) || emptyCF != null) {
+            // TODO: size better
+            mutations = Lists.newArrayListWithExpectedSize(1024);
+            batchSize = c.getEnvironment().getConfiguration().getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+        }
+        Aggregators aggregators = ServerAggregators.deserialize(
+                scan.getAttribute(GroupedAggregateRegionObserver.AGGREGATORS), c.getEnvironment().getConfiguration());
+        Aggregator[] rowAggregators = aggregators.getAggregators();
+        boolean hasMore;
+        boolean hasAny = false;
+        MultiKeyValueTuple result = new MultiKeyValueTuple();
+        if (logger.isInfoEnabled()) {
+        	logger.info("Starting ungrouped coprocessor scan " + scan);
+        }
+        long rowCount = 0;
+        MultiVersionConsistencyControl.setThreadReadPoint(innerScanner.getMvccReadPoint());
+        region.startRegionOperation();
+        try {
+            do {
+                List<KeyValue> results = new ArrayList<KeyValue>();
+                // Results are potentially returned even when the return value of s.next is false
+                // since this is an indication of whether or not there are more values after the
+                // ones returned
+                hasMore = innerScanner.nextRaw(results, null);
+                if (!results.isEmpty()) {
+                	rowCount++;
+                    result.setKeyValues(results);
+                    try {
+                        if (isDelete) {
+                            @SuppressWarnings("deprecation") // FIXME: Remove when unintentionally deprecated method is fixed (HBASE-7870).
+                            // FIXME: the version of the Delete constructor without the lock args was introduced
+                            // in 0.94.4, thus if we try to use it here we can no longer use the 0.94.2 version
+                            // of the client.
+                            Delete delete = new Delete(results.get(0).getRow(),ts,null);
+                            mutations.add(new Pair<Mutation,Integer>(delete,null));
+                        } else if (isUpsert) {
+                            Arrays.fill(values, null);
+                            int i = 0;
+                            List<PColumn> projectedColumns = projectedTable.getColumns();
+                            for (; i < projectedTable.getPKColumns().size(); i++) {
+                                Expression expression = selectExpressions.get(i);
+                                if (expression.evaluate(result, ptr)) {
+                                    values[i] = ptr.copyBytes();
+                                    // If ColumnModifier from expression in SELECT doesn't match the
+                                    // column being projected into then invert the bits.
+                                    if (expression.getColumnModifier() != projectedColumns.get(i).getColumnModifier()) {
+                                        ColumnModifier.SORT_DESC.apply(values[i], 0, values[i], 0, values[i].length);
+                                    }
+                                }
+                            }
+                            projectedTable.newKey(ptr, values);
+                            PRow row = projectedTable.newRow(ts, ptr);
+                            for (; i < projectedColumns.size(); i++) {
+                                Expression expression = selectExpressions.get(i);
+                                if (expression.evaluate(result, ptr)) {
+                                    PColumn column = projectedColumns.get(i);
+                                    byte[] bytes = ptr.copyBytes();
+                                    Object value = expression.getDataType().toObject(bytes, column.getColumnModifier());
+                                    // If ColumnModifier from expression in SELECT doesn't match the
+                                    // column being projected into then invert the bits.
+                                    if (expression.getColumnModifier() != column.getColumnModifier()) {
+                                        ColumnModifier.SORT_DESC.apply(bytes, 0, bytes, 0, bytes.length);
+                                    }
+                                    // We are guaranteed that the two columns will have the same type.
+                                    if (!column.getDataType().isSizeCompatible(column.getDataType(),
+                                            value, bytes,
+                                            expression.getMaxLength(), column.getMaxLength(), 
+                                            expression.getScale(), column.getScale())) {
+                                        throw new ValueTypeIncompatibleException(column.getDataType(),
+                                                column.getMaxLength(), column.getScale());
+                                    }
+                                    bytes = column.getDataType().coerceBytes(bytes, value, expression.getDataType(),
+                                            expression.getMaxLength(), expression.getScale(), column.getMaxLength(), column.getScale());
+                                    row.setValue(column, bytes);
+                                }
+                            }
+                            for (Mutation mutation : row.toRowMutations()) {
+                                mutations.add(new Pair<Mutation,Integer>(mutation,null));
+                            }
+                        } else if (deleteCF != null && deleteCQ != null) {
+                            // No need to search for the delete column, since we only project it
+                            // if no empty key value is being set
+                            if (emptyCF == null || result.getValue(deleteCF, deleteCQ) != null) {
+                                Delete delete = new Delete(results.get(0).getRow());
+                                delete.deleteColumns(deleteCF,  deleteCQ, ts);
+                                mutations.add(new Pair<Mutation,Integer>(delete,null));
+                            }
+                        }
+                        if (emptyCF != null) {
+                            /*
+                             * If we've specified an emptyCF, then we need to insert an empty
+                             * key value "retroactively" for any key value that is visible at
+                             * the timestamp that the DDL was issued. Key values that are not
+                             * visible at this timestamp will not ever be projected up to
+                             * scans past this timestamp, so don't need to be considered.
+                             * We insert one empty key value per row per timestamp.
+                             */
+                            Set<Long> timeStamps = Sets.newHashSetWithExpectedSize(results.size());
+                            for (KeyValue kv : results) {
+                                long kvts = kv.getTimestamp();
+                                if (!timeStamps.contains(kvts)) {
+                                    Put put = new Put(kv.getRow());
+                                    put.add(emptyCF, QueryConstants.EMPTY_COLUMN_BYTES, kvts, ByteUtil.EMPTY_BYTE_ARRAY);
+                                    mutations.add(new Pair<Mutation,Integer>(put,null));
+                                }
+                            }
+                        }
+                        // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config
+                        if (!mutations.isEmpty() && batchSize > 0 && mutations.size() % batchSize == 0) {
+                            commitBatch(region,mutations, indexUUID);
+                            mutations.clear();
+                        }
+                    } catch (ConstraintViolationException e) {
+                        // Log and ignore in count
+                        logger.error("Failed to create row in " + region.getRegionNameAsString() + " with values " + SchemaUtil.toString(values), e);
+                        continue;
+                    }
+                    aggregators.aggregate(rowAggregators, result);
+                    hasAny = true;
+                }
+            } while (hasMore);
+        } finally {
+            innerScanner.close();
+            region.closeRegionOperation();
+        }
+        
+        if (logger.isInfoEnabled()) {
+        	logger.info("Finished scanning " + rowCount + " rows for ungrouped coprocessor scan " + scan);
+        }
+
+        if (!mutations.isEmpty()) {
+            commitBatch(region,mutations, indexUUID);
+        }
+
+        final boolean hadAny = hasAny;
+        KeyValue keyValue = null;
+        if (hadAny) {
+            byte[] value = aggregators.toBytes(rowAggregators);
+            keyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length);
+        }
+        final KeyValue aggKeyValue = keyValue;
+        
+        RegionScanner scanner = new BaseRegionScanner() {
+            private boolean done = !hadAny;
+
+            @Override
+            public HRegionInfo getRegionInfo() {
+                return innerScanner.getRegionInfo();
+            }
+
+            @Override
+            public boolean isFilterDone() {
+                return done;
+            }
+
+            @Override
+            public void close() throws IOException {
+                innerScanner.close();
+            }
+
+            @Override
+            public boolean next(List<KeyValue> results) throws IOException {
+                if (done) return false;
+                done = true;
+                results.add(aggKeyValue);
+                return false;
+            }
+        };
+        return scanner;
+    }
+    
+    private static PTable deserializeTable(byte[] b) {
+        ByteArrayInputStream stream = new ByteArrayInputStream(b);
+        try {
+            DataInputStream input = new DataInputStream(stream);
+            PTable table = new PTableImpl();
+            table.readFields(input);
+            return table;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private static List<Expression> deserializeExpressions(byte[] b) {
+        ByteArrayInputStream stream = new ByteArrayInputStream(b);
+        try {
+            DataInputStream input = new DataInputStream(stream);
+            int size = WritableUtils.readVInt(input);
+            List<Expression> selectExpressions = Lists.newArrayListWithExpectedSize(size);
+            for (int i = 0; i < size; i++) {
+                ExpressionType type = ExpressionType.values()[WritableUtils.readVInt(input)];
+                Expression selectExpression = type.newInstance();
+                selectExpression.readFields(input);
+                selectExpressions.add(selectExpression);
+            }
+            return selectExpressions;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public static byte[] serialize(PTable projectedTable) {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        try {
+            DataOutputStream output = new DataOutputStream(stream);
+            projectedTable.write(output);
+            return stream.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public static byte[] serialize(List<Expression> selectExpressions) {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        try {
+            DataOutputStream output = new DataOutputStream(stream);
+            WritableUtils.writeVInt(output, selectExpressions.size());
+            for (int i = 0; i < selectExpressions.size(); i++) {
+                Expression expression = selectExpressions.get(i);
+                WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal());
+                expression.write(output);
+            }
+            return stream.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}
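
An illustrative sketch of how a client might flag a scan for the server-side delete path above. It is not part of this commit, and a real client would also serialize its aggregators into the GroupedAggregateRegionObserver.AGGREGATORS attribute, which this sketch omits.

    import org.apache.hadoop.hbase.client.Scan;

    import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
    import org.apache.phoenix.schema.PDataType;

    public class ServerSideDeleteScanSketch {

        // Flag a scan so the observer deletes every row it touches and returns a
        // single aggregated key value instead of shipping the rows back.
        public static Scan markForDelete(Scan scan) {
            // Sets the "UngroupedAgg" attribute that doPostScannerOpen keys off.
            UngroupedAggregateRegionObserver.serializeIntoScan(scan);
            // DELETE_AGG = TRUE_BYTES selects the whole-row Delete branch in the scan loop.
            scan.setAttribute(UngroupedAggregateRegionObserver.DELETE_AGG,
                    PDataType.TRUE_BYTES);
            return scan;
        }
    }

The UPSERT SELECT path is wired the same way: the client serializes the target PTable and select expressions with serialize(...) into the UpsertSelectTable and UpsertSelectExprs attributes, and the observer rebuilds rows with deserializeTable and deserializeExpressions.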

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/exception/PhoenixIOException.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/exception/PhoenixIOException.java b/src/main/java/org/apache/phoenix/exception/PhoenixIOException.java
new file mode 100644
index 0000000..d1b0b18
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/exception/PhoenixIOException.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.exception;
+
+import java.sql.SQLException;
+
+
+public class PhoenixIOException extends SQLException {
+    private static final long serialVersionUID = 1L;
+    private static SQLExceptionCode code = SQLExceptionCode.IO_EXCEPTION;
+
+    public PhoenixIOException(Throwable e) {
+        super(e.getMessage(), code.getSQLState(), code.getErrorCode(), e);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/exception/PhoenixParserException.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/exception/PhoenixParserException.java b/src/main/java/org/apache/phoenix/exception/PhoenixParserException.java
new file mode 100644
index 0000000..75a4091
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/exception/PhoenixParserException.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.exception;
+
+import java.sql.SQLSyntaxErrorException;
+
+import org.antlr.runtime.*;
+
+import org.apache.phoenix.parse.PhoenixSQLParser;
+
+
+public class PhoenixParserException extends SQLSyntaxErrorException {
+    private static final long serialVersionUID = 1L;
+
+    public PhoenixParserException(Exception e, PhoenixSQLParser parser) {
+        super(new SQLExceptionInfo.Builder(getErrorCode(e)).setRootCause(e)
+                .setMessage(getErrorMessage(e, parser)).build().toString(),
+                getErrorCode(e).getSQLState(), getErrorCode(e).getErrorCode(), e);
+    }
+
+    public static String getLine(RecognitionException e) {
+        return Integer.toString(e.token.getLine());
+    }
+
+    public static String getColumn(RecognitionException e) {
+        return Integer.toString(e.token.getCharPositionInLine() + 1);
+    }
+
+    public static String getTokenLocation(RecognitionException e) {
+        return "line " + getLine(e) + ", column " + getColumn(e) + ".";
+    }
+
+    public static String getErrorMessage(Exception e, PhoenixSQLParser parser) {
+        String[] tokenNames = parser.getTokenNames();
+        String msg;
+        if (e instanceof MissingTokenException) {
+            MissingTokenException mte = (MissingTokenException)e;
+            String tokenName;
+            if (mte.expecting== Token.EOF) {
+                tokenName = "EOF";
+            } else {
+                tokenName = tokenNames[mte.expecting];
+            }
+            msg = "Missing \""+ tokenName +"\" at "+ getTokenLocation(mte);
+        } else if (e instanceof UnwantedTokenException) {
+            UnwantedTokenException ute = (UnwantedTokenException)e;
+            String tokenName;
+            if (ute.expecting== Token.EOF) {
+                tokenName = "EOF";
+            } else {
+                tokenName = tokenNames[ute.expecting];
+            }
+            msg = "Unexpected input. Expecting \"" + tokenName + "\", got \"" + ute.getUnexpectedToken().getText() 
+                    + "\" at " + getTokenLocation(ute);
+        } else if (e instanceof MismatchedTokenException) {
+            MismatchedTokenException mte = (MismatchedTokenException)e;
+            String tokenName;
+            if (mte.expecting== Token.EOF) {
+                tokenName = "EOF";
+            } else {
+                tokenName = tokenNames[mte.expecting];
+            }
+            msg = "Mismatched input. Expecting \"" + tokenName + "\", got \"" + mte.token.getText()
+                    + "\" at " + getTokenLocation(mte);
+        } else if (e instanceof RecognitionException){
+            RecognitionException re = (RecognitionException) e;
+            msg = "Encountered \"" + re.token.getText() + "\" at " + getTokenLocation(re);
+        } else if (e instanceof UnknownFunctionException) {
+            UnknownFunctionException ufe = (UnknownFunctionException) e;
+            msg = "Unknown function: \"" + ufe.getFuncName() + "\".";
+        } else {
+            msg = e.getMessage();
+        }
+        return msg;
+    }
+
+    public static SQLExceptionCode getErrorCode(Exception e) {
+        if (e instanceof MissingTokenException) {
+            return SQLExceptionCode.MISSING_TOKEN;
+        } else if (e instanceof UnwantedTokenException) {
+            return SQLExceptionCode.UNWANTED_TOKEN;
+        } else if (e instanceof MismatchedTokenException) {
+            return SQLExceptionCode.MISMATCHED_TOKEN;
+        } else if (e instanceof UnknownFunctionException) {
+            return SQLExceptionCode.UNKNOWN_FUNCTION;
+        } else {
+            return SQLExceptionCode.PARSER_ERROR;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
new file mode 100644
index 0000000..c6e664a
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -0,0 +1,195 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.exception;
+
+import org.apache.hbase.index.util.IndexManagementUtil;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.MetaDataUtil;
+
+
+/**
+ * Various SQLException information, including a vendor-specific error code and a standard SQLState.
+ * 
+ * @author zhuang
+ * @since 1.0
+ */
+public enum SQLExceptionCode {
+
+    /**
+     * Connection Exception (errorcode 01, sqlstate 08)
+     */
+    IO_EXCEPTION(101, "08000", "Unexpected IO exception."),
+    MALFORMED_CONNECTION_URL(102, "08001", "Malformed connection url."),
+    CANNOT_ESTABLISH_CONNECTION(103, "08004", "Unable to establish connection."),
+    
+    /**
+     * Data Exception (errorcode 02, sqlstate 22)
+     */
+    ILLEGAL_DATA(201, "22000", "Illegal data."),
+    DIVIDE_BY_ZERO(202, "22012", "Divide by zero."),
+    TYPE_MISMATCH(203, "22005", "Type mismatch."),
+    VALUE_IN_UPSERT_NOT_CONSTANT(204, "22008", "Values in UPSERT must evaluate to a constant."),
+    MALFORMED_URL(205, "22009", "Malformed URL."),
+    DATA_INCOMPATIBLE_WITH_TYPE(206, "22003", "The value is outside the range for the data type."),
+    MISSING_CHAR_LENGTH(207, "22003", "Missing length for CHAR."),
+    NONPOSITIVE_CHAR_LENGTH(208, "22003", "CHAR or VARCHAR must have a positive length."),
+    DECIMAL_PRECISION_OUT_OF_RANGE(209, "22003", "Decimal precision outside of range. Should be between 1 and " + PDataType.MAX_PRECISION + "."),
+    MISSING_BINARY_LENGTH(210, "22003", "Missing length for BINARY."),
+    NONPOSITIVE_BINARY_LENGTH(211, "22003", "BINARY must have a positive length."),
+    SERVER_ARITHMETIC_ERROR(212, "22012", "Arithmetic error on server."),
+    VALUE_OUTSIDE_RANGE(213,"22003","Value outside range."),
+    VALUE_IN_LIST_NOT_CONSTANT(214, "22008", "Values in IN must evaluate to a constant."),
+    
+    /**
+     * Constraint Violation (errorcode 03, sqlstate 23)
+     */
+    CONCURRENT_TABLE_MUTATION(301, "23000", "Concurrent modification to table."),
+    CANNOT_INDEX_COLUMN_ON_TYPE(201, "23100", "The column cannot be indexed due to its type."),
+    
+    /**
+     * Invalid Cursor State (errorcode 04, sqlstate 24)
+     */
+    CURSOR_BEFORE_FIRST_ROW(401, "24015","Cursor before first row."),
+    CURSOR_PAST_LAST_ROW(401, "24016", "Cursor past last row."),
+    
+    /**
+     * Syntax Error or Access Rule Violation (errorcode 05, sqlstate 42)
+     */
+    AMBIGUOUS_TABLE(501, "42000", "Table name exists in more than one table schema and is used without being qualified."),
+    READ_ONLY_TABLE(505, "42000", "Table is read only."),
+    INDEX_MISSING_PK_COLUMNS(513, "42602", "Index table missing PK Columns."),
+    AMBIGUOUS_COLUMN(502, "42702", "Column reference ambiguous or duplicate names."),
+    COLUMN_NOT_FOUND(504, "42703", "Undefined column."),
+    COLUMN_EXIST_IN_DEF(503, "42711", "A duplicate column name was detected in the object definition or ALTER TABLE statement."),
+    CANNOT_DROP_PK(506, "42817", "Primary key column may not be dropped."),
+    CANNOT_CONVERT_TYPE(507, "42846", "Cannot convert type."),
+    UNSUPPORTED_ORDER_BY_QUERY(508, "42878", "ORDER BY only allowed for limited or aggregate queries"),
+    PRIMARY_KEY_MISSING(509, "42888", "The table does not have a primary key."),
+    PRIMARY_KEY_ALREADY_EXISTS(510, "42889", "The table already has a primary key."),
+    ORDER_BY_NOT_IN_SELECT_DISTINCT(511, "42890", "All ORDER BY expressions must appear in SELECT DISTINCT:"),
+    INVALID_PRIMARY_KEY_CONSTRAINT(512, "42891", "Invalid column reference in primary key constraint"),
+    
+    /** 
+     * HBase and Phoenix specific implementation defined sub-classes.
+     * Column family related exceptions.
+     * 
+     * For the following exceptions, use errorcode 10.
+     */
+    COLUMN_FAMILY_NOT_FOUND(1001, "42I01", "Undefined column family."),
+    PROPERTIES_FOR_FAMILY(1002, "42I02","Properties may not be defined for an unused family name."),
+    // Primary/row key related exceptions.
+    PRIMARY_KEY_WITH_FAMILY_NAME(1003, "42J01", "Primary key columns must not have a family name."),
+    PRIMARY_KEY_OUT_OF_ORDER(1004, "42J02", "Order of columns in primary key constraint must match the order in which they're declared."),
+    VARBINARY_IN_ROW_KEY(1005, "42J03", "The VARBINARY type can only be used as the last part of a multi-part row key."),
+    NOT_NULLABLE_COLUMN_IN_ROW_KEY(1006, "42J04", "Only nullable columns may be added to a multi-part row key."),
+    VARBINARY_LAST_PK(1022, "42J04", "Cannot add column to table when the last PK column is of type VARBINARY."),
+    NULLABLE_FIXED_WIDTH_LAST_PK(1023, "42J04", "Cannot add column to table when the last PK column is nullable and fixed width."),
+    // Key/value column related errors
+    KEY_VALUE_NOT_NULL(1007, "42K01", "A key/value column may not be declared as not null."),
+    // View related errors.
+    VIEW_WITH_TABLE_CONFIG(1008, "42L01", "A view may not contain table configuration properties."),
+    VIEW_WITH_PROPERTIES(1009, "42L02", "Properties may not be defined for a view."),
+    // Table related errors that are not in standard code.
+    CANNOT_MUTATE_TABLE(1010, "42M01", "Not allowed to mutate table."),
+    UNEXPECTED_MUTATION_CODE(1011, "42M02", "Unexpected mutation code."),
+    TABLE_UNDEFINED(1012, "42M03", "Table undefined."),
+    TABLE_ALREADY_EXIST(1013, "42M04", "Table already exists."),
+    // Index related errors
+    INDEX_ALREADY_EXIST(1023, "42N01", "Index already exists."),
+    CANNOT_MUTATE_INDEX(1024, "42N02", "Cannot mutate existing index."),
+    // Syntax error
+    TYPE_NOT_SUPPORTED_FOR_OPERATOR(1014, "42Y01", "The operator does not support the operand type."),
+    SCHEMA_NOT_FOUND(1015, "42Y07", "Schema not found."),
+    AGGREGATE_IN_GROUP_BY(1016, "42Y26", "Aggregate expressions may not be used in GROUP BY."),
+    AGGREGATE_IN_WHERE(1017, "42Y26", "Aggregate may not be used in WHERE."),
+    AGGREGATE_WITH_NOT_GROUP_BY_COLUMN(1018, "42Y27", "Aggregate may not contain columns not in GROUP BY."),
+    ONLY_AGGREGATE_IN_HAVING_CLAUSE(1019, "42Y26", "Only aggregates may be used in the HAVING clause."),
+    UPSERT_COLUMN_NUMBERS_MISMATCH(1020, "42Y60", "Number of columns upserting must match number of values."),
+    // Table properties exception.
+    INVALID_BUCKET_NUM(1021, "42Y80", "Salt bucket numbers should be between 1 and 256."),
+    NO_SPLITS_ON_SALTED_TABLE(1022, "42Y81", "Should not specify split points on salted table with default row key order."),
+    SALT_ONLY_ON_CREATE_TABLE(1024, "42Y83", "Salt bucket number may only be specified when creating a table."),
+    SET_UNSUPPORTED_PROP_ON_ALTER_TABLE(1025, "42Y84", "Unsupported property set in ALTER TABLE command."),
+    CANNOT_ADD_NOT_NULLABLE_COLUMN(1030, "42Y84", "Only nullable columns may be added to a pre-existing table."),
+    NO_MUTABLE_INDEXES(1026, "42Y85", "Mutable secondary indexes are only supported for HBase version " + MetaDataUtil.decodeHBaseVersionAsString(PhoenixDatabaseMetaData.MUTABLE_SI_VERSION_THRESHOLD) + " and above."),
+    NO_DELETE_IF_IMMUTABLE_INDEX(1027, "42Y86", "Delete not allowed on a table with IMMUTABLE_ROWS set when an index contains a non-PK column."),
+    INVALID_INDEX_STATE_TRANSITION(1028, "42Y87", "Invalid index state transition."),
+    INVALID_MUTABLE_INDEX_CONFIG(1029, "42Y88", "Mutable secondary indexes must have the " 
+            + IndexManagementUtil.WAL_EDIT_CODEC_CLASS_KEY + " property set to " 
+            + IndexManagementUtil.INDEX_WAL_EDIT_CODEC_CLASS_NAME + " in the hbase-site.xml of every region server."),
+    
+    /** Parser errors (error codes 6xx, SQLState 42P00). */
+    PARSER_ERROR(601, "42P00", "Syntax error."),
+    MISSING_TOKEN(602, "42P00", "Syntax error."),
+    UNWANTED_TOKEN(603, "42P00", "Syntax error."),
+    MISMATCHED_TOKEN(604, "42P00", "Syntax error."),
+    UNKNOWN_FUNCTION(605, "42P00", "Syntax error."),
+    
+    /**
+     * Implementation-defined class: execution exceptions (error codes 11xx, SQLState XCLxx).
+     */
+    RESULTSET_CLOSED(1101, "XCL01", "ResultSet is closed."),
+    GET_TABLE_REGIONS_FAIL(1102, "XCL02", "Cannot get all table regions."),
+    EXECUTE_QUERY_NOT_APPLICABLE(1103, "XCL03", "executeQuery may not be used."),
+    EXECUTE_UPDATE_NOT_APPLICABLE(1104, "XCL03", "executeUpdate may not be used."),
+    SPLIT_POINT_NOT_CONSTANT(1105, "XCL04", "Split points must be constants."),
+    
+    /**
+     * Implementation-defined class: Phoenix internal errors (error codes 20xx, SQLState INTxx).
+     */
+    CANNOT_CALL_METHOD_ON_TYPE(2001, "INT01", "Cannot call method on the argument type."),
+    CLASS_NOT_UNWRAPPABLE(2002, "INT03", "Class not unwrappable."),
+    PARAM_INDEX_OUT_OF_BOUND(2003, "INT04", "Parameter position is out of range."),
+    PARAM_VALUE_UNBOUND(2004, "INT05", "Parameter value unbound."),
+    INTERRUPTED_EXCEPTION(2005, "INT07", "Interrupted exception."),
+    INCOMPATIBLE_CLIENT_SERVER_JAR(2006, "INT08", "Incompatible jars detected between client and server."),
+    OUTDATED_JARS(2007, "INT09", "Outdated jars."),
+    INDEX_METADATA_NOT_FOUND(2008, "INT10", "Unable to find cached index metadata. "),
+    ;
+
+    private final int errorCode;
+    private final String sqlState;
+    private final String message;
+
+    private SQLExceptionCode(int errorCode, String sqlState, String message) {
+        this.errorCode = errorCode;
+        this.sqlState = sqlState;
+        this.message = message;
+    }
+
+    public String getSQLState() {
+        return sqlState;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public int getErrorCode() {
+        return errorCode;
+    }
+
+    @Override
+    public String toString() {
+        return "ERROR " + errorCode + " (" + sqlState + "): " + message;
+    }
+
+}
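
As a quick illustration of how these codes surface to clients, the sketch below prints the formatted output of toString() for one of the codes defined above; it is not part of the patch itself.

    // Minimal sketch using only the enum shown above.
    SQLExceptionCode code = SQLExceptionCode.TABLE_UNDEFINED;
    System.out.println(code);                  // ERROR 1012 (42M03): Table undefined.
    System.out.println(code.getErrorCode());   // 1012
    System.out.println(code.getSQLState());    // 42M03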

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java b/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java
new file mode 100644
index 0000000..a74bffc
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.exception;
+
+import java.sql.SQLException;
+
+import org.apache.phoenix.util.SchemaUtil;
+
+
+/**
+ * Gathers all the contextual information (schema, table, column family, column) needed to construct an SQLException message.
+ * 
+ * @author zhuang
+ * @since 1.0
+ */
+public class SQLExceptionInfo {
+
+    /**
+     * Constants used in naming exception location.
+     */
+    public static final String SCHEMA_NAME = "schemaName";
+    public static final String TABLE_NAME = "tableName";
+    public static final String FAMILY_NAME = "familyName";
+    public static final String COLUMN_NAME = "columnName";
+
+    private final Throwable rootCause;
+    private final SQLExceptionCode code; // Should always have one.
+    private final String message;
+    private final String schemaName;
+    private final String tableName;
+    private final String familyName;
+    private final String columnName;
+
+    public static class Builder {
+
+        private Throwable rootCause;
+        private SQLExceptionCode code; // Should always have one.
+        private String message;
+        private String schemaName;
+        private String tableName;
+        private String familyName;
+        private String columnName;
+
+        public Builder(SQLExceptionCode code) {
+            this.code = code;
+        }
+
+        public Builder setRootCause(Throwable t) {
+            this.rootCause = t;
+            return this;
+        }
+
+        public Builder setMessage(String message) {
+            this.message = message;
+            return this;
+        }
+
+        public Builder setSchemaName(String schemaName) {
+            this.schemaName = schemaName;
+            return this;
+        }
+
+        public Builder setTableName(String tableName) {
+            this.tableName = tableName;
+            return this;
+        }
+
+        public Builder setFamilyName(String familyName) {
+            this.familyName = familyName;
+            return this;
+        }
+
+        public Builder setColumnName(String columnName) {
+            this.columnName = columnName;
+            return this;
+        }
+
+        public SQLExceptionInfo build() {
+            return new SQLExceptionInfo(this);
+        }
+
+        @Override
+        public String toString() {
+            return code.toString();
+        }
+    }
+
+    private SQLExceptionInfo(Builder builder) {
+        code = builder.code;
+        rootCause = builder.rootCause;
+        message = builder.message;
+        schemaName = builder.schemaName;
+        tableName = builder.tableName;
+        familyName = builder.familyName;
+        columnName = builder.columnName;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder(code.toString());
+        if (message != null) {
+            builder.append(" ").append(message);
+        }
+        String columnDisplayName = SchemaUtil.getMetaDataEntityName(schemaName, tableName, familyName, columnName);
+        if (columnName != null) {
+            builder.append(" ").append(COLUMN_NAME).append("=").append(columnDisplayName);
+        } else if (familyName != null) {
+            builder.append(" ").append(FAMILY_NAME).append("=").append(columnDisplayName);
+        } else if (tableName != null) {
+            builder.append(" ").append(TABLE_NAME).append("=").append(columnDisplayName);
+        } else if (schemaName != null) {
+            builder.append(" ").append(SCHEMA_NAME).append("=").append(columnDisplayName);
+        }
+        return builder.toString();
+    }
+
+    public SQLException buildException() {
+        if (rootCause != null) {
+            return new SQLException(toString(), code.getSQLState(), code.getErrorCode(), rootCause);
+        } else {
+            return new SQLException(toString(), code.getSQLState(), code.getErrorCode(), null);
+        }
+    }
+
+}
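
A hedged usage sketch of the Builder above; the schema, table, and family names are invented, and the exact entity formatting depends on SchemaUtil.getMetaDataEntityName, which is outside this hunk.

    // Produces a message along the lines of:
    //   ERROR 1001 (42I01): Undefined column family. familyName=MY_SCHEMA.MY_TABLE.CF1
    throw new SQLExceptionInfo.Builder(SQLExceptionCode.COLUMN_FAMILY_NOT_FOUND)
        .setSchemaName("MY_SCHEMA")
        .setTableName("MY_TABLE")
        .setFamilyName("CF1")
        .build()
        .buildException();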

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/exception/UnknownFunctionException.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/exception/UnknownFunctionException.java b/src/main/java/org/apache/phoenix/exception/UnknownFunctionException.java
new file mode 100644
index 0000000..1f66992
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/exception/UnknownFunctionException.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.exception;
+
+/**
+ * Thrown by ParseNodeFactory when it cannot identify a node as a valid function.
+ */
+public class UnknownFunctionException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+    private final String funcName;
+
+    public UnknownFunctionException(String funcName) {
+        super();
+        this.funcName = funcName;
+    }
+
+    public String getFuncName() {
+        return funcName;
+    }
+}
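
Since this is an unchecked exception, a caller higher up would typically translate it into the UNKNOWN_FUNCTION code from SQLExceptionCode. A hedged sketch, where parseStatement is a hypothetical parse entry point rather than anything in this patch:

    try {
        parseStatement(sql);   // hypothetical entry point, for illustration only
    } catch (UnknownFunctionException e) {
        throw new SQLExceptionInfo.Builder(SQLExceptionCode.UNKNOWN_FUNCTION)
            .setMessage(e.getFuncName())
            .setRootCause(e)
            .build()
            .buildException();
    }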

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/exception/ValueTypeIncompatibleException.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/exception/ValueTypeIncompatibleException.java b/src/main/java/org/apache/phoenix/exception/ValueTypeIncompatibleException.java
new file mode 100644
index 0000000..403c009
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/exception/ValueTypeIncompatibleException.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.exception;
+
+import org.apache.phoenix.schema.IllegalDataException;
+import org.apache.phoenix.schema.PDataType;
+
+
+public class ValueTypeIncompatibleException extends IllegalDataException {
+    private static final long serialVersionUID = 1L;
+    private static final SQLExceptionCode code = SQLExceptionCode.DATA_INCOMPATIBLE_WITH_TYPE;
+
+    public ValueTypeIncompatibleException(PDataType type, Integer precision, Integer scale) {
+        super(new SQLExceptionInfo.Builder(code).setMessage(getTypeDisplayString(type, precision, scale))
+                .build().toString());
+    }
+
+    private static String getTypeDisplayString(PDataType type, Integer precision, Integer scale) {
+        return type.toString() + "(" + precision + "," + scale + ")";
+    }
+}
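
A short sketch of the resulting message; DATA_INCOMPATIBLE_WITH_TYPE is defined elsewhere in SQLExceptionCode (outside the hunk above), the DECIMAL constant is assumed from PDataType, and the precision and scale here are made up:

    // The message reads roughly:
    //   ERROR <code> (<state>): <data-incompatible-with-type message> DECIMAL(5,2)
    throw new ValueTypeIncompatibleException(PDataType.DECIMAL, 5, 2);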

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
new file mode 100644
index 0000000..aba97e3
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.expression.RowKeyExpression;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.iterate.AggregatingResultIterator;
+import org.apache.phoenix.iterate.ConcatResultIterator;
+import org.apache.phoenix.iterate.DistinctAggregatingResultIterator;
+import org.apache.phoenix.iterate.FilterAggregatingResultIterator;
+import org.apache.phoenix.iterate.GroupedAggregatingResultIterator;
+import org.apache.phoenix.iterate.LimitingResultIterator;
+import org.apache.phoenix.iterate.MergeSortRowKeyResultIterator;
+import org.apache.phoenix.iterate.OrderedAggregatingResultIterator;
+import org.apache.phoenix.iterate.OrderedResultIterator;
+import org.apache.phoenix.iterate.ParallelIterators;
+import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
+import org.apache.phoenix.iterate.PeekingResultIterator;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.SpoolingResultIterator;
+import org.apache.phoenix.iterate.UngroupedAggregatingResultIterator;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.query.Scanner;
+import org.apache.phoenix.query.WrappedScanner;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.SchemaUtil;
+
+
+
+/**
+ *
+ * Query plan for aggregating queries
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class AggregatePlan extends BasicQueryPlan {
+    private final Aggregators aggregators;
+    private final Expression having;
+    private List<KeyRange> splits;
+
+    public AggregatePlan(
+            StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector,
+            Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy,
+            Expression having) {
+        super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, groupBy, parallelIteratorFactory);
+        this.having = having;
+        this.aggregators = context.getAggregationManager().getAggregators();
+    }
+
+    @Override
+    public List<KeyRange> getSplits() {
+        return splits;
+    }
+
+    private static class OrderingResultIteratorFactory implements ParallelIteratorFactory {
+        private final QueryServices services;
+        
+        public OrderingResultIteratorFactory(QueryServices services) {
+            this.services = services;
+        }
+        @Override
+        public PeekingResultIterator newIterator(ResultIterator scanner) throws SQLException {
+            Expression expression = RowKeyExpression.INSTANCE;
+            OrderByExpression orderByExpression = new OrderByExpression(expression, false, true);
+            int threshold = services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
+            return new OrderedResultIterator(scanner, Collections.<OrderByExpression>singletonList(orderByExpression), threshold);
+        }
+    }
+
+    private static class WrappingResultIteratorFactory implements ParallelIteratorFactory {
+        private final ParallelIteratorFactory innerFactory;
+        private final ParallelIteratorFactory outerFactory;
+        
+        public WrappingResultIteratorFactory(ParallelIteratorFactory innerFactory, ParallelIteratorFactory outerFactory) {
+            this.innerFactory = innerFactory;
+            this.outerFactory = outerFactory;
+        }
+        @Override
+        public PeekingResultIterator newIterator(ResultIterator scanner) throws SQLException {
+            PeekingResultIterator iterator = innerFactory.newIterator(scanner);
+            return outerFactory.newIterator(iterator);
+        }
+    }
+
+    private ParallelIteratorFactory wrapParallelIteratorFactory () {
+        ParallelIteratorFactory innerFactory;
+        QueryServices services = context.getConnection().getQueryServices();
+        if (groupBy.isEmpty() || groupBy.isOrderPreserving()) {
+            innerFactory = new SpoolingResultIterator.SpoolingResultIteratorFactory(services);
+        } else {
+            innerFactory = new OrderingResultIteratorFactory(services);
+        }
+        if (parallelIteratorFactory == null) {
+            return innerFactory;
+        }
+        // wrap any existing parallelIteratorFactory
+        return new WrappingResultIteratorFactory(innerFactory, parallelIteratorFactory);
+    }
+    
+    @Override
+    protected Scanner newScanner(ConnectionQueryServices services) throws SQLException {
+        // Hack to set state on scan to make upgrade happen
+        int upgradeColumnCount = SchemaUtil.upgradeColumnCount(context.getConnection().getURL(),context.getConnection().getClientInfo());
+        if (upgradeColumnCount > 0) {
+            context.getScan().setAttribute(SchemaUtil.UPGRADE_TO_2_0, Bytes.toBytes(upgradeColumnCount));
+        }
+        if (groupBy.isEmpty()) {
+            UngroupedAggregateRegionObserver.serializeIntoScan(context.getScan());
+        }
+        ParallelIterators parallelIterators = new ParallelIterators(context, tableRef, statement, projection, groupBy, null, wrapParallelIteratorFactory());
+        splits = parallelIterators.getSplits();
+
+        AggregatingResultIterator aggResultIterator;
+        // No need to merge sort for ungrouped aggregation
+        if (groupBy.isEmpty()) {
+            aggResultIterator = new UngroupedAggregatingResultIterator(new ConcatResultIterator(parallelIterators), aggregators);
+        } else {
+            aggResultIterator = new GroupedAggregatingResultIterator(new MergeSortRowKeyResultIterator(parallelIterators), aggregators);
+        }
+
+        if (having != null) {
+            aggResultIterator = new FilterAggregatingResultIterator(aggResultIterator, having);
+        }
+        
+        if (statement.isDistinct() && statement.isAggregate()) { // Dedup on client if select distinct and aggregation
+            aggResultIterator = new DistinctAggregatingResultIterator(aggResultIterator, getProjector());
+        }
+
+        ResultIterator resultScanner = aggResultIterator;
+        if (orderBy.getOrderByExpressions().isEmpty()) {
+            if (limit != null) {
+                resultScanner = new LimitingResultIterator(aggResultIterator, limit);
+            }
+        } else {
+            int thresholdBytes = services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
+                    QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
+            resultScanner = new OrderedAggregatingResultIterator(aggResultIterator, orderBy.getOrderByExpressions(), thresholdBytes, limit);
+        }
+        
+        return new WrappedScanner(resultScanner, getProjector());
+    }
+}
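
To make the composition in newScanner() easier to follow, here is the iterator nesting it produces for a grouped aggregate that also has a HAVING clause, SELECT DISTINCT, and an ORDER BY, written out as a sketch using only the classes referenced above:

    // Sketch only; built inside the plan, so fields like having, aggregators,
    // orderBy, thresholdBytes and limit are the ones set up in newScanner().
    ResultIterator resultScanner =
        new OrderedAggregatingResultIterator(                      // ORDER BY (+ LIMIT)
            new DistinctAggregatingResultIterator(                 // client-side dedup for DISTINCT
                new FilterAggregatingResultIterator(               // applies HAVING
                    new GroupedAggregatingResultIterator(          // per-group aggregation
                        new MergeSortRowKeyResultIterator(parallelIterators), aggregators),
                    having),
                getProjector()),
            orderBy.getOrderByExpressions(), thresholdBytes, limit);

With no ORDER BY, the outermost step is simply LimitingResultIterator when a limit is present, and for an ungrouped aggregate the merge sort is replaced by ConcatResultIterator, exactly as the branches above show.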

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java b/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java
new file mode 100644
index 0000000..4ecba32
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java
@@ -0,0 +1,188 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.DegenerateScanner;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.Scanner;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+
+
+/**
+ *
+ * Query plan that has no child plans
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class BasicQueryPlan implements QueryPlan {
+    protected final TableRef tableRef;
+    protected final StatementContext context;
+    protected final FilterableStatement statement;
+    protected final RowProjector projection;
+    protected final ParameterMetaData paramMetaData;
+    protected final Integer limit;
+    protected final OrderBy orderBy;
+    protected final GroupBy groupBy;
+    protected final ParallelIteratorFactory parallelIteratorFactory;
+
+    private Scanner scanner;
+
+    protected BasicQueryPlan(
+            StatementContext context, FilterableStatement statement, TableRef table,
+            RowProjector projection, ParameterMetaData paramMetaData, Integer limit, OrderBy orderBy,
+            GroupBy groupBy, ParallelIteratorFactory parallelIteratorFactory) {
+        this.context = context;
+        this.statement = statement;
+        this.tableRef = table;
+        this.projection = projection;
+        this.paramMetaData = paramMetaData;
+        this.limit = limit;
+        this.orderBy = orderBy;
+        this.groupBy = groupBy;
+        this.parallelIteratorFactory = parallelIteratorFactory;
+    }
+
+    @Override
+    public GroupBy getGroupBy() {
+        return groupBy;
+    }
+
+    
+    @Override
+    public OrderBy getOrderBy() {
+        return orderBy;
+    }
+
+    @Override
+    public TableRef getTableRef() {
+        return tableRef;
+    }
+
+    @Override
+    public Integer getLimit() {
+        return limit;
+    }
+
+    @Override
+    public RowProjector getProjector() {
+        return projection;
+    }
+
+    private ConnectionQueryServices getConnectionQueryServices(ConnectionQueryServices services) {
+        byte[] tenantId = context.getConnection().getTenantId();
+        // Get child services associated with tenantId of query.
+        ConnectionQueryServices childServices = tenantId == null ? services : services.getChildQueryServices(new ImmutableBytesWritable(tenantId));
+        return childServices;
+    }
+
+    protected void projectEmptyKeyValue() {
+        Scan scan = context.getScan();
+        PTable table = tableRef.getTable();
+        if (!projection.isProjectEmptyKeyValue() && table.getType() != PTableType.VIEW) {
+            scan.addColumn(SchemaUtil.getEmptyColumnFamily(table.getColumnFamilies()), QueryConstants.EMPTY_COLUMN_BYTES);
+        }
+    }
+//    /**
+//     * Sets up an id used to do round robin queue processing on the server
+//     * @param scan
+//     */
+//    private void setProducer(Scan scan) {
+//        byte[] producer = Bytes.toBytes(UUID.randomUUID().toString());
+//        scan.setAttribute(HBaseServer.CALL_QUEUE_PRODUCER_ATTRIB_NAME, producer);
+//    }
+
+    @Override
+    public final Scanner getScanner() throws SQLException {
+        if (scanner != null) {
+            return scanner;
+        }
+        Scan scan = context.getScan();
+        // Set producer on scan so HBase server does round robin processing
+        //setProducer(scan);
+        // Set the time range on the scan so we don't get back rows newer than when the statement was compiled
+        // The time stamp comes from the server at compile time when the meta data
+        // is resolved.
+        // TODO: include time range in explain plan?
+        PhoenixConnection connection = context.getConnection();
+        Long scn = connection.getSCN();
+        ScanUtil.setTimeRange(scan, scn == null ? context.getCurrentTime() : scn);
+        ScanUtil.setTenantId(scan, connection.getTenantId());
+        scanner = newScanner();
+        return scanner;
+    }
+
+    abstract protected Scanner newScanner(ConnectionQueryServices services) throws SQLException;
+
+    private Scanner newScanner() throws SQLException {
+        ConnectionQueryServices services = getConnectionQueryServices(context.getConnection().getQueryServices());
+        if (context.getScanRanges() == ScanRanges.NOTHING) { // is degenerate
+            scanner = new DegenerateScanner(tableRef, getProjector());
+        } else {
+            scanner = newScanner(services);
+        }
+        return scanner;
+    }
+
+    @Override
+    public ParameterMetaData getParameterMetaData() {
+        return paramMetaData;
+    }
+
+    @Override
+    public FilterableStatement getStatement() {
+        return statement;
+    }
+
+    @Override
+    public StatementContext getContext() {
+        return context;
+    }
+
+    @Override
+    public ExplainPlan getExplainPlan() throws SQLException {
+        if (scanner == null) {
+            scanner = newScanner();
+        }
+        return scanner.getExplainPlan();
+    }
+}
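
BasicQueryPlan follows a template-method shape: getScanner() applies the time range and tenant id to the Scan, delegates construction to the abstract newScanner(services), and caches the result. A hedged sketch of the intended call pattern, where plan stands for any concrete subclass such as the AggregatePlan above:

    Scanner scanner = plan.getScanner();          // sets time range + tenant id, builds and caches
    Scanner sameScanner = plan.getScanner();      // second call returns the cached instance
    ExplainPlan explain = plan.getExplainPlan();  // also builds the scanner lazily if needed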

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/execute/CommitException.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/execute/CommitException.java b/src/main/java/org/apache/phoenix/execute/CommitException.java
new file mode 100644
index 0000000..4ca47ec
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/execute/CommitException.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.sql.SQLException;
+
+public class CommitException extends SQLException {
+    private static final long serialVersionUID = 1L;
+    private final MutationState uncommittedState;
+    private final MutationState committedState;
+
+    public CommitException(Exception e, MutationState uncommittedState, MutationState committedState) {
+        super(e);
+        this.uncommittedState = uncommittedState;
+        this.committedState = committedState;
+    }
+
+    public MutationState getUncommittedState() {
+        return uncommittedState;
+    }
+
+    public MutationState getCommittedState() {
+        return committedState;
+    }
+
+}
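
A hedged sketch of how a caller might use the two MutationState handles carried above; conn and retryUncommitted are hypothetical, and the retry policy is left to the application:

    try {
        conn.commit();
    } catch (CommitException e) {
        MutationState committed = e.getCommittedState();    // already made durable
        MutationState pending = e.getUncommittedState();    // never reached the server
        retryUncommitted(pending);                           // hypothetical retry helper
    }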