You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2022/09/30 06:22:00 UTC
[ignite-3] branch main updated: IGNITE-17118 Replace CompletionStage with CompletableFuture in public APIs (#1141)
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9d4b8a7e18 IGNITE-17118 Replace CompletionStage with CompletableFuture in public APIs (#1141)
9d4b8a7e18 is described below
commit 9d4b8a7e1890e453234533d425b2bfc307d5710a
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Fri Sep 30 09:21:53 2022 +0300
IGNITE-17118 Replace CompletionStage with CompletableFuture in public APIs (#1141)
Make async public APIs consistent.
---
.../src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java | 6 +++---
.../apache/ignite/client/handler/requests/jdbc/JdbcQueryCursor.java | 3 +--
.../org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java | 5 ++---
.../java/org/apache/ignite/client/fakes/FakeAsyncResultSet.java | 5 ++---
.../src/test/java/org/apache/ignite/client/fakes/FakeCursor.java | 3 +--
.../java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java | 5 ++---
.../java/org/apache/ignite/internal/sql/engine/AsyncCursor.java | 3 +--
.../org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java | 3 +--
.../org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java | 3 +--
.../org/apache/ignite/internal/sql/engine/exec/AsyncWrapper.java | 3 +--
.../ignite/internal/sql/engine/exec/ExecutionServiceImpl.java | 5 ++---
.../apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java | 3 +--
.../org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java | 5 +++--
13 files changed, 21 insertions(+), 31 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java b/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java
index 9ff9e262c1..4fc166f6a4 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java
@@ -17,7 +17,7 @@
package org.apache.ignite.sql.async;
-import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.sql.NoRowSetExpectedException;
import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.ResultSetMetadata;
@@ -118,7 +118,7 @@ public interface AsyncResultSet {
* @return Operation future.
* @throws NoRowSetExpectedException if no row set is expected as a query result.
*/
- CompletionStage<? extends AsyncResultSet> fetchNextPage();
+ CompletableFuture<? extends AsyncResultSet> fetchNextPage();
/**
* Returns whether there are more pages of results.
@@ -132,5 +132,5 @@ public interface AsyncResultSet {
*
* @return Operation future.
*/
- CompletionStage<Void> closeAsync();
+ CompletableFuture<Void> closeAsync();
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursor.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursor.java
index e0986137c3..9a379ec965 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursor.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursor.java
@@ -19,7 +19,6 @@ package org.apache.ignite.client.handler.requests.jdbc;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
@@ -54,7 +53,7 @@ public class JdbcQueryCursor<T> implements AsyncSqlCursor<T> {
/** {@inheritDoc} */
@Override
- public CompletionStage<BatchedResult<T>> requestNextAsync(int rows) {
+ public CompletableFuture<BatchedResult<T>> requestNextAsync(int rows) {
long fetched0 = fetched.addAndGet(rows);
return cur.requestNextAsync(rows).thenApply(batch -> {
if (maxRows == 0 || fetched0 < maxRows) {
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java
index e976b33490..47de94944c 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
import org.apache.ignite.internal.client.ClientChannel;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.client.proto.ClientOp;
@@ -133,7 +132,7 @@ class ClientAsyncResultSet implements AsyncResultSet {
/** {@inheritDoc} */
@Override
- public CompletionStage<? extends AsyncResultSet> fetchNextPage() {
+ public CompletableFuture<? extends AsyncResultSet> fetchNextPage() {
requireResultSet();
if (closed) {
@@ -169,7 +168,7 @@ class ClientAsyncResultSet implements AsyncResultSet {
/** {@inheritDoc} */
@Override
- public CompletionStage<Void> closeAsync() {
+ public CompletableFuture<Void> closeAsync() {
if (resourceId == null || closed) {
return CompletableFuture.completedFuture(null);
}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeAsyncResultSet.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeAsyncResultSet.java
index 9140d33218..3762fa8f0a 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeAsyncResultSet.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeAsyncResultSet.java
@@ -30,7 +30,6 @@ import java.util.BitSet;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.client.sql.ClientSqlRow;
import org.apache.ignite.sql.ColumnMetadata;
@@ -193,7 +192,7 @@ public class FakeAsyncResultSet implements AsyncResultSet {
/** {@inheritDoc} */
@Override
- public CompletionStage<? extends AsyncResultSet> fetchNextPage() {
+ public CompletableFuture<? extends AsyncResultSet> fetchNextPage() {
return null;
}
@@ -205,7 +204,7 @@ public class FakeAsyncResultSet implements AsyncResultSet {
/** {@inheritDoc} */
@Override
- public CompletionStage<Void> closeAsync() {
+ public CompletableFuture<Void> closeAsync() {
return CompletableFuture.completedFuture(null);
}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
index b689efc636..8aaf11306a 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
@@ -22,7 +22,6 @@ import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
import org.apache.ignite.internal.sql.engine.SqlCursor;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
@@ -44,7 +43,7 @@ public class FakeCursor implements AsyncSqlCursor<List<Object>> {
}
@Override
- public CompletionStage<BatchedResult<List<Object>>> requestNextAsync(int rows) {
+ public CompletableFuture<BatchedResult<List<Object>>> requestNextAsync(int rows) {
var batch = new ArrayList<List<Object>>();
for (int i = 0; i < rows; i++) {
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
index 3ed48a60ce..39b64a5694 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
@@ -28,7 +28,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
@@ -133,7 +132,7 @@ public class AsyncResultSetImpl implements AsyncResultSet {
/** {@inheritDoc} */
@Override
- public CompletionStage<? extends AsyncResultSet> fetchNextPage() {
+ public CompletableFuture<? extends AsyncResultSet> fetchNextPage() {
requireResultSet();
if (!hasMorePages()) {
@@ -156,7 +155,7 @@ public class AsyncResultSetImpl implements AsyncResultSet {
/** {@inheritDoc} */
@Override
- public CompletionStage<Void> closeAsync() {
+ public CompletableFuture<Void> closeAsync() {
return cur.closeAsync().thenRun(closeRun);
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncCursor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncCursor.java
index ec63b1f066..bb1b373f9d 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncCursor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncCursor.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.sql.engine;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
/**
* Asynchronous cursor.
@@ -38,7 +37,7 @@ public interface AsyncCursor<T> {
* @param rows Desired amount of rows.
* @return A completion stage that will be completed with batch of size {@code rows} or less if there is no more data.
*/
- CompletionStage<BatchedResult<T>> requestNextAsync(int rows);
+ CompletableFuture<BatchedResult<T>> requestNextAsync(int rows);
/**
* Releases resources acquired by the cursor.
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
index 1f19bba897..4ce6b57681 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.sql.engine;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.sql.ResultSetMetadata;
@@ -68,7 +67,7 @@ public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> {
/** {@inheritDoc} */
@Override
- public CompletionStage<BatchedResult<T>> requestNextAsync(int rows) {
+ public CompletableFuture<BatchedResult<T>> requestNextAsync(int rows) {
return dataCursor.requestNextAsync(rows).handle((batch, t) -> {
if (t != null) {
if (implicitTx != null) {
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 534402ea3f..7009e39cb2 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -31,7 +31,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -416,7 +415,7 @@ public class SqlQueryProcessor implements QueryProcessor {
implicitTx,
new AsyncCursor<List<Object>>() {
@Override
- public CompletionStage<BatchedResult<List<Object>>> requestNextAsync(int rows) {
+ public CompletableFuture<BatchedResult<List<Object>>> requestNextAsync(int rows) {
session.touch();
return dataCursor.requestNextAsync(rows);
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/AsyncWrapper.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/AsyncWrapper.java
index c7fb32dfd5..429448a256 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/AsyncWrapper.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/AsyncWrapper.java
@@ -22,7 +22,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.Function;
import org.apache.ignite.internal.sql.engine.AsyncCursor;
@@ -76,7 +75,7 @@ public class AsyncWrapper<T> implements AsyncCursor<T> {
/** {@inheritDoc} */
@Override
- public CompletionStage<BatchedResult<T>> requestNextAsync(int rows) {
+ public CompletableFuture<BatchedResult<T>> requestNextAsync(int rows) {
CompletableFuture<BatchedResult<T>> next = new CompletableFuture<>();
CompletableFuture<BatchedResult<T>> prev;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index c2afd25a9d..ab71eab935 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -34,7 +34,6 @@ import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
-import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -243,7 +242,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
}
/** Cancels the query with given id. */
- public CompletionStage<?> cancel(UUID qryId) {
+ public CompletableFuture<?> cancel(UUID qryId) {
var mgr = queryManagerMap.get(qryId);
if (mgr == null) {
@@ -594,7 +593,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
return new AsyncCursor<>() {
@Override
- public CompletionStage<BatchedResult<List<Object>>> requestNextAsync(int rows) {
+ public CompletableFuture<BatchedResult<List<Object>>> requestNextAsync(int rows) {
return root.thenCompose(cur -> {
var fut = cur.requestNextAsync(rows);
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
index cdcf626e48..e159e4a79a 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -151,7 +150,7 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async
/** {@inheritDoc} */
@Override
- public CompletionStage<BatchedResult<OutRowT>> requestNextAsync(int rows) {
+ public CompletableFuture<BatchedResult<OutRowT>> requestNextAsync(int rows) {
CompletableFuture<BatchedResult<OutRowT>> next = new CompletableFuture<>();
Throwable t = ex.get();
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
index e41f213866..41bac8c4ce 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
@@ -554,8 +554,9 @@ public class IgniteSqlApiTest {
AsyncResultSet page1 = Mockito.mock(AsyncResultSet.class);
Mockito.when(page1.currentPage()).thenReturn(rows.subList(0, 2));
Mockito.when(page1.hasMorePages()).thenReturn(true);
- Mockito.when(page1.fetchNextPage())
- .thenReturn((CompletionStage) CompletableFuture.completedFuture(page2));
+
+ //noinspection unchecked,rawtypes
+ Mockito.when(page1.fetchNextPage()).thenReturn((CompletableFuture) CompletableFuture.completedFuture(page2));
return CompletableFuture.completedFuture(page1);
});