You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2022/03/31 11:37:37 UTC
[ignite-3] branch main updated: IGNITE-16734 Java thin: Add Compute (#757)
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 16574b2 IGNITE-16734 Java thin: Add Compute (#757)
16574b2 is described below
commit 16574b2d22671f1b4dc3332231da0de3006f492a
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Thu Mar 31 14:37:30 2022 +0300
IGNITE-16734 Java thin: Add Compute (#757)
* Implement `IgniteCompute` interface.
* `execute` and `broadcast` use the same `ClientOp#COMPUTE_EXECUTE`.
* For now all operations go through the default node; cluster awareness to be implemented separately (IGNITE-16771).
* Add temporary `Ignite#clusterNodes` to enable Compute usage while Cluster API is not available.
---
.../src/main/java/org/apache/ignite/Ignite.java | 21 ++-
.../internal/client/proto/ClientMessagePacker.java | 152 ++++++++-------
.../client/proto/ClientMessageUnpacker.java | 13 ++
.../ignite/internal/client/proto/ClientOp.java | 6 +
.../ignite/client/handler/ItClientHandlerTest.java | 4 +-
.../ignite/client/handler/ClientHandlerModule.java | 24 ++-
.../handler/ClientInboundMessageHandler.java | 30 ++-
.../cluster/ClientClusterGetNodesRequest.java | 54 ++++++
.../compute/ClientComputeExecuteRequest.java | 80 ++++++++
.../ignite/internal/client/TcpIgniteClient.java | 39 +++-
.../internal/client/compute/ClientCompute.java | 118 ++++++++++++
.../java/org/apache/ignite/client/TestServer.java | 5 +
.../org/apache/ignite/client/fakes/FakeIgnite.java | 14 ++
.../ignite/internal/compute/IgniteComputeImpl.java | 21 +++
.../runner/app/client/ItThinClientComputeTest.java | 206 +++++++++++++++++++++
.../org/apache/ignite/internal/app/IgniteImpl.java | 24 ++-
16 files changed, 730 insertions(+), 81 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/Ignite.java b/modules/api/src/main/java/org/apache/ignite/Ignite.java
index c6f0b75..aab4a82 100644
--- a/modules/api/src/main/java/org/apache/ignite/Ignite.java
+++ b/modules/api/src/main/java/org/apache/ignite/Ignite.java
@@ -17,16 +17,19 @@
package org.apache.ignite;
+import java.util.Collection;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.manager.IgniteTables;
import org.apache.ignite.tx.IgniteTransactions;
import org.jetbrains.annotations.ApiStatus.Experimental;
/**
- * Ignite node interface. Main entry-point for all Ignite APIs.
+ * Ignite API entry point.
*/
public interface Ignite extends AutoCloseable {
/**
@@ -79,4 +82,20 @@ public interface Ignite extends AutoCloseable {
* @see ComputeJob
*/
IgniteCompute compute();
+
+ /**
+ * Gets the cluster nodes.
+ * NOTE: Temporary API to enable Compute until we have proper Cluster API.
+ *
+ * @return Collection of cluster nodes.
+ */
+ Collection<ClusterNode> clusterNodes();
+
+ /**
+ * Gets the cluster nodes asynchronously.
+ * NOTE: Temporary API to enable Compute until we have proper Cluster API.
+ *
+ * @return Future that completes with the collection of cluster nodes.
+ */
+ CompletableFuture<Collection<ClusterNode>> clusterNodesAsync();
}
diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
index f35cd24..2d0ff70 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
@@ -785,6 +785,87 @@ public class ClientMessagePacker implements AutoCloseable {
}
/**
+ * Packs object type ({@link ClientDataType}) and value.
+ *
+ * @param obj Object.
+ */
+ public void packObjectWithType(Object obj) {
+ if (obj == null) {
+ packNil();
+
+ return;
+ }
+
+ Class<?> cls = obj.getClass();
+
+ if (cls == Boolean.class) {
+ packInt(ClientDataType.BOOLEAN);
+ packBoolean((Boolean) obj);
+ } else if (cls == Byte.class) {
+ packInt(ClientDataType.INT8);
+ packByte((Byte) obj);
+ } else if (cls == Short.class) {
+ packInt(ClientDataType.INT16);
+ packShort((Short) obj);
+ } else if (cls == Integer.class) {
+ packInt(ClientDataType.INT32);
+ packInt((Integer) obj);
+ } else if (cls == Long.class) {
+ packInt(ClientDataType.INT64);
+ packLong((Long) obj);
+ } else if (cls == Float.class) {
+ packInt(ClientDataType.FLOAT);
+ packFloat((Float) obj);
+ } else if (cls == Double.class) {
+ packInt(ClientDataType.DOUBLE);
+ packDouble((Double) obj);
+ } else if (cls == String.class) {
+ packInt(ClientDataType.STRING);
+ packString((String) obj);
+ } else if (cls == UUID.class) {
+ packInt(ClientDataType.UUID);
+ packUuid((UUID) obj);
+ } else if (cls == LocalDate.class) {
+ packInt(ClientDataType.DATE);
+ packDate((LocalDate) obj);
+ } else if (cls == LocalTime.class) {
+ packInt(ClientDataType.TIME);
+ packTime((LocalTime) obj);
+ } else if (cls == LocalDateTime.class) {
+ packInt(ClientDataType.DATETIME);
+ packDateTime((LocalDateTime) obj);
+ } else if (cls == Instant.class) {
+ packInt(ClientDataType.TIMESTAMP);
+ packTimestamp((Instant) obj);
+ } else if (cls == byte[].class) {
+ packInt(ClientDataType.BYTES);
+
+ packBinaryHeader(((byte[]) obj).length);
+ writePayload((byte[]) obj);
+ } else if (cls == Date.class) {
+ packInt(ClientDataType.DATE);
+ packDate(((Date) obj).toLocalDate());
+ } else if (cls == Time.class) {
+ packInt(ClientDataType.TIME);
+ packTime(((Time) obj).toLocalTime());
+ } else if (cls == Timestamp.class) {
+ packInt(ClientDataType.TIMESTAMP);
+ packTimestamp(((java.util.Date) obj).toInstant());
+ } else if (cls == BigDecimal.class) {
+ packInt(ClientDataType.DECIMAL);
+ packDecimal(((BigDecimal) obj));
+ } else if (cls == BigInteger.class) {
+ packInt(ClientDataType.BIGINTEGER);
+ packBigInteger(((BigInteger) obj));
+ } else if (cls == BitSet.class) {
+ packInt(ClientDataType.BITMASK);
+ packBitSet((BitSet) obj);
+ } else {
+ throw new UnsupportedOperationException("Custom objects are not supported");
+ }
+ }
+
+ /**
* Packs an array of different objects.
*
* @param args Object array.
@@ -802,76 +883,7 @@ public class ClientMessagePacker implements AutoCloseable {
packArrayHeader(args.length);
for (Object arg : args) {
- if (arg == null) {
- packNil();
-
- continue;
- }
-
- Class<?> cls = arg.getClass();
-
- if (cls == Boolean.class) {
- packInt(ClientDataType.BOOLEAN);
- packBoolean((Boolean) arg);
- } else if (cls == Byte.class) {
- packInt(ClientDataType.INT8);
- packByte((Byte) arg);
- } else if (cls == Short.class) {
- packInt(ClientDataType.INT16);
- packShort((Short) arg);
- } else if (cls == Integer.class) {
- packInt(ClientDataType.INT32);
- packInt((Integer) arg);
- } else if (cls == Long.class) {
- packInt(ClientDataType.INT64);
- packLong((Long) arg);
- } else if (cls == Float.class) {
- packInt(ClientDataType.FLOAT);
- packFloat((Float) arg);
- } else if (cls == Double.class) {
- packInt(ClientDataType.DOUBLE);
- packDouble((Double) arg);
- } else if (cls == String.class) {
- packInt(ClientDataType.STRING);
- packString((String) arg);
- } else if (cls == UUID.class) {
- packInt(ClientDataType.UUID);
- packUuid((UUID) arg);
- } else if (cls == LocalDate.class) {
- packInt(ClientDataType.DATE);
- packDate((LocalDate) arg);
- } else if (cls == LocalTime.class) {
- packInt(ClientDataType.TIME);
- packTime((LocalTime) arg);
- } else if (cls == LocalDateTime.class) {
- packInt(ClientDataType.DATETIME);
- packDateTime((LocalDateTime) arg);
- } else if (cls == Instant.class) {
- packInt(ClientDataType.TIMESTAMP);
- packTimestamp((Instant) arg);
- } else if (cls == byte[].class) {
- packInt(ClientDataType.BYTES);
-
- packBinaryHeader(((byte[]) arg).length);
- writePayload((byte[]) arg);
- } else if (cls == Date.class) {
- packInt(ClientDataType.DATE);
- packDate(((Date) arg).toLocalDate());
- } else if (cls == Time.class) {
- packInt(ClientDataType.TIME);
- packTime(((Time) arg).toLocalTime());
- } else if (cls == Timestamp.class) {
- packInt(ClientDataType.TIMESTAMP);
- packTimestamp(((java.util.Date) arg).toInstant());
- } else if (cls == BigDecimal.class) {
- packInt(ClientDataType.DECIMAL);
- packDecimal(((BigDecimal) arg));
- } else if (cls == BigInteger.class) {
- packInt(ClientDataType.BIGINTEGER);
- packBigInteger(((BigInteger) arg));
- } else {
- throw new UnsupportedOperationException("Custom objects are not supported");
- }
+ packObjectWithType(arg);
}
}
diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessageUnpacker.java b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessageUnpacker.java
index 833f8e2..973dca0 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessageUnpacker.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessageUnpacker.java
@@ -938,6 +938,19 @@ public class ClientMessageUnpacker implements AutoCloseable {
}
/**
+ * Unpacks object type ({@link ClientDataType}) and value.
+ *
+ * @return Object value.
+ */
+ public Object unpackObjectWithType() {
+ if (tryUnpackNil()) {
+ return null;
+ }
+
+ return unpackObject(unpackInt());
+ }
+
+ /**
* Unpacks an object based on the specified type.
*
* @param dataType Data type code.
diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
index 8916f00..3dad93f 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
@@ -122,4 +122,10 @@ public class ClientOp {
/** Execute prepared statement batch query. */
public static final int SQL_EXEC_PS_BATCH = 46;
+
+ /** Execute compute job. */
+ public static final int COMPUTE_EXECUTE = 47;
+
+ /** Get cluster nodes. */
+ public static final int CLUSTER_GET_NODES = 48;
}
diff --git a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
index 176a2d9..2779686 100644
--- a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
+++ b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
@@ -30,11 +30,13 @@ import java.net.Socket;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
import org.apache.ignite.internal.configuration.ConfigurationManager;
import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.table.manager.IgniteTables;
import org.apache.ignite.tx.IgniteTransactions;
@@ -206,7 +208,7 @@ public class ItClientHandlerTest {
bootstrapFactory.start();
var module = new ClientHandlerModule(mock(QueryProcessor.class), mock(IgniteTables.class), mock(IgniteTransactions.class), registry,
- bootstrapFactory);
+ mock(IgniteCompute.class), mock(ClusterService.class), bootstrapFactory);
module.start();
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index 25cc93b..bd8ab25 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -30,6 +30,7 @@ import io.netty.handler.timeout.IdleStateHandler;
import java.net.BindException;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
+import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
import org.apache.ignite.internal.client.proto.ClientMessageDecoder;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
@@ -37,6 +38,7 @@ import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.table.manager.IgniteTables;
import org.apache.ignite.tx.IgniteTransactions;
@@ -64,6 +66,12 @@ public class ClientHandlerModule implements IgniteComponent {
/** Processor. */
private final QueryProcessor queryProcessor;
+ /** Compute. */
+ private final IgniteCompute igniteCompute;
+
+ /** Cluster. */
+ private final ClusterService clusterService;
+
/** Netty bootstrap factory. */
private final NettyBootstrapFactory bootstrapFactory;
@@ -74,6 +82,8 @@ public class ClientHandlerModule implements IgniteComponent {
* @param igniteTables Ignite.
* @param igniteTransactions Transactions.
* @param registry Configuration registry.
+ * @param igniteCompute Compute.
+ * @param clusterService Cluster.
* @param bootstrapFactory Bootstrap factory.
*/
public ClientHandlerModule(
@@ -81,15 +91,21 @@ public class ClientHandlerModule implements IgniteComponent {
IgniteTables igniteTables,
IgniteTransactions igniteTransactions,
ConfigurationRegistry registry,
+ IgniteCompute igniteCompute,
+ ClusterService clusterService,
NettyBootstrapFactory bootstrapFactory) {
assert igniteTables != null;
assert registry != null;
assert queryProcessor != null;
+ assert igniteCompute != null;
+ assert clusterService != null;
assert bootstrapFactory != null;
this.queryProcessor = queryProcessor;
this.igniteTables = igniteTables;
this.igniteTransactions = igniteTransactions;
+ this.igniteCompute = igniteCompute;
+ this.clusterService = clusterService;
this.registry = registry;
this.bootstrapFactory = bootstrapFactory;
}
@@ -159,7 +175,13 @@ public class ClientHandlerModule implements IgniteComponent {
ch.pipeline().addLast(
new ClientMessageDecoder(),
- new ClientInboundMessageHandler(igniteTables, igniteTransactions, queryProcessor, configuration));
+ new ClientInboundMessageHandler(
+ igniteTables,
+ igniteTransactions,
+ queryProcessor,
+ configuration,
+ igniteCompute,
+ clusterService));
}
})
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, configuration.connectTimeout());
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index c625ee4..4554057 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -24,6 +24,8 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.BitSet;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.client.handler.requests.cluster.ClientClusterGetNodesRequest;
+import org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest;
import org.apache.ignite.client.handler.requests.sql.ClientSqlCloseRequest;
import org.apache.ignite.client.handler.requests.sql.ClientSqlColumnMetadataRequest;
import org.apache.ignite.client.handler.requests.sql.ClientSqlExecuteBatchRequest;
@@ -59,6 +61,7 @@ import org.apache.ignite.client.handler.requests.tx.ClientTransactionBeginReques
import org.apache.ignite.client.handler.requests.tx.ClientTransactionCommitRequest;
import org.apache.ignite.client.handler.requests.tx.ClientTransactionRollbackRequest;
import org.apache.ignite.client.proto.query.JdbcQueryEventHandler;
+import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorView;
import org.apache.ignite.internal.client.proto.ClientErrorCode;
import org.apache.ignite.internal.client.proto.ClientMessageCommon;
@@ -70,6 +73,7 @@ import org.apache.ignite.internal.client.proto.ServerMessageType;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.ClusterService;
import org.apache.ignite.table.manager.IgniteTables;
import org.apache.ignite.tx.IgniteTransactions;
@@ -96,30 +100,44 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
/** Configuration. */
private final ClientConnectorView configuration;
+ /** Compute. */
+ private final IgniteCompute compute;
+
+ /** Cluster. */
+ private final ClusterService clusterService;
+
/** Context. */
private ClientContext clientContext;
/**
* Constructor.
*
- * @param igniteTables Ignite tables API entry point.
+ * @param igniteTables Ignite tables API entry point.
* @param igniteTransactions Transactions API.
* @param processor Sql query processor.
* @param configuration Configuration.
+ * @param compute Compute.
+ * @param clusterService Cluster.
*/
public ClientInboundMessageHandler(
IgniteTables igniteTables,
IgniteTransactions igniteTransactions,
QueryProcessor processor,
- ClientConnectorView configuration) {
+ ClientConnectorView configuration,
+ IgniteCompute compute,
+ ClusterService clusterService) {
assert igniteTables != null;
assert igniteTransactions != null;
assert processor != null;
assert configuration != null;
+ assert compute != null;
+ assert clusterService != null;
this.igniteTables = igniteTables;
this.igniteTransactions = igniteTransactions;
this.configuration = configuration;
+ this.compute = compute;
+ this.clusterService = clusterService;
this.jdbcQueryEventHandler = new JdbcQueryEventHandlerImpl(processor, new JdbcMetadataCatalog(igniteTables));
}
@@ -215,6 +233,8 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
}
private void writeError(long requestId, Throwable err, ChannelHandlerContext ctx) {
+ LOG.error("Error processing client request", err);
+
var packer = getPacker(ctx.alloc());
try {
@@ -391,6 +411,12 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
case ClientOp.TX_ROLLBACK:
return ClientTransactionRollbackRequest.process(in, resources);
+ case ClientOp.COMPUTE_EXECUTE:
+ return ClientComputeExecuteRequest.process(in, out, compute, clusterService);
+
+ case ClientOp.CLUSTER_GET_NODES:
+ return ClientClusterGetNodesRequest.process(out, clusterService);
+
default:
throw new IgniteException("Unexpected operation code: " + opCode);
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/cluster/ClientClusterGetNodesRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/cluster/ClientClusterGetNodesRequest.java
new file mode 100644
index 0000000..294da97
--- /dev/null
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/cluster/ClientClusterGetNodesRequest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.cluster;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.client.proto.ClientMessagePacker;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * Cluster nodes request.
+ */
+public class ClientClusterGetNodesRequest {
+ /**
+ * Processes the request.
+ *
+ * @param out Packer.
+ * @param clusterService Cluster.
+ * @return Future.
+ */
+ public static CompletableFuture<Void> process(
+ ClientMessagePacker out,
+ ClusterService clusterService) {
+ Collection<ClusterNode> nodes = clusterService.topologyService().allMembers();
+
+ out.packArrayHeader(nodes.size());
+
+ for (ClusterNode node : nodes) {
+ out.packString(node.id());
+ out.packString(node.name());
+ out.packString(node.address().host());
+ out.packInt(node.address().port());
+ }
+
+ // Null future indicates synchronous completion.
+ return null;
+ }
+}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
new file mode 100644
index 0000000..78c4ff5
--- /dev/null
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.compute;
+
+import static org.apache.ignite.internal.util.ArrayUtils.OBJECT_EMPTY_ARRAY;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.internal.client.proto.ClientMessagePacker;
+import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Compute execute request.
+ */
+public class ClientComputeExecuteRequest {
+ /**
+ * Processes the request.
+ *
+ * @param in Unpacker.
+ * @param out Packer.
+ * @param compute Compute.
+ * @param cluster Cluster.
+ * @return Future.
+ */
+ public static CompletableFuture<Void> process(
+ ClientMessageUnpacker in,
+ ClientMessagePacker out,
+ IgniteCompute compute,
+ ClusterService cluster) {
+ var node = in.tryUnpackNil()
+ ? cluster.topologyService().localMember()
+ : new ClusterNode(in.unpackString(), in.unpackString(), new NetworkAddress(in.unpackString(), in.unpackInt()));
+
+ String jobClassName = in.unpackString();
+
+ Object[] args = unpackArgs(in);
+
+ return compute.execute(Set.of(node), jobClassName, args).thenAccept(out::packObjectWithType);
+ }
+
+ @NotNull
+ private static Object[] unpackArgs(ClientMessageUnpacker in) {
+ if (in.tryUnpackNil()) {
+ return OBJECT_EMPTY_ARRAY;
+ }
+
+ int argCnt = in.unpackArrayHeader();
+
+ if (argCnt == 0) {
+ return OBJECT_EMPTY_ARRAY;
+ }
+
+ Object[] args = new Object[argCnt];
+
+ for (int i = 0; i < argCnt; i++) {
+ args[i] = in.unpackObjectWithType();
+ }
+ return args;
+ }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
index 6ad8fb5..2e33177 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
@@ -17,6 +17,11 @@
package org.apache.ignite.internal.client;
+import static org.apache.ignite.internal.client.ClientUtils.sync;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
@@ -25,9 +30,13 @@ import org.apache.ignite.client.IgniteClientConfiguration;
import org.apache.ignite.client.IgniteClientException;
import org.apache.ignite.client.proto.query.ClientMessage;
import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.internal.client.compute.ClientCompute;
import org.apache.ignite.internal.client.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.internal.client.table.ClientTables;
import org.apache.ignite.internal.client.tx.ClientTransactions;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.table.manager.IgniteTables;
import org.apache.ignite.tx.IgniteTransactions;
@@ -47,6 +56,9 @@ public class TcpIgniteClient implements IgniteClient {
/** Transactions. */
private final ClientTransactions transactions;
+ /** Compute. */
+ private final ClientCompute compute;
+
/**
* Constructor.
*
@@ -74,6 +86,7 @@ public class TcpIgniteClient implements IgniteClient {
ch = new ReliableChannel(chFactory, cfg);
tables = new ClientTables(ch);
transactions = new ClientTransactions(ch);
+ compute = new ClientCompute(ch);
}
/**
@@ -119,7 +132,31 @@ public class TcpIgniteClient implements IgniteClient {
/** {@inheritDoc} */
@Override
public IgniteCompute compute() {
- throw new UnsupportedOperationException("Not implemented yet");
+ return compute;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Collection<ClusterNode> clusterNodes() {
+ return sync(clusterNodesAsync());
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<Collection<ClusterNode>> clusterNodesAsync() {
+ return ch.serviceAsync(ClientOp.CLUSTER_GET_NODES, r -> {
+ int cnt = r.in().unpackArrayHeader();
+ List<ClusterNode> res = new ArrayList<>(cnt);
+
+ for (int i = 0; i < cnt; i++) {
+ res.add(new ClusterNode(
+ r.in().unpackString(),
+ r.in().unpackString(),
+ new NetworkAddress(r.in().unpackString(), r.in().unpackInt())));
+ }
+
+ return res;
+ });
}
/** {@inheritDoc} */
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
new file mode 100644
index 0000000..aa17dc1
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.compute;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.internal.client.proto.ClientOp;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Client compute implementation.
+ */
+public class ClientCompute implements IgniteCompute {
+ /** Channel. */
+ private final ReliableChannel ch;
+
+ /**
+ * Constructor.
+ *
+ * @param ch Channel.
+ */
+ public ClientCompute(ReliableChannel ch) {
+ this.ch = ch;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> jobClass, Object... args) {
+ return execute(nodes, jobClass.getName(), args);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, String jobClassName, Object... args) {
+ Objects.requireNonNull(nodes);
+ Objects.requireNonNull(jobClassName);
+
+ if (nodes.isEmpty()) {
+ throw new IllegalArgumentException("nodes must not be empty.");
+ }
+
+ // TODO: Cluster awareness (IGNITE-16771): match specified nodes to known connections.
+ ClusterNode node = randomNode(nodes);
+
+ return executeOnOneNode(node, jobClassName, args);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <R> Map<ClusterNode, CompletableFuture<R>> broadcast(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> jobClass,
+ Object... args) {
+ return broadcast(nodes, jobClass.getName(), args);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <R> Map<ClusterNode, CompletableFuture<R>> broadcast(Set<ClusterNode> nodes, String jobClassName, Object... args) {
+ Objects.requireNonNull(nodes);
+ Objects.requireNonNull(jobClassName);
+
+ Map<ClusterNode, CompletableFuture<R>> map = new HashMap<>(nodes.size());
+
+ for (ClusterNode node : nodes) {
+ if (map.put(node, executeOnOneNode(node, jobClassName, args)) != null) {
+ throw new IllegalStateException("Node can't be specified more than once: " + node);
+ }
+ }
+
+ return map;
+ }
+
+ private <R> CompletableFuture<R> executeOnOneNode(ClusterNode node, String jobClassName, Object[] args) {
+ return ch.serviceAsync(ClientOp.COMPUTE_EXECUTE, w -> {
+ // TODO: Cluster awareness (IGNITE-16771): if the specified node matches existing connection, send nil.
+ w.out().packString(node.id());
+ w.out().packString(node.name());
+ w.out().packString(node.address().host());
+ w.out().packInt(node.address().port());
+
+ w.out().packString(jobClassName);
+ w.out().packObjectArray(args);
+ }, r -> (R) r.in().unpackObjectWithType());
+ }
+
+ private ClusterNode randomNode(Set<ClusterNode> nodes) {
+ int nodesToSkip = ThreadLocalRandom.current().nextInt(nodes.size());
+
+ Iterator<ClusterNode> iterator = nodes.iterator();
+ for (int i = 0; i < nodesToSkip; i++) {
+ iterator.next();
+ }
+
+ return iterator.next();
+ }
+}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
index 5f5560a..cbf84d1 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
@@ -18,16 +18,19 @@
package org.apache.ignite.client;
import static org.apache.ignite.configuration.annotation.ConfigurationType.LOCAL;
+import static org.mockito.Mockito.mock;
import java.util.List;
import java.util.Map;
import org.apache.ignite.Ignite;
import org.apache.ignite.client.fakes.FakeIgnite;
import org.apache.ignite.client.handler.ClientHandlerModule;
+import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NettyBootstrapFactory;
/**
@@ -77,6 +80,8 @@ public class TestServer implements AutoCloseable {
ignite.tables(),
ignite.transactions(),
cfg,
+ mock(IgniteCompute.class),
+ mock(ClusterService.class),
bootstrapFactory
);
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
index 0082d63..9183f5d 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
@@ -17,11 +17,13 @@
package org.apache.ignite.client.fakes;
+import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.Ignite;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.manager.IgniteTables;
import org.apache.ignite.tx.IgniteTransactions;
import org.apache.ignite.tx.Transaction;
@@ -85,6 +87,18 @@ public class FakeIgnite implements Ignite {
/** {@inheritDoc} */
@Override
+ public Collection<ClusterNode> clusterNodes() {
+ throw new UnsupportedOperationException("Not implemented yet"); // Always throws: not provided by this fake.
+ }
+
+ /** {@inheritDoc} This fake throws synchronously instead of returning a failed future. */
+ @Override
+ public CompletableFuture<Collection<ClusterNode>> clusterNodesAsync() {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ /** {@inheritDoc} */
+ @Override
public void close() {
// No-op.
}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index 9feb6c7..feddc78 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -21,6 +21,7 @@ import static java.util.stream.Collectors.toUnmodifiableMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
@@ -46,12 +47,26 @@ public class IgniteComputeImpl implements IgniteCompute {
/** {@inheritDoc} */
@Override
public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> jobClass, Object... args) {
+ Objects.requireNonNull(nodes);
+ Objects.requireNonNull(jobClass);
+
+ if (nodes.isEmpty()) { // Checked before randomNode(nodes) is called below.
+ throw new IllegalArgumentException("nodes must not be empty.");
+ }
+
return executeOnOneNode(randomNode(nodes), jobClass, args);
}
/** {@inheritDoc} */
@Override
public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, String jobClassName, Object... args) {
+ Objects.requireNonNull(nodes);
+ Objects.requireNonNull(jobClassName);
+
+ if (nodes.isEmpty()) { // Checked before randomNode(nodes) is called below.
+ throw new IllegalArgumentException("nodes must not be empty.");
+ }
+
return executeOnOneNode(randomNode(nodes), jobClassName, args);
}
@@ -93,6 +108,9 @@ public class IgniteComputeImpl implements IgniteCompute {
Class<? extends ComputeJob<R>> jobClass,
Object... args
) {
+ Objects.requireNonNull(nodes);
+ Objects.requireNonNull(jobClass);
+
return nodes.stream()
.collect(toUnmodifiableMap(node -> node, node -> executeOnOneNode(node, jobClass, args)));
}
@@ -100,6 +118,9 @@ public class IgniteComputeImpl implements IgniteCompute {
/** {@inheritDoc} */
@Override
public <R> Map<ClusterNode, CompletableFuture<R>> broadcast(Set<ClusterNode> nodes, String jobClassName, Object... args) {
+ Objects.requireNonNull(nodes); // Null checks run before the stream below calls executeOnOneNode.
+ Objects.requireNonNull(jobClassName);
+
return nodes.stream()
.collect(toUnmodifiableMap(node -> node, node -> executeOnOneNode(node, jobClassName, args)));
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
new file mode 100644
index 0000000..638343c
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app.client;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+import org.apache.ignite.client.IgniteClientException;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.tx.TransactionException;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Thin client compute integration test: runs jobs on cluster nodes through {@code client().compute()}.
+ */
+public class ItThinClientComputeTest extends ItAbstractThinClientTest {
+ @Test
+ void testClusterNodes() {
+ List<ClusterNode> nodes = sortedNodes();
+
+ assertEquals(2, nodes.size());
+
+ assertEquals("ItThinClientComputeTest_null_3344", nodes.get(0).name());
+ assertEquals(3344, nodes.get(0).address().port());
+ assertTrue(nodes.get(0).id().length() > 10); // The exact id is not asserted; only that it is reasonably long.
+
+ assertEquals("ItThinClientComputeTest_null_3345", nodes.get(1).name());
+ assertEquals(3345, nodes.get(1).address().port());
+ assertTrue(nodes.get(1).id().length() > 10);
+ }
+
+ @Test
+ void testExecuteOnSpecificNode() {
+ String res1 = client().compute().execute(Set.of(node(0)), NodeNameJob.class).join(); // Single-node set pins the target.
+ String res2 = client().compute().execute(Set.of(node(1)), NodeNameJob.class).join();
+
+ assertEquals("ItThinClientComputeTest_null_3344", res1);
+ assertEquals("ItThinClientComputeTest_null_3345", res2);
+ }
+
+ @Test
+ void testExecuteOnRandomNode() {
+ String res = client().compute().execute(new HashSet<>(sortedNodes()), NodeNameJob.class).join();
+
+ assertTrue(Set.of("ItThinClientComputeTest_null_3344", "ItThinClientComputeTest_null_3345").contains(res)); // Either node is fine.
+ }
+
+ @Test
+ void testBroadcastOneNode() {
+ Map<ClusterNode, CompletableFuture<String>> futuresPerNode = client().compute().broadcast(
+ Set.of(node(1)),
+ NodeNameJob.class,
+ "_",
+ 123);
+
+ assertEquals(1, futuresPerNode.size());
+
+ String res = futuresPerNode.get(node(1)).join();
+
+ assertEquals("ItThinClientComputeTest_null_3345__123", res); // Node name, then the args "_" and 123 joined with "_".
+ }
+
+ @Test
+ void testBroadcastAllNodes() {
+ Map<ClusterNode, CompletableFuture<String>> futuresPerNode = client().compute().broadcast(
+ new HashSet<>(sortedNodes()),
+ NodeNameJob.class,
+ "_",
+ 123);
+
+ assertEquals(2, futuresPerNode.size());
+
+ String res1 = futuresPerNode.get(node(0)).join();
+ String res2 = futuresPerNode.get(node(1)).join();
+
+ assertEquals("ItThinClientComputeTest_null_3344__123", res1);
+ assertEquals("ItThinClientComputeTest_null_3345__123", res2);
+ }
+
+ @Test
+ void testExecuteWithArgs() {
+ var nodes = new HashSet<>(client().clusterNodes());
+ String res = client().compute().execute(nodes, ConcatJob.class, 1, "2", 3.3).join();
+
+ assertEquals("1_2_3.3", res); // ConcatJob joins the toString() of each argument with "_".
+ }
+
+ @Test
+ void testJobErrorPropagatesToClientWithClassAndMessage() {
+ CompletionException ex = assertThrows(
+ CompletionException.class,
+ () -> client().compute().execute(Set.of(node(0)), ErrorJob.class).join());
+
+ IgniteClientException cause = (IgniteClientException) ex.getCause(); // join() reports the failure wrapped in CompletionException.
+
+ assertThat(cause.getMessage(), containsString("TransactionException: Custom job error"));
+ }
+
+ @Test
+ void testAllSupportedArgTypes() {
+ testEchoArg(Byte.MAX_VALUE);
+ testEchoArg(Short.MAX_VALUE);
+ testEchoArg(Integer.MAX_VALUE);
+ testEchoArg(Long.MAX_VALUE);
+ testEchoArg(Float.MAX_VALUE);
+ testEchoArg(Double.MAX_VALUE);
+ testEchoArg(BigDecimal.TEN);
+ testEchoArg(UUID.randomUUID());
+ testEchoArg("string");
+ testEchoArg(new byte[] {1, 2, 3});
+ testEchoArg(new BitSet(10));
+ testEchoArg(LocalDate.now());
+ testEchoArg(LocalTime.now());
+ testEchoArg(LocalDateTime.now());
+ testEchoArg(Instant.now());
+ testEchoArg(true);
+
+ // TODO IGNITE-16772
+ // testEchoArg(BigInteger.TEN);
+ }
+
+ private void testEchoArg(Object arg) {
+ Object res = client().compute().execute(Set.of(node(0)), EchoJob.class, arg).join(); // Round-trips arg through EchoJob on node 0.
+
+ if (arg instanceof byte[]) { // Arrays need element-wise comparison; equals() on arrays compares references.
+ assertArrayEquals((byte[]) arg, (byte[]) res);
+ } else {
+ assertEquals(arg, res);
+ }
+ }
+
+ private ClusterNode node(int idx) {
+ return sortedNodes().get(idx); // Index into the name-ordered node list; the list is re-read on every call.
+ }
+
+ private List<ClusterNode> sortedNodes() {
+ return client().clusterNodes().stream()
+ .sorted(Comparator.comparing(ClusterNode::name)) // Ordering by name keeps node(0) / node(1) stable across calls.
+ .collect(Collectors.toList());
+ }
+
+ private static class NodeNameJob implements ComputeJob<String> {
+ @Override
+ public String execute(JobExecutionContext context, Object... args) {
+ return context.ignite().name() + Arrays.stream(args).map(Object::toString).collect(Collectors.joining("_")); // No separator after the name.
+ }
+ }
+
+ private static class ConcatJob implements ComputeJob<String> {
+ @Override
+ public String execute(JobExecutionContext context, Object... args) {
+ return Arrays.stream(args).map(Object::toString).collect(Collectors.joining("_"));
+ }
+ }
+
+ private static class ErrorJob implements ComputeJob<String> {
+ @Override
+ public String execute(JobExecutionContext context, Object... args) {
+ throw new TransactionException("Custom job error"); // Always fails; used to check error propagation to the client.
+ }
+ }
+
+ private static class EchoJob implements ComputeJob<Object> {
+ @Override
+ public Object execute(JobExecutionContext context, Object... args) {
+ return args[0]; // Returns the first argument as is.
+ }
+ }
+}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 1ae5e0d..efe723c 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -22,6 +22,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;
@@ -159,8 +160,8 @@ public class IgniteImpl implements Ignite {
/** Distributed configuration storage. */
private final ConfigurationStorage cfgStorage;
- @Nullable
- private transient IgniteCompute compute;
+ /** Compute. */
+ private final IgniteCompute compute;
/** JVM pause detector. */
private final LongJvmPauseDetector longJvmPauseDetector;
@@ -271,11 +272,15 @@ public class IgniteImpl implements Ignite {
distributedTblMgr
);
+ compute = new IgniteComputeImpl(clusterSvc.topologyService(), computeComponent);
+
clientHandlerModule = new ClientHandlerModule(
qryEngine,
distributedTblMgr,
new IgniteTransactionsImpl(txManager),
nodeCfgMgr.configurationRegistry(),
+ compute,
+ clusterSvc,
nettyBootstrapFactory
);
@@ -467,12 +472,21 @@ public class IgniteImpl implements Ignite {
/** {@inheritDoc} */
@Override
public IgniteCompute compute() {
- if (compute == null) {
- compute = new IgniteComputeImpl(clusterSvc.topologyService(), computeComponent);
- }
return compute;
}
+ /** {@inheritDoc} Returns {@code allMembers()} of the cluster service's topology service. */
+ @Override
+ public Collection<ClusterNode> clusterNodes() {
+ return clusterSvc.topologyService().allMembers();
+ }
+
+ /** {@inheritDoc} The returned future is already completed: nodes are read synchronously via {@link #clusterNodes()}. */
+ @Override
+ public CompletableFuture<Collection<ClusterNode>> clusterNodesAsync() {
+ return CompletableFuture.completedFuture(clusterNodes());
+ }
+
/**
* Returns node configuration.
*/