Posted to commits@phoenix.apache.org by mu...@apache.org on 2014/02/15 01:07:41 UTC

[08/15] Rename package from org.apache.hadoop.hbase.index.* to org.apache.phoenix.index.* to fix classloader issue causing mutable index performance regression - https://issues.apache.org/jira/browse/PHOENIX-38

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java
new file mode 100644
index 0000000..dbf13aa
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered.filter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.filter.FilterBase;
+
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * Only allow the 'latest' timestamp of each family:qualifier pair, ensuring that they aren't
+ * covered by a previous delete. This is similar to some of the work the ScanQueryMatcher does to
+ * ensure correct visibility of keys based on deletes.
+ * <p>
+ * No actual delete {@link KeyValue}s are allowed to pass through this filter - they are always
+ * skipped.
+ * <p>
+ * Note there is a little bit of conceptually odd behavior (though it matches the HBase
+ * specifications) around point deletes ({@link KeyValue} of type {@link Type#Delete}). These deletes
+ * only apply to a single {@link KeyValue} at a single point in time - they essentially completely
+ * 'cover' the existing {@link Put} at that timestamp. However, they don't 'cover' any other
+ * keyvalues at older timestamps. Therefore, if there is a point-delete at ts = 5, and puts at ts =
+ * 4, and ts = 5, we will only allow the put at ts = 4.
+ * <p>
+ * Expects {@link KeyValue}s to arrive in sorted order, with 'Delete' {@link Type} {@link KeyValue}s
+ * ({@link Type#DeleteColumn}, {@link Type#DeleteFamily}, {@link Type#Delete}) before their regular
+ * {@link Type#Put} counterparts.
+ */
+public class ApplyAndFilterDeletesFilter extends FilterBase {
+
+  private boolean done = false;
+  List<ImmutableBytesPtr> families;
+  private final DeleteTracker coveringDelete = new DeleteTracker();
+  private Hinter currentHint;
+  private DeleteColumnHinter columnHint = new DeleteColumnHinter();
+  private DeleteFamilyHinter familyHint = new DeleteFamilyHinter();
+  
+  /**
+   * Setup the filter to only include the given families. This allows us to seek intelligently past
+   * families we don't care about.
+   * @param families families to include; all other families can be seeked past
+   */
+  public ApplyAndFilterDeletesFilter(Set<ImmutableBytesPtr> families) {
+    this.families = new ArrayList<ImmutableBytesPtr>(families);
+    Collections.sort(this.families);
+  }
+
+  private ImmutableBytesPtr getNextFamily(ImmutableBytesPtr family) {
+    int index = Collections.binarySearch(families, family);
+    // doesn't match exactly, but we can find the right next match
+    // this is pretty unlikely, but just in case
+    if (index < 0) {
+      // the actual location of the next match
+      index = -index - 1;
+    } else {
+      // it's an exact match for a family, so we get the next entry
+      index = index + 1;
+    }
+    //now we have the location of the next entry
+    if(index >= families.size()){
+      return null;
+    }
+    return families.get(index);
+  }
+  
+  @Override
+  public void reset(){
+    this.coveringDelete.reset();
+    this.done = false;
+  }
+  
+  
+  @Override
+  public KeyValue getNextKeyHint(KeyValue peeked){
+    return currentHint.getHint(peeked);
+  }
+
+  @Override
+  public ReturnCode filterKeyValue(KeyValue next) {
+    // we marked ourselves done, but the END_ROW_KEY didn't manage to seek to the very last key
+    if (this.done) {
+      return ReturnCode.SKIP;
+    }
+
+    switch (KeyValue.Type.codeToType(next.getType())) {
+    /*
+     * DeleteFamily will always sort first because those KVs (we assume) don't have qualifiers
+     * (or rather, their qualifiers are null). Therefore, we have to keep hold of all the delete
+     * families until we get
+     * to a Put entry that is covered by that delete (in which case, we are done with the family).
+     */
+    case DeleteFamily:
+      // track the family to delete. If we are updating the delete, that means we have passed all
+      // kvs in the last column, so we can safely ignore the last deleteFamily, and just use this
+      // one. In fact, it means that all the previous deletes can be ignored because the family must
+      // not match anymore.
+      this.coveringDelete.reset();
+      this.coveringDelete.deleteFamily = next;
+      return ReturnCode.SKIP;
+    case DeleteColumn:
+      // similar to deleteFamily, all the newer deletes/puts would have been seen at this point, so
+      // we can safely replace the older delete column with this more recent one
+      this.coveringDelete.pointDelete = null;
+      this.coveringDelete.deleteColumn = next;
+      return ReturnCode.SKIP;
+    case Delete:
+      // we are just deleting the single column value at this point.
+      // therefore we just skip this entry and go on to the next one. The only caveat is that
+      // we should still cover the next entry if this delete applies to the next entry, so we
+      // have to keep around a reference to the KV to compare against the next valid entry
+      this.coveringDelete.pointDelete = next;
+      return ReturnCode.SKIP;
+    default:
+      // no covering deletes
+      if (coveringDelete.empty()) {
+        return ReturnCode.INCLUDE;
+      }
+
+      if (coveringDelete.matchesFamily(next)) {
+        this.currentHint = familyHint;
+        return ReturnCode.SEEK_NEXT_USING_HINT;
+      }
+
+      if (coveringDelete.matchesColumn(next)) {
+        // hint to the next column
+        this.currentHint = columnHint;
+        return ReturnCode.SEEK_NEXT_USING_HINT;
+      }
+
+      if (coveringDelete.matchesPoint(next)) {
+        return ReturnCode.SKIP;
+      }
+
+    }
+
+    // none of the deletes matches, we are done
+    return ReturnCode.INCLUDE;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    throw new UnsupportedOperationException("Server-side only filter, cannot be serialized!");
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    throw new UnsupportedOperationException("Server-side only filter, cannot be deserialized!");
+  }
+
+  /**
+   * Get the next hint for a given peeked keyvalue
+   */
+  interface Hinter {
+    public abstract KeyValue getHint(KeyValue peek);
+  }
+
+  /**
+   * Entire family has been deleted, so either seek to the next family, or if none are present in
+   * the original set of families to include, seek to the "last possible key" (or rather our best
+   * guess) and be done.
+   */
+  class DeleteFamilyHinter implements Hinter {
+
+    @Override
+    public KeyValue getHint(KeyValue peeked) {
+      // check to see if we have another family to seek to
+      ImmutableBytesPtr nextFamily =
+          getNextFamily(new ImmutableBytesPtr(peeked.getBuffer(), peeked.getFamilyOffset(),
+              peeked.getFamilyLength()));
+      if (nextFamily == null) {
+        // no known next family, so we can be completely done
+        done = true;
+        return KeyValue.LOWESTKEY;
+      }
+      // there is a valid family, so we should seek to that
+      return KeyValue.createFirstOnRow(peeked.getRow(), nextFamily.copyBytesIfNecessary(),
+        HConstants.EMPTY_BYTE_ARRAY);
+    }
+
+  }
+
+  /**
+   * Hint the next column-qualifier after the given keyvalue. We can't be smart like in the
+   * ScanQueryMatcher since we don't know the columns ahead of time.
+   */
+  class DeleteColumnHinter implements Hinter {
+
+    @Override
+    public KeyValue getHint(KeyValue kv) {
+      return KeyValue.createLastOnRow(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(),
+        kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getBuffer(),
+        kv.getQualifierOffset(), kv.getQualifierLength());
+    }
+  }
+
+  class DeleteTracker {
+
+    public KeyValue deleteFamily;
+    public KeyValue deleteColumn;
+    public KeyValue pointDelete;
+
+    public void reset() {
+      this.deleteFamily = null;
+      this.deleteColumn = null;
+      this.pointDelete = null;
+    }
+
+    /**
+     * Check to see if we should skip this {@link KeyValue} based on the family.
+     * <p>
+     * Internally, this also resets the currently tracked "Delete Family" marker if the keyvalue
+     * is in another family (since CFs sort lexicographically, we can discard the current marker
+     * since it must not be applicable to any more kvs in a linear scan).
+     * @param next the keyvalue to check
+     * @return <tt>true</tt> if this {@link KeyValue} matches a delete.
+     */
+    public boolean matchesFamily(KeyValue next) {
+      if (deleteFamily == null) {
+        return false;
+      }
+      if (deleteFamily.matchingFamily(next)) {
+        // falls within the timestamp range
+        if (deleteFamily.getTimestamp() >= next.getTimestamp()) {
+          return true;
+        }
+      } else {
+        // we can safely reset the delete family because we have moved on to another family
+        deleteFamily = null;
+      }
+
+      return false;
+    }
+
+
+    /**
+     * @param next the keyvalue to check against the current delete-column marker
+     * @return <tt>true</tt> if the keyvalue is covered by the tracked delete column
+     */
+    public boolean matchesColumn(KeyValue next) {
+      if (deleteColumn == null) {
+        return false;
+      }
+      if (deleteColumn.matchingFamily(next) && deleteColumn.matchingQualifier(next)) {
+        // falls within the timestamp range
+        if (deleteColumn.getTimestamp() >= next.getTimestamp()) {
+          return true;
+        }
+      } else {
+        deleteColumn = null;
+      }
+      return false;
+    }
+
+    /**
+     * @param next the keyvalue to check against the current point-delete marker
+     * @return <tt>true</tt> if the keyvalue is covered by the tracked point delete
+     */
+    public boolean matchesPoint(KeyValue next) {
+      // point deletes only apply to the exact KV that they reference, so we only need to ensure
+      // that the timestamp matches exactly. Because we sort by timestamp first, either the next
+      // keyvalue has the exact timestamp or is an older (smaller) timestamp, and we can allow that
+      // one.
+      if (pointDelete != null && pointDelete.matchingFamily(next)
+          && pointDelete.matchingQualifier(next)) {
+        if (pointDelete.getTimestamp() == next.getTimestamp()) {
+          return true;
+        }
+        // clear the point delete since the TS must not match
+        coveringDelete.pointDelete = null;
+      }
+      return false;
+    }
+
+    /**
+     * @return <tt>true</tt> if no delete has been set
+     */
+    public boolean empty() {
+      return deleteFamily == null && deleteColumn == null && pointDelete == null;
+    }
+  }
+}
\ No newline at end of file
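
To make the point-delete behavior from the class javadoc concrete, here is a minimal sketch against the 0.94-era HBase API used above (the class name and the row/family/qualifier values are illustrative, not part of the patch):

    import java.util.Collections;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.hbase.index.covered.filter.ApplyAndFilterDeletesFilter;
    import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;

    public class PointDeleteSketch {
      public static void main(String[] args) {
        byte[] row = Bytes.toBytes("r"), fam = Bytes.toBytes("f"), qual = Bytes.toBytes("q");
        ApplyAndFilterDeletesFilter filter = new ApplyAndFilterDeletesFilter(
            Collections.singleton(new ImmutableBytesPtr(fam)));
        // the point delete itself never passes the filter, but it is remembered
        System.out.println(filter.filterKeyValue(
            new KeyValue(row, fam, qual, 5L, KeyValue.Type.Delete)));    // SKIP
        // a put at the exact same timestamp is 'covered' by the point delete
        System.out.println(filter.filterKeyValue(
            new KeyValue(row, fam, qual, 5L, Bytes.toBytes("v5"))));     // SKIP
        // an older put is not covered, so it is allowed through
        System.out.println(filter.filterKeyValue(
            new KeyValue(row, fam, qual, 4L, Bytes.toBytes("v4"))));     // INCLUDE
      }
    }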

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/ColumnTrackingNextLargestTimestampFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/ColumnTrackingNextLargestTimestampFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/ColumnTrackingNextLargestTimestampFilter.java
new file mode 100644
index 0000000..2ad04ff
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/ColumnTrackingNextLargestTimestampFilter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered.filter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.FilterBase;
+
+import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
+
+/**
+ * Similar to the {@link MaxTimestampFilter}, but also updates the 'next largest' timestamp seen
+ * above the allowed maximum (that is, the smallest timestamp that gets skipped). Note that it
+ * isn't as quick as the {@link MaxTimestampFilter} as we can't just seek ahead to a key with the
+ * matching timestamp, but have to iterate each kv until we find the right one with an allowed timestamp.
+ * <p>
+ * Inclusively filter on the maximum timestamp allowed. Excludes all elements greater than (but not
+ * equal to) the given timestamp, so given ts = 5, a {@link KeyValue} with ts 6 is excluded, but not
+ * one with ts = 5.
+ * <p>
+ * This filter generally doesn't make sense on its own - it should follow a per-column filter and
+ * possibly a per-delete filter to only track the most recent (but not exposed to the user)
+ * timestamp.
+ */
+public class ColumnTrackingNextLargestTimestampFilter extends FilterBase {
+
+  private long ts;
+  private ColumnTracker column;
+
+  public ColumnTrackingNextLargestTimestampFilter(long maxTime, ColumnTracker toTrack) {
+    this.ts = maxTime;
+    this.column = toTrack;
+  }
+
+  @Override
+  public ReturnCode filterKeyValue(KeyValue v) {
+    long timestamp = v.getTimestamp();
+    if (timestamp > ts) {
+      this.column.setTs(timestamp);
+      return ReturnCode.SKIP;
+    }
+    return ReturnCode.INCLUDE;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    throw new UnsupportedOperationException("Server-side only filter, cannot be serialized!");
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    throw new UnsupportedOperationException("Server-side only filter, cannot be deserialized!");
+  }
+}
\ No newline at end of file
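
A rough sketch of the interplay with the ColumnTracker class from later in this commit (the class name and timestamps below are illustrative):

    import java.util.Collections;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.hbase.index.covered.filter.ColumnTrackingNextLargestTimestampFilter;
    import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
    import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;

    public class TrackingSketch {
      public static void main(String[] args) {
        byte[] row = Bytes.toBytes("r"), fam = Bytes.toBytes("f"), qual = Bytes.toBytes("q");
        ColumnTracker tracker =
            new ColumnTracker(Collections.singleton(new ColumnReference(fam, qual)));
        ColumnTrackingNextLargestTimestampFilter filter =
            new ColumnTrackingNextLargestTimestampFilter(5L, tracker);
        // timestamps above the max are skipped, and the tracker keeps the smallest one seen
        filter.filterKeyValue(new KeyValue(row, fam, qual, 9L, Bytes.toBytes("v"))); // SKIP
        filter.filterKeyValue(new KeyValue(row, fam, qual, 7L, Bytes.toBytes("v"))); // SKIP
        filter.filterKeyValue(new KeyValue(row, fam, qual, 5L, Bytes.toBytes("v"))); // INCLUDE
        System.out.println(tracker.getTS()); // 7 - the next-newest timestamp above the max
      }
    }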

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/FamilyOnlyFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/FamilyOnlyFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/FamilyOnlyFilter.java
new file mode 100644
index 0000000..080bf92
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/FamilyOnlyFilter.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered.filter;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
+
+/**
+ * Similar to the {@link FamilyFilter} but stops when the end of the family is reached and only
+ * supports equality.
+ */
+public class FamilyOnlyFilter extends FamilyFilter {
+
+  boolean done = false;
+  private boolean previousMatchFound;
+
+  /**
+   * Filter on exact binary matches to the passed family
+   * @param family to compare against
+   */
+  public FamilyOnlyFilter(final byte[] family) {
+    this(new BinaryComparator(family));
+  }
+
+  public FamilyOnlyFilter(final WritableByteArrayComparable familyComparator) {
+    super(CompareOp.EQUAL, familyComparator);
+  }
+
+  @Override
+  public boolean filterAllRemaining() {
+    return done;
+  }
+
+  @Override
+  public void reset() {
+    done = false;
+    previousMatchFound = false;
+  }
+
+  @Override
+  public ReturnCode filterKeyValue(KeyValue v) {
+    if (done) {
+      return ReturnCode.SKIP;
+    }
+    ReturnCode code = super.filterKeyValue(v);
+    if (previousMatchFound) {
+      // we found a match before, and now we are skipping the key because of the family, therefore
+      // we are done (no more of the family).
+      if (code.equals(ReturnCode.SKIP)) {
+        done = true;
+      }
+    } else {
+      // if we haven't seen a match before, then it doesn't matter what we see now, except to mark
+      // if we've seen a match
+      if (code.equals(ReturnCode.INCLUDE)) {
+        previousMatchFound = true;
+      }
+    }
+    return code;
+  }
+
+}
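
A short sketch of the early-termination behavior; the expected return codes assume the 0.94 FamilyFilter semantics, where a non-matching family yields SKIP (class name and values are illustrative):

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.hbase.index.covered.filter.FamilyOnlyFilter;

    public class FamilySketch {
      public static void main(String[] args) {
        byte[] row = Bytes.toBytes("r"), qual = Bytes.toBytes("q"), val = Bytes.toBytes("v");
        FamilyOnlyFilter filter = new FamilyOnlyFilter(Bytes.toBytes("fam1"));
        // matching family: included, and the filter remembers the family has started
        System.out.println(filter.filterKeyValue(
            new KeyValue(row, Bytes.toBytes("fam1"), qual, 10L, val))); // INCLUDE
        // first keyvalue past the family: skipped, and the filter marks itself done
        System.out.println(filter.filterKeyValue(
            new KeyValue(row, Bytes.toBytes("fam2"), qual, 10L, val))); // SKIP
        System.out.println(filter.filterAllRemaining()); // true
      }
    }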

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/MaxTimestampFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/MaxTimestampFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/MaxTimestampFilter.java
new file mode 100644
index 0000000..e8e6347
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/MaxTimestampFilter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered.filter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Inclusive filter on the maximum timestamp allowed. Excludes all elements greater than (but not
+ * equal to) the given timestamp, so given ts = 5, a {@link KeyValue} with ts 6 is excluded, but not
+ * one with ts = 5.
+ */
+public class MaxTimestampFilter extends FilterBase {
+
+  private long ts;
+
+  public MaxTimestampFilter(long maxTime) {
+    this.ts = maxTime;
+  }
+
+  @Override
+  public KeyValue getNextKeyHint(KeyValue currentKV) {
+    // this might be a little excessive right now - better safe than sorry though, so we don't mess
+    // with other filters too much.
+    KeyValue kv = currentKV.deepCopy();
+    int offset = kv.getTimestampOffset();
+    // set the timestamp in the buffer
+    byte[] buffer = kv.getBuffer();
+    byte[] ts = Bytes.toBytes(this.ts);
+    System.arraycopy(ts, 0, buffer, offset, ts.length);
+
+    return kv;
+  }
+
+  @Override
+  public ReturnCode filterKeyValue(KeyValue v) {
+    long timestamp = v.getTimestamp();
+    if (timestamp > ts) {
+      return ReturnCode.SEEK_NEXT_USING_HINT;
+    }
+    return ReturnCode.INCLUDE;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    throw new UnsupportedOperationException("Server-side only filter, cannot be serialized!");
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    throw new UnsupportedOperationException("Server-side only filter, cannot be deserialized!");
+  }
+}
\ No newline at end of file
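
A sketch of how the hint works: a keyvalue newer than the max triggers a seek, and the hint is the same key with its timestamp forced down to the max (class name and values are illustrative):

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.hbase.index.covered.filter.MaxTimestampFilter;

    public class MaxTsSketch {
      public static void main(String[] args) {
        byte[] row = Bytes.toBytes("r"), fam = Bytes.toBytes("f"), qual = Bytes.toBytes("q");
        MaxTimestampFilter filter = new MaxTimestampFilter(5L);
        KeyValue newer = new KeyValue(row, fam, qual, 9L, Bytes.toBytes("v"));
        System.out.println(filter.filterKeyValue(newer)); // SEEK_NEXT_USING_HINT
        // the hint copies the keyvalue and overwrites its timestamp with the max
        System.out.println(filter.getNextKeyHint(newer).getTimestamp()); // 5
      }
    }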

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/NewerTimestampFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/NewerTimestampFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/NewerTimestampFilter.java
new file mode 100644
index 0000000..7209ee2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/NewerTimestampFilter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.covered.filter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.FilterBase;
+
+/**
+ * Server-side only class used in the indexer to filter out keyvalues newer than a given timestamp
+ * (so allows anything <code><=</code> timestamp through).
+ * <p>
+ * Note that this filter doesn't support {@link #write(DataOutput)} or {@link #readFields(DataInput)}.
+ */
+public class NewerTimestampFilter extends FilterBase {
+
+  private long timestamp;
+
+  public NewerTimestampFilter(long timestamp) {
+    this.timestamp = timestamp;
+  }
+
+  @Override
+  public ReturnCode filterKeyValue(KeyValue ignored) {
+    return ignored.getTimestamp() > timestamp ? ReturnCode.SKIP : ReturnCode.INCLUDE;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    throw new UnsupportedOperationException("TimestampFilter is server-side only!");
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    throw new UnsupportedOperationException("TimestampFilter is server-side only!");
+  }
+}
\ No newline at end of file
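
The semantics in a minimal sketch (class name and values are illustrative):

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.hbase.index.covered.filter.NewerTimestampFilter;

    public class NewerTsSketch {
      public static void main(String[] args) {
        byte[] b = Bytes.toBytes("x");
        NewerTimestampFilter filter = new NewerTimestampFilter(10L);
        System.out.println(filter.filterKeyValue(new KeyValue(b, b, b, 11L, b))); // SKIP - newer than 10
        System.out.println(filter.filterKeyValue(new KeyValue(b, b, b, 10L, b))); // INCLUDE - <= 10 passes
      }
    }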

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnReference.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnReference.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnReference.java
new file mode 100644
index 0000000..ae0421d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnReference.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered.update;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * A reference to a column (family and qualifier) in a table, used to check whether a given
+ * {@link KeyValue} falls under an indexed column.
+ */
+public class ColumnReference implements Comparable<ColumnReference> {
+
+  public static final byte[] ALL_QUALIFIERS = new byte[0];
+
+  private static int calcHashCode(byte[] family, byte[] qualifier) {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + Bytes.hashCode(family);
+    result = prime * result + Bytes.hashCode(qualifier);
+    return result;
+  }
+
+  private final int hashCode;
+  protected final byte[] family;
+  protected final byte[] qualifier;
+  private volatile ImmutableBytesWritable familyPtr;
+  private volatile ImmutableBytesWritable qualifierPtr;
+
+  public ColumnReference(byte[] family, byte[] qualifier) {
+    this.family = family;
+    this.qualifier = qualifier;
+    this.hashCode = calcHashCode(family, qualifier);
+  }
+
+  public byte[] getFamily() {
+    return this.family;
+  }
+
+  public byte[] getQualifier() {
+    return this.qualifier;
+  }
+  
+  public ImmutableBytesWritable getFamilyWritable() {
+    if (this.familyPtr == null) {
+      synchronized (this.family) {
+        if (this.familyPtr == null) {
+          this.familyPtr = new ImmutableBytesPtr(this.family);
+        }
+      }
+    }
+    return this.familyPtr;
+  }
+
+  public ImmutableBytesWritable getQualifierWritable() {
+    if (this.qualifierPtr == null) {
+      synchronized (this.qualifier) {
+        if (this.qualifierPtr == null) {
+          this.qualifierPtr = new ImmutableBytesPtr(this.qualifier);
+        }
+      }
+    }
+    return this.qualifierPtr;
+  }
+
+  public boolean matches(KeyValue kv) {
+    if (matchesFamily(kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength())) {
+      return matchesQualifier(kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength());
+    }
+    return false;
+  }
+
+  /**
+   * @param qual to check against
+   * @return <tt>true</tt> if this column covers the given qualifier.
+   */
+  public boolean matchesQualifier(byte[] qual) {
+    return matchesQualifier(qual, 0, qual.length);
+  }
+
+  public boolean matchesQualifier(byte[] bytes, int offset, int length) {
+    return allColumns() ? true : match(bytes, offset, length, qualifier);
+  }
+
+  /**
+   * @param family to check against
+   * @return <tt>true</tt> if this column covers the given family.
+   */
+  public boolean matchesFamily(byte[] family) {
+    return matchesFamily(family, 0, family.length);
+  }
+
+  public boolean matchesFamily(byte[] bytes, int offset, int length) {
+    return match(bytes, offset, length, family);
+  }
+
+  /**
+   * @return <tt>true</tt> if this should include all column qualifiers, <tt>false</tt> otherwise
+   */
+  public boolean allColumns() {
+    return this.qualifier == ALL_QUALIFIERS;
+  }
+
+  /**
+   * Check to see if the passed bytes match the stored bytes
+   * @param first the bytes to check (may be <tt>null</tt>)
+   * @param offset offset into <tt>first</tt> at which to start the comparison
+   * @param length number of bytes from <tt>first</tt> to compare
+   * @param storedKey the stored byte[], should never be <tt>null</tt>
+   * @return <tt>true</tt> if they are byte-equal
+   */
+  private boolean match(byte[] first, int offset, int length, byte[] storedKey) {
+    return first == null ? false : Bytes.equals(first, offset, length, storedKey, 0,
+      storedKey.length);
+  }
+
+  public KeyValue getFirstKeyValueForRow(byte[] row) {
+    return KeyValue.createFirstOnRow(row, family, qualifier == ALL_QUALIFIERS ? null : qualifier);
+  }
+
+  @Override
+  public int compareTo(ColumnReference o) {
+    int c = Bytes.compareTo(family, o.family);
+    if (c == 0) {
+      // matching families, compare qualifiers
+      c = Bytes.compareTo(qualifier, o.qualifier);
+    }
+    return c;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof ColumnReference) {
+      ColumnReference other = (ColumnReference) o;
+      if (hashCode == other.hashCode && Bytes.equals(family, other.family)) {
+        return Bytes.equals(qualifier, other.qualifier);
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return hashCode;
+  }
+
+  @Override
+  public String toString() {
+    return "ColumnReference - " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
+  }
+}
\ No newline at end of file
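
A sketch of the matching rules. One subtlety worth noting: allColumns() compares with ==, so the ALL_QUALIFIERS constant itself must be passed to cover a whole family (class name and values are illustrative):

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.hbase.index.covered.update.ColumnReference;

    public class RefSketch {
      public static void main(String[] args) {
        byte[] row = Bytes.toBytes("r"), fam = Bytes.toBytes("f"), val = Bytes.toBytes("v");
        ColumnReference wholeFamily = new ColumnReference(fam, ColumnReference.ALL_QUALIFIERS);
        ColumnReference exact = new ColumnReference(fam, Bytes.toBytes("q1"));
        KeyValue kv = new KeyValue(row, fam, Bytes.toBytes("q2"), 1L, val);
        System.out.println(wholeFamily.matches(kv)); // true - covers any qualifier in 'f'
        System.out.println(exact.matches(kv));       // false - qualifier must be byte-equal
        // note: a fresh 'new byte[0]' would NOT behave like ALL_QUALIFIERS, since
        // allColumns() uses reference equality against the constant
      }
    }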

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnTracker.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnTracker.java
new file mode 100644
index 0000000..b9f3858
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnTracker.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered.update;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+
+/**
+ * Simple POJO for tracking a bunch of column references and the next-newest timestamp for those
+ * columns
+ * <p>
+ * Two {@link ColumnTracker}s are considered equal if they track the same columns, even if their
+ * timestamps are different.
+ */
+public class ColumnTracker implements IndexedColumnGroup {
+
+  public static final long NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP = Long.MAX_VALUE;
+  public static final long GUARANTEED_NEWER_UPDATES = Long.MIN_VALUE;
+  private final List<ColumnReference> columns;
+  private long ts = NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
+  private final int hashCode;
+
+  private static int calcHashCode(List<ColumnReference> columns) {
+    return columns.hashCode();
+  }
+
+  public ColumnTracker(Collection<? extends ColumnReference> columns) {
+    this.columns = new ArrayList<ColumnReference>(columns);
+    // sort the columns
+    Collections.sort(this.columns);
+    this.hashCode = calcHashCode(this.columns);
+  }
+
+  /**
+   * Set the current timestamp, only if the passed timestamp is strictly less than the currently
+   * stored timestamp
+   * @param ts the timestamp to potentially store.
+   * @return the currently stored timestamp.
+   */
+  public long setTs(long ts) {
+    this.ts = this.ts > ts ? ts : this.ts;
+    return this.ts;
+  }
+
+  public long getTS() {
+    return this.ts;
+  }
+
+  @Override
+  public int hashCode() {
+    return hashCode;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof ColumnTracker)) {
+      return false;
+    }
+    ColumnTracker other = (ColumnTracker) o;
+    if (hashCode != other.hashCode) {
+      return false;
+    }
+    if (other.columns.size() != columns.size()) {
+      return false;
+    }
+
+    // check each column to see if they match
+    for (int i = 0; i < columns.size(); i++) {
+      if (!columns.get(i).equals(other.columns.get(i))) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  @Override
+  public List<ColumnReference> getColumns() {
+    return this.columns;
+  }
+
+  /**
+   * @return <tt>true</tt> if this set of columns has seen a column with a timestamp newer than the
+   *         requested timestamp, <tt>false</tt> otherwise.
+   */
+  public boolean hasNewerTimestamps() {
+    return !isNewestTime(this.ts);
+  }
+
+  /**
+   * @param ts timestamp to check
+   * @return <tt>true</tt> if the timestamp is at the most recent timestamp for a column
+   */
+  public static boolean isNewestTime(long ts) {
+    return ts == NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
+  }
+}
\ No newline at end of file
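
A sketch of the timestamp bookkeeping: setTs() only ever moves the stored timestamp down, so the tracker ends up holding the oldest 'newer than requested' timestamp it has seen (class name and values are illustrative):

    import java.util.Collections;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
    import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;

    public class TrackerSketch {
      public static void main(String[] args) {
        ColumnTracker tracker = new ColumnTracker(Collections.singleton(
            new ColumnReference(Bytes.toBytes("f"), Bytes.toBytes("q"))));
        System.out.println(tracker.hasNewerTimestamps()); // false - still at the sentinel
        tracker.setTs(10L); // stored: strictly less than the current value
        tracker.setTs(12L); // ignored: not less than 10
        System.out.println(tracker.getTS());              // 10
        System.out.println(tracker.hasNewerTimestamps()); // true
      }
    }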

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
new file mode 100644
index 0000000..c8a188d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered.update;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
+
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * Keeps track of the index updates for each index table, canceling out pending updates that are
+ * covered by newer deletes
+ */
+public class IndexUpdateManager {
+
+  public Comparator<Mutation> COMPARATOR = new MutationComparator();
+  class MutationComparator implements Comparator<Mutation> {
+
+    @Override
+    public int compare(Mutation o1, Mutation o2) {
+      // always sort rows first
+      int compare = o1.compareTo(o2);
+      if (compare != 0) {
+        return compare;
+      }
+
+      // if same row, sort by reverse timestamp (larger first)
+      compare = Longs.compare(o2.getTimeStamp(), o1.getTimeStamp());
+      if (compare != 0) {
+        return compare;
+      }
+      // deletes always sort before puts for the same row
+      if (o1 instanceof Delete) {
+        // same row, same ts == same delete since we only delete rows
+        if (o2 instanceof Delete) {
+          return 0;
+        } else {
+          // o2 has to be a put
+          return -1;
+        }
+      }
+      // o1 must be a put
+      if (o2 instanceof Delete) {
+        return 1;
+      } else if (o2 instanceof Put) {
+        return comparePuts((Put) o1, (Put) o2);
+      }
+
+      throw new RuntimeException(
+          "Got unexpected mutation types! Can only be Put or Delete, but got: " + o1 + ", and "
+              + o2);
+    }
+
+    private int comparePuts(Put p1, Put p2) {
+      int p1Size = p1.size();
+      int p2Size = p2.size();
+      int compare = p1Size - p2Size;
+      if (compare == 0) {
+        // TODO: make this a real comparison
+        // this is a little cheating, but we don't really need to worry too much about this being
+        // the same - chances are that exact matches here are really the same update.
+        return Longs.compare(p1.heapSize(), p2.heapSize());
+      }
+      return compare;
+    }
+
+  }
+
+  private static final String PHOENIX_HBASE_TEMP_DELETE_MARKER = "phoenix.hbase.temp.delete.marker";
+  private static final byte[] TRUE_MARKER = new byte[] { 1 };
+
+  protected final Map<ImmutableBytesPtr, Collection<Mutation>> map =
+      new HashMap<ImmutableBytesPtr, Collection<Mutation>>();
+
+  /**
+   * Add an index update. Keeps the latest {@link Put} for a given timestamp
+   * @param tableName name of the index table to which the update should be written
+   * @param m the update to add
+   */
+  public void addIndexUpdate(byte[] tableName, Mutation m) {
+    // we only keep the most recent update
+    ImmutableBytesPtr key = new ImmutableBytesPtr(tableName);
+    Collection<Mutation> updates = map.get(key);
+    if (updates == null) {
+      updates = new SortedCollection<Mutation>(COMPARATOR);
+      map.put(key, updates);
+    }
+    fixUpCurrentUpdates(updates, m);
+  }
+
+  /**
+   * Fix up the current updates, given the pending mutation.
+   * @param updates current updates
+   * @param pendingMutation pending mutation to reconcile against the current updates
+   */
+  protected void fixUpCurrentUpdates(Collection<Mutation> updates, Mutation pendingMutation) {
+    // need to check for each entry to see if we have a duplicate
+    Mutation toRemove = null;
+    Delete pendingDelete = pendingMutation instanceof Delete ? (Delete) pendingMutation : null;
+    boolean sawRowMatch = false;
+    for (Mutation stored : updates) {
+      int compare = pendingMutation.compareTo(stored);
+      // skip to the right row
+      if (compare < 0) {
+        continue;
+      } else if (compare > 0) {
+        if (sawRowMatch) {
+          break;
+        }
+        continue;
+      }
+
+      // set that we saw a row match, so any greater row will necessarily be the wrong row
+      sawRowMatch = true;
+
+      // skip until we hit the right timestamp
+      if (stored.getTimeStamp() < pendingMutation.getTimeStamp()) {
+        continue;
+      }
+
+      if (stored instanceof Delete) {
+        // we already have a delete for this row, so we are done.
+        if (pendingDelete != null) {
+          return;
+        }
+        // pending update must be a Put, so we ignore the Put.
+        // add a marker in this delete that it has been canceled out already. We need to keep
+        // the delete around though so we can figure out if other Puts would also be canceled out.
+        markMutationForRemoval(stored);
+        return;
+      }
+
+      // otherwise, the stored mutation is a Put. Either way, we want to remove it. If the pending
+      // update is a delete, we need to remove the entry (no longer applies - covered by the
+      // delete), or it's an older version of the row, so we cover it with the newer.
+      toRemove = stored;
+      if (pendingDelete != null) {
+        // we still add the pending delete below, but we need to mark it for removal later
+        markMutationForRemoval(pendingMutation);
+        break;
+      }
+    }
+
+    updates.remove(toRemove);
+    updates.add(pendingMutation);
+  }
+
+  private void markMutationForRemoval(Mutation m) {
+    m.setAttribute(PHOENIX_HBASE_TEMP_DELETE_MARKER, TRUE_MARKER);
+  }
+
+  public List<Pair<Mutation, byte[]>> toMap() {
+    List<Pair<Mutation, byte[]>> updateMap = Lists.newArrayList();
+    for (Entry<ImmutableBytesPtr, Collection<Mutation>> updates : map.entrySet()) {
+      // get is ok because we always set with just the bytes
+      byte[] tableName = updates.getKey().get();
+      // TODO replace this as just storing a byte[], to avoid all the String <-> byte[] swapping
+      // HBase does
+      for (Mutation m : updates.getValue()) {
+        // skip elements that have been marked for delete
+        if (shouldBeRemoved(m)) {
+          continue;
+        }
+        updateMap.add(new Pair<Mutation, byte[]>(m, tableName));
+      }
+    }
+    return updateMap;
+  }
+
+  /**
+   * @param updates the updates to add, as (mutation, index table name) pairs
+   */
+  public void addAll(Collection<Pair<Mutation, String>> updates) {
+    for (Pair<Mutation, String> update : updates) {
+      addIndexUpdate(Bytes.toBytes(update.getSecond()), update.getFirst());
+    }
+  }
+
+  private boolean shouldBeRemoved(Mutation m) {
+    return m.getAttribute(PHOENIX_HBASE_TEMP_DELETE_MARKER) != null;
+  }
+
+  @Override
+  public String toString() {
+    StringBuffer sb = new StringBuffer("Pending Index Updates:\n");
+    for (Entry<ImmutableBytesPtr, Collection<Mutation>> entry : map.entrySet()) {
+      String tableName = Bytes.toString(entry.getKey().get());
+      sb.append("   Table: '" + tableName + "'\n");
+      for (Mutation m : entry.getValue()) {
+        sb.append("\t");
+        if (shouldBeRemoved(m)) {
+          sb.append("[REMOVED]");
+        }
+        sb.append(m.getClass().getSimpleName() + ":"
+            + ((m instanceof Put) ? m.getTimeStamp() + " " : ""));
+        sb.append(" row=" + Bytes.toString(m.getRow()));
+        sb.append("\n");
+        if (m.getFamilyMap().isEmpty()) {
+          sb.append("\t\t=== EMPTY ===\n");
+        }
+        for (List<KeyValue> kvs : m.getFamilyMap().values()) {
+          for (KeyValue kv : kvs) {
+            sb.append("\t\t" + kv.toString() + "/value=" + Bytes.toStringBinary(kv.getValue()));
+            sb.append("\n");
+          }
+        }
+      }
+    }
+    return sb.toString();
+  }
+}
\ No newline at end of file
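
A sketch of the cancel-out path in fixUpCurrentUpdates(): a pending Delete at the same row and timestamp knocks out the stored Put, and the canceled pair never reaches toMap(). This assumes the 0.94 Put(row, ts) constructor and Delete.setTimestamp(); the class name, table name, and values are illustrative:

    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;

    public class ManagerSketch {
      public static void main(String[] args) {
        byte[] table = Bytes.toBytes("my_index"), row = Bytes.toBytes("r");
        IndexUpdateManager manager = new IndexUpdateManager();
        Put put = new Put(row, 10L);
        put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
        manager.addIndexUpdate(table, put);
        Delete delete = new Delete(row);
        delete.setTimestamp(10L);
        manager.addIndexUpdate(table, delete);
        // the put was removed and the delete was marked, so nothing is left to write
        System.out.println(manager.toMap().isEmpty()); // true
      }
    }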

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexedColumnGroup.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexedColumnGroup.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexedColumnGroup.java
new file mode 100644
index 0000000..3c98f0f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexedColumnGroup.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered.update;
+
+import java.util.List;
+
+/**
+ * Group of columns that were requested to build an index
+ */
+public interface IndexedColumnGroup {
+
+  public List<ColumnReference> getColumns();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/SortedCollection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/SortedCollection.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/SortedCollection.java
new file mode 100644
index 0000000..ee8b453
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/SortedCollection.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered.update;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+import com.google.common.collect.Iterators;
+
+/**
+ * A collection whose elements are stored and returned sorted.
+ * <p>
+ * We can't just use something like a {@link PriorityQueue} because its iterator doesn't return
+ * the underlying values in sorted order.
+ * @param <T> the type of element stored in the collection
+ */
+class SortedCollection<T> implements Collection<T>, Iterable<T> {
+
+  private PriorityQueue<T> queue;
+  private Comparator<T> comparator;
+
+  /**
+   * Use the given comparator to compare all keys for sorting
+   * @param comparator comparator used to order the elements
+   */
+  public SortedCollection(Comparator<T> comparator) {
+    this.queue = new PriorityQueue<T>(1, comparator);
+    this.comparator = comparator;
+  }
+  
+  /**
+   * All passed elements are expected to be {@link Comparable}
+   */
+  public SortedCollection() {
+    this.queue = new PriorityQueue<T>();
+  }
+  
+  @Override
+  public int size() {
+    return this.queue.size();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return this.queue.isEmpty();
+  }
+
+  @Override
+  public boolean contains(Object o) {
+    return this.queue.contains(o);
+  }
+
+  @Override
+  public Iterator<T> iterator() {
+    @SuppressWarnings("unchecked")
+    T[] array = (T[]) this.queue.toArray();
+    if (this.comparator == null) {
+      Arrays.sort(array);
+    } else {
+      Arrays.sort(array, this.comparator);
+    }
+    return Iterators.forArray(array);
+  }
+
+  @Override
+  public Object[] toArray() {
+    return this.queue.toArray();
+  }
+
+  @SuppressWarnings("hiding")
+  @Override
+  public <T> T[] toArray(T[] a) {
+    return this.queue.toArray(a);
+  }
+
+  @Override
+  public boolean add(T e) {
+    return this.queue.add(e);
+  }
+
+  @Override
+  public boolean remove(Object o) {
+    return this.queue.remove(o);
+  }
+
+  @Override
+  public boolean containsAll(Collection<?> c) {
+    return this.queue.containsAll(c);
+  }
+
+  @Override
+  public boolean addAll(Collection<? extends T> c) {
+    return this.queue.addAll(c);
+  }
+
+  @Override
+  public boolean removeAll(Collection<?> c) {
+    return queue.removeAll(c);
+  }
+
+  @Override
+  public boolean retainAll(Collection<?> c) {
+    return this.queue.retainAll(c);
+  }
+
+  @Override
+  public void clear() {
+    this.queue.clear();
+  }
+}
\ No newline at end of file
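
A sketch of why this wrapper exists: iterating a PriorityQueue directly yields heap order, while this collection copies and sorts on each iteration. SortedCollection is package-private, so the sketch assumes it lives in the same package (class name and values are illustrative):

    package org.apache.phoenix.hbase.index.covered.update;

    public class SortedSketch {
      public static void main(String[] args) {
        // natural ordering; a Comparator<T> could be supplied instead
        SortedCollection<Integer> sorted = new SortedCollection<Integer>();
        sorted.add(5);
        sorted.add(1);
        sorted.add(3);
        for (int v : sorted) {
          System.out.print(v + " "); // 1 3 5 - always in sorted order
        }
      }
    }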

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/IndexWriteException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/IndexWriteException.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/IndexWriteException.java
new file mode 100644
index 0000000..2ec29bc
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/IndexWriteException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.exception;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+
+/**
+ * Generic {@link Exception} indicating that an index write has failed
+ */
+@SuppressWarnings("serial")
+public class IndexWriteException extends HBaseIOException {
+
+  public IndexWriteException() {
+    super();
+  }
+
+  public IndexWriteException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public IndexWriteException(String message) {
+    super(message);
+  }
+
+  public IndexWriteException(Throwable cause) {
+    super(cause);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/MultiIndexWriteFailureException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/MultiIndexWriteFailureException.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/MultiIndexWriteFailureException.java
new file mode 100644
index 0000000..546b43d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/MultiIndexWriteFailureException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.exception;
+
+import java.util.List;
+
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+
+/**
+ * Indicates a failure to write to multiple index tables.
+ */
+@SuppressWarnings("serial")
+public class MultiIndexWriteFailureException extends IndexWriteException {
+
+  private List<HTableInterfaceReference> failures;
+
+  /**
+   * @param failures the tables to which the index write did not succeed
+   */
+  public MultiIndexWriteFailureException(List<HTableInterfaceReference> failures) {
+    super("Failed to write to multiple index tables");
+    this.failures = failures;
+
+  }
+
+  public List<HTableInterfaceReference> getFailedTables() {
+    return this.failures;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/SingleIndexWriteFailureException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/SingleIndexWriteFailureException.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/SingleIndexWriteFailureException.java
new file mode 100644
index 0000000..eb3b521
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/SingleIndexWriteFailureException.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.exception;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Mutation;
+
+/**
+ * Exception thrown if we cannot successfully write to an index table.
+ */
+@SuppressWarnings("serial")
+public class SingleIndexWriteFailureException extends IndexWriteException {
+
+  private String table;
+
+  /**
+   * Cannot reach the index, but not sure of the table or the mutations that caused the failure
+   * @param msg more description of what happened
+   * @param cause original cause
+   */
+  public SingleIndexWriteFailureException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+
+  /**
+   * Failed to write the passed mutations to an index table for some reason.
+   * @param targetTableName index table to which we attempted to write
+   * @param mutations mutations that were attempted
+   * @param cause underlying reason for the failure
+   */
+  public SingleIndexWriteFailureException(String targetTableName, List<Mutation> mutations,
+      Exception cause) {
+    super("Failed to make index update:\n\t table: " + targetTableName + "\n\t edits: " + mutations
+        + "\n\tcause: " + cause == null ? "UNKNOWN" : cause.getMessage(), cause);
+    this.table = targetTableName;
+  }
+
+  /**
+   * @return The table to which we failed to write the index updates. If unknown, returns
+   *         <tt>null</tt>
+   */
+  public String getTableName() {
+    return this.table;
+  }
+}
\ No newline at end of file
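
As a usage sketch (not part of this patch), a caller might distinguish the two failure types above like so; the handleFailure() helper and its logging are illustrative assumptions only:

    import java.util.List;

    import org.apache.phoenix.hbase.index.exception.IndexWriteException;
    import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException;
    import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
    import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;

    public class IndexFailureHandlingSketch {

      // report which index tables were affected by a failed write
      static void handleFailure(IndexWriteException e) {
        if (e instanceof SingleIndexWriteFailureException) {
          // getTableName() may be null if the failing table is unknown
          String table = ((SingleIndexWriteFailureException) e).getTableName();
          System.err.println("Index write failed for table: " + table);
        } else if (e instanceof MultiIndexWriteFailureException) {
          List<HTableInterfaceReference> failed =
              ((MultiIndexWriteFailureException) e).getFailedTables();
          System.err.println("Index write failed for " + failed.size() + " tables");
        }
      }
    }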

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/BaseTaskRunner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/BaseTaskRunner.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/BaseTaskRunner.java
new file mode 100644
index 0000000..5cd3fcb
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/BaseTaskRunner.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.parallel;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * {@link TaskRunner} that just manages the underlying thread pool. On a call to
+ * {@link #stop(String)}, the thread pool is shutdown immediately - all pending tasks are cancelled
+ * and running tasks receive an interrupt.
+ * <p>
+ * If we find a failure, it is propagated to the {@link TaskBatch} so any {@link Task} that is
+ * interested can kill itself as well.
+ */
+public abstract class BaseTaskRunner implements TaskRunner {
+
+  private static final Log LOG = LogFactory.getLog(BaseTaskRunner.class);
+  protected ListeningExecutorService writerPool;
+  // volatile: read by isStopped() from threads other than the one calling stop()
+  private volatile boolean stopped;
+
+  public BaseTaskRunner(ExecutorService service) {
+    this.writerPool = MoreExecutors.listeningDecorator(service);
+  }
+
+  @Override
+  public <R> List<R> submit(TaskBatch<R> tasks) throws CancellationException, ExecutionException,
+      InterruptedException {
+    // submit each task to the pool and queue it up to be watched
+    List<ListenableFuture<R>> futures = new ArrayList<ListenableFuture<R>>(tasks.size());
+    for (Task<R> task : tasks.getTasks()) {
+      futures.add(this.writerPool.submit(task));
+    }
+    try {
+      // This logic is actually much more synchronized than the previous logic. Now we rely on a
+      // synchronization around the status to tell us when we are done. While this does have the
+      // advantage of being (1) less code, and (2) supported as part of a library, it is just that
+      // little bit slower. If push comes to shove, we can revert back to the previous
+      // implementation, but for right now, this works just fine.
+      return submitTasks(futures).get();
+    } catch (CancellationException e) {
+      // propagate the failure back out
+      logAndNotifyAbort(e, tasks);
+      throw e;
+    } catch (ExecutionException e) {
+      // propagate the failure back out
+      logAndNotifyAbort(e, tasks);
+      throw e;
+    }
+  }
+
+  private void logAndNotifyAbort(Exception e, Abortable abort) {
+    String msg = "Found a failed task because: " + e.getMessage();
+    LOG.error(msg, e);
+    abort.abort(msg, e.getCause());
+  }
+
+  /**
+   * Build a {@link ListenableFuture} for the tasks. Implementing classes determine the completion
+   * and failure semantics for the given tasks.
+   * @param futures to wait on
+   * @return a single {@link ListenableFuture} that completes based on the passed tasks.
+   */
+  protected abstract <R> ListenableFuture<List<R>> submitTasks(List<ListenableFuture<R>> futures);
+
+  @Override
+  public <R> List<R> submitUninterruptible(TaskBatch<R> tasks) throws EarlyExitFailure,
+      ExecutionException {
+    boolean interrupted = false;
+    try {
+      while (!this.isStopped()) {
+        try {
+          return this.submit(tasks);
+        } catch (InterruptedException e) {
+          interrupted = true;
+        }
+      }
+    } finally {
+      // restore the interrupted status
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    // should only get here if we are interrupted while waiting for a result and have been told to
+    // shutdown by an external source
+    throw new EarlyExitFailure("Interrupted and stopped before computation was complete!");
+  }
+
+  @Override
+  public void stop(String why) {
+    if (this.stopped) {
+      return;
+    }
+    // mark ourselves stopped so isStopped() (and the submitUninterruptible retry loop) sees it
+    this.stopped = true;
+    LOG.info("Shutting down task runner because " + why);
+    this.writerPool.shutdownNow();
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped;
+  }
+}
\ No newline at end of file
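
The retry loop in submitUninterruptible follows the standard Java idiom for uninterruptible waits: swallow the InterruptedException, remember that it happened, and restore the thread's interrupt status on the way out. A self-contained sketch of the same idiom, using a BlockingQueue stand-in rather than this patch's task futures:

    import java.util.concurrent.BlockingQueue;

    public class UninterruptibleSketch {

      // block on queue.take() until a value arrives, deferring any interrupts
      // until we return so callers still observe the interrupted status
      static <T> T takeUninterruptibly(BlockingQueue<T> queue) {
        boolean interrupted = false;
        try {
          while (true) {
            try {
              return queue.take();
            } catch (InterruptedException e) {
              interrupted = true; // remember the interrupt and retry
            }
          }
        } finally {
          if (interrupted) {
            Thread.currentThread().interrupt(); // restore the status
          }
        }
      }
    }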

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/EarlyExitFailure.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/EarlyExitFailure.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/EarlyExitFailure.java
new file mode 100644
index 0000000..8a0dedc
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/EarlyExitFailure.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.parallel;
+
+import java.io.IOException;
+
+/**
+ * Exception denoting a need to early-exit a task (or group of tasks) due to external notification
+ */
+@SuppressWarnings("serial")
+public class EarlyExitFailure extends IOException {
+
+  /**
+   * @param msg reason for the early exit
+   */
+  public EarlyExitFailure(String msg) {
+    super(msg);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/QuickFailingTaskRunner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/QuickFailingTaskRunner.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/QuickFailingTaskRunner.java
new file mode 100644
index 0000000..5b9717e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/QuickFailingTaskRunner.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.parallel;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * {@link TaskRunner} that attempts to run all tasks passed, but quits early if any {@link Task}
+ * fails, not waiting for the remaining {@link Task}s to complete.
+ */
+public class QuickFailingTaskRunner extends BaseTaskRunner {
+
+  static final Log LOG = LogFactory.getLog(QuickFailingTaskRunner.class);
+
+  /**
+   * @param service thread pool to which {@link Task}s are submitted. This service is then 'owned'
+   *          by <tt>this</tt> and will be shutdown on calls to {@link #stop(String)}.
+   */
+  public QuickFailingTaskRunner(ExecutorService service) {
+    super(service);
+  }
+
+  @Override
+  protected <R> ListenableFuture<List<R>> submitTasks(List<ListenableFuture<R>> futures) {
+    return Futures.allAsList(futures);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/Task.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/Task.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/Task.java
new file mode 100644
index 0000000..4b32e71
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/Task.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.parallel;
+
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.hbase.Abortable;
+
+/**
+ * Like a {@link Callable}, but supports an internal {@link Abortable} that can be checked
+ * periodically to determine if the batch should abort
+ * @param <V> expected result of the task
+ */
+public abstract class Task<V> implements Callable<V> {
+
+  private Abortable batch;
+
+  void setBatchMonitor(Abortable abort) {
+    this.batch = abort;
+  }
+
+  protected boolean isBatchFailed() {
+    return this.batch.isAborted();
+  }
+}
\ No newline at end of file
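
A hypothetical subclass, sketched to show the cooperative-abort contract: checking isBatchFailed() between units of work lets a long-running task bail out once any sibling in the batch has failed. The table and mutations fields are illustrative, not part of this patch:

    import java.util.Collections;
    import java.util.List;

    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.client.Mutation;

    import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
    import org.apache.phoenix.hbase.index.parallel.Task;

    public class WriteMutationsTask extends Task<Void> {

      private final HTableInterface table;
      private final List<Mutation> mutations;

      public WriteMutationsTask(HTableInterface table, List<Mutation> mutations) {
        this.table = table;
        this.mutations = mutations;
      }

      @Override
      public Void call() throws Exception {
        for (Mutation m : mutations) {
          // check between writes so a failed sibling task stops us early
          if (isBatchFailed()) {
            throw new EarlyExitFailure("Batch already failed - skipping remaining writes");
          }
          table.batch(Collections.singletonList(m));
        }
        return null;
      }
    }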

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/TaskBatch.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/TaskBatch.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/TaskBatch.java
new file mode 100644
index 0000000..62e4522
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/TaskBatch.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.parallel;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+
+/**
+ * A group of {@link Task}s. The tasks are all bound together using the same {@link Abortable} (
+ * <tt>this</tt>) to ensure that all tasks are aware when any of the other tasks fails.
+ * @param <V> expected result type from all the tasks
+ */
+public class TaskBatch<V> implements Abortable {
+  private static final Log LOG = LogFactory.getLog(TaskBatch.class);
+  private AtomicBoolean aborted = new AtomicBoolean();
+  private List<Task<V>> tasks;
+
+  /**
+   * @param size expected number of tasks
+   */
+  public TaskBatch(int size) {
+    this.tasks = new ArrayList<Task<V>>(size);
+  }
+
+  public void add(Task<V> task) {
+    this.tasks.add(task);
+    task.setBatchMonitor(this);
+  }
+
+  public Collection<Task<V>> getTasks() {
+    return this.tasks;
+  }
+
+  @Override
+  public void abort(String why, Throwable e) {
+    if (this.aborted.getAndSet(true)) {
+      return;
+    }
+    LOG.info("Aborting batch of tasks because " + why);
+  }
+
+  @Override
+  public boolean isAborted() {
+    return this.aborted.get();
+  }
+
+  /**
+   * @return the number of tasks assigned to this batch
+   */
+  public int size() {
+    return this.tasks.size();
+  }
+}
\ No newline at end of file
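
Putting the pieces together, a minimal hypothetical wiring of a TaskBatch onto a QuickFailingTaskRunner could look like the following; NoopTask is a stand-in for a real unit of work:

    import java.util.List;
    import java.util.concurrent.Executors;

    import org.apache.phoenix.hbase.index.parallel.QuickFailingTaskRunner;
    import org.apache.phoenix.hbase.index.parallel.Task;
    import org.apache.phoenix.hbase.index.parallel.TaskBatch;

    public class TaskBatchSketch {

      // trivial task used only for illustration
      static class NoopTask extends Task<Boolean> {
        @Override
        public Boolean call() {
          return !isBatchFailed();
        }
      }

      public static void main(String[] args) throws Exception {
        QuickFailingTaskRunner runner =
            new QuickFailingTaskRunner(Executors.newFixedThreadPool(2));
        TaskBatch<Boolean> batch = new TaskBatch<Boolean>(2);
        batch.add(new NoopTask());
        batch.add(new NoopTask());
        try {
          // fails fast if either task throws; otherwise returns both results
          List<Boolean> results = runner.submit(batch);
          System.out.println("Results: " + results);
        } finally {
          runner.stop("sketch complete");
        }
      }
    }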

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/TaskRunner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/TaskRunner.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/TaskRunner.java
new file mode 100644
index 0000000..003e18f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/TaskRunner.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.parallel;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.hadoop.hbase.Stoppable;
+
+/**
+ * Runs a batch of {@link Task}s on a managed thread pool and collects their results.
+ */
+public interface TaskRunner extends Stoppable {
+
+  /**
+   * Submit the given tasks to the pool and wait for them to complete, or for one of them to fail.
+   * <p>
+   * Non-interruptible method. To stop any running tasks call {@link #stop(String)} - this will
+   * shutdown the thread pool, causing any pending tasks to be failed early (whose failure will be
+   * ignored) and interrupt any running tasks. It is up to the passed tasks to respect the interrupt
+   * notification.
+   * @param tasks to run
+   * @return the result from each task
+   * @throws ExecutionException if any of the tasks fails. Wraps the underlying failure, which can
+   *           be retrieved via {@link ExecutionException#getCause()}.
+   * @throws InterruptedException if the current thread is interrupted while waiting for the batch
+   *           to complete
+   */
+  public <R> List<R> submit(TaskBatch<R> tasks) throws
+      ExecutionException, InterruptedException;
+
+  /**
+   * Similar to {@link #submit(TaskBatch)}, but is not interruptible. If an interrupt is found while
+   * waiting for results, we ignore it and only stop if {@link #stop(String)} has been called. On
+   * return from the method, the interrupt status of the thread is restored.
+   * @param tasks to run
+   * @return the result from each task
+   * @throws EarlyExitFailure if there are still tasks to submit to the pool, but there is a stop
+   *           notification
+   * @throws ExecutionException if any of the tasks fails. Wraps the underlying failure, which can
+   *           be retrieved via {@link ExecutionException#getCause()}.
+   */
+  public <R> List<R> submitUninterruptible(TaskBatch<R> tasks) throws EarlyExitFailure,
+      ExecutionException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/ThreadPoolBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/ThreadPoolBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/ThreadPoolBuilder.java
new file mode 100644
index 0000000..58a976a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/ThreadPoolBuilder.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.parallel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Helper utility to make a thread pool from a configuration based on reasonable defaults and passed
+ * configuration keys.
+ */
+public class ThreadPoolBuilder {
+
+  private static final Log LOG = LogFactory.getLog(ThreadPoolBuilder.class);
+  private static final long DEFAULT_TIMEOUT = 60;
+  private static final int DEFAULT_MAX_THREADS = 1; // is there a better default?
+  private Pair<String, Long> timeout;
+  private Pair<String, Integer> maxThreads;
+  private String name;
+  private Configuration conf;
+
+  public ThreadPoolBuilder(String poolName, Configuration conf) {
+    this.name = poolName;
+    this.conf = conf;
+  }
+
+  public ThreadPoolBuilder setCoreTimeout(String confkey, long defaultTime) {
+    if (defaultTime <= 0) {
+      defaultTime = DEFAULT_TIMEOUT;
+    }
+    this.timeout = new Pair<String, Long>(confkey, defaultTime);
+    return this;
+  }
+
+  public ThreadPoolBuilder setCoreTimeout(String confKey) {
+    return this.setCoreTimeout(confKey, DEFAULT_TIMEOUT);
+  }
+
+  public ThreadPoolBuilder setMaxThread(String confkey, int defaultThreads) {
+    if (defaultThreads <= 0) {
+      defaultThreads = DEFAULT_MAX_THREADS;
+    }
+    this.maxThreads = new Pair<String, Integer>(confkey, defaultThreads);
+    return this;
+  }
+
+  String getName() {
+    return this.name;
+  }
+
+  int getMaxThreads() {
+    int maxThreads = DEFAULT_MAX_THREADS;
+    if (this.maxThreads != null) {
+      String key = this.maxThreads.getFirst();
+      maxThreads =
+          key == null ? this.maxThreads.getSecond() : conf.getInt(key, this.maxThreads.getSecond());
+    }
+    LOG.trace("Creating pool builder with max " + maxThreads + " threads ");
+    return maxThreads;
+  }
+
+  long getKeepAliveTime() {
+    long timeout = DEFAULT_TIMEOUT;
+    if (this.timeout != null) {
+      String key = this.timeout.getFirst();
+      timeout =
+          key == null ? this.timeout.getSecond() : conf.getLong(key, this.timeout.getSecond());
+    }
+
+    LOG.trace("Creating pool builder with core thread timeout of " + timeout + " seconds ");
+    return timeout;
+  }
+}
\ No newline at end of file
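
A configuration sketch, assuming hypothetical conf keys (this patch does not define any): the builder records a key/default pair per setting and resolves the key against the Configuration when the pool is sized.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;

    public class PoolBuilderSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // the "example.*" keys below are placeholders, not keys defined by this patch
        ThreadPoolBuilder builder = new ThreadPoolBuilder("index-writer", conf)
            .setMaxThread("example.index.writer.max.threads", 10)
            .setCoreTimeout("example.index.writer.keepalive.seconds", 60);
        // hand the builder to ThreadPoolManager.getExecutor(...) to obtain the pool
      }
    }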

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/ThreadPoolManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/ThreadPoolManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/ThreadPoolManager.java
new file mode 100644
index 0000000..efde03e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/ThreadPoolManager.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.parallel;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Threads;
+
+/**
+ * Manage access to thread pools
+ */
+public class ThreadPoolManager {
+
+  private static final Log LOG = LogFactory.getLog(ThreadPoolManager.class);
+
+  /**
+   * Get an executor for the given name, based on the passed {@link Configuration}. If a thread pool
+   * already exists with that name, it will be returned.
+   * @param builder source of the pool's name and sizing configuration
+   * @param env coprocessor environment whose shared data map caches the pools
+   * @return a {@link ThreadPoolExecutor} for the given name. The pool only shuts down when there
+   *         are no more explicit references to it, so you do not need to shut it down yourself -
+   *         just release your reference when you are done. If you do attempt to shutdown the pool,
+   *         you should be careful to call {@link ThreadPoolExecutor#shutdown()} XOR
+   *         {@link ThreadPoolExecutor#shutdownNow()} - extra calls to either can lead to early
+   *         shutdown of the pool.
+   */
+  public static synchronized ThreadPoolExecutor getExecutor(ThreadPoolBuilder builder,
+      RegionCoprocessorEnvironment env) {
+    return getExecutor(builder, env.getSharedData());
+  }
+
+  static synchronized ThreadPoolExecutor getExecutor(ThreadPoolBuilder builder,
+      Map<String, Object> poolCache) {
+    ThreadPoolExecutor pool = (ThreadPoolExecutor) poolCache.get(builder.getName());
+    if (pool == null || pool.isTerminating() || pool.isShutdown()) {
+      pool = getDefaultExecutor(builder);
+      LOG.info("Creating new pool for " + builder.getName());
+      poolCache.put(builder.getName(), pool);
+    }
+    ((ShutdownOnUnusedThreadPoolExecutor) pool).addReference();
+
+    return pool;
+  }
+
+  /**
+   * @param builder source of the pool configuration
+   * @return a new, reference-counted pool built from the passed configuration
+   */
+  private static ShutdownOnUnusedThreadPoolExecutor getDefaultExecutor(ThreadPoolBuilder builder) {
+    int maxThreads = builder.getMaxThreads();
+    long keepAliveTime = builder.getKeepAliveTime();
+
+    // we prefer starting a new thread to queuing (the opposite of the usual ThreadPoolExecutor)
+    // since we are probably writing to a bunch of index tables in this case. Any pending requests
+    // are then queued up in an infinite (Integer.MAX_VALUE) queue. However, we allow core threads
+    // to time out, so we tune up/down for bursty situations. We could be a bit smarter and more
+    // closely manage the core-thread pool size to handle the bursty traffic (so we can always keep
+    // some core threads on hand, rather than starting from scratch each time), but that would take
+    // even more time. If we shut down the pool while new tasks are still being submitted, we just
+    // follow the usual policy and throw a RejectedExecutionException because we are shutting down
+    // anyways and the worst thing is that this gets unloaded.
+    ShutdownOnUnusedThreadPoolExecutor pool =
+        new ShutdownOnUnusedThreadPoolExecutor(maxThreads, maxThreads, keepAliveTime,
+            TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
+            Threads.newDaemonThreadFactory(builder.getName() + "-"), builder.getName());
+    pool.allowCoreThreadTimeOut(true);
+    return pool;
+  }
+
+  /**
+   * Thread pool that only shuts down when there are no more explicit references to it. A reference
+   * is obtained when the pool is fetched from the manager and released on calls to
+   * {@link #shutdown()} or {@link #shutdownNow()}. Therefore, users should be careful to call
+   * {@link #shutdown()} XOR {@link #shutdownNow()} - extra calls to either can lead to early
+   * shutdown of the pool.
+   */
+  private static class ShutdownOnUnusedThreadPoolExecutor extends ThreadPoolExecutor {
+
+    private AtomicInteger references;
+    private String poolName;
+
+    public ShutdownOnUnusedThreadPoolExecutor(int coreThreads, int maxThreads, long keepAliveTime,
+        TimeUnit timeUnit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
+        String poolName) {
+      super(coreThreads, maxThreads, keepAliveTime, timeUnit, workQueue, threadFactory);
+      this.references = new AtomicInteger();
+      this.poolName = poolName;
+    }
+
+    public void addReference() {
+      this.references.incrementAndGet();
+    }
+
+    @Override
+    protected void finalize() {
+      // once we go out of scope, rely on ThreadPoolExecutor#finalize() calling shutdown(),
+      // which releases a remaining reference - this is how the pool gets cleaned up when
+      // callers forget to release their references
+      LOG.info("Shutting down pool '" + poolName + "' because no more references");
+      super.finalize();
+    }
+
+    @Override
+    public void shutdown() {
+      if (references.decrementAndGet() <= 0) {
+        LOG.debug("Shutting down pool " + this.poolName);
+        super.shutdown();
+      }
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+      if (references.decrementAndGet() <= 0) {
+        LOG.debug("Shutting down pool " + this.poolName + " NOW!");
+        return super.shutdownNow();
+      }
+      return Collections.emptyList();
+    }
+
+  }
+}
\ No newline at end of file
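
The reference counting above can be exercised directly through the package-private Map overload of getExecutor (the coprocessor variant delegates to it via env.getSharedData()). A sketch, which must live in the same package to compile:

    package org.apache.phoenix.hbase.index.parallel;

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ThreadPoolExecutor;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class PoolManagerSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        ThreadPoolBuilder builder = new ThreadPoolBuilder("index-writer", conf);
        Map<String, Object> shared = new ConcurrentHashMap<String, Object>();

        ThreadPoolExecutor first = ThreadPoolManager.getExecutor(builder, shared);  // ref = 1
        ThreadPoolExecutor second = ThreadPoolManager.getExecutor(builder, shared); // ref = 2, same pool

        first.shutdown();  // drops the count to 1 - the pool keeps running
        second.shutdown(); // drops the count to 0 - the pool actually shuts down
      }
    }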

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/WaitForCompletionTaskRunner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/WaitForCompletionTaskRunner.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/WaitForCompletionTaskRunner.java
new file mode 100644
index 0000000..2cc5bf6
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/WaitForCompletionTaskRunner.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.parallel;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * A {@link TaskRunner} that ensures that all the tasks have been attempted before we return, even
+ * if some of the tasks cause failures.
+ * <p>
+ * Because we wait until the entire batch is complete to see the failure, checking for failure of
+ * the {@link TaskBatch} on the submitted tasks will not help - they will never see the failure of
+ * the other tasks. You will need to provide an external mechanism to propagate the error.
+ * <p>
+ * Does not throw an {@link ExecutionException} if any of the tasks fail.
+ */
+public class WaitForCompletionTaskRunner extends BaseTaskRunner {
+  
+  /**
+   * @param service thread pool to which {@link Task}s are submitted. This service is then 'owned'
+   *          by <tt>this</tt> and will be shutdown on calls to {@link #stop(String)}.
+   */
+  public WaitForCompletionTaskRunner(ExecutorService service) {
+    super(service);
+  }
+
+  @Override
+  public <R> ListenableFuture<List<R>> submitTasks(List<ListenableFuture<R>> futures) {
+    return Futures.successfulAsList(futures);
+  }
+}
\ No newline at end of file
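
The only behavioral difference between QuickFailingTaskRunner and WaitForCompletionTaskRunner is the Guava combinator each picks: Futures.allAsList fails the combined future as soon as any input fails, while Futures.successfulAsList waits for every input and substitutes null for each failure. A standalone sketch of that difference:

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ExecutionException;

    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.SettableFuture;

    public class CombinatorSketch {
      public static void main(String[] args) throws Exception {
        SettableFuture<String> ok = SettableFuture.create();
        SettableFuture<String> bad = SettableFuture.create();
        ok.set("done");
        bad.setException(new RuntimeException("boom"));
        List<ListenableFuture<String>> inputs =
            Arrays.<ListenableFuture<String>>asList(ok, bad);

        // quick-fail semantics: get() throws the wrapped failure
        try {
          Futures.allAsList(inputs).get();
        } catch (ExecutionException e) {
          System.out.println("allAsList failed fast: " + e.getCause());
        }

        // wait-for-all semantics: failures become nulls in the result list
        List<String> results = Futures.successfulAsList(inputs).get();
        System.out.println(results); // prints [done, null]
      }
    }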