You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2017/07/14 17:15:04 UTC
ignite git commit: IGNITE-5452: GridTimeoutProcessor can hang on
stop. This closes #2279.
Repository: ignite
Updated Branches:
refs/heads/master 07cc05f51 -> b95c261f0
IGNITE-5452: GridTimeoutProcessor can hang on stop. This closes #2279.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b95c261f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b95c261f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b95c261f
Branch: refs/heads/master
Commit: b95c261f0b1376e8523dd1d89f253a5874dbf63b
Parents: 07cc05f
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Fri Jul 14 20:14:47 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Fri Jul 14 20:14:47 2017 +0300
----------------------------------------------------------------------
.../timeout/GridTimeoutProcessor.java | 18 +-
.../IgniteTxRemoveTimeoutObjectsTest.java | 194 +++++++++++++++++++
.../timeout/GridTimeoutProcessorSelfTest.java | 68 +++++--
.../testsuites/IgniteCacheTestSuite3.java | 4 +-
4 files changed, 265 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b95c261f/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
index 9deca9a..8c71f76 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
@@ -37,7 +37,7 @@ import org.apache.ignite.thread.IgniteThread;
*/
public class GridTimeoutProcessor extends GridProcessorAdapter {
/** */
- private final IgniteThread timeoutWorker;
+ private final TimeoutWorker timeoutWorker;
/** Time-based sorted set for timeout objects. */
private final GridConcurrentSkipListSet<GridTimeoutObject> timeoutObjs =
@@ -62,13 +62,12 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
public GridTimeoutProcessor(GridKernalContext ctx) {
super(ctx);
- timeoutWorker = new IgniteThread(ctx.config().getIgniteInstanceName(), "grid-timeout-worker",
- new TimeoutWorker());
+ timeoutWorker = new TimeoutWorker();
}
/** {@inheritDoc} */
@Override public void start() {
- timeoutWorker.start();
+ new IgniteThread(timeoutWorker).start();
if (log.isDebugEnabled())
log.debug("Timeout processor started.");
@@ -76,7 +75,7 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void stop(boolean cancel) throws IgniteCheckedException {
- U.interrupt(timeoutWorker);
+ timeoutWorker.cancel();
U.join(timeoutWorker);
if (log.isDebugEnabled())
@@ -159,6 +158,13 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
timeoutObj.onTimeout();
}
catch (Throwable e) {
+ if (isCancelled() && !(e instanceof Error)){
+ if (log.isDebugEnabled())
+ log.debug("Error when executing timeout callback: " + timeoutObj);
+
+ return;
+ }
+
U.error(log, "Error when executing timeout callback: " + timeoutObj, e);
if (e instanceof Error)
@@ -170,7 +176,7 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
}
synchronized (mux) {
- while (true) {
+ while (!isCancelled()) {
// Access of the first element must be inside of
// synchronization block, so we don't miss out
// on thread notification events sent from
http://git-wip-us.apache.org/repos/asf/ignite/blob/b95c261f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxRemoveTimeoutObjectsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxRemoveTimeoutObjectsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxRemoveTimeoutObjectsTest.java
new file mode 100644
index 0000000..c0f9940
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxRemoveTimeoutObjectsTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionTimeoutException;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ * Tests the correctness of rolling back a transaction with a timeout during grid stop.
+ */
+public class IgniteTxRemoveTimeoutObjectsTest extends GridCacheAbstractSelfTest {
+ /** */
+ private static final int PUT_CNT = 1000;
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 60_000;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxRemoveTimeoutObjects() throws Exception {
+ IgniteCache<Integer, Integer> cache0 = grid(0).cache(DEFAULT_CACHE_NAME);
+ IgniteCache<Integer, Integer> cache1 = grid(1).cache(DEFAULT_CACHE_NAME);
+
+ // Start an additional grid that will be stopped later.
+ IgniteCache<Integer, Integer> cacheAdditional = startGrid(gridCount()).cache(DEFAULT_CACHE_NAME);
+
+ for (int i = 0; i < PUT_CNT; i++)
+ cache0.put(i, Integer.MAX_VALUE);
+
+ logTimeoutObjectsFrequency();
+
+ info("Tx1 started");
+ try (Transaction tx = grid(gridCount()).transactions().txStart(PESSIMISTIC, SERIALIZABLE, 100, PUT_CNT)) {
+ try {
+ for (int i = 0; i < PUT_CNT; i++) {
+ cacheAdditional.put(i, Integer.MIN_VALUE);
+
+ if (i % 100 == 0)
+ logTimeoutObjectsFrequency();
+ }
+
+ U.sleep(200);
+
+ tx.commit();
+
+ fail("A timeout should have happened.");
+ }
+ catch (Exception e) {
+ assertTrue(X.hasCause(e, TransactionTimeoutException.class));
+ }
+ }
+
+ assertDoesNotContainLockTimeoutObjects();
+
+ logTimeoutObjectsFrequency();
+
+ stopGrid(gridCount());
+
+ awaitPartitionMapExchange();
+
+ info("Grid2 closed.");
+
+ assertDoesNotContainLockTimeoutObjects();
+
+ logTimeoutObjectsFrequency();
+
+ // Check that the values have not changed and lock can be acquired.
+ try (Transaction tx2 = grid(1).transactions().txStart(PESSIMISTIC, SERIALIZABLE)) {
+ info("Tx2 started");
+
+ for (int i = 0; i < PUT_CNT; i++) {
+ assertEquals(cache1.get(i).intValue(), Integer.MAX_VALUE);
+ cache1.put(i, i);
+
+ if (i % (PUT_CNT / 5) == 0)
+ logTimeoutObjectsFrequency();
+ }
+
+ tx2.commit();
+ }
+
+ info("Tx2 stopped");
+
+ // Check that the changes were committed.
+ for (int i = 0; i < PUT_CNT; i++)
+ assertEquals(cache0.get(i).intValue(), i);
+ }
+
+ /**
+ * Fails if at least one grid contains LockTimeoutObjects.
+ */
+ private void assertDoesNotContainLockTimeoutObjects() {
+ for (Ignite ignite : G.allGrids()) {
+ for (GridTimeoutObject object : getTimeoutObjects((IgniteEx)ignite)) {
+ if (object.getClass().getSimpleName().equals("LockTimeoutObject"))
+ fail("Grids contain LockTimeoutObjects.");
+ }
+ }
+ }
+
+ /**
+ * Print the number of each timeout object type on each grid to the log.
+ */
+ private void logTimeoutObjectsFrequency() {
+ StringBuilder sb = new StringBuilder("Timeout objects frequency [");
+
+ for (Ignite ignite : G.allGrids()) {
+ IgniteEx igniteEx = (IgniteEx)ignite;
+
+ Map<String, Integer> objFreqMap = new HashMap<>();
+
+ Set<GridTimeoutObject> objs = getTimeoutObjects(igniteEx);
+
+ for (GridTimeoutObject obj : objs) {
+ String clsName = obj.getClass().getSimpleName();
+
+ Integer cnt = objFreqMap.get(clsName);
+
+ if (cnt == null)
+ objFreqMap.put(clsName, 1);
+ else
+ objFreqMap.put(clsName, cnt + 1);
+ }
+
+ sb.append("[")
+ .append(igniteEx.name()).append(": size=")
+ .append(objs.size()).append(", ");
+
+ for (Map.Entry<String, Integer> entry : objFreqMap.entrySet()) {
+ sb.append(entry.getKey()).append("=")
+ .append(entry.getValue())
+ .append(", ");
+ }
+
+ sb.delete(sb.length() - 2, sb.length())
+ .append("]; ");
+ }
+
+ sb.delete(sb.length() - 2, sb.length())
+ .append("]");
+
+ info(sb.toString()
+ .replaceAll("distributed.IgniteTxRollbackOnStopTest", "Grid"));
+ }
+
+ /**
+ * @param igniteEx IgniteEx.
+ * @return Set of timeout objects held by the timeout processor of the given IgniteEx.
+ */
+ private Set<GridTimeoutObject> getTimeoutObjects(IgniteEx igniteEx) {
+ GridTimeoutProcessor timeout = igniteEx.context().timeout();
+
+ return GridTestUtils.getFieldValue(timeout, timeout.getClass(), "timeoutObjs");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b95c261f/modules/core/src/test/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessorSelfTest.java
index eb248cf..606b102 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessorSelfTest.java
@@ -41,6 +41,11 @@ public class GridTimeoutProcessorSelfTest extends GridCommonAbstractTest {
private GridTestKernalContext ctx;
/** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 60_000;
+ }
+
+ /** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
ctx = newContext();
@@ -84,7 +89,9 @@ public class GridTimeoutProcessorSelfTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override public long endTime() { return endTime; }
+ @Override public long endTime() {
+ return endTime;
+ }
/** {@inheritDoc} */
@Override public void onTimeout() {
@@ -152,10 +159,14 @@ public class GridTimeoutProcessorSelfTest extends GridCommonAbstractTest {
private final long endTime = System.currentTimeMillis() + RAND.nextInt(1000) + 500;
/** {@inheritDoc} */
- @Override public IgniteUuid timeoutId() { return id; }
+ @Override public IgniteUuid timeoutId() {
+ return id;
+ }
/** {@inheritDoc} */
- @Override public long endTime() { return endTime; }
+ @Override public long endTime() {
+ return endTime;
+ }
/** {@inheritDoc} */
@Override public void onTimeout() {
@@ -307,9 +318,8 @@ public class GridTimeoutProcessorSelfTest extends GridCommonAbstractTest {
assert timeObjs.size() == max;
// Remove timeout objects so that they aren't able to times out (supposing the cycle takes less than 500 ms).
- for (GridTimeoutObject obj : timeObjs) {
+ for (GridTimeoutObject obj : timeObjs)
ctx.timeout().removeTimeoutObject(obj);
- }
Thread.sleep(1000);
@@ -350,7 +360,9 @@ public class GridTimeoutProcessorSelfTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override public long endTime() { return endTime; }
+ @Override public long endTime() {
+ return endTime;
+ }
/** {@inheritDoc} */
@Override public void onTimeout() {
@@ -370,9 +382,8 @@ public class GridTimeoutProcessorSelfTest extends GridCommonAbstractTest {
// Remove timeout objects so that they aren't able to times out
// (supposing the cycle takes less than 500 ms).
- for (GridTimeoutObject obj : timeObjs) {
+ for (GridTimeoutObject obj : timeObjs)
ctx.timeout().removeTimeoutObject(obj);
- }
}
}, threads, "timeout-test-worker");
@@ -381,6 +392,9 @@ public class GridTimeoutProcessorSelfTest extends GridCommonAbstractTest {
assert callCnt.get() == 0;
}
+ /**
+ * @throws Exception If test failed.
+ */
public void testAddRemoveInterleaving() throws Exception {
final AtomicInteger callCnt = new AtomicInteger(0);
@@ -430,9 +444,8 @@ public class GridTimeoutProcessorSelfTest extends GridCommonAbstractTest {
// Remove timeout objects so that they aren't able to times out
// (supposing the cycle takes less than 500 ms).
- for (GridTimeoutObject obj : timeObjs) {
+ for (GridTimeoutObject obj : timeObjs)
ctx.timeout().removeTimeoutObject(obj);
- }
}
}, 100, "timeout-test-worker");
@@ -516,10 +529,14 @@ public class GridTimeoutProcessorSelfTest extends GridCommonAbstractTest {
private int cnt;
/** {@inheritDoc} */
- @Override public IgniteUuid timeoutId() { return id; }
+ @Override public IgniteUuid timeoutId() {
+ return id;
+ }
/** {@inheritDoc} */
- @Override public long endTime() { return endTime; }
+ @Override public long endTime() {
+ return endTime;
+ }
/** {@inheritDoc} */
@Override public void onTimeout() {
@@ -608,4 +625,31 @@ public class GridTimeoutProcessorSelfTest extends GridCommonAbstractTest {
assert latch.await(3000, MILLISECONDS);
}
+
+ /**
+ * Tests that a swallowed {@link InterruptedException} does not cause a hang when the grid is being stopped.
+ *
+ * @throws Exception If test failed.
+ */
+ public void testCancelingWithClearedInterruptedFlag() throws Exception {
+ final CountDownLatch onTimeoutCalled = new CountDownLatch(1);
+
+ ctx.timeout().addTimeoutObject(new GridTimeoutObjectAdapter(10) {
+ /** {@inheritDoc} */
+ @Override public void onTimeout() {
+ try {
+ onTimeoutCalled.countDown();
+
+ // Wait until CacheProcessor has stopped and caused an InterruptedException,
+ // which clears the interrupted flag.
+ Thread.sleep(Long.MAX_VALUE);
+ }
+ catch (InterruptedException ignore) {
+ // No-op.
+ }
+ }
+ });
+
+ onTimeoutCalled.await();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b95c261f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
index 58e9dc3..a6be07e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
@@ -36,9 +36,9 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheGroupsTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheInterceptorSelfTestSuite;
import org.apache.ignite.internal.processors.cache.IgniteCacheScanPredicateDeploymentSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheAsyncOperationsTest;
-import org.apache.ignite.internal.processors.cache.distributed.CacheGroupsPreloadTest;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheMixedModeSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteTxGetAfterStopTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteTxRemoveTimeoutObjectsTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDaemonNodePartitionedSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedOnlyP2PDisabledByteArrayValuesSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedOnlyP2PEnabledByteArrayValuesSelfTest;
@@ -199,6 +199,8 @@ public class IgniteCacheTestSuite3 extends TestSuite {
suite.addTestSuite(CacheAsyncOperationsTest.class);
+ suite.addTestSuite(IgniteTxRemoveTimeoutObjectsTest.class);
+
return suite;
}
}