Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 23:15:41 UTC

[25/51] [partial] Initial commit of master branch from github

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
new file mode 100644
index 0000000..3f56bb8
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.index;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+import com.google.common.collect.Multimap;
+import org.apache.hadoop.hbase.index.table.HTableInterfaceReference;
+import org.apache.hadoop.hbase.index.write.KillServerOnFailurePolicy;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
+import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.util.SchemaUtil;
+
+/**
+ * 
+ * Handler called in the event that index updates cannot be written to their
+ * region server. First attempts to disable the index and failing that falls
+ * back to the default behavior of killing the region server.
+ *
+ * TODO: use delegate pattern instead
+ * 
+ * @author jtaylor
+ * @since 2.1
+ */
+public class PhoenixIndexFailurePolicy extends KillServerOnFailurePolicy {
+    private static final Log LOG = LogFactory.getLog(PhoenixIndexFailurePolicy.class);
+    private RegionCoprocessorEnvironment env;
+
+    public PhoenixIndexFailurePolicy() {
+    }
+
+    @Override
+    public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
+      super.setup(parent, env);
+      this.env = env;
+    }
+
+    @Override
+    public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException {
+        Set<HTableInterfaceReference> refs = attempted.asMap().keySet();
+        StringBuilder buf = new StringBuilder("Disabled index" + (refs.size() > 1 ? "es " : " "));
+        try {
+            for (HTableInterfaceReference ref : refs) {
+                // Disable the index by using the updateIndexState method of the MetaDataProtocol endpoint coprocessor.
+                String indexTableName = ref.getTableName();
+                byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
+                HTableInterface systemTable = env.getTable(PhoenixDatabaseMetaData.TYPE_TABLE_NAME_BYTES);
+                MetaDataProtocol mdProxy = systemTable.coprocessorProxy(MetaDataProtocol.class, indexTableKey);
+                // Mimic the Put that gets generated by the client on an update of the index state
+                Put put = new Put(indexTableKey);
+                put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES, PIndexState.DISABLE.getSerializedBytes());
+                List<Mutation> tableMetadata = Collections.<Mutation>singletonList(put);
+                MetaDataMutationResult result = mdProxy.updateIndexState(tableMetadata);
+                if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
+                    LOG.warn("Attempt to disable index " + indexTableName + " failed with code = " + result.getMutationCode() + ". Will use default failure policy instead.");
+                    super.handleFailure(attempted, cause);
+                } else {
+                    LOG.info("Successfully disabled index " + indexTableName);
+                }
+                buf.append(indexTableName);
+                buf.append(',');
+            }
+            buf.setLength(buf.length()-1);
+            buf.append(" due to an exception while writing updates");
+        } catch (Throwable t) {
+            super.handleFailure(attempted, cause);
+        }
+        throw new DoNotRetryIOException(buf.toString(), cause);
+    }
+
+}
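
The TODO above suggests replacing inheritance with delegation. A minimal sketch of what that shape could look like, assuming an IndexFailurePolicy interface in the same package as KillServerOnFailurePolicy that declares the setup/handleFailure methods used here (the class name and that assumption are hypothetical, not part of this commit):

import java.io.IOException;

import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.index.table.HTableInterfaceReference;
import org.apache.hadoop.hbase.index.write.IndexFailurePolicy;

import com.google.common.collect.Multimap;

// Hypothetical sketch: hold the fallback policy as a delegate instead of extending it.
public class DelegatingIndexFailurePolicy implements IndexFailurePolicy {
    private final IndexFailurePolicy fallback;

    public DelegatingIndexFailurePolicy(IndexFailurePolicy fallback) {
        this.fallback = fallback;
    }

    @Override
    public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
        fallback.setup(parent, env);
    }

    @Override
    public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException {
        // First try to disable the index (as above); only invoke the
        // fallback (e.g. KillServerOnFailurePolicy) when that fails.
        fallback.handleFailure(attempted, cause);
    }
}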

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java
new file mode 100644
index 0000000..237a89a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ * 
+ * Interface for scanners that either do aggregation
+ * or delegate to scanners that do aggregation.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface AggregatingResultIterator extends ResultIterator {
+    /**
+     * Provides a means of re-aggregating a result row. For
+     * scanners that need to look ahead (i.e. {@link org.apache.phoenix.iterate.OrderedAggregatingResultIterator}).
+     * @param result the row to re-aggregate
+     */
+    void aggregate(Tuple result);
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterator.java
new file mode 100644
index 0000000..ed6c360
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterator.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * 
+ * Abstract base class for ResultIterator implementations that
+ * do nothing on close and have no explain plan steps
+ *
+ * @author jtaylor
+ * @since 1.2
+ */
+public abstract class BaseResultIterator implements ResultIterator {
+    
+    @Override
+    public void close() throws SQLException {
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+    }
+
+}
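
For illustration, a minimal (hypothetical) subclass: only next() needs an implementation, since close() and explain() are inherited as no-ops:

import java.sql.SQLException;

import org.apache.phoenix.schema.tuple.Tuple;

// Hypothetical example: an iterator that returns no rows.
public class EmptyResultIterator extends BaseResultIterator {
    @Override
    public Tuple next() throws SQLException {
        return null; // end of results
    }
}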

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
new file mode 100644
index 0000000..08b4639
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ * 
+ * Result iterator that concatenates a list of other iterators.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ConcatResultIterator implements PeekingResultIterator {
+    private final ResultIterators resultIterators;
+    private List<PeekingResultIterator> iterators;
+    private int index;
+    
+    public ConcatResultIterator(ResultIterators iterators) {
+        this.resultIterators = iterators;
+    }
+    
+    private List<PeekingResultIterator> getIterators() throws SQLException {
+        if (iterators == null) {
+            iterators = resultIterators.getIterators();
+        }
+        return iterators;
+    }
+    
+    @Override
+    public void close() throws SQLException {
+        if (iterators != null) {
+            for (;index < iterators.size(); index++) {
+                PeekingResultIterator iterator = iterators.get(index);
+                iterator.close();
+            }
+        }
+    }
+
+
+    @Override
+    public void explain(List<String> planSteps) {
+        resultIterators.explain(planSteps);
+    }
+
+    private PeekingResultIterator currentIterator() throws SQLException {
+        List<PeekingResultIterator> iterators = getIterators();
+        while (index < iterators.size()) {
+            PeekingResultIterator iterator = iterators.get(index);
+            Tuple r = iterator.peek();
+            if (r != null) {
+                return iterator;
+            }
+            iterator.close();
+            index++;
+        }
+        return EMPTY_ITERATOR;
+    }
+    
+    @Override
+    public Tuple peek() throws SQLException {
+        return currentIterator().peek();
+    }
+
+    @Override
+    public Tuple next() throws SQLException {
+        return currentIterator().next();
+    }
+
+}
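
As a usage sketch, ConcatResultIterator can be driven by any ResultIterators implementation. The anonymous class below assumes ResultIterators declares only the two methods used above, and `first`/`second` stand in for any pre-built PeekingResultIterators (both are assumptions for illustration, not verified against the full interface):

// Illustrative wiring: concatenate two pre-built PeekingResultIterators.
final List<PeekingResultIterator> parts = Lists.newArrayList(first, second);
ConcatResultIterator scanner = new ConcatResultIterator(new ResultIterators() {
    @Override
    public List<PeekingResultIterator> getIterators() {
        return parts;
    }

    @Override
    public void explain(List<String> planSteps) {
        planSteps.add("CLIENT CONCAT OVER " + parts.size() + " ITERATORS");
    }
});
// Tuples from 'first' are exhausted before any tuple from 'second' is returned.
for (Tuple tuple = scanner.next(); tuple != null; tuple = scanner.next()) {
    // process tuple
}
scanner.close();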

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
new file mode 100644
index 0000000..ab194f6
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
@@ -0,0 +1,229 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.parse.HintNode.Hint;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.query.StatsManager;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.ReadOnlyProps;
+
+
+/**
+ * Default strategy for splitting regions in ParallelIterator. Refactored from the
+ * original version.
+ * 
+ * @author jtaylor
+ * @author zhuang
+ */
+public class DefaultParallelIteratorRegionSplitter implements ParallelIteratorRegionSplitter {
+
+    protected final int targetConcurrency;
+    protected final int maxConcurrency;
+    protected final int maxIntraRegionParallelization;
+    protected final StatementContext context;
+    protected final TableRef tableRef;
+
+    public static DefaultParallelIteratorRegionSplitter getInstance(StatementContext context, TableRef table, HintNode hintNode) {
+        return new DefaultParallelIteratorRegionSplitter(context, table, hintNode);
+    }
+
+    protected DefaultParallelIteratorRegionSplitter(StatementContext context, TableRef table, HintNode hintNode) {
+        this.context = context;
+        this.tableRef = table;
+        ReadOnlyProps props = context.getConnection().getQueryServices().getProps();
+        this.targetConcurrency = props.getInt(QueryServices.TARGET_QUERY_CONCURRENCY_ATTRIB,
+                QueryServicesOptions.DEFAULT_TARGET_QUERY_CONCURRENCY);
+        this.maxConcurrency = props.getInt(QueryServices.MAX_QUERY_CONCURRENCY_ATTRIB,
+                QueryServicesOptions.DEFAULT_MAX_QUERY_CONCURRENCY);
+        Preconditions.checkArgument(targetConcurrency >= 1, "Invalid target concurrency: " + targetConcurrency);
+        Preconditions.checkArgument(maxConcurrency >= targetConcurrency, "Invalid max concurrency: " + maxConcurrency);
+        this.maxIntraRegionParallelization = hintNode.hasHint(Hint.NO_INTRA_REGION_PARALLELIZATION) ? 1 : props.getInt(QueryServices.MAX_INTRA_REGION_PARALLELIZATION_ATTRIB,
+                QueryServicesOptions.DEFAULT_MAX_INTRA_REGION_PARALLELIZATION);
+        Preconditions.checkArgument(maxIntraRegionParallelization >= 1 , "Invalid max intra region parallelization: " + maxIntraRegionParallelization);
+    }
+
+    // Get the mapping between key ranges and the regions that contain them.
+    protected List<HRegionLocation> getAllRegions() throws SQLException {
+        Scan scan = context.getScan();
+        PTable table = tableRef.getTable();
+        List<HRegionLocation> allTableRegions = context.getConnection().getQueryServices().getAllTableRegions(table.getPhysicalName().getBytes());
+        // If we're not salting, then we've already intersected the minMaxRange with the scan range
+        // so there's nothing to do here.
+        return filterRegions(allTableRegions, scan.getStartRow(), scan.getStopRow());
+    }
+
+    /**
+     * Filters the given regions, keeping only those that intersect the key range
+     * specified by the startKey and stopKey
+     * @param allTableRegions all region infos for a given table
+     * @param startKey the lower bound of the key range, inclusive
+     * @param stopKey the upper bound of the key range, exclusive
+     * @return regions that intersect with the key range given by the startKey and stopKey
+     */
+    // exposed for tests
+    public static List<HRegionLocation> filterRegions(List<HRegionLocation> allTableRegions, byte[] startKey, byte[] stopKey) {
+        Iterable<HRegionLocation> regions;
+        final KeyRange keyRange = KeyRange.getKeyRange(startKey, true, stopKey, false);
+        if (keyRange == KeyRange.EVERYTHING_RANGE) {
+            return allTableRegions;
+        }
+        
+        regions = Iterables.filter(allTableRegions, new Predicate<HRegionLocation>() {
+            @Override
+            public boolean apply(HRegionLocation location) {
+                KeyRange regionKeyRange = KeyRange.getKeyRange(location.getRegionInfo().getStartKey(), location.getRegionInfo().getEndKey());
+                return keyRange.intersect(regionKeyRange) != KeyRange.EMPTY_RANGE;
+            }
+        });
+        return Lists.newArrayList(regions);
+    }
+
+    protected List<KeyRange> genKeyRanges(List<HRegionLocation> regions) {
+        if (regions.isEmpty()) {
+            return Collections.emptyList();
+        }
+        
+        StatsManager statsManager = context.getConnection().getQueryServices().getStatsManager();
+        // the splits are computed as follows:
+        //
+        // let's suppose:
+        // t = target concurrency
+        // m = max concurrency
+        // r = the number of regions we need to scan
+        //
+        // if r >= t:
+        //    scan using regional boundaries
+        // elif r > t/2:
+        //    split each region in s splits such that:
+        //    s = max(x) where s * x < m
+        // else:
+        //    split each region in s splits such that:
+        //    s = max(x) where s * x < t
+        //
+        // The idea is to align splits with region boundaries. If rows are not evenly
+        // distributed across regions, using this scheme compensates for regions that
+        // have more rows than others, by applying tighter splits and therefore spawning
+        // off more scans over the overloaded regions.
+        int splitsPerRegion = regions.size() >= targetConcurrency ? 1 : (regions.size() > targetConcurrency / 2 ? maxConcurrency : targetConcurrency) / regions.size();
+        splitsPerRegion = Math.min(splitsPerRegion, maxIntraRegionParallelization);
+        // Create a multi-map of ServerName to List<KeyRange> which we'll use to round robin from to ensure
+        // that we keep each region server busy for each query.
+        ListMultimap<HRegionLocation,KeyRange> keyRangesPerRegion = ArrayListMultimap.create(regions.size(), regions.size() * splitsPerRegion);
+        if (splitsPerRegion == 1) {
+            for (HRegionLocation region : regions) {
+                keyRangesPerRegion.put(region, ParallelIterators.TO_KEY_RANGE.apply(region));
+            }
+        } else {
+            // Maintain a bucket for each server and then return KeyRanges in
+            // round-robin order to ensure all servers are utilized.
+            for (HRegionLocation region : regions) {
+                byte[] startKey = region.getRegionInfo().getStartKey();
+                byte[] stopKey = region.getRegionInfo().getEndKey();
+                boolean lowerUnbound = Bytes.compareTo(startKey, HConstants.EMPTY_START_ROW) == 0;
+                boolean upperUnbound = Bytes.compareTo(stopKey, HConstants.EMPTY_END_ROW) == 0;
+                /*
+                 * If lower/upper unbound, get the min/max key from the stats manager.
+                 * We use this as the boundary to split on, but we still use the empty
+                 * byte as the boundary in the actual scan (in case our stats are out
+                 * of date).
+                 */
+                if (lowerUnbound) {
+                    startKey = statsManager.getMinKey(tableRef);
+                    if (startKey == null) {
+                        keyRangesPerRegion.put(region,ParallelIterators.TO_KEY_RANGE.apply(region));
+                        continue;
+                    }
+                }
+                if (upperUnbound) {
+                    stopKey = statsManager.getMaxKey(tableRef);
+                    if (stopKey == null) {
+                        keyRangesPerRegion.put(region,ParallelIterators.TO_KEY_RANGE.apply(region));
+                        continue;
+                    }
+                }
+                
+                byte[][] boundaries = null;
+                // The stats-derived min/max keys may be out of date, so guard against an empty or inverted range
+                if (Bytes.compareTo(startKey, stopKey) >= 0 || (boundaries = Bytes.split(startKey, stopKey, splitsPerRegion - 1)) == null) {
+                    // Bytes.split may return null if the key space
+                    // between start and end key is too small
+                    keyRangesPerRegion.put(region,ParallelIterators.TO_KEY_RANGE.apply(region));
+                } else {
+                    keyRangesPerRegion.put(region,KeyRange.getKeyRange(lowerUnbound ? KeyRange.UNBOUND : boundaries[0], boundaries[1]));
+                    if (boundaries.length > 1) {
+                        for (int i = 1; i < boundaries.length-2; i++) {
+                            keyRangesPerRegion.put(region,KeyRange.getKeyRange(boundaries[i], true, boundaries[i+1], false));
+                        }
+                        keyRangesPerRegion.put(region,KeyRange.getKeyRange(boundaries[boundaries.length-2], true, upperUnbound ? KeyRange.UNBOUND : boundaries[boundaries.length-1], false));
+                    }
+                }
+            }
+        }
+        List<KeyRange> splits = Lists.newArrayListWithCapacity(regions.size() * splitsPerRegion);
+        // The collections returned by asMap().values() are Lists, as documented for ListMultimap
+        Collection<Collection<KeyRange>> values = keyRangesPerRegion.asMap().values();
+        List<Collection<KeyRange>> keyRangesList = Lists.newArrayList(values);
+        // Randomize range order to ensure that we don't hit the region servers in the same order every time
+        // thus helping to distribute the load more evenly
+        Collections.shuffle(keyRangesList);
+        // Transpose values in map to get regions in round-robin server order. This ensures that
+        // all servers will be used to process the set of parallel threads available in our executor.
+        int i = 0;
+        boolean done;
+        do {
+            done = true;
+            for (int j = 0; j < keyRangesList.size(); j++) {
+                List<KeyRange> keyRanges = (List<KeyRange>)keyRangesList.get(j);
+                if (i < keyRanges.size()) {
+                    splits.add(keyRanges.get(i));
+                    done = false;
+                }
+            }
+            i++;
+        } while (!done);
+        return splits;
+    }
+
+    @Override
+    public List<KeyRange> getSplits() throws SQLException {
+        return genKeyRanges(getAllRegions());
+    }
+}
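
To make the concurrency arithmetic above concrete, here is a self-contained check of the splitsPerRegion formula with illustrative values (t = 32, m = 64; the real defaults live in QueryServicesOptions):

// Worked example of the splitsPerRegion computation in genKeyRanges.
public class SplitMathExample {
    public static void main(String[] args) {
        int t = 32; // targetConcurrency
        int m = 64; // maxConcurrency
        for (int r : new int[] { 10, 20, 40 }) { // number of regions to scan
            int splitsPerRegion = r >= t ? 1 : (r > t / 2 ? m : t) / r;
            System.out.println(r + " regions -> " + splitsPerRegion + " splits per region");
        }
        // Prints: 10 -> 3 (t/r), 20 -> 3 (m/r), 40 -> 1 (region boundaries only)
    }
}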

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java
new file mode 100644
index 0000000..25df5ad
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+public class DelegateResultIterator implements ResultIterator {
+    private final ResultIterator delegate;
+    
+    public DelegateResultIterator(ResultIterator delegate) {
+        this.delegate = delegate;
+    }
+    
+    @Override
+    public void close() throws SQLException {
+        delegate.close();
+    }
+
+    @Override
+    public Tuple next() throws SQLException {
+        return delegate.next();
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+        delegate.explain(planSteps);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
new file mode 100644
index 0000000..f7af26c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.*;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
+import org.apache.phoenix.compile.ColumnProjector;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * Result scanner that dedups the incoming tuples to make them distinct.
+ * <p>
+ * Note that the results are held in memory
+ *  
+ * @author jtaylor
+ * @since 1.2
+ */
+public class DistinctAggregatingResultIterator implements AggregatingResultIterator {
+    private final AggregatingResultIterator delegate;
+    private final RowProjector rowProjector;
+    private Iterator<ResultEntry> resultIterator;
+    private final ImmutableBytesWritable ptr1 = new ImmutableBytesWritable();
+    private final ImmutableBytesWritable ptr2 = new ImmutableBytesWritable();
+
+    private class ResultEntry {
+        private final int hashCode;
+        private final Tuple result;
+
+        ResultEntry(Tuple result) {
+            final int prime = 31;
+            this.result = result;
+            int hashCode = 0;
+            for (ColumnProjector column : rowProjector.getColumnProjectors()) {
+                Expression e = column.getExpression();
+                if (e.evaluate(this.result, ptr1)) {
+                    hashCode = prime * hashCode + ptr1.hashCode();
+                }
+            }
+            this.hashCode = hashCode;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == this) {
+                return true;
+            }
+            if (o == null) {
+                return false;
+            }
+            if (o.getClass() != this.getClass()) {
+                return false;
+            }
+            ResultEntry that = (ResultEntry) o;
+            for (ColumnProjector column : rowProjector.getColumnProjectors()) {
+                Expression e = column.getExpression();
+                boolean isNull1 = !e.evaluate(this.result, ptr1);
+                boolean isNull2 = !e.evaluate(that.result, ptr2);
+                if (isNull1 && isNull2) {
+                    return true;
+                }
+                if (isNull1 || isNull2) {
+                    return false;
+                }
+                if (ptr1.compareTo(ptr2) != 0) {
+                    return false;
+                }
+            }
+            return true;
+        }
+        
+        @Override
+        public int hashCode() {
+            return hashCode;
+        }
+        
+        Tuple getResult() {
+            return result;
+        }
+    }
+    
+    protected ResultIterator getDelegate() {
+        return delegate;
+    }
+    
+    public DistinctAggregatingResultIterator(AggregatingResultIterator delegate,
+            RowProjector rowProjector) {
+        this.delegate = delegate;
+        this.rowProjector = rowProjector;
+    }
+
+    @Override
+    public Tuple next() throws SQLException {
+        Iterator<ResultEntry> iterator = getResultIterator();
+        if (iterator.hasNext()) {
+            ResultEntry entry = iterator.next();
+            Tuple tuple = entry.getResult();
+            aggregate(tuple);
+            return tuple;
+        }
+        resultIterator = Iterators.emptyIterator();
+        return null;
+    }
+    
+    private Iterator<ResultEntry> getResultIterator() throws SQLException {
+        if (resultIterator != null) {
+            return resultIterator;
+        }
+        
+        Set<ResultEntry> entries = Sets.<ResultEntry>newHashSet(); // TODO: size?
+        try {
+            for (Tuple result = delegate.next(); result != null; result = delegate.next()) {
+                ResultEntry entry = new ResultEntry(result);
+                entries.add(entry);
+            }
+        } finally {
+            delegate.close();
+        }
+        
+        resultIterator = entries.iterator();
+        return resultIterator;
+    }
+
+    @Override
+    public void close()  {
+        resultIterator = Iterators.emptyIterator();
+    }
+
+
+    @Override
+    public void explain(List<String> planSteps) {
+        delegate.explain(planSteps);
+        planSteps.add("CLIENT DISTINCT ON " + rowProjector.toString());
+    }
+
+    @Override
+    public void aggregate(Tuple result) {
+        delegate.aggregate(result);
+    }
+}
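
The ResultEntry wrapper is what lets a plain HashSet dedup on the projected expression values rather than on Tuple identity. A generic, self-contained analogue of the same pattern (plain Java, not Phoenix API):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Generic analogue of ResultEntry: equality and hashCode are defined over a
// projection of the row, so a HashSet keeps one row per distinct projection.
public class ProjectedRowDemo {
    static final class Entry {
        final String[] row;
        final int[] projection; // column indexes that define distinctness

        Entry(String[] row, int[] projection) {
            this.row = row;
            this.projection = projection;
        }

        @Override
        public int hashCode() {
            int h = 0;
            for (int i : projection) h = 31 * h + Objects.hashCode(row[i]);
            return h;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Entry)) return false;
            Entry that = (Entry) o;
            for (int i : projection) {
                if (!Objects.equals(row[i], that.row[i])) return false;
            }
            return true;
        }
    }

    public static void main(String[] args) {
        int[] proj = { 0 }; // distinct on the first column only
        Set<Entry> distinct = new HashSet<>();
        for (String[] row : Arrays.asList(new String[] { "a", "1" },
                new String[] { "a", "2" }, new String[] { "b", "1" })) {
            distinct.add(new Entry(row, proj));
        }
        System.out.println(distinct.size()); // 2
    }
}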

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
new file mode 100644
index 0000000..f32b377
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
@@ -0,0 +1,271 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.text.Format;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.collect.Iterators;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.KeyRange.Bound;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.StringUtil;
+
+
+public abstract class ExplainTable {
+    private static final List<KeyRange> EVERYTHING = Collections.singletonList(KeyRange.EVERYTHING_RANGE);
+    protected final StatementContext context;
+    protected final TableRef tableRef;
+    protected final GroupBy groupBy;
+   
+    public ExplainTable(StatementContext context, TableRef table) {
+        this(context,table,GroupBy.EMPTY_GROUP_BY);
+    }
+
+    public ExplainTable(StatementContext context, TableRef table, GroupBy groupBy) {
+        this.context = context;
+        this.tableRef = table;
+        this.groupBy = groupBy;
+    }
+
+    private boolean explainSkipScan(StringBuilder buf) {
+        ScanRanges scanRanges = context.getScanRanges();
+        if (scanRanges.useSkipScanFilter()) {
+            buf.append("SKIP SCAN ");
+            int count = 1;
+            boolean hasRanges = false;
+            for (List<KeyRange> ranges : scanRanges.getRanges()) {
+                count *= ranges.size();
+                for (KeyRange range : ranges) {
+                    hasRanges |= !range.isSingleKey();
+                }
+            }
+            buf.append("ON ");
+            buf.append(count);
+            buf.append(hasRanges ? " RANGE" : " KEY");
+            buf.append(count > 1 ? "S " : " ");
+            return true;
+        } else {
+            buf.append("RANGE SCAN ");
+        }
+        return false;
+    }
+    
+    protected void explain(String prefix, List<String> planSteps) {
+        StringBuilder buf = new StringBuilder(prefix);
+        ScanRanges scanRanges = context.getScanRanges();
+        boolean hasSkipScanFilter = false;
+        if (scanRanges.isEverything()) {
+            buf.append("FULL SCAN ");
+        } else {
+            hasSkipScanFilter = explainSkipScan(buf);
+        }
+        buf.append("OVER " + tableRef.getTable().getName().getString());
+        appendKeyRanges(buf);
+        planSteps.add(buf.toString());
+        
+        Scan scan = context.getScan();
+        Filter filter = scan.getFilter();
+        PageFilter pageFilter = null;
+        if (filter != null) {
+            int offset = 0;
+            boolean hasFirstKeyOnlyFilter = false;
+            String filterDesc = "";
+            if (hasSkipScanFilter) {
+                if (filter instanceof FilterList) {
+                    List<Filter> filterList = ((FilterList) filter).getFilters();
+                    if (filterList.get(0) instanceof FirstKeyOnlyFilter) {
+                        hasFirstKeyOnlyFilter = true;
+                        offset = 1;
+                    }
+                    if (filterList.size() > offset+1) {
+                        filterDesc = filterList.get(offset+1).toString();
+                        if (filterList.size() > offset+2) {
+                            pageFilter = (PageFilter) filterList.get(offset+2);
+                        }
+                    }
+                }
+            } else if (filter instanceof FilterList) {
+                List<Filter> filterList = ((FilterList) filter).getFilters();
+                if (filterList.get(0) instanceof FirstKeyOnlyFilter) {
+                    hasFirstKeyOnlyFilter = true;
+                    offset = 1;
+                }
+                if (filterList.size() > offset) {
+                    filterDesc = filterList.get(offset).toString();
+                    if (filterList.size() > offset+1) {
+                        pageFilter = (PageFilter) filterList.get(offset+1);
+                    }
+                }
+            } else {
+                if (filter instanceof FirstKeyOnlyFilter) {
+                    hasFirstKeyOnlyFilter = true;
+                } else {
+                    filterDesc = filter.toString();
+                }
+            }
+            if (filterDesc.length() > 0) {
+                planSteps.add("    SERVER FILTER BY " + (hasFirstKeyOnlyFilter ? "FIRST KEY ONLY AND " : "") + filterDesc);
+            } else if (hasFirstKeyOnlyFilter) {
+                planSteps.add("    SERVER FILTER BY FIRST KEY ONLY");
+            }
+            if (pageFilter != null) {
+                planSteps.add("    SERVER " + pageFilter.getPageSize() + " ROW LIMIT");
+            }
+        }
+        groupBy.explain(planSteps);
+    }
+
+    private void appendPKColumnValue(StringBuilder buf, byte[] range, Boolean isNull, int slotIndex) {
+        if (Boolean.TRUE.equals(isNull)) {
+            buf.append("null");
+            return;
+        }
+        if (Boolean.FALSE.equals(isNull)) {
+            buf.append("not null");
+            return;
+        }
+        if (range.length == 0) {
+            buf.append('*');
+            return;
+        }
+        ScanRanges scanRanges = context.getScanRanges();
+        PDataType type = scanRanges.getSchema().getField(slotIndex).getDataType();
+        ColumnModifier modifier = tableRef.getTable().getPKColumns().get(slotIndex).getColumnModifier();
+        if (modifier != null) {
+            buf.append('~');
+            range = modifier.apply(range, 0, new byte[range.length], 0, range.length);
+        }
+        Format formatter = context.getConnection().getFormatter(type);
+        buf.append(type.toStringLiteral(range, formatter));
+    }
+    
+    private static class RowKeyValueIterator implements Iterator<byte[]> {
+        private final RowKeySchema schema;
+        private ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        private int position = 0;
+        private final int maxOffset;
+        private byte[] nextValue;
+       
+        public RowKeyValueIterator(RowKeySchema schema, byte[] rowKey) {
+            this.schema = schema;
+            this.maxOffset = schema.iterator(rowKey, ptr);
+            iterate();
+        }
+        
+        private void iterate() {
+            if (schema.next(ptr, position++, maxOffset) == null) {
+                nextValue = null;
+            } else {
+                nextValue = ptr.copyBytes();
+            }
+        }
+        
+        @Override
+        public boolean hasNext() {
+            return nextValue != null;
+        }
+
+        @Override
+        public byte[] next() {
+            if (nextValue == null) {
+                throw new NoSuchElementException();
+            }
+            byte[] value = nextValue;
+            iterate();
+            return value;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+        
+    }
+    
+    private void appendScanRow(StringBuilder buf, Bound bound) {
+        ScanRanges scanRanges = context.getScanRanges();
+        KeyRange minMaxRange = context.getMinMaxRange();
+        Iterator<byte[]> minMaxIterator = Iterators.emptyIterator();
+        if (minMaxRange != null) {
+            RowKeySchema schema = tableRef.getTable().getRowKeySchema();
+            if (!minMaxRange.isUnbound(bound)) {
+                minMaxIterator = new RowKeyValueIterator(schema, minMaxRange.getRange(bound));
+            }
+        }
+        int nRanges = scanRanges.getRanges().size();
+        for (int i = 0, minPos = 0; minPos < nRanges || minMaxIterator.hasNext(); i++) {
+            List<KeyRange> ranges = minPos >= nRanges ? EVERYTHING :  scanRanges.getRanges().get(minPos++);
+            KeyRange range = bound == Bound.LOWER ? ranges.get(0) : ranges.get(ranges.size()-1);
+            byte[] b = range.getRange(bound);
+            Boolean isNull = KeyRange.IS_NULL_RANGE == range ? Boolean.TRUE : KeyRange.IS_NOT_NULL_RANGE == range ? Boolean.FALSE : null;
+            if (minMaxIterator.hasNext()) {
+                byte[] bMinMax = minMaxIterator.next();
+                int cmp = Bytes.compareTo(bMinMax, b) * (bound == Bound.LOWER ? 1 : -1);
+                if (cmp > 0) {
+                    minPos = nRanges;
+                    b = bMinMax;
+                    isNull = null;
+                } else if (cmp < 0) {
+                    minMaxIterator = Iterators.emptyIterator();
+                }
+            }
+            appendPKColumnValue(buf, b, isNull, i);
+            buf.append(',');
+        }
+    }
+    
+    private void appendKeyRanges(StringBuilder buf) {
+        ScanRanges scanRanges = context.getScanRanges();
+        KeyRange minMaxRange = context.getMinMaxRange();
+        if (minMaxRange == null && (scanRanges == ScanRanges.EVERYTHING || scanRanges == ScanRanges.NOTHING)) {
+            return;
+        }
+        buf.append(" [");
+        StringBuilder buf1 = new StringBuilder();
+        appendScanRow(buf1, Bound.LOWER);
+        buf.append(buf1);
+        buf.setCharAt(buf.length()-1, ']');
+        StringBuilder buf2 = new StringBuilder();
+        appendScanRow(buf2, Bound.UPPER);
+        if (!StringUtil.equals(buf1, buf2)) {
+            buf.append( " - [");
+            buf.append(buf2);
+        }
+        buf.setCharAt(buf.length()-1, ']');
+    }
+}
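
Pieced together from the string literals above, an illustrative set of plan steps this class can emit (the CLIENT prefix is supplied by the caller of explain(prefix, planSteps), and the filter text is made up for the example):

CLIENT PARALLEL 4-WAY SKIP SCAN ON 2 RANGES OVER MY_TABLE ['a'] - ['m']
    SERVER FILTER BY FIRST KEY ONLY AND B > 10
    SERVER 1000 ROW LIMIT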

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java
new file mode 100644
index 0000000..d687480
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ * 
+ * Post aggregation filter for HAVING clause. Due to the way we cache aggregation values
+ * we cannot have a look ahead for this Iterator, because the expressions in the SELECT
+ * clause would return values for the peeked row instead of the current row. If we only
+ * use the Result argument in {@link org.apache.phoenix.expression.Expression}
+ * instead of our cached value in Aggregators, we could have a look ahead.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class FilterAggregatingResultIterator  implements AggregatingResultIterator {
+    private final AggregatingResultIterator delegate;
+    private final Expression expression;
+    private final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+    
+    public FilterAggregatingResultIterator(AggregatingResultIterator delegate, Expression expression) {
+        this.delegate = delegate;
+        this.expression = expression;
+        if (expression.getDataType() != PDataType.BOOLEAN) {
+            throw new IllegalArgumentException("FilterResultIterator requires a boolean expression, but got " + expression);
+        }
+    }
+
+    @Override
+    public Tuple next() throws SQLException {
+        Tuple next;
+        do {
+            next = delegate.next();
+        } while (next != null && expression.evaluate(next, ptr) && Boolean.FALSE.equals(expression.getDataType().toObject(ptr)));
+        return next;
+    }
+
+    @Override
+    public void close() throws SQLException {
+        delegate.close();
+    }
+
+    @Override
+    public void aggregate(Tuple result) {
+        delegate.aggregate(result);
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+        delegate.explain(planSteps);
+        planSteps.add("CLIENT FILTER BY " + expression.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterResultIterator.java
new file mode 100644
index 0000000..5518643
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterResultIterator.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ * 
+ * Result scanner that filters out rows based on the results of a boolean
+ * expression (i.e. a row is skipped when {@link org.apache.phoenix.expression.Expression#evaluate(Tuple, ImmutableBytesWritable)}
+ * succeeds and the ptr contains a FALSE value; rows for which the expression
+ * cannot be evaluated pass through). May not be used where
+ * the delegate provided is an {@link org.apache.phoenix.iterate.AggregatingResultIterator}.
+ * For these, the {@link org.apache.phoenix.iterate.FilterAggregatingResultIterator} should be used.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class FilterResultIterator  extends LookAheadResultIterator {
+    private final ResultIterator delegate;
+    private final Expression expression;
+    private final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+    
+    public FilterResultIterator(ResultIterator delegate, Expression expression) {
+        if (delegate instanceof AggregatingResultIterator) {
+            throw new IllegalArgumentException("FilterResultScanner may not be used with an aggregate delegate. Use phoenix.iterate.FilterAggregateResultScanner instead");
+        }
+        this.delegate = delegate;
+        this.expression = expression;
+        if (expression.getDataType() != PDataType.BOOLEAN) {
+            throw new IllegalArgumentException("FilterResultIterator requires a boolean expression, but got " + expression);
+        }
+    }
+
+    @Override
+    protected Tuple advance() throws SQLException {
+        Tuple next;
+        do {
+            next = delegate.next();
+        } while (next != null && expression.evaluate(next, ptr) && Boolean.FALSE.equals(expression.getDataType().toObject(ptr)));
+        return next;
+    }
+    
+    @Override
+    public void close() throws SQLException {
+        delegate.close();
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+        delegate.explain(planSteps);
+        planSteps.add("CLIENT FILTER BY " + expression.toString());
+    }
+}
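
Note the loop's three-part condition: a row is skipped only when the expression evaluates successfully and yields FALSE; a row the expression cannot evaluate falls out of the loop and is returned. A self-contained illustration of the same control flow (plain Java, toy predicate):

import java.util.Arrays;
import java.util.Iterator;

// Mirrors the advance() loop: skip on explicit FALSE, pass through on
// "cannot evaluate" (modeled here as a null verdict).
public class FilterLoopDemo {
    static Boolean evaluate(String row) {
        if (row.isEmpty()) return null; // could not evaluate
        return row.startsWith("a");
    }

    public static void main(String[] args) {
        Iterator<String> rows = Arrays.asList("bb", "", "ab").iterator();
        String next;
        Boolean verdict;
        do {
            next = rows.hasNext() ? rows.next() : null;
            verdict = next == null ? null : evaluate(next);
        } while (next != null && verdict != null && Boolean.FALSE.equals(verdict));
        System.out.println(next); // "" -- skipped "bb" (FALSE), kept the unevaluable row
    }
}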

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java
new file mode 100644
index 0000000..7985699
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.*;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+
+
+/**
+ * 
+ * Result scanner that aggregates the row count value for rows with duplicate keys.
+ * The rows from the backing result iterator must be in key sorted order.  For example,
+ * given the following input:
+ *   a  1
+ *   a  2
+ *   b  1
+ *   b  3
+ *   c  1
+ * the following will be output:
+ *   a  3
+ *   b  4
+ *   c  1
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class GroupedAggregatingResultIterator implements AggregatingResultIterator {
+    private final ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
+    private final PeekingResultIterator resultIterator;
+    protected final Aggregators aggregators;
+    
+    public GroupedAggregatingResultIterator( PeekingResultIterator resultIterator, Aggregators aggregators) {
+        if (resultIterator == null) throw new NullPointerException();
+        if (aggregators == null) throw new NullPointerException();
+        this.resultIterator = resultIterator;
+        this.aggregators = aggregators;
+    }
+    
+    @Override
+    public Tuple next() throws SQLException {
+        Tuple result = resultIterator.next();
+        if (result == null) {
+            return null;
+        }
+        Aggregator[] rowAggregators = aggregators.getAggregators();
+        aggregators.reset(rowAggregators);
+        while (true) {
+            aggregators.aggregate(rowAggregators, result);
+            Tuple nextResult = resultIterator.peek();
+            if (nextResult == null || !TupleUtil.equals(result, nextResult, tempPtr)) {
+                break;
+            }
+            result = resultIterator.next();
+        }
+        
+        byte[] value = aggregators.toBytes(rowAggregators);
+        result.getKey(tempPtr);
+        return new SingleKeyValueTuple(KeyValueUtil.newKeyValue(tempPtr, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+    }
+    
+    @Override
+    public void close() throws SQLException {
+        resultIterator.close();
+    }
+    
+    @Override
+    public void aggregate(Tuple result) {
+        Aggregator[] rowAggregators = aggregators.getAggregators();
+        aggregators.reset(rowAggregators);
+        aggregators.aggregate(rowAggregators, result);
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+        resultIterator.explain(planSteps);
+    }
+}
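
The next() method above folds runs of adjacent, equal-keyed rows using peek(). The same shape over sorted (key, value) pairs, using Guava's PeekingIterator in place of PeekingResultIterator and summation in place of Aggregators (illustrative only):

import java.util.Arrays;
import java.util.List;

import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;

// Reproduces the javadoc example: (a,1)(a,2)(b,1)(b,3)(c,1) -> a 3, b 4, c 1.
public class GroupAdjacentDemo {
    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
                new String[] { "a", "1" }, new String[] { "a", "2" },
                new String[] { "b", "1" }, new String[] { "b", "3" },
                new String[] { "c", "1" });
        PeekingIterator<String[]> it = Iterators.peekingIterator(rows.iterator());
        while (it.hasNext()) {
            String[] row = it.next();
            long sum = Long.parseLong(row[1]); // "aggregate" = sum of values
            while (it.hasNext() && it.peek()[0].equals(row[0])) {
                sum += Long.parseLong(it.next()[1]);
            }
            System.out.println(row[0] + "  " + sum);
        }
    }
}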

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java
new file mode 100644
index 0000000..ecea92d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * 
+ * Iterates through tuples, stopping once the configured limit has been returned
+ * (surfaced as the CLIENT ROW LIMIT step in explain plans).
+ *
+ * @author jtaylor
+ * @since 1.2
+ */
+public class LimitingResultIterator extends DelegateResultIterator {
+    private int rowCount;
+    private final int limit;
+    
+    public LimitingResultIterator(ResultIterator delegate, int limit) {
+        super(delegate);
+        this.limit = limit;
+    }
+
+    @Override
+    public Tuple next() throws SQLException {
+        if (rowCount++ >= limit) {
+            return null;
+        }
+        return super.next();
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+        super.explain(planSteps);
+        planSteps.add("CLIENT " + limit + " ROW LIMIT");
+    }
+
+}
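
The decorator amounts to a guarded counter; for comparison, the same stop-after-N idea over a plain java.util.Iterator:

    import java.util.Arrays;
    import java.util.Iterator;

    public class LimitSketch {
        public static void main(String[] args) {
            final Iterator<Integer> delegate = Arrays.asList(1, 2, 3, 4, 5).iterator();
            final int limit = 3;
            Iterator<Integer> limited = new Iterator<Integer>() {
                private int rowCount;
                public boolean hasNext() { return rowCount < limit && delegate.hasNext(); }
                public Integer next() { rowCount++; return delegate.next(); }
                public void remove() { throw new UnsupportedOperationException(); }
            };
            while (limited.hasNext()) {
                System.out.println(limited.next()); // prints 1, 2, 3
            }
        }
    }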

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
new file mode 100644
index 0000000..0e4b78b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
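+/**
+ * 
+ * Abstract base class for PeekingResultIterators that only know how to
+ * advance: subclasses supply advance(), and this class buffers one tuple
+ * ahead so that peek() can return the next tuple without consuming it.
+ * The UNINITIALIZED sentinel distinguishes "not yet advanced" from
+ * exhausted (null), deferring the first advance() until next() or peek()
+ * is called.
+ */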
+public abstract class LookAheadResultIterator implements PeekingResultIterator {
+    private static final Tuple UNINITIALIZED = new ResultTuple();
+    private Tuple next = UNINITIALIZED;
+    
+    protected abstract Tuple advance() throws SQLException;
+    
+    private void init() throws SQLException {
+        if (next == UNINITIALIZED) {
+            next = advance();
+        }
+    }
+    
+    @Override
+    public Tuple next() throws SQLException {
+        init();
+        Tuple next = this.next;
+        this.next = advance();
+        return next;
+    }
+    
+    @Override
+    public Tuple peek() throws SQLException {
+        init();
+        return next;
+    }
+}
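
A minimal concrete subclass sketch, assuming a hypothetical in-memory List<Tuple> source living in the same package; only advance() carries logic, with close() and explain() as no-ops:

    import java.util.Iterator;
    import java.util.List;

    import org.apache.phoenix.schema.tuple.Tuple;

    public class ListLookAheadResultIterator extends LookAheadResultIterator {
        private final Iterator<Tuple> source;

        public ListLookAheadResultIterator(List<Tuple> tuples) {
            this.source = tuples.iterator();
        }

        @Override
        protected Tuple advance() {
            return source.hasNext() ? source.next() : null; // null marks exhaustion
        }

        @Override
        public void close() {
        }

        @Override
        public void explain(List<String> planSteps) {
        }
    }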

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java
new file mode 100644
index 0000000..1606ae6
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java
@@ -0,0 +1,389 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.util.AbstractQueue;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.collect.MinMaxPriorityQueue;
+import org.apache.phoenix.iterate.OrderedResultIterator.ResultEntry;
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+
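+/**
+ * 
+ * Sorted queue of ResultEntry objects that spills to memory-mapped temp
+ * files once the in-memory size crosses thresholdBytes. Entries accumulate
+ * in a sorted in-memory queue; when a queue flushes, its entries are
+ * written to disk in sorted order and a fresh queue is started. poll() and
+ * peek() then merge the head entry of every queue to restore global order.
+ */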
+public class MappedByteBufferSortedQueue extends AbstractQueue<ResultEntry> {
+    private Comparator<ResultEntry> comparator;
+    private final int limit;
+    private final int thresholdBytes;
+    private List<MappedByteBufferPriorityQueue> queues = new ArrayList<MappedByteBufferPriorityQueue>();
+    private MappedByteBufferPriorityQueue currentQueue = null;
+    private int currentIndex = 0;
+    MinMaxPriorityQueue<IndexedResultEntry> mergedQueue = null;
+
+    public MappedByteBufferSortedQueue(Comparator<ResultEntry> comparator,
+            Integer limit, int thresholdBytes) throws IOException {
+        this.comparator = comparator;
+        this.limit = limit == null ? -1 : limit;
+        this.thresholdBytes = thresholdBytes;
+        this.currentQueue = new MappedByteBufferPriorityQueue(0,
+                this.limit, thresholdBytes, comparator);
+        this.queues.add(currentQueue);
+    }
+
+    @Override
+    public boolean offer(ResultEntry e) {
+        try {
+            boolean isFlush = this.currentQueue.writeResult(e);
+            if (isFlush) {
+                currentIndex++;
+                currentQueue = new MappedByteBufferPriorityQueue(currentIndex,
+                        limit, thresholdBytes, comparator);
+                queues.add(currentQueue);
+            }
+        } catch (IOException ioe) {
+            throw new RuntimeException(ioe);
+        }
+
+        return true;
+    }
+
+    // Lazily builds the merge queue, seeded with the head entry of each
+    // underlying queue; shared by poll() and peek().
+    private void initMergedQueue() {
+        if (mergedQueue == null) {
+            mergedQueue = MinMaxPriorityQueue.<ResultEntry> orderedBy(
+                    comparator).maximumSize(queues.size()).create();
+            for (MappedByteBufferPriorityQueue queue : queues) {
+                try {
+                    IndexedResultEntry next = queue.getNextResult();
+                    if (next != null) {
+                        mergedQueue.add(next);
+                    }
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public ResultEntry poll() {
+        initMergedQueue();
+        if (!mergedQueue.isEmpty()) {
+            IndexedResultEntry re = mergedQueue.pollFirst();
+            if (re != null) {
+                IndexedResultEntry next = null;
+                try {
+                    next = queues.get(re.getIndex()).getNextResult();
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+                if (next != null) {
+                    mergedQueue.add(next);
+                }
+                return re;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public ResultEntry peek() {
+        initMergedQueue();
+        if (!mergedQueue.isEmpty()) {
+            IndexedResultEntry re = mergedQueue.peekFirst();
+            if (re != null) {
+                return re;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public Iterator<ResultEntry> iterator() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int size() {
+        int size = 0;
+        for (MappedByteBufferPriorityQueue queue : queues) {
+            size += queue.size();
+        }
+        return size;
+    }
+    
+    public long getByteSize() {
+        return currentQueue.getInMemByteSize();
+    }
+
+    public void close() {
+        if (queues != null) {
+            for (MappedByteBufferPriorityQueue queue : queues) {
+                queue.close();
+            }
+        }
+    }
+
+    private static class IndexedResultEntry extends ResultEntry {
+        private int index;
+
+        public IndexedResultEntry(int index, ResultEntry resultEntry) {
+            super(resultEntry.sortKeys, resultEntry.result);
+            this.index = index;
+        }
+
+        public int getIndex() {
+            return this.index;
+        }
+    }
+
+    private static class MappedByteBufferPriorityQueue {
+        private static final long DEFAULT_MAPPING_SIZE = 1024;
+        
+        private final int limit;
+        private final int thresholdBytes;
+        private long totalResultSize = 0;
+        private int maxResultSize = 0;
+        private long mappingSize = 0;
+        private long writeIndex = 0;
+        private long readIndex = 0;
+        private MappedByteBuffer writeBuffer;
+        private MappedByteBuffer readBuffer;
+        private FileChannel fc;
+        private RandomAccessFile af;
+        private File file;
+        private boolean isClosed = false;
+        MinMaxPriorityQueue<ResultEntry> results = null;
+        private boolean flushBuffer = false;
+        private int index;
+        private int flushedCount;
+
+        public MappedByteBufferPriorityQueue(int index, int limit, int thresholdBytes,
+                Comparator<ResultEntry> comparator) throws IOException {
+            this.index = index;
+            this.limit = limit;
+            this.thresholdBytes = thresholdBytes;
+            results = limit < 0 ? 
+                    MinMaxPriorityQueue.<ResultEntry> orderedBy(comparator).create()
+                  : MinMaxPriorityQueue.<ResultEntry> orderedBy(comparator).maximumSize(limit).create();
+        }
+        
+        public int size() {
+            if (flushBuffer)
+                return flushedCount;
+            return results.size();
+        }
+        
+        public long getInMemByteSize() {
+            if (flushBuffer)
+                return 0;
+            return totalResultSize;
+        }
+
+        private List<KeyValue> toKeyValues(ResultEntry entry) {
+            Tuple result = entry.getResult();
+            int size = result.size();
+            List<KeyValue> kvs = new ArrayList<KeyValue>(size);
+            for (int i = 0; i < size; i++) {
+                kvs.add(result.getValue(i));
+            }
+            return kvs;
+        }
+
+        private int sizeof(List<KeyValue> kvs) {
+            int size = Bytes.SIZEOF_INT; // totalLen
+
+            for (KeyValue kv : kvs) {
+                size += kv.getLength();
+                size += Bytes.SIZEOF_INT; // kv.getLength
+            }
+
+            return size;
+        }
+
+        private int sizeof(ImmutableBytesWritable[] sortKeys) {
+            int size = Bytes.SIZEOF_INT;
+            if (sortKeys != null) {
+                for (ImmutableBytesWritable sortKey : sortKeys) {
+                    if (sortKey != null) {
+                        size += sortKey.getLength();
+                    }
+                    size += Bytes.SIZEOF_INT;
+                }
+            }
+            return size;
+        }
+
+        public boolean writeResult(ResultEntry entry) throws IOException {
+            if (flushBuffer)
+                throw new IOException("Results already flushed");
+            
+            int sortKeySize = sizeof(entry.sortKeys);
+            int resultSize = sizeof(toKeyValues(entry)) + sortKeySize;
+            boolean added = results.add(entry);
+            if (added) {
+                maxResultSize = Math.max(maxResultSize, resultSize);
+                totalResultSize = limit < 0 ? (totalResultSize + resultSize) : (long) maxResultSize * results.size();
+                if (totalResultSize >= thresholdBytes) {
+                    this.file = File.createTempFile(UUID.randomUUID().toString(), null);
+                    this.af = new RandomAccessFile(file, "rw");
+                    this.fc = af.getChannel();
+                    mappingSize = Math.min(Math.max(maxResultSize, DEFAULT_MAPPING_SIZE), totalResultSize);
+                    writeBuffer = fc.map(MapMode.READ_WRITE, writeIndex, mappingSize);
+                
+                    int resSize = results.size();
+                    for (int i = 0; i < resSize; i++) {                
+                        int totalLen = 0;
+                        ResultEntry re = results.pollFirst();
+                        List<KeyValue> keyValues = toKeyValues(re);
+                        for (KeyValue kv : keyValues) {
+                            totalLen += (kv.getLength() + Bytes.SIZEOF_INT);
+                        }
+                        writeBuffer.putInt(totalLen);
+                        for (KeyValue kv : keyValues) {
+                            writeBuffer.putInt(kv.getLength());
+                            writeBuffer.put(kv.getBuffer(), kv.getOffset(), kv
+                                    .getLength());
+                        }
+                        ImmutableBytesWritable[] sortKeys = re.sortKeys;
+                        writeBuffer.putInt(sortKeys.length);
+                        for (ImmutableBytesWritable sortKey : sortKeys) {
+                            if (sortKey != null) {
+                                writeBuffer.putInt(sortKey.getLength());
+                                writeBuffer.put(sortKey.get(), sortKey.getOffset(),
+                                        sortKey.getLength());
+                            } else {
+                                writeBuffer.putInt(0);
+                            }
+                        }
+                        // buffer close to exhausted, re-map.
+                        if (mappingSize - writeBuffer.position() < maxResultSize) {
+                            writeIndex += writeBuffer.position();
+                            writeBuffer = fc.map(MapMode.READ_WRITE, writeIndex, mappingSize);
+                        }
+                    }
+                    writeBuffer.putInt(-1); // end-of-queue marker
+                    flushedCount = resSize; // results was drained by pollFirst() above
+                    results.clear();
+                    flushBuffer = true;
+                }
+            }
+            return flushBuffer;
+        }
+
+        public IndexedResultEntry getNextResult() throws IOException {
+            if (isClosed)
+                return null;
+            
+            if (!flushBuffer) {
+                ResultEntry re = results.poll();
+                if (re == null) {
+                    reachedEnd();
+                    return null;
+                }
+                return new IndexedResultEntry(index, re);
+            }
+            
+            if (readBuffer == null) {
+                readBuffer = this.fc.map(MapMode.READ_ONLY, readIndex, mappingSize);
+            }
+            
+            int length = readBuffer.getInt();
+            if (length < 0) {
+                reachedEnd();
+                return null;
+            }
+            
+            byte[] rb = new byte[length];
+            readBuffer.get(rb);
+            Result result = new Result(new ImmutableBytesWritable(rb));
+            ResultTuple rt = new ResultTuple(result);
+            int sortKeySize = readBuffer.getInt();
+            ImmutableBytesWritable[] sortKeys = new ImmutableBytesWritable[sortKeySize];
+            for (int i = 0; i < sortKeySize; i++) {
+                int contentLength = readBuffer.getInt();
+                if (contentLength > 0) {
+                    byte[] sortKeyContent = new byte[contentLength];
+                    readBuffer.get(sortKeyContent);
+                    sortKeys[i] = new ImmutableBytesWritable(sortKeyContent);
+                } else {
+                    sortKeys[i] = null;
+                }
+            }
+            // buffer close to exhausted, re-map.
+            if (mappingSize - readBuffer.position() < maxResultSize) {
+                readIndex += readBuffer.position();
+                readBuffer = fc.map(MapMode.READ_ONLY, readIndex, mappingSize);
+            }
+            
+            return new IndexedResultEntry(index, new ResultEntry(sortKeys, rt));
+        }
+
+        private void reachedEnd() {
+            this.isClosed = true;
+            if (this.fc != null) {
+                try {
+                    this.fc.close();
+                } catch (IOException ignored) {
+                }
+                this.fc = null;
+            }
+            if (this.af != null) {
+                try {
+                    this.af.close();
+                } catch (IOException ignored) {
+                }
+                this.af = null;
+            }
+            if (this.file != null) {
+                file.delete();
+                file = null;
+            }
+        }
+
+        public void close() {
+            if (!isClosed) {
+                this.reachedEnd();
+            }
+        }
+    }
+}
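
The spill format written by writeResult() and parsed by getNextResult() is plain length-prefixed framing. A self-contained round trip of the same layout on a heap ByteBuffer, with toy byte arrays standing in for serialized KeyValues:

    import java.nio.ByteBuffer;

    public class SpillFramingSketch {
        public static void main(String[] args) {
            byte[] kv1 = { 1, 2, 3 };
            byte[] kv2 = { 4, 5 };
            byte[] sortKey = { 9 };

            ByteBuffer buf = ByteBuffer.allocate(64);
            buf.putInt(4 + kv1.length + 4 + kv2.length); // totalLen of the KV block
            buf.putInt(kv1.length).put(kv1);             // (length, bytes) per KeyValue
            buf.putInt(kv2.length).put(kv2);
            buf.putInt(1);                               // number of sort keys
            buf.putInt(sortKey.length).put(sortKey);     // (length, bytes); length 0 = null key
            buf.putInt(-1);                              // end-of-queue marker

            buf.flip();
            int totalLen = buf.getInt();                 // a negative value means "no more"
            byte[] kvBlock = new byte[totalLen];
            buf.get(kvBlock);                            // interleaved (length, bytes) pairs
            int keyCount = buf.getInt();
            System.out.println("read " + totalLen + " KV bytes, " + keyCount + " sort key(s)");
        }
    }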

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedResultIterator.java
new file mode 100644
index 0000000..c509b2c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedResultIterator.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.*;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ * 
+ * Fully materialized result iterator backed by the result list provided.
+ * No copy is made of the backing results collection.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class MaterializedResultIterator implements PeekingResultIterator {
+    private final PeekingCollectionIterator iterator;
+    
+    public MaterializedResultIterator(Collection<Tuple> results) {
+        iterator = new PeekingCollectionIterator(results);
+    }
+    
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public Tuple next() throws SQLException {
+        return iterator.nextOrNull();
+    }
+
+    @Override
+    public Tuple peek() throws SQLException {
+        return iterator.peek();
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+    }
+
+    private static class PeekingCollectionIterator implements Iterator<Tuple> {
+        private final Iterator<Tuple> iterator;
+        private Tuple current;            
+        
+        private PeekingCollectionIterator(Collection<Tuple> results) {
+            iterator = results.iterator();
+            advance();
+        }
+        
+        private Tuple advance() {
+            if (iterator.hasNext()) {
+                current = iterator.next();
+            } else {
+                current = null;
+            }
+            return current;
+        }
+        
+        @Override
+        public boolean hasNext() {
+            return current != null;
+        }
+
+        @Override
+        public Tuple next() {
+            Tuple next = nextOrNull();
+            if (next == null) {
+                throw new NoSuchElementException();
+            }
+            return next;
+        }
+
+        public Tuple nextOrNull() {
+            if (current == null) {
+                return null;
+            }
+            Tuple next = current;
+            advance();
+            return next;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+        
+        public Tuple peek() {
+            return current;
+        }
+
+    }
+}
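
Usage is a thin wrap; because no copy is made, the backing collection must not change while it is being iterated. A sketch assuming the same package, with an empty list standing in for real results:

    import java.sql.SQLException;
    import java.util.Collections;
    import java.util.List;

    import org.apache.phoenix.schema.tuple.Tuple;

    public class MaterializedSketch {
        public static void main(String[] args) throws SQLException {
            List<Tuple> results = Collections.emptyList(); // stand-in for cached rows
            MaterializedResultIterator iterator = new MaterializedResultIterator(results);
            for (Tuple t = iterator.next(); t != null; t = iterator.next()) {
                // process t; peek() would return the same tuple without consuming it
            }
            iterator.close();
        }
    }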

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
new file mode 100644
index 0000000..58b7e40
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SQLCloseables;
+
+
+/**
+ * 
+ * Base class for a ResultIterator that merge sorts the list of iterators
+ * provided. Each input iterator must itself be sorted consistently with compare().
+ *
+ * @author jtaylor
+ * @since 1.2
+ */
+public abstract class MergeSortResultIterator implements PeekingResultIterator {
+    protected final ResultIterators resultIterators;
+    protected final ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
+    private List<PeekingResultIterator> iterators;
+    
+    public MergeSortResultIterator(ResultIterators iterators) {
+        this.resultIterators = iterators;
+    }
+    
+    private List<PeekingResultIterator> getIterators() throws SQLException {
+        if (iterators == null) {
+            iterators = resultIterators.getIterators();
+        }
+        return iterators;
+    }
+    
+    @Override
+    public void close() throws SQLException {
+        if (iterators != null) {
+            SQLCloseables.closeAll(iterators);
+        }
+    }
+
+    protected abstract int compare(Tuple t1, Tuple t2);
+    
+    private PeekingResultIterator minIterator() throws SQLException {
+        List<PeekingResultIterator> iterators = getIterators();
+        Tuple minResult = null;
+        PeekingResultIterator minIterator = EMPTY_ITERATOR;
+        for (int i = iterators.size()-1; i >= 0; i--) {
+            PeekingResultIterator iterator = iterators.get(i);
+            Tuple r = iterator.peek();
+            if (r != null) {
+                if (minResult == null || compare(r, minResult) < 0) {
+                    minResult = r;
+                    minIterator = iterator;
+                }
+                continue;
+            }
+            iterator.close();
+            iterators.remove(i);
+        }
+        return minIterator;
+    }
+    
+    @Override
+    public Tuple peek() throws SQLException {
+        PeekingResultIterator iterator = minIterator();
+        return iterator.peek();
+    }
+
+    @Override
+    public Tuple next() throws SQLException {
+        PeekingResultIterator iterator = minIterator();
+        return iterator.next();
+    }
+}
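
minIterator() does a linear scan for the smallest peeked head and prunes exhausted sources in the same pass. The same selection strategy in a self-contained sketch, with JDK queues standing in for PeekingResultIterators:

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Queue;

    public class KWayMergeSketch {
        public static void main(String[] args) {
            // each queue is already sorted, like the per-region scanners
            List<Queue<Integer>> sources = new ArrayList<Queue<Integer>>();
            sources.add(new ArrayDeque<Integer>(Arrays.asList(1, 4, 7)));
            sources.add(new ArrayDeque<Integer>(Arrays.asList(2, 5)));
            sources.add(new ArrayDeque<Integer>(Arrays.asList(3, 6)));
            while (true) {
                Queue<Integer> min = null;
                for (int i = sources.size() - 1; i >= 0; i--) {   // same backward scan
                    Queue<Integer> q = sources.get(i);
                    if (q.peek() == null) { sources.remove(i); continue; } // exhausted
                    if (min == null || q.peek() < min.peek()) min = q;
                }
                if (min == null) break;
                System.out.print(min.poll() + " ");               // 1 2 3 4 5 6 7
            }
        }
    }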

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortRowKeyResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortRowKeyResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortRowKeyResultIterator.java
new file mode 100644
index 0000000..a411bab
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortRowKeyResultIterator.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.util.List;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.TupleUtil;
+
+
+/**
+ * 
+ * ResultIterator that does a merge sort on the list of iterators provided,
+ * returning the rows in row key ascending order. The iterators provided
+ * must be in row key ascending order.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class MergeSortRowKeyResultIterator extends MergeSortResultIterator {
+    private final int keyOffset;
+    private final int factor;
+    
+    public MergeSortRowKeyResultIterator(ResultIterators iterators) {
+        this(iterators, 0, false);
+    }
+    
+    public MergeSortRowKeyResultIterator(ResultIterators iterators, int keyOffset, boolean isReverse) {
+        super(iterators);
+        this.keyOffset = keyOffset;
+        this.factor = isReverse ? -1 : 1;
+    }
+   
+    @Override
+    protected int compare(Tuple t1, Tuple t2) {
+        return factor * TupleUtil.compare(t1, t2, tempPtr, keyOffset);
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+        resultIterators.explain(planSteps);
+        planSteps.add("CLIENT MERGE SORT");
+    }
+}
\ No newline at end of file
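
The isReverse handling is the usual comparator negation trick: multiplying each comparison by -1 turns the ascending merge into a descending one with no second code path. In isolation:

    import java.util.Arrays;
    import java.util.Comparator;

    public class ReverseFactorSketch {
        public static void main(String[] args) {
            final int factor = -1; // isReverse ? -1 : 1
            Integer[] keys = { 3, 1, 2 };
            Arrays.sort(keys, new Comparator<Integer>() {
                public int compare(Integer a, Integer b) {
                    return factor * a.compareTo(b);
                }
            });
            System.out.println(Arrays.toString(keys)); // [3, 2, 1]
        }
    }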