You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 23:16:01 UTC

[45/51] [partial] Initial commit of master branch from github

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
new file mode 100644
index 0000000..46a771b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
@@ -0,0 +1,228 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.index.write.recovery;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+import com.google.common.collect.Multimap;
+import org.apache.hadoop.hbase.index.CapturingAbortable;
+import org.apache.hadoop.hbase.index.exception.MultiIndexWriteFailureException;
+import org.apache.hadoop.hbase.index.exception.SingleIndexWriteFailureException;
+import org.apache.hadoop.hbase.index.parallel.EarlyExitFailure;
+import org.apache.hadoop.hbase.index.parallel.Task;
+import org.apache.hadoop.hbase.index.parallel.TaskBatch;
+import org.apache.hadoop.hbase.index.parallel.TaskRunner;
+import org.apache.hadoop.hbase.index.parallel.ThreadPoolBuilder;
+import org.apache.hadoop.hbase.index.parallel.ThreadPoolManager;
+import org.apache.hadoop.hbase.index.parallel.WaitForCompletionTaskRunner;
+import org.apache.hadoop.hbase.index.table.CachingHTableFactory;
+import org.apache.hadoop.hbase.index.table.HTableFactory;
+import org.apache.hadoop.hbase.index.table.HTableInterfaceReference;
+import org.apache.hadoop.hbase.index.write.IndexCommitter;
+import org.apache.hadoop.hbase.index.write.IndexWriter;
+import org.apache.hadoop.hbase.index.write.IndexWriterUtils;
+import org.apache.hadoop.hbase.index.write.ParallelWriterIndexCommitter;
+
+/**
+ * Like the {@link ParallelWriterIndexCommitter}, but blocks until all writes have been attempted to
+ * allow the caller to retrieve the failed and succeeded index updates. Therefore, this class will
+ * be a lot slower, in the face of failures, when compared to the
+ * {@link ParallelWriterIndexCommitter} (though as fast for writes), so it should be used only when
+ * you need to at least attempt all writes and know their result; for instance, this is fine for
+ * doing WAL recovery - it's not a performance intensive situation and we want to limit the
+ * edits we need to retry.
+ * <p>
+ * On failure to {@link #write(Multimap)}, we throw a {@link MultiIndexWriteFailureException} that
+ * contains the list of {@link HTableInterfaceReference} that didn't complete successfully.
+ * <p>
+ * Failures to write to the index can happen several different ways:
+ * <ol>
+ * <li><tt>this</tt> is {@link #stop(String) stopped} or aborted (via the passed {@link Abortable}).
+ * This causes any pending tasks to fail whatever they are doing as fast as possible. Any writes
+ * that have not begun are not even attempted and marked as failures.</li>
+ * <li>A batch write fails. This is the generic HBase write failure - it may occur because the index
+ * table is not available, .META. or -ROOT- is unavailable, or any other (of many) possible HBase
+ * exceptions.</li>
+ * </ol>
+ * Regardless of how the write fails, we still wait for all writes to complete before passing the
+ * failure back to the client.
+ */
+public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
+  private static final Log LOG = LogFactory.getLog(TrackingParallelWriterIndexCommitter.class);
+
+  public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.trackingwriter.threads.max";
+  private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
+  private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY =
+      "index.trackingwriter.threads.keepalivetime";
+  
+  private TaskRunner pool;
+  private HTableFactory factory;
+  private CapturingAbortable abortable;
+  private Stoppable stopped;
+
+  @Override
+  public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
+    Configuration conf = env.getConfiguration();
+    setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
+      ThreadPoolManager.getExecutor(
+        new ThreadPoolBuilder(name, conf).
+          setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
+            DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).
+          setCoreTimeout(INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env),
+      env.getRegionServerServices(), parent, CachingHTableFactory.getCacheSize(conf));
+  }
+
+  /**
+   * Setup <tt>this</tt>.
+   * <p>
+   * Exposed for TESTING
+   */
+  void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop,
+      int cacheSize) {
+    this.pool = new WaitForCompletionTaskRunner(pool);
+    this.factory = new CachingHTableFactory(factory, cacheSize);
+    this.abortable = new CapturingAbortable(abortable);
+    this.stopped = stop;
+  }
+
+  @Override
+  public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
+      throws MultiIndexWriteFailureException {
+    Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
+    TaskBatch<Boolean> tasks = new TaskBatch<Boolean>(entries.size());
+    List<HTableInterfaceReference> tables = new ArrayList<HTableInterfaceReference>(entries.size());
+    for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
+      // get the mutations for each table. We leak the implementation here a little bit to save
+      // doing a complete copy over of all the index update for each table.
+      final List<Mutation> mutations = (List<Mutation>) entry.getValue();
+      // track each reference so we can get at it easily later, when determining failures
+      final HTableInterfaceReference tableReference = entry.getKey();
+      tables.add(tableReference);
+
+      /*
+       * Write a batch of index updates to an index table. This operation stops (is cancelable) via
+       * two mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the
+       * running thread. The former will only work if we are not in the midst of writing the current
+       * batch to the table, though we do check these status variables before starting and before
+       * writing the batch. The latter usage, interrupting the thread, will work in the previous
+       * situations as well as at some points while writing the batch, depending on the underlying
+       * writer implementation (HTableInterface#batch is blocking, but doesn't elaborate when it
+       * supports an interrupt).
+       */
+      tasks.add(new Task<Boolean>() {
+
+        /**
+         * Do the actual write to the primary table. We don't need to worry about closing the table
+         * because that is handled by the {@link CachingHTableFactory}.
+         */
+        @Override
+        public Boolean call() throws Exception {
+          try {
+            // this may have been queued, but there was an abort/stop so we try to early exit
+            throwFailureIfDone();
+
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
+            }
+            HTableInterface table = factory.getTable(tableReference.get());
+            throwFailureIfDone();
+            table.batch(mutations);
+          } catch (InterruptedException e) {
+            // reset the interrupt status on the thread
+            Thread.currentThread().interrupt();
+            throw e;
+          } catch (Exception e) {
+            throw e;
+          }
+          return Boolean.TRUE;
+        }
+
+        private void throwFailureIfDone() throws SingleIndexWriteFailureException {
+          if (stopped.isStopped() || abortable.isAborted()
+              || Thread.currentThread().isInterrupted()) {
+            throw new SingleIndexWriteFailureException(
+                "Pool closed, not attempting to write to the index!", null);
+          }
+
+        }
+      });
+    }
+
+    List<Boolean> results = null;
+    try {
+      LOG.debug("Waiting on index update tasks to complete...");
+      results = this.pool.submitUninterruptible(tasks);
+    } catch (ExecutionException e) {
+      throw new RuntimeException(
+          "Should not fail on the results while using a WaitForCompletionTaskRunner", e);
+    } catch (EarlyExitFailure e) {
+      throw new RuntimeException("Stopped while waiting for batch, quiting!", e);
+    }
+    
+    // track the failures. We only ever access this on return from our calls, so no extra
+    // synchronization is needed. We could update all the failures as we find them, but that adds a
+    // lot of locking overhead, and just doing the copy later is about as efficient.
+    List<HTableInterfaceReference> failures = new ArrayList<HTableInterfaceReference>();
+    int index = 0;
+    for (Boolean result : results) {
+      // there was a failure
+      if (result == null) {
+        // we know which table failed by the index of the result
+        failures.add(tables.get(index));
+      }
+      index++;
+    }
+
+    // if any of the tasks failed, then we need to propagate the failure
+    if (failures.size() > 0) {
+      // make the list unmodifiable to avoid any more synchronization concerns
+      throw new MultiIndexWriteFailureException(Collections.unmodifiableList(failures));
+    }
+    return;
+  }
+
+  @Override
+  public void stop(String why) {
+    LOG.info("Shutting down " + this.getClass().getSimpleName());
+    this.pool.stop(why);
+    this.factory.shutdown();
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped.isStopped();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexKeyValueSkipListSet.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexKeyValueSkipListSet.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexKeyValueSkipListSet.java
new file mode 100644
index 0000000..8314fef
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexKeyValueSkipListSet.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Comparator;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.hadoop.hbase.KeyValue;
+
+/**
+ * Like a {@link KeyValueSkipListSet}, but also exposes useful, atomic methods (e.g.
+ * {@link #putIfAbsent(KeyValue)}).
+ */
+public class IndexKeyValueSkipListSet extends KeyValueSkipListSet {
+
+  // this is annoying that we need to keep this extra pointer around here, but it's pretty minimal
+  // and means we don't need to change the HBase code.
+  private ConcurrentSkipListMap<KeyValue, KeyValue> delegate;
+
+  /**
+   * Create a new {@link IndexKeyValueSkipListSet} based on the passed comparator.
+   * @param comparator to use when comparing keyvalues. It is used both to determine sort order as
+   *          well as object equality in the map.
+   * @return a set backed by a map that uses the passed comparator
+   */
+  public static IndexKeyValueSkipListSet create(Comparator<KeyValue> comparator) {
+    ConcurrentSkipListMap<KeyValue, KeyValue> delegate =
+        new ConcurrentSkipListMap<KeyValue, KeyValue>(comparator);
+    IndexKeyValueSkipListSet ret = new IndexKeyValueSkipListSet(delegate);
+    return ret;
+  }
+
+  /**
+   * @param delegate map to which to delegate all calls
+   */
+  public IndexKeyValueSkipListSet(ConcurrentSkipListMap<KeyValue, KeyValue> delegate) {
+    super(delegate);
+    this.delegate = delegate;
+  }
+
+  /**
+   * Add the passed {@link KeyValue} to the set, only if one is not already set. This is equivalent
+   * to
+   * <pre>
+   * if (!map.containsKey(kv))
+   *   return map.put(kv, kv);
+   * else
+   *   return map.get(kv);
+   * </pre>
+   * except that the action is performed atomically.
+   * @param kv {@link KeyValue} to add
+   * @return the previous value associated with the specified key, or <tt>null</tt> if there was no
+   *         previously stored key
+   * @throws ClassCastException if the specified key cannot be compared with the keys currently in
+   *           the map
+   * @throws NullPointerException if the specified key is null
+   */
+  public KeyValue putIfAbsent(KeyValue kv) {
+    return this.delegate.putIfAbsent(kv, kv);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java
new file mode 100644
index 0000000..bad82c4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java
@@ -0,0 +1,152 @@
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
+import org.apache.hadoop.io.Writable;
+
+
+
+/**
+ * A WALReader that can also deserialize custom {@link WALEdit}s that contain index information.
+ * <p>
+ * This is basically a wrapper around a {@link SequenceFileLogReader} that has a custom
+ * {@link SequenceFileLogReader.WALReader#next(Object)} method that only replaces the creation of the WALEdit with our own custom
+ * type
+ * <p>
+ * This is a little bit of a painful way of going about this, but saves the effort of hacking the
+ * HBase source (and deal with getting it reviewed and backported, etc.) and still works.
+ */
+/*
+ * TODO: Support splitting index updates into their own WAL entries on recovery (basically, just
+ * queue them up in next), if we know that the region was on the server when it crashed. However,
+ * this is kind of difficult as we need to know a lot of things about the state of the system - basically,
+ * we need to track which of the regions were on the server when it crashed and only split those
+ * edits out into their respective regions.
+ */
+public class IndexedHLogReader implements Reader {
+  private static final Log LOG = LogFactory.getLog(IndexedHLogReader.class);
+
+  private SequenceFileLogReader delegate;
+
+
+  private static class IndexedWALReader extends SequenceFileLogReader.WALReader {
+
+    /**
+     * @param fs
+     * @param p
+     * @param c
+     * @throws IOException
+     */
+    IndexedWALReader(FileSystem fs, Path p, Configuration c) throws IOException {
+      super(fs, p, c);
+    }
+
+    /**
+     * we basically have to reproduce what the SequenceFile.Reader is doing in next(), but without
+     * the check on the value class, since we have a special value class that doesn't directly match
+     * what was specified in the file header
+     */
+    @Override
+    public synchronized boolean next(Writable key, Writable val) throws IOException {
+      boolean more = next(key);
+
+      if (more) {
+        getCurrentValue(val);
+      }
+
+      return more;
+    }
+
+  }
+
+  public IndexedHLogReader() {
+    this.delegate = new SequenceFileLogReader();
+  }
+
+  @Override
+  public void init(final FileSystem fs, final Path path, Configuration conf) throws IOException {
+    this.delegate.init(fs, path, conf);
+    // close the old reader and replace with our own, custom one
+    this.delegate.reader.close();
+    this.delegate.reader = new IndexedWALReader(fs, path, conf);
+    Exception e = new Exception();
+    LOG.info("Instantiated indexed log reader." + Arrays.toString(e.getStackTrace()));
+    LOG.info("Got conf: " + conf);
+  }
+
+  @Override
+  public void close() throws IOException {
+    this.delegate.close();
+  }
+
+  @Override
+  public Entry next() throws IOException {
+    return next(null);
+  }
+
+  @Override
+  public Entry next(Entry reuse) throws IOException {
+    delegate.entryStart = delegate.reader.getPosition();
+    HLog.Entry e = reuse;
+    if (e == null) {
+      HLogKey key;
+      if (delegate.keyClass == null) {
+        key = HLog.newKey(delegate.conf);
+      } else {
+        try {
+          key = delegate.keyClass.newInstance();
+        } catch (InstantiationException ie) {
+          throw new IOException(ie);
+        } catch (IllegalAccessException iae) {
+          throw new IOException(iae);
+        }
+      }
+      WALEdit val = new WALEdit();
+      e = new HLog.Entry(key, val);
+    }
+
+    // now read in the HLog.Entry from the WAL
+    boolean nextPairValid = false;
+    try {
+      if (delegate.compressionContext != null) {
+        throw new UnsupportedOperationException(
+            "Reading compression isn't supported with the IndexedHLogReader! Compresed WALEdits "
+                + "are only support for HBase 0.94.9+ and with the IndexedWALEditCodec!");
+      }
+      // this is the special bit - we use our custom entry to read in the key-values that have index
+      // information, but otherwise it looks just like a regular WALEdit
+      IndexedWALEdit edit = new IndexedWALEdit(e.getEdit());
+      nextPairValid = delegate.reader.next(e.getKey(), edit);
+    } catch (IOException ioe) {
+      throw delegate.addFileInfoToException(ioe);
+    }
+    delegate.edit++;
+    if (delegate.compressionContext != null && delegate.emptyCompressionContext) {
+      delegate.emptyCompressionContext = false;
+    }
+    return nextPairValid ? e : null;
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    this.delegate.seek(pos);
+  }
+
+  @Override
+  public long getPosition() throws IOException {
+    return this.delegate.getPosition();
+  }
+
+  @Override
+  public void reset() throws IOException {
+    this.delegate.reset();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java
new file mode 100644
index 0000000..6749cc9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java
@@ -0,0 +1,91 @@
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.hadoop.hbase.index.wal.KeyValueCodec;
+
+/**
+ * Read in data for a delegate {@link WALEdit}. This should only be used in concert with an IndexedHLogReader
+ * <p>
+ * This class should only be used with HBase &lt; 0.94.9. Newer installations of HBase should
+ * instead use the IndexedWALEditCodec along with the correct configuration options.
+ */
+public class IndexedWALEdit extends WALEdit {
+  //reproduced here so we don't need to modify the HBase source.
+  private static final int VERSION_2 = -1;
+  private WALEdit delegate;
+
+  /**
+   * Copy-constructor. Only does a surface copy of the delegates fields - no actual data is copied, only referenced.
+   * @param delegate to copy
+   */
+  @SuppressWarnings("deprecation")
+  public IndexedWALEdit(WALEdit delegate) {
+    this.delegate = delegate;
+    // reset the delegate's fields
+    this.delegate.getKeyValues().clear();
+    if (this.delegate.getScopes() != null) {
+      this.delegate.getScopes().clear();
+    }
+  }
+
+  public IndexedWALEdit() {
+
+  }
+
+  @Override
+public void setCompressionContext(CompressionContext context) {
+    throw new UnsupportedOperationException(
+        "Compression not supported for IndexedWALEdit! If you are using HBase 0.94.9+, use IndexedWALEditCodec instead.");
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    delegate.getKeyValues().clear();
+    if (delegate.getScopes() != null) {
+      delegate.getScopes().clear();
+    }
+    // ----------------------------------------------------------------------------------------
+    // no compression, so we do pretty much what the usual WALEdit does, plus a little magic to
+    // capture the index updates
+    // -----------------------------------------------------------------------------------------
+    int versionOrLength = in.readInt();
+    if (versionOrLength != VERSION_2) {
+      throw new IOException("You must update your cluster to the lastest version of HBase and"
+          + " clean out all logs (cleanly start and then shutdown) before enabling indexing!");
+    }
+    // this is new style HLog entry containing multiple KeyValues.
+    List<KeyValue> kvs = KeyValueCodec.readKeyValues(in);
+    delegate.getKeyValues().addAll(kvs);
+
+    // then read in the rest of the WALEdit
+    int numFamilies = in.readInt();
+    NavigableMap<byte[], Integer> scopes = delegate.getScopes();
+    if (numFamilies > 0) {
+      if (scopes == null) {
+        scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+      }
+      for (int i = 0; i < numFamilies; i++) {
+        byte[] fam = Bytes.readByteArray(in);
+        int scope = in.readInt();
+        scopes.put(fam, scope);
+      }
+      delegate.setScopes(scopes);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    throw new IOException(
+        "Indexed WALEdits aren't written directly out - use IndexedKeyValues instead");
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java
new file mode 100644
index 0000000..01d7390
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java
@@ -0,0 +1,195 @@
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.codec.BaseDecoder;
+import org.apache.hadoop.hbase.codec.BaseEncoder;
+import org.apache.hadoop.hbase.codec.Decoder;
+import org.apache.hadoop.hbase.codec.Encoder;
+
+import org.apache.hadoop.hbase.index.wal.IndexedKeyValue;
+import org.apache.hadoop.hbase.index.wal.KeyValueCodec;
+
+
+/**
+ * Support custom indexing {@link KeyValue}s when written to the WAL.
+ * <p>
+ * Currently, we don't support reading older WAL files - only new WAL files. Therefore, this should
+ * not be installed on a running cluster, but rather one that has been cleanly shutdown and requires
+ * no WAL replay on startup.
+ */
+public class IndexedWALEditCodec extends WALEditCodec {
+
+  // can't have negative values because reading off a stream returns a negative if it's the end of
+  // the stream
+  private static final int REGULAR_KEY_VALUE_MARKER = 0;
+  private CompressionContext compression;
+
+  /** Required nullary constructor */
+  public IndexedWALEditCodec() {
+  }
+
+  /**
+   * Override the parent implementation so we can get access to the current context too
+   * @param compression compression to support for the encoder/decoder
+   */
+  @Override
+  public void setCompression(CompressionContext compression) {
+    super.setCompression(compression);
+    this.compression = compression;
+  }
+
+  @Override
+  public Decoder getDecoder(InputStream is) {
+    // compression isn't enabled
+    if (this.compression == null) {
+      return new IndexKeyValueDecoder(is);
+    }
+
+    // there is compression, so we get the standard decoder to handle reading those kvs
+    Decoder decoder = super.getDecoder(is);
+    // compression is on, return our custom decoder
+    return new CompressedIndexKeyValueDecoder(is, decoder);
+  }
+
+  @Override
+  public Encoder getEncoder(OutputStream os) {
+    // compression isn't on, do the default thing
+    if (this.compression == null) {
+      return new IndexKeyValueEncoder(os);
+    }
+
+    // compression is on, return our one that will handle putting in the correct markers
+    Encoder encoder = super.getEncoder(os);
+    return new CompressedIndexKeyValueEncoder(os, encoder);
+  }
+
+  /**
+   * Custom {@link Decoder} that can handle a stream of regular and indexed {@link KeyValue}s.
+   */
+  public class IndexKeyValueDecoder extends BaseDecoder {
+
+    /**
+     * Create a {@link Decoder} that parses {@link KeyValue}s from the given input stream via the
+     * {@link KeyValueCodec}.
+     * @param is stream to read from
+     */
+    public IndexKeyValueDecoder(InputStream is){
+      super(is);
+    }
+
+    @Override
+    protected KeyValue parseCell() throws IOException{
+      return KeyValueCodec.readKeyValue((DataInput) this.in);
+    }
+  }
+
+  public class CompressedIndexKeyValueDecoder extends BaseDecoder {
+
+    private Decoder decoder;
+
+    /**
+     * Create a {@link Decoder} on the given input stream with the given {@link Decoder} to parse
+     * generic {@link KeyValue}s.
+     * @param is stream to read from
+     * @param compressedDecoder decoder for generic {@link KeyValue}s. Should support the expected
+     *          compression.
+     */
+    public CompressedIndexKeyValueDecoder(InputStream is, Decoder compressedDecoder) {
+      super(is);
+      this.decoder = compressedDecoder;
+    }
+
+    @Override
+    protected KeyValue parseCell() throws IOException {
+      // read the marker
+      int marker = this.in.read();
+      if (marker < 0) {
+        throw new EOFException(
+            "Unexepcted end of stream found while reading next (Indexed) KeyValue");
+      }
+
+      // do the normal thing, if it's a regular kv
+      if (marker == REGULAR_KEY_VALUE_MARKER) {
+        if (!this.decoder.advance()) {
+          throw new IOException("Could not read next key-value from generic KeyValue Decoder!");
+        }
+        return this.decoder.current();
+      }
+
+      // it's an indexedKeyValue, so parse it out specially
+      return KeyValueCodec.readKeyValue((DataInput) this.in);
+    }
+  }
+
+  /**
+   * Encode {@link IndexedKeyValue}s via the {@link KeyValueCodec}. Does <b>not</b> support
+   * compression.
+   */
+  private static class IndexKeyValueEncoder extends BaseEncoder {
+    public IndexKeyValueEncoder(OutputStream os) {
+      super(os);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      super.flush();
+    }
+
+    @Override
+    public void write(KeyValue cell) throws IOException {
+      // make sure we are open
+      checkFlushed();
+
+      // use the standard encoding mechanism
+      KeyValueCodec.write((DataOutput) this.out, cell);
+    }
+  }
+
+  /**
+   * Write {@link IndexedKeyValue}s along side compressed {@link KeyValue}s. This Encoder is
+   * <b>not</b> compatible with the {@link IndexKeyValueDecoder} - one cannot intermingle compressed
+   * and uncompressed WALs that contain index entries.
+   */
+  private static class CompressedIndexKeyValueEncoder extends BaseEncoder {
+    private Encoder compressedKvEncoder;
+
+    public CompressedIndexKeyValueEncoder(OutputStream os, Encoder compressedKvEncoder) {
+      super(os);
+      this.compressedKvEncoder = compressedKvEncoder;
+    }
+
+    @Override
+    public void flush() throws IOException {
+      this.compressedKvEncoder.flush();
+      super.flush();
+    }
+
+    @Override
+    public void write(KeyValue cell) throws IOException {
+      //make sure we are open
+      checkFlushed();
+      
+      //write the special marker so we can figure out which kind of kv it is
+      int marker = IndexedWALEditCodec.REGULAR_KEY_VALUE_MARKER;
+      if (cell instanceof IndexedKeyValue) {
+        marker = KeyValueCodec.INDEX_TYPE_LENGTH_MARKER;
+      }
+      out.write(marker);
+      
+      //then serialize based on the marker
+      if (marker == IndexedWALEditCodec.REGULAR_KEY_VALUE_MARKER) {
+        this.compressedKvEncoder.write(cell);
+      }
+      else{
+        KeyValueCodec.write((DataOutput) out, cell);
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
new file mode 100644
index 0000000..77684c5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache;
+
+import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_PERC_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_WAIT_MS_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.MAX_TENANT_MEMORY_PERC_ATTRIB;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.memory.ChildMemoryManager;
+import org.apache.phoenix.memory.GlobalMemoryManager;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+
+
+/**
+ * 
+ * Root cache of the server. Every tenant is managed as a child tenant cache of this one, while
+ * queries that are not associated with a particular tenant use this instance as their tenant cache.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class GlobalCache extends TenantCacheImpl {
+    private static GlobalCache INSTANCE; 
+    
+    private final Configuration config;
+    // TODO: Use Guava cache with auto removal after lack of access 
+    private final ConcurrentMap<ImmutableBytesWritable,TenantCache> perTenantCacheMap = new ConcurrentHashMap<ImmutableBytesWritable,TenantCache>();
+    // Cache for latest PTable for a given Phoenix table
+    private final ConcurrentHashMap<ImmutableBytesPtr,PTable> metaDataCacheMap = new ConcurrentHashMap<ImmutableBytesPtr,PTable>();
+    
+    /**
+     * Builds the root cache. The global memory budget is the configured percentage of
+     * {@link Runtime#totalMemory()}.
+     * NOTE(review): totalMemory() reports the heap currently reserved by the JVM rather than the
+     * configured ceiling (Runtime#maxMemory()) - confirm this is the intended base.
+     */
+    private GlobalCache(Configuration config) {
+        super(new GlobalMemoryManager(Runtime.getRuntime().totalMemory() * 
+                                          config.getInt(MAX_MEMORY_PERC_ATTRIB, QueryServicesOptions.DEFAULT_MAX_MEMORY_PERC) / 100,
+                                      config.getInt(MAX_MEMORY_WAIT_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_MEMORY_WAIT_MS)),
+              config.getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS));
+        this.config = config;
+    }
+    
+    /**
+     * Returns the single GlobalCache of this JVM, creating it from the configuration of the
+     * given environment on first use.
+     * @param env the region coprocessor environment supplying the configuration
+     * @return the shared GlobalCache instance
+     */
+    public static synchronized GlobalCache getInstance(RegionCoprocessorEnvironment env) {
+        // See http://www.cs.umd.edu/~pugh/java/memoryModel/DoubleCheckedLocking.html
+        // for explanation of why double locking doesn't work. 
+        if (INSTANCE != null) {
+            return INSTANCE;
+        }
+        INSTANCE = new GlobalCache(env.getConfiguration());
+        return INSTANCE;
+    }
+    
+    /**
+     * Get the tenant cache associated with the tenantId. If tenantId is not applicable, null may be
+     * used in which case the global cache itself is returned.
+     * @param env the region coprocessor environment used to obtain the GlobalCache
+     * @param tenantId the tenant ID or null if not applicable.
+     * @return TenantCache
+     */
+    public static TenantCache getTenantCache(RegionCoprocessorEnvironment env, ImmutableBytesWritable tenantId) {
+        GlobalCache root = GlobalCache.getInstance(env);
+        if (tenantId == null) {
+            return root;
+        }
+        return root.getChildTenantCache(tenantId);
+    }
+    
+    /**
+     * @return the map caching the latest PTable per Phoenix table
+     */
+    public ConcurrentHashMap<ImmutableBytesPtr,PTable> getMetaDataCache() {
+        return metaDataCacheMap;
+    }
+    
+    /**
+     * @return the configuration this cache was created with
+     */
+    public Configuration getConfig() {
+        return config;
+    }
+    
+    /**
+     * Retrieve the tenant cache given a tenantId, creating and registering it when none exists yet.
+     * @param tenantId the ID that identifies the tenant
+     * @return the existing or newly created TenantCache
+     */
+    public TenantCache getChildTenantCache(ImmutableBytesWritable tenantId) {
+        TenantCache existing = perTenantCacheMap.get(tenantId);
+        if (existing != null) {
+            return existing;
+        }
+        int maxTenantMemoryPerc = config.getInt(MAX_TENANT_MEMORY_PERC_ATTRIB, QueryServicesOptions.DEFAULT_MAX_TENANT_MEMORY_PERC);
+        int maxServerCacheTimeToLive = config.getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
+        ChildMemoryManager childMemoryManager = new ChildMemoryManager(getMemoryManager(), maxTenantMemoryPerc);
+        TenantCacheImpl created = new TenantCacheImpl(childMemoryManager, maxServerCacheTimeToLive);
+        // If another thread registered a cache for this tenant in the meantime, use that one
+        TenantCache winner = perTenantCacheMap.putIfAbsent(tenantId, created);
+        return winner == null ? created : winner;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java
new file mode 100644
index 0000000..4260a50
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache;
+
+import java.io.Closeable;
+import java.util.List;
+
+import org.apache.http.annotation.Immutable;
+
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ * Encapsulate deserialized hash cache from bytes into Map.
+ * The Map uses the row key as the key and the row as the value.
+ * @author jtaylor
+ * @since 0.1
+ */
+@Immutable
+public interface HashCache extends Closeable {
+    /**
+     * Look up the rows cached under the given hash key.
+     * @param hashKey the key to look up
+     * @return the tuples associated with hashKey. NOTE(review): whether a missing key yields
+     *         null or an empty list is decided by the implementation and is not visible here.
+     */
+    public List<Tuple> get(ImmutableBytesPtr hashKey);
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
new file mode 100644
index 0000000..fac78de
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
@@ -0,0 +1,10 @@
+package org.apache.phoenix.cache;
+
+import java.io.Closeable;
+import java.util.List;
+
+import org.apache.phoenix.index.IndexMaintainer;
+
+/**
+ * A closeable cache entry that exposes a list of {@link IndexMaintainer}s.
+ */
+public interface IndexMetaDataCache extends Closeable {
+    /**
+     * @return the index maintainers held by this cache entry
+     */
+    public List<IndexMaintainer> getIndexMaintainers();
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
new file mode 100644
index 0000000..8703c7c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -0,0 +1,278 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.coprocessor.ServerCachingProtocol;
+import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.job.JobManager.JobCallable;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.Closeables;
+import org.apache.phoenix.util.SQLCloseable;
+import org.apache.phoenix.util.SQLCloseables;
+
+/**
+ * 
+ * Client for sending cache to each region server
+ * 
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ServerCacheClient {
+    public static final int UUID_LENGTH = Bytes.SIZEOF_LONG;
+    private static final Log LOG = LogFactory.getLog(ServerCacheClient.class);
+    private static final Random RANDOM = new Random();
+    private final PhoenixConnection connection;
+    // Keyed by Bytes.mapKey(cacheId): remembers which table's regions a cache was sent to, so that
+    // removeServerCache can locate the same region servers again.
+    private final Map<Integer, TableRef> cacheUsingTableRefMap = new HashMap<Integer, TableRef>();
+
+    /**
+     * Construct client used to create a serialized cached snapshot of a table and send it to each region server
+     * for caching during hash join processing.
+     * @param connection the client connection
+     * 
+     * TODO: instead of minMaxKeyRange, have an interface for iterating through ranges as we may be sending to
+     * servers when we don't have to if the min is in first region and max is in last region, especially for point queries.
+     */
+    public ServerCacheClient(PhoenixConnection connection) {
+        this.connection = connection;
+    }
+
+    /**
+     * @return the client connection this cache client was created for
+     */
+    public PhoenixConnection getConnection() {
+        return connection;
+    }
+    
+    /**
+     * Client-side representation of a server cache.  Call {@link #close()} when usage
+     * is complete to free cache up on region server
+     *
+     * @author jtaylor
+     * @since 0.1
+     */
+    public class ServerCache implements SQLCloseable {
+        private final int size;
+        private final byte[] id;
+        private final ImmutableSet<HRegionLocation> servers;
+        
+        public ServerCache(byte[] id, Set<HRegionLocation> servers, int size) {
+            this.id = id;
+            this.servers = ImmutableSet.copyOf(servers);
+            this.size = size;
+        }
+
+        /**
+         * Gets the size in bytes of hash cache
+         */
+        public int getSize() {
+            return size;
+        }
+
+        /**
+         * Gets the unique identifier for this hash cache
+         */
+        public byte[] getId() {
+            return id;
+        }
+
+        /**
+         * Call to free up cache on region servers when no longer needed
+         */
+        @Override
+        public void close() throws SQLException {
+            removeServerCache(id, servers);
+        }
+
+    }
+    
+    /**
+     * Send the given cache to every region server hosting a region of cacheUsingTableRef that
+     * intersects keyRanges. The RPCs are executed in parallel and each one is awaited for at most
+     * the configured thread timeout. If anything fails, removal of the cache is attempted on the
+     * servers it was sent to, outstanding RPCs are cancelled and the first failure is thrown.
+     * @param keyRanges only regions intersecting these ranges receive the cache
+     * @param cachePtr the serialized cache to send
+     * @param cacheFactory passed along with the cache in the RPC to each region server
+     * @param cacheUsingTableRef the table whose regions determine the target servers
+     * @return client-side handle of the cache; close it to remove the cache from the servers
+     * @throws SQLException if the cache could not be sent to all of the targeted servers
+     */
+    public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final ServerCacheFactory cacheFactory, final TableRef cacheUsingTableRef) throws SQLException {
+        ConnectionQueryServices services = connection.getQueryServices();
+        MemoryChunk chunk = services.getMemoryManager().allocate(cachePtr.getLength());
+        List<Closeable> closeables = new ArrayList<Closeable>();
+        closeables.add(chunk);
+        ServerCache hashCacheSpec = null;
+        SQLException firstException = null;
+        final byte[] cacheId = generateId();
+        /**
+         * Execute EndPoint in parallel on each server to send compressed hash cache 
+         */
+        // TODO: generalize and package as a per region server EndPoint caller
+        // (ideally this would be functionality provided by the coprocessor framework)
+        boolean success = false;
+        ExecutorService executor = services.getExecutor();
+        List<Future<Boolean>> futures = Collections.emptyList();
+        try {
+            List<HRegionLocation> locations = services.getAllTableRegions(cacheUsingTableRef.getTable().getPhysicalName().getBytes());
+            int nRegions = locations.size();
+            // Size these based on worst case
+            futures = new ArrayList<Future<Boolean>>(nRegions);
+            Set<HRegionLocation> servers = new HashSet<HRegionLocation>(nRegions);
+            for (HRegionLocation entry : locations) {
+                // Keep track of servers we've sent to and only send once
+                if ( ! servers.contains(entry) && 
+                        keyRanges.intersect(entry.getRegionInfo().getStartKey(), entry.getRegionInfo().getEndKey())) {  // Call RPC once per server
+                    servers.add(entry);
+                    if (LOG.isDebugEnabled()) {LOG.debug("Adding cache entry to be sent for " + entry);}
+                    final byte[] key = entry.getRegionInfo().getStartKey();
+                    final HTableInterface htable = services.getTable(cacheUsingTableRef.getTable().getPhysicalName().getBytes());
+                    closeables.add(htable);
+                    futures.add(executor.submit(new JobCallable<Boolean>() {
+                        
+                        @Override
+                        public Boolean call() throws Exception {
+                            ServerCachingProtocol protocol = htable.coprocessorProxy(ServerCachingProtocol.class, key);
+                            return protocol.addServerCache(connection.getTenantId() == null ? null : connection.getTenantId().getBytes(), cacheId, cachePtr, cacheFactory);
+                        }
+
+                        /**
+                         * Defines the grouping for round robin behavior.  All threads spawned to process
+                         * this scan will be grouped together and time sliced with other simultaneously
+                         * executing parallel scans.
+                         */
+                        @Override
+                        public Object getJobId() {
+                            return ServerCacheClient.this;
+                        }
+                    }));
+                } else {
+                    if (LOG.isDebugEnabled()) {LOG.debug("NOT adding cache entry to be sent for " + entry + " since one already exists for that entry");}
+                }
+            }
+            
+            // Register the table BEFORE waiting on the RPCs: when one of them fails, the cleanup
+            // in the finally block goes through removeServerCache, which needs this mapping to
+            // find the region servers the cache was sent to.
+            cacheUsingTableRefMap.put(Bytes.mapKey(cacheId), cacheUsingTableRef);
+            hashCacheSpec = new ServerCache(cacheId,servers,cachePtr.getLength());
+            // Wait for the RPCs that are executing in parallel
+            int timeoutMs = services.getProps().getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
+            for (Future<Boolean> future : futures) {
+                future.get(timeoutMs, TimeUnit.MILLISECONDS);
+            }
+            
+            success = true;
+        } catch (SQLException e) {
+            firstException = e;
+        } catch (Exception e) {
+            firstException = new SQLException(e);
+        } finally {
+            try {
+                if (!success) {
+                    // Nothing to remove if we failed before the cache handle was created
+                    if (hashCacheSpec != null) {
+                        SQLCloseables.closeAllQuietly(Collections.singletonList(hashCacheSpec));
+                    }
+                    for (Future<Boolean> future : futures) {
+                        future.cancel(true);
+                    }
+                }
+            } finally {
+                try {
+                    Closeables.closeAll(closeables);
+                } catch (IOException e) {
+                    if (firstException == null) {
+                        firstException = new SQLException(e);
+                    }
+                } finally {
+                    if (firstException != null) {
+                        throw firstException;
+                    }
+                }
+            }
+        }
+        return hashCacheSpec;
+    }
+    
+    /**
+     * Remove the cached table from all region servers. Removal is best effort: a failure on an
+     * individual server is logged and the remaining servers are still processed.
+     * @param cacheId unique identifier for the hash join (as held by the {@link ServerCache} returned from
+     *  {@link #addServerCache(ScanRanges, ImmutableBytesWritable, ServerCacheFactory, TableRef)})
+     * @param servers set of servers upon which table was cached
+     * @throws SQLException if the table or its region locations cannot be obtained
+     */
+    private void removeServerCache(byte[] cacheId, Set<HRegionLocation> servers) throws SQLException {
+        ConnectionQueryServices services = connection.getQueryServices();
+        Throwable lastThrowable = null;
+        TableRef cacheUsingTableRef = cacheUsingTableRefMap.get(Bytes.mapKey(cacheId));
+        if (cacheUsingTableRef == null) {
+            // Without the table we cannot find the regions to call; fail soft instead of with an NPE
+            LOG.warn("Unable to remove hash cache " + idToString(cacheId) + " since no table is registered for it");
+            return;
+        }
+        byte[] tableName = cacheUsingTableRef.getTable().getPhysicalName().getBytes();
+        HTableInterface iterateOverTable = services.getTable(tableName);
+        try {
+            List<HRegionLocation> locations = services.getAllTableRegions(tableName);
+            Set<HRegionLocation> remainingOnServers = new HashSet<HRegionLocation>(servers);
+            /**
+             * Allow for the possibility that the region we based where to send our cache has split and been
+             * relocated to another region server *after* we sent it, but before we removed it. To accommodate
+             * this, we iterate through the current metadata boundaries and remove the cache once for each
+             * server that we originally sent to.
+             */
+            for (HRegionLocation entry : locations) {
+                if (remainingOnServers.contains(entry)) {  // Call once per server
+                    try {
+                        byte[] key = entry.getRegionInfo().getStartKey();
+                        ServerCachingProtocol protocol = iterateOverTable.coprocessorProxy(ServerCachingProtocol.class, key);
+                        protocol.removeServerCache(connection.getTenantId() == null ? null : connection.getTenantId().getBytes(), cacheId);
+                        remainingOnServers.remove(entry);
+                    } catch (Throwable t) {
+                        lastThrowable = t;
+                        LOG.error("Error trying to remove hash cache for " + entry, t);
+                    }
+                }
+            }
+            if (!remainingOnServers.isEmpty()) {
+                LOG.warn("Unable to remove hash cache for " + remainingOnServers, lastThrowable);
+            }
+        } finally {
+            // The table handle is only needed for the removal RPCs above; always release it
+            try {
+                iterateOverTable.close();
+            } catch (IOException e) {
+                LOG.warn("Unable to close table used to remove hash cache " + idToString(cacheId), e);
+            }
+        }
+    }
+
+    /**
+     * Create an ID to keep the cached information across other operations independent.
+     * Using simple long random number, since the length of time we need this to be unique
+     * is very limited. 
+     */
+    public static byte[] generateId() {
+        long rand = RANDOM.nextLong();
+        return Bytes.toBytes(rand);
+    }
+    
+    /**
+     * Render a cache ID produced by {@link #generateId()} as the decimal string of its long value.
+     * @param uuid the cache ID; expected to be {@link Bytes#SIZEOF_LONG} bytes long
+     * @return the string form of the ID
+     */
+    public static String idToString(byte[] uuid) {
+        assert(uuid.length == Bytes.SIZEOF_LONG);
+        return Long.toString(Bytes.toLong(uuid));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
new file mode 100644
index 0000000..ad40d8c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache;
+
+import java.io.Closeable;
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
+import org.apache.phoenix.memory.MemoryManager;
+
+
+/**
+ * 
+ * Interface to get and set cached values for a tenant
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface TenantCache {
+    /**
+     * @return the memory manager associated with this tenant cache
+     */
+    MemoryManager getMemoryManager();
+    /**
+     * Look up a previously added server cache.
+     * @param cacheId identifier of the server cache
+     * @return the cached entry for cacheId (TenantCacheImpl returns null when no entry is present)
+     */
+    Closeable getServerCache(ImmutableBytesPtr cacheId);
+    /**
+     * Create a server cache from the given bytes using the factory and store it under cacheId.
+     * @param cacheId identifier under which the cache is stored
+     * @param cachePtr the bytes the cache is built from
+     * @param cacheFactory factory used to build the cache
+     * @return the newly created cache entry
+     * @throws SQLException if the cache cannot be created
+     */
+    Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException;
+    /**
+     * Remove the server cache stored under cacheId.
+     * @param cacheId identifier of the server cache to remove
+     * @throws SQLException if the cache cannot be removed
+     */
+    void removeServerCache(ImmutableBytesPtr cacheId) throws SQLException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
new file mode 100644
index 0000000..eabeb11
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache;
+
+import java.io.Closeable;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import com.google.common.cache.*;
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
+import org.apache.phoenix.memory.MemoryManager;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.util.Closeables;
+
+/**
+ * 
+ * Cache per tenant on server side.  Tracks memory usage for each
+ * tenat as well and rolling up usage to global memory manager.
+ * 
+ * @author jtaylor
+ * @since 0.1
+ */
+public class TenantCacheImpl implements TenantCache {
+    private final int maxTimeToLiveMs;
+    private final MemoryManager memoryManager;
+    private volatile Cache<ImmutableBytesPtr, Closeable> serverCaches;
+
+    public TenantCacheImpl(MemoryManager memoryManager, int maxTimeToLiveMs) {
+        this.memoryManager = memoryManager;
+        this.maxTimeToLiveMs = maxTimeToLiveMs;
+    }
+    
+    @Override
+    public MemoryManager getMemoryManager() {
+        return memoryManager;
+    }
+
+    private Cache<ImmutableBytesPtr,Closeable> getServerCaches() {
+        /* Delay creation of this map until it's needed */
+        if (serverCaches == null) {
+            synchronized(this) {
+                if (serverCaches == null) {
+                    serverCaches = CacheBuilder.newBuilder()
+                        .expireAfterAccess(maxTimeToLiveMs, TimeUnit.MILLISECONDS)
+                        .removalListener(new RemovalListener<ImmutableBytesPtr, Closeable>(){
+                            @Override
+                            public void onRemoval(RemovalNotification<ImmutableBytesPtr, Closeable> notification) {
+                                Closeables.closeAllQuietly(Collections.singletonList(notification.getValue()));
+                            }
+                        })
+                        .build();
+                }
+            }
+        }
+        return serverCaches;
+    }
+    
+    @Override
+    public Closeable getServerCache(ImmutableBytesPtr cacheId) {
+        return getServerCaches().getIfPresent(cacheId);
+    }
+    
+    @Override
+    public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException {
+        MemoryChunk chunk = this.getMemoryManager().allocate(cachePtr.getLength());
+        Closeable element = cacheFactory.newCache(cachePtr, chunk);
+        getServerCaches().put(cacheId, element);
+        return element;
+    }
+    
+    @Override
+    public void removeServerCache(ImmutableBytesPtr cacheId) throws SQLException {
+        getServerCaches().invalidate(cacheId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillFile.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillFile.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillFile.java
new file mode 100644
index 0000000..f2ad908
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillFile.java
@@ -0,0 +1,142 @@
+package org.apache.phoenix.cache.aggcache;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.util.Map;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+
+/**
+ * This class abstracts a SpillFile It is a accessible on a per page basis
+ * For every SpillFile object a single spill file is always created. 
+ * Additional overflow files are dynamically created in case the page index requested is not covered by
+ * the spillFiles allocated so far
+ */
+public class SpillFile implements Closeable {
+
+    private static final Logger logger = LoggerFactory.getLogger(SpillFile.class);
+    // Default size for a single spillFile 2GB
+    private static final int SPILL_FILE_SIZE = Integer.MAX_VALUE;
+    // Page size for a spill file 4K
+    static final int DEFAULT_PAGE_SIZE = 4096;
+    // Map of initial SpillFile at index 0, and overflow spillFiles
+    private Map<Integer, TempFile> tempFiles;
+    
+    // Wrapper class for a TempFile: File + RandomAccessFile
+    private static class TempFile implements Closeable{
+    	private RandomAccessFile rndFile;
+    	private File file;
+    	
+    	public TempFile(File file, RandomAccessFile rndFile) {
+    		this.file = file;
+    		this.rndFile = rndFile;
+    	}    	
+    	    	
+    	public FileChannel getChannel() {
+    		return rndFile.getChannel();
+    	}
+
+		@Override
+		public void close() throws IOException {
+			Closeables.closeQuietly(rndFile.getChannel());
+			Closeables.closeQuietly(rndFile);
+			
+			if (file != null) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Deleting tempFile: " + file.getAbsolutePath());
+                }
+                try {
+                    file.delete();
+                } catch (SecurityException e) {
+                    logger.warn("IOException thrown while closing Closeable." + e);
+            	}
+            }
+		}
+    }
+    
+    /**
+     * Create a new SpillFile using the Java TempFile creation function. SpillFile is access in
+     * pages.
+     */
+    public static SpillFile createSpillFile() {
+    	try {    		
+    		return new SpillFile(createTempFile());    		
+    	} catch (IOException ioe) {
+        	throw new RuntimeException("Could not create Spillfile " + ioe);
+        }
+    }
+    
+    
+    private static TempFile createTempFile() throws IOException {
+        File tempFile = File.createTempFile(UUID.randomUUID().toString(), null);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Creating new SpillFile: " + tempFile.getAbsolutePath());
+        }
+        RandomAccessFile file = new RandomAccessFile(tempFile, "rw");
+        file.setLength(SPILL_FILE_SIZE);
+        
+        return new TempFile(tempFile, file);
+    }
+
+    
+    private SpillFile(TempFile spFile) throws IOException {
+        this.tempFiles = Maps.newHashMap();
+        // Init the first pre-allocated spillFile
+        tempFiles.put(0, spFile);
+    }
+
+    /**
+     * Random access to a page of the current spill file
+     * @param index
+     */
+    public MappedByteBuffer getPage(int index) {
+        try {
+        	TempFile tempFile = null;
+        	int fileIndex = 0;
+        	
+            long offset = (long) index * (long) DEFAULT_PAGE_SIZE;            
+            if(offset >= SPILL_FILE_SIZE) {
+            	// Offset exceeds the first SpillFile size
+            	// Get the index of the file that should contain the pageID
+            	fileIndex = (int)(offset / SPILL_FILE_SIZE);
+            	if(!tempFiles.containsKey(fileIndex)) {
+            		// Dynamically add new spillFiles if directory grows beyond 
+            		// max page ID.
+            		tempFile = createTempFile();
+            		tempFiles.put(fileIndex, tempFile);
+            	}
+            }
+        	tempFile = tempFiles.get(fileIndex);
+        	// Channel gets buffered in file object
+        	FileChannel fc = tempFile.getChannel();
+
+        	return fc.map(MapMode.READ_WRITE, offset, DEFAULT_PAGE_SIZE);
+        } catch (IOException ioe) {
+            // Close resource
+            close();
+            throw new RuntimeException("Could not get page at index: " + index);
+        } catch (IllegalArgumentException iae) {
+            // Close resource
+            close();
+            throw iae;
+        }
+    }
+
+    @Override
+    public void close() {
+    	for(TempFile file : tempFiles.values()) {
+            // Swallow IOExceptions
+            Closeables.closeQuietly(file);
+    	}
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java
new file mode 100644
index 0000000..448ffa5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java
@@ -0,0 +1,323 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache.aggcache;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.expression.function.SingleAggregateFunction;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.KeyValueSchema;
+import org.apache.phoenix.schema.ValueBitSet;
+import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * Class servers as an adapter between the in-memory LRU cache and the Spill data structures. It
+ * takes care of serializing / deserializing the key/value groupby tuples, and spilling them to the
+ * correct spill partition
+ */
+public class SpillManager implements Closeable {
+
+    // Wrapper class for DESERIALIZED groupby key/value tuples
+    public static class CacheEntry<T extends ImmutableBytesWritable> implements
+            Map.Entry<T, Aggregator[]> {
+
+        protected T key;
+        protected Aggregator[] aggs;
+
+        public CacheEntry(T key, Aggregator[] aggs) {
+            this.key = key;
+            this.aggs = aggs;
+        }
+
+        public Aggregator[] getValue(Configuration conf) {
+            return aggs;
+        }
+
+        public int getKeyLength() {
+            return key.getLength();
+        }
+
+        @Override
+        public Aggregator[] getValue() {
+            return aggs;
+        }
+
+        @Override
+        public Aggregator[] setValue(Aggregator[] arg0) {
+            this.aggs = arg0;
+            return aggs;
+        }
+
+        @Override
+        public T getKey() {
+            return key;
+        }
+
+    }
+
+    private final ArrayList<SpillMap> spillMaps;
+    private final int numSpillFiles;
+
+    private final ServerAggregators aggregators;
+    private final Configuration conf;
+
+    /**
+     * SpillManager takes care of spilling and loading tuples from spilled data structs
+     * @param numSpillFiles
+     * @param serverAggregators
+     */
+    public SpillManager(int numSpillFiles, ServerAggregators serverAggregators,
+            Configuration conf, SpillableGroupByCache.QueryCache cache) {
+        try {
+            int estValueSize = serverAggregators.getEstimatedByteSize();
+            spillMaps = Lists.newArrayList();
+            this.numSpillFiles = numSpillFiles;
+            this.aggregators = serverAggregators;
+            this.conf = conf;
+            
+            // Ensure that a single element fits onto a page!!!
+            Preconditions.checkArgument(SpillFile.DEFAULT_PAGE_SIZE > estValueSize);
+
+            // Create a list of spillFiles
+            // Each Spillfile only handles up to 2GB data
+            for (int i = 0; i < numSpillFiles; i++) {
+                SpillFile file = SpillFile.createSpillFile();
+                spillMaps.add(new SpillMap(file, SpillFile.DEFAULT_PAGE_SIZE, estValueSize, cache));
+            }
+        } catch (IOException ioe) {
+            throw new RuntimeException("Could not init the SpillManager");
+        }
+    }
+
+    // serialize a key/value tuple into a byte array
+    // WARNING: expensive
+    private byte[] serialize(ImmutableBytesPtr key, Aggregator[] aggs,
+            ServerAggregators serverAggs) throws IOException {
+
+        DataOutputStream output = null;
+        ByteArrayOutputStream bai = null;
+        try {
+            bai = new ByteArrayOutputStream();
+            output = new DataOutputStream(bai);
+            // key length
+            WritableUtils.writeVInt(output, key.getLength());
+            // key
+            output.write(key.get(), key.getOffset(), key.getLength());
+            byte[] aggsByte = serverAggs.toBytes(aggs);
+            // aggs length
+            WritableUtils.writeVInt(output, aggsByte.length);
+            // aggs
+            output.write(aggsByte);
+            return bai.toByteArray();
+        } finally {
+
+            if (bai != null) {
+                bai.close();
+                bai = null;
+            }
+            if (output != null) {
+                output.close();
+                output = null;
+            }
+        }
+    }
+
+    /**
+     * Helper method to deserialize the key part from a serialized byte array
+     * @param data
+     * @return
+     * @throws IOException
+     */
+    static ImmutableBytesPtr getKey(byte[] data) throws IOException {
+        DataInputStream input = null;
+        try {
+            input = new DataInputStream(new ByteArrayInputStream(data));
+            // key length
+            int keyLength = WritableUtils.readVInt(input);
+            int offset = WritableUtils.getVIntSize(keyLength);
+            // key
+            return new ImmutableBytesPtr(data, offset, keyLength);
+        } finally {
+            if (input != null) {
+                input.close();
+                input = null;
+            }
+        }
+    }
+
+    
+    // Instantiate Aggregators form a serialized byte array
+    private Aggregator[] getAggregators(byte[] data) throws IOException {
+        DataInputStream input = null;
+        try {
+            input = new DataInputStream(new ByteArrayInputStream(data));
+            // key length
+            int keyLength = WritableUtils.readVInt(input);
+            int vIntKeyLength = WritableUtils.getVIntSize(keyLength);
+            ImmutableBytesPtr ptr = new ImmutableBytesPtr(data, vIntKeyLength, keyLength);
+
+            // value length
+            input.skip(keyLength);
+            int valueLength = WritableUtils.readVInt(input);
+            int vIntValLength = WritableUtils.getVIntSize(keyLength);
+            KeyValue keyValue =
+                    KeyValueUtil.newKeyValue(ptr.get(), ptr.getOffset(), ptr.getLength(),
+                        QueryConstants.SINGLE_COLUMN_FAMILY, QueryConstants.SINGLE_COLUMN,
+                        QueryConstants.AGG_TIMESTAMP, data, vIntKeyLength + keyLength + vIntValLength, valueLength);
+            Tuple result = new SingleKeyValueTuple(keyValue);
+            TupleUtil.getAggregateValue(result, ptr);
+            KeyValueSchema schema = aggregators.getValueSchema();
+            ValueBitSet tempValueSet = ValueBitSet.newInstance(schema);
+            tempValueSet.clear();
+            tempValueSet.or(ptr);
+
+            int i = 0, maxOffset = ptr.getOffset() + ptr.getLength();
+            SingleAggregateFunction[] funcArray = aggregators.getFunctions();
+            Aggregator[] sAggs = new Aggregator[funcArray.length];
+            Boolean hasValue;
+            schema.iterator(ptr);
+            while ((hasValue = schema.next(ptr, i, maxOffset, tempValueSet)) != null) {
+                SingleAggregateFunction func = funcArray[i];
+                sAggs[i++] =
+                        hasValue ? func.newServerAggregator(conf, ptr) : func
+                                .newServerAggregator(conf);
+            }
+            return sAggs;
+
+        } finally {
+            Closeables.closeQuietly(input);
+        }
+    }
+
+    /**
+     * Helper function to deserialize a byte array into a CacheEntry
+     * @param <K>
+     * @param bytes
+     * @throws IOException
+     */
+    @SuppressWarnings("unchecked")
+    public <K extends ImmutableBytesWritable> CacheEntry<K> toCacheEntry(byte[] bytes)
+            throws IOException {
+        ImmutableBytesPtr key = SpillManager.getKey(bytes);
+        Aggregator[] aggs = getAggregators(bytes);
+
+        return new CacheEntry<K>((K) key, aggs);
+    }
+
+    // Determines the partition, i.e. spillFile the tuple should get spilled to.
+    private int getPartition(ImmutableBytesWritable key) {
+        // Simple implementation hash mod numFiles
+        return Math.abs(key.hashCode()) % numSpillFiles;
+    }
+
+    /**
+     * Function that spills a key/value groupby tuple into a partition Spilling always triggers a
+     * serialize call
+     * @param key
+     * @param value
+     * @throws IOException
+     */
+    public void spill(ImmutableBytesWritable key, Aggregator[] value) throws IOException {
+        SpillMap spillMap = spillMaps.get(getPartition(key));
+        ImmutableBytesPtr keyPtr = new ImmutableBytesPtr(key);
+        byte[] data = serialize(keyPtr, value, aggregators);
+        spillMap.put(keyPtr, data);
+    }
+
+    /**
+     * Function that loads a spilled key/value groupby tuple from one of the spill partitions into
+     * the LRU cache. Loading always involves deserialization
+     * @throws IOException
+     */
+    public Aggregator[] loadEntry(ImmutableBytesWritable key) throws IOException {
+        SpillMap spillMap = spillMaps.get(getPartition(key));
+        byte[] data = spillMap.get(key);
+        if (data != null) {
+            return getAggregators(data);
+        }
+        return null;
+    }
+
+    /**
+     * Close the attached spillMap
+     */
+    @Override
+    public void close() {
+        for (int i = 0; i < spillMaps.size(); i++) {
+            Closeables.closeQuietly(spillMaps.get(i).getSpillFile());
+        }
+    }
+
+    /**
+     * Function returns an iterator over all spilled Tuples
+     */
+    public SpillMapIterator newDataIterator() {
+        return new SpillMapIterator();
+    }
+
+    private final class SpillMapIterator implements Iterator<byte[]> {
+
+        int index = 0;
+        Iterator<byte[]> spillIter = spillMaps.get(index).iterator();
+
+        @Override
+        public boolean hasNext() {
+            if (!spillIter.hasNext()) {
+                if (index < (numSpillFiles - 1)) {
+                    // Current spillFile exhausted get iterator over new one
+                    spillIter = spillMaps.get(++index).iterator();
+                }
+            }
+            return spillIter.hasNext();
+        }
+
+        @Override
+        public byte[] next() {
+            return spillIter.next();
+        }
+
+        @Override
+        public void remove() {
+            throw new IllegalAccessError("Remove is not supported for this type of iterator");
+        }
+    }
+}