You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 23:16:00 UTC
[44/51] [partial] Initial commit of master branch from github
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillMap.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillMap.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillMap.java
new file mode 100644
index 0000000..d93d08c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillMap.java
@@ -0,0 +1,494 @@
+package org.apache.phoenix.cache.aggcache;
+
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.MappedByteBuffer;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnels;
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * Class implements an active spilled partition serialized tuples are first written into an in-memory data structure
+ * that represents a single page. As the page fills up, it is written to the current spillFile or spill partition For
+ * fast tuple discovery, the class maintains a per page bloom-filter and never de-serializes elements. The element
+ * spilling employs an extentible hashing technique.
+ */
+public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements Iterable<byte[]> {
+
+ // Threshold is typically the page size
+ private final int thresholdBytes;
+ private final int pageInserts;
+ // Global directory depth
+ private int globalDepth;
+ private int curMapBufferIndex;
+ private SpillFile spillFile;
+ // Directory of hash buckets --> extendible hashing implementation
+ private MappedByteBufferMap[] directory;
+ private final SpillableGroupByCache.QueryCache cache;
+
+ public SpillMap(SpillFile file, int thresholdBytes, int estValueSize, SpillableGroupByCache.QueryCache cache)
+ throws IOException {
+ this.thresholdBytes = thresholdBytes - Bytes.SIZEOF_INT;
+ this.pageInserts = thresholdBytes / estValueSize;
+ this.spillFile = file;
+ this.cache = cache;
+
+ // Init the e-hashing directory structure
+ globalDepth = 1;
+ directory = new MappedByteBufferMap[(1 << globalDepth)];
+
+ for (int i = 0; i < directory.length; i++) {
+ // Create an empty bucket list
+ directory[i] = new MappedByteBufferMap(i, this.thresholdBytes, pageInserts, file);
+ directory[i].flushBuffer();
+ }
+ directory[0].pageIn();
+ curMapBufferIndex = 0;
+ }
+
+ // Get the directoy index for a specific key
+ private int getBucketIndex(ImmutableBytesPtr key) {
+ // Get key hash
+ int hashCode = key.hashCode();
+
+ // Mask all but globalDepth low n bits
+ return hashCode & ((1 << globalDepth) - 1);
+ }
+
+ // Function redistributes the elements in the current index
+ // to two new buckets, based on the bit at localDepth + 1 position.
+ // Optionally this function also doubles the directory to allow
+ // for bucket splits
+ private void redistribute(int index, ImmutableBytesPtr keyNew, byte[] valueNew) {
+ // Get the respective bucket
+ MappedByteBufferMap byteMap = directory[index];
+
+ // Get the actual bucket index, that the directory index points to
+ int mappedIdx = byteMap.pageIndex;
+
+ int localDepth = byteMap.localDepth;
+ ArrayList<Integer> buckets = Lists.newArrayList();
+ // Get all directory entries that point to the same bucket.
+ // TODO: can be made faster!
+ for (int i = 0; i < directory.length; i++) {
+ if (directory[i].pageIndex == mappedIdx) {
+ buckets.add(i);
+ }
+ }
+
+ // Assuming no directory doubling for now
+ // compute the two new bucket Ids for splitting
+ // SpillFile adds new files dynamically in case the directory points to pageIDs
+ // that exceed the size limit of a single file.
+
+ // TODO verify if some sort of de-fragmentation might be helpful
+ int tmpIndex = index ^ ((1 << localDepth));
+ int b1Index = Math.min(index, tmpIndex);
+ int b2Index = Math.max(index, tmpIndex);
+
+ // Create two new split buckets
+ MappedByteBufferMap b1 = new MappedByteBufferMap(b1Index, thresholdBytes, pageInserts, spillFile);
+ MappedByteBufferMap b2 = new MappedByteBufferMap(b2Index, thresholdBytes, pageInserts, spillFile);
+
+ // redistribute old elements into b1 and b2
+ for (Entry<ImmutableBytesPtr, byte[]> element : byteMap.pageMap.entrySet()) {
+ ImmutableBytesPtr key = element.getKey();
+ byte[] value = element.getValue();
+ // Only add key during redistribution if its not in the cache
+ // Otherwise this is an good point to reduce the number of spilled elements
+ if (!cache.isKeyContained(key)) {
+ // Re-distribute element onto the new 2 split buckets
+ if ((key.hashCode() & ((1 << localDepth))) != 0) {
+ b2.addElement(null, key, value);
+ } else {
+ b1.addElement(null, key, value);
+ }
+ }
+ }
+
+ // Clear and GC the old now redistributed bucket
+ byteMap.pageMap.clear();
+ byteMap = null;
+
+ // Increase local bucket depths
+ b1.localDepth = localDepth + 1;
+ b2.localDepth = localDepth + 1;
+ boolean doubleDir = false;
+
+ if (globalDepth < (localDepth + 1)) {
+ // Double directory structure and re-adjust pointers
+ doubleDir = true;
+
+ b2Index = doubleDirectory(b2Index, keyNew);
+ }
+
+ if (!doubleDir) {
+ // This is a bit more tricky, we have to cover scenarios where
+ // globalDepth - localDepth > 1
+ // Here even after bucket splitting, multiple directory entries point to
+ // the new buckets
+ for (int i = 0; i < buckets.size(); i++) {
+ if ((buckets.get(i) & (1 << (localDepth))) != 0) {
+ directory[buckets.get(i)] = b2;
+ } else {
+ directory[buckets.get(i)] = b1;
+ }
+ }
+ } else {
+ // Update the directory indexes in case of directory doubling
+ directory[b1Index] = b1;
+ directory[b2Index] = b2;
+ }
+ }
+
+ // Doubles the directory and readjusts pointers.
+ private int doubleDirectory(int b2Index, ImmutableBytesPtr keyNew) {
+ // Double the directory in size, second half points to original first half
+ int newDirSize = 1 << (globalDepth + 1);
+
+ // Ensure that the new directory size does not exceed size limits
+ Preconditions.checkArgument(newDirSize < Integer.MAX_VALUE);
+
+ // Double it!
+ MappedByteBufferMap[] newDirectory = new MappedByteBufferMap[newDirSize];
+ for (int i = 0; i < directory.length; i++) {
+ newDirectory[i] = directory[i];
+ newDirectory[i + directory.length] = directory[i];
+ }
+ directory = newDirectory;
+ newDirectory = null;
+
+ // Adjust the index for new split bucket, according to the directory double
+ b2Index = (keyNew.hashCode() & ((1 << globalDepth) - 1)) | (1 << globalDepth);
+
+ // Increment global depth
+ globalDepth++;
+
+ return b2Index;
+ }
+
+ /**
+ * Get a key from the spillable data structures. page is determined via hash partitioning, and a bloomFilter check
+ * is used to determine if its worth paging in the data.
+ */
+ @Override
+ public byte[] get(Object key) {
+ if (!(key instanceof ImmutableBytesPtr)) {
+ // TODO ... work on type safety
+ }
+ ImmutableBytesPtr ikey = (ImmutableBytesPtr)key;
+ byte[] value = null;
+
+ int bucketIndex = getBucketIndex(ikey);
+ MappedByteBufferMap byteMap = directory[bucketIndex];
+
+ // Decision based on bucket ID, not the directory ID due to the n:1 relationship
+ if (directory[curMapBufferIndex].pageIndex != byteMap.pageIndex) {
+ // map not paged in
+ MappedByteBufferMap curByteMap = directory[curMapBufferIndex];
+
+ // Use bloomFilter to check if key was spilled before
+ if (byteMap.containsKey(ikey.copyBytesIfNecessary())) {
+ // ensure consistency and flush current memory page to disk
+ // fflush current buffer
+ curByteMap.flushBuffer();
+ // page in new buffer
+ byteMap.pageIn();
+ // update index
+ curMapBufferIndex = bucketIndex;
+ }
+ }
+ // get KV from current map
+ value = byteMap.getPagedInElement(ikey);
+ return value;
+ }
+
+ // Similar as get(Object key) function, however
+ // always pages in page a key is spilled to, no bloom filter decision
+ private byte[] getAlways(ImmutableBytesPtr key) {
+ byte[] value = null;
+ int bucketIndex = getBucketIndex(key);
+ MappedByteBufferMap byteMap = directory[bucketIndex];
+
+ if (directory[curMapBufferIndex].pageIndex != byteMap.pageIndex) {
+ MappedByteBufferMap curByteMap = directory[curMapBufferIndex];
+ // ensure consistency and flush current memory page to disk
+ curByteMap.flushBuffer();
+
+ byteMap.pageIn();
+ curMapBufferIndex = bucketIndex;
+ }
+ // get KV from current queue
+ value = byteMap.getPagedInElement(key);
+ return value;
+ }
+
+ /**
+ * Spill a key First we discover if the key has been spilled before and load it into memory: #ref get() if it was
+ * loaded before just replace the old value in the memory page if it was not loaded before try to store it in the
+ * current page alternatively if not enough memory available, request new page.
+ */
+ @Override
+ public byte[] put(ImmutableBytesPtr key, byte[] value) {
+ boolean redistributed = false;
+ // page in element and replace if present
+ byte[] spilledValue = getAlways(key);
+
+ MappedByteBufferMap byteMap = directory[curMapBufferIndex];
+ int index = curMapBufferIndex;
+
+ // TODO: We split buckets until the new element fits onto a
+ // one of the new buckets. Might consider the use of an overflow
+ // bucket, especially in case the directory runs out of page IDs.
+ while (!byteMap.canFit(spilledValue, value)) {
+ // Element does not fit... Split the bucket!
+ redistribute(index, key, value);
+ redistributed = true;
+
+ index = getBucketIndex(key);
+ byteMap = directory[index];
+ }
+ // Ensure that all pages that were paged in during redistribution are flushed back out
+ // to disk to keep memory footprint small.
+ if (redistributed) {
+ for (int i = 0; i < directory.length; i++) {
+ if (directory[i].pageIndex != byteMap.pageIndex) {
+ directory[i].flushBuffer();
+ }
+ }
+ // Ensure the page that receives the new key is in memory
+ spilledValue = getAlways(key);
+ }
+ byteMap.addElement(spilledValue, key, value);
+
+ return value;
+ }
+
+ /**
+ * Function returns the current spill file
+ */
+ public SpillFile getSpillFile() {
+ return spillFile;
+ }
+
+ /**
+ * This inner class represents the currently mapped file region. It uses a Map to represent the current in memory
+ * page for easy get() and update() calls on an individual key The class keeps track of the current size of the in
+ * memory page and handles flushing and paging in respectively
+ */
+ private static class MappedByteBufferMap {
+ private SpillFile spillFile;
+ private int pageIndex;
+ private final int thresholdBytes;
+ private long totalResultSize;
+ private boolean pagedIn;
+ private int localDepth;
+ // dirtyPage flag tracks if a paged in page was modified
+ // if not, no need to flush it back out to disk
+ private boolean dirtyPage;
+ // Use a map for in memory page representation
+ Map<ImmutableBytesPtr, byte[]> pageMap = Maps.newHashMap();
+ // Used to determine is an element was written to this page before or not
+ BloomFilter<byte[]> bFilter;
+
+ public MappedByteBufferMap(int id, int thresholdBytes, int pageInserts, SpillFile spillFile) {
+ this.spillFile = spillFile;
+ // size threshold of a page
+ this.thresholdBytes = thresholdBytes;
+ this.pageIndex = id;
+ pageMap.clear();
+ bFilter = BloomFilter.create(Funnels.byteArrayFunnel(), pageInserts);
+ pagedIn = true;
+ totalResultSize = 0;
+ localDepth = 1;
+ dirtyPage = true;
+ }
+
+ private boolean containsKey(byte[] key) {
+ return bFilter.mightContain(key);
+ }
+
+ private boolean canFit(byte[] curValue, byte[] newValue) {
+ if (thresholdBytes < newValue.length) {
+ // TODO resize page size if single element is too big,
+ // Can this ever happen?
+ throw new RuntimeException("page size too small to store a single KV element");
+ }
+
+ int resultSize = newValue.length + Bytes.SIZEOF_INT;
+ if (curValue != null) {
+ // Key existed before
+ // Ensure to compensate for potential larger byte[] for agg
+ resultSize = Math.max(0, resultSize - (curValue.length + Bytes.SIZEOF_INT));
+ }
+
+ if ((thresholdBytes - totalResultSize) <= (resultSize)) {
+ // KV does not fit
+ return false;
+ }
+ // KV fits
+ return true;
+ }
+
+ // Flush the current page to the memory mapped byte buffer
+ private void flushBuffer() throws BufferOverflowException {
+ if (pagedIn) {
+ MappedByteBuffer buffer;
+ // Only flush if page was changed
+ if (dirtyPage) {
+ Collection<byte[]> values = pageMap.values();
+ buffer = spillFile.getPage(pageIndex);
+ buffer.clear();
+ // number of elements
+ buffer.putInt(values.size());
+ for (byte[] value : values) {
+ // element length
+ buffer.putInt(value.length);
+ // element
+ buffer.put(value, 0, value.length);
+ }
+ }
+ buffer = null;
+ // Reset page stats
+ pageMap.clear();
+ totalResultSize = 0;
+ }
+ pagedIn = false;
+ dirtyPage = false;
+ }
+
+ // load memory mapped region into a map for fast element access
+ private void pageIn() throws IndexOutOfBoundsException {
+ if (!pagedIn) {
+ // Map the memory region
+ MappedByteBuffer buffer = spillFile.getPage(pageIndex);
+ int numElements = buffer.getInt();
+ for (int i = 0; i < numElements; i++) {
+ int kvSize = buffer.getInt();
+ byte[] data = new byte[kvSize];
+ buffer.get(data, 0, kvSize);
+ try {
+ pageMap.put(SpillManager.getKey(data), data);
+ totalResultSize += (data.length + Bytes.SIZEOF_INT);
+ } catch (IOException ioe) {
+ // Error during key access on spilled resource
+ // TODO rework error handling
+ throw new RuntimeException(ioe);
+ }
+ }
+ pagedIn = true;
+ dirtyPage = false;
+ }
+ }
+
+ /**
+ * Return a cache element currently page into memory Direct access via mapped page map
+ *
+ * @param key
+ * @return
+ */
+ public byte[] getPagedInElement(ImmutableBytesPtr key) {
+ return pageMap.get(key);
+ }
+
+ /**
+ * Inserts / Replaces cache element in the currently loaded page. Direct access via mapped page map
+ *
+ * @param key
+ * @param value
+ */
+ public void addElement(byte[] spilledValue, ImmutableBytesPtr key, byte[] value) {
+
+ // put Element into map
+ pageMap.put(key, value);
+ // Update bloom filter
+ bFilter.put(key.copyBytesIfNecessary());
+ // track current Map size to prevent Buffer overflows
+ if (spilledValue != null) {
+ // if previous key was present, just add the size difference
+ totalResultSize += Math.max(0, value.length - (spilledValue.length));
+ } else {
+ // Add new size information
+ totalResultSize += (value.length + Bytes.SIZEOF_INT);
+ }
+
+ dirtyPage = true;
+ }
+
+ /**
+ * Returns a value iterator over the pageMap
+ */
+ public Iterator<byte[]> getPageMapEntries() {
+ pageIn();
+ return pageMap.values().iterator();
+ }
+ }
+
+ /**
+ * Iterate over all spilled elements, including the ones that are currently paged into memory
+ */
+ @Override
+ public Iterator<byte[]> iterator() {
+ directory[curMapBufferIndex].flushBuffer();
+
+ return new Iterator<byte[]>() {
+ int pageIndex = 0;
+ Iterator<byte[]> entriesIter = directory[pageIndex].getPageMapEntries();
+ HashSet<Integer> dups = new HashSet<Integer>();
+
+ @Override
+ public boolean hasNext() {
+ if (!entriesIter.hasNext()) {
+ boolean found = false;
+ // Clear in memory map
+
+ while (!found) {
+ pageIndex++;
+ if (pageIndex >= directory.length) { return false; }
+ directory[pageIndex - 1].pageMap.clear();
+ // get keys from all spilled pages
+ if (!dups.contains(directory[pageIndex].pageIndex)) {
+ dups.add(directory[pageIndex].pageIndex);
+ entriesIter = directory[pageIndex].getPageMapEntries();
+ if (entriesIter.hasNext()) {
+ found = true;
+ }
+ }
+ }
+ }
+ dups.add(directory[pageIndex].pageIndex);
+ return true;
+ }
+
+ @Override
+ public byte[] next() {
+ // get elements from in memory map first
+ return entriesIter.next();
+ }
+
+ @Override
+ public void remove() {
+ throw new IllegalAccessError("Iterator does not support removal operation");
+ }
+ };
+ }
+
+ // TODO implement this method to make the SpillMap a true Map implementation
+ @Override
+ public Set<java.util.Map.Entry<ImmutableBytesPtr, byte[]>> entrySet() {
+ throw new IllegalAccessError("entrySet is not supported for this type of cache");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
new file mode 100644
index 0000000..41a4e65
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
@@ -0,0 +1,361 @@
+package org.apache.phoenix.cache.aggcache;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryServices.GROUPBY_MAX_CACHE_SIZE_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILL_FILES_ATTRIB;
+import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_GROUPBY_MAX_CACHE_MAX;
+import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_GROUPBY_SPILL_FILES;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.Closeables;
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.cache.aggcache.SpillManager.CacheEntry;
+import org.apache.phoenix.coprocessor.BaseRegionScanner;
+import org.apache.phoenix.coprocessor.GroupByCache;
+import org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.memory.InsufficientMemoryException;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.util.KeyValueUtil;
+
+/**
+ * The main entry point is in GroupedAggregateRegionObserver. It instantiates a SpillableGroupByCache and invokes a
+ * get() method on it. There is no: "if key not exists -> put into map" case, since the cache is a Loading cache and
+ * therefore handles the put under the covers. I tried to implement the final cache element accesses (RegionScanner
+ * below) streaming, i.e. there is just an iterator on it and removed the existing result materialization.
+ * SpillableGroupByCache implements a LRU cache using a LinkedHashMap with access order. There is a configurable an
+ * upper and lower size limit in bytes which are used as follows to compute the initial cache size in number of
+ * elements: Max(lowerBoundElements, Min(upperBoundElements, estimatedCacheSize)). Once the number of cached elements
+ * exceeds this number, the cache size is increased by a factor of 1.5. This happens until the additional memory to grow
+ * the cache cannot be requested. At this point the Cache starts spilling elements. As long as no eviction happens no
+ * spillable data structures are allocated, this only happens as soon as the first element is evicted from the cache. We
+ * cannot really make any assumptions on which keys arrive at the map, but assume the LRU would at least cover the cases
+ * where some keys have a slight skew and they should stay memory resident. Once a key gets evicted, the spillManager is
+ * instantiated. It basically takes care of spilling an element to disk and does all the SERDE work. It pre-allocates a
+ * configurable number of SpillFiles (spill partition) which are memory mapped temp files. The SpillManager keeps a list
+ * of these and hash distributes the keys within this list. Once an element gets spilled, it is serialized and will only
+ * get deserialized again, when it is requested from the client, i.e. loaded back into the LRU cache. The SpillManager
+ * holds a single SpillMap object in memory for every spill partition (SpillFile). The SpillMap is an in memory Map
+ * representation of a single page of spilled serialized key/value pairs. To achieve fast key lookup the key is hash
+ * partitioned into random pages of the current spill file. The code implements an extendible hashing approach which
+ * dynamically adjusts the hash function, in order to adapt to growing number of storage pages and avoiding long chains
+ * of overflow buckets. For an excellent discussion of the algorithm please refer to the following online resource:
+ * http://db.inf.uni-tuebingen.de/files/teaching/ws1011/db2/db2-hash-indexes.pdf . For this, each SpillFile keeps a
+ * directory of pointers to Integer.MAX_VALUE 4K pages in memory, which allows each directory to address more pages than
+ * a single memory mapped temp file could theoretically store. In case directory doubling, requests a page index that
+ * exceeds the limits of the initial temp file limits, the implementation dynamically allocates additional temp files to
+ * the SpillFile. The directory starts with a global depth of 1 and therefore a directory size of 2 buckets. Only during
+ * bucket split and directory doubling more than one page is temporarily kept in memory until all elements have been
+ * redistributed. The current implementation conducts bucket splits as long as an element does not fit onto a page. No
+ * overflow chain is created, which might be an alternative. For get requests, each directory entry maintains a
+ * bloomFilter to prevent page-in operations in case an element has never been spilled before. The deserialization is
+ * only triggered when a key a loaded back into the LRU cache. The aggregators are returned from the LRU cache and the
+ * next value is computed. In case the key is not found on any page, the Loader create new aggregators for it.
+ */
+
+public class SpillableGroupByCache implements GroupByCache {
+
+ private static final Logger logger = LoggerFactory.getLogger(SpillableGroupByCache.class);
+
+ // Min size of 1st level main memory cache in bytes --> lower bound
+ private static final int SPGBY_CACHE_MIN_SIZE = 4096; // 4K
+
+ // TODO Generally better to use Collection API with generics instead of
+ // array types
+ private final LinkedHashMap<ImmutableBytesWritable, Aggregator[]> cache;
+ private SpillManager spillManager = null;
+ private int curNumCacheElements;
+ private final ServerAggregators aggregators;
+ private final RegionCoprocessorEnvironment env;
+ private final MemoryChunk chunk;
+
+ /*
+ * inner class that makes cache queryable for other classes that should not get the full instance. Queryable view of
+ * the cache
+ */
+ public class QueryCache {
+ public boolean isKeyContained(ImmutableBytesPtr key) {
+ return cache.containsKey(key);
+ }
+ }
+
+ /**
+ * Instantiates a Loading LRU Cache that stores key / aggregator[] tuples used for group by queries
+ *
+ * @param estSize
+ * @param estValueSize
+ * @param aggs
+ * @param ctxt
+ */
+ public SpillableGroupByCache(final RegionCoprocessorEnvironment env, ImmutableBytesWritable tenantId,
+ ServerAggregators aggs, final int estSizeNum) {
+ curNumCacheElements = 0;
+ this.aggregators = aggs;
+ this.env = env;
+
+ final int estValueSize = aggregators.getEstimatedByteSize();
+ final TenantCache tenantCache = GlobalCache.getTenantCache(env, tenantId);
+
+ // Compute Map initial map
+ final Configuration conf = env.getConfiguration();
+ final long maxCacheSizeConf = conf.getLong(GROUPBY_MAX_CACHE_SIZE_ATTRIB, DEFAULT_GROUPBY_MAX_CACHE_MAX);
+ final int numSpillFilesConf = conf.getInt(GROUPBY_SPILL_FILES_ATTRIB, DEFAULT_GROUPBY_SPILL_FILES);
+
+ final int maxSizeNum = (int)(maxCacheSizeConf / estValueSize);
+ final int minSizeNum = (SPGBY_CACHE_MIN_SIZE / estValueSize);
+
+ // use upper and lower bounds for the cache size
+ final int maxCacheSize = Math.max(minSizeNum, Math.min(maxSizeNum, estSizeNum));
+ final int estSize = GroupedAggregateRegionObserver.sizeOfUnorderedGroupByMap(maxCacheSize, estValueSize);
+ try {
+ this.chunk = tenantCache.getMemoryManager().allocate(estSize);
+ } catch (InsufficientMemoryException ime) {
+ logger.error("Requested Map size exceeds memory limit, please decrease max size via config paramter: "
+ + GROUPBY_MAX_CACHE_SIZE_ATTRIB);
+ throw ime;
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Instantiating LRU groupby cache of element size: " + maxCacheSize);
+ }
+
+ // LRU cache implemented as LinkedHashMap with access order
+ cache = new LinkedHashMap<ImmutableBytesWritable, Aggregator[]>(maxCacheSize, 0.75f, true) {
+ boolean spill = false;
+ int cacheSize = maxCacheSize;
+
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<ImmutableBytesWritable, Aggregator[]> eldest) {
+ if (!spill && size() > cacheSize) { // increase allocation
+ cacheSize *= 1.5f;
+ int estSize = GroupedAggregateRegionObserver.sizeOfUnorderedGroupByMap(cacheSize, estValueSize);
+ try {
+ chunk.resize(estSize);
+ } catch (InsufficientMemoryException im) {
+ // Cannot extend Map anymore, start spilling
+ spill = true;
+ }
+ }
+
+ if (spill) {
+ try {
+ if (spillManager == null) {
+ // Lazy instantiation of spillable data
+ // structures
+ //
+ // Only create spill data structs if LRU
+ // cache is too small
+ spillManager = new SpillManager(numSpillFilesConf, aggregators, env.getConfiguration(),
+ new QueryCache());
+ }
+ spillManager.spill(eldest.getKey(), eldest.getValue());
+ // keep track of elements in cache
+ curNumCacheElements--;
+ } catch (IOException ioe) {
+ // Ensure that we always close and delete the temp files
+ try {
+ throw new RuntimeException(ioe);
+ } finally {
+ Closeables.closeQuietly(SpillableGroupByCache.this);
+ }
+ }
+ return true;
+ }
+
+ return false;
+ }
+ };
+ }
+
+ /**
+ * Size function returns the estimate LRU cache size in bytes
+ */
+ @Override
+ public int size() {
+ return curNumCacheElements * aggregators.getEstimatedByteSize();
+ }
+
+ /**
+ * Extract an element from the Cache If element is not present in in-memory cache / or in spill files cache
+ * implements an implicit put() of a new key/value tuple and loads it into the cache
+ */
+ @Override
+ public Aggregator[] cache(ImmutableBytesWritable cacheKey) {
+ ImmutableBytesPtr key = new ImmutableBytesPtr(cacheKey);
+ Aggregator[] rowAggregators = cache.get(key);
+ if (rowAggregators == null) {
+ // If Aggregators not found for this distinct
+ // value, clone our original one (we need one
+ // per distinct value)
+ if (spillManager != null) {
+ // Spill manager present, check if key has been
+ // spilled before
+ try {
+ rowAggregators = spillManager.loadEntry(key);
+ } catch (IOException ioe) {
+ // Ensure that we always close and delete the temp files
+ try {
+ throw new RuntimeException(ioe);
+ } finally {
+ Closeables.closeQuietly(SpillableGroupByCache.this);
+ }
+ }
+ }
+ if (rowAggregators == null) {
+ // No, key never spilled before, create a new tuple
+ rowAggregators = aggregators.newAggregators(env.getConfiguration());
+ if (logger.isDebugEnabled()) {
+ logger.debug("Adding new aggregate bucket for row key "
+ + Bytes.toStringBinary(key.get(), key.getOffset(), key.getLength()));
+ }
+ }
+ cache.put(key, rowAggregators);
+ // keep track of elements in cache
+ curNumCacheElements++;
+ }
+ return rowAggregators;
+ }
+
+ /**
+ * Iterator over the cache and the spilled data structures by returning CacheEntries. CacheEntries are either
+ * extracted from the LRU cache or from the spillable data structures.The key/value tuples are returned in
+ * non-deterministic order.
+ */
+ private final class EntryIterator implements Iterator<Map.Entry<ImmutableBytesWritable, Aggregator[]>> {
+ final Iterator<Map.Entry<ImmutableBytesWritable, Aggregator[]>> cacheIter;
+ final Iterator<byte[]> spilledCacheIter;
+
+ private EntryIterator() {
+ cacheIter = cache.entrySet().iterator();
+ if (spillManager != null) {
+ spilledCacheIter = spillManager.newDataIterator();
+ } else {
+ spilledCacheIter = null;
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return cacheIter.hasNext();
+ }
+
+ @Override
+ public Map.Entry<ImmutableBytesWritable, Aggregator[]> next() {
+ if (spilledCacheIter != null && spilledCacheIter.hasNext()) {
+ try {
+ byte[] value = spilledCacheIter.next();
+ // Deserialize into a CacheEntry
+ Map.Entry<ImmutableBytesWritable, Aggregator[]> spilledEntry = spillManager.toCacheEntry(value);
+
+ boolean notFound = false;
+ // check against map and return only if not present
+ while (cache.containsKey(spilledEntry.getKey())) {
+ // LRU Cache entries always take precedence,
+ // since they are more up to date
+ if (spilledCacheIter.hasNext()) {
+ value = spilledCacheIter.next();
+ spilledEntry = spillManager.toCacheEntry(value);
+ } else {
+ notFound = true;
+ break;
+ }
+ }
+ if (!notFound) {
+ // Return a spilled entry, this only happens if the
+ // entry was not
+ // found in the LRU cache
+ return spilledEntry;
+ }
+ } catch (IOException ioe) {
+ // TODO rework error handling
+ throw new RuntimeException(ioe);
+ }
+ }
+ // Spilled elements exhausted
+ // Finally return all elements from LRU cache
+ Map.Entry<ImmutableBytesWritable, Aggregator[]> entry = cacheIter.next();
+ return new CacheEntry<ImmutableBytesWritable>(entry.getKey(), entry.getValue());
+ }
+
+ /**
+ * Remove??? Denied!!!
+ */
+ @Override
+ public void remove() {
+ throw new IllegalAccessError("Remove is not supported for this type of iterator");
+ }
+ }
+
+ /**
+ * Closes cache and releases spill resources
+ *
+ * @throws IOException
+ */
+ @Override
+ public void close() throws IOException {
+ // Close spillable resources
+ Closeables.closeQuietly(spillManager);
+ Closeables.closeQuietly(chunk);
+ }
+
+ @Override
+ public RegionScanner getScanner(final RegionScanner s) {
+ final Iterator<Entry<ImmutableBytesWritable, Aggregator[]>> cacheIter = new EntryIterator();
+
+ // scanner using the spillable implementation
+ return new BaseRegionScanner() {
+ @Override
+ public HRegionInfo getRegionInfo() {
+ return s.getRegionInfo();
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ s.close();
+ } finally {
+ // Always close gbCache and swallow possible Exceptions
+ Closeables.closeQuietly(SpillableGroupByCache.this);
+ }
+ }
+
+ @Override
+ public boolean next(List<KeyValue> results) throws IOException {
+ if (!cacheIter.hasNext()) { return false; }
+ Map.Entry<ImmutableBytesWritable, Aggregator[]> ce = cacheIter.next();
+ ImmutableBytesWritable key = ce.getKey();
+ Aggregator[] aggs = ce.getValue();
+ byte[] value = aggregators.toBytes(aggs);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Adding new distinct group: "
+ + Bytes.toStringBinary(key.get(), key.getOffset(), key.getLength()) + " with aggregators "
+ + aggs.toString() + " value = " + Bytes.toStringBinary(value));
+ }
+ results.add(KeyValueUtil.newKeyValue(key.get(), key.getOffset(), key.getLength(), SINGLE_COLUMN_FAMILY,
+ SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+ return cacheIter.hasNext();
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/client/ClientKeyValue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/client/ClientKeyValue.java b/phoenix-core/src/main/java/org/apache/phoenix/client/ClientKeyValue.java
new file mode 100644
index 0000000..3288c4d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/client/ClientKeyValue.java
@@ -0,0 +1,503 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.client;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * {@link KeyValue} that should only be used from the client side. Enables clients to be more
+ * flexible with the byte arrays they use when building a {@link KeyValue}, but still wire
+ * compatible.
+ * <p>
+ * All<tt> byte[]</tt> (or {@link ImmutableBytesWritable}) passed into the constructor are only ever
+ * read once - when writing <tt>this</tt> onto the wire. They are never copied into another array or
+ * reused. This has the advantage of being much more efficient than the usual {@link KeyValue}
+ * <p>
+ * The down side is that we no longer can support some of the usual methods like
+ * {@link #getBuffer()} or {@link #getKey()} since its is backed with multiple <tt>byte[]</tt> and
+ * <i>should only be used by the client to <b>send</b> information</i>
+ * <p>
+ * <b>WARNING:</b> should only be used by advanced users who know how to construct their own
+ * KeyValues
+ */
+public class ClientKeyValue extends KeyValue {
+
+ private static ImmutableBytesWritable NULL = new ImmutableBytesWritable(new byte[0]);
+ private ImmutableBytesWritable row;
+ private ImmutableBytesWritable family;
+ private ImmutableBytesWritable qualifier;
+ private Type type;
+ private long ts;
+ private ImmutableBytesWritable value;
+
+ /**
+ * @param row must not be <tt>null</tt>
+ * @param type must not be <tt>null</tt>
+ */
+ public ClientKeyValue(ImmutableBytesWritable row, ImmutableBytesWritable family,
+ ImmutableBytesWritable qualifier, long ts, Type type, ImmutableBytesWritable value) {
+ this.row = row;
+ this.family = family == null ? NULL : family;
+ this.qualifier = qualifier == null ? NULL : qualifier;
+ this.type = type;
+ this.ts = ts;
+ this.value = value == null ? NULL : value;
+ }
+
+ /**
+ * Convenience constructor that just wraps all the bytes in {@link ImmutableBytesWritable}
+ */
+ public ClientKeyValue(byte[] row, byte[] family, byte[] qualifier, long ts, Type t, byte[] value) {
+ this(wrap(row), wrap(family), wrap(qualifier), ts, t, wrap(value));
+ }
+
+ /**
+ * Convenience constructor that just wraps all the bytes in {@link ImmutableBytesWritable}
+ */
+ public ClientKeyValue(byte[] row, byte[] family, byte[] qualifier, long ts, Type t) {
+ this(wrap(row), wrap(family), wrap(qualifier), ts, t, null);
+ }
+
+ private static ImmutableBytesWritable wrap(byte[] b) {
+ return b == null ? NULL : new ImmutableBytesWritable(b);
+ }
+
+ @Override
+ public KeyValue clone() {
+ return new ClientKeyValue(copy(row), copy(family), copy(qualifier), ts, type, copy(value));
+ }
+
+ private ImmutableBytesWritable copy(ImmutableBytesWritable bytes) {
+ return new ImmutableBytesWritable(bytes.copyBytes());
+ }
+
+ private static byte[] copyIfNecessary(ImmutableBytesWritable bytes) {
+ byte[] b = bytes.get();
+ if (bytes.getLength() == b.length && bytes.getOffset() == 0) {
+ return b;
+ }
+ return Arrays.copyOfRange(b, bytes.getOffset(), bytes.getOffset() + bytes.getLength());
+ }
+
+ @Override
+ public KeyValue shallowCopy() {
+ return new ClientKeyValue(row, family, qualifier, ts, type, value);
+ }
+
+ @Override
+ public int getValueOffset() {
+ return value.getOffset();
+ }
+
+ @Override
+ public int getValueLength() {
+ return value.getLength();
+ }
+
+ @Override
+ public int getRowOffset() {
+ return row.getOffset();
+ }
+
+ @Override
+ public short getRowLength() {
+ return (short) row.getLength();
+ }
+
+ @Override
+ public int getFamilyOffset() {
+ return family.getOffset();
+ }
+
+ @Override
+ public byte getFamilyLength() {
+ return (byte) family.getLength();
+ }
+
+ @Override
+ public byte getFamilyLength(int foffset) {
+ return this.getFamilyLength();
+ }
+
+ @Override
+ public int getQualifierOffset() {
+ return qualifier.getOffset();
+ }
+
+ @Override
+ public int getQualifierLength() {
+ return qualifier.getLength();
+ }
+
+ @Override
+ public int getQualifierLength(int rlength, int flength) {
+ return this.getQualifierLength();
+ }
+
+ @Override
+ public int getTotalColumnLength(int rlength, int foffset) {
+ return this.getFamilyLength() + this.getQualifierLength();
+ }
+
+ @Override
+ public int getTotalColumnLength() {
+ return qualifier.getLength() + family.getLength();
+ }
+
+ @Override
+ public byte[] getValue() {
+ return copyIfNecessary(value);
+ }
+
+ @Override
+ public byte[] getRow() {
+ return copyIfNecessary(row);
+ }
+
+ @Override
+ public long getTimestamp() {
+ return ts;
+ }
+
+ @Override
+ public byte[] getFamily() {
+ return copyIfNecessary(family);
+ }
+
+ @Override
+ public byte[] getQualifier() {
+ return copyIfNecessary(qualifier);
+ }
+
+ @Override
+ public byte getType() {
+ return this.type.getCode();
+ }
+
+ @Override
+ public boolean matchingFamily(byte[] family) {
+ if (family == null) {
+ if (this.family.getLength() == 0) {
+ return true;
+ }
+ return false;
+ }
+ return matchingFamily(family, 0, family.length);
+ }
+
+ @Override
+ public boolean matchingFamily(byte[] family, int offset, int length) {
+ if (family == null) {
+ if (this.family.getLength() == 0) {
+ return true;
+ }
+ return false;
+ }
+ return matches(family, offset, length, this.family);
+ }
+
+ @Override
+ public boolean matchingFamily(KeyValue other) {
+ if(other == null) {
+ return false;
+ }
+ if(other instanceof ClientKeyValue) {
+ ClientKeyValue kv = (ClientKeyValue)other;
+ return this.family.compareTo(kv.family) == 0;
+ }
+ return matchingFamily(other.getBuffer(), other.getFamilyOffset(), other.getFamilyLength());
+ }
+
+ private boolean matches(byte[] b, int offset, int length, ImmutableBytesWritable bytes) {
+ return Bytes.equals(b, offset, length, bytes.get(), bytes.getOffset(), bytes.getLength());
+ }
+
+ @Override
+ public boolean matchingQualifier(byte[] qualifier) {
+ if (qualifier == null) {
+ if (this.qualifier.getLength() == 0) {
+ return true;
+ }
+ return false;
+ }
+ return matchingQualifier(qualifier, 0, qualifier.length);
+ }
+
+ @Override
+ public boolean matchingQualifier(byte[] qualifier, int offset, int length) {
+ if (qualifier == null) {
+ if (this.qualifier.getLength() == 0) {
+ return true;
+ }
+ return false;
+ }
+ return matches(qualifier, offset, length, this.qualifier);
+ }
+
+ @Override
+ public boolean matchingQualifier(KeyValue other) {
+ if (other == null) {
+ return false;
+ }
+ if (other instanceof ClientKeyValue) {
+ ClientKeyValue kv = (ClientKeyValue) other;
+ return this.row.compareTo(kv.row) == 0;
+ }
+ return matchingQualifier(other.getBuffer(), other.getQualifierOffset(),
+ other.getQualifierLength());
+ }
+
+ @Override
+ public boolean matchingRow(byte[] row){
+ if (row == null) {
+ return false;
+ }
+ return matches(row, 0, row.length, this.row);
+ }
+
+ @Override
+ public boolean matchingRow(byte[] row, int offset, int length) {
+ if (row == null) {
+ return false;
+ }
+ return matches(row, offset, length, this.row);
+ }
+
+ @Override
+ public boolean matchingRow(KeyValue other) {
+ return matchingRow(other.getBuffer(), other.getRowOffset(), other.getRowLength());
+ }
+
+ @Override
+ public boolean matchingColumnNoDelimiter(byte[] column) {
+ // match both the family and qualifier
+ if (matchingFamily(column, 0, this.family.getLength())) {
+ return matchingQualifier(column, family.getLength(), column.length - family.getLength());
+ }
+ return false;
+ }
+
+ @Override
+ public boolean matchingColumn(byte[] family, byte[] qualifier) {
+ return this.matchingFamily(family) && matchingQualifier(qualifier);
+ }
+
+ @Override
+ public boolean nonNullRowAndColumn() {
+ return (this.row != null && row.getLength() > 0) && !isEmptyColumn();
+ }
+
+ @Override
+ public boolean isEmptyColumn() {
+ return this.qualifier != null && this.qualifier.getLength() > 0;
+ }
+
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ // we need to simulate the keyvalue writing, but actually step through each buffer.
+ //start with keylength
+ long longkeylength = KeyValue.KEY_INFRASTRUCTURE_SIZE + row.getLength() + family.getLength()
+ + qualifier.getLength();
+ if (longkeylength > Integer.MAX_VALUE) {
+ throw new IllegalArgumentException("keylength " + longkeylength + " > " + Integer.MAX_VALUE);
+ }
+ // need to figure out the max length before we start
+ int length = this.getLength();
+ out.writeInt(length);
+
+ // write the actual data
+ int keylength = (int) longkeylength;
+ out.writeInt(keylength);
+ int vlength = value == null ? 0 : value.getLength();
+ out.writeInt(vlength);
+ out.writeShort((short) (row.getLength() & 0x0000ffff));
+ out.write(this.row.get(), this.row.getOffset(), this.row.getLength());
+ out.writeByte((byte) (family.getLength() & 0x0000ff));
+ if (family.getLength() != 0) {
+ out.write(this.family.get(), this.family.getOffset(), this.family.getLength());
+ }
+ if (qualifier != NULL) {
+ out.write(this.qualifier.get(), this.qualifier.getOffset(), this.qualifier.getLength());
+ }
+ out.writeLong(ts);
+ out.writeByte(this.type.getCode());
+ if (this.value != NULL) {
+ out.write(this.value.get(), this.value.getOffset(), this.value.getLength());
+ }
+ }
+
+ @Override
+ public int getLength() {
+ return KEYVALUE_INFRASTRUCTURE_SIZE + KeyValue.ROW_LENGTH_SIZE + row.getLength()
+ + KeyValue.FAMILY_LENGTH_SIZE + family.getLength() + qualifier.getLength()
+ + KeyValue.TIMESTAMP_SIZE + KeyValue.TYPE_SIZE + value.getLength();
+ }
+
+ @Override
+ public String toString() {
+ return keyToString() + "/vlen=" + getValueLength() + "/ts=" + getMemstoreTS();
+ }
+
+ private String keyToString() {
+ String row = Bytes.toStringBinary(this.row.get(), this.row.getOffset(), this.row.getLength());
+ String family = this.family.getLength() == 0 ? "" : Bytes.toStringBinary(this.family.get(),
+ this.family.getOffset(), this.family.getLength());
+ String qualifier = this.qualifier.getLength() == 0 ? "" : Bytes.toStringBinary(
+ this.qualifier.get(), this.qualifier.getOffset(), this.qualifier.getLength());
+ String timestampStr = Long.toString(ts);
+ byte type = this.type.getCode();
+ return row + "/" + family + (family != null && family.length() > 0 ? ":" : "") + qualifier
+ + "/" + timestampStr + "/" + Type.codeToType(type);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (!super.equals(obj)) return false;
+ if (getClass() != obj.getClass()) return false;
+ ClientKeyValue other = (ClientKeyValue) obj;
+ if (family == null) {
+ if (other.family != null) return false;
+ } else if (!family.equals(other.family)) return false;
+ if (qualifier == null) {
+ if (other.qualifier != null) return false;
+ } else if (!qualifier.equals(other.qualifier)) return false;
+ if (row == null) {
+ if (other.row != null) return false;
+ } else if (!row.equals(other.row)) return false;
+ if (ts != other.ts) return false;
+ if (type != other.type) return false;
+ if (value == null) {
+ if (other.value != null) return false;
+ } else if (!value.equals(other.value)) return false;
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ // TODO do we need to keep the same hashcode logic as KeyValue? Everywhere else we don't keep
+ // them by reference, but presumably clients might hash them.
+ final int prime = 31;
+ int result = super.hashCode();
+ result = prime * result + family.hashCode();
+ result = prime * result + qualifier.hashCode();
+ result = prime * result + row.hashCode();
+ result = prime * result + (int) (ts ^ (ts >>> 32));
+ result = prime * result + type.hashCode();
+ result = prime * result + value.hashCode();
+ return result;
+ }
+
+ @Override
+ public void readFields(int length, DataInput in) throws IOException {
+ throw new UnsupportedOperationException(ClientKeyValue.class.getSimpleName()
+ + " should not be used for server-side operations");
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ throw new UnsupportedOperationException(ClientKeyValue.class.getSimpleName()
+ + " should not be used for server-side operations");
+ }
+
+ @Override
+ public int getKeyOffset() {
+ return 0;
+ }
+
+
+ @Override
+ public int getFamilyOffset(int rlength) {
+ return 0;
+ }
+
+ @Override
+ public int getQualifierOffset(int foffset) {
+ return 0;
+ }
+
+ @Override
+ public int getTimestampOffset() {
+ return 0;
+ }
+
+ @Override
+ public int getTimestampOffset(int keylength) {
+ return 0;
+ }
+
+ @Override
+ public int getOffset() {
+ return 0;
+ }
+
+ @Override
+ public boolean updateLatestStamp(byte[] now) {
+ if (this.isLatestTimestamp()) {
+ // unfortunately, this is a bit slower than the usual kv, but we don't expect this to happen
+ // all that often on the client (unless users are updating the ts this way), as it generally
+ // happens on the server
+ this.ts = Bytes.toLong(now);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean isLatestTimestamp() {
+ return this.ts == HConstants.LATEST_TIMESTAMP;
+ }
+
+ @Override
+ public int getKeyLength() {
+ return KEY_INFRASTRUCTURE_SIZE + getRowLength() + getFamilyLength() + getQualifierLength();
+ }
+
+ @Override
+ public byte[] getKey() {
+ throw new UnsupportedOperationException(ClientKeyValue.class.getSimpleName()
+ + " does not support a single backing buffer.");
+ }
+
+ @Override
+ public String getKeyString() {
+ throw new UnsupportedOperationException(ClientKeyValue.class.getSimpleName()
+ + " does not support a single backing buffer.");
+ }
+
+ @Override
+ public SplitKeyValue split() {
+ throw new UnsupportedOperationException(ClientKeyValue.class.getSimpleName()
+ + " should not be used for server-side operations");
+ }
+
+ @Override
+ public byte[] getBuffer() {
+ throw new UnsupportedOperationException(ClientKeyValue.class.getSimpleName()
+ + " does not support a single backing buffer.");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/client/ClientKeyValueBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/client/ClientKeyValueBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/client/ClientKeyValueBuilder.java
new file mode 100644
index 0000000..09e295a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/client/ClientKeyValueBuilder.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.client;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+
+/**
+ * A {@link KeyValueBuilder} that builds {@link ClientKeyValue}, eliminating the extra byte copies
+ * inherent in the standard {@link KeyValue} implementation.
+ * <p>
+ * This {@link KeyValueBuilder} is only supported in HBase 0.94.14+ (
+ * {@link PhoenixDatabaseMetaData#CLIENT_KEY_VALUE_BUILDER_THRESHOLD}), with the addition of
+ * HBASE-9834.
+ */
+public class ClientKeyValueBuilder extends KeyValueBuilder {
+
+ public static final KeyValueBuilder INSTANCE = new ClientKeyValueBuilder();
+
+ private ClientKeyValueBuilder() {
+ // private ctor for singleton
+ }
+
+ @Override
+ public KeyValue buildPut(ImmutableBytesWritable row, ImmutableBytesWritable family,
+ ImmutableBytesWritable qualifier, long ts, ImmutableBytesWritable value) {
+ return new ClientKeyValue(row, family, qualifier, ts, Type.Put, value);
+ }
+
+ @Override
+ public KeyValue buildDeleteFamily(ImmutableBytesWritable row, ImmutableBytesWritable family,
+ ImmutableBytesWritable qualifier, long ts) {
+ return new ClientKeyValue(row, family, qualifier, ts, Type.DeleteFamily, null);
+ }
+
+ @Override
+ public KeyValue buildDeleteColumns(ImmutableBytesWritable row, ImmutableBytesWritable family,
+ ImmutableBytesWritable qualifier, long ts) {
+ return new ClientKeyValue(row, family, qualifier, ts, Type.DeleteColumn, null);
+ }
+
+ @Override
+ public KeyValue buildDeleteColumn(ImmutableBytesWritable row, ImmutableBytesWritable family,
+ ImmutableBytesWritable qualifier, long ts) {
+ return new ClientKeyValue(row, family, qualifier, ts, Type.Delete, null);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/client/GenericKeyValueBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/client/GenericKeyValueBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/client/GenericKeyValueBuilder.java
new file mode 100644
index 0000000..b3771ca
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/client/GenericKeyValueBuilder.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.client;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import static org.apache.hadoop.hbase.index.util.ImmutableBytesPtr.copyBytesIfNecessary;
+
+/**
+ * {@link KeyValueBuilder} that does simple byte[] copies to build the underlying key-value. This is
+ * exactly the same behavior as currently used in {@link Delete} and {@link Put}.
+ */
+public class GenericKeyValueBuilder extends KeyValueBuilder {
+
+ public static final KeyValueBuilder INSTANCE = new GenericKeyValueBuilder();
+
+ private GenericKeyValueBuilder() {
+ // private ctor for singleton
+ }
+
+ @Override
+ public KeyValue buildPut(ImmutableBytesWritable row, ImmutableBytesWritable family,
+ ImmutableBytesWritable qualifier, long ts, ImmutableBytesWritable value) {
+ return build(row, family, qualifier, ts, Type.Put, value);
+ }
+
+ @Override
+ public KeyValue buildDeleteFamily(ImmutableBytesWritable row, ImmutableBytesWritable family,
+ ImmutableBytesWritable qualifier, long ts) {
+ return build(row, family, qualifier, ts, Type.DeleteFamily, null);
+ }
+
+ @Override
+ public KeyValue buildDeleteColumns(ImmutableBytesWritable row, ImmutableBytesWritable family,
+ ImmutableBytesWritable qualifier, long ts) {
+ return build(row, family, qualifier, ts, Type.DeleteColumn, null);
+ }
+
+ @Override
+ public KeyValue buildDeleteColumn(ImmutableBytesWritable row, ImmutableBytesWritable family,
+ ImmutableBytesWritable qualifier, long ts) {
+ return build(row, family, qualifier, ts, Type.Delete, null);
+ }
+
+ private KeyValue build(ImmutableBytesWritable row, ImmutableBytesWritable family,
+ ImmutableBytesWritable qualifier, long ts, KeyValue.Type type, ImmutableBytesWritable value) {
+ return new KeyValue(copyBytesIfNecessary(row), copyBytesIfNecessary(family),
+ copyBytesIfNecessary(qualifier), ts, type, value == null? null: copyBytesIfNecessary(value));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/client/KeyValueBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/client/KeyValueBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/client/KeyValueBuilder.java
new file mode 100644
index 0000000..48608ff
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/client/KeyValueBuilder.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.util.MetaDataUtil;
+
+/**
+ * Build {@link KeyValue} in an efficient way
+ */
+public abstract class KeyValueBuilder {
+
+ /**
+ * Helper method for a {@link KeyValueBuilder} that catches an IOException from a {@link Put}
+ * when adding a {@link KeyValue} generated by the KeyValueBuilder.
+ * @throws RuntimeException if there is an IOException thrown from the underlying {@link Put}
+ */
+ @SuppressWarnings("javadoc")
+ public static void addQuietly(Put put, KeyValueBuilder builder, KeyValue kv) {
+ try {
+ put.add(kv);
+ } catch (IOException e) {
+ throw new RuntimeException("KeyValue Builder " + builder + " created an invalid kv: "
+ + kv + "!");
+ }
+ }
+
+ /**
+ * Helper method for a {@link KeyValueBuilder} that catches an IOException from a {@link Put}
+ * when adding a {@link KeyValue} generated by the KeyValueBuilder.
+ * @throws RuntimeException if there is an IOException thrown from the underlying {@link Put}
+ */
+ @SuppressWarnings("javadoc")
+ public static void deleteQuietly(Delete delete, KeyValueBuilder builder, KeyValue kv) {
+ try {
+ delete.addDeleteMarker(kv);
+ } catch (IOException e) {
+ throw new RuntimeException("KeyValue Builder " + builder + " created an invalid kv: "
+ + kv + "!");
+ }
+ }
+
+ private static final int CUSTOM_KEY_VALUE_MIN_VERSION = MetaDataUtil.encodeVersion("0.94.14");
+
+ public static KeyValueBuilder get(String hbaseVersion) {
+ int version = MetaDataUtil.encodeVersion(hbaseVersion);
+ if (version >= CUSTOM_KEY_VALUE_MIN_VERSION) {
+ return ClientKeyValueBuilder.INSTANCE;
+ }
+ return GenericKeyValueBuilder.INSTANCE;
+ }
+
+ public KeyValue buildPut(ImmutableBytesWritable row, ImmutableBytesWritable family,
+ ImmutableBytesWritable qualifier, ImmutableBytesWritable value) {
+ return buildPut(row, family, qualifier, HConstants.LATEST_TIMESTAMP, value);
+ }
+
+ public abstract KeyValue buildPut(ImmutableBytesWritable row, ImmutableBytesWritable family,
+ ImmutableBytesWritable qualifier, long ts, ImmutableBytesWritable value);
+
+ public KeyValue buildDeleteFamily(ImmutableBytesWritable row, ImmutableBytesWritable family,
+ ImmutableBytesWritable qualifier) {
+ return buildDeleteFamily(row, family, qualifier, HConstants.LATEST_TIMESTAMP);
+ }
+
+ public abstract KeyValue buildDeleteFamily(ImmutableBytesWritable row,
+ ImmutableBytesWritable family, ImmutableBytesWritable qualifier, long ts);
+
+ public KeyValue buildDeleteColumns(ImmutableBytesWritable row, ImmutableBytesWritable family,
+ ImmutableBytesWritable qualifier) {
+ return buildDeleteColumns(row, family, qualifier, HConstants.LATEST_TIMESTAMP);
+ }
+
+ public abstract KeyValue buildDeleteColumns(ImmutableBytesWritable row,
+ ImmutableBytesWritable family, ImmutableBytesWritable qualifier, long ts);
+
+ public KeyValue buildDeleteColumn(ImmutableBytesWritable row, ImmutableBytesWritable family,
+ ImmutableBytesWritable qualifier) {
+ return buildDeleteColumn(row, family, qualifier, HConstants.LATEST_TIMESTAMP);
+ }
+
+ public abstract KeyValue buildDeleteColumn(ImmutableBytesWritable row,
+ ImmutableBytesWritable family, ImmutableBytesWritable qualifier, long ts);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java
new file mode 100644
index 0000000..70ea3a2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import org.apache.phoenix.expression.aggregator.ClientAggregators;
+
+/**
+ *
+ * Class that manages aggregations during query compilation
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class AggregationManager {
+ private ClientAggregators aggregators;
+ private int position = 0;
+
+ public AggregationManager() {
+ }
+
+ public ClientAggregators getAggregators() {
+ return aggregators;
+ }
+
+ /**
+ * @return allocate the next available zero-based positional index
+ * for the client-side aggregate function.
+ */
+ protected int nextPosition() {
+ return position++;
+ }
+
+ public void setAggregators(ClientAggregators clientAggregator) {
+ this.aggregators = clientAggregator;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/compile/BindManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/BindManager.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/BindManager.java
new file mode 100644
index 0000000..859d233
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/BindManager.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
+import org.apache.phoenix.parse.BindParseNode;
+import org.apache.phoenix.schema.PDatum;
+
+
+/**
+ *
+ * Class that manages binding parameters and checking type matching. There are
+ * two main usages:
+ *
+ * 1) the standard query case where we have the values for the binds.
+ * 2) the retrieve param metadata case where we don't have the bind values.
+ *
+ * In both cases, during query compilation we figure out what type the bind variable
+ * "should" be, based on how it's used in the query. For example foo < ? would expect
+ * that the bind variable type matches or can be coerced to the type of foo. For (1),
+ * we check that the bind value has the correct type and for (2) we set the param
+ * metadata type.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class BindManager {
+ public static final Object UNBOUND_PARAMETER = new Object();
+
+ private final List<Object> binds;
+ private final PhoenixParameterMetaData bindMetaData;
+
+ public BindManager(List<Object> binds) {
+ this.binds = binds;
+ this.bindMetaData = new PhoenixParameterMetaData(binds.size());
+ }
+
+ public ParameterMetaData getParameterMetaData() {
+ return bindMetaData;
+ }
+
+ public Object getBindValue(BindParseNode node) throws SQLException {
+ int index = node.getIndex();
+ if (index < 0 || index >= binds.size()) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.PARAM_INDEX_OUT_OF_BOUND)
+ .setMessage("binds size: " + binds.size() + "; index: " + index).build().buildException();
+ }
+ Object value = binds.get(index);
+ if (value == UNBOUND_PARAMETER) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.PARAM_VALUE_UNBOUND)
+ .setMessage(node.toString()).build().buildException();
+ }
+ return value;
+ }
+
+ public void addParamMetaData(BindParseNode bind, PDatum column) throws SQLException {
+ bindMetaData.addParam(bind,column);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnProjector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnProjector.java
new file mode 100644
index 0000000..8c1697b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnProjector.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+
+/**
+ * Interface used to access the value of a projected column.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface ColumnProjector {
+    /**
+     * Get the column name as it was referenced in the query
+     * @return the database column name
+     */
+    String getName();
+
+    /**
+     * Get the expression
+     * @return the expression for the column projector
+     */
+    Expression getExpression();
+
+    // TODO: An expression may contain references to multiple tables.
+    /**
+     * Get the name of the hbase table containing the column
+     * @return the hbase table name
+     */
+    String getTableName();
+
+    /**
+     * Get the value of the column, coercing it if necessary to the specified type
+     * @param tuple the row containing the column
+     * @param type the type to which to coerce the binary value
+     * @param ptr used to retrieve the value
+     * @return the object representation of the column value.
+     * @throws SQLException if retrieving or coercing the value fails
+     */
+    Object getValue(Tuple tuple, PDataType type, ImmutableBytesWritable ptr) throws SQLException;
+
+    /** @return presumably whether {@link #getName()} is case sensitive (NOTE(review): confirm) */
+    boolean isCaseSensitive();
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnResolver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnResolver.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnResolver.java
new file mode 100644
index 0000000..49a1947
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnResolver.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.schema.ColumnRef;
+import org.apache.phoenix.schema.TableRef;
+
+
+
+/**
+ * Interface used to resolve column references occurring
+ * in the select statement.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface ColumnResolver {
+
+    /**
+     * Returns the collection of resolved tables in the FROM clause.
+     * @return the resolved tables
+     */
+    List<TableRef> getTables();
+
+    /**
+     * Resolves a column reference using its name and any schema/table (or alias) qualifier.
+     * @param schemaName schema qualifier of the reference (NOTE(review): presumably null when absent)
+     * @param tableName table name or alias qualifier (NOTE(review): presumably null when absent)
+     * @param colName the column name
+     * @return the resolved ColumnRef
+     * @throws SQLException if the column could not be resolved ({@code ColumnNotFoundException})
+     *         or the column name is ambiguous ({@code AmbiguousColumnException})
+     */
+    ColumnRef resolveColumn(String schemaName, String tableName, String colName) throws SQLException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
new file mode 100644
index 0000000..718d09a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.CreateIndexStatement;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.schema.MetaDataClient;
+
+public class CreateIndexCompiler {
+ private final PhoenixStatement statement;
+
+ public CreateIndexCompiler(PhoenixStatement statement) {
+ this.statement = statement;
+ }
+
+ public MutationPlan compile(final CreateIndexStatement create) throws SQLException {
+ final PhoenixConnection connection = statement.getConnection();
+ final ColumnResolver resolver = FromCompiler.getResolver(create, connection);
+ Scan scan = new Scan();
+ final StatementContext context = new StatementContext(statement, resolver, statement.getParameters(), scan);
+ ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);
+ List<ParseNode> splitNodes = create.getSplitNodes();
+ final byte[][] splits = new byte[splitNodes.size()][];
+ for (int i = 0; i < splits.length; i++) {
+ ParseNode node = splitNodes.get(i);
+ if (!node.isStateless()) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.SPLIT_POINT_NOT_CONSTANT)
+ .setMessage("Node: " + node).build().buildException();
+ }
+ LiteralExpression expression = (LiteralExpression)node.accept(expressionCompiler);
+ splits[i] = expression.getBytes();
+ }
+ final MetaDataClient client = new MetaDataClient(connection);
+
+ return new MutationPlan() {
+
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return context.getBindManager().getParameterMetaData();
+ }
+
+ @Override
+ public PhoenixConnection getConnection() {
+ return connection;
+ }
+
+ @Override
+ public MutationState execute() throws SQLException {
+ return client.createIndex(create, splits);
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ return new ExplainPlan(Collections.singletonList("CREATE INDEX"));
+ }
+ };
+ }
+}