You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2020/04/27 19:29:26 UTC
[cassandra-in-jvm-dtest-api] 01/02: In-jvm dtest IInstance and
ICoordinator should use QueryResult as the base API
This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cassandra-in-jvm-dtest-api.git
commit cc3e43c710a0fb683b7e955f641e221ccc2e5d54
Author: David Capwell <dc...@gmail.com>
AuthorDate: Wed Apr 22 19:14:24 2020 -0700
In-jvm dtest IInstance and ICoordinator should use QueryResult as the base API
Patch by David Capwell; reviewed by Alex Petrov for CASSANDRA-15756.
---
pom.xml | 23 +++
.../cassandra/distributed/api/ICoordinator.java | 28 ++-
.../cassandra/distributed/api/IInstance.java | 7 +-
.../cassandra/distributed/api/NodeToolResult.java | 3 +-
.../cassandra/distributed/api/QueryResult.java | 91 ++-------
.../cassandra/distributed/api/QueryResults.java | 204 +++++++++++++++++++++
.../org/apache/cassandra/distributed/api/Row.java | 116 ++++++++++--
.../{QueryResult.java => SimpleQueryResult.java} | 46 +++--
.../cassandra/distributed/shared/AssertUtils.java | 37 ++++
.../cassandra/distributed/shared/FutureUtils.java | 95 ++++++++++
.../cassandra/distributed/api/QueryResultTest.java | 198 ++++++++++++++++++++
11 files changed, 740 insertions(+), 108 deletions(-)
diff --git a/pom.xml b/pom.xml
index df733c6..62c90d8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,6 +50,25 @@
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <version>5.6.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <version>5.6.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>3.15.0</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
@@ -65,6 +84,10 @@
</excludes>
</configuration>
</plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.22.2</version>
+ </plugin>
</plugins>
</pluginManagement>
<plugins>
diff --git a/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java b/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java
index 34087d0..3a96b63 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java
@@ -22,6 +22,8 @@ import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.Future;
+import org.apache.cassandra.distributed.shared.FutureUtils;
+
// The cross-version API requires that a Coordinator can be constructed without any constructor arguments
public interface ICoordinator
{
@@ -30,13 +32,31 @@ public interface ICoordinator
return executeWithResult(query, consistencyLevel, boundValues).toObjectArrays();
}
- QueryResult executeWithResult(String query, ConsistencyLevel consistencyLevel, Object... boundValues);
+ SimpleQueryResult executeWithResult(String query, ConsistencyLevel consistencyLevel, Object... boundValues);
+
+ default Iterator<Object[]> executeWithPaging(String query, ConsistencyLevel consistencyLevel, int pageSize, Object... boundValues)
+ {
+ return executeWithPagingWithResult(query, consistencyLevel, pageSize, boundValues).map(Row::toObjectArray);
+ }
+
+ QueryResult executeWithPagingWithResult(String query, ConsistencyLevel consistencyLevel, int pageSize, Object... boundValues);
+
+ default Future<Object[][]> asyncExecuteWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues)
+ {
+ return FutureUtils.map(asyncExecuteWithTracingWithResult(sessionId, query, consistencyLevel, boundValues), r -> r.toObjectArrays());
+ }
- Iterator<Object[]> executeWithPaging(String query, ConsistencyLevel consistencyLevel, int pageSize, Object... boundValues);
+ Future<SimpleQueryResult> asyncExecuteWithTracingWithResult(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues);
- Future<Object[][]> asyncExecuteWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues);
+ default Object[][] executeWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues)
+ {
+ return executeWithTracingWithResult(sessionId, query, consistencyLevel, boundValues).toObjectArrays();
+ }
- Object[][] executeWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues);
+ default SimpleQueryResult executeWithTracingWithResult(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues)
+ {
+ return FutureUtils.waitOn(asyncExecuteWithTracingWithResult(sessionId, query, consistencyLevel, boundValues));
+ }
IInstance instance();
}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInstance.java b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java
index 90c8242..8895d94 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/IInstance.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java
@@ -31,7 +31,12 @@ public interface IInstance extends IIsolatedExecutor
void schemaChangeInternal(String query);
- public Object[][] executeInternal(String query, Object... args);
+ default Object[][] executeInternal(String query, Object... args)
+ {
+ return executeInternalWithResult(query, args).toObjectArrays();
+ }
+
+ SimpleQueryResult executeInternalWithResult(String query, Object... args);
IInstanceConfig config();
diff --git a/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java b/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java
index 2e5c5f0..bdd75b5 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java
@@ -147,7 +147,8 @@ public class NodeToolResult
}
}
- private static String getStackTraceAsString(Throwable throwable) {
+ private static String getStackTraceAsString(Throwable throwable)
+ {
StringWriter stringWriter = new StringWriter();
throwable.printStackTrace(new PrintWriter(stringWriter));
return stringWriter.toString();
diff --git a/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java b/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java
index e72d33e..9281794 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java
@@ -15,12 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.cassandra.distributed.api;
import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.Objects;
+import java.util.List;
+import java.util.function.Function;
import java.util.function.Predicate;
/**
@@ -52,88 +51,34 @@ import java.util.function.Predicate;
* }</code>
* <p>
* Both cases have the same issue; reference to a row from a previous call to {@link #hasNext()}. Since the same {@link Row}
- * object can be used accross different calls to {@link #hasNext()} this would mean any attempt to access after the fact
+ * object can be used across different calls to {@link #hasNext()} this would mean any attempt to access after the fact
* points to newer data. If this behavior is not desirable and access is needed between calls, then {@link Row#copy()}
* should be used; this will clone the {@link Row} and return a new object pointing to the same data.
*/
-public class QueryResult implements Iterator<Row>
+public interface QueryResult extends Iterator<Row>
{
- public static final QueryResult EMPTY = new QueryResult(new String[0], null);
-
- private final String[] names;
- private final Object[][] results;
- private final Predicate<Row> filter;
- private final Row row;
- private int offset = -1;
-
- public QueryResult(String[] names, Object[][] results)
- {
- this.names = Objects.requireNonNull(names, "names");
- this.results = results;
- this.row = new Row(names);
- this.filter = ignore -> true;
- }
-
- private QueryResult(String[] names, Object[][] results, Predicate<Row> filter, int offset)
- {
- this.names = names;
- this.results = results;
- this.filter = filter;
- this.offset = offset;
- this.row = new Row(names);
- }
-
- public String[] getNames()
- {
- return names;
- }
-
- public boolean isEmpty()
- {
- return results.length == 0;
- }
-
- public int size()
- {
- return results.length;
- }
-
- public QueryResult filter(Predicate<Row> fn)
- {
- return new QueryResult(names, results, filter.and(fn), offset);
- }
+ List<String> names();
- /**
- * Get all rows as a 2d array. Any calls to {@link #filter(Predicate)} will be ignored and the array returned will
- * be the full set from the query.
- */
- public Object[][] toObjectArrays()
+ default QueryResult filter(Predicate<Row> fn)
{
- return results;
+ return QueryResults.filter(this, fn);
}
- @Override
- public boolean hasNext()
+ default <A> Iterator<A> map(Function<? super Row, ? extends A> fn)
{
- if (results == null)
- return false;
- while ((offset += 1) < results.length)
+ return new Iterator<A>()
{
- row.setResults(results[offset]);
- if (filter.test(row))
+ @Override
+ public boolean hasNext()
{
- return true;
+ return QueryResult.this.hasNext();
}
- }
- row.setResults(null);
- return false;
- }
- @Override
- public Row next()
- {
- if (offset < 0 || offset >= results.length)
- throw new NoSuchElementException();
- return row;
+ @Override
+ public A next()
+ {
+ return fn.apply(QueryResult.this.next());
+ }
+ };
}
}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/QueryResults.java b/src/main/java/org/apache/cassandra/distributed/api/QueryResults.java
new file mode 100644
index 0000000..80202eb
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/QueryResults.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.api;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.function.Predicate;
+
+public final class QueryResults
+{
+ private static final SimpleQueryResult EMPTY = new SimpleQueryResult(new String[0], null);
+
+ private QueryResults() {}
+
+ public static SimpleQueryResult empty()
+ {
+ return EMPTY;
+ }
+
+ public static QueryResult fromIterator(String[] names, Iterator<Row> iterator)
+ {
+ Objects.requireNonNull(names, "names");
+ Objects.requireNonNull(iterator, "iterator");
+ return new IteratorQueryResult(names, iterator);
+ }
+
+ public static QueryResult fromObjectArrayIterator(String[] names, Iterator<Object[]> iterator)
+ {
+ Row row = new Row(names);
+ return fromIterator(names, new Iterator<Row>()
+ {
+ @Override
+ public boolean hasNext()
+ {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public Row next()
+ {
+ row.setResults(iterator.next());
+ return row;
+ }
+ });
+ }
+
+ public static QueryResult filter(QueryResult result, Predicate<Row> fn)
+ {
+ return new FilterQueryResult(result, fn);
+ }
+
+ public static Builder builder()
+ {
+ return new Builder();
+ }
+
+ public static final class Builder
+ {
+ private static final int UNSET = -1;
+
+ private int numColumns = UNSET;
+ private String[] names;
+ private List<Object[]> results = new ArrayList<>();
+
+ public Builder columns(String... columns)
+ {
+ if (columns != null)
+ {
+ if (numColumns == UNSET)
+ numColumns = columns.length;
+
+ if (numColumns != columns.length)
+ throw new AssertionError("Attempted to add column names with different column count; " +
+ "expected " + numColumns + " columns but given " + Arrays.toString(columns));
+ }
+
+ names = columns;
+ return this;
+ }
+
+ public Builder row(Object... values)
+ {
+ if (numColumns == UNSET)
+ numColumns = values.length;
+
+ if (numColumns != values.length)
+ throw new AssertionError("Attempted to add row with different column count; " +
+ "expected " + numColumns + " columns but given " + Arrays.toString(values));
+ results.add(values);
+ return this;
+ }
+
+ public SimpleQueryResult build()
+ {
+ if (names == null)
+ {
+ if (numColumns == UNSET)
+ return QueryResults.empty();
+ names = new String[numColumns];
+ for (int i = 0; i < numColumns; i++)
+ names[i] = "unknown";
+ }
+ return new SimpleQueryResult(names, results.stream().toArray(Object[][]::new));
+ }
+ }
+
+ private static final class IteratorQueryResult implements QueryResult
+ {
+ private final List<String> names;
+ private final Iterator<Row> iterator;
+
+ private IteratorQueryResult(String[] names, Iterator<Row> iterator)
+ {
+ this(Collections.unmodifiableList(Arrays.asList(names)), iterator);
+ }
+
+ private IteratorQueryResult(List<String> names, Iterator<Row> iterator)
+ {
+ this.names = names;
+ this.iterator = iterator;
+ }
+
+ @Override
+ public List<String> names()
+ {
+ return names;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public Row next()
+ {
+ return iterator.next();
+ }
+ }
+
+ private static final class FilterQueryResult implements QueryResult
+ {
+ private final QueryResult delegate;
+ private final Predicate<Row> filter;
+ private Row current;
+
+ private FilterQueryResult(QueryResult delegate, Predicate<Row> filter)
+ {
+ this.delegate = delegate;
+ this.filter = filter;
+ }
+
+ @Override
+ public List<String> names()
+ {
+ return delegate.names();
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ while (delegate.hasNext())
+ {
+ Row row = delegate.next();
+ if (filter.test(row))
+ {
+ current = row;
+ return true;
+ }
+ }
+ current = null;
+ return false;
+ }
+
+ @Override
+ public Row next()
+ {
+ if (current == null)
+ throw new NoSuchElementException();
+ return current;
+ }
+ }
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/Row.java b/src/main/java/org/apache/cassandra/distributed/api/Row.java
index 530edc1..ff4efbe 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/Row.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/Row.java
@@ -19,8 +19,10 @@
package org.apache.cassandra.distributed.api;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
@@ -30,27 +32,33 @@ import java.util.UUID;
/**
* Data representing a single row in a query result.
* <p>
- * This class is mutable from the parent {@link QueryResult} and can have the row it points to changed between calls
- * to {@link QueryResult#hasNext()}, for this reason it is unsafe to hold reference to this class after that call;
+ * This class is mutable from the parent {@link SimpleQueryResult} and can have the row it points to changed between calls
+ * to {@link SimpleQueryResult#hasNext()}, for this reason it is unsafe to hold a reference to this object after that call;
* to get around this, a call to {@link #copy()} will return a new object pointing to the same row.
*/
public class Row
{
+ private static final int NOT_FOUND = -1;
+
+ private final String[] names;
private final Map<String, Integer> nameIndex;
private Object[] results; // mutable to avoid allocations in loops
public Row(String[] names)
{
Objects.requireNonNull(names, "names");
+ this.names = names;
this.nameIndex = new HashMap<>(names.length);
for (int i = 0; i < names.length; i++)
{
- nameIndex.put(names[i], i);
+ // if duplicate names, always index by the first one seen
+ nameIndex.putIfAbsent(names[i], i);
}
}
- private Row(Map<String, Integer> nameIndex)
+ private Row(String[] names, Map<String, Integer> nameIndex)
{
+ this.names = names;
this.nameIndex = nameIndex;
}
@@ -60,49 +68,135 @@ public class Row
}
/**
- * Creates a copy of the current row; can be used past calls to {@link QueryResult#hasNext()}.
+ * Creates a copy of the current row; can be used past calls to {@link SimpleQueryResult#hasNext()}.
*/
public Row copy()
{
- Row copy = new Row(nameIndex);
+ Row copy = new Row(names, nameIndex);
copy.setResults(results);
return copy;
}
+ public <T> T get(int index)
+ {
+ checkAccess();
+ if (index < 0 || index >= results.length)
+ throw new NoSuchElementException("by index: " + index);
+ return (T) results[index];
+ }
+
public <T> T get(String name)
{
checkAccess();
int idx = findIndex(name);
- if (idx == -1)
- return null;
+ if (idx == NOT_FOUND)
+ throw new NoSuchElementException("by name: " + name);
return (T) results[idx];
}
+ public Short getShort(int index)
+ {
+ return get(index);
+ }
+
+ public Short getShort(String name)
+ {
+ return get(name);
+ }
+
+ public Integer getInteger(int index)
+ {
+ return get(index);
+ }
+
+ public Integer getInteger(String name)
+ {
+ return get(name);
+ }
+
+ public Long getLong(int index)
+ {
+ return get(index);
+ }
+
+ public Long getLong(String name)
+ {
+ return get(name);
+ }
+
+ public Float getFloat(int index)
+ {
+ return get(index);
+ }
+
+ public Float getFloat(String name)
+ {
+ return get(name);
+ }
+
+ public Double getDouble(int index)
+ {
+ return get(index);
+ }
+
+ public Double getDouble(String name)
+ {
+ return get(name);
+ }
+
+ public String getString(int index)
+ {
+ return get(index);
+ }
+
public String getString(String name)
{
return get(name);
}
+ public UUID getUUID(int index)
+ {
+ return get(index);
+ }
+
public UUID getUUID(String name)
{
return get(name);
}
+ public Date getTimestamp(int index)
+ {
+ return get(index);
+ }
+
public Date getTimestamp(String name)
{
return get(name);
}
+ public <T> Set<T> getSet(int index)
+ {
+ return get(index);
+ }
+
public <T> Set<T> getSet(String name)
{
return get(name);
}
+ /**
+ * Get the row as an array.
+ */
+ public Object[] toObjectArray()
+ {
+ return results;
+ }
+
public String toString()
{
return "Row{" +
- "names=" + nameIndex.keySet() +
- ", results=" + Arrays.toString(results) +
+ "names=" + Arrays.toString(names) +
+ ", results=" + (results == null ? "[]" : Arrays.toString(results)) +
'}';
}
@@ -114,6 +208,6 @@ public class Row
private int findIndex(String name)
{
- return nameIndex.getOrDefault(name, -1);
+ return nameIndex.getOrDefault(name, NOT_FOUND);
}
}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java b/src/main/java/org/apache/cassandra/distributed/api/SimpleQueryResult.java
similarity index 76%
copy from src/main/java/org/apache/cassandra/distributed/api/QueryResult.java
copy to src/main/java/org/apache/cassandra/distributed/api/SimpleQueryResult.java
index e72d33e..a44411d 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/SimpleQueryResult.java
@@ -15,13 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.cassandra.distributed.api;
-import java.util.Iterator;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* A table of data representing a complete query result.
@@ -56,17 +59,15 @@ import java.util.function.Predicate;
* points to newer data. If this behavior is not desirable and access is needed between calls, then {@link Row#copy()}
* should be used; this will clone the {@link Row} and return a new object pointing to the same data.
*/
-public class QueryResult implements Iterator<Row>
+public class SimpleQueryResult implements QueryResult
{
- public static final QueryResult EMPTY = new QueryResult(new String[0], null);
-
private final String[] names;
private final Object[][] results;
private final Predicate<Row> filter;
private final Row row;
private int offset = -1;
- public QueryResult(String[] names, Object[][] results)
+ public SimpleQueryResult(String[] names, Object[][] results)
{
this.names = Objects.requireNonNull(names, "names");
this.results = results;
@@ -74,7 +75,7 @@ public class QueryResult implements Iterator<Row>
this.filter = ignore -> true;
}
- private QueryResult(String[] names, Object[][] results, Predicate<Row> filter, int offset)
+ private SimpleQueryResult(String[] names, Object[][] results, Predicate<Row> filter, int offset)
{
this.names = names;
this.results = results;
@@ -83,24 +84,23 @@ public class QueryResult implements Iterator<Row>
this.row = new Row(names);
}
- public String[] getNames()
- {
- return names;
- }
-
- public boolean isEmpty()
+ public List<String> names()
{
- return results.length == 0;
+ return Collections.unmodifiableList(Arrays.asList(names));
}
- public int size()
+ public SimpleQueryResult filter(Predicate<Row> fn)
{
- return results.length;
+ return new SimpleQueryResult(names, results, filter.and(fn), offset);
}
- public QueryResult filter(Predicate<Row> fn)
+ /**
+ * Reset the cursor to the start of the query result; if the query result has not been iterated, this has no effect.
+ */
+ public void reset()
{
- return new QueryResult(names, results, filter.and(fn), offset);
+ offset = -1;
+ row.setResults(null);
}
/**
@@ -132,8 +132,18 @@ public class QueryResult implements Iterator<Row>
@Override
public Row next()
{
+ // no null check needed for results since offset only increments IFF results is not null
if (offset < 0 || offset >= results.length)
throw new NoSuchElementException();
return row;
}
+
+ @Override
+ public String toString() {
+ if (results == null)
+ return "[]";
+ return Stream.of(results)
+ .map(Arrays::toString)
+ .collect(Collectors.joining(",", "[", "]"));
+ }
}
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java b/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java
index 8e6254a..a388af3 100644
--- a/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java
+++ b/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java
@@ -18,6 +18,10 @@
package org.apache.cassandra.distributed.shared;
+import org.apache.cassandra.distributed.api.QueryResult;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.api.Row;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
@@ -26,6 +30,34 @@ import java.util.List;
public class AssertUtils
{
+ public static void assertRows(QueryResult actual, QueryResult expected)
+ {
+ if (actual instanceof SimpleQueryResult && expected instanceof SimpleQueryResult)
+ {
+ assertRows((SimpleQueryResult) actual, (SimpleQueryResult) expected);
+ }
+ else
+ {
+ assertRows(actual.map(Row::toObjectArray), expected.map(Row::toObjectArray));
+ }
+ }
+
+ public static void assertRows(SimpleQueryResult actual, SimpleQueryResult expected)
+ {
+ while (actual.hasNext()) {
+ if (!expected.hasNext())
+ throw new AssertionError(rowsNotEqualErrorMessage(actual, expected));
+
+ Row next = actual.next();
+ Row exectedRow = expected.next();
+
+ assertTrue(rowsNotEqualErrorMessage(actual, expected),
+ Arrays.equals(next.toObjectArray(), exectedRow.toObjectArray()));
+ }
+ if (expected.hasNext())
+ throw new AssertionError(rowsNotEqualErrorMessage(actual, expected));
+ }
+
public static void assertRows(Object[][] actual, Object[]... expected)
{
assertEquals(rowsNotEqualErrorMessage(actual, expected),
@@ -82,6 +114,11 @@ public class AssertUtils
Arrays.toString(actual));
}
+ public static String rowsNotEqualErrorMessage(SimpleQueryResult actual, SimpleQueryResult expected)
+ {
+ return String.format("Expected: %s\nActual: %s\n", expected, actual);
+ }
+
public static String rowsNotEqualErrorMessage(Object[][] actual, Object[][] expected)
{
return String.format("Expected: %s\nActual: %s\n",
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/FutureUtils.java b/src/main/java/org/apache/cassandra/distributed/shared/FutureUtils.java
new file mode 100644
index 0000000..9f97e8d
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/FutureUtils.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.shared;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+
+public final class FutureUtils {
+ private FutureUtils() { }
+
+ public static <T> T waitOn(Future<T> f)
+ {
+ try
+ {
+ return f.get();
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ catch (ExecutionException e)
+ {
+ Throwable cause = e.getCause();
+ if (cause instanceof Error) throw (Error) cause;
+ if (cause instanceof RuntimeException) throw (RuntimeException) cause;
+ throw new RuntimeException(cause);
+ }
+ }
+
+ public static <A, B> Future<B> map(Future<A> future, Function<? super A, ? extends B> fn) {
+ if (future == null) throw new NullPointerException("Future is null");
+ if (fn == null) throw new NullPointerException("Function is null");
+
+ if (future instanceof CompletableFuture) {
+ return ((CompletableFuture<A>) future).thenApply(fn);
+ }
+ return new MapFuture<>(future, fn);
+ }
+
+ private static final class MapFuture<A, B> implements Future<B>
+ {
+ private final Future<A> parent;
+ private final Function<? super A, ? extends B> fn;
+
+ private MapFuture(Future<A> parent, Function<? super A, ? extends B> fn) {
+ this.parent = parent;
+ this.fn = fn;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return parent.cancel(mayInterruptIfRunning);
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return parent.isCancelled();
+ }
+
+ @Override
+ public boolean isDone() {
+ return parent.isDone();
+ }
+
+ @Override
+ public B get() throws InterruptedException, ExecutionException {
+ return fn.apply(parent.get());
+ }
+
+ @Override
+ public B get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return fn.apply(parent.get(timeout, unit));
+ }
+ }
+}
diff --git a/src/test/java/org/apache/cassandra/distributed/api/QueryResultTest.java b/src/test/java/org/apache/cassandra/distributed/api/QueryResultTest.java
new file mode 100644
index 0000000..6bef78b
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/distributed/api/QueryResultTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import org.apache.cassandra.distributed.shared.AssertUtils;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+public class QueryResultTest
+{
+ @Test
+ public void empty()
+ {
+ QueryResult result = QueryResults.empty();
+
+ assertThat(result.names()).isEmpty();
+ assertThat(result.toString()).isEqualTo("[]");
+
+ assertThat(result.hasNext()).isFalse();
+ assertThatThrownBy(result::next).isInstanceOf(NoSuchElementException.class);
+
+ QueryResult filtered = result.filter(ignore -> true);
+ assertThat(filtered.hasNext()).isFalse();
+ assertThatThrownBy(filtered::next).isInstanceOf(NoSuchElementException.class);
+
+ Iterator<Object> it = result.map(r -> r.get("undefined"));
+ assertThat(it.hasNext()).isFalse();
+ assertThatThrownBy(it::next).isInstanceOf(NoSuchElementException.class);
+ }
+
+ @Test
+ public void equals()
+ {
+ String[] names = { "fname", "lname"};
+ Object[][] rows = {
+ new Object[] { "david", "capwell"},
+ new Object[] { "alex", "petrov"},
+ new Object[] { "dinesh", "joshi"},
+ };
+ SimpleQueryResult result = new SimpleQueryResult(names, rows);
+ SimpleQueryResult fromBuilder = QueryResults.builder()
+ .columns(names)
+ .row(rows[0])
+ .row(rows[1])
+ .row(rows[2])
+ .build();
+
+ AssertUtils.assertRows(result, fromBuilder);
+ }
+
+ @Test
+ public void notEqualLength()
+ {
+ String[] names = { "fname", "lname"};
+ Object[][] rows = {
+ new Object[] { "david", "capwell"},
+ new Object[] { "alex", "petrov"},
+ new Object[] { "dinesh", "joshi"},
+ };
+ SimpleQueryResult result = new SimpleQueryResult(names, rows);
+ SimpleQueryResult fromBuilder = QueryResults.builder()
+ .columns(names)
+ .row(rows[0])
+ .row(rows[1])
+ .row(rows[2])
+ .row("chris", "lohfink")
+ .build();
+
+ assertThatThrownBy(() -> AssertUtils.assertRows(result, fromBuilder))
+ .isInstanceOf(AssertionError.class)
+ .hasMessageContaining("Expected: ")
+ .hasMessageContaining("Actual: ");
+ }
+
+ @Test
+ public void notEqualColumnLength()
+ {
+ String[] names = { "fname", "lname"};
+ Object[][] rows = {
+ new Object[] { "david", "capwell"},
+ new Object[] { "alex", "petrov"},
+ new Object[] { "dinesh", "joshi"},
+ };
+ SimpleQueryResult result = new SimpleQueryResult(names, rows);
+ SimpleQueryResult fromBuilder = QueryResults.builder()
+ .columns("fname")
+ .row("david")
+ .row("alex")
+ .row("dinesh")
+ .build();
+
+ assertThatThrownBy(() -> AssertUtils.assertRows(result, fromBuilder))
+ .isInstanceOf(AssertionError.class)
+ .hasMessageContaining("Expected: ")
+ .hasMessageContaining("Actual: ");
+ }
+
+ @Test
+ public void notEqualContent()
+ {
+ String[] names = { "fname", "lname"};
+ Object[][] rows = {
+ new Object[] { "david", "capwell"},
+ new Object[] { "alex", "petrov"},
+ new Object[] { "dinesh", "joshi"},
+ };
+ SimpleQueryResult result = new SimpleQueryResult(names, rows);
+ SimpleQueryResult fromBuilder = QueryResults.builder()
+ .columns(names)
+ .row("david", "Capwell")
+ .row("alex", "Petrov")
+ .row("dinesh", "Joshi")
+ .build();
+
+ assertThatThrownBy(() -> AssertUtils.assertRows(result, fromBuilder))
+ .isInstanceOf(AssertionError.class)
+ .hasMessageContaining("Expected: ")
+ .hasMessageContaining("Actual: ");
+ }
+
+ @Test
+ public void completeFilter()
+ {
+ SimpleQueryResult qr = QueryResults.builder()
+ .row(1, 2, 3, 4)
+ .row(5, 6, 7, 7)
+ .row(1, 2, 4, 8)
+ .row(2, 4, 6, 12)
+ .build();
+
+ SimpleQueryResult filtered = qr.filter(row -> row.getInteger(0).intValue() != 1);
+
+ AssertUtils.assertRows(filtered, QueryResults.builder()
+ .row(5, 6, 7, 7)
+ .row(2, 4, 6, 12)
+ .build());
+ }
+
+ @Test
+ public void completeMap()
+ {
+ SimpleQueryResult qr = QueryResults.builder()
+ .row(1, 2, 3, 4)
+ .row(5, 6, 7, 7)
+ .row(1, 2, 4, 8)
+ .row(2, 4, 6, 12)
+ .build();
+
+ Iterator<Integer> it = qr.map(r -> r.getInteger(0));
+ List<Integer> result = new ArrayList<>(4);
+ it.forEachRemaining(result::add);
+
+ assertThat(result).isEqualTo(Arrays.asList(1, 5, 1, 2));
+ }
+
+ @Test
+ public void iteratorFilter()
+ {
+ String[] names = {"first"};
+ List<Object[]> values = new ArrayList<>();
+ values.add(new Object[] { "david" });
+ values.add(new Object[] { "alex" });
+ values.add(new Object[] { "dinesh" });
+
+ QueryResult qr = QueryResults.fromObjectArrayIterator(names, values.iterator())
+ .filter(r -> !"david".equals(r.getString("first")))
+ .filter(r -> !"alex".equals(r.getString("first")));
+
+ AssertUtils.assertRows(qr, QueryResults.builder()
+ .columns(names)
+ .row("dinesh")
+ .build());
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org