You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2021/07/15 09:02:50 UTC

[phoenix] branch master updated: PHOENIX-6507 DistinctAggregatingResultIterator should keep original tuple order of the AggregatingResultIterator

This is an automated email from the ASF dual-hosted git repository.

chenglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new a073d5b  PHOENIX-6507 DistinctAggregatingResultIterator should keep original tuple order of the AggregatingResultIterator
a073d5b is described below

commit a073d5bb19915df4424efc619323fb82e3e838e2
Author: chenglei <ch...@apache.org>
AuthorDate: Thu Jul 15 17:02:22 2021 +0800

    PHOENIX-6507 DistinctAggregatingResultIterator should keep original tuple order of the AggregatingResultIterator
---
 .../org/apache/phoenix/end2end/AggregateIT.java    |  71 ++++++
 .../expression/KeyValueColumnExpression.java       |   7 +-
 .../iterate/DistinctAggregatingResultIterator.java | 135 +++++-----
 .../DistinctAggregatingResultIteratorTest.java     | 273 +++++++++++++++++++++
 4 files changed, 406 insertions(+), 80 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateIT.java
index d52025e..32fb8b4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateIT.java
@@ -331,5 +331,76 @@ public class AggregateIT extends BaseAggregateIT {
         }
     }
 
+    @Test
+    public void testDistinctAggregatingResultIteratorBug6507() throws Exception {
+        doTestDistinctAggregatingResultIteratorBug6507(false, false);
+        doTestDistinctAggregatingResultIteratorBug6507(false, true);
+        doTestDistinctAggregatingResultIteratorBug6507(true, false);
+        doTestDistinctAggregatingResultIteratorBug6507(true, true);
+    }
+
+    private void doTestDistinctAggregatingResultIteratorBug6507(boolean desc ,boolean salted) throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = null;
+        try {
+            conn = DriverManager.getConnection(getUrl(), props);
+            String tableName = generateUniqueName();
+            String sql = "create table " + tableName + "( "+
+                    " pk1 varchar not null , " +
+                    " pk2 varchar not null, " +
+                    " pk3 varchar not null," +
+                    " v1 varchar, " +
+                    " v2 varchar, " +
+                    " CONSTRAINT TEST_PK PRIMARY KEY ( "+
+                    "pk1 "+(desc ? "desc" : "")+", "+
+                    "pk2 "+(desc ? "desc" : "")+", "+
+                    "pk3 "+(desc ? "desc" : "")+
+                    " )) "+(salted ? "SALT_BUCKETS =4" : "split on('b')");
+            conn.createStatement().execute(sql);
+
+            conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('a11','a12','a13','a14','a15')");
+            conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('a21','a22','a23','a24','a25')");
+            conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('a31','a32','a33','a38','a35')");
+            conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('b11','b12','b13','b14','b15')");
+            conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('b21','b22','b23','b24','b25')");
+            conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('b31','b32','b33','b34','b35')");
+            conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('a31','c12','c13','a34','a35')");
+            conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('a31','a32','c13','a34','a35')");
+            conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('a31','a32','d13','a35','a35')");
+            conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('d31','a32','c13','a35','a35')");
+            conn.commit();
+
+            sql = "select distinct pk1,max(v1) from "+tableName+" group by pk1,pk2,pk3 order by pk1,pk2,pk3";
+
+            ResultSet rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{
+                {"a11","a14"},
+                {"a21","a24"},
+                {"a31","a38"},
+                {"a31","a34"},
+                {"a31","a35"},
+                {"b11","b14"},
+                {"b21","b24"},
+                {"b31","b34"},
+                {"d31","a35"}});
+
+            sql = "select distinct pk2,max(v1) from "+tableName+" group by pk2,pk3 order by pk2,pk3";
+
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{
+                {"a12","a14"},
+                {"a22","a24"},
+                {"a32","a38"},
+                {"a32","a35"},
+                {"b12","b14"},
+                {"b22","b24"},
+                {"b32","b34"},
+                {"c12","a34"}});
+        } finally {
+            if(conn != null) {
+                conn.close();
+            }
+        }
+    }
 }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java
index f8432c5..62665e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java
@@ -45,7 +45,12 @@ public class KeyValueColumnExpression extends ColumnExpression {
 
     public KeyValueColumnExpression() {
     }
-    
+
+    public KeyValueColumnExpression(final byte[] cf, final byte[] cq) {
+        this.cf = cf;
+        this.cq = cq;
+    }
+
     public KeyValueColumnExpression(PColumn column) {
         super(column);
         this.cf = column.getFamilyName().getBytes();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
index a412582..59cd333 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
@@ -18,18 +18,17 @@
 package org.apache.phoenix.iterate;
 
 import java.sql.SQLException;
-import java.util.Collections;
-import java.util.Iterator;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.compile.ColumnProjector;
 import org.apache.phoenix.compile.ExplainPlanAttributes
     .ExplainPlanAttributesBuilder;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.schema.tuple.Tuple;
 
 import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
@@ -43,27 +42,44 @@ import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
  * @since 1.2
  */
 public class DistinctAggregatingResultIterator implements AggregatingResultIterator {
-    private final AggregatingResultIterator delegate;
+    /**
+     * Original AggregatingResultIterator
+     */
+    private final AggregatingResultIterator targetAggregatingResultIterator;
     private final RowProjector rowProjector;
-    private Iterator<ResultEntry> resultIterator;
-    private final ImmutableBytesWritable ptr1 = new ImmutableBytesWritable();
-    private final ImmutableBytesWritable ptr2 = new ImmutableBytesWritable();
+    /**
+     * Cached tuples already seen.
+     */
+    private final Set<ResultEntry> resultEntries =
+            Sets.<ResultEntry>newHashSet();
 
     private class ResultEntry {
+        /**
+         * cached hashCode.
+         */
         private final int hashCode;
         private final Tuple result;
+        /**
+         * cached column values.
+         */
+        private final ImmutableBytesPtr[] columnValues;
 
         ResultEntry(Tuple result) {
-            final int prime = 31;
             this.result = result;
-            int hashCode = 0;
-            for (ColumnProjector column : rowProjector.getColumnProjectors()) {
-                Expression e = column.getExpression();
-                if (e.evaluate(this.result, ptr1)) {
-                    hashCode = prime * hashCode + ptr1.hashCode();
+            this.columnValues =
+                    new ImmutableBytesPtr[rowProjector.getColumnCount()];
+            int columnIndex = 0;
+            for (ColumnProjector columnProjector : rowProjector.getColumnProjectors()) {
+                Expression expression = columnProjector.getExpression();
+                ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+                if (!expression.evaluate(this.result, ptr)) {
+                    columnValues[columnIndex] = null;
+                } else {
+                    columnValues[columnIndex] = ptr;
                 }
+                columnIndex++;
             }
-            this.hashCode = hashCode;
+            this.hashCode = Arrays.hashCode(columnValues);
         }
 
         @Override
@@ -78,91 +94,53 @@ public class DistinctAggregatingResultIterator implements AggregatingResultItera
                 return false;
             }
             ResultEntry that = (ResultEntry) o;
-            for (ColumnProjector column : rowProjector.getColumnProjectors()) {
-                Expression e = column.getExpression();
-                boolean isNull1 = !e.evaluate(this.result, ptr1);
-                boolean isNull2 = !e.evaluate(that.result, ptr2);
-                if (isNull1 && isNull2) {
-                    return true;
-                }
-                if (isNull1 || isNull2) {
-                    return false;
-                }
-                if (ptr1.compareTo(ptr2) != 0) {
-                    return false;
-                }
-            }
-            return true;
+            return Arrays.equals(this.columnValues, that.columnValues);
         }
-        
+
         @Override
         public int hashCode() {
             return hashCode;
         }
-        
-        Tuple getResult() {
-            return result;
-        }
     }
-    
-    protected ResultIterator getDelegate() {
-        return delegate;
-    }
-    
+
     public DistinctAggregatingResultIterator(AggregatingResultIterator delegate,
             RowProjector rowProjector) {
-        this.delegate = delegate;
+        this.targetAggregatingResultIterator = delegate;
         this.rowProjector = rowProjector;
     }
 
     @Override
     public Tuple next() throws SQLException {
-        Iterator<ResultEntry> iterator = getResultIterator();
-        if (iterator.hasNext()) {
-            ResultEntry entry = iterator.next();
-            Tuple tuple = entry.getResult();
-            aggregate(tuple);
-            return tuple;
-        }
-        resultIterator = Collections.emptyIterator();
-        return null;
-    }
-    
-    private Iterator<ResultEntry> getResultIterator() throws SQLException {
-        if (resultIterator != null) {
-            return resultIterator;
-        }
-        
-        Set<ResultEntry> entries = Sets.<ResultEntry>newHashSet(); // TODO: size?
-        try {
-            for (Tuple result = delegate.next(); result != null; result = delegate.next()) {
-                ResultEntry entry = new ResultEntry(result);
-                entries.add(entry);
+        while (true) {
+            Tuple nextTuple = this.targetAggregatingResultIterator.next();
+            if (nextTuple == null) {
+                return null;
+            }
+            ResultEntry resultEntry = new ResultEntry(nextTuple);
+            // Set.add returns false if an equal entry is already cached, so one lookup is enough.
+            if (this.resultEntries.add(resultEntry)) {
+                return nextTuple;
             }
-        } finally {
-            delegate.close();
         }
-        
-        resultIterator = entries.iterator();
-        return resultIterator;
     }
 
     @Override
-    public void close()  {
-        resultIterator = Collections.emptyIterator();
-    }
-
+    public void close() throws SQLException {
+        this.targetAggregatingResultIterator.close();
+     }
 
     @Override
     public void explain(List<String> planSteps) {
-        delegate.explain(planSteps);
+        targetAggregatingResultIterator.explain(planSteps);
         planSteps.add("CLIENT DISTINCT ON " + rowProjector.toString());
     }
 
     @Override
     public void explain(List<String> planSteps,
             ExplainPlanAttributesBuilder explainPlanAttributesBuilder) {
-        delegate.explain(planSteps, explainPlanAttributesBuilder);
+        targetAggregatingResultIterator.explain(
+                planSteps,
+                explainPlanAttributesBuilder);
         explainPlanAttributesBuilder.setClientDistinctFilter(
             rowProjector.toString());
         planSteps.add("CLIENT DISTINCT ON " + rowProjector.toString());
@@ -170,13 +148,12 @@ public class DistinctAggregatingResultIterator implements AggregatingResultItera
 
     @Override
     public Aggregator[] aggregate(Tuple result) {
-        return delegate.aggregate(result);
+        return targetAggregatingResultIterator.aggregate(result);
     }
 
-	@Override
-	public String toString() {
-		return "DistinctAggregatingResultIterator [delegate=" + delegate
-				+ ", rowProjector=" + rowProjector + ", resultIterator="
-				+ resultIterator + ", ptr1=" + ptr1 + ", ptr2=" + ptr2 + "]";
-	}
+    @Override
+    public String toString() {
+        return "DistinctAggregatingResultIterator [targetAggregatingResultIterator=" + targetAggregatingResultIterator
+                + ", rowProjector=" + rowProjector + "]"; // close the bracket opened after the class name
+    }
 }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/DistinctAggregatingResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/DistinctAggregatingResultIteratorTest.java
new file mode 100644
index 0000000..65b7382
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/DistinctAggregatingResultIteratorTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.ColumnProjector;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.expression.KeyValueColumnExpression;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.util.AssertResults;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class DistinctAggregatingResultIteratorTest {
+    private final static byte[] cf = Bytes.toBytes("cf");
+    private final static byte[] cq1 = Bytes.toBytes("cq1");
+    private final static byte[] cq2 = Bytes.toBytes("cq2");
+    private final static byte[] cq3 = Bytes.toBytes("cq3");
+    private final static byte[] rowKey1 = Bytes.toBytes("rowKey1");
+    private final static byte[] rowKey2 = Bytes.toBytes("rowKey2");
+    private final static byte[] rowKey3 = Bytes.toBytes("rowKey3");
+    private final static byte[] rowKey4 = Bytes.toBytes("rowKey4");
+    private final static byte[] rowKey5 = Bytes.toBytes("rowKey5"); // was "rowKey4": copy-paste duplicate of rowKey4
+
+    @Test
+    public void testDistinctAggregatingResultIterator() throws Throwable {
+        //Test with duplicate
+        Tuple[] input1 = new Tuple[] {
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey1, cf, cq1, PInteger.INSTANCE.toBytes(1)),
+                                new KeyValue(rowKey1, cf, cq2, PInteger.INSTANCE.toBytes(2)))),
+
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey2, cf, cq1, PInteger.INSTANCE.toBytes(11)),
+                                new KeyValue(rowKey2, cf, cq2, PInteger.INSTANCE.toBytes(12)))),
+
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey3, cf, cq1, PInteger.INSTANCE.toBytes(4)),
+                                new KeyValue(rowKey3, cf, cq2, PInteger.INSTANCE.toBytes(2)))),
+
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey1, cf, cq1, PInteger.INSTANCE.toBytes(1)),
+                                new KeyValue(rowKey1, cf, cq2, PInteger.INSTANCE.toBytes(2)))),
+
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey4, cf, cq1, PInteger.INSTANCE.toBytes(7)),
+                                new KeyValue(rowKey4, cf, cq2, PInteger.INSTANCE.toBytes(8)))),
+
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey2, cf, cq1, PInteger.INSTANCE.toBytes(11)),
+                                new KeyValue(rowKey2, cf, cq2, PInteger.INSTANCE.toBytes(12)))),
+
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey5, cf, cq1, PInteger.INSTANCE.toBytes(90)),
+                                new KeyValue(rowKey5, cf, cq2, PInteger.INSTANCE.toBytes(100)))),
+                null
+
+        };
+
+        Tuple[] result1 = new Tuple[] {
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey1, cf, cq1, PInteger.INSTANCE.toBytes(1)),
+                                new KeyValue(rowKey1, cf, cq2, PInteger.INSTANCE.toBytes(2)))),
+
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey2, cf, cq1, PInteger.INSTANCE.toBytes(11)),
+                                new KeyValue(rowKey2, cf, cq2, PInteger.INSTANCE.toBytes(12)))),
+
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey3, cf, cq1, PInteger.INSTANCE.toBytes(4)),
+                                new KeyValue(rowKey3, cf, cq2, PInteger.INSTANCE.toBytes(2)))),
+
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey4, cf, cq1, PInteger.INSTANCE.toBytes(7)),
+                                new KeyValue(rowKey4, cf, cq2, PInteger.INSTANCE.toBytes(8)))),
+
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey5, cf, cq1, PInteger.INSTANCE.toBytes(90)),
+                                new KeyValue(rowKey5, cf, cq2, PInteger.INSTANCE.toBytes(100))))
+
+        };
+        RowProjector mockRowProjector = Mockito.mock(RowProjector.class);
+        Mockito.when(mockRowProjector.getColumnCount()).thenReturn(2);
+
+        KeyValueColumnExpression columnExpression1 = new KeyValueColumnExpression(cf, cq1);
+        KeyValueColumnExpression columnExpression2 = new KeyValueColumnExpression(cf, cq2);
+        final ColumnProjector mockColumnProjector1 = Mockito.mock(ColumnProjector.class);
+        Mockito.when(mockColumnProjector1.getExpression()).thenReturn(columnExpression1);
+        final ColumnProjector mockColumnProjector2 = Mockito.mock(ColumnProjector.class);
+        Mockito.when(mockColumnProjector2.getExpression()).thenReturn(columnExpression2);
+
+        Mockito.when(mockRowProjector.getColumnProjectors()).thenAnswer(
+                new Answer<List<ColumnProjector> >() {
+                    @Override
+                    public List<ColumnProjector> answer(InvocationOnMock invocation) throws Throwable {
+                        return Arrays.asList(mockColumnProjector1,mockColumnProjector2);
+                    }
+                });
+
+        assertResults(
+                input1, result1, mockRowProjector);
+
+        //Test with duplicate and null
+        Tuple[] input2 = new Tuple[] {
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey1, cf, cq1, PInteger.INSTANCE.toBytes(1)),
+                                new KeyValue(rowKey1, cf, cq2, PInteger.INSTANCE.toBytes(2)))),
+
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey2, cf, cq1, PInteger.INSTANCE.toBytes(11)),
+                                new KeyValue(rowKey2, cf, cq2, PInteger.INSTANCE.toBytes(12)))),
+
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey3, cf, cq1, PInteger.INSTANCE.toBytes(4)),
+                                new KeyValue(rowKey3, cf, cq2, PInteger.INSTANCE.toBytes(2)))),
+
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey1, cf, cq2, PInteger.INSTANCE.toBytes(1)),
+                                new KeyValue(rowKey1, cf, cq1, PInteger.INSTANCE.toBytes(2)))),
+
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey4, cf, cq1, PInteger.INSTANCE.toBytes(7)),
+                                new KeyValue(rowKey4, cf, cq2, PInteger.INSTANCE.toBytes(8)))),
+
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey2, cf, cq1, PInteger.INSTANCE.toBytes(11)),
+                                new KeyValue(rowKey2, cf, cq3, PInteger.INSTANCE.toBytes(12)))),
+
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey1, cf, cq2, PInteger.INSTANCE.toBytes(1)),
+                                new KeyValue(rowKey1, cf, cq1, PInteger.INSTANCE.toBytes(2)))),
+
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey5, cf, cq1, PInteger.INSTANCE.toBytes(90)),
+                                new KeyValue(rowKey5, cf, cq2, PInteger.INSTANCE.toBytes(100)))),
+
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey2, cf, cq1, PInteger.INSTANCE.toBytes(11)),
+                                new KeyValue(rowKey2, cf, cq3, PInteger.INSTANCE.toBytes(12)))),
+
+                null
+
+        };
+
+        Tuple[] result2 = new Tuple[] {
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey1, cf, cq1, PInteger.INSTANCE.toBytes(1)),
+                                new KeyValue(rowKey1, cf, cq2, PInteger.INSTANCE.toBytes(2)))),
+
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey2, cf, cq1, PInteger.INSTANCE.toBytes(11)),
+                                new KeyValue(rowKey2, cf, cq2, PInteger.INSTANCE.toBytes(12)))),
+
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey3, cf, cq1, PInteger.INSTANCE.toBytes(4)),
+                                new KeyValue(rowKey3, cf, cq2, PInteger.INSTANCE.toBytes(2)))),
+
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey1, cf, cq2, PInteger.INSTANCE.toBytes(1)),
+                                new KeyValue(rowKey1, cf, cq1, PInteger.INSTANCE.toBytes(2)))),
+
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey4, cf, cq1, PInteger.INSTANCE.toBytes(7)),
+                                new KeyValue(rowKey4, cf, cq2, PInteger.INSTANCE.toBytes(8)))),
+
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey2, cf, cq1, PInteger.INSTANCE.toBytes(11)),
+                                new KeyValue(rowKey2, cf, cq3, PInteger.INSTANCE.toBytes(12)))),
+
+                new MultiKeyValueTuple(
+                        Arrays.<Cell> asList(
+                                new KeyValue(rowKey5, cf, cq1, PInteger.INSTANCE.toBytes(90)),
+                                new KeyValue(rowKey5, cf, cq2, PInteger.INSTANCE.toBytes(100))))
+
+        };
+        assertResults(
+                input2, result2, mockRowProjector);
+
+        //Test with no duplicate
+        int n = 100;
+        Tuple[] input3 = new Tuple[n + 1];
+        for(int i = 0; i <= n; i++) {
+            byte[] rowKey = PInteger.INSTANCE.toBytes(i);
+            input3[i] =  new MultiKeyValueTuple(
+                            Arrays.<Cell> asList(
+                               new KeyValue(rowKey, cf, cq1, PInteger.INSTANCE.toBytes(i + 1)),
+                               new KeyValue(rowKey, cf, cq2, PInteger.INSTANCE.toBytes(i + 2))));
+        }
+        input3[n] = null;
+        Tuple[] result3 = Arrays.copyOfRange(input3, 0, n);
+        assertResults(
+                input3, result3, mockRowProjector);
+
+        //Test with all duplicate
+        Tuple[] input4 = new Tuple[n + 1];
+        for(int i = 0; i <= n; i++) {
+            byte[] rowKey = PInteger.INSTANCE.toBytes(1);
+            input4[i] =  new MultiKeyValueTuple(
+                            Arrays.<Cell> asList(
+                               new KeyValue(rowKey, cf, cq1, PInteger.INSTANCE.toBytes(2)),
+                               new KeyValue(rowKey, cf, cq2, PInteger.INSTANCE.toBytes(3))));
+        }
+        input4[n] = null;
+        Tuple[] result4 = new Tuple[] {input4[0]};
+        assertResults(
+                input4, result4, mockRowProjector);
+
+    }
+
+    private void assertResults(Tuple[] input, Tuple[] result,  RowProjector rowProjector) throws Exception {
+        AggregatingResultIterator mockAggregatingResultIterator =
+                Mockito.mock(AggregatingResultIterator.class);
+        Mockito.when(mockAggregatingResultIterator.next()).thenReturn(
+                input[0], Arrays.copyOfRange(input, 1, input.length));
+
+        DistinctAggregatingResultIterator distinctAggregatingResultIterator =
+                new DistinctAggregatingResultIterator(mockAggregatingResultIterator, rowProjector);
+        AssertResults.assertResults(
+                distinctAggregatingResultIterator, result);
+    }
+
+}