You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2018/10/17 12:40:48 UTC
ignite git commit: IGNITE-9898 fix checkpoint thread hangs on await
async task completion - Fixes #5002.
Repository: ignite
Updated Branches:
refs/heads/master a1cb021c0 -> 10c2b10e6
IGNITE-9898 fix checkpoint thread hangs on await async task completion - Fixes #5002.
Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/10c2b10e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/10c2b10e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/10c2b10e
Branch: refs/heads/master
Commit: 10c2b10e605d372e832b65da52d67dc9656b53c1
Parents: a1cb021
Author: Dmitriy Govorukhin <dm...@gmail.com>
Authored: Wed Oct 17 15:40:40 2018 +0300
Committer: Dmitriy Govorukhin <dm...@gmail.com>
Committed: Wed Oct 17 15:40:40 2018 +0300
----------------------------------------------------------------------
.../GridCacheDatabaseSharedManager.java | 47 ++---
.../ignite/internal/util/IgniteUtils.java | 22 +++
.../IgniteTaskTrackingThreadPoolExecutor.java | 180 -------------------
.../testsuites/IgniteUtilSelfTestSuite.java | 7 +-
...gniteTaskTrackingThreadPoolExecutorTest.java | 140 ---------------
5 files changed, 49 insertions(+), 347 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/10c2b10e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index d69b83c..e74954f 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -140,6 +140,7 @@ import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.CountDownFuture;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridInClosure3X;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -159,7 +160,7 @@ import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.mxbean.DataStorageMetricsMXBean;
import org.apache.ignite.thread.IgniteThread;
-import org.apache.ignite.thread.IgniteTaskTrackingThreadPoolExecutor;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedHashMap;
@@ -290,7 +291,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
private boolean stopping;
/** Checkpoint runner thread pool. If null tasks are to be run in single thread */
- @Nullable private IgniteTaskTrackingThreadPoolExecutor asyncRunner;
+ @Nullable private IgniteThreadPoolExecutor asyncRunner;
/** Thread local with buffers for the checkpoint threads. Each buffer represent one page for durable memory. */
private ThreadLocal<ByteBuffer> threadBuf;
@@ -727,15 +728,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
*/
private void initDataBase() {
if (persistenceCfg.getCheckpointThreads() > 1)
- asyncRunner = new IgniteTaskTrackingThreadPoolExecutor(
+ asyncRunner = new IgniteThreadPoolExecutor(
CHECKPOINT_RUNNER_THREAD_PREFIX,
cctx.igniteInstanceName(),
persistenceCfg.getCheckpointThreads(),
persistenceCfg.getCheckpointThreads(),
- 30_000, // A value is ignored if corePoolSize equals to maxPoolSize
- new LinkedBlockingQueue<Runnable>(),
- GridIoPolicy.UNDEFINED,
- cctx.kernalContext().uncaughtExceptionHandler()
+ 30_000,
+ new LinkedBlockingQueue<Runnable>()
);
}
@@ -3585,8 +3584,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
final PartitionAllocationMap map = new PartitionAllocationMap();
- if (asyncRunner != null)
- asyncRunner.reset();
+ GridCompoundFuture asyncLsnrFut = asyncRunner == null ? null : new GridCompoundFuture();
DbCheckpointListener.Context ctx0 = new DbCheckpointListener.Context() {
@Override public boolean nextSnapshot() {
@@ -3607,10 +3605,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
@Override public Executor executor() {
return asyncRunner == null ? null : cmd -> {
try {
- asyncRunner.execute(cmd);
+ GridFutureAdapter<?> res = new GridFutureAdapter<>();
+
+ asyncRunner.execute(U.wrapIgniteFuture(cmd, res));
+
+ asyncLsnrFut.add(res);
}
catch (RejectedExecutionException e) {
- assert false: "A task should never be rejected by async runner";
+ assert false : "A task should never be rejected by async runner";
}
};
}
@@ -3620,17 +3622,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
for (DbCheckpointListener lsnr : lsnrs)
lsnr.onCheckpointBegin(ctx0);
- if (asyncRunner != null) {
- asyncRunner.markInitialized();
+ if (asyncLsnrFut != null) {
+ asyncLsnrFut.markInitialized();
- asyncRunner.awaitDone();
+ asyncLsnrFut.get();
}
if (curr.nextSnapshot)
snapFut = snapshotMgr.onMarkCheckPointBegin(curr.snapshotOperation, map);
- if (asyncRunner != null)
- asyncRunner.reset();
+ GridCompoundFuture grpHandleFut = asyncRunner == null ? null : new GridCompoundFuture();
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (grp.isLocal() || !grp.walEnabled())
@@ -3662,17 +3663,21 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
r.run();
else
try {
- asyncRunner.execute(r);
+ GridFutureAdapter<?> res = new GridFutureAdapter<>();
+
+ asyncRunner.execute(U.wrapIgniteFuture(r, res));
+
+ grpHandleFut.add(res);
}
catch (RejectedExecutionException e) {
- assert false: "Task should never be rejected by async runner";
+ assert false : "Task should never be rejected by async runner";
}
}
- if (asyncRunner != null) {
- asyncRunner.markInitialized();
+ if (grpHandleFut != null) {
+ grpHandleFut.markInitialized();
- asyncRunner.awaitDone();
+ grpHandleFut.get();
}
cpPagesTuple = beginAllCheckpoints();
http://git-wip-us.apache.org/repos/asf/ignite/blob/10c2b10e/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 8dbea17..edb9871 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -207,6 +207,7 @@ import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException
import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.io.GridFilenameUtils;
import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryNativeLoader;
@@ -10723,6 +10724,27 @@ public abstract class IgniteUtils {
/**
*
+ * @param r Runnable.
+ * @param fut Grid future adapter.
+ * @return Runnable with wrapped future.
+ */
+ public static Runnable wrapIgniteFuture(Runnable r, GridFutureAdapter<?> fut) {
+ return () -> {
+ try {
+ r.run();
+
+ fut.onDone();
+ }
+ catch (Throwable e) {
+ fut.onDone(e);
+
+ throw e;
+ }
+ };
+ }
+
+ /**
+ *
*/
public static class ReentrantReadWriteLockTracer implements ReadWriteLock {
/** Read lock. */
http://git-wip-us.apache.org/repos/asf/ignite/blob/10c2b10e/modules/core/src/main/java/org/apache/ignite/thread/IgniteTaskTrackingThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteTaskTrackingThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteTaskTrackingThreadPoolExecutor.java
deleted file mode 100644
index 6cae57e..0000000
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteTaskTrackingThreadPoolExecutor.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.thread;
-
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.atomic.LongAdder;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
-
-/**
- * An {@link ExecutorService} that executes submitted tasks using pooled grid threads.
- *
- * In addition to what it allows to track all enqueued tasks completion or failure during execution.
- */
-public class IgniteTaskTrackingThreadPoolExecutor extends IgniteThreadPoolExecutor {
- /** */
- private final LongAdder pendingTaskCnt = new LongAdder();
-
- /** */
- private final LongAdder completedTaskCnt = new LongAdder();
-
- /** */
- private volatile boolean initialized;
-
- /** */
- private volatile AtomicReference<Throwable> err = new AtomicReference<>();
-
- /**
- * Creates a new service with the given initial parameters.
- *
- * @param threadNamePrefix Will be added at the beginning of all created threads.
- * @param igniteInstanceName Must be the name of the grid.
- * @param corePoolSize The number of threads to keep in the pool, even if they are idle.
- * @param maxPoolSize The maximum number of threads to allow in the pool.
- * @param keepAliveTime When the number of threads is greater than the core, this is the maximum time
- * that excess idle threads will wait for new tasks before terminating.
- * @param workQ The queue to use for holding tasks before they are executed. This queue will hold only
- * runnable tasks submitted by the {@link #execute(Runnable)} method.
- */
- public IgniteTaskTrackingThreadPoolExecutor(String threadNamePrefix, String igniteInstanceName, int corePoolSize,
- int maxPoolSize, long keepAliveTime, BlockingQueue<Runnable> workQ) {
- super(threadNamePrefix, igniteInstanceName, corePoolSize, maxPoolSize, keepAliveTime, workQ);
- }
-
- /**
- * Creates a new service with the given initial parameters.
- *
- * @param threadNamePrefix Will be added at the beginning of all created threads.
- * @param igniteInstanceName Must be the name of the grid.
- * @param corePoolSize The number of threads to keep in the pool, even if they are idle.
- * @param maxPoolSize The maximum number of threads to allow in the pool.
- * @param keepAliveTime When the number of threads is greater than the core, this is the maximum time
- * that excess idle threads will wait for new tasks before terminating.
- * @param workQ The queue to use for holding tasks before they are executed. This queue will hold only
- * runnable tasks submitted by the {@link #execute(Runnable)} method.
- * @param plc {@link GridIoPolicy} for thread pool.
- * @param eHnd Uncaught exception handler for thread pool.
- */
- public IgniteTaskTrackingThreadPoolExecutor(String threadNamePrefix, String igniteInstanceName, int corePoolSize,
- int maxPoolSize, long keepAliveTime, BlockingQueue<Runnable> workQ, byte plc,
- UncaughtExceptionHandler eHnd) {
- super(threadNamePrefix, igniteInstanceName, corePoolSize, maxPoolSize, keepAliveTime, workQ, plc, eHnd);
- }
-
- /**
- * Creates a new service with the given initial parameters.
- *
- * @param corePoolSize The number of threads to keep in the pool, even if they are idle.
- * @param maxPoolSize The maximum number of threads to allow in the pool.
- * @param keepAliveTime When the number of threads is greater than the core, this is the maximum time
- * that excess idle threads will wait for new tasks before terminating.
- * @param workQ The queue to use for holding tasks before they are executed. This queue will hold only the
- * runnable tasks submitted by the {@link #execute(Runnable)} method.
- * @param threadFactory Thread factory.
- */
- public IgniteTaskTrackingThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime,
- BlockingQueue<Runnable> workQ, ThreadFactory threadFactory) {
- super(corePoolSize, maxPoolSize, keepAliveTime, workQ, threadFactory);
- }
-
- /** {@inheritDoc} */
- @Override public void execute(Runnable cmd) {
- pendingTaskCnt.add(1);
-
- super.execute(cmd);
- }
-
- /** {@inheritDoc} */
- @Override protected void afterExecute(Runnable r, Throwable t) {
- super.afterExecute(r, t);
-
- completedTaskCnt.add(1);
-
- if (t != null && err.compareAndSet(null, t) || isDone()) {
- synchronized (this) {
- notifyAll();
- }
- }
- }
-
- /**
- * Mark this executor as initialized.
- * This method should be called when all required tasks are enqueued for execution.
- */
- public final void markInitialized() {
- initialized = true;
- }
-
- /**
- * Check error status.
- *
- * @return {@code True} if any task execution resulted in error.
- */
- @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- public final boolean isError() {
- return err.get() != null;
- }
-
- /**
- * Check done status.
- *
- * @return {@code True} when all enqueued task are completed.
- */
- public final boolean isDone() {
- return initialized && completedTaskCnt.sum() == pendingTaskCnt.sum();
- }
-
- /**
- * Wait synchronously until all tasks are completed or error has occurred.
- *
- * @throws IgniteCheckedException if task execution resulted in error.
- */
- public final synchronized void awaitDone() throws IgniteCheckedException {
- // There are no guarantee what all enqueued tasks will be finished if an error has occurred.
- while(!isError() && !isDone()) {
- try {
- wait();
- }
- catch (InterruptedException e) {
- err.set(e);
-
- Thread.currentThread().interrupt();
- }
- }
-
- if (isError())
- throw new IgniteCheckedException("Task execution resulted in error", err.get());
- }
-
- /**
- * Reset tasks tracking context.
- * The method should be called before adding new tasks to the executor.
- */
- public final void reset() {
- initialized = false;
- completedTaskCnt.reset();
- pendingTaskCnt.reset();
- err.set(null);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10c2b10e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
index 673269b..a281662 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
@@ -17,6 +17,7 @@
package org.apache.ignite.testsuites;
+import java.util.Set;
import junit.framework.TestSuite;
import org.apache.ignite.internal.commandline.CommandHandlerParsingTest;
import org.apache.ignite.internal.pagemem.impl.PageIdUtilsSelfTest;
@@ -57,14 +58,11 @@ import org.apache.ignite.util.GridSpinReadWriteLockSelfTest;
import org.apache.ignite.util.GridStringBuilderFactorySelfTest;
import org.apache.ignite.util.GridTopologyHeapSizeSelfTest;
import org.apache.ignite.util.GridTransientTest;
-import org.apache.ignite.util.IgniteTaskTrackingThreadPoolExecutorTest;
import org.apache.ignite.util.mbeans.GridMBeanDisableSelfTest;
import org.apache.ignite.util.mbeans.GridMBeanExoticNamesSelfTest;
import org.apache.ignite.util.mbeans.GridMBeanSelfTest;
import org.apache.ignite.util.mbeans.WorkersControlMXBeanTest;
-import java.util.Set;
-
/**
* Test suite for Ignite utility classes.
*/
@@ -141,9 +139,6 @@ public class IgniteUtilSelfTestSuite extends TestSuite {
// control.sh
suite.addTestSuite(CommandHandlerParsingTest.class);
- // Thread pool.
- suite.addTestSuite(IgniteTaskTrackingThreadPoolExecutorTest.class);
-
return suite;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10c2b10e/modules/core/src/test/java/org/apache/ignite/util/IgniteTaskTrackingThreadPoolExecutorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/util/IgniteTaskTrackingThreadPoolExecutorTest.java b/modules/core/src/test/java/org/apache/ignite/util/IgniteTaskTrackingThreadPoolExecutorTest.java
deleted file mode 100644
index 3db02b0..0000000
--- a/modules/core/src/test/java/org/apache/ignite/util/IgniteTaskTrackingThreadPoolExecutorTest.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.util;
-
-import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.atomic.LongAdder;
-import junit.framework.TestCase;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.thread.IgniteTaskTrackingThreadPoolExecutor;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Tests for tracking thread pool executor.
- */
-public class IgniteTaskTrackingThreadPoolExecutorTest extends TestCase {
- /** */
- private IgniteTaskTrackingThreadPoolExecutor executor;
-
- /** {@inheritDoc} */
- @Override protected void setUp() throws Exception {
- int procs = Runtime.getRuntime().availableProcessors();
-
- executor = new IgniteTaskTrackingThreadPoolExecutor("test", "default",
- procs * 2, procs * 2, 30_000, new LinkedBlockingQueue<>(), GridIoPolicy.UNDEFINED, (t, e) -> {
- // No-op.
- });
- }
-
- /** {@inheritDoc} */
- @Override protected void tearDown() throws Exception {
- List<Runnable> runnables = executor.shutdownNow();
-
- assertEquals("Some tasks are not completed", 0, runnables.size());
- }
-
- /** */
- public void testSimple() throws IgniteCheckedException {
- doTest(null);
- }
-
- /** */
- @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- public void testWithException() throws IgniteCheckedException {
- int fail = 5555;
-
- try {
- doTest(fail);
-
- fail();
- }
- catch (Throwable t) {
- TestException cause = (TestException)X.getCause(t);
-
- assertEquals(fail, cause.idx);
- }
-
- AtomicReference<Throwable> err = U.field(executor, "err");
- err.set(null);
-
- executor.awaitDone();
- }
-
- /** */
- public void testReuse() throws IgniteCheckedException {
- long avg = 0;
-
- long warmUp = 30;
-
- int iters = 150;
-
- for (int i = 0; i < iters; i++) {
- long t1 = System.nanoTime();
-
- doTest(null);
-
- if (i >= warmUp)
- avg += System.nanoTime() - t1;
-
- executor.reset();
- }
-
- X.print("Average time per iteration: " + (avg / (iters - warmUp)) / 1000 / 1000. + " ms");
- }
-
- /** */
- private void doTest(@Nullable Integer fail) throws IgniteCheckedException {
- LongAdder cnt = new LongAdder();
-
- int exp = 100_000;
-
- for (int i = 0; i < exp; i++) {
- final int finalI = i;
- executor.execute(() -> {
- if (fail != null && fail == finalI)
- throw new TestException(finalI);
- else
- cnt.add(1);
- });
- }
-
- executor.markInitialized();
-
- executor.awaitDone();
-
- assertEquals("Counter is not as expected", exp, cnt.sum());
- }
-
- /** */
- private static class TestException extends RuntimeException {
- /** */
- final int idx;
-
- /**
- * @param idx Index.
- */
- public TestException(int idx) {
- this.idx = idx;
- }
- }
-}