You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2020/04/27 19:29:26 UTC

[cassandra-in-jvm-dtest-api] 01/02: In-jvm dtest IInstance and ICoordinator should use QueryResult as the base API

This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cassandra-in-jvm-dtest-api.git

commit cc3e43c710a0fb683b7e955f641e221ccc2e5d54
Author: David Capwell <dc...@gmail.com>
AuthorDate: Wed Apr 22 19:14:24 2020 -0700

    In-jvm dtest IInstance and ICoordinator should use QueryResult as the base API
    
    Patch by David Capwell; reviewed by Alex Petrov for CASSANDRA-15756.
---
 pom.xml                                            |  23 +++
 .../cassandra/distributed/api/ICoordinator.java    |  28 ++-
 .../cassandra/distributed/api/IInstance.java       |   7 +-
 .../cassandra/distributed/api/NodeToolResult.java  |   3 +-
 .../cassandra/distributed/api/QueryResult.java     |  91 ++-------
 .../cassandra/distributed/api/QueryResults.java    | 204 +++++++++++++++++++++
 .../org/apache/cassandra/distributed/api/Row.java  | 116 ++++++++++--
 .../{QueryResult.java => SimpleQueryResult.java}   |  46 +++--
 .../cassandra/distributed/shared/AssertUtils.java  |  37 ++++
 .../cassandra/distributed/shared/FutureUtils.java  |  95 ++++++++++
 .../cassandra/distributed/api/QueryResultTest.java | 198 ++++++++++++++++++++
 11 files changed, 740 insertions(+), 108 deletions(-)

diff --git a/pom.xml b/pom.xml
index df733c6..62c90d8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,6 +50,25 @@
             <artifactId>slf4j-api</artifactId>
             <version>1.7.25</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <version>5.6.2</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <version>5.6.2</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>3.15.0</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
@@ -65,6 +84,10 @@
                         </excludes>
                     </configuration>
                 </plugin>
+                <plugin>
+                    <artifactId>maven-surefire-plugin</artifactId>
+                    <version>2.22.2</version>
+                </plugin>
             </plugins>
         </pluginManagement>
         <plugins>
diff --git a/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java b/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java
index 34087d0..3a96b63 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java
@@ -22,6 +22,8 @@ import java.util.Iterator;
 import java.util.UUID;
 import java.util.concurrent.Future;
 
+import org.apache.cassandra.distributed.shared.FutureUtils;
+
 // The cross-version API requires that a Coordinator can be constructed without any constructor arguments
 public interface ICoordinator
 {
@@ -30,13 +32,31 @@ public interface ICoordinator
         return executeWithResult(query, consistencyLevel, boundValues).toObjectArrays();
     }
 
-    QueryResult executeWithResult(String query, ConsistencyLevel consistencyLevel, Object... boundValues);
+    SimpleQueryResult executeWithResult(String query, ConsistencyLevel consistencyLevel, Object... boundValues);
+
+    default Iterator<Object[]> executeWithPaging(String query, ConsistencyLevel consistencyLevel, int pageSize, Object... boundValues)
+    {
+        return executeWithPagingWithResult(query, consistencyLevel, pageSize, boundValues).map(Row::toObjectArray);
+    }
+
+    QueryResult executeWithPagingWithResult(String query, ConsistencyLevel consistencyLevel, int pageSize, Object... boundValues);
+
+    default Future<Object[][]> asyncExecuteWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues)
+    {
+        return FutureUtils.map(asyncExecuteWithTracingWithResult(sessionId, query, consistencyLevel, boundValues), r -> r.toObjectArrays());
+    }
 
-    Iterator<Object[]> executeWithPaging(String query, ConsistencyLevel consistencyLevel, int pageSize, Object... boundValues);
+    Future<SimpleQueryResult> asyncExecuteWithTracingWithResult(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues);
 
-    Future<Object[][]> asyncExecuteWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues);
+    default Object[][] executeWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues)
+    {
+        return executeWithTracingWithResult(sessionId, query, consistencyLevel, boundValues).toObjectArrays();
+    }
 
-    Object[][] executeWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues);
+    default SimpleQueryResult executeWithTracingWithResult(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues)
+    {
+        return FutureUtils.waitOn(asyncExecuteWithTracingWithResult(sessionId, query, consistencyLevel, boundValues));
+    }
 
     IInstance instance();
 }
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInstance.java b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java
index 90c8242..8895d94 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/IInstance.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java
@@ -31,7 +31,12 @@ public interface IInstance extends IIsolatedExecutor
 
     void schemaChangeInternal(String query);
 
-    public Object[][] executeInternal(String query, Object... args);
+    default Object[][] executeInternal(String query, Object... args)
+    {
+        return executeInternalWithResult(query, args).toObjectArrays();
+    }
+
+    SimpleQueryResult executeInternalWithResult(String query, Object... args);
 
     IInstanceConfig config();
 
diff --git a/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java b/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java
index 2e5c5f0..bdd75b5 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java
@@ -147,7 +147,8 @@ public class NodeToolResult
         }
     }
 
-    private static String getStackTraceAsString(Throwable throwable) {
+    private static String getStackTraceAsString(Throwable throwable)
+    {
         StringWriter stringWriter = new StringWriter();
         throwable.printStackTrace(new PrintWriter(stringWriter));
         return stringWriter.toString();
diff --git a/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java b/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java
index e72d33e..9281794 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java
@@ -15,12 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.distributed.api;
 
 import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.Objects;
+import java.util.List;
+import java.util.function.Function;
 import java.util.function.Predicate;
 
 /**
@@ -52,88 +51,34 @@ import java.util.function.Predicate;
  * }</code>
  * <p>
  * Both cases have the same issue; reference to a row from a previous call to {@link #hasNext()}.  Since the same {@link Row}
- * object can be used accross different calls to {@link #hasNext()} this would mean any attempt to access after the fact
+ * object can be used across different calls to {@link #hasNext()} this would mean any attempt to access after the fact
  * points to newer data.  If this behavior is not desirable and access is needed between calls, then {@link Row#copy()}
  * should be used; this will clone the {@link Row} and return a new object pointing to the same data.
  */
-public class QueryResult implements Iterator<Row>
+public interface QueryResult extends Iterator<Row>
 {
-    public static final QueryResult EMPTY = new QueryResult(new String[0], null);
-
-    private final String[] names;
-    private final Object[][] results;
-    private final Predicate<Row> filter;
-    private final Row row;
-    private int offset = -1;
-
-    public QueryResult(String[] names, Object[][] results)
-    {
-        this.names = Objects.requireNonNull(names, "names");
-        this.results = results;
-        this.row = new Row(names);
-        this.filter = ignore -> true;
-    }
-
-    private QueryResult(String[] names, Object[][] results, Predicate<Row> filter, int offset)
-    {
-        this.names = names;
-        this.results = results;
-        this.filter = filter;
-        this.offset = offset;
-        this.row = new Row(names);
-    }
-
-    public String[] getNames()
-    {
-        return names;
-    }
-
-    public boolean isEmpty()
-    {
-        return results.length == 0;
-    }
-
-    public int size()
-    {
-        return results.length;
-    }
-
-    public QueryResult filter(Predicate<Row> fn)
-    {
-        return new QueryResult(names, results, filter.and(fn), offset);
-    }
+    List<String> names();
 
-    /**
-     * Get all rows as a 2d array.  Any calls to {@link #filter(Predicate)} will be ignored and the array returned will
-     * be the full set from the query.
-     */
-    public Object[][] toObjectArrays()
+    default QueryResult filter(Predicate<Row> fn)
     {
-        return results;
+        return QueryResults.filter(this, fn);
     }
 
-    @Override
-    public boolean hasNext()
+    default <A> Iterator<A> map(Function<? super Row, ? extends A> fn)
     {
-        if (results == null)
-            return false;
-        while ((offset += 1) < results.length)
+        return new Iterator<A>()
         {
-            row.setResults(results[offset]);
-            if (filter.test(row))
+            @Override
+            public boolean hasNext()
             {
-                return true;
+                return QueryResult.this.hasNext();
             }
-        }
-        row.setResults(null);
-        return false;
-    }
 
-    @Override
-    public Row next()
-    {
-        if (offset < 0 || offset >= results.length)
-            throw new NoSuchElementException();
-        return row;
+            @Override
+            public A next()
+            {
+                return fn.apply(QueryResult.this.next());
+            }
+        };
     }
 }
diff --git a/src/main/java/org/apache/cassandra/distributed/api/QueryResults.java b/src/main/java/org/apache/cassandra/distributed/api/QueryResults.java
new file mode 100644
index 0000000..80202eb
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/QueryResults.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.api;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.function.Predicate;
+
+public final class QueryResults
+{
+    private static final SimpleQueryResult EMPTY = new SimpleQueryResult(new String[0], null);
+
+    private QueryResults() {}
+
+    public static SimpleQueryResult empty()
+    {
+        return EMPTY;
+    }
+
+    public static QueryResult fromIterator(String[] names, Iterator<Row> iterator)
+    {
+        Objects.requireNonNull(names, "names");
+        Objects.requireNonNull(iterator, "iterator");
+        return new IteratorQueryResult(names, iterator);
+    }
+
+    public static QueryResult fromObjectArrayIterator(String[] names, Iterator<Object[]> iterator)
+    {
+        Row row = new Row(names);
+        return fromIterator(names, new Iterator<Row>()
+        {
+            @Override
+            public boolean hasNext()
+            {
+                return iterator.hasNext();
+            }
+
+            @Override
+            public Row next()
+            {
+                row.setResults(iterator.next());
+                return row;
+            }
+        });
+    }
+
+    public static QueryResult filter(QueryResult result, Predicate<Row> fn)
+    {
+        return new FilterQueryResult(result, fn);
+    }
+
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    public static final class Builder
+    {
+        private static final int UNSET = -1;
+
+        private int numColumns = UNSET;
+        private String[] names;
+        private List<Object[]> results = new ArrayList<>();
+
+        public Builder columns(String... columns)
+        {
+            if (columns != null)
+            {
+                if (numColumns == UNSET)
+                    numColumns = columns.length;
+
+                if (numColumns != columns.length)
+                    throw new AssertionError("Attempted to add column names with different column count; " +
+                                             "expected " + numColumns + " columns but given " + Arrays.toString(columns));
+            }
+
+            names = columns;
+            return this;
+        }
+
+        public Builder row(Object... values)
+        {
+            if (numColumns == UNSET)
+                numColumns = values.length;
+
+            if (numColumns != values.length)
+                throw new AssertionError("Attempted to add row with different column count; " +
+                                         "expected " + numColumns + " columns but given " + Arrays.toString(values));
+            results.add(values);
+            return this;
+        }
+
+        public SimpleQueryResult build()
+        {
+            if (names == null)
+            {
+                if (numColumns == UNSET)
+                    return QueryResults.empty();
+                names = new String[numColumns];
+                for (int i = 0; i < numColumns; i++)
+                    names[i] = "unknown";
+            }
+            return new SimpleQueryResult(names, results.stream().toArray(Object[][]::new));
+        }
+    }
+
+    private static final class IteratorQueryResult implements QueryResult
+    {
+        private final List<String> names;
+        private final Iterator<Row> iterator;
+
+        private IteratorQueryResult(String[] names, Iterator<Row> iterator)
+        {
+            this(Collections.unmodifiableList(Arrays.asList(names)), iterator);
+        }
+
+        private IteratorQueryResult(List<String> names, Iterator<Row> iterator)
+        {
+            this.names = names;
+            this.iterator = iterator;
+        }
+
+        @Override
+        public List<String> names()
+        {
+            return names;
+        }
+
+        @Override
+        public boolean hasNext()
+        {
+            return iterator.hasNext();
+        }
+
+        @Override
+        public Row next()
+        {
+            return iterator.next();
+        }
+    }
+
+    private static final class FilterQueryResult implements QueryResult
+    {
+        private final QueryResult delegate;
+        private final Predicate<Row> filter;
+        private Row current;
+
+        private FilterQueryResult(QueryResult delegate, Predicate<Row> filter)
+        {
+            this.delegate = delegate;
+            this.filter = filter;
+        }
+
+        @Override
+        public List<String> names()
+        {
+            return delegate.names();
+        }
+
+        @Override
+        public boolean hasNext()
+        {
+            while (delegate.hasNext())
+            {
+                Row row = delegate.next();
+                if (filter.test(row))
+                {
+                    current = row;
+                    return true;
+                }
+            }
+            current = null;
+            return false;
+        }
+
+        @Override
+        public Row next()
+        {
+            if (current == null)
+                throw new NoSuchElementException();
+            return current;
+        }
+    }
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/Row.java b/src/main/java/org/apache/cassandra/distributed/api/Row.java
index 530edc1..ff4efbe 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/Row.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/Row.java
@@ -19,8 +19,10 @@
 package org.apache.cassandra.distributed.api;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
@@ -30,27 +32,33 @@ import java.util.UUID;
 /**
  * Data representing a single row in a query result.
  * <p>
- * This class is mutable from the parent {@link QueryResult} and can have the row it points to changed between calls
- * to {@link QueryResult#hasNext()}, for this reason it is unsafe to hold reference to this class after that call;
+ * This class is mutable from the parent {@link SimpleQueryResult} and can have the row it points to changed between calls
+ * to {@link SimpleQueryResult#hasNext()}, for this reason it is unsafe to hold reference to this class after that call;
  * to get around this, a call to {@link #copy()} will return a new object pointing to the same row.
  */
 public class Row
 {
+    private static final int NOT_FOUND = -1;
+
+    private final String[] names;
     private final Map<String, Integer> nameIndex;
     private Object[] results; // mutable to avoid allocations in loops
 
     public Row(String[] names)
     {
         Objects.requireNonNull(names, "names");
+        this.names = names;
         this.nameIndex = new HashMap<>(names.length);
         for (int i = 0; i < names.length; i++)
         {
-            nameIndex.put(names[i], i);
+            // if duplicate names, always index by the first one seen
+            nameIndex.putIfAbsent(names[i], i);
         }
     }
 
-    private Row(Map<String, Integer> nameIndex)
+    private Row(String[] names, Map<String, Integer> nameIndex)
     {
+        this.names = names;
         this.nameIndex = nameIndex;
     }
 
@@ -60,49 +68,135 @@ public class Row
     }
 
     /**
-     * Creates a copy of the current row; can be used past calls to {@link QueryResult#hasNext()}.
+     * Creates a copy of the current row; can be used past calls to {@link SimpleQueryResult#hasNext()}.
      */
     public Row copy()
     {
-        Row copy = new Row(nameIndex);
+        Row copy = new Row(names, nameIndex);
         copy.setResults(results);
         return copy;
     }
 
+    public <T> T get(int index)
+    {
+        checkAccess();
+        if (index < 0 || index >= results.length)
+            throw new NoSuchElementException("by index: " + index);
+        return (T) results[index];
+    }
+
     public <T> T get(String name)
     {
         checkAccess();
         int idx = findIndex(name);
-        if (idx == -1)
-            return null;
+        if (idx == NOT_FOUND)
+            throw new NoSuchElementException("by name: " + name);
         return (T) results[idx];
     }
 
+    public Short getShort(int index)
+    {
+        return get(index);
+    }
+
+    public Short getShort(String name)
+    {
+        return get(name);
+    }
+
+    public Integer getInteger(int index)
+    {
+        return get(index);
+    }
+
+    public Integer getInteger(String name)
+    {
+        return get(name);
+    }
+
+    public Long getLong(int index)
+    {
+        return get(index);
+    }
+
+    public Long getLong(String name)
+    {
+        return get(name);
+    }
+
+    public Float getFloat(int index)
+    {
+        return get(index);
+    }
+
+    public Float getFloat(String name)
+    {
+        return get(name);
+    }
+
+    public Double getDouble(int index)
+    {
+        return get(index);
+    }
+
+    public Double getDouble(String name)
+    {
+        return get(name);
+    }
+
+    public String getString(int index)
+    {
+        return get(index);
+    }
+
     public String getString(String name)
     {
         return get(name);
     }
 
+    public UUID getUUID(int index)
+    {
+        return get(index);
+    }
+
     public UUID getUUID(String name)
     {
         return get(name);
     }
 
+    public Date getTimestamp(int index)
+    {
+        return get(index);
+    }
+
     public Date getTimestamp(String name)
     {
         return get(name);
     }
 
+    public <T> Set<T> getSet(int index)
+    {
+        return get(index);
+    }
+
     public <T> Set<T> getSet(String name)
     {
         return get(name);
     }
 
+    /**
+     * Get the row as an array.
+     */
+    public Object[] toObjectArray()
+    {
+        return results;
+    }
+
     public String toString()
     {
         return "Row{" +
-               "names=" + nameIndex.keySet() +
-               ", results=" + Arrays.toString(results) +
+               "names=" + Arrays.toString(names) +
+               ", results=" + (results == null ? "[]" : Arrays.toString(results)) +
                '}';
     }
 
@@ -114,6 +208,6 @@ public class Row
 
     private int findIndex(String name)
     {
-        return nameIndex.getOrDefault(name, -1);
+        return nameIndex.getOrDefault(name, NOT_FOUND);
     }
 }
diff --git a/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java b/src/main/java/org/apache/cassandra/distributed/api/SimpleQueryResult.java
similarity index 76%
copy from src/main/java/org/apache/cassandra/distributed/api/QueryResult.java
copy to src/main/java/org/apache/cassandra/distributed/api/SimpleQueryResult.java
index e72d33e..a44411d 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/SimpleQueryResult.java
@@ -15,13 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.distributed.api;
 
-import java.util.Iterator;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * A table of data representing a complete query result.
@@ -56,17 +59,15 @@ import java.util.function.Predicate;
  * points to newer data.  If this behavior is not desirable and access is needed between calls, then {@link Row#copy()}
  * should be used; this will clone the {@link Row} and return a new object pointing to the same data.
  */
-public class QueryResult implements Iterator<Row>
+public class SimpleQueryResult implements QueryResult
 {
-    public static final QueryResult EMPTY = new QueryResult(new String[0], null);
-
     private final String[] names;
     private final Object[][] results;
     private final Predicate<Row> filter;
     private final Row row;
     private int offset = -1;
 
-    public QueryResult(String[] names, Object[][] results)
+    public SimpleQueryResult(String[] names, Object[][] results)
     {
         this.names = Objects.requireNonNull(names, "names");
         this.results = results;
@@ -74,7 +75,7 @@ public class QueryResult implements Iterator<Row>
         this.filter = ignore -> true;
     }
 
-    private QueryResult(String[] names, Object[][] results, Predicate<Row> filter, int offset)
+    private SimpleQueryResult(String[] names, Object[][] results, Predicate<Row> filter, int offset)
     {
         this.names = names;
         this.results = results;
@@ -83,24 +84,23 @@ public class QueryResult implements Iterator<Row>
         this.row = new Row(names);
     }
 
-    public String[] getNames()
-    {
-        return names;
-    }
-
-    public boolean isEmpty()
+    public List<String> names()
     {
-        return results.length == 0;
+        return Collections.unmodifiableList(Arrays.asList(names));
     }
 
-    public int size()
+    public SimpleQueryResult filter(Predicate<Row> fn)
     {
-        return results.length;
+        return new SimpleQueryResult(names, results, filter.and(fn), offset);
     }
 
-    public QueryResult filter(Predicate<Row> fn)
+    /**
+     * Reset the cursor to the start of the query result; if the query result has not been iterated, this has no effect.
+     */
+    public void reset()
     {
-        return new QueryResult(names, results, filter.and(fn), offset);
+        offset = -1;
+        row.setResults(null);
     }
 
     /**
@@ -132,8 +132,18 @@ public class QueryResult implements Iterator<Row>
     @Override
     public Row next()
     {
+        // no null check needed for results since offset only increments IFF results is not null
         if (offset < 0 || offset >= results.length)
             throw new NoSuchElementException();
         return row;
     }
+
+    @Override
+    public String toString() {
+        if (results == null)
+            return "[]";
+        return Stream.of(results)
+                .map(Arrays::toString)
+                .collect(Collectors.joining(",", "[", "]"));
+    }
 }
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java b/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java
index 8e6254a..a388af3 100644
--- a/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java
+++ b/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java
@@ -18,6 +18,10 @@
 
 package org.apache.cassandra.distributed.shared;
 
+import org.apache.cassandra.distributed.api.QueryResult;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.api.Row;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -26,6 +30,34 @@ import java.util.List;
 public class AssertUtils
 {
 
+    public static void assertRows(QueryResult actual, QueryResult expected)
+    {
+        if (actual instanceof SimpleQueryResult && expected instanceof SimpleQueryResult)
+        {
+            assertRows((SimpleQueryResult) actual, (SimpleQueryResult) expected);
+        }
+        else
+        {
+            assertRows(actual.map(Row::toObjectArray), expected.map(Row::toObjectArray));
+        }
+    }
+
+    public static void assertRows(SimpleQueryResult actual, SimpleQueryResult expected)
+    {
+        while (actual.hasNext()) {
+            if (!expected.hasNext())
+                throw new AssertionError(rowsNotEqualErrorMessage(actual, expected));
+
+            Row next = actual.next();
+            Row exectedRow = expected.next();
+
+            assertTrue(rowsNotEqualErrorMessage(actual, expected),
+                    Arrays.equals(next.toObjectArray(), exectedRow.toObjectArray()));
+        }
+        if (expected.hasNext())
+            throw new AssertionError(rowsNotEqualErrorMessage(actual, expected));
+    }
+
     public static void assertRows(Object[][] actual, Object[]... expected)
     {
         assertEquals(rowsNotEqualErrorMessage(actual, expected),
@@ -82,6 +114,11 @@ public class AssertUtils
                              Arrays.toString(actual));
     }
 
+    public static String rowsNotEqualErrorMessage(SimpleQueryResult actual, SimpleQueryResult expected)
+    {
+        return String.format("Expected: %s\nActual: %s\n", expected, actual);
+    }
+
     public static String rowsNotEqualErrorMessage(Object[][] actual, Object[][] expected)
     {
         return String.format("Expected: %s\nActual: %s\n",
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/FutureUtils.java b/src/main/java/org/apache/cassandra/distributed/shared/FutureUtils.java
new file mode 100644
index 0000000..9f97e8d
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/FutureUtils.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.shared;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+
+public final class FutureUtils {
+    private FutureUtils() { }
+
+    public static <T> T waitOn(Future<T> f)
+    {
+        try
+        {
+            return f.get();
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+        catch (ExecutionException e)
+        {
+            Throwable cause = e.getCause();
+            if (cause instanceof Error) throw (Error) cause;
+            if (cause instanceof RuntimeException) throw (RuntimeException) cause;
+            throw new RuntimeException(cause);
+        }
+    }
+
+    public static <A, B> Future<B> map(Future<A> future, Function<? super A, ? extends B> fn) {
+        if (future == null) throw new NullPointerException("Future is null");
+        if (fn == null) throw new NullPointerException("Function is null");
+
+        if (future instanceof CompletableFuture) {
+            return ((CompletableFuture<A>) future).thenApply(fn);
+        }
+        return new MapFuture<>(future, fn);
+    }
+
+    private static final class MapFuture<A, B> implements Future<B>
+    {
+        private final Future<A> parent;
+        private final Function<? super A, ? extends B> fn;
+
+        private MapFuture(Future<A> parent, Function<? super A, ? extends B> fn) {
+            this.parent = parent;
+            this.fn = fn;
+        }
+
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            return parent.cancel(mayInterruptIfRunning);
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return parent.isCancelled();
+        }
+
+        @Override
+        public boolean isDone() {
+            return parent.isDone();
+        }
+
+        @Override
+        public B get() throws InterruptedException, ExecutionException {
+            return fn.apply(parent.get());
+        }
+
+        @Override
+        public B get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            return fn.apply(parent.get(timeout, unit));
+        }
+    }
+}
diff --git a/src/test/java/org/apache/cassandra/distributed/api/QueryResultTest.java b/src/test/java/org/apache/cassandra/distributed/api/QueryResultTest.java
new file mode 100644
index 0000000..6bef78b
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/distributed/api/QueryResultTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import org.apache.cassandra.distributed.shared.AssertUtils;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Unit tests for the {@link QueryResult} implementations created through {@link QueryResults} and
+ * {@link SimpleQueryResult}: the empty result, row comparison through {@link AssertUtils}, and filter/map.
+ */
+public class QueryResultTest
+{
+    /** Column names used by the comparison tests; a new array is returned on every call. */
+    private static String[] personColumns()
+    {
+        return new String[] { "fname", "lname"};
+    }
+
+    /** Rows used by the comparison tests; new arrays are returned on every call. */
+    private static Object[][] personRows()
+    {
+        return new Object[][] {
+                new Object[] { "david", "capwell"},
+                new Object[] { "alex", "petrov"},
+                new Object[] { "dinesh", "joshi"},
+        };
+    }
+
+    /**
+     * Asserts that {@code AssertUtils.assertRows} rejects the pair with an {@link AssertionError} whose message
+     * contains both the "Expected: " and the "Actual: " marker.
+     */
+    private static void assertRowsRejected(SimpleQueryResult first, SimpleQueryResult second)
+    {
+        assertThatThrownBy(() -> AssertUtils.assertRows(first, second))
+                .isInstanceOf(AssertionError.class)
+                .hasMessageContaining("Expected: ")
+                .hasMessageContaining("Actual: ");
+    }
+
+    /** The empty result has no column names, prints as "[]", and is exhausted even when filtered or mapped. */
+    @Test
+    public void empty()
+    {
+        QueryResult none = QueryResults.empty();
+
+        assertThat(none.names()).isEmpty();
+        assertThat(none.toString()).isEqualTo("[]");
+
+        assertThat(none.hasNext()).isFalse();
+        assertThatThrownBy(none::next).isInstanceOf(NoSuchElementException.class);
+
+        QueryResult keepAll = none.filter(ignore -> true);
+        assertThat(keepAll.hasNext()).isFalse();
+        assertThatThrownBy(keepAll::next).isInstanceOf(NoSuchElementException.class);
+
+        Iterator<Object> mapped = none.map(r -> r.get("undefined"));
+        assertThat(mapped.hasNext()).isFalse();
+        assertThatThrownBy(mapped::next).isInstanceOf(NoSuchElementException.class);
+    }
+
+    /** A result built by the constructor and one built row by row from the same data compare as equal. */
+    @Test
+    public void equals()
+    {
+        String[] columns = personColumns();
+        Object[][] people = personRows();
+
+        SimpleQueryResult direct = new SimpleQueryResult(columns, people);
+        SimpleQueryResult built = QueryResults.builder()
+                                              .columns(columns)
+                                              .row(people[0])
+                                              .row(people[1])
+                                              .row(people[2])
+                                              .build();
+
+        AssertUtils.assertRows(direct, built);
+    }
+
+    /** Comparison fails when one side carries an additional row. */
+    @Test
+    public void notEqualLength()
+    {
+        String[] columns = personColumns();
+        Object[][] people = personRows();
+
+        SimpleQueryResult direct = new SimpleQueryResult(columns, people);
+        SimpleQueryResult oneRowMore = QueryResults.builder()
+                                                   .columns(columns)
+                                                   .row(people[0])
+                                                   .row(people[1])
+                                                   .row(people[2])
+                                                   .row("chris", "lohfink")
+                                                   .build();
+
+        assertRowsRejected(direct, oneRowMore);
+    }
+
+    /** Comparison fails when one side has fewer columns per row. */
+    @Test
+    public void notEqualColumnLength()
+    {
+        SimpleQueryResult direct = new SimpleQueryResult(personColumns(), personRows());
+        SimpleQueryResult singleColumn = QueryResults.builder()
+                                                     .columns("fname")
+                                                     .row("david")
+                                                     .row("alex")
+                                                     .row("dinesh")
+                                                     .build();
+
+        assertRowsRejected(direct, singleColumn);
+    }
+
+    /** Comparison fails when the shapes match but the values differ (here only in letter case). */
+    @Test
+    public void notEqualContent()
+    {
+        String[] columns = personColumns();
+
+        SimpleQueryResult direct = new SimpleQueryResult(columns, personRows());
+        SimpleQueryResult capitalised = QueryResults.builder()
+                                                    .columns(columns)
+                                                    .row("david", "Capwell")
+                                                    .row("alex", "Petrov")
+                                                    .row("dinesh", "Joshi")
+                                                    .build();
+
+        assertRowsRejected(direct, capitalised);
+    }
+
+    /** Filtering a fully materialised result keeps only the rows accepted by the predicate, in order. */
+    @Test
+    public void completeFilter()
+    {
+        SimpleQueryResult source = QueryResults.builder()
+                                               .row(1, 2, 3, 4)
+                                               .row(5, 6, 7, 7)
+                                               .row(1, 2, 4, 8)
+                                               .row(2, 4, 6, 12)
+                                               .build();
+        SimpleQueryResult expected = QueryResults.builder()
+                                                 .row(5, 6, 7, 7)
+                                                 .row(2, 4, 6, 12)
+                                                 .build();
+
+        // Drop every row whose first column is 1.
+        SimpleQueryResult withoutOnes = source.filter(row -> row.getInteger(0).intValue() != 1);
+
+        AssertUtils.assertRows(withoutOnes, expected);
+    }
+
+    /** Mapping a fully materialised result yields one mapped element per row, in row order. */
+    @Test
+    public void completeMap()
+    {
+        SimpleQueryResult source = QueryResults.builder()
+                                               .row(1, 2, 3, 4)
+                                               .row(5, 6, 7, 7)
+                                               .row(1, 2, 4, 8)
+                                               .row(2, 4, 6, 12)
+                                               .build();
+
+        Iterator<Integer> firstColumn = source.map(r -> r.getInteger(0));
+        List<Integer> collected = new ArrayList<>(4);
+        while (firstColumn.hasNext())
+        {
+            collected.add(firstColumn.next());
+        }
+
+        assertThat(collected).isEqualTo(Arrays.asList(1, 5, 1, 2));
+    }
+
+    /** Filters chained on an iterator-backed result are all applied; only the row passing both remains. */
+    @Test
+    public void iteratorFilter()
+    {
+        String[] columns = {"first"};
+        List<Object[]> backing = new ArrayList<>();
+        for (String first : new String[] { "david", "alex", "dinesh" })
+        {
+            backing.add(new Object[] { first });
+        }
+
+        QueryResult remaining = QueryResults.fromObjectArrayIterator(columns, backing.iterator())
+                .filter(r -> !"david".equals(r.getString("first")))
+                .filter(r -> !"alex".equals(r.getString("first")));
+
+        AssertUtils.assertRows(remaining, QueryResults.builder()
+                                                      .columns(columns)
+                                                      .row("dinesh")
+                                                      .build());
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org