You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2016/05/13 00:05:38 UTC

[1/3] phoenix git commit: PHOENIX-2818 Optimize ORDERED group by

Repository: phoenix
Updated Branches:
  refs/heads/master d97eb4967 -> ae14e38cc


PHOENIX-2818 Optimize ORDERED group by


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d414505d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d414505d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d414505d

Branch: refs/heads/master
Commit: d414505df8afeba553734db688547c6a4e9e90d9
Parents: d97eb49
Author: James Taylor <ja...@apache.org>
Authored: Thu May 12 14:29:29 2016 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Thu May 12 14:29:29 2016 -0700

----------------------------------------------------------------------
 .../apache/phoenix/end2end/GroupByCaseIT.java   |  92 +++++++++
 .../apache/phoenix/execute/AggregatePlan.java   |   3 +-
 .../iterate/AggregatingResultIterator.java      |   4 +-
 .../BaseGroupedAggregatingResultIterator.java   |   3 +-
 .../DistinctAggregatingResultIterator.java      |  15 +-
 .../FilterAggregatingResultIterator.java        |   8 +-
 .../OrderedAggregatingResultIterator.java       |   5 +-
 .../RowKeyOrderedAggregateResultIterator.java   | 190 +++++++++++++++++++
 .../iterate/AggregateResultScannerTest.java     | 109 +----------
 .../iterate/ConcatResultIteratorTest.java       |  31 +--
 .../iterate/MaterializedResultIterators.java    |  66 +++++++
 ...owKeyOrderedAggregateResultIteratorTest.java | 149 +++++++++++++++
 .../java/org/apache/phoenix/util/TestUtil.java  |  70 +++++++
 13 files changed, 596 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java
index 44f43b7..b0524da 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java
@@ -32,6 +32,9 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Properties;
 
+import org.apache.phoenix.schema.types.PChar;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.junit.Test;
@@ -343,4 +346,93 @@ public class GroupByCaseIT extends BaseHBaseManagedTimeIT {
                 "    SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [K1]", QueryUtil.getExplainPlan(rs));
     }
     
+    @Test
+    public void testSumGroupByOrderPreservingDesc() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        PreparedStatement stmt = conn.prepareStatement("CREATE TABLE GROUP_BY_DESC (k1 char(1) not null, k2 integer not null, constraint pk primary key (k1,k2)) split on (?,?,?)");
+        stmt.setBytes(1, ByteUtil.concat(PChar.INSTANCE.toBytes("a"), PInteger.INSTANCE.toBytes(3)));
+        stmt.setBytes(2, ByteUtil.concat(PChar.INSTANCE.toBytes("j"), PInteger.INSTANCE.toBytes(3)));
+        stmt.setBytes(3, ByteUtil.concat(PChar.INSTANCE.toBytes("n"), PInteger.INSTANCE.toBytes(3)));
+        stmt.execute();
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 1)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 2)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 3)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 4)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('b', 5)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 1)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 2)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 3)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 4)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 1)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 2)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 3)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 4)");
+        conn.commit();
+        String query = "SELECT k1,sum(k2) FROM GROUP_BY_DESC GROUP BY k1 ORDER BY k1 DESC";
+        ResultSet rs = conn.createStatement().executeQuery(query);
+        assertTrue(rs.next());
+        assertEquals("n", rs.getString(1));
+        assertEquals(10, rs.getInt(2));
+        assertTrue(rs.next());
+        assertEquals("j", rs.getString(1));
+        assertEquals(10, rs.getInt(2));
+        assertTrue(rs.next());
+        assertEquals("b", rs.getString(1));
+        assertEquals(5, rs.getInt(2));
+        assertTrue(rs.next());
+        assertEquals("a", rs.getString(1));
+        assertEquals(10, rs.getInt(2));
+        assertFalse(rs.next());
+        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+        assertEquals(
+                "CLIENT PARALLEL 1-WAY REVERSE FULL SCAN OVER GROUP_BY_DESC\n" + 
+                "    SERVER FILTER BY FIRST KEY ONLY\n" + 
+                "    SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [K1]", QueryUtil.getExplainPlan(rs));
+    }
+
+    @Test
+    public void testAvgGroupByOrderPreserving() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        PreparedStatement stmt = conn.prepareStatement("CREATE TABLE GROUP_BY_DESC (k1 char(1) not null, k2 integer not null, constraint pk primary key (k1,k2)) split on (?,?,?)");
+        stmt.setBytes(1, ByteUtil.concat(PChar.INSTANCE.toBytes("a"), PInteger.INSTANCE.toBytes(3)));
+        stmt.setBytes(2, ByteUtil.concat(PChar.INSTANCE.toBytes("j"), PInteger.INSTANCE.toBytes(3)));
+        stmt.setBytes(3, ByteUtil.concat(PChar.INSTANCE.toBytes("n"), PInteger.INSTANCE.toBytes(3)));
+        stmt.execute();
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 1)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 2)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 3)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 6)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('b', 5)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 1)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 2)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 3)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 10)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 1)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 2)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 3)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 2)");
+        conn.commit();
+        String query = "SELECT k1,avg(k2) FROM GROUP_BY_DESC GROUP BY k1";
+        ResultSet rs = conn.createStatement().executeQuery(query);
+        assertTrue(rs.next());
+        assertEquals("a", rs.getString(1));
+        assertEquals(3, rs.getInt(2));
+        assertTrue(rs.next());
+        assertEquals("b", rs.getString(1));
+        assertEquals(5, rs.getInt(2));
+        assertTrue(rs.next());
+        assertEquals("j", rs.getString(1));
+        assertEquals(4, rs.getInt(2));
+        assertTrue(rs.next());
+        assertEquals("n", rs.getString(1));
+        assertEquals(2, rs.getInt(2));
+        assertFalse(rs.next());
+        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+        assertEquals(
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER GROUP_BY_DESC\n" + 
+                "    SERVER FILTER BY FIRST KEY ONLY\n" + 
+                "    SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [K1]", QueryUtil.getExplainPlan(rs));
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 94d1fc8..770cf71 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -41,6 +41,7 @@ import org.apache.phoenix.iterate.ConcatResultIterator;
 import org.apache.phoenix.iterate.DistinctAggregatingResultIterator;
 import org.apache.phoenix.iterate.FilterAggregatingResultIterator;
 import org.apache.phoenix.iterate.GroupedAggregatingResultIterator;
+import org.apache.phoenix.iterate.RowKeyOrderedAggregateResultIterator;
 import org.apache.phoenix.iterate.LimitingResultIterator;
 import org.apache.phoenix.iterate.MergeSortRowKeyResultIterator;
 import org.apache.phoenix.iterate.OffsetResultIterator;
@@ -226,7 +227,7 @@ public class AggregatePlan extends BaseQueryPlan {
             aggResultIterator = new UngroupedAggregatingResultIterator(new ConcatResultIterator(iterators), aggregators);
         // If salted or local index we still need a merge sort as we'll potentially have multiple group by keys that aren't contiguous.
         } else if (groupBy.isOrderPreserving() && !(this.getTableRef().getTable().getBucketNum() != null || this.getTableRef().getTable().getIndexType() == IndexType.LOCAL)) {
-            aggResultIterator = new GroupedAggregatingResultIterator(new ConcatResultIterator(iterators), aggregators);
+            aggResultIterator = new RowKeyOrderedAggregateResultIterator(iterators, aggregators);
         } else {
             aggResultIterator = new GroupedAggregatingResultIterator(new MergeSortRowKeyResultIterator(iterators), aggregators);            
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java
index abd0545..59a89ad 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.iterate;
 
+import org.apache.phoenix.expression.aggregator.Aggregator;
 import org.apache.phoenix.schema.tuple.Tuple;
 
 
@@ -33,6 +34,7 @@ public interface AggregatingResultIterator extends ResultIterator {
      * Provides a means of re-aggregating a result row. For
      * scanners that need to look ahead (i.e. {@link org.apache.phoenix.iterate.OrderedAggregatingResultIterator}
      * @param result the row to re-aggregate
+     * @return Aggregator[] results
      */
-    void aggregate(Tuple result);
+    Aggregator[] aggregate(Tuple result);
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
index 8fd36b3..84d29ff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
@@ -91,10 +91,11 @@ public abstract class BaseGroupedAggregatingResultIterator implements
     }
     
     @Override
-    public void aggregate(Tuple result) {
+    public Aggregator[] aggregate(Tuple result) {
         Aggregator[] rowAggregators = aggregators.getAggregators();
         aggregators.reset(rowAggregators);
         aggregators.aggregate(rowAggregators, result);
+        return rowAggregators;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
index 1ba134b..cf3fb38 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
@@ -18,17 +18,20 @@
 package org.apache.phoenix.iterate;
 
 import java.sql.SQLException;
-import java.util.*;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Sets;
 import org.apache.phoenix.compile.ColumnProjector;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
 import org.apache.phoenix.schema.tuple.Tuple;
 
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
+
 /**
  * Result scanner that dedups the incoming tuples to make them distinct.
  * <p>
@@ -155,8 +158,8 @@ public class DistinctAggregatingResultIterator implements AggregatingResultItera
     }
 
     @Override
-    public void aggregate(Tuple result) {
-        delegate.aggregate(result);
+    public Aggregator[] aggregate(Tuple result) {
+        return delegate.aggregate(result);
     }
 
 	@Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java
index 4fa2011..5fd2028 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java
@@ -21,10 +21,10 @@ import java.sql.SQLException;
 import java.util.List;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
 import org.apache.phoenix.expression.Expression;
-import org.apache.phoenix.schema.types.PBoolean;
+import org.apache.phoenix.expression.aggregator.Aggregator;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PBoolean;
 
 
 /**
@@ -66,8 +66,8 @@ public class FilterAggregatingResultIterator  implements AggregatingResultIterat
     }
 
     @Override
-    public void aggregate(Tuple result) {
-        delegate.aggregate(result);
+    public Aggregator[] aggregate(Tuple result) {
+        return delegate.aggregate(result);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java
index da2be48..51a7dd8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java
@@ -21,6 +21,7 @@ import java.sql.SQLException;
 import java.util.List;
 
 import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
 import org.apache.phoenix.schema.tuple.Tuple;
 
 
@@ -55,7 +56,7 @@ public class OrderedAggregatingResultIterator extends OrderedResultIterator impl
     }
     
     @Override
-    public void aggregate(Tuple result) {
-        getDelegate().aggregate(result);
+    public Aggregator[] aggregate(Tuple result) {
+        return getDelegate().aggregate(result);
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIterator.java
new file mode 100644
index 0000000..3c52e51
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIterator.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.ServerUtil;
+
+
+/**
+ * 
+ * Client-side aggregate for key ordered aggregation. Prevents the comparison of
+ * row keys for rows returned unless we cross a scan boundary.
+ * 
+ */
+public class RowKeyOrderedAggregateResultIterator extends LookAheadResultIterator implements AggregatingResultIterator {
+    private final ResultIterators resultIterators;
+    private List<PeekingResultIterator> iterators;
+    private final Aggregators aggregators;
+    private final ImmutableBytesWritable currentKey = new ImmutableBytesWritable();
+    private final ImmutableBytesWritable previousKey = new ImmutableBytesWritable();
+    private boolean traversedIterator = true;
+    private boolean nextTraversedIterators;
+    private Tuple next;
+    
+   private int index;
+    
+    public RowKeyOrderedAggregateResultIterator(ResultIterators iterators, Aggregators aggregators) {
+        this.resultIterators = iterators;
+        this.aggregators = aggregators;
+    }
+    
+    private List<PeekingResultIterator> getIterators() throws SQLException {
+        if (iterators == null && resultIterators != null) {
+            iterators = resultIterators.getIterators();
+        }
+        return iterators;
+    }
+    
+    @Override
+    public void close() throws SQLException {
+        SQLException toThrow = null;
+        try {
+            if (resultIterators != null) {
+                resultIterators.close();
+            }
+        } catch (Exception e) {
+           toThrow = ServerUtil.parseServerException(e);
+        } finally {
+            try {
+                if (iterators != null) {
+                    for (;index < iterators.size(); index++) {
+                        PeekingResultIterator iterator = iterators.get(index);
+                        try {
+                            iterator.close();
+                        } catch (Exception e) {
+                            if (toThrow == null) {
+                                toThrow = ServerUtil.parseServerException(e);
+                            } else {
+                                toThrow.setNextException(ServerUtil.parseServerException(e));
+                            }
+                        }
+                    }
+                }
+            } finally {
+                if (toThrow != null) {
+                    throw toThrow;
+                }
+            }
+        }
+    }
+
+
+    @Override
+    public void explain(List<String> planSteps) {
+        if (resultIterators != null) {
+            resultIterators.explain(planSteps);
+        }
+    }
+
+    private Tuple nextTuple() throws SQLException {
+        List<PeekingResultIterator> iterators = getIterators();
+        while (index < iterators.size()) {
+            PeekingResultIterator iterator = iterators.get(index);
+            Tuple r = iterator.peek();
+            if (r != null) {
+                return iterator.next();
+            }
+            traversedIterator = true;
+            iterator.close();
+            index++;
+        }
+        return null;
+    }
+    
+    private boolean continueAggregating(Tuple previous, Tuple next) {
+        if (next == null) {
+            return false;
+        }
+        next.getKey(currentKey);
+        previous.getKey(previousKey);
+        return (currentKey.compareTo(previousKey) == 0);
+    }
+    
+    @Override
+    public Tuple next() throws SQLException {
+        Tuple t = super.next();
+        if (t == null) {
+            return null;
+        }
+        aggregate(t);
+        return t;
+    }
+    
+    @Override
+    protected Tuple advance() throws SQLException {
+        Tuple current = this.next;
+        boolean traversedIterators = nextTraversedIterators;
+        if (current == null) {
+            current = nextTuple();
+            traversedIterators = this.traversedIterator;
+        }
+        if (current != null) {
+            Tuple previous = current;
+            Aggregator[] rowAggregators = null;
+            while (true) {
+                current = nextTuple();
+                if (!traversedIterators || !continueAggregating(previous, current)) {
+                    break;
+                }
+                if (rowAggregators == null) {
+                    rowAggregators = aggregate(previous);
+                }
+                aggregators.aggregate(rowAggregators, current); 
+                traversedIterators = this.traversedIterator;
+            }
+            this.next = current;
+            this.nextTraversedIterators = this.traversedIterator;
+            if (rowAggregators == null) {
+                current = previous;
+            } else {
+                byte[] value = aggregators.toBytes(rowAggregators);
+                current = new SingleKeyValueTuple(KeyValueUtil.newKeyValue(previousKey, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+            }
+        }
+        if (current == null) {
+            close(); // Close underlying ResultIterators to free resources sooner rather than later
+        }
+        return current;
+    }
+
+	@Override
+	public String toString() {
+		return "RowKeyOrderedAggregateResultIterator [resultIterators=" + resultIterators + ", index=" + index + "]";
+	}
+
+    @Override
+    public Aggregator[] aggregate(Tuple result) {
+        Aggregator[] rowAggregators = aggregators.getAggregators();
+        aggregators.reset(rowAggregators);
+        aggregators.aggregate(rowAggregators, result);
+        return rowAggregators;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
index f53e871..791eb23 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
@@ -19,40 +19,22 @@ package org.apache.phoenix.iterate;
 
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
-import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY_NAME;
-import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_NAME;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 
-import java.sql.DriverManager;
-import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.compile.AggregationManager;
-import org.apache.phoenix.compile.SequenceManager;
-import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.expression.Expression;
-import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.expression.aggregator.ClientAggregators;
-import org.apache.phoenix.expression.function.SingleAggregateFunction;
-import org.apache.phoenix.expression.function.SumAggregateFunction;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.query.BaseConnectionlessQueryTest;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.schema.types.PLong;
-import org.apache.phoenix.schema.PLongColumn;
-import org.apache.phoenix.schema.PName;
-import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.AssertResults;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.Test;
 
 
@@ -87,90 +69,9 @@ public class AggregateResultScannerTest extends BaseConnectionlessQueryTest {
                 new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, PLong.INSTANCE.toBytes(2L))),
             };
 
-        PhoenixConnection pconn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class);
-        PhoenixStatement statement = new PhoenixStatement(pconn);
-        StatementContext context = new StatementContext(statement, null, new Scan(), new SequenceManager(statement));
-        AggregationManager aggregationManager = context.getAggregationManager();
-        SumAggregateFunction func = new SumAggregateFunction(Arrays.<Expression>asList(new KeyValueColumnExpression(new PLongColumn() {
-            @Override
-            public PName getName() {
-                return SINGLE_COLUMN_NAME;
-            }
-            @Override
-            public PName getFamilyName() {
-                return SINGLE_COLUMN_FAMILY_NAME;
-            }
-            @Override
-            public int getPosition() {
-                return 0;
-            }
-            
-            @Override
-            public SortOrder getSortOrder() {
-            	return SortOrder.getDefault();
-            }
-            
-            @Override
-            public Integer getArraySize() {
-                return 0;
-            }
-            
-            @Override
-            public byte[] getViewConstant() {
-                return null;
-            }
-            
-            @Override
-            public boolean isViewReferenced() {
-                return false;
-            }
-            
-            @Override
-            public String getExpressionStr() {
-                return null;
-            }
-            @Override
-            public boolean isRowTimestamp() {
-                return false;
-            }
-			@Override
-			public boolean isDynamic() {
-				return false;
-			}
-        })), null);
-        aggregationManager.setAggregators(new ClientAggregators(Collections.<SingleAggregateFunction>singletonList(func), 1));
-        ResultIterators iterators = new ResultIterators() {
-
-            @Override
-            public List<PeekingResultIterator> getIterators() throws SQLException {
-                return results;
-            }
-
-            @Override
-            public int size() {
-                return results.size();
-            }
-
-            @Override
-            public void explain(List<String> planSteps) {
-            }
-
-			@Override
-			public List<KeyRange> getSplits() {
-				return Collections.emptyList();
-			}
-
-			@Override
-			public List<List<Scan>> getScans() {
-				return Collections.emptyList();
-			}
-
-            @Override
-            public void close() throws SQLException {
-            }
-            
-        };
-        ResultIterator scanner = new GroupedAggregatingResultIterator(new MergeSortRowKeyResultIterator(iterators), aggregationManager.getAggregators());
+        ResultIterators iterators = new MaterializedResultIterators(results);
+        ClientAggregators aggregators = TestUtil.getSingleSumAggregator(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES));
+        ResultIterator scanner = new GroupedAggregatingResultIterator(new MergeSortRowKeyResultIterator(iterators), aggregators);
         AssertResults.assertResults(scanner, expectedResults);
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
index cf71724..67d5cd0 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
@@ -56,36 +56,7 @@ public class ConcatResultIteratorTest {
                 new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(4))),
             };
         final List<PeekingResultIterator>results = Arrays.asList(new PeekingResultIterator[] {new MaterializedResultIterator(Arrays.asList(results1)), new MaterializedResultIterator(Arrays.asList(results2)), new MaterializedResultIterator(Arrays.asList(results3))});
-        ResultIterators iterators = new ResultIterators() {
-
-            @Override
-            public List<PeekingResultIterator> getIterators() throws SQLException {
-                return results;
-            }
-
-            @Override
-            public int size() {
-                return results.size();
-            }
-
-            @Override
-            public void explain(List<String> planSteps) {
-            }
-            
-			@Override
-			public List<KeyRange> getSplits() {
-				return Collections.emptyList();
-			}
-
-			@Override
-			public List<List<Scan>> getScans() {
-				return Collections.emptyList();
-			}
-
-            @Override
-            public void close() throws SQLException {
-            }
-        };
+        ResultIterators iterators = new MaterializedResultIterators(results);
 
         Tuple[] expectedResults = new Tuple[] {
                 new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))),

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/test/java/org/apache/phoenix/iterate/MaterializedResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/MaterializedResultIterators.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MaterializedResultIterators.java
new file mode 100644
index 0000000..c4b0265
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MaterializedResultIterators.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.query.KeyRange;
+
+/**
+ * 
+ * ResultIteraors implementation backed by in-memory list of PeekingResultIterator
+ *
+ */
+public class MaterializedResultIterators implements ResultIterators {
+    private final List<PeekingResultIterator> results;
+
+    public MaterializedResultIterators(List<PeekingResultIterator> results) {
+        this.results = results;
+    }
+
+    @Override
+    public List<PeekingResultIterator> getIterators() throws SQLException {
+        return results;
+    }
+
+    @Override
+    public int size() {
+        return results.size();
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+    }
+
+    @Override
+    public List<KeyRange> getSplits() {
+    	return Collections.emptyList();
+    }
+
+    @Override
+    public List<List<Scan>> getScans() {
+    	return Collections.emptyList();
+    }
+
+    @Override
+    public void close() throws SQLException {
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/test/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIteratorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIteratorTest.java
new file mode 100644
index 0000000..347de78
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIteratorTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.aggregator.ClientAggregators;
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.AssertResults;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Test;
+
+public class RowKeyOrderedAggregateResultIteratorTest extends BaseConnectionlessQueryTest {
+    private final static byte[] A = Bytes.toBytes("a");
+    private final static byte[] B = Bytes.toBytes("b");
+    private final static byte[] C = Bytes.toBytes("c");
+    private final static byte[] D = Bytes.toBytes("d");
+
+    @Test
+    public void testNoSpan() throws Exception {
+        Tuple[] results1 = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1L))),
+            };
+        Tuple[] results2 = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(2L)))
+            };
+        Tuple[] results3 = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(C, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(3L))),
+                new SingleKeyValueTuple(new KeyValue(D, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(4L))),
+            };
+        final List<PeekingResultIterator>results = Arrays.asList(new PeekingResultIterator[] {new MaterializedResultIterator(Arrays.asList(results1)), new MaterializedResultIterator(Arrays.asList(results2)), new MaterializedResultIterator(Arrays.asList(results3))});
+        ResultIterators iterators = new MaterializedResultIterators(results);
+
+        Tuple[] expectedResults = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1L))),
+                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(2L))),
+                new SingleKeyValueTuple(new KeyValue(C, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(3L))),
+                new SingleKeyValueTuple(new KeyValue(D, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(4L))),
+            };
+
+        ClientAggregators aggregators = TestUtil.getSingleSumAggregator(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
+        ResultIterator scanner = new RowKeyOrderedAggregateResultIterator(iterators, aggregators);
+        AssertResults.assertResults(scanner, expectedResults);
+    }
+    
+    @Test
+    public void testSpanThree() throws Exception {
+        Tuple[] results1 = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1L))),
+                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(2L)))
+            };
+        Tuple[] results2 = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(3L)))
+            };
+        Tuple[] results3 = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(4L))),
+                new SingleKeyValueTuple(new KeyValue(C, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(5L))),
+            };
+        final List<PeekingResultIterator>results = Arrays.asList(new PeekingResultIterator[] {new MaterializedResultIterator(Arrays.asList(results1)), new MaterializedResultIterator(Arrays.asList(results2)), new MaterializedResultIterator(Arrays.asList(results3))});
+        ResultIterators iterators = new MaterializedResultIterators(results);
+
+        Tuple[] expectedResults = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1L))),
+                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(9L))),
+                new SingleKeyValueTuple(new KeyValue(C, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(5L))),
+            };
+
+        ClientAggregators aggregators = TestUtil.getSingleSumAggregator(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
+        ResultIterator scanner = new RowKeyOrderedAggregateResultIterator(iterators, aggregators);
+        AssertResults.assertResults(scanner, expectedResults);
+    }
+
+    @Test
+    public void testSpanAll() throws Exception {
+        Tuple[] results1 = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(2L)))
+            };
+        Tuple[] results2 = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(3L)))
+            };
+        Tuple[] results3 = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(4L))),
+            };
+        final List<PeekingResultIterator>results = Arrays.asList(new PeekingResultIterator[] {new MaterializedResultIterator(Arrays.asList(results1)), new MaterializedResultIterator(Arrays.asList(results2)), new MaterializedResultIterator(Arrays.asList(results3))});
+        ResultIterators iterators = new MaterializedResultIterators(results);
+
+        Tuple[] expectedResults = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(9L))),
+            };
+
+        ClientAggregators aggregators = TestUtil.getSingleSumAggregator(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
+        ResultIterator scanner = new RowKeyOrderedAggregateResultIterator(iterators, aggregators);
+        AssertResults.assertResults(scanner, expectedResults);
+    }
+    
+    @Test
+    public void testSpanEnd() throws Exception {
+        Tuple[] results1 = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1L))),
+            };
+        Tuple[] results2 = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(2L))),
+                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(3L))),
+                new SingleKeyValueTuple(new KeyValue(C, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(4L))),
+                new SingleKeyValueTuple(new KeyValue(D, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(5L))),
+            };
+        Tuple[] results3 = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(D, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(6L))),
+            };
+        final List<PeekingResultIterator>results = Arrays.asList(new PeekingResultIterator[] {new MaterializedResultIterator(Arrays.asList(results1)), new MaterializedResultIterator(Arrays.asList(results2)), new MaterializedResultIterator(Arrays.asList(results3))});
+        ResultIterators iterators = new MaterializedResultIterators(results);
+
+        Tuple[] expectedResults = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(3L))),
+                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(3L))),
+                new SingleKeyValueTuple(new KeyValue(C, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(4L))),
+                new SingleKeyValueTuple(new KeyValue(D, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(11L))),
+            };
+
+        ClientAggregators aggregators = TestUtil.getSingleSumAggregator(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
+        ResultIterator scanner = new RowKeyOrderedAggregateResultIterator(iterators, aggregators);
+        AssertResults.assertResults(scanner, expectedResults);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index c73c160..872555c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -18,6 +18,8 @@
 package org.apache.phoenix.util;
 
 import static org.apache.phoenix.query.QueryConstants.MILLIS_IN_DAY;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY_NAME;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_NAME;
 import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
@@ -42,6 +44,7 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
@@ -55,6 +58,8 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.AggregationManager;
+import org.apache.phoenix.compile.SequenceManager;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse;
@@ -70,7 +75,10 @@ import org.apache.phoenix.expression.NotExpression;
 import org.apache.phoenix.expression.OrExpression;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.expression.StringBasedLikeExpression;
+import org.apache.phoenix.expression.aggregator.ClientAggregators;
+import org.apache.phoenix.expression.function.SingleAggregateFunction;
 import org.apache.phoenix.expression.function.SubstrFunction;
+import org.apache.phoenix.expression.function.SumAggregateFunction;
 import org.apache.phoenix.filter.MultiCQKeyValueComparisonFilter;
 import org.apache.phoenix.filter.MultiKeyValueComparisonFilter;
 import org.apache.phoenix.filter.RowKeyComparisonFilter;
@@ -79,13 +87,17 @@ import org.apache.phoenix.filter.SingleKeyValueComparisonFilter;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.LikeParseNode.LikeType;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PLongColumn;
+import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.RowKeyValueAccessor;
+import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.stats.PTableStats;
@@ -622,5 +634,63 @@ public class TestUtil {
             tableNameBuilder.append(transactional ? "_TXN" : "_NON_TXN");
         return tableNameBuilder.toString();
     }
+
+    public static ClientAggregators getSingleSumAggregator(String url, Properties props) throws SQLException {
+        try (PhoenixConnection pconn = DriverManager.getConnection(url, props).unwrap(PhoenixConnection.class)) {
+            PhoenixStatement statement = new PhoenixStatement(pconn);
+            StatementContext context = new StatementContext(statement, null, new Scan(), new SequenceManager(statement));
+            AggregationManager aggregationManager = context.getAggregationManager();
+            SumAggregateFunction func = new SumAggregateFunction(Arrays.<Expression>asList(new KeyValueColumnExpression(new PLongColumn() {
+                @Override
+                public PName getName() {
+                    return SINGLE_COLUMN_NAME;
+                }
+                @Override
+                public PName getFamilyName() {
+                    return SINGLE_COLUMN_FAMILY_NAME;
+                }
+                @Override
+                public int getPosition() {
+                    return 0;
+                }
+                
+                @Override
+                public SortOrder getSortOrder() {
+                	return SortOrder.getDefault();
+                }
+                
+                @Override
+                public Integer getArraySize() {
+                    return 0;
+                }
+                
+                @Override
+                public byte[] getViewConstant() {
+                    return null;
+                }
+                
+                @Override
+                public boolean isViewReferenced() {
+                    return false;
+                }
+                
+                @Override
+                public String getExpressionStr() {
+                    return null;
+                }
+                @Override
+                public boolean isRowTimestamp() {
+                    return false;
+                }
+    			@Override
+    			public boolean isDynamic() {
+    				return false;
+    			}
+            })), null);
+            aggregationManager.setAggregators(new ClientAggregators(Collections.<SingleAggregateFunction>singletonList(func), 1));
+            ClientAggregators aggregators = aggregationManager.getAggregators();
+            return aggregators;
+        }
+    }
 }
 


[2/3] phoenix git commit: PHOENIX-2876 Using aggregation function in ORDER BY (Sergey Soldatov)

Posted by ja...@apache.org.
PHOENIX-2876 Using aggregation function in ORDER BY (Sergey Soldatov)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/af8d3b65
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/af8d3b65
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/af8d3b65

Branch: refs/heads/master
Commit: af8d3b65c84fd57b91b99ff36de2194149c5a94e
Parents: d414505
Author: James Taylor <ja...@apache.org>
Authored: Thu May 12 14:32:34 2016 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Thu May 12 14:32:34 2016 -0700

----------------------------------------------------------------------
 .../phoenix/compile/AggregationManager.java     | 60 ++++++++++++++++++++
 .../apache/phoenix/compile/DeleteCompiler.java  |  1 +
 .../apache/phoenix/compile/PostDDLCompiler.java |  1 +
 .../phoenix/compile/ProjectionCompiler.java     | 53 +----------------
 .../apache/phoenix/compile/QueryCompiler.java   |  1 +
 .../apache/phoenix/compile/UpsertCompiler.java  |  6 +-
 .../phoenix/compile/QueryCompilerTest.java      | 16 +++++-
 7 files changed, 86 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java
index ee2497b..c8e672e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java
@@ -17,7 +17,21 @@
  */
 package org.apache.phoenix.compile;
 
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.aggregator.ClientAggregators;
+import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.expression.function.SingleAggregateFunction;
+import org.apache.phoenix.expression.visitor.SingleAggregateFunctionVisitor;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
 
 /**
  * 
@@ -52,4 +66,50 @@ public class AggregationManager {
     public void setAggregators(ClientAggregators clientAggregator) {
         this.aggregators = clientAggregator;
     }
+    /**
+     * Compiles projection by:
+     * 1) Adding RowCount aggregate function if not present when limiting rows. We need this
+     *    to track how many rows have been scanned.
+     * 2) Reordering aggregation functions (by putting fixed length aggregates first) to
+     *    optimize the positional access of the aggregated value.
+     */
+    public void compile(StatementContext context, GroupByCompiler.GroupBy groupBy) throws
+            SQLException {
+        final Set<SingleAggregateFunction> aggFuncSet = Sets.newHashSetWithExpectedSize(context.getExpressionManager().getExpressionCount());
+
+        Iterator<Expression> expressions = context.getExpressionManager().getExpressions();
+        while (expressions.hasNext()) {
+            Expression expression = expressions.next();
+            expression.accept(new SingleAggregateFunctionVisitor() {
+                @Override
+                public Iterator<Expression> visitEnter(SingleAggregateFunction function) {
+                    aggFuncSet.add(function);
+                    return Iterators.emptyIterator();
+                }
+            });
+        }
+        if (aggFuncSet.isEmpty() && groupBy.isEmpty()) {
+            return;
+        }
+        List<SingleAggregateFunction> aggFuncs = new ArrayList<SingleAggregateFunction>(aggFuncSet);
+        Collections.sort(aggFuncs, SingleAggregateFunction.SCHEMA_COMPARATOR);
+
+        int minNullableIndex = getMinNullableIndex(aggFuncs,groupBy.isEmpty());
+        context.getScan().setAttribute(BaseScannerRegionObserver.AGGREGATORS, ServerAggregators.serialize(aggFuncs, minNullableIndex));
+        ClientAggregators clientAggregators = new ClientAggregators(aggFuncs, minNullableIndex);
+        context.getAggregationManager().setAggregators(clientAggregators);
+    }
+
+    private static int getMinNullableIndex(List<SingleAggregateFunction> aggFuncs, boolean isUngroupedAggregation) {
+        int minNullableIndex = aggFuncs.size();
+        for (int i = 0; i < aggFuncs.size(); i++) {
+            SingleAggregateFunction aggFunc = aggFuncs.get(i);
+            if (isUngroupedAggregation ? aggFunc.getAggregator().isNullable() : aggFunc.getAggregatorExpression().isNullable()) {
+                minNullableIndex = i;
+                break;
+            }
+        }
+        return minNullableIndex;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 2a97686..fa3dd62 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -508,6 +508,7 @@ public class DeleteCompiler {
                 // Ignoring ORDER BY, since with auto commit on and no limit makes no difference
                 SelectStatement aggSelect = SelectStatement.create(SelectStatement.COUNT_ONE, delete.getHint());
                 RowProjector projectorToBe = ProjectionCompiler.compile(context, aggSelect, GroupBy.EMPTY_GROUP_BY);
+                context.getAggregationManager().compile(context, GroupBy.EMPTY_GROUP_BY);
                 if (plan.getProjector().projectEveryRow()) {
                     projectorToBe = new RowProjector(projectorToBe,true);
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index a786438..e43b596 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -253,6 +253,7 @@ public class PostDDLCompiler {
                             }
                             // Need to project all column families into the scan, since we haven't yet created our empty key value
                             RowProjector projector = ProjectionCompiler.compile(context, SelectStatement.COUNT_ONE, GroupBy.EMPTY_GROUP_BY);
+                            context.getAggregationManager().compile(context, GroupBy.EMPTY_GROUP_BY);
                             // Explicitly project these column families and don't project the empty key value,
                             // since at this point we haven't added the empty key value everywhere.
                             if (columnFamilies != null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
index 3cf3934..8d7d7cf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
@@ -188,8 +188,8 @@ public class ProjectionCompiler {
         try {
         	dataTable = conn.getTable(new PTableKey(tenantId, tableName));
         } catch (TableNotFoundException e) {
-            if (tenantId != null) { 
-            	// Check with null tenantId 
+            if (tenantId != null) {
+                // Check with null tenantId
             	dataTable = conn.getTable(new PTableKey(null, tableName));
             }
             else {
@@ -483,8 +483,6 @@ public class ProjectionCompiler {
                 }
             }
         }
-        
-        selectVisitor.compile();
         boolean isProjectEmptyKeyValue = false;
         if (isWildcard) {
             projectAllColumnFamilies(table, scan);
@@ -576,18 +574,7 @@ public class ProjectionCompiler {
     }
 
     private static class SelectClauseVisitor extends ExpressionCompiler {
-        private static int getMinNullableIndex(List<SingleAggregateFunction> aggFuncs, boolean isUngroupedAggregation) {
-            int minNullableIndex = aggFuncs.size();
-            for (int i = 0; i < aggFuncs.size(); i++) {
-                SingleAggregateFunction aggFunc = aggFuncs.get(i);
-                if (isUngroupedAggregation ? aggFunc.getAggregator().isNullable() : aggFunc.getAggregatorExpression().isNullable()) {
-                    minNullableIndex = i;
-                    break;
-                }
-            }
-            return minNullableIndex;
-        }
-        
+
         /**
          * Track whether or not the projection expression is case sensitive. We use this
          * information to determine whether or not we normalize the column name passed
@@ -613,40 +600,6 @@ public class ProjectionCompiler {
             reset();
         }
 
-
-        /**
-         * Compiles projection by:
-         * 1) Adding RowCount aggregate function if not present when limiting rows. We need this
-         *    to track how many rows have been scanned.
-         * 2) Reordering aggregation functions (by putting fixed length aggregates first) to
-         *    optimize the positional access of the aggregated value.
-         */
-        private void compile() throws SQLException {
-            final Set<SingleAggregateFunction> aggFuncSet = Sets.newHashSetWithExpectedSize(context.getExpressionManager().getExpressionCount());
-    
-            Iterator<Expression> expressions = context.getExpressionManager().getExpressions();
-            while (expressions.hasNext()) {
-                Expression expression = expressions.next();
-                expression.accept(new SingleAggregateFunctionVisitor() {
-                    @Override
-                    public Iterator<Expression> visitEnter(SingleAggregateFunction function) {
-                        aggFuncSet.add(function);
-                        return Iterators.emptyIterator();
-                    }
-                });
-            }
-            if (aggFuncSet.isEmpty() && groupBy.isEmpty()) {
-                return;
-            }
-            List<SingleAggregateFunction> aggFuncs = new ArrayList<SingleAggregateFunction>(aggFuncSet);
-            Collections.sort(aggFuncs, SingleAggregateFunction.SCHEMA_COMPARATOR);
-    
-            int minNullableIndex = getMinNullableIndex(aggFuncs,groupBy.isEmpty());
-            context.getScan().setAttribute(BaseScannerRegionObserver.AGGREGATORS, ServerAggregators.serialize(aggFuncs, minNullableIndex));
-            ClientAggregators clientAggregators = new ClientAggregators(aggFuncs, minNullableIndex);
-            context.getAggregationManager().setAggregators(clientAggregators);
-        }
-        
         @Override
         public void reset() {
             super.reset();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 82c9731..4488aff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -564,6 +564,7 @@ public class QueryCompiler {
         RowProjector projector = ProjectionCompiler.compile(context, select, groupBy, asSubquery ? Collections.<PDatum>emptyList() : targetColumns, where);
         OrderBy orderBy = OrderByCompiler.compile(context, select, groupBy, limit, offset, projector,
                 groupBy == GroupBy.EMPTY_GROUP_BY ? innerPlanTupleProjector : null, isInRowKeyOrder);
+        context.getAggregationManager().compile(context, groupBy);
         // Final step is to build the query plan
         if (!asSubquery) {
             int maxRows = statement.getMaxRows();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 7c6347f..e2fc2ca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -636,7 +636,11 @@ public class UpsertCompiler {
                     PTable projectedTable = PTableImpl.makePTable(table, projectedColumns);
                     
                     SelectStatement select = SelectStatement.create(SelectStatement.COUNT_ONE, upsert.getHint());
-                    RowProjector aggProjectorToBe = ProjectionCompiler.compile(queryPlan.getContext(), select, GroupBy.EMPTY_GROUP_BY);
+                    StatementContext statementContext = queryPlan.getContext();
+                    RowProjector aggProjectorToBe = ProjectionCompiler.compile(statementContext, select, GroupBy
+                            .EMPTY_GROUP_BY);
+                    statementContext.getAggregationManager().compile(queryPlan.getContext()
+                            ,GroupBy.EMPTY_GROUP_BY);
                     if (queryPlan.getProjector().projectEveryRow()) {
                         aggProjectorToBe = new RowProjector(aggProjectorToBe,true);
                     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index 4b756fa..1db90a9 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -2275,5 +2275,19 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
             conn.close();
         }
     }
-
+    @Test
+    public void testOrderByWithNoProjection() throws SQLException {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            conn.createStatement().execute("create table x (id integer primary key, A.i1 integer," +
+                    " B.i2 integer)");
+            Scan scan = projectQuery("select A.i1 from X group by i1 order by avg(B.i2) " +
+                    "desc");
+            ServerAggregators aggregators = ServerAggregators.deserialize(scan.getAttribute
+                    (BaseScannerRegionObserver.AGGREGATORS), null);
+            assertEquals(2,aggregators.getAggregatorCount());
+        } finally {
+            conn.close();
+        }
+    }
 }


[3/3] phoenix git commit: PHOENIX-2888 Aggregations for similar IN list expressions doesn't work correctly (Sergey Soldatov)

Posted by ja...@apache.org.
PHOENIX-2888 Aggregations for similar IN list expressions doesn't work correctly (Sergey Soldatov)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ae14e38c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ae14e38c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ae14e38c

Branch: refs/heads/master
Commit: ae14e38ccbed601dfbda9ec0dbb66bc88d63bae5
Parents: af8d3b6
Author: James Taylor <ja...@apache.org>
Authored: Thu May 12 14:34:47 2016 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Thu May 12 14:34:47 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/phoenix/expression/InListExpression.java     | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae14e38c/phoenix-core/src/main/java/org/apache/phoenix/expression/InListExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/InListExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/InListExpression.java
index a4a9353..a977f1f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/InListExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/InListExpression.java
@@ -175,7 +175,7 @@ public class InListExpression extends BaseSingleExpression {
     public int hashCode() {
         final int prime = 31;
         int result = 1;
-        result = prime * result + values.hashCode();
+        result = prime * result + children.hashCode() + values.hashCode();
         return result;
     }
 
@@ -185,7 +185,7 @@ public class InListExpression extends BaseSingleExpression {
         if (obj == null) return false;
         if (getClass() != obj.getClass()) return false;
         InListExpression other = (InListExpression)obj;
-        if (!values.equals(other.values)) return false;
+        if (!children.equals(other.children) || !values.equals(other.values)) return false;
         return true;
     }