You are viewing a plain-text version of this content; the canonical link to it was not preserved in this copy.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2021/07/15 09:02:50 UTC
[phoenix] branch master updated: PHOENIX-6507 DistinctAggregatingResultIterator should keep original tuple order of the AggregatingResultIterator
This is an automated email from the ASF dual-hosted git repository.
chenglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new a073d5b PHOENIX-6507 DistinctAggregatingResultIterator should keep original tuple order of the AggregatingResultIterator
a073d5b is described below
commit a073d5bb19915df4424efc619323fb82e3e838e2
Author: chenglei <ch...@apache.org>
AuthorDate: Thu Jul 15 17:02:22 2021 +0800
PHOENIX-6507 DistinctAggregatingResultIterator should keep original tuple order of the AggregatingResultIterator
---
.../org/apache/phoenix/end2end/AggregateIT.java | 71 ++++++
.../expression/KeyValueColumnExpression.java | 7 +-
.../iterate/DistinctAggregatingResultIterator.java | 135 +++++-----
.../DistinctAggregatingResultIteratorTest.java | 273 +++++++++++++++++++++
4 files changed, 406 insertions(+), 80 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateIT.java
index d52025e..32fb8b4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateIT.java
@@ -331,5 +331,76 @@ public class AggregateIT extends BaseAggregateIT {
}
}
+ @Test
+ public void testDistinctAggregatingResultIteratorBug6507() throws Exception { // PHOENIX-6507: DISTINCT over a GROUP BY must keep ORDER BY order; runs every (desc, salted) combination
+ doTestDistinctAggregatingResultIteratorBug6507(false, false);
+ doTestDistinctAggregatingResultIteratorBug6507(false, true);
+ doTestDistinctAggregatingResultIteratorBug6507(true, false);
+ doTestDistinctAggregatingResultIteratorBug6507(true, true);
+ }
+
+ private void doTestDistinctAggregatingResultIteratorBug6507(boolean desc ,boolean salted) throws Exception { // desc: declare all PK columns DESC; salted: SALT_BUCKETS=4, otherwise split on('b')
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = null;
+ try {
+ conn = DriverManager.getConnection(getUrl(), props);
+ String tableName = generateUniqueName();
+ String sql = "create table " + tableName + "( "+
+ " pk1 varchar not null , " +
+ " pk2 varchar not null, " +
+ " pk3 varchar not null," +
+ " v1 varchar, " +
+ " v2 varchar, " +
+ " CONSTRAINT TEST_PK PRIMARY KEY ( "+
+ "pk1 "+(desc ? "desc" : "")+", "+
+ "pk2 "+(desc ? "desc" : "")+", "+
+ "pk3 "+(desc ? "desc" : "")+
+ " )) "+(salted ? "SALT_BUCKETS =4" : "split on('b')");
+ conn.createStatement().execute(sql);
+
+ conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('a11','a12','a13','a14','a15')");
+ conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('a21','a22','a23','a24','a25')");
+ conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('a31','a32','a33','a38','a35')");
+ conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('b11','b12','b13','b14','b15')");
+ conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('b21','b22','b23','b24','b25')");
+ conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('b31','b32','b33','b34','b35')");
+ conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('a31','c12','c13','a34','a35')");
+ conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('a31','a32','c13','a34','a35')");
+ conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('a31','a32','d13','a35','a35')");
+ conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('d31','a32','c13','a35','a35')");
+ conn.commit();
+
+ sql = "select distinct pk1,max(v1) from "+tableName+" group by pk1,pk2,pk3 order by pk1,pk2,pk3"; // group (a31,c12,c13) repeats (a31,a34) and must be dropped without reordering the rest
+
+ ResultSet rs = conn.prepareStatement(sql).executeQuery();
+ assertResultSet(rs, new Object[][]{
+ {"a11","a14"},
+ {"a21","a24"},
+ {"a31","a38"},
+ {"a31","a34"},
+ {"a31","a35"},
+ {"b11","b14"},
+ {"b21","b24"},
+ {"b31","b34"},
+ {"d31","a35"}});
+
+ sql = "select distinct pk2,max(v1) from "+tableName+" group by pk2,pk3 order by pk2,pk3"; // group (a32,d13) repeats (a32,a35) from group (a32,c13) and must be dropped
+
+ rs = conn.prepareStatement(sql).executeQuery();
+ assertResultSet(rs, new Object[][]{
+ {"a12","a14"},
+ {"a22","a24"},
+ {"a32","a38"},
+ {"a32","a35"},
+ {"b12","b14"},
+ {"b22","b24"},
+ {"b32","b34"},
+ {"c12","a34"}});
+ } finally {
+ if(conn != null) {
+ conn.close();
+ }
+ }
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java
index f8432c5..62665e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java
@@ -45,7 +45,12 @@ public class KeyValueColumnExpression extends ColumnExpression {
public KeyValueColumnExpression() {
}
-
+
+ public KeyValueColumnExpression(final byte[] cf, final byte[] cq) { // sets only cf/cq; unlike the PColumn constructor it does not call super(column) — used by DistinctAggregatingResultIteratorTest
+ this.cf = cf;
+ this.cq = cq;
+ }
+
public KeyValueColumnExpression(PColumn column) {
super(column);
this.cf = column.getFamilyName().getBytes();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
index a412582..59cd333 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
@@ -18,18 +18,17 @@
package org.apache.phoenix.iterate;
import java.sql.SQLException;
-import java.util.Collections;
-import java.util.Iterator;
+import java.util.Arrays;
import java.util.List;
import java.util.Set;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.compile.ColumnProjector;
import org.apache.phoenix.compile.ExplainPlanAttributes
.ExplainPlanAttributesBuilder;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
@@ -43,27 +42,44 @@ import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
* @since 1.2
*/
public class DistinctAggregatingResultIterator implements AggregatingResultIterator {
- private final AggregatingResultIterator delegate;
+ /**
+ * The wrapped AggregatingResultIterator whose tuples are filtered down to distinct projected rows.
+ */
+ private final AggregatingResultIterator targetAggregatingResultIterator;
private final RowProjector rowProjector;
- private Iterator<ResultEntry> resultIterator;
- private final ImmutableBytesWritable ptr1 = new ImmutableBytesWritable();
- private final ImmutableBytesWritable ptr2 = new ImmutableBytesWritable();
+ /**
+ * One entry per distinct projected row already returned by next(); never cleared in this class, not even by close().
+ */
+ private final Set<ResultEntry> resultEntries =
+ Sets.<ResultEntry>newHashSet();
private class ResultEntry {
+ /**
+ * Hash of columnValues, computed once in the constructor.
+ */
private final int hashCode;
private final Tuple result;
+ /**
+ * Projected values of result, one slot per column projector; a slot is null when evaluate() returned false.
+ */
+ private final ImmutableBytesPtr[] columnValues;
ResultEntry(Tuple result) {
- final int prime = 31;
this.result = result;
- int hashCode = 0;
- for (ColumnProjector column : rowProjector.getColumnProjectors()) {
- Expression e = column.getExpression();
- if (e.evaluate(this.result, ptr1)) {
- hashCode = prime * hashCode + ptr1.hashCode();
+ this.columnValues =
+ new ImmutableBytesPtr[rowProjector.getColumnCount()]; // assumes getColumnCount() equals the number of column projectors
+ int columnIndex = 0;
+ for (ColumnProjector columnProjector : rowProjector.getColumnProjectors()) {
+ Expression expression = columnProjector.getExpression();
+ ImmutableBytesPtr ptr = new ImmutableBytesPtr(); // fresh per column since it is retained in columnValues (old code reused shared ptr1/ptr2); NOTE(review): it likely aliases the tuple's bytes — confirm
+ if (!expression.evaluate(this.result, ptr)) {
+ columnValues[columnIndex] = null; // redundant (new array slots are already null) but explicit
+ } else {
+ columnValues[columnIndex] = ptr;
}
+ columnIndex++;
}
- this.hashCode = hashCode;
+ this.hashCode = Arrays.hashCode(columnValues); // null slots hash as 0; must agree with Arrays.equals used in equals()
}
@Override
@@ -78,91 +94,53 @@ public class DistinctAggregatingResultIterator implements AggregatingResultItera
return false;
}
ResultEntry that = (ResultEntry) o;
- for (ColumnProjector column : rowProjector.getColumnProjectors()) {
- Expression e = column.getExpression();
- boolean isNull1 = !e.evaluate(this.result, ptr1);
- boolean isNull2 = !e.evaluate(that.result, ptr2);
- if (isNull1 && isNull2) {
- return true;
- }
- if (isNull1 || isNull2) {
- return false;
- }
- if (ptr1.compareTo(ptr2) != 0) {
- return false;
- }
- }
- return true;
+ return Arrays.equals(this.columnValues, that.columnValues);
}
-
+ // Returns the hash cached by the constructor (Arrays.hashCode of columnValues).
@Override
public int hashCode() {
return hashCode;
}
-
- Tuple getResult() {
- return result;
- }
}
-
- protected ResultIterator getDelegate() {
- return delegate;
- }
-
+
public DistinctAggregatingResultIterator(AggregatingResultIterator delegate,
RowProjector rowProjector) {
- this.delegate = delegate;
+ this.targetAggregatingResultIterator = delegate; // the parameter keeps its old name "delegate"; only the field was renamed
this.rowProjector = rowProjector;
}
@Override
public Tuple next() throws SQLException {
- Iterator<ResultEntry> iterator = getResultIterator();
- if (iterator.hasNext()) {
- ResultEntry entry = iterator.next();
- Tuple tuple = entry.getResult();
- aggregate(tuple);
- return tuple;
- }
- resultIterator = Collections.emptyIterator();
- return null;
- }
-
- private Iterator<ResultEntry> getResultIterator() throws SQLException {
- if (resultIterator != null) {
- return resultIterator;
- }
-
- Set<ResultEntry> entries = Sets.<ResultEntry>newHashSet(); // TODO: size?
- try {
- for (Tuple result = delegate.next(); result != null; result = delegate.next()) {
- ResultEntry entry = new ResultEntry(result);
- entries.add(entry);
+ while (true) { // stream delegate tuples in their original order, skipping rows whose projected values were already returned
+ Tuple nextTuple = this.targetAggregatingResultIterator.next();
+ if (nextTuple == null) {
+ return null; // delegate exhausted
+ }
+ ResultEntry resultEntry = new ResultEntry(nextTuple);
+ if (this.resultEntries.add(resultEntry)) {
+ // add() is false for an already-seen row: one hash lookup instead of contains() followed by add()
+ return nextTuple; // NOTE(review): the old code called aggregate(tuple) before returning; confirm the delegate's state already matches
}
- } finally {
- delegate.close();
}
-
- resultIterator = entries.iterator();
- return resultIterator;
}
@Override
- public void close() {
- resultIterator = Collections.emptyIterator();
- }
-
+ public void close() throws SQLException {
+ this.targetAggregatingResultIterator.close(); // closed here now that next() no longer drains and closes the delegate; resultEntries is not cleared
+ }
@Override
public void explain(List<String> planSteps) {
- delegate.explain(planSteps);
+ targetAggregatingResultIterator.explain(planSteps); // the delegate's plan steps come first, then this iterator's DISTINCT step
planSteps.add("CLIENT DISTINCT ON " + rowProjector.toString());
}
@Override
public void explain(List<String> planSteps,
ExplainPlanAttributesBuilder explainPlanAttributesBuilder) {
- delegate.explain(planSteps, explainPlanAttributesBuilder);
+ targetAggregatingResultIterator.explain(
+ planSteps,
+ explainPlanAttributesBuilder);
explainPlanAttributesBuilder.setClientDistinctFilter(
rowProjector.toString());
planSteps.add("CLIENT DISTINCT ON " + rowProjector.toString());
@@ -170,13 +148,12 @@ public class DistinctAggregatingResultIterator implements AggregatingResultItera
@Override
public Aggregator[] aggregate(Tuple result) {
- return delegate.aggregate(result);
+ return targetAggregatingResultIterator.aggregate(result); // pure pass-through; next() in this version no longer calls it
}
- @Override
- public String toString() {
- return "DistinctAggregatingResultIterator [delegate=" + delegate
- + ", rowProjector=" + rowProjector + ", resultIterator="
- + resultIterator + ", ptr1=" + ptr1 + ", ptr2=" + ptr2 + "]";
- }
+ @Override
+ public String toString() {
+ return "DistinctAggregatingResultIterator [targetAggregatingResultIterator=" + targetAggregatingResultIterator
+ + ", rowProjector=" + rowProjector + "]"; // close the "[" opened above, as the previous toString did
+ }
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/DistinctAggregatingResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/DistinctAggregatingResultIteratorTest.java
new file mode 100644
index 0000000..65b7382
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/DistinctAggregatingResultIteratorTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.ColumnProjector;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.expression.KeyValueColumnExpression;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.util.AssertResults;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class DistinctAggregatingResultIteratorTest {
+ private final static byte[] cf = Bytes.toBytes("cf");
+ private final static byte[] cq1 = Bytes.toBytes("cq1");
+ private final static byte[] cq2 = Bytes.toBytes("cq2");
+ private final static byte[] cq3 = Bytes.toBytes("cq3");
+ private final static byte[] rowKey1 = Bytes.toBytes("rowKey1"); // row keys never affect de-duplication: only the projected cf:cq1/cf:cq2 values do
+ private final static byte[] rowKey2 = Bytes.toBytes("rowKey2");
+ private final static byte[] rowKey3 = Bytes.toBytes("rowKey3");
+ private final static byte[] rowKey4 = Bytes.toBytes("rowKey4");
+ private final static byte[] rowKey5 = Bytes.toBytes("rowKey5");
+
+ @Test
+ public void testDistinctAggregatingResultIterator() throws Throwable {
+ // Case 1: the 4th and 6th rows repeat the 1st and 2nd and are dropped; first-seen order is kept
+ Tuple[] input1 = new Tuple[] {
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey1, cf, cq1, PInteger.INSTANCE.toBytes(1)),
+ new KeyValue(rowKey1, cf, cq2, PInteger.INSTANCE.toBytes(2)))),
+
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey2, cf, cq1, PInteger.INSTANCE.toBytes(11)),
+ new KeyValue(rowKey2, cf, cq2, PInteger.INSTANCE.toBytes(12)))),
+
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey3, cf, cq1, PInteger.INSTANCE.toBytes(4)),
+ new KeyValue(rowKey3, cf, cq2, PInteger.INSTANCE.toBytes(2)))),
+
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey1, cf, cq1, PInteger.INSTANCE.toBytes(1)),
+ new KeyValue(rowKey1, cf, cq2, PInteger.INSTANCE.toBytes(2)))),
+
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey4, cf, cq1, PInteger.INSTANCE.toBytes(7)),
+ new KeyValue(rowKey4, cf, cq2, PInteger.INSTANCE.toBytes(8)))),
+
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey2, cf, cq1, PInteger.INSTANCE.toBytes(11)),
+ new KeyValue(rowKey2, cf, cq2, PInteger.INSTANCE.toBytes(12)))),
+
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey5, cf, cq1, PInteger.INSTANCE.toBytes(90)),
+ new KeyValue(rowKey5, cf, cq2, PInteger.INSTANCE.toBytes(100)))),
+ null
+
+ };
+
+ Tuple[] result1 = new Tuple[] {
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey1, cf, cq1, PInteger.INSTANCE.toBytes(1)),
+ new KeyValue(rowKey1, cf, cq2, PInteger.INSTANCE.toBytes(2)))),
+
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey2, cf, cq1, PInteger.INSTANCE.toBytes(11)),
+ new KeyValue(rowKey2, cf, cq2, PInteger.INSTANCE.toBytes(12)))),
+
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey3, cf, cq1, PInteger.INSTANCE.toBytes(4)),
+ new KeyValue(rowKey3, cf, cq2, PInteger.INSTANCE.toBytes(2)))),
+
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey4, cf, cq1, PInteger.INSTANCE.toBytes(7)),
+ new KeyValue(rowKey4, cf, cq2, PInteger.INSTANCE.toBytes(8)))),
+
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey5, cf, cq1, PInteger.INSTANCE.toBytes(90)),
+ new KeyValue(rowKey5, cf, cq2, PInteger.INSTANCE.toBytes(100))))
+
+ };
+ RowProjector mockRowProjector = Mockito.mock(RowProjector.class);
+ Mockito.when(mockRowProjector.getColumnCount()).thenReturn(2);
+
+ KeyValueColumnExpression columnExpression1 = new KeyValueColumnExpression(cf, cq1);
+ KeyValueColumnExpression columnExpression2 = new KeyValueColumnExpression(cf, cq2);
+ final ColumnProjector mockColumnProjector1 = Mockito.mock(ColumnProjector.class);
+ Mockito.when(mockColumnProjector1.getExpression()).thenReturn(columnExpression1);
+ final ColumnProjector mockColumnProjector2 = Mockito.mock(ColumnProjector.class);
+ Mockito.when(mockColumnProjector2.getExpression()).thenReturn(columnExpression2);
+
+ Mockito.when(mockRowProjector.getColumnProjectors()).thenAnswer(
+ new Answer<List<ColumnProjector> >() {
+ @Override
+ public List<ColumnProjector> answer(InvocationOnMock invocation) throws Throwable {
+ return Arrays.asList(mockColumnProjector1,mockColumnProjector2);
+ }
+ });
+
+ assertResults(
+ input1, result1, mockRowProjector);
+
+ // Case 2: duplicates plus rows that carry cq3 instead of cq2, so their second projected value is null
+ Tuple[] input2 = new Tuple[] {
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey1, cf, cq1, PInteger.INSTANCE.toBytes(1)),
+ new KeyValue(rowKey1, cf, cq2, PInteger.INSTANCE.toBytes(2)))),
+
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey2, cf, cq1, PInteger.INSTANCE.toBytes(11)),
+ new KeyValue(rowKey2, cf, cq2, PInteger.INSTANCE.toBytes(12)))),
+
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey3, cf, cq1, PInteger.INSTANCE.toBytes(4)),
+ new KeyValue(rowKey3, cf, cq2, PInteger.INSTANCE.toBytes(2)))),
+
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey1, cf, cq2, PInteger.INSTANCE.toBytes(1)),
+ new KeyValue(rowKey1, cf, cq1, PInteger.INSTANCE.toBytes(2)))),
+
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey4, cf, cq1, PInteger.INSTANCE.toBytes(7)),
+ new KeyValue(rowKey4, cf, cq2, PInteger.INSTANCE.toBytes(8)))),
+
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey2, cf, cq1, PInteger.INSTANCE.toBytes(11)),
+ new KeyValue(rowKey2, cf, cq3, PInteger.INSTANCE.toBytes(12)))),
+
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey1, cf, cq2, PInteger.INSTANCE.toBytes(1)),
+ new KeyValue(rowKey1, cf, cq1, PInteger.INSTANCE.toBytes(2)))),
+
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey5, cf, cq1, PInteger.INSTANCE.toBytes(90)),
+ new KeyValue(rowKey5, cf, cq2, PInteger.INSTANCE.toBytes(100)))),
+
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey2, cf, cq1, PInteger.INSTANCE.toBytes(11)),
+ new KeyValue(rowKey2, cf, cq3, PInteger.INSTANCE.toBytes(12)))),
+
+ null
+
+ };
+
+ Tuple[] result2 = new Tuple[] {
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey1, cf, cq1, PInteger.INSTANCE.toBytes(1)),
+ new KeyValue(rowKey1, cf, cq2, PInteger.INSTANCE.toBytes(2)))),
+
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey2, cf, cq1, PInteger.INSTANCE.toBytes(11)),
+ new KeyValue(rowKey2, cf, cq2, PInteger.INSTANCE.toBytes(12)))),
+
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey3, cf, cq1, PInteger.INSTANCE.toBytes(4)),
+ new KeyValue(rowKey3, cf, cq2, PInteger.INSTANCE.toBytes(2)))),
+
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey1, cf, cq2, PInteger.INSTANCE.toBytes(1)),
+ new KeyValue(rowKey1, cf, cq1, PInteger.INSTANCE.toBytes(2)))),
+
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey4, cf, cq1, PInteger.INSTANCE.toBytes(7)),
+ new KeyValue(rowKey4, cf, cq2, PInteger.INSTANCE.toBytes(8)))),
+
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey2, cf, cq1, PInteger.INSTANCE.toBytes(11)),
+ new KeyValue(rowKey2, cf, cq3, PInteger.INSTANCE.toBytes(12)))),
+
+ new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey5, cf, cq1, PInteger.INSTANCE.toBytes(90)),
+ new KeyValue(rowKey5, cf, cq2, PInteger.INSTANCE.toBytes(100))))
+
+ };
+ assertResults(
+ input2, result2, mockRowProjector);
+
+ // Case 3: n distinct rows come back unchanged and in input order
+ int n = 100;
+ Tuple[] input3 = new Tuple[n + 1];
+ for(int i = 0; i < n; i++) { // slot n is reserved for the null terminator
+ byte[] rowKey = PInteger.INSTANCE.toBytes(i);
+ input3[i] = new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey, cf, cq1, PInteger.INSTANCE.toBytes(i + 1)),
+ new KeyValue(rowKey, cf, cq2, PInteger.INSTANCE.toBytes(i + 2))));
+ }
+ input3[n] = null;
+ Tuple[] result3 = Arrays.copyOfRange(input3, 0, n);
+ assertResults(
+ input3, result3, mockRowProjector);
+
+ // Case 4: n identical rows collapse to the first one
+ Tuple[] input4 = new Tuple[n + 1];
+ for(int i = 0; i < n; i++) { // slot n is reserved for the null terminator
+ byte[] rowKey = PInteger.INSTANCE.toBytes(1);
+ input4[i] = new MultiKeyValueTuple(
+ Arrays.<Cell> asList(
+ new KeyValue(rowKey, cf, cq1, PInteger.INSTANCE.toBytes(2)),
+ new KeyValue(rowKey, cf, cq2, PInteger.INSTANCE.toBytes(3))));
+ }
+ input4[n] = null;
+ Tuple[] result4 = new Tuple[] {input4[0]};
+ assertResults(
+ input4, result4, mockRowProjector);
+
+ }
+
+ private void assertResults(Tuple[] input, Tuple[] result, RowProjector rowProjector) throws Exception { // feeds input through a mocked delegate; input must end with null, otherwise next() would loop on a repeated duplicate
+ AggregatingResultIterator mockAggregatingResultIterator =
+ Mockito.mock(AggregatingResultIterator.class);
+ Mockito.when(mockAggregatingResultIterator.next()).thenReturn(
+ input[0], Arrays.copyOfRange(input, 1, input.length));
+
+ DistinctAggregatingResultIterator distinctAggregatingResultIterator =
+ new DistinctAggregatingResultIterator(mockAggregatingResultIterator, rowProjector);
+ AssertResults.assertResults(
+ distinctAggregatingResultIterator, result);
+ }
+
+}