Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 23:16:00 UTC

[44/51] [partial] Initial commit of master branch from github

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillMap.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillMap.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillMap.java
new file mode 100644
index 0000000..d93d08c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillMap.java
@@ -0,0 +1,494 @@
+package org.apache.phoenix.cache.aggcache;
+
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.MappedByteBuffer;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnels;
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * Implements an active spilled partition. Serialized tuples are first written into an in-memory data structure
+ * that represents a single page. As the page fills up, it is written to the current spill file (spill partition). For
+ * fast tuple discovery, the class maintains a per-page bloom filter and never de-serializes elements. The element
+ * spilling employs an extendible hashing technique.
+ */
+public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements Iterable<byte[]> {
+
+    // Threshold is typically the page size
+    private final int thresholdBytes;
+    private final int pageInserts;
+    // Global directory depth
+    private int globalDepth;
+    private int curMapBufferIndex;
+    private SpillFile spillFile;
+    // Directory of hash buckets --> extendible hashing implementation
+    private MappedByteBufferMap[] directory;
+    private final SpillableGroupByCache.QueryCache cache;
+
+    public SpillMap(SpillFile file, int thresholdBytes, int estValueSize, SpillableGroupByCache.QueryCache cache)
+            throws IOException {
+        this.thresholdBytes = thresholdBytes - Bytes.SIZEOF_INT;
+        this.pageInserts = thresholdBytes / estValueSize;
+        this.spillFile = file;
+        this.cache = cache;
+
+        // Init the e-hashing directory structure
+        globalDepth = 1;
+        directory = new MappedByteBufferMap[(1 << globalDepth)];
+
+        for (int i = 0; i < directory.length; i++) {
+            // Create an empty bucket list
+            directory[i] = new MappedByteBufferMap(i, this.thresholdBytes, pageInserts, file);
+            directory[i].flushBuffer();
+        }
+        directory[0].pageIn();
+        curMapBufferIndex = 0;
+    }
+
+    // Get the directory index for a specific key
+    private int getBucketIndex(ImmutableBytesPtr key) {
+        // Get key hash
+        int hashCode = key.hashCode();
+
+        // Mask all but the lowest globalDepth bits
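+        // e.g. (illustrative): with globalDepth = 2 the mask is 0b11, so a key whose hash
+        // ends in binary ...10 maps to directory slot 2; after the directory doubles
+        // (globalDepth = 3) the same key maps to slot 2 or 6, depending on its third-lowest bit.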
+        return hashCode & ((1 << globalDepth) - 1);
+    }
+
+    // Redistributes the elements of the bucket at the given directory index
+    // into two new buckets, based on the (localDepth + 1)-th lowest bit of the key hash.
+    // Optionally this function also doubles the directory to allow
+    // for the bucket split.
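+    // For example (illustrative): with localDepth = 1 and index = 1, the split buckets
+    // are b1Index = 1 and b2Index = 3 (1 ^ (1 << 1)); keys whose hash has bit (1 << 1)
+    // set move to bucket 3, all others stay in bucket 1.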
+    private void redistribute(int index, ImmutableBytesPtr keyNew, byte[] valueNew) {
+        // Get the respective bucket
+        MappedByteBufferMap byteMap = directory[index];
+
+        // Get the actual bucket index that the directory index points to
+        int mappedIdx = byteMap.pageIndex;
+
+        int localDepth = byteMap.localDepth;
+        ArrayList<Integer> buckets = Lists.newArrayList();
+        // Get all directory entries that point to the same bucket.
+        // TODO: can be made faster!
+        for (int i = 0; i < directory.length; i++) {
+            if (directory[i].pageIndex == mappedIdx) {
+                buckets.add(i);
+            }
+        }
+
+        // Assuming no directory doubling for now,
+        // compute the two new bucket IDs for splitting.
+        // SpillFile adds new files dynamically in case the directory points to page IDs
+        // that exceed the size limit of a single file.
+
+        // TODO verify if some sort of de-fragmentation might be helpful
+        int tmpIndex = index ^ ((1 << localDepth));
+        int b1Index = Math.min(index, tmpIndex);
+        int b2Index = Math.max(index, tmpIndex);
+
+        // Create two new split buckets
+        MappedByteBufferMap b1 = new MappedByteBufferMap(b1Index, thresholdBytes, pageInserts, spillFile);
+        MappedByteBufferMap b2 = new MappedByteBufferMap(b2Index, thresholdBytes, pageInserts, spillFile);
+
+        // redistribute old elements into b1 and b2
+        for (Entry<ImmutableBytesPtr, byte[]> element : byteMap.pageMap.entrySet()) {
+            ImmutableBytesPtr key = element.getKey();
+            byte[] value = element.getValue();
+            // Only add the key during redistribution if it is not in the cache.
+            // Otherwise this is a good point to reduce the number of spilled elements.
+            if (!cache.isKeyContained(key)) {
+                // Re-distribute the element into one of the two new split buckets
+                if ((key.hashCode() & ((1 << localDepth))) != 0) {
+                    b2.addElement(null, key, value);
+                } else {
+                    b1.addElement(null, key, value);
+                }
+            }
+        }
+
+        // Clear and GC the old now redistributed bucket
+        byteMap.pageMap.clear();
+        byteMap = null;
+
+        // Increase local bucket depths
+        b1.localDepth = localDepth + 1;
+        b2.localDepth = localDepth + 1;
+        boolean doubleDir = false;
+
+        if (globalDepth < (localDepth + 1)) {
+            // Double directory structure and re-adjust pointers
+            doubleDir = true;
+
+            b2Index = doubleDirectory(b2Index, keyNew);
+        }
+
+        if (!doubleDir) {
+            // This is a bit more tricky: we have to cover scenarios where
+            // globalDepth - localDepth > 1.
+            // Here, even after bucket splitting, multiple directory entries point to
+            // the new buckets.
+            for (int i = 0; i < buckets.size(); i++) {
+                if ((buckets.get(i) & (1 << (localDepth))) != 0) {
+                    directory[buckets.get(i)] = b2;
+                } else {
+                    directory[buckets.get(i)] = b1;
+                }
+            }
+        } else {
+            // Update the directory indexes in case of directory doubling
+            directory[b1Index] = b1;
+            directory[b2Index] = b2;
+        }
+    }
+
+    // Doubles the directory and readjusts pointers.
+    private int doubleDirectory(int b2Index, ImmutableBytesPtr keyNew) {
+        // Double the directory in size; the second half points to the original first half
+        int newDirSize = 1 << (globalDepth + 1);
+
+        // Ensure that the new directory size does not exceed size limits
+        Preconditions.checkArgument(newDirSize < Integer.MAX_VALUE);
+
+        // Double it!
+        MappedByteBufferMap[] newDirectory = new MappedByteBufferMap[newDirSize];
+        for (int i = 0; i < directory.length; i++) {
+            newDirectory[i] = directory[i];
+            newDirectory[i + directory.length] = directory[i];
+        }
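+        // e.g. (illustrative): with globalDepth = 2, old slots 0..3 are mirrored into
+        // slots 4..7, so slot 5 initially points to the same bucket as slot 1.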
+        directory = newDirectory;
+        newDirectory = null;
+
+        // Adjust the index of the new split bucket according to the directory doubling
+        b2Index = (keyNew.hashCode() & ((1 << globalDepth) - 1)) | (1 << globalDepth);
+
+        // Increment global depth
+        globalDepth++;
+
+        return b2Index;
+    }
+
+    /**
+     * Get a key from the spillable data structures. The page is determined via hash partitioning, and a bloom filter
+     * check is used to determine if it is worth paging in the data.
+     */
+    @Override
+    public byte[] get(Object key) {
+        if (!(key instanceof ImmutableBytesPtr)) {
+            // TODO ... work on type safety
+        }
+        ImmutableBytesPtr ikey = (ImmutableBytesPtr)key;
+        byte[] value = null;
+
+        int bucketIndex = getBucketIndex(ikey);
+        MappedByteBufferMap byteMap = directory[bucketIndex];
+
+        // Decision based on bucket ID, not the directory ID due to the n:1 relationship
+        if (directory[curMapBufferIndex].pageIndex != byteMap.pageIndex) {
+            // map not paged in
+            MappedByteBufferMap curByteMap = directory[curMapBufferIndex];
+
+            // Use bloomFilter to check if key was spilled before
+            if (byteMap.containsKey(ikey.copyBytesIfNecessary())) {
+                // Ensure consistency and flush the current in-memory page (buffer) to disk
+                curByteMap.flushBuffer();
+                // page in new buffer
+                byteMap.pageIn();
+                // update index
+                curMapBufferIndex = bucketIndex;
+            }
+        }
+        // get KV from current map
+        value = byteMap.getPagedInElement(ikey);
+        return value;
+    }
+
+    // Similar to the get(Object key) function, however it
+    // always pages in the page a key is spilled to, without a bloom filter decision
+    private byte[] getAlways(ImmutableBytesPtr key) {
+        byte[] value = null;
+        int bucketIndex = getBucketIndex(key);
+        MappedByteBufferMap byteMap = directory[bucketIndex];
+
+        if (directory[curMapBufferIndex].pageIndex != byteMap.pageIndex) {
+            MappedByteBufferMap curByteMap = directory[curMapBufferIndex];
+            // ensure consistency and flush current memory page to disk
+            curByteMap.flushBuffer();
+
+            byteMap.pageIn();
+            curMapBufferIndex = bucketIndex;
+        }
+        // get KV from current queue
+        value = byteMap.getPagedInElement(key);
+        return value;
+    }
+
+    /**
+     * Spill a key. First we discover if the key has been spilled before and load it into memory (see get()). If it was
+     * loaded before, just replace the old value in the memory page. If it was not loaded before, try to store it in the
+     * current page; alternatively, if not enough memory is available, request a new page.
+     */
+    @Override
+    public byte[] put(ImmutableBytesPtr key, byte[] value) {
+        boolean redistributed = false;
+        // page in element and replace if present
+        byte[] spilledValue = getAlways(key);
+
+        MappedByteBufferMap byteMap = directory[curMapBufferIndex];
+        int index = curMapBufferIndex;
+
+        // TODO: We split buckets until the new element fits onto
+        // one of the new buckets. Might consider the use of an overflow
+        // bucket, especially in case the directory runs out of page IDs.
+        while (!byteMap.canFit(spilledValue, value)) {
+            // Element does not fit... Split the bucket!
+            redistribute(index, key, value);
+            redistributed = true;
+
+            index = getBucketIndex(key);
+            byteMap = directory[index];
+        }
+        // Ensure that all pages that were paged in during redistribution are flushed back out
+        // to disk to keep memory footprint small.
+        if (redistributed) {
+            for (int i = 0; i < directory.length; i++) {
+                if (directory[i].pageIndex != byteMap.pageIndex) {
+                    directory[i].flushBuffer();
+                }
+            }
+            // Ensure the page that receives the new key is in memory
+            spilledValue = getAlways(key);
+        }
+        byteMap.addElement(spilledValue, key, value);
+
+        return value;
+    }
+
+    /**
+     * Function returns the current spill file
+     */
+    public SpillFile getSpillFile() {
+        return spillFile;
+    }
+
+    /**
+     * This inner class represents the currently mapped file region. It uses a Map to represent the current in-memory
+     * page for easy get() and update() calls on an individual key. The class keeps track of the current size of the
+     * in-memory page and handles flushing and paging in, respectively.
+     */
+    private static class MappedByteBufferMap {
+        private SpillFile spillFile;
+        private int pageIndex;
+        private final int thresholdBytes;
+        private long totalResultSize;
+        private boolean pagedIn;
+        private int localDepth;
+        // dirtyPage flag tracks if a paged in page was modified
+        // if not, no need to flush it back out to disk
+        private boolean dirtyPage;
+        // Use a map for in memory page representation
+        Map<ImmutableBytesPtr, byte[]> pageMap = Maps.newHashMap();
+        // Used to determine whether an element was written to this page before or not
+        BloomFilter<byte[]> bFilter;
+
+        public MappedByteBufferMap(int id, int thresholdBytes, int pageInserts, SpillFile spillFile) {
+            this.spillFile = spillFile;
+            // size threshold of a page
+            this.thresholdBytes = thresholdBytes;
+            this.pageIndex = id;
+            pageMap.clear();
+            bFilter = BloomFilter.create(Funnels.byteArrayFunnel(), pageInserts);
+            pagedIn = true;
+            totalResultSize = 0;
+            localDepth = 1;
+            dirtyPage = true;
+        }
+
+        private boolean containsKey(byte[] key) {
+            return bFilter.mightContain(key);
+        }
+
+        private boolean canFit(byte[] curValue, byte[] newValue) {
+            if (thresholdBytes < newValue.length) {
+                // TODO resize page size if single element is too big,
+                // Can this ever happen?
+                throw new RuntimeException("page size too small to store a single KV element");
+            }
+
+            int resultSize = newValue.length + Bytes.SIZEOF_INT;
+            if (curValue != null) {
+                // Key existed before.
+                // Make sure to compensate for a potentially larger byte[] for the aggregate
+                resultSize = Math.max(0, resultSize - (curValue.length + Bytes.SIZEOF_INT));
+            }
+
+            if ((thresholdBytes - totalResultSize) <= (resultSize)) {
+                // KV does not fit
+                return false;
+            }
+            // KV fits
+            return true;
+        }
+
+        // Flush the current page to the memory mapped byte buffer
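+        // Serialized page layout (as written below): [element count : int], followed by
+        // [length : int][serialized KV bytes] for each element.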
+        private void flushBuffer() throws BufferOverflowException {
+            if (pagedIn) {
+                MappedByteBuffer buffer;
+                // Only flush if page was changed
+                if (dirtyPage) {
+                    Collection<byte[]> values = pageMap.values();
+                    buffer = spillFile.getPage(pageIndex);
+                    buffer.clear();
+                    // number of elements
+                    buffer.putInt(values.size());
+                    for (byte[] value : values) {
+                        // element length
+                        buffer.putInt(value.length);
+                        // element
+                        buffer.put(value, 0, value.length);
+                    }
+                }
+                buffer = null;
+                // Reset page stats
+                pageMap.clear();
+                totalResultSize = 0;
+            }
+            pagedIn = false;
+            dirtyPage = false;
+        }
+
+        // load memory mapped region into a map for fast element access
+        private void pageIn() throws IndexOutOfBoundsException {
+            if (!pagedIn) {
+                // Map the memory region
+                MappedByteBuffer buffer = spillFile.getPage(pageIndex);
+                int numElements = buffer.getInt();
+                for (int i = 0; i < numElements; i++) {
+                    int kvSize = buffer.getInt();
+                    byte[] data = new byte[kvSize];
+                    buffer.get(data, 0, kvSize);
+                    try {
+                        pageMap.put(SpillManager.getKey(data), data);
+                        totalResultSize += (data.length + Bytes.SIZEOF_INT);
+                    } catch (IOException ioe) {
+                        // Error during key access on spilled resource
+                        // TODO rework error handling
+                        throw new RuntimeException(ioe);
+                    }
+                }
+                pagedIn = true;
+                dirtyPage = false;
+            }
+        }
+
+        /**
+         * Return a cache element currently paged into memory. Direct access via the mapped page map.
+         * 
+         * @param key
+         * @return
+         */
+        public byte[] getPagedInElement(ImmutableBytesPtr key) {
+            return pageMap.get(key);
+        }
+
+        /**
+         * Inserts / Replaces cache element in the currently loaded page. Direct access via mapped page map
+         * 
+         * @param key
+         * @param value
+         */
+        public void addElement(byte[] spilledValue, ImmutableBytesPtr key, byte[] value) {
+
+            // put Element into map
+            pageMap.put(key, value);
+            // Update bloom filter
+            bFilter.put(key.copyBytesIfNecessary());
+            // track current Map size to prevent Buffer overflows
+            if (spilledValue != null) {
+                // if previous key was present, just add the size difference
+                totalResultSize += Math.max(0, value.length - (spilledValue.length));
+            } else {
+                // Add new size information
+                totalResultSize += (value.length + Bytes.SIZEOF_INT);
+            }
+
+            dirtyPage = true;
+        }
+
+        /**
+         * Returns a value iterator over the pageMap
+         */
+        public Iterator<byte[]> getPageMapEntries() {
+            pageIn();
+            return pageMap.values().iterator();
+        }
+    }
+
+    /**
+     * Iterate over all spilled elements, including the ones that are currently paged into memory
+     */
+    @Override
+    public Iterator<byte[]> iterator() {
+        directory[curMapBufferIndex].flushBuffer();
+
+        return new Iterator<byte[]>() {
+            int pageIndex = 0;
+            Iterator<byte[]> entriesIter = directory[pageIndex].getPageMapEntries();
+            HashSet<Integer> dups = new HashSet<Integer>();
+
+            @Override
+            public boolean hasNext() {
+                if (!entriesIter.hasNext()) {
+                    boolean found = false;
+                    // Clear in memory map
+
+                    while (!found) {
+                        pageIndex++;
+                        if (pageIndex >= directory.length) { return false; }
+                        directory[pageIndex - 1].pageMap.clear();
+                        // get keys from all spilled pages
+                        if (!dups.contains(directory[pageIndex].pageIndex)) {
+                            dups.add(directory[pageIndex].pageIndex);
+                            entriesIter = directory[pageIndex].getPageMapEntries();
+                            if (entriesIter.hasNext()) {
+                                found = true;
+                            }
+                        }
+                    }
+                }
+                dups.add(directory[pageIndex].pageIndex);
+                return true;
+            }
+
+            @Override
+            public byte[] next() {
+                // get elements from in memory map first
+                return entriesIter.next();
+            }
+
+            @Override
+            public void remove() {
+                throw new IllegalAccessError("Iterator does not support removal operation");
+            }
+        };
+    }
+
+    // TODO implement this method to make the SpillMap a true Map implementation
+    @Override
+    public Set<java.util.Map.Entry<ImmutableBytesPtr, byte[]>> entrySet() {
+        throw new IllegalAccessError("entrySet is not supported for this type of cache");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
new file mode 100644
index 0000000..41a4e65
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
@@ -0,0 +1,361 @@
+package org.apache.phoenix.cache.aggcache;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryServices.GROUPBY_MAX_CACHE_SIZE_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILL_FILES_ATTRIB;
+import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_GROUPBY_MAX_CACHE_MAX;
+import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_GROUPBY_SPILL_FILES;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.Closeables;
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.cache.aggcache.SpillManager.CacheEntry;
+import org.apache.phoenix.coprocessor.BaseRegionScanner;
+import org.apache.phoenix.coprocessor.GroupByCache;
+import org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.memory.InsufficientMemoryException;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.util.KeyValueUtil;
+
+/**
+ * The main entry point is in GroupedAggregateRegionObserver. It instantiates a SpillableGroupByCache and invokes a
+ * get() method on it. There is no "if key does not exist -> put into map" case, since the cache is a loading cache and
+ * therefore handles the put under the covers. The final cache element accesses (RegionScanner below) are implemented in
+ * a streaming fashion, i.e. there is just an iterator on it, and the previous result materialization was removed.
+ * <p>
+ * SpillableGroupByCache implements an LRU cache using a LinkedHashMap with access order. There are configurable upper
+ * and lower size limits in bytes, which are used as follows to compute the initial cache size in number of elements:
+ * Max(lowerBoundElements, Min(upperBoundElements, estimatedCacheSize)). Once the number of cached elements exceeds this
+ * number, the cache size is increased by a factor of 1.5. This happens until the additional memory to grow the cache
+ * can no longer be requested. At this point the cache starts spilling elements. As long as no eviction happens, no
+ * spillable data structures are allocated; this only happens as soon as the first element is evicted from the cache. We
+ * cannot really make any assumptions about which keys arrive at the map, but assume the LRU would at least cover the
+ * cases where some keys have a slight skew and should stay memory resident.
+ * <p>
+ * Once a key gets evicted, the SpillManager is instantiated. It basically takes care of spilling an element to disk and
+ * does all the SERDE work. It pre-allocates a configurable number of SpillFiles (spill partitions), which are memory
+ * mapped temp files. The SpillManager keeps a list of these and hash distributes the keys within this list. Once an
+ * element gets spilled, it is serialized and will only get deserialized again when it is requested from the client,
+ * i.e. loaded back into the LRU cache. The SpillManager holds a single SpillMap object in memory for every spill
+ * partition (SpillFile). The SpillMap is an in-memory Map representation of a single page of spilled, serialized
+ * key/value pairs. To achieve fast key lookups, the key is hash partitioned into random pages of the current spill
+ * file. The code implements an extendible hashing approach which dynamically adjusts the hash function in order to
+ * adapt to a growing number of storage pages and to avoid long chains of overflow buckets. For an excellent discussion
+ * of the algorithm please refer to the following online resource:
+ * http://db.inf.uni-tuebingen.de/files/teaching/ws1011/db2/db2-hash-indexes.pdf . For this, each SpillFile keeps a
+ * directory of pointers to Integer.MAX_VALUE 4K pages in memory, which allows each directory to address more pages than
+ * a single memory mapped temp file could theoretically store. In case directory doubling requests a page index that
+ * exceeds the limits of the initial temp file, the implementation dynamically allocates additional temp files to the
+ * SpillFile. The directory starts with a global depth of 1 and therefore a directory size of 2 buckets. Only during
+ * bucket splits and directory doubling is more than one page temporarily kept in memory, until all elements have been
+ * redistributed. The current implementation conducts bucket splits as long as an element does not fit onto a page; no
+ * overflow chain is created, which might be an alternative. For get requests, each directory entry maintains a bloom
+ * filter to prevent page-in operations in case an element has never been spilled before. Deserialization is only
+ * triggered when a key is loaded back into the LRU cache. The aggregators are returned from the LRU cache and the next
+ * value is computed. In case the key is not found on any page, the loader creates new aggregators for it.
+ */
+
+public class SpillableGroupByCache implements GroupByCache {
+
+    private static final Logger logger = LoggerFactory.getLogger(SpillableGroupByCache.class);
+
+    // Min size of 1st level main memory cache in bytes --> lower bound
+    private static final int SPGBY_CACHE_MIN_SIZE = 4096; // 4K
+
+    // TODO Generally better to use Collection API with generics instead of
+    // array types
+    private final LinkedHashMap<ImmutableBytesWritable, Aggregator[]> cache;
+    private SpillManager spillManager = null;
+    private int curNumCacheElements;
+    private final ServerAggregators aggregators;
+    private final RegionCoprocessorEnvironment env;
+    private final MemoryChunk chunk;
+
+    /*
+     * Inner class that makes the cache queryable by other classes that should not get the full instance:
+     * a queryable view of the cache.
+     */
+    public class QueryCache {
+        public boolean isKeyContained(ImmutableBytesPtr key) {
+            return cache.containsKey(key);
+        }
+    }
+
+    /**
+     * Instantiates a Loading LRU Cache that stores key / aggregator[] tuples used for group by queries
+     * 
+     * @param env
+     * @param tenantId
+     * @param aggs
+     * @param estSizeNum
+     */
+    public SpillableGroupByCache(final RegionCoprocessorEnvironment env, ImmutableBytesWritable tenantId,
+            ServerAggregators aggs, final int estSizeNum) {
+        curNumCacheElements = 0;
+        this.aggregators = aggs;
+        this.env = env;
+
+        final int estValueSize = aggregators.getEstimatedByteSize();
+        final TenantCache tenantCache = GlobalCache.getTenantCache(env, tenantId);
+
+        // Compute the initial map size
+        final Configuration conf = env.getConfiguration();
+        final long maxCacheSizeConf = conf.getLong(GROUPBY_MAX_CACHE_SIZE_ATTRIB, DEFAULT_GROUPBY_MAX_CACHE_MAX);
+        final int numSpillFilesConf = conf.getInt(GROUPBY_SPILL_FILES_ATTRIB, DEFAULT_GROUPBY_SPILL_FILES);
+
+        final int maxSizeNum = (int)(maxCacheSizeConf / estValueSize);
+        final int minSizeNum = (SPGBY_CACHE_MIN_SIZE / estValueSize);
+
+        // use upper and lower bounds for the cache size
+        final int maxCacheSize = Math.max(minSizeNum, Math.min(maxSizeNum, estSizeNum));
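+        // Illustrative example: with estValueSize = 100 bytes, a 1MB maxCacheSizeConf and an
+        // estimated 5000 distinct keys: maxSizeNum = 10485, minSizeNum = 40, so
+        // maxCacheSize = max(40, min(10485, 5000)) = 5000 elements.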
+        final int estSize = GroupedAggregateRegionObserver.sizeOfUnorderedGroupByMap(maxCacheSize, estValueSize);
+        try {
+            this.chunk = tenantCache.getMemoryManager().allocate(estSize);
+        } catch (InsufficientMemoryException ime) {
+            logger.error("Requested Map size exceeds memory limit, please decrease max size via config paramter: "
+                    + GROUPBY_MAX_CACHE_SIZE_ATTRIB);
+            throw ime;
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Instantiating LRU groupby cache of element size: " + maxCacheSize);
+        }
+
+        // LRU cache implemented as LinkedHashMap with access order
+        cache = new LinkedHashMap<ImmutableBytesWritable, Aggregator[]>(maxCacheSize, 0.75f, true) {
+            boolean spill = false;
+            int cacheSize = maxCacheSize;
+
+            @Override
+            protected boolean removeEldestEntry(Map.Entry<ImmutableBytesWritable, Aggregator[]> eldest) {
+                if (!spill && size() > cacheSize) { // increase allocation
+                    cacheSize *= 1.5f;
+                    int estSize = GroupedAggregateRegionObserver.sizeOfUnorderedGroupByMap(cacheSize, estValueSize);
+                    try {
+                        chunk.resize(estSize);
+                    } catch (InsufficientMemoryException im) {
+                        // Cannot extend Map anymore, start spilling
+                        spill = true;
+                    }
+                }
+
+                if (spill) {
+                    try {
+                        if (spillManager == null) {
+                            // Lazy instantiation of spillable data
+                            // structures
+                            //
+                            // Only create spill data structs if LRU
+                            // cache is too small
+                            spillManager = new SpillManager(numSpillFilesConf, aggregators, env.getConfiguration(),
+                                    new QueryCache());
+                        }
+                        spillManager.spill(eldest.getKey(), eldest.getValue());
+                        // keep track of elements in cache
+                        curNumCacheElements--;
+                    } catch (IOException ioe) {
+                        // Ensure that we always close and delete the temp files
+                        try {
+                            throw new RuntimeException(ioe);
+                        } finally {
+                            Closeables.closeQuietly(SpillableGroupByCache.this);
+                        }
+                    }
+                    return true;
+                }
+
+                return false;
+            }
+        };
+    }
+
+    /**
+     * Size function returns the estimated LRU cache size in bytes
+     */
+    @Override
+    public int size() {
+        return curNumCacheElements * aggregators.getEstimatedByteSize();
+    }
+
+    /**
+     * Extract an element from the cache. If the element is not present in the in-memory cache or in the spill files,
+     * the cache implements an implicit put() of a new key/value tuple and loads it into the cache.
+     */
+    @Override
+    public Aggregator[] cache(ImmutableBytesWritable cacheKey) {
+        ImmutableBytesPtr key = new ImmutableBytesPtr(cacheKey);
+        Aggregator[] rowAggregators = cache.get(key);
+        if (rowAggregators == null) {
+            // If Aggregators not found for this distinct
+            // value, clone our original one (we need one
+            // per distinct value)
+            if (spillManager != null) {
+                // Spill manager present, check if key has been
+                // spilled before
+                try {
+                    rowAggregators = spillManager.loadEntry(key);
+                } catch (IOException ioe) {
+                    // Ensure that we always close and delete the temp files
+                    try {
+                        throw new RuntimeException(ioe);
+                    } finally {
+                        Closeables.closeQuietly(SpillableGroupByCache.this);
+                    }
+                }
+            }
+            if (rowAggregators == null) {
+                // No, key never spilled before, create a new tuple
+                rowAggregators = aggregators.newAggregators(env.getConfiguration());
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Adding new aggregate bucket for row key "
+                            + Bytes.toStringBinary(key.get(), key.getOffset(), key.getLength()));
+                }
+            }
+            cache.put(key, rowAggregators);
+            // keep track of elements in cache
+            curNumCacheElements++;
+        }
+        return rowAggregators;
+    }
+
+    /**
+     * Iterates over the cache and the spilled data structures by returning CacheEntries. CacheEntries are either
+     * extracted from the LRU cache or from the spillable data structures. The key/value tuples are returned in
+     * non-deterministic order.
+     */
+    private final class EntryIterator implements Iterator<Map.Entry<ImmutableBytesWritable, Aggregator[]>> {
+        final Iterator<Map.Entry<ImmutableBytesWritable, Aggregator[]>> cacheIter;
+        final Iterator<byte[]> spilledCacheIter;
+
+        private EntryIterator() {
+            cacheIter = cache.entrySet().iterator();
+            if (spillManager != null) {
+                spilledCacheIter = spillManager.newDataIterator();
+            } else {
+                spilledCacheIter = null;
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return cacheIter.hasNext();
+        }
+
+        @Override
+        public Map.Entry<ImmutableBytesWritable, Aggregator[]> next() {
+            if (spilledCacheIter != null && spilledCacheIter.hasNext()) {
+                try {
+                    byte[] value = spilledCacheIter.next();
+                    // Deserialize into a CacheEntry
+                    Map.Entry<ImmutableBytesWritable, Aggregator[]> spilledEntry = spillManager.toCacheEntry(value);
+
+                    boolean notFound = false;
+                    // check against map and return only if not present
+                    while (cache.containsKey(spilledEntry.getKey())) {
+                        // LRU Cache entries always take precedence,
+                        // since they are more up to date
+                        if (spilledCacheIter.hasNext()) {
+                            value = spilledCacheIter.next();
+                            spilledEntry = spillManager.toCacheEntry(value);
+                        } else {
+                            notFound = true;
+                            break;
+                        }
+                    }
+                    if (!notFound) {
+                        // Return a spilled entry, this only happens if the
+                        // entry was not
+                        // found in the LRU cache
+                        return spilledEntry;
+                    }
+                } catch (IOException ioe) {
+                    // TODO rework error handling
+                    throw new RuntimeException(ioe);
+                }
+            }
+            // Spilled elements exhausted
+            // Finally return all elements from LRU cache
+            Map.Entry<ImmutableBytesWritable, Aggregator[]> entry = cacheIter.next();
+            return new CacheEntry<ImmutableBytesWritable>(entry.getKey(), entry.getValue());
+        }
+
+        /**
+         * Removal is not supported.
+         */
+        @Override
+        public void remove() {
+            throw new IllegalAccessError("Remove is not supported for this type of iterator");
+        }
+    }
+
+    /**
+     * Closes cache and releases spill resources
+     * 
+     * @throws IOException
+     */
+    @Override
+    public void close() throws IOException {
+        // Close spillable resources
+        Closeables.closeQuietly(spillManager);
+        Closeables.closeQuietly(chunk);
+    }
+
+    @Override
+    public RegionScanner getScanner(final RegionScanner s) {
+        final Iterator<Entry<ImmutableBytesWritable, Aggregator[]>> cacheIter = new EntryIterator();
+
+        // scanner using the spillable implementation
+        return new BaseRegionScanner() {
+            @Override
+            public HRegionInfo getRegionInfo() {
+                return s.getRegionInfo();
+            }
+
+            @Override
+            public void close() throws IOException {
+                try {
+                    s.close();
+                } finally {
+                    // Always close gbCache and swallow possible Exceptions
+                    Closeables.closeQuietly(SpillableGroupByCache.this);
+                }
+            }
+
+            @Override
+            public boolean next(List<KeyValue> results) throws IOException {
+                if (!cacheIter.hasNext()) { return false; }
+                Map.Entry<ImmutableBytesWritable, Aggregator[]> ce = cacheIter.next();
+                ImmutableBytesWritable key = ce.getKey();
+                Aggregator[] aggs = ce.getValue();
+                byte[] value = aggregators.toBytes(aggs);
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Adding new distinct group: "
+                            + Bytes.toStringBinary(key.get(), key.getOffset(), key.getLength()) + " with aggregators "
+                            + aggs.toString() + " value = " + Bytes.toStringBinary(value));
+                }
+                results.add(KeyValueUtil.newKeyValue(key.get(), key.getOffset(), key.getLength(), SINGLE_COLUMN_FAMILY,
+                        SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+                return cacheIter.hasNext();
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/client/ClientKeyValue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/client/ClientKeyValue.java b/phoenix-core/src/main/java/org/apache/phoenix/client/ClientKeyValue.java
new file mode 100644
index 0000000..3288c4d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/client/ClientKeyValue.java
@@ -0,0 +1,503 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.client;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * {@link KeyValue} that should only be used from the client side. Enables clients to be more
+ * flexible with the byte arrays they use when building a {@link KeyValue}, but still wire
+ * compatible.
+ * <p>
+ * All <tt>byte[]</tt> (or {@link ImmutableBytesWritable}) passed into the constructor are only ever
+ * read once - when writing <tt>this</tt> onto the wire. They are never copied into another array or
+ * reused. This has the advantage of being much more efficient than the usual {@link KeyValue}.
+ * <p>
+ * The downside is that we can no longer support some of the usual methods like
+ * {@link #getBuffer()} or {@link #getKey()}, since it is backed by multiple <tt>byte[]</tt> and
+ * <i>should only be used by the client to <b>send</b> information</i>.
+ * <p>
+ * <b>WARNING:</b> should only be used by advanced users who know how to construct their own
+ * KeyValues
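+ * <p>
+ * A minimal usage sketch (illustrative values only):
+ * <pre>
+ * KeyValue kv = new ClientKeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf"),
+ *     Bytes.toBytes("q"), HConstants.LATEST_TIMESTAMP, Type.Put, Bytes.toBytes("v"));
+ * </pre>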
+ */
+public class ClientKeyValue extends KeyValue {
+
+  private static ImmutableBytesWritable NULL = new ImmutableBytesWritable(new byte[0]);
+  private ImmutableBytesWritable row;
+  private ImmutableBytesWritable family;
+  private ImmutableBytesWritable qualifier;
+  private Type type;
+  private long ts;
+  private ImmutableBytesWritable value;
+
+  /**
+   * @param row must not be <tt>null</tt>
+   * @param type must not be <tt>null</tt>
+   */
+  public ClientKeyValue(ImmutableBytesWritable row, ImmutableBytesWritable family,
+      ImmutableBytesWritable qualifier, long ts, Type type, ImmutableBytesWritable value) {
+    this.row = row;
+    this.family = family == null ? NULL : family;
+    this.qualifier = qualifier == null ? NULL : qualifier;
+    this.type = type;
+    this.ts = ts;
+    this.value = value == null ? NULL : value;
+  }
+
+  /**
+   * Convenience constructor that just wraps all the bytes in {@link ImmutableBytesWritable}
+   */
+  public ClientKeyValue(byte[] row, byte[] family, byte[] qualifier, long ts, Type t, byte[] value) {
+    this(wrap(row), wrap(family), wrap(qualifier), ts, t, wrap(value));
+  }
+
+  /**
+   * Convenience constructor that just wraps all the bytes in {@link ImmutableBytesWritable}
+   */
+  public ClientKeyValue(byte[] row, byte[] family, byte[] qualifier, long ts, Type t) {
+    this(wrap(row), wrap(family), wrap(qualifier), ts, t, null);
+  }
+
+  private static ImmutableBytesWritable wrap(byte[] b) {
+    return b == null ? NULL : new ImmutableBytesWritable(b);
+  }
+
+  @Override
+  public KeyValue clone() {
+    return new ClientKeyValue(copy(row), copy(family), copy(qualifier), ts, type, copy(value));
+  }
+
+  private ImmutableBytesWritable copy(ImmutableBytesWritable bytes) {
+    return new ImmutableBytesWritable(bytes.copyBytes());
+  }
+
+  private static byte[] copyIfNecessary(ImmutableBytesWritable bytes) {
+    byte[] b = bytes.get();
+    if (bytes.getLength() == b.length && bytes.getOffset() == 0) {
+      return b;
+    }
+    return Arrays.copyOfRange(b, bytes.getOffset(), bytes.getOffset() + bytes.getLength());
+  }
+
+  @Override
+  public KeyValue shallowCopy() {
+    return new ClientKeyValue(row, family, qualifier, ts, type, value);
+  }
+
+  @Override
+  public int getValueOffset() {
+    return value.getOffset();
+  }
+
+  @Override
+  public int getValueLength() {
+    return value.getLength();
+  }
+
+  @Override
+  public int getRowOffset() {
+    return row.getOffset();
+  }
+
+  @Override
+  public short getRowLength() {
+    return (short) row.getLength();
+  }
+
+  @Override
+  public int getFamilyOffset() {
+    return family.getOffset();
+  }
+
+  @Override
+  public byte getFamilyLength() {
+    return (byte) family.getLength();
+  }
+
+  @Override
+  public byte getFamilyLength(int foffset) {
+    return this.getFamilyLength();
+  }
+
+  @Override
+  public int getQualifierOffset() {
+    return qualifier.getOffset();
+  }
+
+  @Override
+  public int getQualifierLength() {
+    return qualifier.getLength();
+  }
+
+  @Override
+  public int getQualifierLength(int rlength, int flength) {
+    return this.getQualifierLength();
+  }
+
+  @Override
+  public int getTotalColumnLength(int rlength, int foffset) {
+    return this.getFamilyLength() + this.getQualifierLength();
+  }
+
+  @Override
+  public int getTotalColumnLength() {
+    return qualifier.getLength() + family.getLength();
+  }
+
+  @Override
+  public byte[] getValue() {
+    return copyIfNecessary(value);
+  }
+
+  @Override
+  public byte[] getRow() {
+    return copyIfNecessary(row);
+  }
+
+  @Override
+  public long getTimestamp() {
+    return ts;
+  }
+
+  @Override
+  public byte[] getFamily() {
+    return copyIfNecessary(family);
+  }
+
+  @Override
+  public byte[] getQualifier() {
+    return copyIfNecessary(qualifier);
+  }
+
+  @Override
+  public byte getType() {
+    return this.type.getCode();
+  }
+
+  @Override
+  public boolean matchingFamily(byte[] family) {
+    if (family == null) {
+      if (this.family.getLength() == 0) {
+        return true;
+      }
+      return false;
+    }
+    return matchingFamily(family, 0, family.length);
+  }
+
+  @Override
+  public boolean matchingFamily(byte[] family, int offset, int length) {
+    if (family == null) {
+      if (this.family.getLength() == 0) {
+        return true;
+      }
+      return false;
+    }
+    return matches(family, offset, length, this.family);
+  }
+
+  @Override
+  public boolean matchingFamily(KeyValue other) {
+    if(other == null) {
+      return false;
+    }
+    if(other instanceof ClientKeyValue) {
+      ClientKeyValue kv = (ClientKeyValue)other;
+      return this.family.compareTo(kv.family) == 0;
+    }
+    return matchingFamily(other.getBuffer(), other.getFamilyOffset(), other.getFamilyLength());
+  }
+
+  private boolean matches(byte[] b, int offset, int length, ImmutableBytesWritable bytes) {
+    return Bytes.equals(b, offset, length, bytes.get(), bytes.getOffset(), bytes.getLength());
+  }
+
+  @Override
+  public boolean matchingQualifier(byte[] qualifier) {
+    if (qualifier == null) {
+      if (this.qualifier.getLength() == 0) {
+        return true;
+      }
+      return false;
+    }
+    return matchingQualifier(qualifier, 0, qualifier.length);
+  }
+
+  @Override
+  public boolean matchingQualifier(byte[] qualifier, int offset, int length) {
+    if (qualifier == null) {
+      if (this.qualifier.getLength() == 0) {
+        return true;
+      }
+      return false;
+    }
+    return matches(qualifier, offset, length, this.qualifier);
+  }
+
+  @Override
+  public boolean matchingQualifier(KeyValue other) {
+    if (other == null) {
+      return false;
+    }
+    if (other instanceof ClientKeyValue) {
+      ClientKeyValue kv = (ClientKeyValue) other;
+      return this.qualifier.compareTo(kv.qualifier) == 0;
+    }
+    return matchingQualifier(other.getBuffer(), other.getQualifierOffset(),
+      other.getQualifierLength());
+  }
+
+  @Override
+  public boolean matchingRow(byte[] row){
+    if (row == null) {
+      return false;
+    }
+    return matches(row, 0, row.length, this.row);
+  }
+
+  @Override
+  public boolean matchingRow(byte[] row, int offset, int length) {
+    if (row == null) {
+      return false;
+    }
+    return matches(row, offset, length, this.row);
+  }
+
+  @Override
+  public boolean matchingRow(KeyValue other) {
+    return matchingRow(other.getBuffer(), other.getRowOffset(), other.getRowLength());
+  }
+
+  @Override
+  public boolean matchingColumnNoDelimiter(byte[] column) {
+    // match both the family and qualifier
+    if (matchingFamily(column, 0, this.family.getLength())) {
+      return matchingQualifier(column, family.getLength(), column.length - family.getLength());
+    }
+    return false;
+  }
+
+  @Override
+  public boolean matchingColumn(byte[] family, byte[] qualifier) {
+    return this.matchingFamily(family) && matchingQualifier(qualifier);
+  }
+
+  @Override
+  public boolean nonNullRowAndColumn() {
+    return (this.row != null && row.getLength() > 0) && !isEmptyColumn();
+  }
+
+  @Override
+  public boolean isEmptyColumn() {
+    return this.qualifier != null && this.qualifier.getLength() > 0;
+  }
+
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // we need to simulate the keyvalue writing, but actually step through each buffer.
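+    // Resulting layout: [total length : int][key length : int][value length : int]
+    // [row length : short][row bytes][family length : byte][family bytes][qualifier bytes]
+    // [timestamp : long][type : byte][value bytes]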
+    //start with keylength
+    long longkeylength = KeyValue.KEY_INFRASTRUCTURE_SIZE + row.getLength() + family.getLength()
+        + qualifier.getLength();
+    if (longkeylength > Integer.MAX_VALUE) {
+      throw new IllegalArgumentException("keylength " + longkeylength + " > " + Integer.MAX_VALUE);
+    }
+    // need to figure out the max length before we start
+    int length = this.getLength();
+    out.writeInt(length);
+
+    // write the actual data
+    int keylength = (int) longkeylength;
+    out.writeInt(keylength);
+    int vlength = value == null ? 0 : value.getLength();
+    out.writeInt(vlength);
+    out.writeShort((short) (row.getLength() & 0x0000ffff));
+    out.write(this.row.get(), this.row.getOffset(), this.row.getLength());
+    out.writeByte((byte) (family.getLength() & 0x0000ff));
+    if (family.getLength() != 0) {
+      out.write(this.family.get(), this.family.getOffset(), this.family.getLength());
+    }
+    if (qualifier != NULL) {
+      out.write(this.qualifier.get(), this.qualifier.getOffset(), this.qualifier.getLength());
+    }
+    out.writeLong(ts);
+    out.writeByte(this.type.getCode());
+    if (this.value != NULL) {
+      out.write(this.value.get(), this.value.getOffset(), this.value.getLength());
+    }
+  }
+
+  @Override
+  public int getLength() {
+    return KEYVALUE_INFRASTRUCTURE_SIZE + KeyValue.ROW_LENGTH_SIZE + row.getLength()
+        + KeyValue.FAMILY_LENGTH_SIZE + family.getLength() + qualifier.getLength()
+        + KeyValue.TIMESTAMP_SIZE + KeyValue.TYPE_SIZE + value.getLength();
+  }
+
+  @Override
+  public String toString() {
+    return keyToString() + "/vlen=" + getValueLength() + "/ts=" + getMemstoreTS();
+  }
+
+  private String keyToString() {
+    String row = Bytes.toStringBinary(this.row.get(), this.row.getOffset(), this.row.getLength());
+    String family = this.family.getLength() == 0 ? "" : Bytes.toStringBinary(this.family.get(),
+      this.family.getOffset(), this.family.getLength());
+    String qualifier = this.qualifier.getLength() == 0 ? "" : Bytes.toStringBinary(
+      this.qualifier.get(), this.qualifier.getOffset(), this.qualifier.getLength());
+    String timestampStr = Long.toString(ts);
+    byte type = this.type.getCode();
+    return row + "/" + family + (family != null && family.length() > 0 ? ":" : "") + qualifier
+        + "/" + timestampStr + "/" + Type.codeToType(type);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) return true;
+    if (!super.equals(obj)) return false;
+    if (getClass() != obj.getClass()) return false;
+    ClientKeyValue other = (ClientKeyValue) obj;
+    if (family == null) {
+      if (other.family != null) return false;
+    } else if (!family.equals(other.family)) return false;
+    if (qualifier == null) {
+      if (other.qualifier != null) return false;
+    } else if (!qualifier.equals(other.qualifier)) return false;
+    if (row == null) {
+      if (other.row != null) return false;
+    } else if (!row.equals(other.row)) return false;
+    if (ts != other.ts) return false;
+    if (type != other.type) return false;
+    if (value == null) {
+      if (other.value != null) return false;
+    } else if (!value.equals(other.value)) return false;
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    // TODO do we need to keep the same hashcode logic as KeyValue? Everywhere else we don't keep
+    // them by reference, but presumably clients might hash them.
+    final int prime = 31;
+    int result = super.hashCode();
+    result = prime * result + family.hashCode();
+    result = prime * result + qualifier.hashCode();
+    result = prime * result + row.hashCode();
+    result = prime * result + (int) (ts ^ (ts >>> 32));
+    result = prime * result + type.hashCode();
+    result = prime * result + value.hashCode();
+    return result;
+  }
+
+  @Override
+  public void readFields(int length, DataInput in) throws IOException {
+    throw new UnsupportedOperationException(ClientKeyValue.class.getSimpleName()
+        + " should not be used for server-side operations");
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    throw new UnsupportedOperationException(ClientKeyValue.class.getSimpleName()
+        + " should not be used for server-side operations");
+  }
+
+  @Override
+  public int getKeyOffset() {
+    return 0;
+  }
+
+
+  @Override
+  public int getFamilyOffset(int rlength) {
+    return 0;
+  }
+
+  @Override
+  public int getQualifierOffset(int foffset) {
+    return 0;
+  }
+
+  @Override
+  public int getTimestampOffset() {
+    return 0;
+  }
+
+  @Override
+  public int getTimestampOffset(int keylength) {
+    return 0;
+  }
+
+  @Override
+  public int getOffset() {
+    return 0;
+  }
+
+  @Override
+  public boolean updateLatestStamp(byte[] now) {
+    if (this.isLatestTimestamp()) {
+      // unfortunately, this is a bit slower than the usual kv, but we don't expect this to happen
+      // all that often on the client (unless users are updating the ts this way), as it generally
+      // happens on the server
+      this.ts = Bytes.toLong(now);
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public boolean isLatestTimestamp() {
+    return this.ts == HConstants.LATEST_TIMESTAMP;
+  }
+
+  @Override
+  public int getKeyLength() {
+    return KEY_INFRASTRUCTURE_SIZE + getRowLength() + getFamilyLength() + getQualifierLength();
+  }
+
+  @Override
+  public byte[] getKey() {
+    throw new UnsupportedOperationException(ClientKeyValue.class.getSimpleName()
+        + " does not support a single backing buffer.");
+  }
+
+  @Override
+  public String getKeyString() {
+    throw new UnsupportedOperationException(ClientKeyValue.class.getSimpleName()
+        + " does not support a single backing buffer.");
+  }
+
+  @Override
+  public SplitKeyValue split() {
+    throw new UnsupportedOperationException(ClientKeyValue.class.getSimpleName()
+        + " should not be used for server-side operations");
+  }
+
+  @Override
+  public byte[] getBuffer() {
+    throw new UnsupportedOperationException(ClientKeyValue.class.getSimpleName()
+        + " does not support a single backing buffer.");
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/client/ClientKeyValueBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/client/ClientKeyValueBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/client/ClientKeyValueBuilder.java
new file mode 100644
index 0000000..09e295a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/client/ClientKeyValueBuilder.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.client;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+
+/**
+ * A {@link KeyValueBuilder} that builds {@link ClientKeyValue}, eliminating the extra byte copies
+ * inherent in the standard {@link KeyValue} implementation.
+ * <p>
+ * This {@link KeyValueBuilder} is only supported in HBase 0.94.14+ (
+ * {@link PhoenixDatabaseMetaData#CLIENT_KEY_VALUE_BUILDER_THRESHOLD}), with the addition of
+ * HBASE-9834.
+ */
+public class ClientKeyValueBuilder extends KeyValueBuilder {
+
+  public static final KeyValueBuilder INSTANCE = new ClientKeyValueBuilder();
+
+  private ClientKeyValueBuilder() {
+    // private ctor for singleton
+  }
+
+  @Override
+  public KeyValue buildPut(ImmutableBytesWritable row, ImmutableBytesWritable family,
+      ImmutableBytesWritable qualifier, long ts, ImmutableBytesWritable value) {
+    return new ClientKeyValue(row, family, qualifier, ts, Type.Put, value);
+  }
+
+  @Override
+  public KeyValue buildDeleteFamily(ImmutableBytesWritable row, ImmutableBytesWritable family,
+      ImmutableBytesWritable qualifier, long ts) {
+    return new ClientKeyValue(row, family, qualifier, ts, Type.DeleteFamily, null);
+  }
+
+  @Override
+  public KeyValue buildDeleteColumns(ImmutableBytesWritable row, ImmutableBytesWritable family,
+      ImmutableBytesWritable qualifier, long ts) {
+    return new ClientKeyValue(row, family, qualifier, ts, Type.DeleteColumn, null);
+  }
+
+  @Override
+  public KeyValue buildDeleteColumn(ImmutableBytesWritable row, ImmutableBytesWritable family,
+      ImmutableBytesWritable qualifier, long ts) {
+    return new ClientKeyValue(row, family, qualifier, ts, Type.Delete, null);
+  }
+}
\ No newline at end of file
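
For orientation, here is a minimal, hypothetical sketch of how this builder could be
exercised on its own (the row/family/qualifier/value literals are made up; in normal
Phoenix operation the builder is obtained via KeyValueBuilder.get(...), shown further
below, rather than referenced directly):

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.client.ClientKeyValueBuilder;
    import org.apache.phoenix.client.KeyValueBuilder;

    public class ClientKeyValueBuilderSketch {
        public static void main(String[] args) {
            KeyValueBuilder builder = ClientKeyValueBuilder.INSTANCE;
            ImmutableBytesWritable row = new ImmutableBytesWritable(Bytes.toBytes("row1"));
            ImmutableBytesWritable family = new ImmutableBytesWritable(Bytes.toBytes("cf"));
            ImmutableBytesWritable qualifier = new ImmutableBytesWritable(Bytes.toBytes("q"));
            ImmutableBytesWritable value = new ImmutableBytesWritable(Bytes.toBytes("v"));

            // The no-timestamp overload defaults to HConstants.LATEST_TIMESTAMP; the returned
            // KeyValue is a ClientKeyValue that wraps the writables directly, so no bytes are
            // copied on the client.
            KeyValue kv = builder.buildPut(row, family, qualifier, value);
            System.out.println(kv.isLatestTimestamp()); // true
        }
    }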

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/client/GenericKeyValueBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/client/GenericKeyValueBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/client/GenericKeyValueBuilder.java
new file mode 100644
index 0000000..b3771ca
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/client/GenericKeyValueBuilder.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.client;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import static org.apache.hadoop.hbase.index.util.ImmutableBytesPtr.copyBytesIfNecessary;
+
+/**
+ * {@link KeyValueBuilder} that does simple byte[] copies to build the underlying key-value. This is
+ * exactly the same behavior as currently used in {@link Delete} and {@link Put}.
+ */
+public class GenericKeyValueBuilder extends KeyValueBuilder {
+
+  public static final KeyValueBuilder INSTANCE = new GenericKeyValueBuilder();
+
+  private GenericKeyValueBuilder() {
+    // private ctor for singleton
+  }
+
+  @Override
+  public KeyValue buildPut(ImmutableBytesWritable row, ImmutableBytesWritable family,
+      ImmutableBytesWritable qualifier, long ts, ImmutableBytesWritable value) {
+    return build(row, family, qualifier, ts, Type.Put, value);
+  }
+
+  @Override
+  public KeyValue buildDeleteFamily(ImmutableBytesWritable row, ImmutableBytesWritable family,
+      ImmutableBytesWritable qualifier, long ts) {
+    return build(row, family, qualifier, ts, Type.DeleteFamily, null);
+  }
+
+  @Override
+  public KeyValue buildDeleteColumns(ImmutableBytesWritable row, ImmutableBytesWritable family,
+      ImmutableBytesWritable qualifier, long ts) {
+    return build(row, family, qualifier, ts, Type.DeleteColumn, null);
+  }
+
+  @Override
+  public KeyValue buildDeleteColumn(ImmutableBytesWritable row, ImmutableBytesWritable family,
+      ImmutableBytesWritable qualifier, long ts) {
+    return build(row, family, qualifier, ts, Type.Delete, null);
+  }
+
+  private KeyValue build(ImmutableBytesWritable row, ImmutableBytesWritable family,
+      ImmutableBytesWritable qualifier, long ts, KeyValue.Type type, ImmutableBytesWritable value) {
+    return new KeyValue(copyBytesIfNecessary(row), copyBytesIfNecessary(family),
+        copyBytesIfNecessary(qualifier), ts, type, value == null ? null : copyBytesIfNecessary(value));
+  }
+}
\ No newline at end of file
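
A comparable, purely illustrative sketch for the generic builder; the point of interest,
per the class comment above, is that the bytes referenced by the writables are copied
into a regular KeyValue (the buffer contents and offsets below are made up):

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.client.GenericKeyValueBuilder;
    import org.apache.phoenix.client.KeyValueBuilder;

    public class GenericKeyValueBuilderSketch {
        public static void main(String[] args) {
            KeyValueBuilder builder = GenericKeyValueBuilder.INSTANCE;

            // The writables may point into a larger shared buffer ...
            byte[] buffer = Bytes.toBytes("row1cfq");
            ImmutableBytesWritable row = new ImmutableBytesWritable(buffer, 0, 4);
            ImmutableBytesWritable family = new ImmutableBytesWritable(buffer, 4, 2);
            ImmutableBytesWritable qualifier = new ImmutableBytesWritable(buffer, 6, 1);

            // ... but build(...) copies the referenced ranges into a standard KeyValue,
            // exactly as Put/Delete would.
            KeyValue kv = builder.buildDeleteColumn(row, family, qualifier);
            System.out.println(kv);
        }
    }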

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/client/KeyValueBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/client/KeyValueBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/client/KeyValueBuilder.java
new file mode 100644
index 0000000..48608ff
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/client/KeyValueBuilder.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.util.MetaDataUtil;
+
+/**
+ * Build {@link KeyValue} in an efficient way
+ */
+public abstract class KeyValueBuilder {
+
+    /**
+     * Helper method for a {@link KeyValueBuilder} that catches an IOException from a {@link Put}
+     * when adding a {@link KeyValue} generated by the KeyValueBuilder.
+     * @throws RuntimeException if there is an IOException thrown from the underlying {@link Put}
+     */
+    @SuppressWarnings("javadoc")
+    public static void addQuietly(Put put, KeyValueBuilder builder, KeyValue kv) {
+        try {
+            put.add(kv);
+        } catch (IOException e) {
+            throw new RuntimeException("KeyValue Builder " + builder + " created an invalid kv: "
+                    + kv + "!", e);
+        }
+    }
+
+    /**
+     * Helper method for a {@link KeyValueBuilder} that catches an IOException from a {@link Delete}
+     * when adding a delete marker {@link KeyValue} generated by the KeyValueBuilder.
+     * @throws RuntimeException if there is an IOException thrown from the underlying {@link Delete}
+     */
+    @SuppressWarnings("javadoc")
+    public static void deleteQuietly(Delete delete, KeyValueBuilder builder, KeyValue kv) {
+        try {
+            delete.addDeleteMarker(kv);
+        } catch (IOException e) {
+            throw new RuntimeException("KeyValue Builder " + builder + " created an invalid kv: "
+                    + kv + "!", e);
+        }
+    }
+
+    private static final int CUSTOM_KEY_VALUE_MIN_VERSION = MetaDataUtil.encodeVersion("0.94.14");
+
+    public static KeyValueBuilder get(String hbaseVersion) {
+        int version = MetaDataUtil.encodeVersion(hbaseVersion);
+        if (version >= CUSTOM_KEY_VALUE_MIN_VERSION) {
+            return ClientKeyValueBuilder.INSTANCE;
+        }
+        return GenericKeyValueBuilder.INSTANCE;
+    }
+
+  public KeyValue buildPut(ImmutableBytesWritable row, ImmutableBytesWritable family,
+      ImmutableBytesWritable qualifier, ImmutableBytesWritable value) {
+    return buildPut(row, family, qualifier, HConstants.LATEST_TIMESTAMP, value);
+  }
+
+  public abstract KeyValue buildPut(ImmutableBytesWritable row, ImmutableBytesWritable family,
+      ImmutableBytesWritable qualifier, long ts, ImmutableBytesWritable value);
+
+  public KeyValue buildDeleteFamily(ImmutableBytesWritable row, ImmutableBytesWritable family,
+      ImmutableBytesWritable qualifier) {
+    return buildDeleteFamily(row, family, qualifier, HConstants.LATEST_TIMESTAMP);
+  }
+
+  public abstract KeyValue buildDeleteFamily(ImmutableBytesWritable row,
+      ImmutableBytesWritable family, ImmutableBytesWritable qualifier, long ts);
+
+  public KeyValue buildDeleteColumns(ImmutableBytesWritable row, ImmutableBytesWritable family,
+      ImmutableBytesWritable qualifier) {
+    return buildDeleteColumns(row, family, qualifier, HConstants.LATEST_TIMESTAMP);
+  }
+
+  public abstract KeyValue buildDeleteColumns(ImmutableBytesWritable row,
+      ImmutableBytesWritable family, ImmutableBytesWritable qualifier, long ts);
+
+  public KeyValue buildDeleteColumn(ImmutableBytesWritable row, ImmutableBytesWritable family,
+      ImmutableBytesWritable qualifier) {
+    return buildDeleteColumn(row, family, qualifier, HConstants.LATEST_TIMESTAMP);
+  }
+
+  public abstract KeyValue buildDeleteColumn(ImmutableBytesWritable row,
+      ImmutableBytesWritable family, ImmutableBytesWritable qualifier, long ts);
+}
\ No newline at end of file
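
The get(String) factory above is the piece callers see: the HBase version string reported
by the cluster decides which concrete builder is handed out, and addQuietly/deleteQuietly
absorb the checked IOException declared by Put.add and Delete.addDeleteMarker. A small,
hypothetical sketch (the version strings and cell contents are made up; adding a
ClientKeyValue to a Put additionally requires an HBase 0.94.14+ client with HBASE-9834,
per the ClientKeyValueBuilder javadoc):

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.client.KeyValueBuilder;

    public class KeyValueBuilderSketch {
        public static void main(String[] args) {
            // 0.94.14+ gets the copy-free ClientKeyValueBuilder; older versions fall back
            // to GenericKeyValueBuilder.
            KeyValueBuilder newBuilder = KeyValueBuilder.get("0.94.15");
            KeyValueBuilder oldBuilder = KeyValueBuilder.get("0.94.7");
            System.out.println(newBuilder.getClass().getSimpleName()); // ClientKeyValueBuilder
            System.out.println(oldBuilder.getClass().getSimpleName()); // GenericKeyValueBuilder

            ImmutableBytesWritable row = new ImmutableBytesWritable(Bytes.toBytes("row1"));
            ImmutableBytesWritable family = new ImmutableBytesWritable(Bytes.toBytes("cf"));
            ImmutableBytesWritable qualifier = new ImmutableBytesWritable(Bytes.toBytes("q"));
            ImmutableBytesWritable value = new ImmutableBytesWritable(Bytes.toBytes("v"));

            Put put = new Put(Bytes.toBytes("row1"));
            KeyValue kv = oldBuilder.buildPut(row, family, qualifier, value);
            // addQuietly wraps put.add(kv) and rethrows any IOException as a RuntimeException.
            KeyValueBuilder.addQuietly(put, oldBuilder, kv);
        }
    }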

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java
new file mode 100644
index 0000000..70ea3a2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import org.apache.phoenix.expression.aggregator.ClientAggregators;
+
+/**
+ * 
+ * Class that manages aggregations during query compilation
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class AggregationManager {
+    private ClientAggregators aggregators;
+    private int position = 0;
+    
+    public AggregationManager() {
+    }
+
+    public ClientAggregators getAggregators() {
+        return aggregators;
+    }
+    
+    /**
+     * @return the next available zero-based positional index
+     * for the client-side aggregate function.
+     */
+    protected int nextPosition() {
+        return position++;
+    }
+    
+    public void setAggregators(ClientAggregators clientAggregator) {
+        this.aggregators = clientAggregator;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/compile/BindManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/BindManager.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/BindManager.java
new file mode 100644
index 0000000..859d233
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/BindManager.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
+import org.apache.phoenix.parse.BindParseNode;
+import org.apache.phoenix.schema.PDatum;
+
+
+/**
+ * 
+ * Class that manages binding parameters and checking type matching. There are
+ * two main usages:
+ * 
+ * 1) the standard query case where we have the values for the binds.
+ * 2) the retrieve param metadata case where we don't have the bind values.
+ * 
+ * In both cases, during query compilation we figure out what type the bind variable
+ * "should" be, based on how it's used in the query. For example foo < ? would expect
+ * that the bind variable type matches or can be coerced to the type of foo. For (1),
+ * we check that the bind value has the correct type and for (2) we set the param
+ * metadata type.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class BindManager {
+    public static final Object UNBOUND_PARAMETER = new Object();
+
+    private final List<Object> binds;
+    private final PhoenixParameterMetaData bindMetaData;
+
+    public BindManager(List<Object> binds) {
+        this.binds = binds;
+        this.bindMetaData = new PhoenixParameterMetaData(binds.size());
+    }
+
+    public ParameterMetaData getParameterMetaData() {
+        return bindMetaData;
+    }
+    
+    public Object getBindValue(BindParseNode node) throws SQLException {
+        int index = node.getIndex();
+        if (index < 0 || index >= binds.size()) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.PARAM_INDEX_OUT_OF_BOUND)
+                .setMessage("binds size: " + binds.size() + "; index: " + index).build().buildException();
+        }
+        Object value = binds.get(index);
+        if (value == UNBOUND_PARAMETER) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.PARAM_VALUE_UNBOUND)
+            .setMessage(node.toString()).build().buildException();
+        }
+        return value;
+    }
+
+    public void addParamMetaData(BindParseNode bind, PDatum column) throws SQLException {
+        bindMetaData.addParam(bind,column);
+    }
+}
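
At the JDBC level the two usages described above look roughly like the following; this
is an illustrative sketch only (the connection URL, table, and column are assumptions,
with FOO assumed to be a numeric column):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ParameterMetaData;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    public class BindManagerSketch {
        public static void main(String[] args) throws Exception {
            // Explicit driver registration; JDBC 4 service loading usually makes this optional.
            Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
            Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
            PreparedStatement stmt = conn.prepareStatement("SELECT * FROM MY_TABLE WHERE FOO < ?");

            // Usage (2): no bind values yet; compilation still infers what type the bind
            // variable "should" be from how it is used against FOO.
            ParameterMetaData md = stmt.getParameterMetaData();
            System.out.println("parameter count: " + md.getParameterCount());

            // Usage (1): supply a bind value; BindManager checks it against the inferred type.
            stmt.setInt(1, 10);
            ResultSet rs = stmt.executeQuery();
            while (rs.next()) {
                // consume rows
            }
            rs.close();
            stmt.close();
            conn.close();
        }
    }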

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnProjector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnProjector.java
new file mode 100644
index 0000000..8c1697b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnProjector.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+
+/**
+ * 
+ * Interface used to access the value of a projected column.
+ * 
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface ColumnProjector {
+    /**
+     * Get the column name as it was referenced in the query
+     * @return the database column name
+     */
+    String getName();
+    
+    /**
+     * Get the expression
+     * @return the expression for the column projector
+     */
+    public Expression getExpression();
+    
+    // TODO: An expression may contain references to multiple tables.
+    /**
+     * Get the name of the hbase table containing the column
+     * @return the hbase table name
+     */
+    String getTableName();
+    
+    /**
+     * Get the value of the column, coercing it if necessary to the specified type
+     * @param tuple the row containing the column
+     * @param type the type to which to coerce the binary value
+     * @param ptr used to retrieve the value
+     * @return the object representation of the column value.
+     * @throws SQLException
+     */
+    Object getValue(Tuple tuple, PDataType type, ImmutableBytesWritable ptr) throws SQLException;
+    
+    boolean isCaseSensitive();
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnResolver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnResolver.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnResolver.java
new file mode 100644
index 0000000..49a1947
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnResolver.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.schema.ColumnRef;
+import org.apache.phoenix.schema.TableRef;
+
+
+
+/**
+ * 
+ * Interface used to resolve column references occurring
+ * in the select statement.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface ColumnResolver {
+    
+    /**
+     * Returns the collection of resolved tables in the FROM clause.
+     */
+    public List<TableRef> getTables();
+    
+    /**
+     * Resolves column using name and alias.
+     * @param schemaName TODO
+     * @param tableName TODO
+     * @param colName TODO
+     * @return the resolved ColumnRef
+     * @throws ColumnNotFoundException if the column could not be resolved
+     * @throws AmbiguousColumnException if the column name is ambiguous
+     */
+    public ColumnRef resolveColumn(String schemaName, String tableName, String colName) throws SQLException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
new file mode 100644
index 0000000..718d09a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.CreateIndexStatement;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.schema.MetaDataClient;
+
+public class CreateIndexCompiler {
+    private final PhoenixStatement statement;
+
+    public CreateIndexCompiler(PhoenixStatement statement) {
+        this.statement = statement;
+    }
+
+    public MutationPlan compile(final CreateIndexStatement create) throws SQLException {
+        final PhoenixConnection connection = statement.getConnection();
+        final ColumnResolver resolver = FromCompiler.getResolver(create, connection);
+        Scan scan = new Scan();
+        final StatementContext context = new StatementContext(statement, resolver, statement.getParameters(), scan);
+        ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);
+        List<ParseNode> splitNodes = create.getSplitNodes();
+        final byte[][] splits = new byte[splitNodes.size()][];
+        for (int i = 0; i < splits.length; i++) {
+            ParseNode node = splitNodes.get(i);
+            if (!node.isStateless()) {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.SPLIT_POINT_NOT_CONSTANT)
+                    .setMessage("Node: " + node).build().buildException();
+            }
+            LiteralExpression expression = (LiteralExpression)node.accept(expressionCompiler);
+            splits[i] = expression.getBytes();
+        }
+        final MetaDataClient client = new MetaDataClient(connection);
+        
+        return new MutationPlan() {
+
+            @Override
+            public ParameterMetaData getParameterMetaData() {
+                return context.getBindManager().getParameterMetaData();
+            }
+
+            @Override
+            public PhoenixConnection getConnection() {
+                return connection;
+            }
+
+            @Override
+            public MutationState execute() throws SQLException {
+                return client.createIndex(create, splits);
+            }
+
+            @Override
+            public ExplainPlan getExplainPlan() throws SQLException {
+                return new ExplainPlan(Collections.singletonList("CREATE INDEX"));
+            }
+        };
+    }
+}
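
A hypothetical sketch of how a caller would drive the resulting MutationPlan; the
PhoenixStatement and CreateIndexStatement are assumed to come from the Phoenix
JDBC/parser layer, so only the compile-then-execute flow is shown:

    import java.sql.SQLException;

    import org.apache.phoenix.compile.CreateIndexCompiler;
    import org.apache.phoenix.compile.MutationPlan;
    import org.apache.phoenix.execute.MutationState;
    import org.apache.phoenix.jdbc.PhoenixStatement;
    import org.apache.phoenix.parse.CreateIndexStatement;

    public class CreateIndexSketch {
        static MutationState createIndex(PhoenixStatement stmt, CreateIndexStatement create)
                throws SQLException {
            CreateIndexCompiler compiler = new CreateIndexCompiler(stmt);
            // compile() resolves the table, evaluates the split points, and fails fast if a
            // split point expression is not a constant.
            MutationPlan plan = compiler.compile(create);
            // execute() delegates to MetaDataClient.createIndex(create, splits).
            return plan.execute();
        }
    }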