You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/03/18 09:04:43 UTC

[GitHub] [ignite-3] SammyVimes commented on a change in pull request #734: IGNITE-16616 [Compute Grid] Implement execute method of IgniteCompute interface

SammyVimes commented on a change in pull request #734:
URL: https://github.com/apache/ignite-3/pull/734#discussion_r829755730



##########
File path: modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import java.lang.reflect.Constructor;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.configuration.schemas.compute.ComputeConfiguration;
+import org.apache.ignite.internal.compute.message.ExecuteRequest;
+import org.apache.ignite.internal.compute.message.ExecuteResponse;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Implementation of {@link ComputeComponent}.
+ */
+public class ComputeComponentImpl implements ComputeComponent {
+    private static final long NETWORK_TIMEOUT_MILLIS = Long.MAX_VALUE;
+
+    private static final long THREAD_KEEP_ALIVE_SECONDS = 60;
+
+    private final Ignite ignite;
+    private final MessagingService messagingService;
+    private final ComputeConfiguration configuration;
+
+    private ExecutorService jobExecutorService;
+
+    private final ClassLoader jobClassLoader = Thread.currentThread().getContextClassLoader();
+
+    private final ComputeMessagesFactory messagesFactory = new ComputeMessagesFactory();
+
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Prevents double stopping the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    /**
+     * Creates a new instance.
+     */
+    public ComputeComponentImpl(Ignite ignite, MessagingService messagingService, ComputeConfiguration configuration) {
+        this.ignite = ignite;
+        this.messagingService = messagingService;
+        this.configuration = configuration;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <R> CompletableFuture<R> executeLocally(Class<? extends ComputeJob<R>> jobClass, Object... args) {
+        if (!busyLock.enterBusy()) {
+            return CompletableFuture.failedFuture(new NodeStoppingException());
+        }
+
+        try {
+            return doExecuteLocally(jobClass, args);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <R> CompletableFuture<R> executeLocally(String jobClassName, Object... args) {
+        return executeLocally(jobClass(jobClassName), args);
+    }
+
+    private <R> CompletableFuture<R> doExecuteLocally(Class<? extends ComputeJob<R>> jobClass, Object[] args) {
+        assert jobExecutorService != null : "Not started yet!";
+
+        try {
+            return CompletableFuture.supplyAsync(() -> executeOnCurrentThread(jobClass, args), jobExecutorService);
+        } catch (RejectedExecutionException e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
+
+    private <R> R executeOnCurrentThread(Class<? extends ComputeJob<R>> jobClass, Object[] args) {
+        ComputeJob<R> job = instantiateJob(jobClass);
+        JobExecutionContext context = new JobExecutionContextImpl(ignite);
+        return job.execute(context, args);
+    }
+
+    private <R> ComputeJob<R> instantiateJob(Class<? extends ComputeJob<R>> jobClass) {
+        try {
+            Constructor<? extends ComputeJob<R>> constructor = jobClass.getDeclaredConstructor();
+
+            if (!constructor.canAccess(null)) {
+                constructor.setAccessible(true);
+            }
+
+            return constructor.newInstance();
+        } catch (ReflectiveOperationException e) {
+            throw new IgniteInternalException("Cannot instantiate job", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <R> CompletableFuture<R> executeRemotely(ClusterNode remoteNode, Class<? extends ComputeJob<R>> jobClass, Object... args) {
+        if (!busyLock.enterBusy()) {
+            return CompletableFuture.failedFuture(new NodeStoppingException());
+        }
+
+        try {
+            return doExecuteRemotely(remoteNode, jobClass, args);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <R> CompletableFuture<R> executeRemotely(ClusterNode remoteNode, String jobClassName, Object... args) {
+        return executeRemotely(remoteNode, jobClass(jobClassName), args);
+    }
+
+    private <R> CompletableFuture<R> doExecuteRemotely(ClusterNode remoteNode, Class<? extends ComputeJob<R>> jobClass, Object[] args) {
+        ExecuteRequest executeRequest = messagesFactory.executeRequest()
+                .jobClassName(jobClass.getName())
+                .args(args)
+                .build();
+
+        return messagingService.invoke(remoteNode, executeRequest, NETWORK_TIMEOUT_MILLIS)
+                .thenCompose(message -> resultFromExecuteResponse((ExecuteResponse) message));
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> CompletableFuture<R> resultFromExecuteResponse(ExecuteResponse executeResponse) {
+        if (executeResponse.throwable() != null) {
+            return CompletableFuture.failedFuture(executeResponse.throwable());
+        }
+
+        return CompletableFuture.completedFuture((R) executeResponse.result());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public synchronized void start() {
+        jobExecutorService = new ThreadPoolExecutor(
+                configuration.threadPoolSize().value(),
+                configuration.threadPoolSize().value(),
+                THREAD_KEEP_ALIVE_SECONDS,
+                TimeUnit.SECONDS,
+                newExecutorServiceTaskQueue(),
+                new NamedThreadFactory("[" + ignite.name() + "] Compute-")
+        );
+
+        messagingService.addMessageHandler(ComputeMessageTypes.class, (message, senderAddr, correlationId) -> {
+            assert correlationId != null;
+
+            if (message instanceof ExecuteRequest) {
+                processExecuteRequest((ExecuteRequest) message, senderAddr, correlationId);
+
+                return;
+            }
+
+            throw new IgniteInternalException("Unexpected message type " + message.getClass());
+        });
+    }
+
+    BlockingQueue<Runnable> newExecutorServiceTaskQueue() {
+        return new LinkedBlockingQueue<>();
+    }
+
+    private void processExecuteRequest(ExecuteRequest executeRequest, NetworkAddress senderAddr, long correlationId) {
+        if (!busyLock.enterBusy()) {
+            sendExecuteResponse(null, new NodeStoppingException(), senderAddr, correlationId);
+            return;
+        }
+
+        try {
+            Class<ComputeJob<Object>> jobClass = jobClass(executeRequest.jobClassName());
+
+            doExecuteLocally(jobClass, executeRequest.args())
+                    .handle((result, ex) -> sendExecuteResponse(result, ex, senderAddr, correlationId));
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    @Nullable
+    private Object sendExecuteResponse(Object result, Throwable ex, NetworkAddress senderAddr, Long correlationId) {
+        ExecuteResponse executeResponse = messagesFactory.executeResponse()
+                .result(result)
+                .throwable(ex)
+                .build();
+
+        messagingService.respond(senderAddr, executeResponse, correlationId);
+
+        return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R, J extends ComputeJob<R>> Class<J> jobClass(String jobClassName) {
+        try {
+            return (Class<J>) Class.forName(jobClassName, true, jobClassLoader);

Review comment:
       Let's check if this class is actually a ComputeJob

##########
File path: modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import java.lang.reflect.Constructor;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.configuration.schemas.compute.ComputeConfiguration;
+import org.apache.ignite.internal.compute.message.ExecuteRequest;
+import org.apache.ignite.internal.compute.message.ExecuteResponse;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Implementation of {@link ComputeComponent}.
+ */
+public class ComputeComponentImpl implements ComputeComponent {
+    private static final long NETWORK_TIMEOUT_MILLIS = Long.MAX_VALUE;
+
+    private static final long THREAD_KEEP_ALIVE_SECONDS = 60;
+
+    private final Ignite ignite;
+    private final MessagingService messagingService;
+    private final ComputeConfiguration configuration;
+
+    private ExecutorService jobExecutorService;
+
+    private final ClassLoader jobClassLoader = Thread.currentThread().getContextClassLoader();
+
+    private final ComputeMessagesFactory messagesFactory = new ComputeMessagesFactory();
+
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Prevents double stopping the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    /**
+     * Creates a new instance.
+     */
+    public ComputeComponentImpl(Ignite ignite, MessagingService messagingService, ComputeConfiguration configuration) {
+        this.ignite = ignite;
+        this.messagingService = messagingService;
+        this.configuration = configuration;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <R> CompletableFuture<R> executeLocally(Class<? extends ComputeJob<R>> jobClass, Object... args) {
+        if (!busyLock.enterBusy()) {
+            return CompletableFuture.failedFuture(new NodeStoppingException());
+        }
+
+        try {
+            return doExecuteLocally(jobClass, args);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <R> CompletableFuture<R> executeLocally(String jobClassName, Object... args) {
+        return executeLocally(jobClass(jobClassName), args);
+    }
+
+    private <R> CompletableFuture<R> doExecuteLocally(Class<? extends ComputeJob<R>> jobClass, Object[] args) {
+        assert jobExecutorService != null : "Not started yet!";
+
+        try {
+            return CompletableFuture.supplyAsync(() -> executeOnCurrentThread(jobClass, args), jobExecutorService);
+        } catch (RejectedExecutionException e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
+
+    private <R> R executeOnCurrentThread(Class<? extends ComputeJob<R>> jobClass, Object[] args) {
+        ComputeJob<R> job = instantiateJob(jobClass);
+        JobExecutionContext context = new JobExecutionContextImpl(ignite);
+        return job.execute(context, args);
+    }
+
+    private <R> ComputeJob<R> instantiateJob(Class<? extends ComputeJob<R>> jobClass) {
+        try {
+            Constructor<? extends ComputeJob<R>> constructor = jobClass.getDeclaredConstructor();
+
+            if (!constructor.canAccess(null)) {
+                constructor.setAccessible(true);
+            }
+
+            return constructor.newInstance();
+        } catch (ReflectiveOperationException e) {
+            throw new IgniteInternalException("Cannot instantiate job", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <R> CompletableFuture<R> executeRemotely(ClusterNode remoteNode, Class<? extends ComputeJob<R>> jobClass, Object... args) {
+        if (!busyLock.enterBusy()) {
+            return CompletableFuture.failedFuture(new NodeStoppingException());
+        }
+
+        try {
+            return doExecuteRemotely(remoteNode, jobClass, args);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <R> CompletableFuture<R> executeRemotely(ClusterNode remoteNode, String jobClassName, Object... args) {
+        return executeRemotely(remoteNode, jobClass(jobClassName), args);
+    }
+
+    private <R> CompletableFuture<R> doExecuteRemotely(ClusterNode remoteNode, Class<? extends ComputeJob<R>> jobClass, Object[] args) {
+        ExecuteRequest executeRequest = messagesFactory.executeRequest()
+                .jobClassName(jobClass.getName())
+                .args(args)
+                .build();
+
+        return messagingService.invoke(remoteNode, executeRequest, NETWORK_TIMEOUT_MILLIS)
+                .thenCompose(message -> resultFromExecuteResponse((ExecuteResponse) message));
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> CompletableFuture<R> resultFromExecuteResponse(ExecuteResponse executeResponse) {
+        if (executeResponse.throwable() != null) {
+            return CompletableFuture.failedFuture(executeResponse.throwable());
+        }
+
+        return CompletableFuture.completedFuture((R) executeResponse.result());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public synchronized void start() {
+        jobExecutorService = new ThreadPoolExecutor(
+                configuration.threadPoolSize().value(),
+                configuration.threadPoolSize().value(),
+                THREAD_KEEP_ALIVE_SECONDS,
+                TimeUnit.SECONDS,
+                newExecutorServiceTaskQueue(),
+                new NamedThreadFactory("[" + ignite.name() + "] Compute-")
+        );
+
+        messagingService.addMessageHandler(ComputeMessageTypes.class, (message, senderAddr, correlationId) -> {
+            assert correlationId != null;
+
+            if (message instanceof ExecuteRequest) {
+                processExecuteRequest((ExecuteRequest) message, senderAddr, correlationId);
+
+                return;
+            }
+
+            throw new IgniteInternalException("Unexpected message type " + message.getClass());
+        });
+    }
+
+    BlockingQueue<Runnable> newExecutorServiceTaskQueue() {
+        return new LinkedBlockingQueue<>();
+    }
+
+    private void processExecuteRequest(ExecuteRequest executeRequest, NetworkAddress senderAddr, long correlationId) {
+        if (!busyLock.enterBusy()) {
+            sendExecuteResponse(null, new NodeStoppingException(), senderAddr, correlationId);
+            return;
+        }
+
+        try {
+            Class<ComputeJob<Object>> jobClass = jobClass(executeRequest.jobClassName());
+
+            doExecuteLocally(jobClass, executeRequest.args())
+                    .handle((result, ex) -> sendExecuteResponse(result, ex, senderAddr, correlationId));
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    @Nullable
+    private Object sendExecuteResponse(Object result, Throwable ex, NetworkAddress senderAddr, Long correlationId) {
+        ExecuteResponse executeResponse = messagesFactory.executeResponse()
+                .result(result)
+                .throwable(ex)
+                .build();
+
+        messagingService.respond(senderAddr, executeResponse, correlationId);
+
+        return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R, J extends ComputeJob<R>> Class<J> jobClass(String jobClassName) {
+        try {
+            return (Class<J>) Class.forName(jobClassName, true, jobClassLoader);
+        } catch (ClassNotFoundException e) {
+            throw new IgniteInternalException("Cannot load job class", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop() throws Exception {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        IgniteUtils.shutdownAndAwaitTermination(jobExecutorService, stopTimeoutMillis(), TimeUnit.MILLISECONDS);
+    }
+
+    int stopTimeoutMillis() {
+        return 10_000;

Review comment:
       Probably should be in the configuration

##########
File path: modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import java.lang.reflect.Constructor;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.configuration.schemas.compute.ComputeConfiguration;
+import org.apache.ignite.internal.compute.message.ExecuteRequest;
+import org.apache.ignite.internal.compute.message.ExecuteResponse;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Implementation of {@link ComputeComponent}.
+ */
+public class ComputeComponentImpl implements ComputeComponent {
+    private static final long NETWORK_TIMEOUT_MILLIS = Long.MAX_VALUE;
+
+    private static final long THREAD_KEEP_ALIVE_SECONDS = 60;
+
+    private final Ignite ignite;
+    private final MessagingService messagingService;
+    private final ComputeConfiguration configuration;
+
+    private ExecutorService jobExecutorService;
+
+    private final ClassLoader jobClassLoader = Thread.currentThread().getContextClassLoader();
+
+    private final ComputeMessagesFactory messagesFactory = new ComputeMessagesFactory();
+
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Prevents double stopping the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    /**
+     * Creates a new instance.
+     */
+    public ComputeComponentImpl(Ignite ignite, MessagingService messagingService, ComputeConfiguration configuration) {
+        this.ignite = ignite;
+        this.messagingService = messagingService;
+        this.configuration = configuration;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <R> CompletableFuture<R> executeLocally(Class<? extends ComputeJob<R>> jobClass, Object... args) {
+        if (!busyLock.enterBusy()) {
+            return CompletableFuture.failedFuture(new NodeStoppingException());
+        }
+
+        try {
+            return doExecuteLocally(jobClass, args);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <R> CompletableFuture<R> executeLocally(String jobClassName, Object... args) {
+        return executeLocally(jobClass(jobClassName), args);
+    }
+
+    private <R> CompletableFuture<R> doExecuteLocally(Class<? extends ComputeJob<R>> jobClass, Object[] args) {
+        assert jobExecutorService != null : "Not started yet!";
+
+        try {
+            return CompletableFuture.supplyAsync(() -> executeOnCurrentThread(jobClass, args), jobExecutorService);
+        } catch (RejectedExecutionException e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
+
+    private <R> R executeOnCurrentThread(Class<? extends ComputeJob<R>> jobClass, Object[] args) {

Review comment:
       Maybe just `executeJob`? Because this "currentThread" is always a thread from the executor

##########
File path: modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
##########
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Duration;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.configuration.schemas.compute.ComputeConfiguration;
+import org.apache.ignite.internal.compute.message.ExecuteRequest;
+import org.apache.ignite.internal.compute.message.ExecuteResponse;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+@Timeout(10)
+class ComputeComponentImplTest {
+    private static final String INSTANCE_NAME = "Ignite-0";
+
+    @Mock
+    private Ignite ignite;
+
+    @Mock
+    private MessagingService messagingService;
+
+    @Mock
+    private ComputeConfiguration computeConfiguration;
+
+    @Mock
+    private ConfigurationValue<Integer> threadPoolSizeValue;
+
+    @InjectMocks
+    private ComputeComponentImpl computeComponent;
+
+    @Captor
+    private ArgumentCaptor<ExecuteRequest> executeRequestCaptor;
+    @Captor
+    private ArgumentCaptor<ExecuteResponse> executeResponseCaptor;
+
+    private final ClusterNode remoteNode = new ClusterNode("remote", "remote", new NetworkAddress("remote-host", 1, "remote"));
+
+    private final AtomicReference<NetworkMessageHandler> computeMessageHandlerRef = new AtomicReference<>();
+
+    private final AtomicBoolean responseSent = new AtomicBoolean(false);
+
+    @BeforeEach
+    void setUp() {
+        when(computeConfiguration.threadPoolSize()).thenReturn(threadPoolSizeValue);
+        when(threadPoolSizeValue.value()).thenReturn(8);
+
+        lenient().when(ignite.name()).thenReturn(INSTANCE_NAME);

Review comment:
       I haven't seen lenient in a while, why do we use it here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org