You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2020/04/27 08:15:11 UTC
[ignite] branch master updated: IGNITE-12835 Thin client: compute
support - Fixes #7572.
This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 9dcdd54 IGNITE-12835 Thin client: compute support - Fixes #7572.
9dcdd54 is described below
commit 9dcdd54c7c9804bd570f02fc8025e194ca2b9fd9
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Mon Apr 27 11:12:09 2020 +0300
IGNITE-12835 Thin client: compute support - Fixes #7572.
Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
.../org/apache/ignite/client/ClientCluster.java | 25 +
.../ClientClusterGroup.java} | 44 +-
.../org/apache/ignite/client/ClientCompute.java | 84 +++
.../org/apache/ignite/client/ClientException.java | 2 +-
...lientFeatureNotSupportedByServerException.java} | 38 +-
.../org/apache/ignite/client/IgniteClient.java | 24 +
.../configuration/ThinClientConfiguration.java | 28 +
.../ignite/internal/client/thin/ClientChannel.java | 22 +-
.../client/thin/ClientClusterGroupImpl.java | 67 ++
.../thin/ClientClusterImpl.java} | 37 +-
.../internal/client/thin/ClientComputeImpl.java | 423 +++++++++++++
.../internal/client/thin/ClientFutureImpl.java | 117 ++++
.../internal/client/thin/ClientOperation.java | 42 +-
.../thin/NotificationListener.java} | 41 +-
.../client/thin/ProtocolBitmaskFeature.java | 5 +-
.../internal/client/thin/ReliableChannel.java | 80 ++-
.../internal/client/thin/TcpClientChannel.java | 209 +++---
.../internal/client/thin/TcpIgniteClient.java | 28 +
.../processors/odbc/ClientListenerNioListener.java | 9 +-
.../processors/odbc/ClientListenerResponse.java | 9 +-
.../platform/client/ClientBitmaskFeature.java | 5 +-
.../platform/client/ClientConnectionContext.java | 50 ++
.../processors/platform/client/ClientFlag.java | 3 +
.../platform/client/ClientMessageParser.java | 15 +-
.../platform/client/ClientNotification.java | 105 ++++
...ientFlag.java => ClientObjectNotification.java} | 42 +-
...{ClientFlag.java => ClientOutgoingMessage.java} | 37 +-
.../processors/platform/client/ClientResponse.java | 4 +-
.../processors/platform/client/ClientStatus.java | 6 +
.../platform/client/compute/ClientComputeTask.java | 166 +++++
.../client/compute/ClientExecuteTaskRequest.java | 90 +++
.../client/compute/ClientExecuteTaskResponse.java | 54 ++
.../platform/utils/PlatformConfigurationUtils.java | 5 +-
.../internal/client/thin/ComputeTaskTest.java | 699 +++++++++++++++++++++
.../ThinClientAbstractPartitionAwarenessTest.java | 2 +-
...lientPartitionAwarenessResourceReleaseTest.java | 5 +-
.../org/apache/ignite/client/ClientTestSuite.java | 2 +
.../Config/full-config.xml | 2 +-
.../IgniteConfigurationSerializerTest.cs | 7 +-
.../Configuration/ClientConnectorConfiguration.cs | 4 +-
.../Configuration/ThinClientConfiguration.cs | 13 +
.../IgniteConfigurationSection.xsd | 5 +
42 files changed, 2389 insertions(+), 266 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/client/ClientCluster.java b/modules/core/src/main/java/org/apache/ignite/client/ClientCluster.java
new file mode 100644
index 0000000..7ff0e51
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/client/ClientCluster.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+/**
+ * Thin client cluster facade. Represents whole cluster (all available nodes).
+ */
+public interface ClientCluster extends ClientClusterGroup {
+ // No-op.
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java b/modules/core/src/main/java/org/apache/ignite/client/ClientClusterGroup.java
similarity index 50%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
copy to modules/core/src/main/java/org/apache/ignite/client/ClientClusterGroup.java
index 0471339..08b451e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/ClientClusterGroup.java
@@ -15,39 +15,29 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.platform.client;
+package org.apache.ignite.client;
+
+import java.util.Collection;
+import java.util.UUID;
/**
- * Client response flag.
+ * Thin client cluster group facade. Defines a cluster group which contains all or a subset of cluster nodes.
*/
-public class ClientFlag {
+public interface ClientClusterGroup {
/**
- * No-op constructor to prevent instantiation.
+ * Creates a cluster group over nodes with specified node IDs.
+ *
+ * @param ids Collection of node IDs.
+ * @return Cluster group over nodes with the specified node IDs.
*/
- private ClientFlag() {
- // No-op.
- }
+ public ClientClusterGroup forNodeIds(Collection<UUID> ids);
/**
- * @return Flags for response message.
- * @param error Error flag.
- * @param topologyChanged Affinity topology changed flag.
+ * Creates a cluster group for a node with the specified ID.
+ *
+ * @param id Node ID to get the cluster group for.
+ * @param ids Optional additional node IDs to include into the cluster group.
+ * @return Cluster group over the nodes with the specified node IDs.
*/
- public static short makeFlags(boolean error, boolean topologyChanged) {
- short flags = 0;
-
- if (error)
- flags |= ClientFlag.ERROR;
-
- if (topologyChanged)
- flags |= ClientFlag.AFFINITY_TOPOLOGY_CHANGED;
-
- return flags;
- }
-
- /** Error flag. */
- public static final short ERROR = 1;
-
- /** Affinity topology change flag. */
- public static final short AFFINITY_TOPOLOGY_CHANGED = 1 << 1;
+ public ClientClusterGroup forNodeId(UUID id, UUID... ids);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/client/ClientCompute.java b/modules/core/src/main/java/org/apache/ignite/client/ClientCompute.java
new file mode 100644
index 0000000..5d16a6f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/client/ClientCompute.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import java.util.concurrent.Future;
+import org.apache.ignite.compute.ComputeTask;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Thin client compute facade. Defines compute grid functionality for executing tasks over nodes
+ * in the {@link ClientClusterGroup}.
+ */
+public interface ClientCompute {
+ /**
+ * Gets cluster group to which this {@code ClientCompute} instance belongs.
+ *
+ * @return Cluster group to which this {@code ClientCompute} instance belongs.
+ */
+ public ClientClusterGroup clusterGroup();
+
+ /**
+ * Executes given task within the cluster group. For step-by-step explanation of task execution process
+ * refer to {@link ComputeTask} documentation.
+ *
+ * @param taskName Name of the task to execute.
+ * @param arg Optional argument of task execution, can be {@code null}.
+ * @return Task result.
+ * @throws ClientException If task failed.
+ * @throws InterruptedException If the wait for task completion was interrupted.
+ * @see ComputeTask for information about task execution.
+ */
+ public <T, R> R execute(String taskName, @Nullable T arg) throws ClientException, InterruptedException;
+
+ /**
+ * Executes given task asynchronously within the cluster group. For step-by-step explanation of task execution
+ * process refer to {@link ComputeTask} documentation.
+ *
+ * @param taskName Name of the task to execute.
+ * @param arg Optional argument of task execution, can be {@code null}.
+ * @return A Future representing pending completion of the task.
+ * @throws ClientException If task failed.
+ * @see ComputeTask for information about task execution.
+ */
+ public <T, R> Future<R> executeAsync(String taskName, @Nullable T arg) throws ClientException;
+
+ /**
+ * Sets timeout for tasks executed by returned {@code ClientCompute} instance.
+ *
+ * @return {@code ClientCompute} instance with given timeout.
+ */
+ public ClientCompute withTimeout(long timeout);
+
+ /**
+ * Sets no-failover flag for tasks executed by returned {@code ClientCompute} instance.
+ * If flag is set, job will never be failed over even if remote node crashes or rejects execution.
+ * See {@link ComputeTask} documentation for more information about jobs failover.
+ *
+ * @return {@code ClientCompute} instance with no-failover flag.
+ */
+ public ClientCompute withNoFailover();
+
+ /**
+ * Disables result caching for tasks executed by returned {@code ClientCompute} instance.
+ * See {@link ComputeTask} documentation for more information about tasks result caching.
+ *
+ * @return {@code ClientCompute} instance with "no result cache" flag.
+ */
+ public ClientCompute withNoResultCache();
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/client/ClientException.java b/modules/core/src/main/java/org/apache/ignite/client/ClientException.java
index b0d9f6c..e7177ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/client/ClientException.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/ClientException.java
@@ -18,7 +18,7 @@
package org.apache.ignite.client;
/**
- * Common thin client checked exception.
+ * Common thin client unchecked exception.
*/
public class ClientException extends RuntimeException {
/** Serial version uid. */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java b/modules/core/src/main/java/org/apache/ignite/client/ClientFeatureNotSupportedByServerException.java
similarity index 50%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
copy to modules/core/src/main/java/org/apache/ignite/client/ClientFeatureNotSupportedByServerException.java
index 0471339..e8c2110 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/ClientFeatureNotSupportedByServerException.java
@@ -15,39 +15,21 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.platform.client;
+package org.apache.ignite.client;
/**
- * Client response flag.
+ * Feature not supported by server exception.
*/
-public class ClientFlag {
- /**
- * No-op constructor to prevent instantiation.
- */
- private ClientFlag() {
- // No-op.
- }
+public class ClientFeatureNotSupportedByServerException extends ClientException {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
/**
- * @return Flags for response message.
- * @param error Error flag.
- * @param topologyChanged Affinity topology changed flag.
+ * Constructs a new exception with the specified detail message.
+ *
+ * @param msg the detail message.
*/
- public static short makeFlags(boolean error, boolean topologyChanged) {
- short flags = 0;
-
- if (error)
- flags |= ClientFlag.ERROR;
-
- if (topologyChanged)
- flags |= ClientFlag.AFFINITY_TOPOLOGY_CHANGED;
-
- return flags;
+ public ClientFeatureNotSupportedByServerException(String msg) {
+ super(msg);
}
-
- /** Error flag. */
- public static final short ERROR = 1;
-
- /** Affinity topology change flag. */
- public static final short AFFINITY_TOPOLOGY_CHANGED = 1 << 1;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/client/IgniteClient.java b/modules/core/src/main/java/org/apache/ignite/client/IgniteClient.java
index 5c48ead..fa8cf8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/client/IgniteClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/IgniteClient.java
@@ -95,4 +95,28 @@ public interface IgniteClient extends AutoCloseable {
* @return Client transactions facade.
*/
public ClientTransactions transactions();
+
+ /**
+ * Gets compute facade over all cluster nodes started in server mode.
+ *
+ * @return Compute instance over all cluster nodes started in server mode.
+ */
+ public ClientCompute compute();
+
+ /**
+ * Gets compute facade over the specified cluster group. All operations
+ * on the returned {@link ClientCompute} instance will only include nodes from
+ * this cluster group.
+ *
+ * @param grp Cluster group.
+ * @return Compute instance over given cluster group.
+ */
+ public ClientCompute compute(ClientClusterGroup grp);
+
+ /**
+ * Gets client cluster facade.
+ *
+ * @return Client cluster facade.
+ */
+ public ClientCluster cluster();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ThinClientConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ThinClientConfiguration.java
index b4c3e21..94efd64 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/ThinClientConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/ThinClientConfiguration.java
@@ -28,9 +28,15 @@ public class ThinClientConfiguration {
/** Default limit of active transactions count per connection. */
public static final int DFLT_MAX_ACTIVE_TX_PER_CONNECTION = 100;
+ /** Default limit of active compute tasks per connection. */
+ public static final int DFLT_MAX_ACTIVE_COMPUTE_TASKS_PER_CONNECTION = 0;
+
/** Active transactions count per connection limit. */
private int maxActiveTxPerConn = DFLT_MAX_ACTIVE_TX_PER_CONNECTION;
+ /** Active compute tasks per connection limit. */
+ private int maxActiveComputeTasksPerConn = DFLT_MAX_ACTIVE_COMPUTE_TASKS_PER_CONNECTION;
+
/**
* Creates thin-client configuration with all default values.
*/
@@ -47,6 +53,7 @@ public class ThinClientConfiguration {
assert cfg != null;
maxActiveTxPerConn = cfg.maxActiveTxPerConn;
+ maxActiveComputeTasksPerConn = cfg.maxActiveComputeTasksPerConn;
}
/**
@@ -67,6 +74,27 @@ public class ThinClientConfiguration {
return this;
}
+ /**
+ * Gets active compute tasks per connection limit.
+ *
+ * @return Active compute tasks per connection limit.
+ */
+ public int getMaxActiveComputeTasksPerConnection() {
+ return maxActiveComputeTasksPerConn;
+ }
+
+ /**
+ * Sets active compute tasks per connection limit.
+ * Value {@code 0} means that compute grid functionality is disabled for thin clients.
+ *
+ * @return {@code this} for chaining.
+ */
+ public ThinClientConfiguration setMaxActiveComputeTasksPerConnection(int maxActiveComputeTasksPerConn) {
+ this.maxActiveComputeTasksPerConn = maxActiveComputeTasksPerConn;
+
+ return this;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(ThinClientConfiguration.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java
index c1d8b7f..be5e597 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java
@@ -22,6 +22,7 @@ import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.ignite.client.ClientAuthorizationException;
import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.client.ClientException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
/**
@@ -35,9 +36,16 @@ interface ClientChannel extends AutoCloseable {
* @param payloadWriter Payload writer to stream or {@code null} if request has no payload.
* @param payloadReader Payload reader from stream.
* @return Received operation payload or {@code null} if response has no payload.
+ * @throws ClientException Thrown by {@code payloadWriter} or {@code payloadReader}.
+ * @throws ClientAuthorizationException When user has no permission to perform operation.
+ * @throws ClientServerError When failed to process request on server.
+ * @throws ClientConnectionException In case of IO errors.
*/
- public <T> T service(ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter,
- Function<PayloadInputChannel, T> payloadReader) throws ClientConnectionException, ClientAuthorizationException;
+ public <T> T service(
+ ClientOperation op,
+ Consumer<PayloadOutputChannel> payloadWriter,
+ Function<PayloadInputChannel, T> payloadReader
+ ) throws ClientException, ClientAuthorizationException, ClientServerError, ClientConnectionException;
/**
* @return Protocol context.
@@ -58,4 +66,14 @@ interface ClientChannel extends AutoCloseable {
* Add topology change listener.
*/
public void addTopologyChangeListener(Consumer<ClientChannel> lsnr);
+
+ /**
+ * Add notifications (from server to client) listener.
+ */
+ public void addNotificationListener(NotificationListener lsnr);
+
+ /**
+ * @return {@code True} if channel is closed.
+ */
+ public boolean closed();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterGroupImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterGroupImpl.java
new file mode 100644
index 0000000..e4591e4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterGroupImpl.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.UUID;
+import org.apache.ignite.client.ClientClusterGroup;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Implementation of {@link ClientClusterGroup}.
+ */
+@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+class ClientClusterGroupImpl implements ClientClusterGroup {
+ /** Node IDs. */
+ private final Collection<UUID> nodeIds;
+
+ /**
+ * @param ids Ids.
+ */
+ ClientClusterGroupImpl(Collection<UUID> ids) {
+ nodeIds = ids;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientClusterGroup forNodeIds(Collection<UUID> ids) {
+ return new ClientClusterGroupImpl(new HashSet<>(ids));
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientClusterGroup forNodeId(UUID id, UUID ... ids) {
+ Collection<UUID> nodeIds = U.newHashSet(1 + (ids == null ? 0 : ids.length));
+
+ nodeIds.add(id);
+
+ if (!F.isEmpty(ids))
+ nodeIds.addAll(Arrays.asList(ids));
+
+ return new ClientClusterGroupImpl(nodeIds);
+ }
+
+ /**
+ * Gets node IDs.
+ */
+ public Collection<UUID> nodeIds() {
+ return nodeIds == null ? null : Collections.unmodifiableCollection(nodeIds);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterImpl.java
similarity index 50%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
copy to modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterImpl.java
index 0471339..dd43a03 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterImpl.java
@@ -15,39 +15,18 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.platform.client;
+package org.apache.ignite.internal.client.thin;
+
+import org.apache.ignite.client.ClientCluster;
/**
- * Client response flag.
+ * Implementation of {@link ClientCluster}.
*/
-public class ClientFlag {
- /**
- * No-op constructor to prevent instantiation.
- */
- private ClientFlag() {
- // No-op.
- }
-
+class ClientClusterImpl extends ClientClusterGroupImpl implements ClientCluster {
/**
- * @return Flags for response message.
- * @param error Error flag.
- * @param topologyChanged Affinity topology changed flag.
+ * Default constructor.
*/
- public static short makeFlags(boolean error, boolean topologyChanged) {
- short flags = 0;
-
- if (error)
- flags |= ClientFlag.ERROR;
-
- if (topologyChanged)
- flags |= ClientFlag.AFFINITY_TOPOLOGY_CHANGED;
-
- return flags;
+ ClientClusterImpl() {
+ super(null);
}
-
- /** Error flag. */
- public static final short ERROR = 1;
-
- /** Affinity topology change flag. */
- public static final short AFFINITY_TOPOLOGY_CHANGED = 1 << 1;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
new file mode 100644
index 0000000..287aba3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.client.ClientClusterGroup;
+import org.apache.ignite.client.ClientCompute;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.client.thin.ClientOperation.COMPUTE_TASK_EXECUTE;
+import static org.apache.ignite.internal.client.thin.ClientOperation.RESOURCE_CLOSE;
+
+/**
+ * Implementation of {@link ClientCompute}.
+ */
+class ClientComputeImpl implements ClientCompute, NotificationListener {
+ /** No failover flag mask. */
+ private static final byte NO_FAILOVER_FLAG_MASK = 0x01;
+
+ /** No result cache flag mask. */
+ private static final byte NO_RESULT_CACHE_FLAG_MASK = 0x02;
+
+ /** Channel. */
+ private final ReliableChannel ch;
+
+ /** Binary marshaller. */
+ private final ClientBinaryMarshaller marsh;
+
+ /** Utils for serialization/deserialization. */
+ private final ClientUtils utils;
+
+ /** ClientCluster instance. */
+ private final ClientClusterImpl cluster;
+
+ /** Active tasks. */
+ private final Map<ClientChannel, Map<Long, ClientComputeTask<Object>>> activeTasks = new ConcurrentHashMap<>();
+
+ /** Guard lock for active tasks. */
+ private final ReadWriteLock guard = new ReentrantReadWriteLock();
+
+ /** Constructor. */
+ ClientComputeImpl(ReliableChannel ch, ClientBinaryMarshaller marsh, ClientClusterImpl cluster) {
+ this.ch = ch;
+ this.marsh = marsh;
+ this.cluster = cluster;
+
+ utils = new ClientUtils(marsh);
+
+ ch.addNotificationListener(this);
+
+ ch.addChannelCloseListener(clientCh -> {
+ guard.writeLock().lock();
+
+ try {
+ Map<Long, ClientComputeTask<Object>> chTasks = activeTasks.remove(clientCh);
+
+ if (!F.isEmpty(chTasks)) {
+ for (ClientComputeTask<?> task : chTasks.values())
+ task.fut.onDone(new ClientException("Channel to server is closed"));
+ }
+ }
+ finally {
+ guard.writeLock().unlock();
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientClusterGroup clusterGroup() {
+ return cluster;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T, R> R execute(String taskName, @Nullable T arg) throws ClientException, InterruptedException {
+ return execute0(taskName, arg, cluster, (byte)0, 0L);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T, R> Future<R> executeAsync(String taskName, @Nullable T arg) throws ClientException {
+ return executeAsync0(taskName, arg, cluster, (byte)0, 0L);
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientCompute withTimeout(long timeout) {
+ return timeout == 0L ? this : new ClientComputeModificator(this, cluster, (byte)0, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientCompute withNoFailover() {
+ return new ClientComputeModificator(this, cluster, NO_FAILOVER_FLAG_MASK, 0L);
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientCompute withNoResultCache() {
+ return new ClientComputeModificator(this, cluster, NO_RESULT_CACHE_FLAG_MASK, 0L);
+ }
+
+ /**
+ * Gets compute facade over the specified cluster group.
+ *
+ * @param grp Cluster group.
+ */
+ public ClientCompute withClusterGroup(ClientClusterGroupImpl grp) {
+ return new ClientComputeModificator(this, grp, (byte)0, 0L);
+ }
+
+ /**
+ * @param taskName Task name.
+ * @param arg Argument.
+ * @param clusterGrp Cluster group.
+ * @param flags Flags.
+ * @param timeout Timeout.
+ */
+ private <T, R> R execute0(
+ String taskName,
+ @Nullable T arg,
+ ClientClusterGroupImpl clusterGrp,
+ byte flags,
+ long timeout
+ ) throws ClientException {
+ try {
+ return (R)executeAsync0(taskName, arg, clusterGrp, flags, timeout).get();
+ }
+ catch (ExecutionException | InterruptedException e) {
+ if (e.getCause() instanceof ClientException)
+ throw (ClientException)e.getCause();
+ else
+ throw new ClientException(e);
+ }
+ }
+
+ /**
+ * @param taskName Task name.
+ * @param arg Argument.
+ * @param clusterGrp Cluster group.
+ * @param flags Flags.
+ * @param timeout Timeout.
+ */
+ private <T, R> Future<R> executeAsync0(
+ String taskName,
+ @Nullable T arg,
+ ClientClusterGroupImpl clusterGrp,
+ byte flags,
+ long timeout
+ ) throws ClientException {
+ Collection<UUID> nodeIds = clusterGrp == cluster ? null : clusterGrp.nodeIds();
+
+ if (F.isEmpty(taskName))
+ throw new ClientException("Task name can't be null or empty.");
+
+ if (nodeIds != null && nodeIds.isEmpty())
+ throw new ClientException("Cluster group is empty.");
+
+ while (true) {
+ Consumer<PayloadOutputChannel> payloadWriter =
+ ch -> writeExecuteTaskRequest(ch, taskName, arg, nodeIds, flags, timeout);
+
+ Function<PayloadInputChannel, T2<ClientChannel, Long>> payloadReader =
+ ch -> new T2<>(ch.clientChannel(), ch.in().readLong());
+
+ T2<ClientChannel, Long> taskParams;
+
+ try {
+ taskParams = ch.service(COMPUTE_TASK_EXECUTE, payloadWriter, payloadReader);
+ }
+ catch (ClientServerError error) {
+ throw new ClientException(error.getMessage());
+ }
+
+ ClientComputeTask<Object> task = addTask(taskParams.get1(), taskParams.get2());
+
+ if (task == null) // Channel is closed concurrently, retry with another channel.
+ continue;
+
+ task.fut.listen(f -> removeTask(task.ch, task.taskId));
+
+ return new ClientFutureImpl<>((GridFutureAdapter<R>)task.fut);
+ }
+ }
+
+ /**
+ * Writes the task execution request payload: node IDs, flags, timeout, task name and argument.
+ */
+ private <T> void writeExecuteTaskRequest(
+ PayloadOutputChannel ch,
+ String taskName,
+ @Nullable T arg,
+ Collection<UUID> nodeIds,
+ byte flags,
+ long timeout
+ ) throws ClientException {
+ if (!ch.clientChannel().protocolCtx().isFeatureSupported(ProtocolBitmaskFeature.EXECUTE_TASK_BY_NAME)) {
+ throw new ClientFeatureNotSupportedByServerException("Compute grid functionality for thin " +
+ "client not supported by server node (" + ch.clientChannel().serverNodeId() + ')');
+ }
+
+ try (BinaryRawWriterEx w = new BinaryWriterExImpl(marsh.context(), ch.out(), null, null)) {
+ if (nodeIds == null) // Include all nodes.
+ w.writeInt(0);
+ else {
+ w.writeInt(nodeIds.size());
+
+ for (UUID nodeId : nodeIds)
+ w.writeUuid(nodeId);
+ }
+
+ w.writeByte(flags);
+ w.writeLong(timeout);
+ w.writeString(taskName);
+ utils.writeObject(ch.out(), arg);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void acceptNotification(
+ ClientChannel ch,
+ ClientOperation op,
+ long rsrcId,
+ byte[] payload,
+ Exception err
+ ) {
+ if (op == ClientOperation.COMPUTE_TASK_FINISHED) {
+ Object res = payload == null ? null : utils.readObject(new BinaryHeapInputStream(payload), false);
+
+ ClientComputeTask<Object> task = removeTask(ch, rsrcId);
+
+ if (task != null) { // If channel is closed concurrently, task is already done with "channel closed" reason.
+ if (err == null)
+ task.fut.onDone(res);
+ else
+ task.fut.onDone(err);
+ }
+ }
+ }
+
+    /**
+     * Looks up the task with the given id on the given channel, registering a new one if the channel is still
+     * open and the task is not known yet. The whole operation runs under the read lock of {@code guard}.
+     *
+     * @param ch Client channel.
+     * @param taskId Task id.
+     * @return Already registered task, new task if task wasn't registered before, or {@code null} if channel was
+     *      closed concurrently.
+     */
+    private ClientComputeTask<Object> addTask(ClientChannel ch, long taskId) {
+        guard.readLock().lock();
+
+        try {
+            if (ch.closed()) {
+                // Closed channel: only a task registered earlier may be returned, nothing new is added.
+                Map<Long, ClientComputeTask<Object>> registered = activeTasks.get(ch);
+
+                return registered == null ? null : registered.get(taskId);
+            }
+
+            Map<Long, ClientComputeTask<Object>> chTasks =
+                activeTasks.computeIfAbsent(ch, c -> new ConcurrentHashMap<>());
+
+            return chTasks.computeIfAbsent(taskId, id -> new ClientComputeTask<>(ch, taskId));
+        }
+        finally {
+            guard.readLock().unlock();
+        }
+    }
+
+    /**
+     * Unregisters the task with the given id from the tasks tracked for the given channel.
+     *
+     * @param ch Client channel.
+     * @param taskId Task id.
+     * @return Removed task, or {@code null} if no such task is registered for the channel.
+     */
+    private ClientComputeTask<Object> removeTask(ClientChannel ch, long taskId) {
+        Map<Long, ClientComputeTask<Object>> registered = activeTasks.get(ch);
+
+        // Nothing is tracked for this channel.
+        if (F.isEmpty(registered))
+            return null;
+
+        return registered.remove(taskId);
+    }
+
+    /**
+     * Collects the futures of all tasks currently registered on any channel.
+     *
+     * @return New map of task futures; each key combines the server node ID of the task's channel with the
+     *      task id.
+     */
+    Map<IgniteUuid, IgniteInternalFuture<?>> activeTaskFutures() {
+        Map<IgniteUuid, IgniteInternalFuture<?>> futs = new HashMap<>();
+
+        for (Map.Entry<ClientChannel, Map<Long, ClientComputeTask<Object>>> chEntry : activeTasks.entrySet()) {
+            ClientChannel ch = chEntry.getKey();
+
+            for (Map.Entry<Long, ClientComputeTask<Object>> taskEntry : chEntry.getValue().entrySet()) {
+                IgniteUuid id = new IgniteUuid(ch.serverNodeId(), taskEntry.getKey());
+
+                futs.put(id, taskEntry.getValue().fut);
+            }
+        }
+
+        return futs;
+    }
+
+    /**
+     * {@link ClientCompute} view carrying modifiers (cluster group, task flags, timeout). Task execution is
+     * forwarded to the underlying {@link ClientComputeImpl} together with these modifiers. Instances are
+     * immutable: every {@code with...} call returns either this instance or a new one.
+     */
+    private static class ClientComputeModificator implements ClientCompute {
+        /** Compute implementation that actually executes tasks. */
+        private final ClientComputeImpl delegate;
+
+        /** Cluster group the tasks are restricted to. */
+        private final ClientClusterGroupImpl clusterGrp;
+
+        /** Task flags bitmask. */
+        private final byte flags;
+
+        /** Task timeout. */
+        private final long timeout;
+
+        /**
+         * @param delegate Compute implementation to forward execution to.
+         * @param clusterGrp Cluster group.
+         * @param flags Task flags.
+         * @param timeout Task timeout.
+         */
+        private ClientComputeModificator(
+            ClientComputeImpl delegate,
+            ClientClusterGroupImpl clusterGrp,
+            byte flags,
+            long timeout
+        ) {
+            this.delegate = delegate;
+            this.clusterGrp = clusterGrp;
+            this.flags = flags;
+            this.timeout = timeout;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClientClusterGroup clusterGroup() {
+            return clusterGrp;
+        }
+
+        /** {@inheritDoc} */
+        @Override public <T, R> R execute(String taskName, @Nullable T arg) throws ClientException, InterruptedException {
+            return delegate.execute0(taskName, arg, clusterGrp, flags, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override public <T, R> Future<R> executeAsync(String taskName, @Nullable T arg) throws ClientException {
+            return delegate.executeAsync0(taskName, arg, clusterGrp, flags, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClientCompute withTimeout(long timeout) {
+            // Same timeout: nothing to change, reuse this instance.
+            if (this.timeout == timeout)
+                return this;
+
+            return new ClientComputeModificator(delegate, clusterGrp, flags, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClientCompute withNoFailover() {
+            return withFlag(NO_FAILOVER_FLAG_MASK);
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClientCompute withNoResultCache() {
+            return withFlag(NO_RESULT_CACHE_FLAG_MASK);
+        }
+
+        /**
+         * @param mask Flag mask to raise.
+         * @return This instance if the flag is already raised, otherwise a copy with the flag added.
+         */
+        private ClientCompute withFlag(int mask) {
+            if ((flags & mask) != 0)
+                return this;
+
+            return new ClientComputeModificator(delegate, clusterGrp, (byte)(flags | mask), timeout);
+        }
+    }
+
+    /**
+     * Holder of a compute task started through a client channel: the channel, the task id and the future
+     * which is completed with the task result.
+     *
+     * @param <R> Result type.
+     */
+    private static class ClientComputeTask<R> {
+        /** Channel the task was started on. */
+        private final ClientChannel ch;
+
+        /** Id of the task. */
+        private final long taskId;
+
+        /** Future of the task result. */
+        private final GridFutureAdapter<R> fut;
+
+        /**
+         * @param ch Client channel.
+         * @param taskId Task id.
+         */
+        private ClientComputeTask(ClientChannel ch, long taskId) {
+            this.ch = ch;
+            this.taskId = taskId;
+
+            fut = new GridFutureAdapter<R>() {
+                /**
+                 * Marks the future as cancelled and, only if that succeeded, sends a resource close request
+                 * carrying the task id over the task's channel.
+                 */
+                @Override public boolean cancel() {
+                    // Cancellation was not applied to the future: send nothing to the server.
+                    if (!onCancelled())
+                        return false;
+
+                    ch.service(RESOURCE_CLOSE, payloadCh -> payloadCh.out().writeLong(taskId), null);
+
+                    return true;
+                }
+            };
+        }
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientFutureImpl.java
new file mode 100644
index 0000000..c9b3070
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientFutureImpl.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+
+/**
+ * Implementation of {@link Future} for thin-client operations.
+ * <p>
+ * Adapts an internal {@link GridFutureAdapter} to the JDK future contract: Ignite checked exceptions thrown by
+ * the delegate are translated to the exceptions declared by {@link Future}, and the original exception is
+ * always kept as the cause of the translated one.
+ *
+ * @param <R> Result type.
+ */
+class ClientFutureImpl<R> implements Future<R> {
+    /** Delegate. */
+    private final GridFutureAdapter<R> delegate;
+
+    /**
+     * Default constructor.
+     *
+     * @param delegate Delegate internal future.
+     */
+    ClientFutureImpl(GridFutureAdapter<R> delegate) {
+        this.delegate = delegate;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @throws ExecutionException Wrapping a {@link ClientException} if the delegate failed for any reason other
+     *      than interruption or cancellation.
+     */
+    @Override public R get() throws ExecutionException, CancellationException, InterruptedException {
+        try {
+            return delegate.get();
+        }
+        catch (IgniteInterruptedCheckedException e) {
+            throw withCause(new InterruptedException(e.getMessage()), e);
+        }
+        catch (IgniteFutureCancelledCheckedException e) {
+            throw withCause(new CancellationException(e.getMessage()), e);
+        }
+        catch (Exception e) {
+            throw new ExecutionException(unwrapException(e));
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @throws ExecutionException Wrapping a {@link ClientException} if the delegate failed for any reason other
+     *      than interruption, timeout or cancellation.
+     */
+    @Override public R get(long timeout, TimeUnit unit)
+        throws ExecutionException, TimeoutException, CancellationException, InterruptedException {
+        try {
+            return delegate.get(timeout, unit);
+        }
+        catch (IgniteInterruptedCheckedException e) {
+            throw withCause(new InterruptedException(e.getMessage()), e);
+        }
+        catch (IgniteFutureTimeoutCheckedException e) {
+            throw withCause(new TimeoutException(e.getMessage()), e);
+        }
+        catch (IgniteFutureCancelledCheckedException e) {
+            throw withCause(new CancellationException(e.getMessage()), e);
+        }
+        catch (Exception e) {
+            throw new ExecutionException(unwrapException(e));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isDone() {
+        return delegate.isDone();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * With {@code mayInterruptIfRunning} set the delegate's {@code cancel()} is invoked, otherwise only its
+     * {@code onCancelled()}.
+     *
+     * @throws ClientException If the delegate failed to cancel.
+     */
+    @Override public boolean cancel(boolean mayInterruptIfRunning) throws ClientException {
+        try {
+            return mayInterruptIfRunning ? delegate.cancel() : delegate.onCancelled();
+        }
+        catch (IgniteCheckedException e) {
+            throw unwrapException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isCancelled() {
+        return delegate.isCancelled();
+    }
+
+    /**
+     * Unwraps checked exception to client exception.
+     *
+     * @param e Exception.
+     * @return The exception itself (or the cause of an {@link IgniteCheckedException}) if it already is a
+     *      {@link ClientException}, otherwise a new {@link ClientException} wrapping it.
+     */
+    private static ClientException unwrapException(Exception e) {
+        Throwable e0 = e instanceof IgniteCheckedException && e.getCause() != null ? e.getCause() : e;
+
+        if (e0 instanceof ClientException)
+            return (ClientException)e0;
+
+        return new ClientException(e0);
+    }
+
+    /**
+     * Attaches the original exception to a translated one whose type has no constructor accepting a cause.
+     *
+     * @param target Freshly created exception to throw.
+     * @param cause Original exception.
+     * @return {@code target} with {@code cause} set as its cause.
+     */
+    private static <E extends Throwable> E withCause(E target, Throwable cause) {
+        target.initCause(cause);
+
+        return target;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
index 9091849..8ad6b64 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
@@ -17,6 +17,10 @@
package org.apache.ignite.internal.client.thin;
+import java.util.HashMap;
+import java.util.Map;
+import org.jetbrains.annotations.Nullable;
+
/** Operation codes. */
enum ClientOperation {
/** Resource close. */RESOURCE_CLOSE(0),
@@ -56,14 +60,25 @@ enum ClientOperation {
/** Put binary type. */PUT_BINARY_TYPE(3003),
/** Get binary type name. */GET_BINARY_TYPE_NAME(3000),
/** Start new transaction. */TX_START(4000),
- /** End the transaction (commit or rollback). */TX_END(4001);
+ /** End the transaction (commit or rollback). */TX_END(4001),
+ /** Execute compute task. */COMPUTE_TASK_EXECUTE(6000),
+ /** Finished compute task notification. */COMPUTE_TASK_FINISHED(6001, true);
/** Code. */
private final int code;
+ /** Is notification. */
+ private final boolean notification;
+
/** Constructor. */
ClientOperation(int code) {
+ this(code, false);
+ }
+
+ /** Constructor. */
+ ClientOperation(int code, boolean notification) {
this.code = code;
+ this.notification = notification;
}
/**
@@ -72,4 +87,29 @@ enum ClientOperation {
public short code() {
return (short)code;
}
+
+ /**
+ * @return {@code True} if operation is notification.
+ */
+ public boolean isNotification() {
+ return notification;
+ }
+
+ /** Enum mapping from code to values. */
+ private static final Map<Short, ClientOperation> map;
+
+ static {
+ map = new HashMap<>();
+
+ for (ClientOperation op : values())
+ map.put(op.code(), op);
+ }
+
+ /**
+ * @param code Code to convert to enum.
+ * @return Enum.
+ */
+ @Nullable public static ClientOperation fromCode(short code) {
+ return map.get(code);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
similarity index 50%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
copy to modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
index 0471339..ae1b7fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
@@ -15,39 +15,20 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.platform.client;
+package org.apache.ignite.internal.client.thin;
/**
- * Client response flag.
+ * Server to client notification listener.
*/
-public class ClientFlag {
+interface NotificationListener {
/**
- * No-op constructor to prevent instantiation.
+ * Accept notification.
+ *
+ * @param ch Client channel which was notified.
+ * @param op Client operation.
+ * @param rsrcId Resource id.
+ * @param payload Notification payload or {@code null} if there is no payload.
+ * @param err Error.
*/
- private ClientFlag() {
- // No-op.
- }
-
- /**
- * @return Flags for response message.
- * @param error Error flag.
- * @param topologyChanged Affinity topology changed flag.
- */
- public static short makeFlags(boolean error, boolean topologyChanged) {
- short flags = 0;
-
- if (error)
- flags |= ClientFlag.ERROR;
-
- if (topologyChanged)
- flags |= ClientFlag.AFFINITY_TOPOLOGY_CHANGED;
-
- return flags;
- }
-
- /** Error flag. */
- public static final short ERROR = 1;
-
- /** Affinity topology change flag. */
- public static final short AFFINITY_TOPOLOGY_CHANGED = 1 << 1;
+ public void acceptNotification(ClientChannel ch, ClientOperation op, long rsrcId, byte[] payload, Exception err);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
index a1b5f84..e57a9ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
@@ -26,7 +26,10 @@ import java.util.EnumSet;
*/
public enum ProtocolBitmaskFeature {
/** Feature for user attributes. */
- USER_ATTRIBUTES(0);
+ USER_ATTRIBUTES(0),
+
+ /** Compute tasks (execute by task name). */
+ EXECUTE_TASK_BY_NAME(1);
/** */
private static final EnumSet<ProtocolBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
index 9ce6ad985..89cc783 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@@ -36,6 +37,8 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.IgniteBinary;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.client.ClientAuthenticationException;
+import org.apache.ignite.client.ClientAuthorizationException;
import org.apache.ignite.client.ClientConnectionException;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.configuration.ClientConfiguration;
@@ -48,12 +51,12 @@ import org.jetbrains.annotations.NotNull;
/**
* Communication channel with failover and partition awareness.
*/
-final class ReliableChannel implements AutoCloseable {
+final class ReliableChannel implements AutoCloseable, NotificationListener {
/** Timeout to wait for executor service to shutdown (in milliseconds). */
private static final long EXECUTOR_SHUTDOWN_TIMEOUT = 10_000L;
/** Async runner thread name. */
- static final String ASYNC_RUNNER_THREAD_NAME = "thin-client-channel-async-runner";
+ static final String ASYNC_RUNNER_THREAD_NAME = "thin-client-channel-async-init";
/** Channel factory. */
private final Function<ClientChannelConfiguration, ClientChannel> chFactory;
@@ -73,6 +76,12 @@ final class ReliableChannel implements AutoCloseable {
/** Node channels. */
private final Map<UUID, ClientChannelHolder> nodeChannels = new ConcurrentHashMap<>();
+ /** Notification listeners. */
+ private final Collection<NotificationListener> notificationLsnrs = new CopyOnWriteArrayList<>();
+
+ /** Listeners of channel close events. */
+ private final Collection<Consumer<ClientChannel>> channelCloseLsnrs = new CopyOnWriteArrayList<>();
+
/** Async tasks thread pool. */
private final ExecutorService asyncRunner = Executors.newSingleThreadExecutor(
new ThreadFactory() {
@@ -163,12 +172,18 @@ final class ReliableChannel implements AutoCloseable {
/**
* Send request and handle response.
+ *
+ * @throws ClientException Thrown by {@code payloadWriter} or {@code payloadReader}.
+ * @throws ClientAuthenticationException When user name or password is invalid.
+ * @throws ClientAuthorizationException When user has no permission to perform operation.
+ * @throws ClientProtocolError When failed to handshake with server.
+ * @throws ClientServerError When failed to process request on server.
*/
public <T> T service(
ClientOperation op,
Consumer<PayloadOutputChannel> payloadWriter,
Function<PayloadInputChannel, T> payloadReader
- ) throws ClientException {
+ ) throws ClientException, ClientError {
ClientConnectionException failure = null;
for (int i = 0; i < channels.length; i++) {
@@ -196,14 +211,15 @@ final class ReliableChannel implements AutoCloseable {
* Send request without payload and handle response.
*/
public <T> T service(ClientOperation op, Function<PayloadInputChannel, T> payloadReader)
- throws ClientException {
+ throws ClientException, ClientError {
return service(op, null, payloadReader);
}
/**
* Send request and handle response without payload.
*/
- public void request(ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter) throws ClientException {
+ public void request(ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter)
+ throws ClientException, ClientError {
service(op, payloadWriter, null);
}
@@ -216,7 +232,7 @@ final class ReliableChannel implements AutoCloseable {
ClientOperation op,
Consumer<PayloadOutputChannel> payloadWriter,
Function<PayloadInputChannel, T> payloadReader
- ) throws ClientException {
+ ) throws ClientException, ClientError {
if (partitionAwarenessEnabled && !nodeChannels.isEmpty() && affinityInfoIsUpToDate(cacheId)) {
UUID affinityNodeId = affinityCtx.affinityNode(cacheId, key);
@@ -243,6 +259,42 @@ final class ReliableChannel implements AutoCloseable {
}
/**
+ * Add notification listener.
+ *
+ * @param lsnr Listener.
+ */
+ public void addNotificationListener(NotificationListener lsnr) {
+ notificationLsnrs.add(lsnr);
+ }
+
+ /**
+ * Add listener of channel close event.
+ *
+ * @param lsnr Listener.
+ */
+ public void addChannelCloseListener(Consumer<ClientChannel> lsnr) {
+ channelCloseLsnrs.add(lsnr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void acceptNotification(
+ ClientChannel ch,
+ ClientOperation op,
+ long rsrcId,
+ byte[] payload,
+ Exception err
+ ) {
+ for (NotificationListener lsnr : notificationLsnrs) {
+ try {
+ lsnr.acceptNotification(ch, op, rsrcId, payload, err);
+ }
+ catch (Exception ignore) {
+ // No-op.
+ }
+ }
+ }
+
+ /**
* Checks if affinity information for the cache is up to date and tries to update it if not.
*
* @return {@code True} if affinity information is up to date, {@code false} if there is not affinity information
@@ -455,14 +507,16 @@ final class ReliableChannel implements AutoCloseable {
/**
* Get or create channel.
*/
- private synchronized ClientChannel getOrCreateChannel() {
+ private synchronized ClientChannel getOrCreateChannel()
+ throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
return getOrCreateChannel(false);
}
/**
* Get or create channel.
*/
- private synchronized ClientChannel getOrCreateChannel(boolean ignoreThrottling) {
+ private synchronized ClientChannel getOrCreateChannel(boolean ignoreThrottling)
+ throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
if (ch == null) {
if (!ignoreThrottling && applyReconnectionThrottling())
throw new ClientConnectionException("Reconnect is not allowed due to applied throttling");
@@ -471,6 +525,7 @@ final class ReliableChannel implements AutoCloseable {
if (ch.serverNodeId() != null) {
ch.addTopologyChangeListener(ReliableChannel.this::onTopologyChanged);
+ ch.addNotificationListener(ReliableChannel.this);
nodeChannels.values().remove(this);
@@ -485,9 +540,14 @@ final class ReliableChannel implements AutoCloseable {
* Close channel.
*/
private synchronized void closeChannel() {
- U.closeQuiet(ch);
+ if (ch != null) {
+ U.closeQuiet(ch);
+
+ for (Consumer<ClientChannel> lsnr : channelCloseLsnrs)
+ lsnr.accept(ch);
- ch = null;
+ ch = null;
+ }
}
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
index 2e00374..91e5f0f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
@@ -39,6 +39,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -65,7 +66,6 @@ import org.apache.ignite.client.SslMode;
import org.apache.ignite.client.SslProtocol;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.binary.BinaryCachingMetadataHandler;
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryPrimitives;
@@ -82,6 +82,7 @@ import org.apache.ignite.internal.processors.platform.client.ClientFlag;
import org.apache.ignite.internal.processors.platform.client.ClientStatus;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.USER_ATTRIBUTES;
@@ -105,6 +106,9 @@ class TcpClientChannel implements ClientChannel {
/** Protocol version used by default on first connection attempt. */
private static final ProtocolVersion DEFAULT_VERSION = LATEST_VER;
+ /** Receiver thread prefix. */
+ static final String RECEIVER_THREAD_PREFIX = "thin-client-channel#";
+
/** Supported protocol versions. */
private static final Collection<ProtocolVersion> supportedVers = Arrays.asList(
V1_7_0,
@@ -117,9 +121,6 @@ class TcpClientChannel implements ClientChannel {
V1_0_0
);
- /** Timeout before next attempt to lock channel and process next response by current thread. */
- private static final long PAYLOAD_WAIT_TIMEOUT = 10L;
-
/** Protocol context. */
private ProtocolContext protocolCtx;
@@ -144,17 +145,24 @@ class TcpClientChannel implements ClientChannel {
/** Send lock. */
private final Lock sndLock = new ReentrantLock();
- /** Receive lock. */
- private final Lock rcvLock = new ReentrantLock();
-
/** Pending requests. */
private final Map<Long, ClientRequestFuture> pendingReqs = new ConcurrentHashMap<>();
/** Topology change listeners. */
private final Collection<Consumer<ClientChannel>> topChangeLsnrs = new CopyOnWriteArrayList<>();
+ /** Notification listeners. */
+ private final Collection<NotificationListener> notificationLsnrs = new CopyOnWriteArrayList<>();
+
+ /** Closed flag. */
+ private final AtomicBoolean closed = new AtomicBoolean();
+
+ /** Receiver thread (processes incoming messages). */
+ private Thread receiverThread;
+
/** Constructor. */
- TcpClientChannel(ClientChannelConfiguration cfg) throws ClientConnectionException, ClientAuthenticationException {
+ TcpClientChannel(ClientChannelConfiguration cfg)
+ throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
validateConfiguration(cfg);
try {
@@ -171,18 +179,41 @@ class TcpClientChannel implements ClientChannel {
}
/** {@inheritDoc} */
- @Override public void close() throws Exception {
- dataInput.close();
- out.close();
- sock.close();
+ @Override public void close() {
+ close(null);
+ }
+
+ /**
+ * Close the channel with cause.
+ */
+ private void close(Throwable cause) {
+ if (closed.compareAndSet(false, true)) {
+ U.closeQuiet(dataInput);
+ U.closeQuiet(out);
+ U.closeQuiet(sock);
+
+ sndLock.lock(); // Lock here to prevent creation of new pending requests.
+
+ try {
+ for (ClientRequestFuture pendingReq : pendingReqs.values())
+ pendingReq.onDone(new ClientConnectionException("Channel is closed", cause));
+
+ if (receiverThread != null)
+ receiverThread.interrupt();
+ }
+ finally {
+ sndLock.unlock();
+ }
- for (ClientRequestFuture pendingReq : pendingReqs.values())
- pendingReq.onDone(new ClientConnectionException("Channel is closed"));
+ }
}
/** {@inheritDoc} */
- @Override public <T> T service(ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter,
- Function<PayloadInputChannel, T> payloadReader) throws ClientConnectionException, ClientAuthorizationException {
+ @Override public <T> T service(
+ ClientOperation op,
+ Consumer<PayloadOutputChannel> payloadWriter,
+ Function<PayloadInputChannel, T> payloadReader
+ ) throws ClientConnectionException, ClientAuthorizationException, ClientServerError, ClientException {
long id = send(op, payloadWriter);
return receive(id, payloadReader);
@@ -194,13 +225,18 @@ class TcpClientChannel implements ClientChannel {
* @return Request ID.
*/
private long send(ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter)
- throws ClientConnectionException {
+ throws ClientException, ClientConnectionException {
long id = reqId.getAndIncrement();
// Only one thread at a time can have access to write to the channel.
sndLock.lock();
try (PayloadOutputChannel payloadCh = new PayloadOutputChannel(this)) {
+ if (closed())
+ throw new ClientConnectionException("Channel is closed");
+
+ initReceiverThread(); // Start the receiver thread with the first request.
+
pendingReqs.put(id, new ClientRequestFuture());
BinaryOutputStream req = payloadCh.out();
@@ -234,39 +270,18 @@ class TcpClientChannel implements ClientChannel {
* @return Received operation payload or {@code null} if response has no payload.
*/
private <T> T receive(long reqId, Function<PayloadInputChannel, T> payloadReader)
- throws ClientConnectionException, ClientAuthorizationException {
+ throws ClientServerError, ClientException, ClientConnectionException, ClientAuthorizationException {
ClientRequestFuture pendingReq = pendingReqs.get(reqId);
assert pendingReq != null : "Pending request future not found for request " + reqId;
- // Each thread creates a future on request sent and returns a response when this future is completed.
- // Only one thread at a time can have access to read from the channel. This thread reads the next available
- // response and complete corresponding future. All other concurrent threads wait for their own futures with
- // a timeout and periodically try to lock the channel to process the next response.
try {
- while (true) {
- if (rcvLock.tryLock()) {
- try {
- if (!pendingReq.isDone())
- processNextResponse();
- }
- finally {
- rcvLock.unlock();
- }
- }
-
- try {
- byte[] payload = pendingReq.get(PAYLOAD_WAIT_TIMEOUT);
+ byte[] payload = pendingReq.get();
- if (payload == null || payloadReader == null)
- return null;
+ if (payload == null || payloadReader == null)
+ return null;
- return payloadReader.apply(new PayloadInputChannel(this, payload));
- }
- catch (IgniteFutureTimeoutCheckedException ignore) {
- // Next cycle if timed out.
- }
- }
+ return payloadReader.apply(new PayloadInputChannel(this, payload));
}
catch (IgniteCheckedException e) {
if (e.getCause() instanceof ClientError)
@@ -283,25 +298,49 @@ class TcpClientChannel implements ClientChannel {
}
/**
- * Process next response from the input stream and complete corresponding future.
+ * Init and start receiver thread if it wasn't started before.
+ *
+ * Note: Method should be called only under external synchronization.
*/
- private void processNextResponse() throws ClientProtocolError, ClientConnectionException {
- int resSize = dataInput.readInt();
+ private void initReceiverThread() {
+ if (receiverThread == null) {
+ Socket sock = this.sock;
- if (resSize <= 0)
- throw new ClientProtocolError(String.format("Invalid response size: %s", resSize));
+ String sockInfo = sock == null ? null : sock.getInetAddress().getHostName() + ":" + sock.getPort();
+
+ receiverThread = new Thread(() -> {
+ try {
+ while (!closed())
+ processNextMessage();
+ }
+ catch (Throwable e) {
+ close(e);
+ }
+ }, RECEIVER_THREAD_PREFIX + sockInfo);
- long bytesReadOnStartReq = dataInput.totalBytesRead();
+ receiverThread.setDaemon(true);
- long resId = dataInput.readLong();
+ receiverThread.start();
+ }
+ }
- ClientRequestFuture pendingReq = pendingReqs.get(resId);
+ /**
+ * Process next message from the input stream and complete corresponding future.
+ */
+ private void processNextMessage() throws ClientProtocolError, ClientConnectionException {
+ int msgSize = dataInput.readInt();
- if (pendingReq == null)
- throw new ClientProtocolError(String.format("Unexpected response ID [%s]", resId));
+ if (msgSize <= 0)
+ throw new ClientProtocolError(String.format("Invalid message size: %s", msgSize));
+
+ long bytesReadOnStartMsg = dataInput.totalBytesRead();
+
+ long resId = dataInput.readLong();
int status = 0;
+ ClientOperation notificationOp = null;
+
BinaryInputStream resIn;
if (protocolCtx.isFeatureSupported(PARTITION_AWARENESS)) {
@@ -317,31 +356,51 @@ class TcpClientChannel implements ClientChannel {
lsnr.accept(this);
}
+ if ((flags & ClientFlag.NOTIFICATION) != 0) {
+ short notificationCode = dataInput.readShort();
+
+ notificationOp = ClientOperation.fromCode(notificationCode);
+
+ if (notificationOp == null || !notificationOp.isNotification())
+ throw new ClientProtocolError(String.format("Unexpected notification code [%d]", notificationCode));
+ }
+
if ((flags & ClientFlag.ERROR) != 0)
status = dataInput.readInt();
}
else
status = dataInput.readInt();
- int hdrSize = (int)(dataInput.totalBytesRead() - bytesReadOnStartReq);
+ int hdrSize = (int)(dataInput.totalBytesRead() - bytesReadOnStartMsg);
+
+ byte[] res = null;
+ Exception err = null;
if (status == 0) {
- if (resSize <= hdrSize)
- pendingReq.onDone();
- else
- pendingReq.onDone(dataInput.read(resSize - hdrSize));
+ if (msgSize > hdrSize)
+ res = dataInput.read(msgSize - hdrSize);
}
+ else if (status == ClientStatus.SECURITY_VIOLATION)
+ err = new ClientAuthorizationException();
else {
- resIn = new BinaryHeapInputStream(dataInput.read(resSize - hdrSize));
+ resIn = new BinaryHeapInputStream(dataInput.read(msgSize - hdrSize));
- String err = new BinaryReaderExImpl(null, resIn, null, true).readString();
+ String errMsg = new BinaryReaderExImpl(null, resIn, null, true).readString();
- switch (status) {
- case ClientStatus.SECURITY_VIOLATION:
- pendingReq.onDone(new ClientAuthorizationException());
- default:
- pendingReq.onDone(new ClientServerError(err, status, resId));
- }
+ err = new ClientServerError(errMsg, status, resId);
+ }
+
+ if (notificationOp == null) { // Response received.
+ ClientRequestFuture pendingReq = pendingReqs.get(resId);
+
+ if (pendingReq == null)
+ throw new ClientProtocolError(String.format("Unexpected response ID [%s]", resId));
+
+ pendingReq.onDone(res, err);
+ }
+ else { // Notification received.
+ for (NotificationListener lsnr : notificationLsnrs)
+ lsnr.acceptNotification(this, notificationOp, resId, res, err);
}
}
@@ -365,6 +424,16 @@ class TcpClientChannel implements ClientChannel {
topChangeLsnrs.add(lsnr);
}
+ /** {@inheritDoc} */
+ @Override public void addNotificationListener(NotificationListener lsnr) {
+ notificationLsnrs.add(lsnr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean closed() {
+ return closed.get();
+ }
+
/** Validate {@link ClientConfiguration}. */
private static void validateConfiguration(ClientChannelConfiguration cfg) {
String error = null;
@@ -402,7 +471,7 @@ class TcpClientChannel implements ClientChannel {
/** Client handshake. */
private void handshake(ProtocolVersion ver, String user, String pwd, Map<String, String> userAttrs)
- throws ClientConnectionException, ClientAuthenticationException {
+ throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
handshakeReq(ver, user, pwd, userAttrs);
handshakeRes(ver, user, pwd, userAttrs);
}
@@ -459,7 +528,7 @@ class TcpClientChannel implements ClientChannel {
/** Receive and handle handshake response. */
private void handshakeRes(ProtocolVersion proposedVer, String user, String pwd, Map<String, String> userAttrs)
- throws ClientConnectionException, ClientAuthenticationException {
+ throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
int resSize = dataInput.readInt();
if (resSize <= 0)
@@ -545,7 +614,7 @@ class TcpClientChannel implements ClientChannel {
* Auxiliary class to read byte buffers and numeric values, counting total bytes read.
* Numeric values are read in the little-endian byte order.
*/
- private class ByteCountingDataInput {
+ private class ByteCountingDataInput implements AutoCloseable {
/** Input stream. */
private final InputStream in;
@@ -635,7 +704,7 @@ class TcpClientChannel implements ClientChannel {
/**
* Close input stream.
*/
- public void close() throws IOException {
+ @Override public void close() throws IOException {
in.close();
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
index a131918..ee9003f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
@@ -34,6 +34,9 @@ import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.ClientCacheConfiguration;
+import org.apache.ignite.client.ClientCluster;
+import org.apache.ignite.client.ClientClusterGroup;
+import org.apache.ignite.client.ClientCompute;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.client.ClientTransactions;
import org.apache.ignite.client.IgniteClient;
@@ -69,6 +72,12 @@ public class TcpIgniteClient implements IgniteClient {
/** Transactions facade. */
private final TcpClientTransactions transactions;
+ /** Compute facade. */
+ private final ClientComputeImpl compute;
+
+ /** Cluster facade. */
+ private final ClientClusterImpl cluster;
+
/** Marshaller. */
private final ClientBinaryMarshaller marsh;
@@ -102,6 +111,10 @@ public class TcpIgniteClient implements IgniteClient {
transactions = new TcpClientTransactions(ch, marsh,
new ClientTransactionConfiguration(cfg.getTransactionConfiguration()));
+
+ cluster = new ClientClusterImpl();
+
+ compute = new ClientComputeImpl(ch, marsh, cluster);
}
/** {@inheritDoc} */
@@ -200,6 +213,21 @@ public class TcpIgniteClient implements IgniteClient {
return transactions;
}
+ /** {@inheritDoc} */
+ @Override public ClientCompute compute() {
+ return compute;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientCompute compute(ClientClusterGroup grp) {
+ return compute.withClusterGroup((ClientClusterGroupImpl)grp);
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientCluster cluster() {
+ return cluster;
+ }
+
/**
* Initializes new instance of {@link IgniteClient}.
*
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
index 5be565b..caa3f6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.platform.client.ClientConnectionCon
import org.apache.ignite.internal.processors.platform.client.ClientStatus;
import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
@@ -213,7 +214,12 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte
byte[] outMsg = parser.encode(resp);
- ses.send(outMsg);
+ GridNioFuture<?> fut = ses.send(outMsg);
+
+ fut.listen(f -> {
+ if (f.error() == null)
+ resp.onSent();
+ });
}
}
catch (Exception e) {
@@ -362,7 +368,6 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte
/**
* Prepare context.
*
- * @param ses Session.
* @param clientType Client type.
* @return Context.
* @throws IgniteCheckedException If failed.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerResponse.java
index 342062e..4488c79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerResponse.java
@@ -41,7 +41,7 @@ public abstract class ClientListenerResponse {
* @param status Response status.
* @param err Error, {@code null} if success is {@code true}.
*/
- public ClientListenerResponse(int status, @Nullable String err) {
+ protected ClientListenerResponse(int status, @Nullable String err) {
this.status = status;
this.err = err;
}
@@ -73,4 +73,11 @@ public abstract class ClientListenerResponse {
public void error(String err) {
this.err = err;
}
+
+ /**
+ * Callback for response sent event.
+ */
+ public void onSent() {
+ // No-op.
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
index 2146974..6610c0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
@@ -25,7 +25,10 @@ import org.apache.ignite.internal.ThinProtocolFeature;
*/
public enum ClientBitmaskFeature implements ThinProtocolFeature {
/** Feature for user attributes. */
- USER_ATTRIBUTES(0);
+ USER_ATTRIBUTES(0),
+
+ /** Compute tasks (execute by task name). */
+ EXECUTE_TASK_BY_NAME(1);
/** */
private static final EnumSet<ClientBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
index 3a1dbb5..a9d38c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
@@ -112,6 +112,9 @@ public class ClientConnectionContext extends ClientListenerAbstractConnectionCon
/** Last reported affinity topology version. */
private AtomicReference<AffinityTopologyVersion> lastAffinityTopologyVersion = new AtomicReference<>();
+ /** Client session. */
+ private GridNioSession ses;
+
/** Cursor counter. */
private final AtomicLong curCnt = new AtomicLong();
@@ -127,6 +130,12 @@ public class ClientConnectionContext extends ClientListenerAbstractConnectionCon
/** Active transactions count. */
private final AtomicInteger txsCnt = new AtomicInteger();
+ /** Active compute tasks limit. */
+ private final int maxActiveComputeTasks;
+
+ /** Active compute tasks count. */
+ private final AtomicInteger activeTasksCnt = new AtomicInteger();
+
/**
* Ctor.
*
@@ -140,6 +149,7 @@ public class ClientConnectionContext extends ClientListenerAbstractConnectionCon
this.maxCursors = maxCursors;
maxActiveTxCnt = thinCfg.getMaxActiveTxPerConnection();
+ maxActiveComputeTasks = thinCfg.getMaxActiveComputeTasksPerConnection();
}
/**
@@ -208,6 +218,8 @@ public class ClientConnectionContext extends ClientListenerAbstractConnectionCon
handler = new ClientRequestHandler(this, authCtx, currentProtocolContext);
parser = new ClientMessageParser(this, currentProtocolContext);
+
+ this.ses = ses;
}
/** {@inheritDoc} */
@@ -330,4 +342,42 @@ public class ClientConnectionContext extends ClientListenerAbstractConnectionCon
txs.clear();
}
+
+ /**
+ * Send notification to the client.
+ *
+ * @param notification Notification.
+ */
+ public void notifyClient(ClientNotification notification) {
+ ses.send(parser.encode(notification));
+ }
+
+ /**
+ * Increments the active compute tasks count.
+ */
+ public void incrementActiveTasksCount() {
+ if (maxActiveComputeTasks == 0) {
+ throw new IgniteClientException(ClientStatus.FUNCTIONALITY_DISABLED,
+ "Compute grid functionality is disabled for thin clients on server node. " +
+ "To enable it set up the ThinClientConfiguration.MaxActiveComputeTasksPerConnection property.");
+ }
+
+ if (activeTasksCnt.incrementAndGet() > maxActiveComputeTasks) {
+ activeTasksCnt.decrementAndGet();
+
+ throw new IgniteClientException(ClientStatus.TOO_MANY_COMPUTE_TASKS, "Active compute tasks per connection " +
+ "limit (" + maxActiveComputeTasks + ") exceeded. To start a new task you need to wait for some of " +
+ "currently active tasks complete. To change the limit set up the " +
+ "ThinClientConfiguration.MaxActiveComputeTasksPerConnection property.");
+ }
+ }
+
+ /**
+ * Decrements the active compute tasks count.
+ */
+ public void decrementActiveTasksCount() {
+ int cnt = activeTasksCnt.decrementAndGet();
+
+ assert cnt >= 0 : "Unexpected active tasks count: " + cnt;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
index 0471339..e381b68 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
@@ -50,4 +50,7 @@ public class ClientFlag {
/** Affinity topology change flag. */
public static final short AFFINITY_TOPOLOGY_CHANGED = 1 << 1;
+
+ /** Server to client notification flag. */
+ public static final short NOTIFICATION = 1 << 2;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
index b9baffb..71a466b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheRe
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheScanQueryRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheSqlFieldsQueryRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheSqlQueryRequest;
+import org.apache.ignite.internal.processors.platform.client.compute.ClientExecuteTaskRequest;
import org.apache.ignite.internal.processors.platform.client.tx.ClientTxEndRequest;
import org.apache.ignite.internal.processors.platform.client.tx.ClientTxStartRequest;
import org.apache.ignite.internal.processors.platform.client.cluster.ClientClusterChangeStateRequest;
@@ -238,6 +239,13 @@ public class ClientMessageParser implements ClientListenerMessageParser {
/** */
private static final short OP_CLUSTER_GROUP_GET_NODE_INFO = 5101;
+ /* Compute operations. */
+ /** */
+ private static final short OP_COMPUTE_TASK_EXECUTE = 6000;
+
+ /** */
+ public static final short OP_COMPUTE_TASK_FINISHED = 6001;
+
/** Marshaller. */
private final GridBinaryMarshaller marsh;
@@ -432,6 +440,9 @@ public class ClientMessageParser implements ClientListenerMessageParser {
case OP_CLUSTER_GROUP_GET_NODE_INFO:
return new ClientClusterGroupGetNodesDetailsRequest(reader);
+
+ case OP_COMPUTE_TASK_EXECUTE:
+ return new ClientExecuteTaskRequest(reader);
}
return new ClientRawRequest(reader.readLong(), ClientStatus.INVALID_OP_CODE,
@@ -446,7 +457,9 @@ public class ClientMessageParser implements ClientListenerMessageParser {
BinaryRawWriterEx writer = marsh.writer(outStream);
- ((ClientResponse)resp).encode(ctx, writer);
+ assert resp instanceof ClientOutgoingMessage : "Unexpected response type: " + resp.getClass();
+
+ ((ClientOutgoingMessage)resp).encode(ctx, writer);
return outStream.arrayCopy();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientNotification.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientNotification.java
new file mode 100644
index 0000000..8539fc6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientNotification.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client;
+
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
+
+/**
+ * Server to client notification for some resource.
+ */
+public class ClientNotification extends ClientListenerResponse implements ClientOutgoingMessage {
+ /** Resource id. */
+ private final long rsrcId;
+
+ /** Operation code. */
+ private final short opCode;
+
+ /**
+ * Constructor.
+ *
+ * @param opCode Operation code.
+ * @param rsrcId Resource id.
+ */
+ public ClientNotification(short opCode, long rsrcId) {
+ super(ClientStatus.SUCCESS, null);
+
+ this.rsrcId = rsrcId;
+ this.opCode = opCode;
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param opCode Operation code.
+ * @param rsrcId Resource id.
+ * @param err Error message.
+ */
+ public ClientNotification(short opCode, long rsrcId, String err) {
+ super(ClientStatus.FAILED, err);
+
+ this.rsrcId = rsrcId;
+ this.opCode = opCode;
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param opCode Operation code.
+ * @param rsrcId Resource id.
+ * @param status Status code.
+ * @param err Error message.
+ */
+ public ClientNotification(short opCode, long rsrcId, int status, String err) {
+ super(status, err);
+
+ this.rsrcId = rsrcId;
+ this.opCode = opCode;
+ }
+
+ /**
+ * Encodes the notification data.
+ *
+ * @param ctx Connection context.
+ * @param writer Writer.
+ */
+ @Override public void encode(ClientConnectionContext ctx, BinaryRawWriterEx writer) {
+ writer.writeLong(rsrcId);
+
+ short flags = (short) (ClientFlag.NOTIFICATION | (status() == ClientStatus.SUCCESS ? 0 : ClientFlag.ERROR));
+
+ writer.writeShort(flags);
+
+ writer.writeShort(opCode);
+
+ if (status() != ClientStatus.SUCCESS) {
+ writer.writeInt(status());
+
+ writer.writeString(error());
+ }
+ }
+
+ /**
+ * Gets the resource id.
+ *
+ * @return Resource id.
+ */
+ public long resourceId() {
+ return rsrcId;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientObjectNotification.java
similarity index 55%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientObjectNotification.java
index 0471339..128f54c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientObjectNotification.java
@@ -17,37 +17,31 @@
package org.apache.ignite.internal.processors.platform.client;
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+
/**
- * Client response flag.
+ * Notification with object payload.
*/
-public class ClientFlag {
- /**
- * No-op constructor to prevent instantiation.
- */
- private ClientFlag() {
- // No-op.
- }
+public class ClientObjectNotification extends ClientNotification {
+ /** */
+ private final Object val;
/**
- * @return Flags for response message.
- * @param error Error flag.
- * @param topologyChanged Affinity topology changed flag.
+ * Constructor.
+ * @param opCode Operation code.
+ * @param rsrcId Resource id.
+ * @param val Object to send to client.
*/
- public static short makeFlags(boolean error, boolean topologyChanged) {
- short flags = 0;
+ public ClientObjectNotification(short opCode, long rsrcId, Object val) {
+ super(opCode, rsrcId);
- if (error)
- flags |= ClientFlag.ERROR;
-
- if (topologyChanged)
- flags |= ClientFlag.AFFINITY_TOPOLOGY_CHANGED;
-
- return flags;
+ this.val = val;
}
- /** Error flag. */
- public static final short ERROR = 1;
+ /** {@inheritDoc} */
+ @Override public void encode(ClientConnectionContext ctx, BinaryRawWriterEx writer) {
+ super.encode(ctx, writer);
- /** Affinity topology change flag. */
- public static final short AFFINITY_TOPOLOGY_CHANGED = 1 << 1;
+ writer.writeObject(val);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientOutgoingMessage.java
similarity index 54%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientOutgoingMessage.java
index 0471339..45fc3dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientFlag.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientOutgoingMessage.java
@@ -17,37 +17,16 @@
package org.apache.ignite.internal.processors.platform.client;
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+
/**
- * Client response flag.
+ * Thin client outgoing message (from server to client).
*/
-public class ClientFlag {
+public interface ClientOutgoingMessage {
/**
- * No-op constructor to prevent instantiation.
+ * Encodes the message data.
+ * @param ctx Connection context.
+ * @param writer Writer.
*/
- private ClientFlag() {
- // No-op.
- }
-
- /**
- * @return Flags for response message.
- * @param error Error flag.
- * @param topologyChanged Affinity topology changed flag.
- */
- public static short makeFlags(boolean error, boolean topologyChanged) {
- short flags = 0;
-
- if (error)
- flags |= ClientFlag.ERROR;
-
- if (topologyChanged)
- flags |= ClientFlag.AFFINITY_TOPOLOGY_CHANGED;
-
- return flags;
- }
-
- /** Error flag. */
- public static final short ERROR = 1;
-
- /** Affinity topology change flag. */
- public static final short AFFINITY_TOPOLOGY_CHANGED = 1 << 1;
+ public void encode(ClientConnectionContext ctx, BinaryRawWriterEx writer);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientResponse.java
index e9dd39e..49c2cab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientResponse.java
@@ -25,7 +25,7 @@ import static org.apache.ignite.internal.processors.platform.client.ClientProtoc
/**
* Thin client response.
*/
-public class ClientResponse extends ClientListenerResponse {
+public class ClientResponse extends ClientListenerResponse implements ClientOutgoingMessage {
/** Request id. */
private final long reqId;
@@ -106,7 +106,7 @@ public class ClientResponse extends ClientListenerResponse {
* @param ctx Connection context.
* @param writer Writer.
*/
- public void encode(ClientConnectionContext ctx, BinaryRawWriterEx writer) {
+ @Override public void encode(ClientConnectionContext ctx, BinaryRawWriterEx writer) {
encode(ctx, writer, ctx.checkAffinityTopologyVersion());
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientStatus.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientStatus.java
index f5c7886..fcf094a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientStatus.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientStatus.java
@@ -37,6 +37,9 @@ public final class ClientStatus {
/** Invalid op code. */
public static final int INVALID_OP_CODE = 2;
+ /** Functionality is disabled. */
+ public static final int FUNCTIONALITY_DISABLED = 100;
+
/** Cache does not exist. */
public static final int CACHE_DOES_NOT_EXIST = 1000;
@@ -58,6 +61,9 @@ public final class ClientStatus {
/** Transaction not found. */
public static final int TX_NOT_FOUND = 1021;
+ /** Too many compute tasks. */
+ public static final int TOO_MANY_COMPUTE_TASKS = 1030;
+
/** Authentication failed. */
public static final int AUTH_FAILED = 2000;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientComputeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientComputeTask.java
new file mode 100644
index 0000000..9e47347
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientComputeTask.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.compute;
+
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.ComputeTaskInternalFuture;
+import org.apache.ignite.internal.processors.platform.client.ClientCloseableResource;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientNotification;
+import org.apache.ignite.internal.processors.platform.client.ClientObjectNotification;
+import org.apache.ignite.internal.processors.platform.client.ClientStatus;
+import org.apache.ignite.internal.processors.platform.client.IgniteClientException;
+import org.apache.ignite.internal.processors.task.GridTaskProcessor;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgnitePredicate;
+
+import static org.apache.ignite.internal.processors.platform.client.ClientMessageParser.OP_COMPUTE_TASK_FINISHED;
+import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER;
+import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_RESULT_CACHE;
+import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID_PREDICATE;
+import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBJ_ID;
+import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TIMEOUT;
+
+/**
+ * Client compute task.
+ */
+class ClientComputeTask implements ClientCloseableResource {
+ /** No failover flag mask. */
+ private static final byte NO_FAILOVER_FLAG_MASK = 0x01;
+
+ /** No result cache flag mask. */
+ private static final byte NO_RESULT_CACHE_FLAG_MASK = 0x02;
+
+ /** Context. */
+ private final ClientConnectionContext ctx;
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /** Task id. */
+ private volatile long taskId;
+
+ /** Task future. */
+ private volatile ComputeTaskInternalFuture<Object> taskFut;
+
+ /** Task closed flag. */
+ private final AtomicBoolean closed = new AtomicBoolean();
+
+ /**
+ * Ctor.
+ *
+ * @param ctx Connection context.
+ */
+ ClientComputeTask(ClientConnectionContext ctx) {
+ assert ctx != null;
+
+ this.ctx = ctx;
+
+ log = ctx.kernalContext().log(getClass());
+ }
+
+ /**
+ * @param taskId Task ID.
+ * @param taskName Task name.
+ * @param arg Task arguments.
+ * @param nodeIds Nodes to run task jobs.
+ * @param flags Flags for task.
+ * @param timeout Task timeout.
+ */
+ void execute(long taskId, String taskName, Object arg, Set<UUID> nodeIds, byte flags, long timeout) {
+ assert taskName != null;
+
+ this.taskId = taskId;
+
+ GridTaskProcessor task = ctx.kernalContext().task();
+
+ IgnitePredicate<ClusterNode> nodePredicate = F.isEmpty(nodeIds) ? F.alwaysTrue() : F.nodeForNodeIds(nodeIds);
+ UUID subjId = ctx.securityContext() == null ? null : ctx.securityContext().subject().id();
+
+ task.setThreadContext(TC_SUBGRID_PREDICATE, nodePredicate);
+ task.setThreadContextIfNotNull(TC_SUBJ_ID, subjId);
+ task.setThreadContext(TC_TIMEOUT, timeout);
+ task.setThreadContext(TC_NO_FAILOVER, (flags & NO_FAILOVER_FLAG_MASK) != 0);
+ task.setThreadContext(TC_NO_RESULT_CACHE, (flags & NO_RESULT_CACHE_FLAG_MASK) != 0);
+
+ taskFut = task.execute(taskName, arg);
+
+ // Fail fast.
+ if (taskFut.isDone() && taskFut.error() != null)
+ throw new IgniteClientException(ClientStatus.FAILED, taskFut.error().getMessage());
+ }
+
+ /**
+ * Callback for response sent event.
+ */
+ void onResponseSent() {
+ // Listener should be registered only after response for this task was sent, to ensure that client doesn't
+ // receive notification before response for the task.
+ taskFut.listen(f -> {
+ try {
+ ClientNotification notification;
+
+ if (f.error() != null)
+ notification = new ClientNotification(OP_COMPUTE_TASK_FINISHED, taskId, f.error().getMessage());
+ else if (f.isCancelled())
+ notification = new ClientNotification(OP_COMPUTE_TASK_FINISHED, taskId, "Task was cancelled");
+ else
+ notification = new ClientObjectNotification(OP_COMPUTE_TASK_FINISHED, taskId, f.result());
+
+ ctx.notifyClient(notification);
+ }
+ finally {
+ // If task was explicitly closed before, resource is already released.
+ if (closed.compareAndSet(false, true)) {
+ ctx.decrementActiveTasksCount();
+
+ ctx.resources().release(taskId);
+ }
+ }
+ });
+ }
+
+ /**
+ * Gets task ID.
+ */
+ long taskId() {
+ return taskId;
+ }
+
+ /**
+ * Closes the task resource.
+ */
+ @Override public void close() {
+ if (closed.compareAndSet(false, true)) {
+ ctx.decrementActiveTasksCount();
+
+ try {
+ if (taskFut != null)
+ taskFut.cancel();
+ }
+ catch (IgniteCheckedException e) {
+ log.warning("Failed to cancel task", e);
+ }
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientExecuteTaskRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientExecuteTaskRequest.java
new file mode 100644
index 0000000..9924b91
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientExecuteTaskRequest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.compute;
+
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientRequest;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Request to execute compute task.
+ */
+public class ClientExecuteTaskRequest extends ClientRequest {
+ /** Nodes to execute task. */
+ private final Set<UUID> nodeIds;
+
+ /** Task name. */
+ private final String taskName;
+
+ /** Task argument. */
+ private final Object arg;
+
+ /** Task timeout. */
+ private final long timeout;
+
+ /** Task flags. */
+ private final byte flags;
+
+ /**
+ * Constructor.
+ *
+ * @param reader Reader.
+ */
+ public ClientExecuteTaskRequest(BinaryRawReader reader) {
+ super(reader);
+
+ int cnt = reader.readInt();
+
+ nodeIds = U.newHashSet(cnt);
+
+ for (int i = 0; i < cnt; i++)
+ nodeIds.add(reader.readUuid());
+
+ flags = reader.readByte();
+
+ timeout = reader.readLong();
+
+ taskName = reader.readString();
+
+ arg = reader.readObject();
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientResponse process(ClientConnectionContext ctx) {
+ ClientComputeTask task = new ClientComputeTask(ctx);
+
+ ctx.incrementActiveTasksCount();
+
+ long taskId = ctx.resources().put(task);
+
+ try {
+ task.execute(taskId, taskName, arg, nodeIds, flags, timeout);
+ }
+ catch (Exception e) {
+ ctx.resources().release(taskId);
+
+ throw e;
+ }
+
+ return new ClientExecuteTaskResponse(requestId(), task);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientExecuteTaskResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientExecuteTaskResponse.java
new file mode 100644
index 0000000..b60b0fe
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientExecuteTaskResponse.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.compute;
+
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+
+/**
+ * Response to the execute task request; carries the ID of the compute task to the client.
+ */
+public class ClientExecuteTaskResponse extends ClientResponse {
+ /** Compute task this response refers to. */
+ private final ClientComputeTask computeTask;
+
+ /**
+ * Creates the response.
+ *
+ * @param reqId ID of the request being answered.
+ * @param task Compute task whose ID is written to the response.
+ */
+ public ClientExecuteTaskResponse(long reqId, ClientComputeTask task) {
+ super(reqId);
+
+ computeTask = task;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void encode(ClientConnectionContext ctx, BinaryRawWriterEx writer) {
+ super.encode(ctx, writer);
+
+ writer.writeLong(computeTask.taskId());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSent() {
+ computeTask.onResponseSent();
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
index a16dbf3..5327919 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
@@ -1833,7 +1833,9 @@ public class PlatformConfigurationUtils {
if (in.readBoolean()) {
cfg.setThinClientConfiguration(new ThinClientConfiguration()
- .setMaxActiveTxPerConnection(in.readInt()));
+ .setMaxActiveTxPerConnection(in.readInt())
+ .setMaxActiveComputeTasksPerConnection(in.readInt())
+ );
}
return cfg;
@@ -1871,6 +1873,7 @@ public class PlatformConfigurationUtils {
if (thinCfg != null) {
w.writeBoolean(true);
w.writeInt(thinCfg.getMaxActiveTxPerConnection());
+ w.writeInt(thinCfg.getMaxActiveComputeTasksPerConnection());
}
else
w.writeBoolean(false);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
new file mode 100644
index 0000000..7dc23f2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
@@ -0,0 +1,699 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.ClientClusterGroup;
+import org.apache.ignite.client.ClientCompute;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.compute.ComputeTaskName;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.ClientConnectorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.ThinClientConfiguration;
+import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.mxbean.ClientProcessorMXBean;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Checks compute grid functionality of thin client.
+ */
+@SuppressWarnings("ThrowableNotThrown")
+public class ComputeTaskTest extends GridCommonAbstractTest {
+ /** Grids count. */
+ private static final int GRIDS_CNT = 4;
+
+ /** Active tasks limit. */
+ private static final int ACTIVE_TASKS_LIMIT = 50;
+
+ /** Default timeout value. */
+ private static final long TIMEOUT = 1_000L;
+
+ /** Test task name. */
+ private static final String TEST_TASK_NAME = "TestTask";
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName).setClientConnectorConfiguration(
+ new ClientConnectorConfiguration().setThinClientConfiguration(
+ new ThinClientConfiguration().setMaxActiveComputeTasksPerConnection(
+ getTestIgniteInstanceIndex(igniteInstanceName) <= 1 ? ACTIVE_TASKS_LIMIT : 0)));
+ }
+
+ /**
+ * Starts a thin client with one address (127.0.0.1:DFLT_PORT + grid index) per given grid index.
+ */
+ private IgniteClient startClient(int ... gridIdxs) {
+ String[] hostPorts = new String[gridIdxs.length];
+ int pos = 0;
+ for (int gridIdx : gridIdxs)
+ hostPorts[pos++] = "127.0.0.1:" + (ClientConnectorConfiguration.DFLT_PORT + gridIdx);
+
+ return Ignition.startClient(new ClientConfiguration().setAddresses(hostPorts));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrids(GRIDS_CNT);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testExecuteTaskByClassName() throws Exception {
+ try (IgniteClient client = startClient(0)) {
+ T2<UUID, Set<UUID>> val = client.compute().execute(TestTask.class.getName(), null);
+
+ assertEquals(nodeId(0), val.get1());
+ assertEquals(new HashSet<>(F.nodeIds(grid(0).cluster().nodes())), val.get2());
+ }
+ }
+
+ /**
+ *
+ */
+ @Test(expected = ClientException.class)
+ public void testComputeDisabled() throws Exception {
+ // Only grid(0) and grid(1) are allowed to execute thin client compute tasks.
+ try (IgniteClient client = startClient(2)) {
+ client.compute().execute(TestTask.class.getName(), null);
+ }
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testExecuteTaskByName() throws Exception {
+ try (IgniteClient client = startClient(0)) {
+ // We should deploy class manually to make it visible to server node by task name.
+ grid(0).compute().localDeployTask(TestTask.class, TestTask.class.getClassLoader());
+
+ T2<UUID, Set<UUID>> val = client.compute().execute(TEST_TASK_NAME, null);
+
+ assertEquals(nodeId(0), val.get1());
+ assertEquals(new HashSet<>(F.nodeIds(grid(0).cluster().nodes())), val.get2());
+ }
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testExecuteTaskAsync() throws Exception {
+ try (IgniteClient client = startClient(0)) {
+ TestLatchTask.latch = new CountDownLatch(1);
+
+ Future<T2<UUID, Set<UUID>>> fut = client.compute().executeAsync(TestLatchTask.class.getName(), null);
+
+ GridTestUtils.assertThrowsAnyCause(
+ null,
+ () -> fut.get(10L, TimeUnit.MILLISECONDS),
+ TimeoutException.class,
+ null
+ );
+
+ assertFalse(fut.isDone());
+
+ TestLatchTask.latch.countDown();
+
+ T2<UUID, Set<UUID>> val = fut.get();
+
+ assertTrue(fut.isDone());
+ assertEquals(nodeId(0), val.get1());
+ assertEquals(new HashSet<>(F.nodeIds(grid(0).cluster().nodes())), val.get2());
+ }
+ }
+
+ /**
+ * Checks that a cancelled task future is removed from the active futures and reports itself as cancelled.
+ */
+ @Test(expected = CancellationException.class)
+ public void testTaskCancellation() throws Exception {
+ try (IgniteClient client = startClient(0)) {
+ Future<T2<UUID, Set<UUID>>> fut = client.compute().executeAsync(TestTask.class.getName(), TIMEOUT);
+
+ assertFalse(fut.isCancelled());
+ assertFalse(fut.isDone());
+
+ fut.cancel(true);
+
+ assertTrue(((ClientComputeImpl)client.compute()).activeTaskFutures().isEmpty());
+
+ assertTrue(fut.isCancelled());
+ assertTrue(fut.isDone());
+
+ fut.get(); // Expected to throw CancellationException (see the @Test annotation).
+ }
+ }
+
+ /**
+ *
+ */
+ @Test(expected = ClientException.class)
+ public void testTaskWithTimeout() throws Exception {
+ try (IgniteClient client = startClient(0)) {
+ client.compute().withTimeout(TIMEOUT / 5).execute(TestTask.class.getName(), TIMEOUT);
+ }
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testTaskWithNoResultCache() throws Exception {
+ try (IgniteClient client = startClient(0)) {
+ ClientCompute computeWithCache = client.compute();
+ ClientCompute computeWithNoCache = client.compute().withNoResultCache();
+
+ assertTrue(computeWithCache.execute(TestResultCacheTask.class.getName(), null));
+ assertFalse(computeWithNoCache.execute(TestResultCacheTask.class.getName(), null));
+ }
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testTaskWithNoFailover() throws Exception {
+ try (IgniteClient client = startClient(0)) {
+ ClientCompute computeWithFailover = client.compute();
+ ClientCompute computeWithNoFailover = client.compute().withNoFailover();
+
+ assertTrue(computeWithFailover.execute(TestFailoverTask.class.getName(), null));
+ assertFalse(computeWithNoFailover.execute(TestFailoverTask.class.getName(), null));
+ }
+ }
+
+ /**
+ * Checks that a task executed on a cluster group is reduced on grid 0 and runs jobs only on the group nodes.
+ */
+ @Test
+ public void testExecuteTaskOnClusterGroup() throws Exception {
+ try (IgniteClient client = startClient(0)) {
+ ClientClusterGroup grp = client.cluster().forNodeIds(nodeIds(1, 2));
+
+ T2<UUID, Set<UUID>> val = client.compute(grp).execute(TestTask.class.getName(), null);
+
+ assertEquals(nodeId(0), val.get1());
+ assertEquals(nodeIds(1, 2), val.get2());
+ }
+ }
+
+
+ /**
+ *
+ */
+ @Test(expected = ClientException.class)
+ public void testExecuteTaskOnEmptyClusterGroup() throws Exception {
+ try (IgniteClient client = startClient(0)) {
+ ClientClusterGroup grp = client.cluster().forNodeIds(Collections.emptyList());
+
+ client.compute(grp).execute(TestTask.class.getName(), null);
+ }
+ }
+
+ /**
+ * Checks that cluster group, "no failover", "no result cache" and timeout modifiers can be combined.
+ */
+ @Test
+ public void testComputeWithMixedModificators() throws Exception {
+ try (IgniteClient client = startClient(0)) {
+ ClientClusterGroup grp = client.cluster().forNodeId(nodeId(1), nodeId(2));
+
+ ClientCompute compute = client.compute(grp).withNoFailover().withNoResultCache().withTimeout(TIMEOUT / 5);
+
+ T2<UUID, Set<UUID>> val = client.compute(grp).execute(TestTask.class.getName(), null);
+
+ assertEquals(nodeIds(1, 2), val.get2());
+
+ assertFalse(compute.execute(TestFailoverTask.class.getName(), null));
+
+ assertFalse(compute.execute(TestResultCacheTask.class.getName(), null));
+
+ GridTestUtils.assertThrowsAnyCause(
+ null,
+ () -> compute.execute(TestTask.class.getName(), TIMEOUT),
+ ClientException.class,
+ null
+ );
+ }
+ }
+
+ /**
+ *
+ */
+ @Test(expected = ClientException.class)
+ public void testExecuteUnknownTask() throws Exception {
+ try (IgniteClient client = startClient(0)) {
+ client.compute().execute("NoSuchTask", null);
+ }
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testExecuteTaskConnectionLost() throws Exception {
+ try (IgniteClient client = startClient(0, 1)) {
+ ClientComputeImpl compute = (ClientComputeImpl)client.compute();
+
+ Future<Object> fut1 = compute.executeAsync(TestTask.class.getName(), TIMEOUT);
+
+ dropAllThinClientConnections();
+
+ Future<Object> fut2 = compute.executeAsync(TestTask.class.getName(), TIMEOUT);
+
+ dropAllThinClientConnections();
+
+ TestLatchTask.latch = new CountDownLatch(1);
+
+ Future<Object> fut3 = compute.executeAsync(TestLatchTask.class.getName(), null);
+
+ assertEquals(1, compute.activeTaskFutures().size());
+
+ compute.execute(TestTask.class.getName(), null);
+
+ assertEquals(1, compute.activeTaskFutures().size());
+
+ assertTrue(fut1.isDone());
+
+ GridTestUtils.assertThrowsAnyCause(null, fut1::get, ClientException.class, "closed");
+
+ assertTrue(fut2.isDone());
+
+ GridTestUtils.assertThrowsAnyCause(null, fut2::get, ClientException.class, "closed");
+
+ assertFalse(fut3.isDone());
+
+ TestLatchTask.latch.countDown();
+
+ fut3.get(TIMEOUT, TimeUnit.MILLISECONDS);
+
+ assertTrue(compute.activeTaskFutures().isEmpty());
+ }
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testExecuteTwoTasksMisorderedResults() throws Exception {
+ try (IgniteClient client = startClient(0)) {
+ ClientCompute compute1 = client.compute(client.cluster().forNodeId(nodeId(1)));
+ ClientCompute compute2 = client.compute(client.cluster().forNodeId(nodeId(2)));
+
+ CountDownLatch latch1 = TestLatchTask.latch = new CountDownLatch(1);
+
+ Future<T2<UUID, Set<UUID>>> fut1 = compute1.executeAsync(TestLatchTask.class.getName(), null);
+
+ CountDownLatch latch2 = TestLatchTask.latch = new CountDownLatch(1);
+
+ Future<T2<UUID, Set<UUID>>> fut2 = compute2.executeAsync(TestLatchTask.class.getName(), null);
+
+ latch2.countDown();
+
+ assertEquals(nodeIds(2), fut2.get().get2());
+
+ assertFalse(fut1.isDone());
+
+ latch1.countDown();
+
+ assertEquals(nodeIds(1), fut1.get().get2());
+ }
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testExecuteTaskTwoClientsToOneNode() throws Exception {
+ try (IgniteClient client1 = startClient(0); IgniteClient client2 = startClient(0)) {
+ ClientCompute compute1 = client1.compute(client1.cluster().forNodeId(nodeId(1)));
+ ClientCompute compute2 = client2.compute(client2.cluster().forNodeId(nodeId(2)));
+
+ CountDownLatch latch1 = TestLatchTask.latch = new CountDownLatch(1);
+
+ Future<T2<UUID, Set<UUID>>> fut1 = compute1.executeAsync(TestLatchTask.class.getName(), null);
+
+ CountDownLatch latch2 = TestLatchTask.latch = new CountDownLatch(1);
+
+ Future<T2<UUID, Set<UUID>>> fut2 = compute2.executeAsync(TestLatchTask.class.getName(), null);
+
+ latch2.countDown();
+
+ assertEquals(nodeIds(2), fut2.get().get2());
+
+ assertFalse(fut1.isDone());
+
+ latch1.countDown();
+
+ assertEquals(nodeIds(1), fut1.get().get2());
+ }
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testActiveTasksLimit() throws Exception {
+ try (IgniteClient client = startClient(0)) {
+ ClientCompute compute = client.compute(client.cluster().forNodeId(nodeId(1)));
+
+ CountDownLatch latch = TestLatchTask.latch = new CountDownLatch(1);
+
+ List<Future<T2<UUID, Set<UUID>>>> futs = new ArrayList<>(ACTIVE_TASKS_LIMIT);
+
+ for (int i = 0; i < ACTIVE_TASKS_LIMIT; i++)
+ futs.add(compute.executeAsync(TestLatchTask.class.getName(), null));
+
+ // Check that we can't start more tasks.
+ GridTestUtils.assertThrowsAnyCause(
+ null,
+ () -> compute.executeAsync(TestLatchTask.class.getName(), null),
+ ClientException.class,
+ "limit"
+ );
+
+ // Check that cancelled tasks restore limit.
+ for (int i = 0; i < ACTIVE_TASKS_LIMIT / 2; i++)
+ futs.get(i).cancel(true);
+
+ latch.countDown();
+
+ // Check that successfully completed tasks restore limit.
+ for (int i = ACTIVE_TASKS_LIMIT / 2; i < ACTIVE_TASKS_LIMIT; i++)
+ assertEquals(nodeIds(1), futs.get(i).get(TIMEOUT, TimeUnit.MILLISECONDS).get2());
+
+ // Check that tasks completed with an error restore limit.
+ GridTestUtils.assertThrowsAnyCause(
+ null,
+ () -> compute.execute("NoSuchTask", null),
+ ClientException.class,
+ null
+ );
+
+ // Check that we can start up to ACTIVE_TASKS_LIMIT new active tasks again.
+ latch = TestLatchTask.latch = new CountDownLatch(1);
+
+ futs = new ArrayList<>(ACTIVE_TASKS_LIMIT);
+
+ for (int i = 0; i < ACTIVE_TASKS_LIMIT; i++)
+ futs.add(compute.executeAsync(TestLatchTask.class.getName(), null));
+
+ latch.countDown();
+
+ for (Future<T2<UUID, Set<UUID>>> fut : futs)
+ assertEquals(nodeIds(1), fut.get(TIMEOUT, TimeUnit.MILLISECONDS).get2());
+ }
+ }
+
+ /**
+ *
+ */
+ @Test
+ @Ignore("https://issues.apache.org/jira/browse/IGNITE-12845")
+ public void testExecuteTaskConcurrentLoad() throws Exception {
+ try (IgniteClient client = startClient(0)) {
+ int threadsCnt = 20;
+ int iterations = 20;
+
+ ClientCache<Integer, Integer> cache = client.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ AtomicInteger threadIdxs = new AtomicInteger();
+
+ CyclicBarrier barrier = new CyclicBarrier(threadsCnt);
+
+ GridTestUtils.runMultiThreaded(
+ () -> {
+ int threadIdx = threadIdxs.incrementAndGet();
+
+ Random rnd = new Random();
+
+ try {
+ barrier.await();
+
+ for (int i = 0; i < iterations; i++) {
+ int nodeIdx = rnd.nextInt(GRIDS_CNT);
+
+ cache.put(threadIdx, i);
+
+ ClientCompute compute = client.compute(client.cluster().forNodeId(nodeId(nodeIdx)));
+
+ Future<T2<UUID, Set<UUID>>> fut = compute.executeAsync(TestTask.class.getName(), null);
+
+ assertEquals((Integer)i, cache.get(threadIdx));
+
+ assertEquals(nodeIds(nodeIdx), fut.get().get2());
+ }
+ }
+ catch (ExecutionException e) {
+ log.error("Task failed: ", e);
+
+ fail("Task failed");
+ }
+ catch (InterruptedException | BrokenBarrierException ignore) {
+ // No-op.
+ }
+
+ }, threadsCnt, "run-task-async");
+
+ assertTrue(((ClientComputeImpl)client.compute()).activeTaskFutures().isEmpty());
+ }
+ }
+
+ /**
+ * Collects the node IDs of the grids with the given indexes into a set.
+ *
+ * @param gridIdxs Indexes of the grids whose node IDs are requested.
+ */
+ private Set<UUID> nodeIds(int ... gridIdxs) {
+ Set<UUID> ids = new HashSet<>();
+
+ for (int pos = 0; pos < gridIdxs.length; pos++)
+ ids.add(nodeId(gridIdxs[pos]));
+
+ return ids;
+ }
+
+ /**
+ * Drops all connections through the client processor MX bean of every grid returned by {@code G.allGrids()}.
+ */
+ private void dropAllThinClientConnections() {
+ for (Ignite node : G.allGrids()) {
+ String instanceName = node.name();
+
+ getMxBean(instanceName, "Clients", ClientListenerProcessor.class, ClientProcessorMXBean.class)
+ .dropAllConnections();
+ }
+ }
+
+ /**
+ * Compute job which returns node id where it was executed.
+ */
+ private static class TestJob implements ComputeJob {
+ /** Ignite. */
+ @IgniteInstanceResource
+ Ignite ignite;
+
+ /** Sleep time. */
+ private final Long sleepTime;
+
+ /**
+ * @param sleepTime Sleep time.
+ */
+ private TestJob(Long sleepTime) {
+ this.sleepTime = sleepTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object execute() throws IgniteException {
+ if (sleepTime != null)
+ doSleep(sleepTime);
+
+ return ignite.cluster().localNode().id();
+ }
+ }
+
+ /**
+ * Compute task which returns the ID of the routing node and the set of IDs of the nodes where jobs were executed.
+ */
+ @ComputeTaskName(TEST_TASK_NAME)
+ private static class TestTask extends ComputeTaskAdapter<Long, T2<UUID, Set<UUID>>> {
+ /** Ignite. */
+ @IgniteInstanceResource
+ Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+ @Nullable Long arg) throws IgniteException {
+ return subgrid.stream().collect(Collectors.toMap(node -> new TestJob(arg), node -> node));
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public T2<UUID, Set<UUID>> reduce(List<ComputeJobResult> results) throws IgniteException {
+ return new T2<>(ignite.cluster().localNode().id(),
+ results.stream().map(res -> (UUID)res.getData()).collect(Collectors.toSet()));
+ }
+ }
+
+ /**
+ * Compute task with latch on routing node.
+ */
+ @ComputeTaskName("TestLatchTask")
+ private static class TestLatchTask extends TestTask {
+ /** Global latch. */
+ private static volatile CountDownLatch latch;
+
+ /** Local latch. */
+ private final CountDownLatch locLatch;
+
+ /**
+ * Default constructor.
+ */
+ public TestLatchTask() {
+ locLatch = latch;
+ }
+
+ /** {@inheritDoc} */
+ @Override public @Nullable T2<UUID, Set<UUID>> reduce(List<ComputeJobResult> results) throws IgniteException {
+ try {
+ if (locLatch != null)
+ locLatch.await(TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException ignore) {
+ // No-op.
+ }
+
+ return super.reduce(results);
+ }
+ }
+
+ /**
+ * Task to test failover: requests failover for the first job result and reports whether another result arrived.
+ */
+ private static class TestFailoverTask implements ComputeTask<Long, Boolean> {
+ /** Set once the first job result has been processed. */
+ private final AtomicBoolean firstJobProcessed = new AtomicBoolean();
+
+ /** Set when a job result is received after the first one. */
+ private volatile boolean failedOver;
+
+ /** {@inheritDoc} */
+ @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+ @Nullable Long arg) throws IgniteException {
+ return F.asMap(new TestJob(null), subgrid.get(0));
+ }
+
+ /** {@inheritDoc} */
+ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+ if (!firstJobProcessed.compareAndSet(false, true)) {
+ failedOver = true;
+
+ return ComputeJobResultPolicy.WAIT;
+ }
+
+ return ComputeJobResultPolicy.FAILOVER;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Boolean reduce(List<ComputeJobResult> results) throws IgniteException {
+ return failedOver;
+ }
+ }
+
+ /**
+ * Task to test "no result cache" flag.
+ */
+ private static class TestResultCacheTask implements ComputeTask<Long, Boolean> {
+ /** Is result cached. */
+ private volatile boolean cached;
+
+ /** {@inheritDoc} */
+ @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+ @Nullable Long arg) throws IgniteException {
+ return F.asMap(new TestJob(null), subgrid.get(0));
+ }
+
+ /** {@inheritDoc} */
+ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+ if (!F.isEmpty(rcvd))
+ cached = true;
+
+ return ComputeJobResultPolicy.WAIT;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Boolean reduce(List<ComputeJobResult> results) throws IgniteException {
+ return cached;
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
index 8d335f8..9adb507 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
@@ -323,7 +323,7 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo
}
/** {@inheritDoc} */
- @Override public void close() throws Exception {
+ @Override public void close() {
super.close();
closed = true;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
index 281be74e..7dc6222 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
import static org.apache.ignite.internal.client.thin.ReliableChannel.ASYNC_RUNNER_THREAD_NAME;
+import static org.apache.ignite.internal.client.thin.TcpClientChannel.RECEIVER_THREAD_PREFIX;
/**
* Test resource releasing by thin client.
@@ -46,12 +47,14 @@ public class ThinClientPartitionAwarenessResourceReleaseTest extends ThinClientA
assertFalse(channels[0].isClosed());
assertFalse(channels[1].isClosed());
assertEquals(1, threadsCount(ASYNC_RUNNER_THREAD_NAME));
+ assertEquals(2, threadsCount(RECEIVER_THREAD_PREFIX));
client.close();
assertTrue(channels[0].isClosed());
assertTrue(channels[1].isClosed());
assertTrue(GridTestUtils.waitForCondition(() -> threadsCount(ASYNC_RUNNER_THREAD_NAME) == 0, 1_000L));
+ assertTrue(GridTestUtils.waitForCondition(() -> threadsCount(RECEIVER_THREAD_PREFIX) == 0, 1_000L));
}
/**
@@ -65,7 +68,7 @@ public class ThinClientPartitionAwarenessResourceReleaseTest extends ThinClientA
for (long id : threadIds) {
ThreadInfo info = U.getThreadMx().getThreadInfo(id);
- if (info != null && info.getThreadState() != Thread.State.TERMINATED && name.equals(info.getThreadName()))
+ if (info != null && info.getThreadState() != Thread.State.TERMINATED && info.getThreadName().startsWith(name))
cnt++;
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
index f0ac9a8..9059aa7 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
@@ -17,6 +17,7 @@
package org.apache.ignite.client;
+import org.apache.ignite.internal.client.thin.ComputeTaskTest;
import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessResourceReleaseTest;
import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessStableTopologyTest;
import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessUnstableTopologyTest;
@@ -41,6 +42,7 @@ import org.junit.runners.Suite;
ConnectionTest.class,
ConnectToStartingNodeTest.class,
AsyncChannelTest.class,
+ ComputeTaskTest.class,
ThinClientPartitionAwarenessStableTopologyTest.class,
ThinClientPartitionAwarenessUnstableTopologyTest.class,
ThinClientPartitionAwarenessResourceReleaseTest.class
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml
index 4678d3d..e9e5ff4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml
@@ -121,7 +121,7 @@
</memoryConfiguration>
<sqlConnectorConfiguration host='bar' port='10' portRange='11' socketSendBufferSize='12' socketReceiveBufferSize='13' tcpNoDelay='true' maxOpenCursorsPerConnection='14' threadPoolSize='15' />
<clientConnectorConfiguration host='bar' port='10' portRange='11' socketSendBufferSize='12' socketReceiveBufferSize='13' tcpNoDelay='true' maxOpenCursorsPerConnection='14' threadPoolSize='15' idleTimeout = "00:00:19">
- <thinClientConfiguration maxActiveTxPerConnection='20' />
+ <thinClientConfiguration maxActiveTxPerConnection='20' maxActiveComputeTasksPerConnection='21' />
</clientConnectorConfiguration>
<persistentStoreConfiguration alwaysWriteFullPages='true' checkpointingFrequency='00:00:1' checkpointingPageBufferSize='2'
checkpointingThreads='3' lockWaitTime='00:00:04' persistentStorePath='foo' tlbSize='5'
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
index c05e798..bffe94a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
@@ -271,6 +271,7 @@ namespace Apache.Ignite.Core.Tests
Assert.AreEqual(15, client.ThreadPoolSize);
Assert.AreEqual(19, client.IdleTimeout.TotalSeconds);
Assert.AreEqual(20, client.ThinClientConfiguration.MaxActiveTxPerConnection);
+ Assert.AreEqual(21, client.ThinClientConfiguration.MaxActiveComputeTasksPerConnection);
var pers = cfg.PersistentStoreConfiguration;
@@ -944,8 +945,10 @@ namespace Apache.Ignite.Core.Tests
JdbcEnabled = false,
ThreadPoolSize = 7,
IdleTimeout = TimeSpan.FromMinutes(5),
- ThinClientConfiguration = new ThinClientConfiguration {
- MaxActiveTxPerConnection = 8
+ ThinClientConfiguration = new ThinClientConfiguration
+ {
+ MaxActiveTxPerConnection = 8,
+ MaxActiveComputeTasksPerConnection = 9
}
},
PersistentStoreConfiguration = new PersistentStoreConfiguration
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/ClientConnectorConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/ClientConnectorConfiguration.cs
index 4ab5696..74ac44b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/ClientConnectorConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/ClientConnectorConfiguration.cs
@@ -131,7 +131,8 @@ namespace Apache.Ignite.Core.Configuration
{
ThinClientConfiguration = new ThinClientConfiguration
{
- MaxActiveTxPerConnection = reader.ReadInt()
+ MaxActiveTxPerConnection = reader.ReadInt(),
+ MaxActiveComputeTasksPerConnection = reader.ReadInt()
};
}
}
@@ -164,6 +165,7 @@ namespace Apache.Ignite.Core.Configuration
{
writer.WriteBoolean(true);
writer.WriteInt(ThinClientConfiguration.MaxActiveTxPerConnection);
+ writer.WriteInt(ThinClientConfiguration.MaxActiveComputeTasksPerConnection);
}
else
{
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/ThinClientConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/ThinClientConfiguration.cs
index 7c9fcca..015dd22 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/ThinClientConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/ThinClientConfiguration.cs
@@ -30,11 +30,17 @@ namespace Apache.Ignite.Core.Configuration
public const int DefaultMaxActiveTxPerConnection = 100;
/// <summary>
+ /// Default limit of active compute tasks per connection.
+ /// </summary>
+ public const int DefaultMaxActiveComputeTasksPerConnection = 0;
+
+ /// <summary>
/// Initializes a new instance of the <see cref="ThinClientConfiguration"/> class.
/// </summary>
public ThinClientConfiguration()
{
MaxActiveTxPerConnection = DefaultMaxActiveTxPerConnection;
+ MaxActiveComputeTasksPerConnection = DefaultMaxActiveComputeTasksPerConnection;
}
/// <summary>
@@ -42,5 +48,12 @@ namespace Apache.Ignite.Core.Configuration
/// </summary>
[DefaultValue(DefaultMaxActiveTxPerConnection)]
public int MaxActiveTxPerConnection { get; set; }
+
+ /// <summary>
+ /// Gets or sets active compute tasks per connection limit.
+ /// Value <c>0</c> means that compute grid functionality is disabled for thin clients.
+ /// </summary>
+ [DefaultValue(DefaultMaxActiveComputeTasksPerConnection)]
+ public int MaxActiveComputeTasksPerConnection { get; set; }
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
index 74dc534..0ab59ec 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
@@ -1512,6 +1512,11 @@
<xs:documentation>Active transactions count per connection limit.</xs:documentation>
</xs:annotation>
</xs:attribute>
+ <xs:attribute name="maxActiveComputeTasksPerConnection" type="xs:int">
+ <xs:annotation>
+ <xs:documentation>Active compute tasks per connection limit. Value 0 means that compute grid functionality is disabled for thin clients.</xs:documentation>
+ </xs:annotation>
+ </xs:attribute>
</xs:complexType>
</xs:element>
</xs:all>