You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/04/13 08:55:30 UTC
[ignite] branch sql-calcite updated: IGNITE-14262 Calcite
improvements. Fix close/cancel of the query execution (get rid of numerous
ExecutionCancelledException)
This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/sql-calcite by this push:
new 44df9ee IGNITE-14262 Calcite improvements. Fix close/cancel of the query execution (get rid of numerous ExecutionCancelledException)
44df9ee is described below
commit 44df9ee220b24399a5fc36e1954cddbfecfc7d7b
Author: zstan <st...@gmail.com>
AuthorDate: Tue Apr 13 11:54:59 2021 +0300
IGNITE-14262 Calcite improvements. Fix close/cancel of the query execution (get rid of numerous ExecutionCancelledException)
---
.../query/calcite/exec/ExchangeServiceImpl.java | 39 ++++++++-----
.../query/calcite/exec/ExecutionContext.java | 28 ++++++++++
.../query/calcite/exec/ExecutionServiceImpl.java | 13 +++--
.../query/calcite/exec/MailboxRegistryImpl.java | 14 +++++
.../query/calcite/exec/QueryTaskExecutor.java | 13 +++++
.../query/calcite/exec/QueryTaskExecutorImpl.java | 6 ++
.../query/calcite/exec/rel/HashAggregateNode.java | 14 ++---
.../query/calcite/exec/rel/MergeJoinNode.java | 3 +-
.../query/calcite/exec/rel/RootNode.java | 8 ++-
.../query/calcite/exec/rel/ScanNode.java | 17 +++---
.../query/calcite/exec/rel/SortNode.java | 3 +
.../query/calcite/CalciteQueryProcessorTest.java | 64 ++++++++++++++++++++--
.../processors/query/calcite/CancelTest.java | 43 +--------------
.../processors/query/calcite/QueryChecker.java | 41 ++++++++++++++
.../thread/IgniteStripedThreadPoolExecutor.java | 15 +++++
15 files changed, 239 insertions(+), 82 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index c795694..02af657 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -17,10 +17,14 @@
package org.apache.ignite.internal.processors.query.calcite.exec;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import com.google.common.collect.ImmutableMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -157,36 +161,45 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ
/** */
protected void onMessage(UUID nodeId, InboxCloseMessage msg) {
Collection<Inbox<?>> inboxes = mailboxRegistry().inboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId());
+
if (!F.isEmpty(inboxes)) {
- for (Inbox<?> inbox : inboxes) {
- inbox.context().cancel();
+ for (Inbox<?> inbox : inboxes)
inbox.context().execute(inbox::close, inbox::onError);
- }
}
else if (log.isDebugEnabled()) {
log.debug("Stale inbox cancel message received: [" +
- "nodeId=" + nodeId + ", " +
- "queryId=" + msg.queryId() + ", " +
- "fragmentId=" + msg.fragmentId() + ", " +
- "exchangeId=" + msg.exchangeId() + "]");
+ "nodeId=" + nodeId +
+ ", queryId=" + msg.queryId() +
+ ", fragmentId=" + msg.fragmentId() +
+ ", exchangeId=" + msg.exchangeId() + "]");
}
}
/** */
protected void onMessage(UUID nodeId, OutboxCloseMessage msg) {
Collection<Outbox<?>> outboxes = mailboxRegistry().outboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId());
+
if (!F.isEmpty(outboxes)) {
+ List<CompletableFuture<?>> futs = new ArrayList<>(outboxes.size());
+
+ Set<ExecutionContext<?>> ctxs = new HashSet<>();
+
for (Outbox<?> outbox : outboxes) {
- outbox.context().cancel();
- outbox.context().execute(outbox::close, outbox::onError);
+ CompletableFuture<?> fut = outbox.context().submit(outbox::close, outbox::onError);
+
+ futs.add(fut);
+
+ ctxs.add(outbox.context());
}
+
+ CompletableFuture.allOf(futs.toArray(new CompletableFuture<?>[0])).thenRun(() -> ctxs.forEach(ExecutionContext::cancel));
}
else if (log.isDebugEnabled()) {
log.debug("Stale oubox cancel message received: [" +
- "nodeId=" + nodeId + ", " +
- "queryId=" + msg.queryId() + ", " +
- "fragmentId=" + msg.fragmentId() + ", " +
- "exchangeId=" + msg.exchangeId() + "]");
+ "nodeId=" + nodeId +
+ ", queryId=" + msg.queryId() +
+ ", fragmentId=" + msg.fragmentId() +
+ ", exchangeId=" + msg.exchangeId() + "]");
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
index 6d65beb..47d453b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
@@ -21,6 +21,7 @@ import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
@@ -230,6 +231,9 @@ public class ExecutionContext<Row> implements DataContext {
* @param task Query task.
*/
public void execute(RunnableX task, Consumer<Throwable> onError) {
+ if (isCancelled())
+ return;
+
executor.execute(qryId, fragmentId(), () -> {
try {
task.run();
@@ -242,6 +246,29 @@ public class ExecutionContext<Row> implements DataContext {
});
}
+ /**
+ * Submits a Runnable task for execution and returns a Future
+ * representing that task. The Future's {@code get} method will
+ * return {@code null} upon <em>successful</em> completion.
+ *
+ * @param task the task to submit.
+ * @return a {@link CompletableFuture} representing pending task
+ */
+ public CompletableFuture<?> submit(RunnableX task, Consumer<Throwable> onError) {
+ assert !isCancelled() : "Call submit after execution was cancelled.";
+
+ return executor.submit(qryId, fragmentId(), () -> {
+ try {
+ task.run();
+ }
+ catch (Throwable e) {
+ onError.accept(e);
+
+ throw new IgniteException("Unexpected exception", e);
+ }
+ });
+ }
+
/** */
@FunctionalInterface
public interface RunnableX {
@@ -258,6 +285,7 @@ public class ExecutionContext<Row> implements DataContext {
return !cancelFlag.get() && cancelFlag.compareAndSet(false, true);
}
+ /** */
public boolean isCancelled() {
return cancelFlag.get();
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 354b926..fa97dbd 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -951,7 +951,7 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
private final Set<RemoteFragmentKey> waiting;
/** */
- private QueryState state;
+ private volatile QueryState state;
/** */
private QueryInfo(ExecutionContext<Row> ctx, MultiStepPlan plan, Node<Row> root) {
@@ -1001,10 +1001,7 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
if (state == QueryState.RUNNING)
state0 = state = QueryState.CLOSING;
- // 1) Cancel local fragment
- ctx.cancel();
-
- // 2) close local fragment
+ // 1) close local fragment
root.closeInternal();
if (state == QueryState.CLOSING && waiting.isEmpty())
@@ -1012,12 +1009,13 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
}
if (state0 == QueryState.CLOSED) {
- // 3) unregister runing query
+ // 2) unregister runing query
running.remove(ctx.queryId());
// 4) close remote fragments
IgniteException wrpEx = null;
+ // 3) close remote fragments
for (UUID nodeId : remotes) {
try {
exchangeService().closeOutbox(nodeId, ctx.queryId(), -1, -1);
@@ -1030,6 +1028,9 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
}
}
+ // 4) Cancel local fragment
+ ctx.cancel();
+
if (wrpEx != null)
throw wrpEx;
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
index 39ace60..cbf7d95 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
@@ -33,6 +33,8 @@ import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Mailbox;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
/**
@@ -49,9 +51,11 @@ public class MailboxRegistryImpl extends AbstractService implements MailboxRegis
private final Map<MailboxKey, Inbox<?>> remotes;
/** */
+ @GridToStringExclude
private final DiscoveryEventListener discoLsnr;
/** */
+ @GridToStringExclude
private GridEventStorageManager evtMgr;
/**
@@ -164,6 +168,11 @@ public class MailboxRegistryImpl extends AbstractService implements MailboxRegis
return filter;
}
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MailboxRegistryImpl.class, this);
+ }
+
/** */
private static class MailboxKey {
/** */
@@ -198,5 +207,10 @@ public class MailboxRegistryImpl extends AbstractService implements MailboxRegis
res = 31 * res + (int) (exchangeId ^ (exchangeId >>> 32));
return res;
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MailboxKey.class, this);
+ }
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutor.java
index 4d7861c..830a4d2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutor.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.calcite.exec;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.processors.query.calcite.util.Service;
@@ -33,4 +34,16 @@ public interface QueryTaskExecutor extends Service {
* @param qryTask Query task.
*/
void execute(UUID qryId, long fragmentId, Runnable qryTask);
+
+ /**
+ * Returns a new CompletableFuture that is asynchronously completed
+ * by a task running in the given executor after it runs the given
+ * action.
+ *
+ * @param qryId Id of the query this task created for.
+ * @param fragmentId Id of the particular fragment this task created for.
+ * @param qryTask The task to submit.
+ * @return the new CompletableFuture
+ */
+ CompletableFuture<?> submit(UUID qryId, long fragmentId, Runnable qryTask);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java
index b0db389..9a31205 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.calcite.exec;
import java.util.Objects;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
@@ -82,6 +83,11 @@ public class QueryTaskExecutorImpl extends AbstractService implements QueryTaskE
}
/** {@inheritDoc} */
+ @Override public CompletableFuture<?> submit(UUID qryId, long fragmentId, Runnable qryTask) {
+ return stripedThreadPoolExecutor.submit(qryTask, hash(qryId, fragmentId));
+ }
+
+ /** {@inheritDoc} */
@Override public void onStart(GridKernalContext ctx) {
exceptionHandler(ctx.uncaughtExceptionHandler());
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java
index 1f4728e..0911e4c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java
@@ -111,7 +111,7 @@ public class HashAggregateNode<Row> extends AbstractNode<Row> implements SingleN
if (waiting == 0)
source().request(waiting = IN_BUFFER_SIZE);
else if (!inLoop)
- context().execute(this::doFlush, this::onError);
+ context().execute(this::flush, this::onError);
}
/** {@inheritDoc} */
@@ -158,14 +158,12 @@ public class HashAggregateNode<Row> extends AbstractNode<Row> implements SingleN
}
/** */
- private void doFlush() throws Exception {
- checkState();
+ private void flush() throws Exception {
+ if (isClosed())
+ return;
- flush();
- }
+ checkState();
- /** */
- private void flush() throws Exception {
assert waiting == -1;
int processed = 0;
@@ -189,7 +187,7 @@ public class HashAggregateNode<Row> extends AbstractNode<Row> implements SingleN
if (processed >= IN_BUFFER_SIZE && requested > 0) {
// allow others to do their job
- context().execute(this::doFlush, this::onError);
+ context().execute(this::flush, this::onError);
return;
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
index e97968b85..3d291c7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
@@ -283,7 +283,8 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> {
@Override protected void join() throws Exception {
inLoop = true;
try {
- while (requested > 0 && (left != null || !leftInBuf.isEmpty()) && (right != null || !rightInBuf.isEmpty() || rightMaterialization != null)) {
+ while (requested > 0 && (left != null || !leftInBuf.isEmpty()) && (right != null || !rightInBuf.isEmpty()
+ || rightMaterialization != null)) {
checkState();
if (left == null)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
index a99693b..8b8838a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
@@ -22,6 +22,7 @@ import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -121,7 +122,12 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
/** {@inheritDoc} */
@Override public void closeInternal() {
- context().execute(() -> sources().forEach(U::closeQuiet), this::onError);
+ try {
+ context().submit(() -> sources().forEach(U::closeQuiet), this::onError).get();
+ }
+ catch (InterruptedException | ExecutionException e) {
+ U.warn(context().planningContext().logger(), "Execution is cancelled.", e);
+ }
}
/** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
index c4805d6..c6ed6d5 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
@@ -59,14 +59,7 @@ public class ScanNode<Row> extends AbstractNode<Row> implements SingleNode<Row>
requested = rowsCnt;
if (!inLoop)
- context().execute(this::doPush, this::onError);
- }
-
- /** */
- private void doPush() throws Exception {
- checkState();
-
- push();
+ context().execute(this::push, this::onError);
}
/** {@inheritDoc} */
@@ -96,6 +89,11 @@ public class ScanNode<Row> extends AbstractNode<Row> implements SingleNode<Row>
/** */
private void push() throws Exception {
+ if (isClosed())
+ return;
+
+ checkState();
+
inLoop = true;
try {
if (it == null)
@@ -110,7 +108,7 @@ public class ScanNode<Row> extends AbstractNode<Row> implements SingleNode<Row>
if (++processed == IN_BUFFER_SIZE && requested > 0) {
// allow others to do their job
- context().execute(this::doPush, this::onError);
+ context().execute(this::push, this::onError);
return;
}
@@ -125,6 +123,7 @@ public class ScanNode<Row> extends AbstractNode<Row> implements SingleNode<Row>
it = null;
requested = 0;
+
downstream().end();
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
index 9f7a31f..00f3fdb5 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
@@ -109,6 +109,9 @@ public class SortNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
/** */
private void flush() throws Exception {
+ if (isClosed())
+ return;
+
assert waiting == -1;
int processed = 0;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index e5a94a9..bee1930 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -39,18 +39,27 @@ import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistryImpl;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
+import static org.apache.ignite.internal.processors.query.calcite.QueryChecker.awaitReservationsRelease;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
@@ -62,6 +71,22 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
/** */
private static IgniteEx client;
+ /** Log listener. */
+ private ListeningTestLogger listeningLog = new ListeningTestLogger(log);
+
+ /** */
+ private static LogListener lsnr = LogListener.matches(s ->
+ s.contains("Execution is cancelled") ||
+ s.contains("NullPointer") ||
+ s.contains("AssertionError")).build();
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ listeningLog.registerListener(lsnr);
+
+ return super.getConfiguration(igniteInstanceName).setGridLogger(listeningLog);
+ }
+
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
startGrids(5);
@@ -140,9 +165,11 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
.setCacheMode(CacheMode.REPLICATED)
);
- Map<Integer, RISK> mRisk = new HashMap<>(65000);
+ int numRiskRows = 65_000;
+
+ Map<Integer, RISK> mRisk = new HashMap<>(numRiskRows);
- for (int i = 0; i < 65000; i++)
+ for (int i = 0; i < numRiskRows; i++)
mRisk.put(i, new RISK(i));
RISK.putAll(mRisk);
@@ -191,6 +218,33 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
assertEquals(40L, res.get(0).get(0));
}
}
+
+ assertFalse(lsnr.check());
+
+ listeningLog.clearListeners();
+
+ awaitReservationsRelease("RISK");
+ awaitReservationsRelease("TRADE");
+ awaitReservationsRelease("BATCH");
+ }
+
+ /**
+ * Checks all grids execution contexts are closed or registered (Out|In)boxes are present.
+ */
+ private void checkContextCancelled() throws IgniteInterruptedCheckedException {
+ for (Ignite instance : G.allGrids()) {
+ QueryEngine engineCli = Commons.lookupComponent(((IgniteEx)instance).context(), QueryEngine.class);
+
+ MailboxRegistryImpl mailReg = GridTestUtils.getFieldValue(engineCli, CalciteQueryProcessor.class, "mailboxRegistry");
+
+ Map<Object, Inbox<?>> remotes = GridTestUtils.getFieldValue(mailReg, MailboxRegistryImpl.class, "remotes");
+
+ Map<Object, Outbox<?>> locals = GridTestUtils.getFieldValue(mailReg, MailboxRegistryImpl.class, "locals");
+
+ waitForCondition(() -> remotes.isEmpty() || remotes.values().stream().allMatch(s -> s.context().isCancelled()), 5_000);
+
+ waitForCondition(() -> locals.isEmpty() || locals.values().stream().allMatch(s -> s.context().isCancelled()), 5_000);
+ }
}
/** */
@@ -1005,8 +1059,8 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
}
/** */
- private List<List<?>> sql(String sql) {
- QueryEngine engineSrv = Commons.lookupComponent(grid(1).context(), QueryEngine.class);
+ private List<List<?>> sql(String sql) throws IgniteInterruptedCheckedException {
+ QueryEngine engineSrv = Commons.lookupComponent(grid(0).context(), QueryEngine.class);
assertTrue(client.configuration().isClientMode());
@@ -1022,6 +1076,8 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
allSrv = srvCursor.getAll();
assertEquals(allSrv.size(), cliCursor.getAll().size());
+
+ checkContextCancelled();
}
return allSrv;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
index 48af61c..b20c3c3 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.query.calcite;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
-import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.QueryEntity;
@@ -29,30 +28,24 @@ import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryEngine;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Ignore;
import org.junit.Test;
import static java.util.Collections.singletonList;
import static org.apache.ignite.cache.query.QueryCancelledException.ERR_MSG;
+import static org.apache.ignite.internal.processors.query.calcite.QueryChecker.awaitReservationsRelease;
/**
* Cancel query test.
*/
public class CancelTest extends GridCommonAbstractTest {
- /** Partition release timeout. */
- private static final long PART_RELEASE_TIMEOUT = 5000L;
-
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
startGrids(2);
@@ -121,6 +114,7 @@ public class CancelTest extends GridCommonAbstractTest {
/**
*
*/
+ @Ignore("https://issues.apache.org/jira/browse/IGNITE-14289")
@Test
public void testNotOriginatorNodeStop() throws Exception {
QueryEngine engine = Commons.lookupComponent(grid(0).context(), QueryEngine.class);
@@ -244,35 +238,4 @@ public class CancelTest extends GridCommonAbstractTest {
awaitPartitionMapExchange();
}
-
- /**
- * @param cacheName Cache to check
- * @throws IgniteInterruptedCheckedException
- */
- void awaitReservationsRelease(String cacheName) throws IgniteInterruptedCheckedException {
- for (Ignite ign : G.allGrids())
- awaitReservationsRelease((IgniteEx)ign, "TEST");
- }
-
- /**
- * @param node Node to check reservation.
- * @param cacheName Cache to check reservations.
- */
- void awaitReservationsRelease(IgniteEx node, String cacheName) throws IgniteInterruptedCheckedException {
- GridDhtAtomicCache c = GridTestUtils.getFieldValue(node.cachex(cacheName), "delegate");
-
- List<GridDhtLocalPartition> parts = c.topology().localPartitions();
-
- GridTestUtils.waitForCondition(() -> {
- for (GridDhtLocalPartition p : parts) {
- if (p.reservations() > 0)
- return false;
- }
-
- return true;
- }, PART_RELEASE_TIMEOUT);
-
- for (GridDhtLocalPartition p : parts)
- assertEquals("Partition is reserved: " + p, 0, p.reservations());
- }
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
index c50b1ea..67a25cb 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
@@ -28,10 +28,17 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import org.apache.ignite.Ignite;
import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.query.QueryEngine;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.testframework.GridTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.core.SubstringMatcher;
@@ -45,6 +52,9 @@ import static org.junit.Assert.fail;
* Query checker.
*/
public abstract class QueryChecker {
+ /** Partition release timeout. */
+ private static final long PART_RELEASE_TIMEOUT = 5_000L;
+
/**
* Ignite table scan matcher.
*
@@ -407,4 +417,35 @@ public abstract class QueryChecker {
return 0;
}
}
+
+ /**
+ * @param cacheName Cache to check
+ * @throws IgniteInterruptedCheckedException
+ */
+ public static void awaitReservationsRelease(String cacheName) throws IgniteInterruptedCheckedException {
+ for (Ignite ign : G.allGrids())
+ awaitReservationsRelease((IgniteEx)ign, cacheName);
+ }
+
+ /**
+ * @param node Node to check reservation.
+ * @param cacheName Cache to check reservations.
+ */
+ public static void awaitReservationsRelease(IgniteEx node, String cacheName) throws IgniteInterruptedCheckedException {
+ GridDhtAtomicCache c = GridTestUtils.getFieldValue(node.cachex(cacheName), "delegate");
+
+ List<GridDhtLocalPartition> parts = c.topology().localPartitions();
+
+ GridTestUtils.waitForCondition(() -> {
+ for (GridDhtLocalPartition p : parts) {
+ if (p.reservations() > 0)
+ return false;
+ }
+
+ return true;
+ }, PART_RELEASE_TIMEOUT);
+
+ for (GridDhtLocalPartition p : parts)
+ assertEquals("Partition is reserved: " + p, 0, p.reservations());
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
index 6c1c62f..114120b 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
@@ -90,6 +91,20 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService {
}
/**
+ * Submits a {@link Runnable} task for execution and returns a {@link CompletableFuture} representing that task.
+ * The command with the same {@code index} will be executed in the same thread.
+ *
+ * @param task The task to submit.
+ * @return a {@link Future} representing pending completion of the task.
+ * @throws RejectedExecutionException if the task cannot be
+ * scheduled for execution.
+ * @throws NullPointerException if the task is {@code null}.
+ */
+ public CompletableFuture<?> submit(Runnable task, int idx) {
+ return CompletableFuture.runAsync(task, execs[threadId(idx)]);
+ }
+
+ /**
* @param idx Index.
* @return Stripped thread ID.
*/