You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/01/13 09:43:54 UTC
[ignite-3] branch main updated: IGNITE-16281 Fix deadlock
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6fa4799 IGNITE-16281 Fix deadlock
6fa4799 is described below
commit 6fa4799448226427c25b4d0ec1e0ba4f54423f01
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Wed Jan 12 15:39:31 2022 +0300
IGNITE-16281 Fix deadlock
---
.../client/proto/query/JdbcQueryEventHandler.java | 37 ++++-----
.../client/handler/JdbcQueryEventHandlerImpl.java | 94 ++++++++++------------
.../requests/sql/ClientSqlCloseRequest.java | 9 +--
.../sql/ClientSqlColumnMetadataRequest.java | 9 +--
.../requests/sql/ClientSqlExecuteBatchRequest.java | 9 +--
.../requests/sql/ClientSqlExecuteRequest.java | 9 +--
.../requests/sql/ClientSqlFetchRequest.java | 9 +--
.../sql/ClientSqlPrimaryKeyMetadataRequest.java | 9 +--
.../sql/ClientSqlQueryMetadataRequest.java | 9 +--
.../sql/ClientSqlSchemasMetadataRequest.java | 9 +--
.../sql/ClientSqlTableMetadataRequest.java | 9 +--
.../handler/requests/sql/JdbcMetadataCatalog.java | 88 +++++++++-----------
.../ignite/internal/client/TcpIgniteClient.java | 9 ++-
.../client/query/JdbcClientQueryEventHandler.java | 55 +++++--------
.../ignite/internal/jdbc/JdbcDatabaseMetadata.java | 9 ++-
.../apache/ignite/internal/jdbc/JdbcResultSet.java | 6 +-
.../apache/ignite/internal/jdbc/JdbcStatement.java | 4 +-
.../internal/runner/app/ItNoThreadsLeftTest.java | 1 +
.../internal/table/distributed/TableManager.java | 44 +++++-----
19 files changed, 174 insertions(+), 254 deletions(-)
diff --git a/modules/client-common/src/main/java/org/apache/ignite/client/proto/query/JdbcQueryEventHandler.java b/modules/client-common/src/main/java/org/apache/ignite/client/proto/query/JdbcQueryEventHandler.java
index c8f5540..70738d4 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/client/proto/query/JdbcQueryEventHandler.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/client/proto/query/JdbcQueryEventHandler.java
@@ -17,6 +17,7 @@
package org.apache.ignite.client.proto.query;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.query.event.BatchExecuteRequest;
import org.apache.ignite.client.proto.query.event.BatchExecuteResult;
import org.apache.ignite.client.proto.query.event.JdbcMetaColumnsRequest;
@@ -43,71 +44,71 @@ public interface JdbcQueryEventHandler {
* {@link QueryExecuteRequest} command handler.
*
* @param req Execute query request.
- * @return Result.
+ * @return Result future.
*/
- QueryExecuteResult query(QueryExecuteRequest req);
+ CompletableFuture<QueryExecuteResult> queryAsync(QueryExecuteRequest req);
/**
* {@link QueryFetchRequest} command handler.
*
* @param req Fetch query request.
- * @return Result.
+ * @return Result future.
*/
- QueryFetchResult fetch(QueryFetchRequest req);
+ CompletableFuture<QueryFetchResult> fetchAsync(QueryFetchRequest req);
/**
* {@link BatchExecuteRequest} command handler.
*
* @param req Batch query request.
- * @return Result.
+ * @return Result future.
*/
- BatchExecuteResult batch(BatchExecuteRequest req);
+ CompletableFuture<BatchExecuteResult> batchAsync(BatchExecuteRequest req);
/**
* {@link QueryCloseRequest} command handler.
*
* @param req Close query request.
- * @return Result.
+ * @return Result future.
*/
- QueryCloseResult close(QueryCloseRequest req);
+ CompletableFuture<QueryCloseResult> closeAsync(QueryCloseRequest req);
/**
* {@link JdbcMetaTablesRequest} command handler.
*
* @param req Jdbc tables metadata request.
- * @return Result.
+ * @return Result future.
*/
- JdbcMetaTablesResult tablesMeta(JdbcMetaTablesRequest req);
+ CompletableFuture<JdbcMetaTablesResult> tablesMetaAsync(JdbcMetaTablesRequest req);
/**
* {@link JdbcMetaColumnsRequest} command handler.
*
* @param req Jdbc columns metadata request.
- * @return Result.
+ * @return Result future.
*/
- JdbcMetaColumnsResult columnsMeta(JdbcMetaColumnsRequest req);
+ CompletableFuture<JdbcMetaColumnsResult> columnsMetaAsync(JdbcMetaColumnsRequest req);
/**
* {@link JdbcMetaSchemasRequest} command handler.
*
* @param req Jdbc schemas metadata request.
- * @return Result.
+ * @return Result future.
*/
- JdbcMetaSchemasResult schemasMeta(JdbcMetaSchemasRequest req);
+ CompletableFuture<JdbcMetaSchemasResult> schemasMetaAsync(JdbcMetaSchemasRequest req);
/**
* {@link JdbcMetaPrimaryKeysRequest} command handler.
*
* @param req Jdbc primary keys metadata request.
- * @return Result.
+ * @return Result future.
*/
- JdbcMetaPrimaryKeysResult primaryKeysMeta(JdbcMetaPrimaryKeysRequest req);
+ CompletableFuture<JdbcMetaPrimaryKeysResult> primaryKeysMetaAsync(JdbcMetaPrimaryKeysRequest req);
/**
* {@link JdbcQueryMetadataRequest} command handler.
*
* @param req Jdbc query metadata request.
- * @return Result.
+ * @return Result future.
*/
- JdbcMetaColumnsResult queryMetadata(JdbcQueryMetadataRequest req);
+ CompletableFuture<JdbcMetaColumnsResult> queryMetadataAsync(JdbcQueryMetadataRequest req);
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
index 195b68a..f8681b9 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
@@ -22,8 +22,8 @@ import static org.apache.ignite.client.proto.query.IgniteQueryErrorCode.UNSUPPOR
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -40,9 +40,7 @@ import org.apache.ignite.client.proto.query.event.JdbcMetaSchemasRequest;
import org.apache.ignite.client.proto.query.event.JdbcMetaSchemasResult;
import org.apache.ignite.client.proto.query.event.JdbcMetaTablesRequest;
import org.apache.ignite.client.proto.query.event.JdbcMetaTablesResult;
-import org.apache.ignite.client.proto.query.event.JdbcPrimaryKeyMeta;
import org.apache.ignite.client.proto.query.event.JdbcQueryMetadataRequest;
-import org.apache.ignite.client.proto.query.event.JdbcTableMeta;
import org.apache.ignite.client.proto.query.event.QueryCloseRequest;
import org.apache.ignite.client.proto.query.event.QueryCloseResult;
import org.apache.ignite.client.proto.query.event.QueryExecuteRequest;
@@ -87,10 +85,10 @@ public class JdbcQueryEventHandlerImpl implements JdbcQueryEventHandler {
/** {@inheritDoc} */
@Override
- public QueryExecuteResult query(QueryExecuteRequest req) {
+ public CompletableFuture<QueryExecuteResult> queryAsync(QueryExecuteRequest req) {
if (req.pageSize() <= 0) {
- return new QueryExecuteResult(Response.STATUS_FAILED,
- "Invalid fetch size : [fetchSize=" + req.pageSize() + ']');
+ return CompletableFuture.completedFuture(new QueryExecuteResult(Response.STATUS_FAILED,
+ "Invalid fetch size : [fetchSize=" + req.pageSize() + ']'));
}
List<SqlCursor<List<?>>> cursors;
@@ -99,13 +97,13 @@ public class JdbcQueryEventHandlerImpl implements JdbcQueryEventHandler {
} catch (Exception e) {
StringWriter sw = getWriterWithStackTrace(e);
- return new QueryExecuteResult(Response.STATUS_FAILED,
- "Exception while executing query " + req.sqlQuery() + ". Error message: " + sw);
+ return CompletableFuture.completedFuture(new QueryExecuteResult(Response.STATUS_FAILED,
+ "Exception while executing query " + req.sqlQuery() + ". Error message: " + sw));
}
if (cursors.isEmpty()) {
- return new QueryExecuteResult(Response.STATUS_FAILED,
- "At least one cursor is expected for query " + req.sqlQuery());
+ return CompletableFuture.completedFuture(new QueryExecuteResult(Response.STATUS_FAILED,
+ "At least one cursor is expected for query " + req.sqlQuery()));
}
List<QuerySingleResult> results = new ArrayList<>();
@@ -118,26 +116,26 @@ public class JdbcQueryEventHandlerImpl implements JdbcQueryEventHandler {
} catch (Exception ex) {
StringWriter sw = getWriterWithStackTrace(ex);
- return new QueryExecuteResult(Response.STATUS_FAILED,
- "Failed to fetch results for query " + req.sqlQuery() + ". Error message: " + sw);
+ return CompletableFuture.completedFuture(new QueryExecuteResult(Response.STATUS_FAILED,
+ "Failed to fetch results for query " + req.sqlQuery() + ". Error message: " + sw));
}
- return new QueryExecuteResult(results);
+ return CompletableFuture.completedFuture(new QueryExecuteResult(results));
}
/** {@inheritDoc} */
@Override
- public QueryFetchResult fetch(QueryFetchRequest req) {
+ public CompletableFuture<QueryFetchResult> fetchAsync(QueryFetchRequest req) {
Cursor<List<?>> cur = openCursors.get(req.cursorId());
if (cur == null) {
- return new QueryFetchResult(Response.STATUS_FAILED,
- "Failed to find query cursor with ID: " + req.cursorId());
+ return CompletableFuture.completedFuture(new QueryFetchResult(Response.STATUS_FAILED,
+ "Failed to find query cursor with ID: " + req.cursorId()));
}
if (req.pageSize() <= 0) {
- return new QueryFetchResult(Response.STATUS_FAILED,
- "Invalid fetch size : [fetchSize=" + req.pageSize() + ']');
+ return CompletableFuture.completedFuture(new QueryFetchResult(Response.STATUS_FAILED,
+ "Invalid fetch size : [fetchSize=" + req.pageSize() + ']'));
}
List<List<Object>> fetch;
@@ -149,28 +147,28 @@ public class JdbcQueryEventHandlerImpl implements JdbcQueryEventHandler {
} catch (Exception ex) {
StringWriter sw = getWriterWithStackTrace(ex);
- return new QueryFetchResult(Response.STATUS_FAILED,
- "Failed to fetch results for cursor id " + req.cursorId() + ". Error message: " + sw);
+ return CompletableFuture.completedFuture(new QueryFetchResult(Response.STATUS_FAILED,
+ "Failed to fetch results for cursor id " + req.cursorId() + ". Error message: " + sw));
}
- return new QueryFetchResult(fetch, hasNext);
+ return CompletableFuture.completedFuture(new QueryFetchResult(fetch, hasNext));
}
/** {@inheritDoc} */
@Override
- public BatchExecuteResult batch(BatchExecuteRequest req) {
- return new BatchExecuteResult(UNSUPPORTED_OPERATION,
- "ExecuteBatch operation is not implemented yet.");
+ public CompletableFuture<BatchExecuteResult> batchAsync(BatchExecuteRequest req) {
+ return CompletableFuture.completedFuture(new BatchExecuteResult(UNSUPPORTED_OPERATION,
+ "ExecuteBatch operation is not implemented yet."));
}
/** {@inheritDoc} */
@Override
- public QueryCloseResult close(QueryCloseRequest req) {
+ public CompletableFuture<QueryCloseResult> closeAsync(QueryCloseRequest req) {
Cursor<List<?>> cur = openCursors.remove(req.cursorId());
if (cur == null) {
- return new QueryCloseResult(Response.STATUS_FAILED,
- "Failed to find query cursor with ID: " + req.cursorId());
+ return CompletableFuture.completedFuture(new QueryCloseResult(Response.STATUS_FAILED,
+ "Failed to find query cursor with ID: " + req.cursorId()));
}
try {
@@ -178,35 +176,35 @@ public class JdbcQueryEventHandlerImpl implements JdbcQueryEventHandler {
} catch (Exception ex) {
StringWriter sw = getWriterWithStackTrace(ex);
- return new QueryCloseResult(Response.STATUS_FAILED,
- "Failed to close SQL query [curId=" + req.cursorId() + "]. Error message: " + sw);
+ return CompletableFuture.completedFuture(new QueryCloseResult(Response.STATUS_FAILED,
+ "Failed to close SQL query [curId=" + req.cursorId() + "]. Error message: " + sw));
}
- return new QueryCloseResult();
+ return CompletableFuture.completedFuture(new QueryCloseResult());
}
/** {@inheritDoc} */
@Override
- public JdbcMetaColumnsResult queryMetadata(JdbcQueryMetadataRequest req) {
+ public CompletableFuture<JdbcMetaColumnsResult> queryMetadataAsync(JdbcQueryMetadataRequest req) {
SqlCursor<List<?>> cur = openCursors.get(req.cursorId());
if (cur == null) {
- return new JdbcMetaColumnsResult(Response.STATUS_FAILED,
- "Failed to find query cursor with ID: " + req.cursorId());
+ return CompletableFuture.completedFuture(new JdbcMetaColumnsResult(Response.STATUS_FAILED,
+ "Failed to find query cursor with ID: " + req.cursorId()));
}
ResultSetMetadata metadata = cur.metadata();
if (metadata == null) {
- return new JdbcMetaColumnsResult(Response.STATUS_FAILED,
- "Failed to get query metadata for cursor with ID : " + req.cursorId());
+ return CompletableFuture.completedFuture(new JdbcMetaColumnsResult(Response.STATUS_FAILED,
+ "Failed to get query metadata for cursor with ID : " + req.cursorId()));
}
List<JdbcColumnMeta> meta = metadata.fields().stream()
.map(this::createColumnMetadata)
.collect(Collectors.toList());
- return new JdbcMetaColumnsResult(meta);
+ return CompletableFuture.completedFuture(new JdbcMetaColumnsResult(meta));
}
/**
@@ -236,34 +234,26 @@ public class JdbcQueryEventHandlerImpl implements JdbcQueryEventHandler {
/** {@inheritDoc} */
@Override
- public JdbcMetaTablesResult tablesMeta(JdbcMetaTablesRequest req) {
- List<JdbcTableMeta> tblsMeta = meta.getTablesMeta(req.schemaName(), req.tableName(), req.tableTypes());
-
- return new JdbcMetaTablesResult(tblsMeta);
+ public CompletableFuture<JdbcMetaTablesResult> tablesMetaAsync(JdbcMetaTablesRequest req) {
+ return meta.getTablesMeta(req.schemaName(), req.tableName(), req.tableTypes()).thenApply(JdbcMetaTablesResult::new);
}
/** {@inheritDoc} */
@Override
- public JdbcMetaColumnsResult columnsMeta(JdbcMetaColumnsRequest req) {
- Collection<JdbcColumnMeta> tblsMeta = meta.getColumnsMeta(req.schemaName(), req.tableName(), req.columnName());
-
- return new JdbcMetaColumnsResult(tblsMeta);
+ public CompletableFuture<JdbcMetaColumnsResult> columnsMetaAsync(JdbcMetaColumnsRequest req) {
+ return meta.getColumnsMeta(req.schemaName(), req.tableName(), req.columnName()).thenApply(JdbcMetaColumnsResult::new);
}
/** {@inheritDoc} */
@Override
- public JdbcMetaSchemasResult schemasMeta(JdbcMetaSchemasRequest req) {
- Collection<String> tblsMeta = meta.getSchemasMeta(req.schemaName());
-
- return new JdbcMetaSchemasResult(tblsMeta);
+ public CompletableFuture<JdbcMetaSchemasResult> schemasMetaAsync(JdbcMetaSchemasRequest req) {
+ return meta.getSchemasMeta(req.schemaName()).thenApply(JdbcMetaSchemasResult::new);
}
/** {@inheritDoc} */
@Override
- public JdbcMetaPrimaryKeysResult primaryKeysMeta(JdbcMetaPrimaryKeysRequest req) {
- Collection<JdbcPrimaryKeyMeta> tblsMeta = meta.getPrimaryKeys(req.schemaName(), req.tableName());
-
- return new JdbcMetaPrimaryKeysResult(tblsMeta);
+ public CompletableFuture<JdbcMetaPrimaryKeysResult> primaryKeysMetaAsync(JdbcMetaPrimaryKeysRequest req) {
+ return meta.getPrimaryKeys(req.schemaName(), req.tableName()).thenApply(JdbcMetaPrimaryKeysResult::new);
}
/**
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCloseRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCloseRequest.java
index a9e3dd1..af1e513 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCloseRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCloseRequest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.client.handler.requests.sql;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.query.JdbcQueryEventHandler;
import org.apache.ignite.client.proto.query.event.QueryCloseRequest;
-import org.apache.ignite.client.proto.query.event.QueryCloseResult;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
@@ -34,7 +33,7 @@ public class ClientSqlCloseRequest {
* @param in Client message unpacker.
* @param out Client message packer.
* @param handler Query event handler.
- * @return null value indicates synchronous operation.
+ * @return Operation future.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
@@ -45,10 +44,6 @@ public class ClientSqlCloseRequest {
req.readBinary(in);
- QueryCloseResult res = handler.close(req);
-
- res.writeBinary(out);
-
- return null;
+ return handler.closeAsync(req).thenAccept(res -> res.writeBinary(out));
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlColumnMetadataRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlColumnMetadataRequest.java
index 9ffc615..0ef129e 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlColumnMetadataRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlColumnMetadataRequest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.client.handler.requests.sql;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.query.JdbcQueryEventHandler;
import org.apache.ignite.client.proto.query.event.JdbcMetaColumnsRequest;
-import org.apache.ignite.client.proto.query.event.JdbcMetaColumnsResult;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
@@ -34,7 +33,7 @@ public class ClientSqlColumnMetadataRequest {
* @param in Client message unpacker.
* @param out Client message packer.
* @param handler Query event handler.
- * @return null value indicates synchronous operation.
+ * @return Operation future.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
@@ -45,10 +44,6 @@ public class ClientSqlColumnMetadataRequest {
req.readBinary(in);
- JdbcMetaColumnsResult res = handler.columnsMeta(req);
-
- res.writeBinary(out);
-
- return null;
+ return handler.columnsMetaAsync(req).thenAccept(res -> res.writeBinary(out));
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
index d9eff7e..f0f2322 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.client.handler.requests.sql;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.query.JdbcQueryEventHandler;
import org.apache.ignite.client.proto.query.event.BatchExecuteRequest;
-import org.apache.ignite.client.proto.query.event.BatchExecuteResult;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
@@ -34,7 +33,7 @@ public class ClientSqlExecuteBatchRequest {
* @param in Client message unpacker.
* @param out Client message packer.
* @param handler Query event handler.
- * @return null value indicates synchronous operation.
+ * @return Operation future.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
@@ -45,10 +44,6 @@ public class ClientSqlExecuteBatchRequest {
req.readBinary(in);
- BatchExecuteResult res = handler.batch(req);
-
- res.writeBinary(out);
-
- return null;
+ return handler.batchAsync(req).thenAccept(res -> res.writeBinary(out));
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
index 8a3677d..3a78526 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.client.handler.requests.sql;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.query.JdbcQueryEventHandler;
import org.apache.ignite.client.proto.query.event.QueryExecuteRequest;
-import org.apache.ignite.client.proto.query.event.QueryExecuteResult;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
@@ -34,7 +33,7 @@ public class ClientSqlExecuteRequest {
* @param in Client message unpacker.
* @param out Client message packer.
* @param handler Query event handler.
- * @return null value indicates synchronous operation.
+ * @return Operation future.
*/
public static CompletableFuture<Void> execute(
ClientMessageUnpacker in,
@@ -45,10 +44,6 @@ public class ClientSqlExecuteRequest {
req.readBinary(in);
- QueryExecuteResult res = handler.query(req);
-
- res.writeBinary(out);
-
- return null;
+ return handler.queryAsync(req).thenAccept(res -> res.writeBinary(out));
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlFetchRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlFetchRequest.java
index 6a08deb..3168bcf 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlFetchRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlFetchRequest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.client.handler.requests.sql;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.query.JdbcQueryEventHandler;
import org.apache.ignite.client.proto.query.event.QueryFetchRequest;
-import org.apache.ignite.client.proto.query.event.QueryFetchResult;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
@@ -34,7 +33,7 @@ public class ClientSqlFetchRequest {
* @param in Client message unpacker.
* @param out Client message packer.
* @param handler Query event handler.
- * @return null value indicates synchronous operation.
+ * @return Operation future.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
@@ -45,10 +44,6 @@ public class ClientSqlFetchRequest {
req.readBinary(in);
- QueryFetchResult res = handler.fetch(req);
-
- res.writeBinary(out);
-
- return null;
+ return handler.fetchAsync(req).thenAccept(res -> res.writeBinary(out));
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlPrimaryKeyMetadataRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlPrimaryKeyMetadataRequest.java
index 3cbefad..07c94ad 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlPrimaryKeyMetadataRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlPrimaryKeyMetadataRequest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.client.handler.requests.sql;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.query.JdbcQueryEventHandler;
import org.apache.ignite.client.proto.query.event.JdbcMetaPrimaryKeysRequest;
-import org.apache.ignite.client.proto.query.event.JdbcMetaPrimaryKeysResult;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
@@ -34,7 +33,7 @@ public class ClientSqlPrimaryKeyMetadataRequest {
* @param in Client message unpacker.
* @param out Client message packer.
* @param handler Query event handler.
- * @return null value indicates synchronous operation.
+ * @return Operation future.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
@@ -45,10 +44,6 @@ public class ClientSqlPrimaryKeyMetadataRequest {
req.readBinary(in);
- JdbcMetaPrimaryKeysResult res = handler.primaryKeysMeta(req);
-
- res.writeBinary(out);
-
- return null;
+ return handler.primaryKeysMetaAsync(req).thenAccept(res -> res.writeBinary(out));
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java
index 1b46df7..9524097 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.client.handler.requests.sql;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.query.JdbcQueryEventHandler;
-import org.apache.ignite.client.proto.query.event.JdbcMetaColumnsResult;
import org.apache.ignite.client.proto.query.event.JdbcQueryMetadataRequest;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
@@ -34,7 +33,7 @@ public class ClientSqlQueryMetadataRequest {
* @param in Client message unpacker.
* @param out Client message packer.
* @param handler Query event handler.
- * @return null value indicates synchronous operation.
+ * @return Operation future.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
@@ -45,10 +44,6 @@ public class ClientSqlQueryMetadataRequest {
req.readBinary(in);
- JdbcMetaColumnsResult res = handler.queryMetadata(req);
-
- res.writeBinary(out);
-
- return null;
+ return handler.queryMetadataAsync(req).thenAccept(res -> res.writeBinary(out));
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlSchemasMetadataRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlSchemasMetadataRequest.java
index a751ca2..3c2390e 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlSchemasMetadataRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlSchemasMetadataRequest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.client.handler.requests.sql;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.query.JdbcQueryEventHandler;
import org.apache.ignite.client.proto.query.event.JdbcMetaSchemasRequest;
-import org.apache.ignite.client.proto.query.event.JdbcMetaSchemasResult;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
@@ -34,7 +33,7 @@ public class ClientSqlSchemasMetadataRequest {
* @param in Client message unpacker.
* @param out Client message packer.
* @param handler Query event handler.
- * @return null value indicates synchronous operation.
+ * @return Operation future.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
@@ -45,10 +44,6 @@ public class ClientSqlSchemasMetadataRequest {
req.readBinary(in);
- JdbcMetaSchemasResult res = handler.schemasMeta(req);
-
- res.writeBinary(out);
-
- return null;
+ return handler.schemasMetaAsync(req).thenAccept(res -> res.writeBinary(out));
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlTableMetadataRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlTableMetadataRequest.java
index 3c9893f..9f7cb4b 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlTableMetadataRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlTableMetadataRequest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.client.handler.requests.sql;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.query.JdbcQueryEventHandler;
import org.apache.ignite.client.proto.query.event.JdbcMetaTablesRequest;
-import org.apache.ignite.client.proto.query.event.JdbcMetaTablesResult;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
@@ -34,7 +33,7 @@ public class ClientSqlTableMetadataRequest {
* @param in Client message unpacker.
* @param out Client message packer.
* @param handler Query event handler.
- * @return null value indicates synchronous operation.
+ * @return Operation future.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
@@ -45,10 +44,6 @@ public class ClientSqlTableMetadataRequest {
req.readBinary(in);
- JdbcMetaTablesResult res = handler.tablesMeta(req);
-
- res.writeBinary(out);
-
- return null;
+ return handler.tablesMetaAsync(req).thenAccept(res -> res.writeBinary(out));
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/JdbcMetadataCatalog.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/JdbcMetadataCatalog.java
index 64d2d87..fdffb98 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/JdbcMetadataCatalog.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/JdbcMetadataCatalog.java
@@ -22,11 +22,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
-import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.client.proto.query.event.JdbcColumnMeta;
@@ -93,24 +93,18 @@ public class JdbcMetadataCatalog {
*
* @param schemaNamePtrn Sql pattern for schema name.
* @param tblNamePtrn Sql pattern for table name.
- * @return Collection of primary keys information for tables that matches specified schema and table name patterns.
+ * @return Future of the collection of primary keys information for tables that matches specified schema and table name patterns.
*/
- public Collection<JdbcPrimaryKeyMeta> getPrimaryKeys(String schemaNamePtrn, String tblNamePtrn) {
- Collection<JdbcPrimaryKeyMeta> metaSet = new HashSet<>();
-
+ public CompletableFuture<Collection<JdbcPrimaryKeyMeta>> getPrimaryKeys(String schemaNamePtrn, String tblNamePtrn) {
String schemaNameRegex = translateSqlWildcardsToRegex(schemaNamePtrn);
String tlbNameRegex = translateSqlWildcardsToRegex(tblNamePtrn);
- tables.tables().stream()
+ return tables.tablesAsync().thenApply(tableList -> tableList.stream()
.filter(t -> matches(getTblSchema(t.name()), schemaNameRegex))
.filter(t -> matches(getTblName(t.name()), tlbNameRegex))
- .forEach(tbl -> {
- JdbcPrimaryKeyMeta meta = createPrimaryKeyMeta(tbl);
-
- metaSet.add(meta);
- });
-
- return metaSet;
+ .map(this::createPrimaryKeyMeta)
+ .collect(Collectors.toSet())
+ );
}
/**
@@ -124,21 +118,20 @@ public class JdbcMetadataCatalog {
* @param schemaNamePtrn Sql pattern for schema name.
* @param tblNamePtrn Sql pattern for table name.
* @param tblTypes Requested table types.
- * @return List of metadatas of tables that matches.
+ * @return Future of the list of metadatas of tables that matches.
*/
- public List<JdbcTableMeta> getTablesMeta(String schemaNamePtrn, String tblNamePtrn, String[] tblTypes) {
+ public CompletableFuture<List<JdbcTableMeta>> getTablesMeta(String schemaNamePtrn, String tblNamePtrn, String[] tblTypes) {
String schemaNameRegex = translateSqlWildcardsToRegex(schemaNamePtrn);
String tlbNameRegex = translateSqlWildcardsToRegex(tblNamePtrn);
- List<Table> tblsMeta = tables.tables().stream()
- .filter(t -> matches(getTblSchema(t.name()), schemaNameRegex))
- .filter(t -> matches(getTblName(t.name()), tlbNameRegex))
- .collect(Collectors.toList());
-
- return tblsMeta.stream()
- .sorted(byTblTypeThenSchemaThenTblName)
- .map(t -> new JdbcTableMeta(getTblSchema(t.name()), getTblName(t.name()), TBL_TYPE))
- .collect(Collectors.toList());
+ return tables.tablesAsync().thenApply(tablesList -> {
+ return tablesList.stream()
+ .filter(t -> matches(getTblSchema(t.name()), schemaNameRegex))
+ .filter(t -> matches(getTblName(t.name()), tlbNameRegex))
+ .sorted(byTblTypeThenSchemaThenTblName)
+ .map(t -> new JdbcTableMeta(getTblSchema(t.name()), getTblName(t.name()), TBL_TYPE))
+ .collect(Collectors.toList());
+ });
}
/**
@@ -149,19 +142,18 @@ public class JdbcMetadataCatalog {
* @param schemaNamePtrn Schema name java regex pattern.
* @param tblNamePtrn Table name java regex pattern.
* @param colNamePtrn Column name java regex pattern.
- * @return List of metadatas about columns that match specified schema/tablename/columnname criterias.
+ * @return Future of the list of metadatas about columns that match specified schema/tablename/columnname criterias.
*/
- public Collection<JdbcColumnMeta> getColumnsMeta(String schemaNamePtrn, String tblNamePtrn, String colNamePtrn) {
- Collection<JdbcColumnMeta> metas = new LinkedHashSet<>();
-
+ public CompletableFuture<Collection<JdbcColumnMeta>> getColumnsMeta(String schemaNamePtrn, String tblNamePtrn, String colNamePtrn) {
String schemaNameRegex = translateSqlWildcardsToRegex(schemaNamePtrn);
String tlbNameRegex = translateSqlWildcardsToRegex(tblNamePtrn);
String colNameRegex = translateSqlWildcardsToRegex(colNamePtrn);
- tables.tables().stream()
- .filter(t -> matches(getTblSchema(t.name()), schemaNameRegex))
- .filter(t -> matches(getTblName(t.name()), tlbNameRegex))
- .flatMap(
+ return tables.tablesAsync().thenApply(tablesList -> {
+ return tablesList.stream()
+ .filter(t -> matches(getTblSchema(t.name()), schemaNameRegex))
+ .filter(t -> matches(getTblName(t.name()), tlbNameRegex))
+ .flatMap(
tbl -> {
SchemaDescriptor schema = ((TableImpl) tbl).schemaView().schema();
@@ -177,17 +169,11 @@ public class JdbcMetadataCatalog {
return tblColPairs.stream();
})
- .filter(e -> matches(e.getSecond().name(), colNameRegex))
- .sorted(bySchemaThenTabNameThenColOrder)
- .forEachOrdered(pair -> {
- JdbcColumnMeta colMeta = createColumnMeta(pair.getFirst(), pair.getSecond());
-
- if (!metas.contains(colMeta)) {
- metas.add(colMeta);
- }
- });
-
- return metas;
+ .filter(e -> matches(e.getSecond().name(), colNameRegex))
+ .sorted(bySchemaThenTabNameThenColOrder)
+ .map(pair -> createColumnMeta(pair.getFirst(), pair.getSecond()))
+ .collect(Collectors.toCollection(LinkedHashSet::new));
+ });
}
/**
@@ -196,9 +182,9 @@ public class JdbcMetadataCatalog {
* <p>Ignite has only one possible CATALOG_NAME, it is handled on the client (driver) side.
*
* @param schemaNamePtrn Sql pattern for schema name filter.
- * @return schema names that matches provided pattern.
+ * @return Future of the schema names that matches provided pattern.
*/
- public Collection<String> getSchemasMeta(String schemaNamePtrn) {
+ public CompletableFuture<Collection<String>> getSchemasMeta(String schemaNamePtrn) {
SortedSet<String> schemas = new TreeSet<>(); // to have values sorted.
String schemaNameRegex = translateSqlWildcardsToRegex(schemaNamePtrn);
@@ -207,12 +193,12 @@ public class JdbcMetadataCatalog {
schemas.add(DEFAULT_SCHEMA_NAME);
}
- tables.tables().stream()
- .map(tbl -> getTblSchema(tbl.name()))
- .filter(schema -> matches(schema, schemaNameRegex))
- .forEach(schemas::add);
-
- return schemas;
+ return tables.tablesAsync().thenApply(tablesList ->
+ tablesList.stream()
+ .map(tbl -> getTblSchema(tbl.name()))
+ .filter(schema -> matches(schema, schemaNameRegex))
+ .collect(Collectors.toCollection(() -> schemas))
+ );
}
/**
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
index 86f4d74..32ccfe3 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
@@ -134,16 +134,17 @@ public class TcpIgniteClient implements IgniteClient {
}
/**
- * Send ClientMessage request to server size and reads ClientMessage result.
+ * Sends ClientMessage request to server side asynchronously and returns result future.
*
* @param opCode Operation code.
* @param req ClientMessage request.
* @param res ClientMessage result.
+ * @return Response future.
*/
- public void sendRequest(int opCode, ClientMessage req, ClientMessage res) {
- ch.serviceAsync(opCode, w -> req.writeBinary(w.out()), p -> {
+ public <T extends ClientMessage> CompletableFuture<T> sendRequestAsync(int opCode, ClientMessage req, T res) {
+ return ch.serviceAsync(opCode, w -> req.writeBinary(w.out()), p -> {
res.readBinary(p.in());
return res;
- }).join();
+ });
}
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/query/JdbcClientQueryEventHandler.java b/modules/client/src/main/java/org/apache/ignite/internal/client/query/JdbcClientQueryEventHandler.java
index 74099d2..9dbcbfb 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/query/JdbcClientQueryEventHandler.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/query/JdbcClientQueryEventHandler.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.client.query;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.query.JdbcQueryEventHandler;
import org.apache.ignite.client.proto.query.event.BatchExecuteRequest;
import org.apache.ignite.client.proto.query.event.BatchExecuteResult;
@@ -56,91 +57,73 @@ public class JdbcClientQueryEventHandler implements JdbcQueryEventHandler {
/** {@inheritDoc} */
@Override
- public QueryExecuteResult query(QueryExecuteRequest req) {
+ public CompletableFuture<QueryExecuteResult> queryAsync(QueryExecuteRequest req) {
QueryExecuteResult res = new QueryExecuteResult();
- client.sendRequest(ClientOp.SQL_EXEC, req, res);
-
- return res;
+ return client.sendRequestAsync(ClientOp.SQL_EXEC, req, res);
}
/** {@inheritDoc} */
@Override
- public QueryFetchResult fetch(QueryFetchRequest req) {
+ public CompletableFuture<QueryFetchResult> fetchAsync(QueryFetchRequest req) {
QueryFetchResult res = new QueryFetchResult();
- client.sendRequest(ClientOp.SQL_NEXT, req, res);
-
- return res;
+ return client.sendRequestAsync(ClientOp.SQL_NEXT, req, res);
}
/** {@inheritDoc} */
@Override
- public BatchExecuteResult batch(BatchExecuteRequest req) {
+ public CompletableFuture<BatchExecuteResult> batchAsync(BatchExecuteRequest req) {
BatchExecuteResult res = new BatchExecuteResult();
- client.sendRequest(ClientOp.SQL_EXEC_BATCH, req, res);
-
- return res;
+ return client.sendRequestAsync(ClientOp.SQL_EXEC_BATCH, req, res);
}
/** {@inheritDoc} */
@Override
- public QueryCloseResult close(QueryCloseRequest req) {
+ public CompletableFuture<QueryCloseResult> closeAsync(QueryCloseRequest req) {
QueryCloseResult res = new QueryCloseResult();
- client.sendRequest(ClientOp.SQL_CURSOR_CLOSE, req, res);
-
- return res;
+ return client.sendRequestAsync(ClientOp.SQL_CURSOR_CLOSE, req, res);
}
/** {@inheritDoc} */
@Override
- public JdbcMetaTablesResult tablesMeta(JdbcMetaTablesRequest req) {
+ public CompletableFuture<JdbcMetaTablesResult> tablesMetaAsync(JdbcMetaTablesRequest req) {
JdbcMetaTablesResult res = new JdbcMetaTablesResult();
- client.sendRequest(ClientOp.SQL_TABLE_META, req, res);
-
- return res;
+ return client.sendRequestAsync(ClientOp.SQL_TABLE_META, req, res);
}
/** {@inheritDoc} */
@Override
- public JdbcMetaColumnsResult columnsMeta(JdbcMetaColumnsRequest req) {
+ public CompletableFuture<JdbcMetaColumnsResult> columnsMetaAsync(JdbcMetaColumnsRequest req) {
JdbcMetaColumnsResult res = new JdbcMetaColumnsResult();
- client.sendRequest(ClientOp.SQL_COLUMN_META, req, res);
-
- return res;
+ return client.sendRequestAsync(ClientOp.SQL_COLUMN_META, req, res);
}
/** {@inheritDoc} */
@Override
- public JdbcMetaSchemasResult schemasMeta(JdbcMetaSchemasRequest req) {
+ public CompletableFuture<JdbcMetaSchemasResult> schemasMetaAsync(JdbcMetaSchemasRequest req) {
JdbcMetaSchemasResult res = new JdbcMetaSchemasResult();
- client.sendRequest(ClientOp.SQL_SCHEMAS_META, req, res);
-
- return res;
+ return client.sendRequestAsync(ClientOp.SQL_SCHEMAS_META, req, res);
}
/** {@inheritDoc} */
@Override
- public JdbcMetaPrimaryKeysResult primaryKeysMeta(JdbcMetaPrimaryKeysRequest req) {
+ public CompletableFuture<JdbcMetaPrimaryKeysResult> primaryKeysMetaAsync(JdbcMetaPrimaryKeysRequest req) {
JdbcMetaPrimaryKeysResult res = new JdbcMetaPrimaryKeysResult();
- client.sendRequest(ClientOp.SQL_PK_META, req, res);
-
- return res;
+ return client.sendRequestAsync(ClientOp.SQL_PK_META, req, res);
}
/** {@inheritDoc} */
@Override
- public JdbcMetaColumnsResult queryMetadata(JdbcQueryMetadataRequest req) {
+ public CompletableFuture<JdbcMetaColumnsResult> queryMetadataAsync(JdbcQueryMetadataRequest req) {
JdbcMetaColumnsResult res = new JdbcMetaColumnsResult();
- client.sendRequest(ClientOp.SQL_QUERY_META, req, res);
-
- return res;
+ return client.sendRequestAsync(ClientOp.SQL_QUERY_META, req, res);
}
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/jdbc/JdbcDatabaseMetadata.java b/modules/client/src/main/java/org/apache/ignite/internal/jdbc/JdbcDatabaseMetadata.java
index e1c21e8..b0ea555 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/jdbc/JdbcDatabaseMetadata.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/jdbc/JdbcDatabaseMetadata.java
@@ -863,7 +863,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
}
JdbcMetaTablesResult res
- = conn.handler().tablesMeta(new JdbcMetaTablesRequest(schemaPtrn, tblNamePtrn, tblTypes));
+ = conn.handler().tablesMetaAsync(new JdbcMetaTablesRequest(schemaPtrn, tblNamePtrn, tblTypes)).join();
if (!res.hasResults()) {
throw IgniteQueryErrorCode.createJdbcSqlException(res.err(), res.status());
@@ -898,7 +898,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
return new JdbcResultSet(Collections.emptyList(), meta);
}
- JdbcMetaSchemasResult res = conn.handler().schemasMeta(new JdbcMetaSchemasRequest(schemaPtrn));
+ JdbcMetaSchemasResult res = conn.handler().schemasMetaAsync(new JdbcMetaSchemasRequest(schemaPtrn)).join();
if (!res.hasResults()) {
throw IgniteQueryErrorCode.createJdbcSqlException(res.err(), res.status());
@@ -971,7 +971,8 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
return new JdbcResultSet(Collections.emptyList(), meta);
}
- JdbcMetaColumnsResult res = conn.handler().columnsMeta(new JdbcMetaColumnsRequest(schemaPtrn, tblNamePtrn, colNamePtrn));
+ JdbcMetaColumnsResult res = conn.handler().columnsMetaAsync(new JdbcMetaColumnsRequest(schemaPtrn, tblNamePtrn, colNamePtrn))
+ .join();
if (!res.hasResults()) {
throw IgniteQueryErrorCode.createJdbcSqlException(res.err(), res.status());
@@ -1065,7 +1066,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
return new JdbcResultSet(Collections.emptyList(), meta);
}
- JdbcMetaPrimaryKeysResult res = conn.handler().primaryKeysMeta(new JdbcMetaPrimaryKeysRequest(schema, tbl));
+ JdbcMetaPrimaryKeysResult res = conn.handler().primaryKeysMetaAsync(new JdbcMetaPrimaryKeysRequest(schema, tbl)).join();
if (!res.hasResults()) {
throw IgniteQueryErrorCode.createJdbcSqlException(res.err(), res.status());
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/jdbc/JdbcResultSet.java b/modules/client/src/main/java/org/apache/ignite/internal/jdbc/JdbcResultSet.java
index 690a81a..72d670f 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/jdbc/JdbcResultSet.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/jdbc/JdbcResultSet.java
@@ -207,7 +207,7 @@ public class JdbcResultSet implements ResultSet {
ensureNotClosed();
if ((rowsIter == null || !rowsIter.hasNext()) && !finished) {
- QueryFetchResult res = qryHandler.fetch(new QueryFetchRequest(cursorId, fetchSize));
+ QueryFetchResult res = qryHandler.fetchAsync(new QueryFetchRequest(cursorId, fetchSize)).join();
if (!res.hasResults()) {
throw IgniteQueryErrorCode.createJdbcSqlException(res.err(), res.status());
@@ -259,7 +259,7 @@ public class JdbcResultSet implements ResultSet {
try {
if (stmt != null && (!finished || (isQuery && !autoClose))) {
- QueryCloseResult res = qryHandler.close(new QueryCloseRequest(cursorId));
+ QueryCloseResult res = qryHandler.closeAsync(new QueryCloseRequest(cursorId)).join();
if (!res.hasResults()) {
throw IgniteQueryErrorCode.createJdbcSqlException(res.err(), res.status());
@@ -2186,7 +2186,7 @@ public class JdbcResultSet implements ResultSet {
}
if (!metaInit) {
- JdbcMetaColumnsResult res = qryHandler.queryMetadata(new JdbcQueryMetadataRequest(cursorId));
+ JdbcMetaColumnsResult res = qryHandler.queryMetadataAsync(new JdbcQueryMetadataRequest(cursorId)).join();
meta = res.meta();
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java b/modules/client/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java
index e54ba62..b10b22b 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java
@@ -130,7 +130,7 @@ public class JdbcStatement implements Statement {
QueryExecuteRequest req = new QueryExecuteRequest(schema, pageSize, maxRows, sql,
args == null ? ArrayUtils.OBJECT_EMPTY_ARRAY : args.toArray());
- QueryExecuteResult res = conn.handler().query(req);
+ QueryExecuteResult res = conn.handler().queryAsync(req).join();
if (!res.hasResults()) {
throw IgniteQueryErrorCode.createJdbcSqlException(res.err(), res.status());
@@ -547,7 +547,7 @@ public class JdbcStatement implements Statement {
BatchExecuteRequest req = new BatchExecuteRequest(conn.getSchema(), batch, conn.getAutoCommit());
try {
- BatchExecuteResult res = conn.handler().batch(req);
+ BatchExecuteResult res = conn.handler().batchAsync(req).join();
if (!res.hasResults()) {
throw new BatchUpdateException(res.err(),
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItNoThreadsLeftTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItNoThreadsLeftTest.java
index 484efab..032fc99 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItNoThreadsLeftTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItNoThreadsLeftTest.java
@@ -121,6 +121,7 @@ public class ItNoThreadsLeftTest extends IgniteAbstractTest {
!thread.getName().startsWith("nioEventLoopGroup")
&& !thread.getName().startsWith("globalEventExecutor")
&& !thread.getName().startsWith("ForkJoinPool")
+ && !thread.getName().startsWith("process reaper")
&& !thread.getName().startsWith("parallel"))
.collect(Collectors.toSet());
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index a7b7232..b8659cd 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -998,33 +998,35 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
* @return Future representing pending completion of the operation.
*/
private CompletableFuture<List<Table>> tablesAsyncInternal() {
- List<IgniteUuid> tableIds = directTableIds();
+ // TODO: IGNITE-16288 directTableIds should use async configuration API
+ return CompletableFuture.supplyAsync(this::directTableIds)
+ .thenCompose(tableIds -> {
+ var tableFuts = new CompletableFuture[tableIds.size()];
- var tableFuts = new CompletableFuture[tableIds.size()];
+ var i = 0;
- var i = 0;
-
- for (IgniteUuid tblId : tableIds) {
- tableFuts[i++] = tableAsyncInternal(tblId, false);
- }
+ for (IgniteUuid tblId : tableIds) {
+ tableFuts[i++] = tableAsyncInternal(tblId, false);
+ }
- return CompletableFuture.allOf(tableFuts).thenApply(unused -> {
- var tables = new ArrayList<Table>(tableIds.size());
+ return CompletableFuture.allOf(tableFuts).thenApply(unused -> {
+ var tables = new ArrayList<Table>(tableIds.size());
- try {
- for (var fut : tableFuts) {
- var table = fut.get();
+ try {
+ for (var fut : tableFuts) {
+ var table = fut.get();
- if (table != null) {
- tables.add((Table) table);
- }
- }
- } catch (Throwable t) {
- throw new CompletionException(t);
- }
+ if (table != null) {
+ tables.add((Table) table);
+ }
+ }
+ } catch (Throwable t) {
+ throw new CompletionException(t);
+ }
- return tables;
- });
+ return tables;
+ });
+ });
}
/**