You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ir...@apache.org on 2018/10/18 11:57:04 UTC
[1/3] ignite git commit: IGNITE-9082 Throwing checked exception
during tx commit without node stopping leads to data corruption - Fixes
#4809.
Repository: ignite
Updated Branches:
refs/heads/master 829dc1f24 -> 5eb871e19
http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDataConsistencyOnCommitFailureTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDataConsistencyOnCommitFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDataConsistencyOnCommitFailureTest.java
new file mode 100644
index 0000000..881c680
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDataConsistencyOnCommitFailureTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.UUID;
+import java.util.function.Supplier;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.testsuites.IgniteIgnore;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.jetbrains.annotations.Nullable;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests data consistency if transaction is failed due to heuristic exception on originating node.
+ */
+public class TxDataConsistencyOnCommitFailureTest extends GridCommonAbstractTest {
+ /** */
+ public static final int KEY = 0;
+
+ /** */
+ public static final String CLIENT = "client";
+
+ /** */
+ private int nodesCnt;
+
+ /** */
+ private int backups;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setClientMode(igniteInstanceName.startsWith(CLIENT));
+
+ cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME).
+ setCacheMode(CacheMode.PARTITIONED).
+ setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL).
+ setBackups(backups).
+ setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /** */
+ public void testCommitErrorOnNearNode2PC() throws Exception {
+ nodesCnt = 3;
+
+ backups = 2;
+
+ doTestCommitError(() -> {
+ try {
+ return startGrid("client");
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ /** */
+ public void testCommitErrorOnNearNode1PC() throws Exception {
+ nodesCnt = 2;
+
+ backups = 1;
+
+ doTestCommitError(() -> {
+ try {
+ return startGrid("client");
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ /** */
+ @IgniteIgnore(value = "https://issues.apache.org/jira/browse/IGNITE-9806", forceFailure = false)
+ public void testCommitErrorOnColocatedNode2PC() throws Exception {
+ nodesCnt = 3;
+
+ backups = 2;
+
+ doTestCommitError(() -> primaryNode(KEY, DEFAULT_CACHE_NAME));
+ }
+
+ /**
+ * @param factory Factory.
+ */
+ private void doTestCommitError(Supplier<Ignite> factory) throws Exception {
+ Ignite crd = startGridsMultiThreaded(nodesCnt);
+
+ crd.cache(DEFAULT_CACHE_NAME).put(KEY, KEY);
+
+ Ignite ignite = factory.get();
+
+ if (ignite == null)
+ ignite = startGrid("client");
+
+ assertNotNull(ignite.cache(DEFAULT_CACHE_NAME));
+
+ injectMockedTxManager(ignite);
+
+ checkKey();
+
+ IgniteTransactions transactions = ignite.transactions();
+
+ try(Transaction tx = transactions.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ, 0, 1)) {
+ assertNotNull(transactions.tx());
+
+ ignite.cache(DEFAULT_CACHE_NAME).put(KEY, KEY + 1);
+
+ tx.commit();
+
+ fail();
+ }
+ catch (Exception t) {
+ // No-op.
+ }
+
+ checkKey();
+
+ checkFutures();
+ }
+
+ /**
+ * @param ignite Ignite.
+ */
+ private void injectMockedTxManager(Ignite ignite) {
+ IgniteEx igniteEx = (IgniteEx)ignite;
+
+ GridCacheSharedContext<Object, Object> ctx = igniteEx.context().cache().context();
+
+ IgniteTxManager tm = ctx.tm();
+
+ IgniteTxManager mockTm = Mockito.spy(tm);
+
+ MockGridNearTxLocal locTx = new MockGridNearTxLocal(ctx, false, false, false, GridIoPolicy.SYSTEM_POOL,
+ TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ, 0, true, null, 1, null, 0, null);
+
+ Mockito.doAnswer(new Answer<GridNearTxLocal>() {
+ @Override public GridNearTxLocal answer(InvocationOnMock invocation) throws Throwable {
+ mockTm.onCreated(null, locTx);
+
+ return locTx;
+ }
+ }).when(mockTm).
+ newTx(locTx.implicit(), locTx.implicitSingle(), null, locTx.concurrency(),
+ locTx.isolation(), locTx.timeout(), locTx.storeEnabled(), null, locTx.size(), locTx.label());
+
+ ctx.setTxManager(mockTm);
+ }
+
+ /** */
+ private void checkKey() {
+ for (Ignite ignite : G.allGrids()) {
+ if (!ignite.configuration().isClientMode())
+ assertNotNull(ignite.cache(DEFAULT_CACHE_NAME).localPeek(KEY));
+ }
+ }
+
+ /** */
+ private static class MockGridNearTxLocal extends GridNearTxLocal {
+ /** Empty constructor. */
+ public MockGridNearTxLocal() {
+ }
+
+ /**
+ * @param ctx Context.
+ * @param implicit Implicit.
+ * @param implicitSingle Implicit single.
+ * @param sys System.
+ * @param plc Policy.
+ * @param concurrency Concurrency.
+ * @param isolation Isolation.
+ * @param timeout Timeout.
+ * @param storeEnabled Store enabled.
+ * @param mvccOp Mvcc op.
+ * @param txSize Tx size.
+ * @param subjId Subj id.
+ * @param taskNameHash Task name hash.
+ * @param lb Label.
+ */
+ public MockGridNearTxLocal(GridCacheSharedContext ctx, boolean implicit, boolean implicitSingle, boolean sys,
+ byte plc, TransactionConcurrency concurrency, TransactionIsolation isolation, long timeout,
+ boolean storeEnabled, Boolean mvccOp, int txSize, @Nullable UUID subjId, int taskNameHash, @Nullable String lb) {
+ super(ctx, implicit, implicitSingle, sys, plc, concurrency, isolation, timeout, storeEnabled, mvccOp,
+ txSize, subjId, taskNameHash, lb);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void userCommit() throws IgniteCheckedException {
+ throw new IgniteCheckedException("Force failure");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 7e98ec7..7091a09 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -1996,17 +1996,26 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
final Collection<GridCacheFuture<?>> futs = ig.context().cache().context().mvcc().activeFutures();
- for (GridCacheFuture<?> fut : futs)
- log.info("Waiting for future: " + fut);
+ boolean hasFutures = false;
- assertTrue("Expecting no active futures: node=" + ig.localNode().id(), futs.isEmpty());
+ for (GridCacheFuture<?> fut : futs) {
+ if (!fut.isDone()) {
+ log.error("Expecting no active future [node=" + ig.localNode().id() + ", fut=" + fut + ']');
+
+ hasFutures = true;
+ }
+ }
+
+ if (hasFutures)
+ fail("Some mvcc futures are not finished");
Collection<IgniteInternalTx> txs = ig.context().cache().context().tm().activeTransactions();
for (IgniteInternalTx tx : txs)
- log.info("Waiting for tx: " + tx);
+ log.error("Expecting no active transaction [node=" + ig.localNode().id() + ", tx=" + tx + ']');
- assertTrue("Expecting no active transactions: node=" + ig.localNode().id(), txs.isEmpty());
+ if (!txs.isEmpty())
+ fail("Some transaction are not finished");
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
index 386b17b..7dba461 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicPrimar
import org.apache.ignite.internal.processors.cache.distributed.IgniteCachePrimarySyncTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteTxCachePrimarySyncTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteTxCacheWriteSynchronizationModesMultithreadedTest;
+import org.apache.ignite.internal.processors.cache.transactions.TxDataConsistencyOnCommitFailureTest;
import org.apache.ignite.testframework.junits.GridAbstractTest;
/**
@@ -49,6 +50,8 @@ public class IgniteCacheTestSuite9 extends TestSuite {
suite.addTestSuite(CacheAtomicPrimarySyncBackPressureTest.class);
+ suite.addTestSuite(TxDataConsistencyOnCommitFailureTest.class);
+
return suite;
}
}
[3/3] ignite git commit: IGNITE-9082 Throwing checked exception
during tx commit without node stopping leads to data corruption - Fixes
#4809.
Posted by ir...@apache.org.
IGNITE-9082 Throwing checked exception during tx commit without node stopping leads to data corruption - Fixes #4809.
Signed-off-by: Ivan Rakov <ir...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5eb871e1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5eb871e1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5eb871e1
Branch: refs/heads/master
Commit: 5eb871e191a14fc21f6e2c62bdfa742e27c14695
Parents: 829dc1f
Author: Aleksei Scherbakov <al...@gmail.com>
Authored: Thu Oct 18 14:52:34 2018 +0300
Committer: Ivan Rakov <ir...@apache.org>
Committed: Thu Oct 18 14:52:34 2018 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 5 -
.../cache/GridCacheSharedContext.java | 9 +
.../GridDistributedTxRemoteAdapter.java | 535 +++++++++----------
.../distributed/dht/GridDhtTxFinishFuture.java | 11 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 10 +-
.../distributed/dht/GridDhtTxLocalAdapter.java | 3 +
.../distributed/dht/GridDhtTxPrepareFuture.java | 59 +-
.../near/GridNearTxFinishFuture.java | 38 +-
.../cache/distributed/near/GridNearTxLocal.java | 2 +-
.../cache/transactions/IgniteTxAdapter.java | 31 ++
.../cache/transactions/IgniteTxHandler.java | 119 ++---
.../transactions/IgniteTxLocalAdapter.java | 519 +++++++++---------
.../processors/failure/FailureProcessor.java | 8 +
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 1 -
.../cache/GridCacheAbstractSelfTest.java | 9 +-
.../cache/query/IndexingSpiQuerySelfTest.java | 66 +--
.../cache/query/IndexingSpiQueryTxSelfTest.java | 74 +--
.../AbstractTransactionIntergrityTest.java | 111 ++--
...IntegrityWithPrimaryIndexCorruptionTest.java | 268 ++++++----
...ctionIntegrityWithSystemWorkerDeathTest.java | 6 +-
.../TxDataConsistencyOnCommitFailureTest.java | 234 ++++++++
.../junits/common/GridCommonAbstractTest.java | 19 +-
.../testsuites/IgniteCacheTestSuite9.java | 3 +
23 files changed, 1220 insertions(+), 920 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 9bb8aec..ab5b725 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2906,11 +2906,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
ver = newVer;
flags &= ~IS_EVICT_DISABLED;
- if (cctx.mvccEnabled())
- cctx.offheap().mvccRemoveAll(this);
- else
- removeValue();
-
onInvalidate();
return obsoleteVersionExtras() != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 52d8525..b5cd82b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -1141,4 +1141,13 @@ public class GridCacheSharedContext<K, V> {
public void readOnlyMode(boolean readOnlyMode) {
this.readOnlyMode = readOnlyMode;
}
+
+ /**
+ * For test purposes.
+ * @param txMgr Tx manager.
+ */
+ public void setTxManager(IgniteTxManager txMgr) {
+ this.txMgr = txMgr;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 7313197..4db4685 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -33,7 +33,6 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.InvalidEnvironmentException;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
@@ -55,7 +54,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpda
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
-import org.apache.ignite.internal.processors.cache.persistence.StorageException;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -512,312 +510,267 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
batchStoreCommit(writeMap().values());
- try {
- // Node that for near transactions we grab all entries.
- for (IgniteTxEntry txEntry : entries) {
- GridCacheContext cacheCtx = txEntry.context();
+ // Node that for near transactions we grab all entries.
+ for (IgniteTxEntry txEntry : entries) {
+ GridCacheContext cacheCtx = txEntry.context();
- boolean replicate = cacheCtx.isDrEnabled();
+ boolean replicate = cacheCtx.isDrEnabled();
+ while (true) {
try {
- while (true) {
- try {
- GridCacheEntryEx cached = txEntry.cached();
+ GridCacheEntryEx cached = txEntry.cached();
- if (cached == null)
- txEntry.cached(cached = cacheCtx.cache().entryEx(txEntry.key(), topologyVersion()));
+ if (cached == null)
+ txEntry.cached(cached = cacheCtx.cache().entryEx(txEntry.key(), topologyVersion()));
- if (near() && cacheCtx.dr().receiveEnabled()) {
- cached.markObsolete(xidVer);
+ if (near() && cacheCtx.dr().receiveEnabled()) {
+ cached.markObsolete(xidVer);
- break;
- }
+ break;
+ }
- GridNearCacheEntry nearCached = null;
+ GridNearCacheEntry nearCached = null;
- if (updateNearCache(cacheCtx, txEntry.key(), topVer))
- nearCached = cacheCtx.dht().near().peekExx(txEntry.key());
+ if (updateNearCache(cacheCtx, txEntry.key(), topVer))
+ nearCached = cacheCtx.dht().near().peekExx(txEntry.key());
- if (!F.isEmpty(txEntry.entryProcessors()))
- txEntry.cached().unswap(false);
+ if (!F.isEmpty(txEntry.entryProcessors()))
+ txEntry.cached().unswap(false);
- IgniteBiTuple<GridCacheOperation, CacheObject> res =
- applyTransformClosures(txEntry, false, ret);
+ IgniteBiTuple<GridCacheOperation, CacheObject> res =
+ applyTransformClosures(txEntry, false, ret);
- GridCacheOperation op = res.get1();
- CacheObject val = res.get2();
+ GridCacheOperation op = res.get1();
+ CacheObject val = res.get2();
- GridCacheVersion explicitVer = txEntry.conflictVersion();
+ GridCacheVersion explicitVer = txEntry.conflictVersion();
- if (explicitVer == null)
- explicitVer = writeVersion();
+ if (explicitVer == null)
+ explicitVer = writeVersion();
- if (txEntry.ttl() == CU.TTL_ZERO)
- op = DELETE;
+ if (txEntry.ttl() == CU.TTL_ZERO)
+ op = DELETE;
- boolean conflictNeedResolve = cacheCtx.conflictNeedResolve();
+ boolean conflictNeedResolve = cacheCtx.conflictNeedResolve();
- GridCacheVersionConflictContext conflictCtx = null;
+ GridCacheVersionConflictContext conflictCtx = null;
- if (conflictNeedResolve) {
- IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext>
- drRes = conflictResolve(op, txEntry, val, explicitVer, cached);
+ if (conflictNeedResolve) {
+ IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext>
+ drRes = conflictResolve(op, txEntry, val, explicitVer, cached);
- assert drRes != null;
+ assert drRes != null;
- conflictCtx = drRes.get2();
+ conflictCtx = drRes.get2();
- if (conflictCtx.isUseOld())
- op = NOOP;
- else if (conflictCtx.isUseNew()) {
- txEntry.ttl(conflictCtx.ttl());
- txEntry.conflictExpireTime(conflictCtx.expireTime());
- }
- else if (conflictCtx.isMerge()) {
- op = drRes.get1();
- val = txEntry.context().toCacheObject(conflictCtx.mergeValue());
- explicitVer = writeVersion();
+ if (conflictCtx.isUseOld())
+ op = NOOP;
+ else if (conflictCtx.isUseNew()) {
+ txEntry.ttl(conflictCtx.ttl());
+ txEntry.conflictExpireTime(conflictCtx.expireTime());
+ }
+ else if (conflictCtx.isMerge()) {
+ op = drRes.get1();
+ val = txEntry.context().toCacheObject(conflictCtx.mergeValue());
+ explicitVer = writeVersion();
- txEntry.ttl(conflictCtx.ttl());
- txEntry.conflictExpireTime(conflictCtx.expireTime());
- }
- }
- else
- // Nullify explicit version so that innerSet/innerRemove will work as usual.
- explicitVer = null;
-
- GridCacheVersion dhtVer = cached.isNear() ? writeVersion() : null;
-
- if (!near() && cacheCtx.group().persistenceEnabled() && cacheCtx.group().walEnabled() &&
- op != NOOP && op != RELOAD && (op != READ || cctx.snapshot().needTxReadLogging())) {
- if (dataEntries == null)
- dataEntries = new ArrayList<>(entries.size());
-
- dataEntries.add(
- new T2<>(
- new DataEntry(
- cacheCtx.cacheId(),
- txEntry.key(),
- val,
- op,
- nearXidVersion(),
- writeVersion(),
- 0,
- txEntry.key().partition(),
- txEntry.updateCounter()
- ),
- txEntry
- )
- );
- }
+ txEntry.ttl(conflictCtx.ttl());
+ txEntry.conflictExpireTime(conflictCtx.expireTime());
+ }
+ }
+ else
+ // Nullify explicit version so that innerSet/innerRemove will work as usual.
+ explicitVer = null;
+
+ GridCacheVersion dhtVer = cached.isNear() ? writeVersion() : null;
+
+ if (!near() && cacheCtx.group().persistenceEnabled() && cacheCtx.group().walEnabled() &&
+ op != NOOP && op != RELOAD && (op != READ || cctx.snapshot().needTxReadLogging())) {
+ if (dataEntries == null)
+ dataEntries = new ArrayList<>(entries.size());
+
+ dataEntries.add(
+ new T2<>(
+ new DataEntry(
+ cacheCtx.cacheId(),
+ txEntry.key(),
+ val,
+ op,
+ nearXidVersion(),
+ writeVersion(),
+ 0,
+ txEntry.key().partition(),
+ txEntry.updateCounter()
+ ),
+ txEntry
+ )
+ );
+ }
- if (op == CREATE || op == UPDATE) {
- // Invalidate only for near nodes (backups cannot be invalidated).
- if (isSystemInvalidate() || (isInvalidate() && cacheCtx.isNear()))
- cached.innerRemove(this,
- eventNodeId(),
- nodeId,
- false,
- true,
- true,
- txEntry.keepBinary(),
- txEntry.hasOldValue(),
- txEntry.oldValue(),
- topVer,
- null,
- replicate ? DR_BACKUP : DR_NONE,
- near() ? null : explicitVer,
- CU.subjectId(this, cctx),
- resolveTaskName(),
- dhtVer,
- txEntry.updateCounter(),
- mvccSnapshot());
- else {
- assert val != null : txEntry;
-
- GridCacheUpdateTxResult updRes = cached.innerSet(this,
- eventNodeId(),
- nodeId,
- val,
- false,
- false,
- txEntry.ttl(),
- true,
- true,
- txEntry.keepBinary(),
- txEntry.hasOldValue(),
- txEntry.oldValue(),
- topVer,
- null,
- replicate ? DR_BACKUP : DR_NONE,
- txEntry.conflictExpireTime(),
- near() ? null : explicitVer,
- CU.subjectId(this, cctx),
- resolveTaskName(),
- dhtVer,
- txEntry.updateCounter(),
- mvccSnapshot());
-
- txEntry.updateCounter(updRes.updateCounter());
-
- if (updRes.loggedPointer() != null)
- ptr = updRes.loggedPointer();
-
- // Keep near entry up to date.
- if (nearCached != null) {
- CacheObject val0 = cached.valueBytes();
-
- nearCached.updateOrEvict(xidVer,
- val0,
- cached.expireTime(),
- cached.ttl(),
- nodeId,
- topVer);
- }
- }
- }
- else if (op == DELETE) {
- GridCacheUpdateTxResult updRes = cached.innerRemove(this,
- eventNodeId(),
+ if (op == CREATE || op == UPDATE) {
+ // Invalidate only for near nodes (backups cannot be invalidated).
+ if (isSystemInvalidate() || (isInvalidate() && cacheCtx.isNear()))
+ cached.innerRemove(this,
+ eventNodeId(),
+ nodeId,
+ false,
+ true,
+ true,
+ txEntry.keepBinary(),
+ txEntry.hasOldValue(),
+ txEntry.oldValue(),
+ topVer,
+ null,
+ replicate ? DR_BACKUP : DR_NONE,
+ near() ? null : explicitVer,
+ CU.subjectId(this, cctx),
+ resolveTaskName(),
+ dhtVer,
+ txEntry.updateCounter(),
+ mvccSnapshot());
+ else {
+ assert val != null : txEntry;
+
+ GridCacheUpdateTxResult updRes = cached.innerSet(this,
+ eventNodeId(),
+ nodeId,
+ val,
+ false,
+ false,
+ txEntry.ttl(),
+ true,
+ true,
+ txEntry.keepBinary(),
+ txEntry.hasOldValue(),
+ txEntry.oldValue(),
+ topVer,
+ null,
+ replicate ? DR_BACKUP : DR_NONE,
+ txEntry.conflictExpireTime(),
+ near() ? null : explicitVer,
+ CU.subjectId(this, cctx),
+ resolveTaskName(),
+ dhtVer,
+ txEntry.updateCounter(),
+ mvccSnapshot());
+
+ txEntry.updateCounter(updRes.updateCounter());
+
+ if (updRes.loggedPointer() != null)
+ ptr = updRes.loggedPointer();
+
+ // Keep near entry up to date.
+ if (nearCached != null) {
+ CacheObject val0 = cached.valueBytes();
+
+ nearCached.updateOrEvict(xidVer,
+ val0,
+ cached.expireTime(),
+ cached.ttl(),
nodeId,
- false,
- true,
- true,
- txEntry.keepBinary(),
- txEntry.hasOldValue(),
- txEntry.oldValue(),
- topVer,
- null,
- replicate ? DR_BACKUP : DR_NONE,
- near() ? null : explicitVer,
- CU.subjectId(this, cctx),
- resolveTaskName(),
- dhtVer,
- txEntry.updateCounter(),
- mvccSnapshot());
-
- txEntry.updateCounter(updRes.updateCounter());
-
- if (updRes.loggedPointer() != null)
- ptr = updRes.loggedPointer();
-
- // Keep near entry up to date.
- if (nearCached != null)
- nearCached.updateOrEvict(xidVer, null, 0, 0, nodeId, topVer);
- }
- else if (op == RELOAD) {
- CacheObject reloaded = cached.innerReload();
-
- if (nearCached != null) {
- nearCached.innerReload();
-
- nearCached.updateOrEvict(cached.version(),
- reloaded,
- cached.expireTime(),
- cached.ttl(),
- nodeId,
- topVer);
- }
+ topVer);
}
- else if (op == READ) {
- assert near();
-
- if (log.isDebugEnabled())
- log.debug("Ignoring READ entry when committing: " + txEntry);
- }
- // No-op.
- else {
- if (conflictCtx == null || !conflictCtx.isUseOld()) {
- if (txEntry.ttl() != CU.TTL_NOT_CHANGED)
- cached.updateTtl(null, txEntry.ttl());
-
- if (nearCached != null) {
- CacheObject val0 = cached.valueBytes();
-
- nearCached.updateOrEvict(xidVer,
- val0,
- cached.expireTime(),
- cached.ttl(),
- nodeId,
- topVer);
- }
- }
- }
-
- // Assert after setting values as we want to make sure
- // that if we replaced removed entries.
- assert
- txEntry.op() == READ || onePhaseCommit() ||
- // If candidate is not there, then lock was explicit
- // and we simply allow the commit to proceed.
- !cached.hasLockCandidateUnsafe(xidVer) || cached.lockedByUnsafe(xidVer) :
- "Transaction does not own lock for commit [entry=" + cached +
- ", tx=" + this + ']';
-
- // Break out of while loop.
- break;
- }
- catch (GridCacheEntryRemovedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Attempting to commit a removed entry (will retry): " + txEntry);
-
- // Renew cached entry.
- txEntry.cached(cacheCtx.cache().entryEx(txEntry.key(), topologyVersion()));
}
}
- }
- catch (Throwable ex) {
- boolean isNodeStopping = X.hasCause(ex, NodeStoppingException.class);
- boolean hasInvalidEnvironmentIssue = X.hasCause(ex, InvalidEnvironmentException.class);
-
- // In case of error, we still make the best effort to commit,
- // as there is no way to rollback at this point.
- err = new IgniteTxHeuristicCheckedException("Commit produced a runtime exception " +
- "(all transaction entries will be invalidated): " + CU.txString(this), ex);
-
- if (isNodeStopping) {
- U.warn(log, "Failed to commit transaction, node is stopping [tx=" + this +
- ", err=" + ex + ']');
- }
- else if (hasInvalidEnvironmentIssue) {
- U.warn(log, "Failed to commit transaction, node is in invalid state and will be stopped [tx=" + this +
- ", err=" + ex + ']');
+ else if (op == DELETE) {
+ GridCacheUpdateTxResult updRes = cached.innerRemove(this,
+ eventNodeId(),
+ nodeId,
+ false,
+ true,
+ true,
+ txEntry.keepBinary(),
+ txEntry.hasOldValue(),
+ txEntry.oldValue(),
+ topVer,
+ null,
+ replicate ? DR_BACKUP : DR_NONE,
+ near() ? null : explicitVer,
+ CU.subjectId(this, cctx),
+ resolveTaskName(),
+ dhtVer,
+ txEntry.updateCounter(),
+ mvccSnapshot());
+
+ txEntry.updateCounter(updRes.updateCounter());
+
+ if (updRes.loggedPointer() != null)
+ ptr = updRes.loggedPointer();
+
+ // Keep near entry up to date.
+ if (nearCached != null)
+ nearCached.updateOrEvict(xidVer, null, 0, 0, nodeId, topVer);
}
- else
- U.error(log, "Commit failed.", err);
-
- state(UNKNOWN);
-
- if (hasInvalidEnvironmentIssue)
- cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, ex));
- else if (!isNodeStopping) { // Skip fair uncommit in case of node stopping or invalidation.
- try {
- // Courtesy to minimize damage.
- uncommit();
+ else if (op == RELOAD) {
+ CacheObject reloaded = cached.innerReload();
+
+ if (nearCached != null) {
+ nearCached.innerReload();
+
+ nearCached.updateOrEvict(cached.version(),
+ reloaded,
+ cached.expireTime(),
+ cached.ttl(),
+ nodeId,
+ topVer);
}
- catch (Throwable ex1) {
- U.error(log, "Failed to uncommit transaction: " + this, ex1);
+ }
+ else if (op == READ) {
+ assert near();
- if (ex1 instanceof Error)
- throw ex1;
+ if (log.isDebugEnabled())
+ log.debug("Ignoring READ entry when committing: " + txEntry);
+ }
+ // No-op.
+ else {
+ if (conflictCtx == null || !conflictCtx.isUseOld()) {
+ if (txEntry.ttl() != CU.TTL_NOT_CHANGED)
+ cached.updateTtl(null, txEntry.ttl());
+
+ if (nearCached != null) {
+ CacheObject val0 = cached.valueBytes();
+
+ nearCached.updateOrEvict(xidVer,
+ val0,
+ cached.expireTime(),
+ cached.ttl(),
+ nodeId,
+ topVer);
+ }
}
}
- if (ex instanceof Error)
- throw (Error) ex;
+ // Assert after setting values as we want to make sure
+ // that if we replaced removed entries.
+ assert
+ txEntry.op() == READ || onePhaseCommit() ||
+ // If candidate is not there, then lock was explicit
+ // and we simply allow the commit to proceed.
+ !cached.hasLockCandidateUnsafe(xidVer) || cached.lockedByUnsafe(xidVer) :
+ "Transaction does not own lock for commit [entry=" + cached +
+ ", tx=" + this + ']';
+
+ // Break out of while loop.
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Attempting to commit a removed entry (will retry): " + txEntry);
- throw err;
+ // Renew cached entry.
+ txEntry.cached(cacheCtx.cache().entryEx(txEntry.key(), topologyVersion()));
}
}
+ }
- // Apply cache size deltas.
- applyTxSizes();
+ // Apply cache size deltas.
+ applyTxSizes();
- TxCounters txCntrs = txCounters(false);
+ TxCounters txCntrs = txCounters(false);
- // Apply update counters.
- if (txCntrs != null)
- applyPartitionsUpdatesCounters(txCntrs.updateCounters());
+ // Apply update counters.
+ if (txCntrs != null)
+ applyPartitionsUpdatesCounters(txCntrs.updateCounters());
cctx.mvccCaching().onTxFinished(this, true);
@@ -827,18 +780,32 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
.map(tuple -> tuple.get1().partitionCounter(tuple.get2().updateCounter()))
.collect(Collectors.toList());
- cctx.wal().log(new DataRecord(entriesWithCounters));
- }
+ cctx.wal().log(new DataRecord(entriesWithCounters));
+ }
+
+ if (ptr != null && !cctx.tm().logTxRecords())
+ cctx.wal().flush(ptr, false);
+ }
+ catch (Throwable ex) {
+ state(UNKNOWN);
- if (ptr != null && !cctx.tm().logTxRecords())
- cctx.wal().flush(ptr, false);
+ if (X.hasCause(ex, NodeStoppingException.class)) {
+ U.warn(log, "Failed to commit transaction, node is stopping [tx=" + CU.txString(this) +
+ ", err=" + ex + ']');
+
+ return;
}
- catch (StorageException e) {
- err = e;
- throw new IgniteCheckedException("Failed to log transaction record " +
- "(transaction will be rolled back): " + this, e);
+ err = heuristicException(ex);
+
+ try {
+ uncommit();
+ }
+ catch (Throwable e) {
+ err.addSuppressed(e);
}
+
+ throw err;
}
finally {
cctx.database().checkpointReadUnlock();
@@ -878,9 +845,19 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
throw new IgniteCheckedException("Invalid transaction state for commit [state=" + state + ", tx=" + this + ']');
rollbackRemoteTx();
+
+ return;
}
- commitIfLocked();
+ try {
+ commitIfLocked();
+ }
+ catch (IgniteTxHeuristicCheckedException e) {
+ // Treat heuristic exception as critical.
+ cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+
+ throw e;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 21eb7b2..9f96b46 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -173,10 +173,13 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
if (ERR_UPD.compareAndSet(this, null, e)) {
tx.setRollbackOnly();
- if (X.hasCause(e, InvalidEnvironmentException.class, NodeStoppingException.class))
+ if (X.hasCause(e, NodeStoppingException.class) || cctx.kernalContext().failure().nodeStopping())
onComplete();
- else
+ else {
+ // Rolling back a remote transaction may result in partial commit.
+ // This is only acceptable in tests with no-op failure handler.
finish(false);
+ }
}
}
@@ -230,9 +233,9 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
if (this.tx.onePhaseCommit() && (this.tx.state() == COMMITTING)) {
try {
- boolean hasInvalidEnvironmentIssue = X.hasCause(err, InvalidEnvironmentException.class, NodeStoppingException.class);
+ boolean nodeStopping = X.hasCause(err, NodeStoppingException.class);
- this.tx.tmFinish(err == null, hasInvalidEnvironmentIssue, false);
+ this.tx.tmFinish(err == null, nodeStopping || cctx.kernalContext().failure().nodeStopping(), false);
}
catch (IgniteCheckedException finishErr) {
U.error(log, "Failed to finish tx: " + tx, e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index a091d44..ca451f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -24,6 +24,8 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -39,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPr
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
@@ -46,6 +49,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
@@ -467,7 +471,11 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
", tx=" + CU.txString(this) + ']');
}
catch (IgniteCheckedException e) {
- U.error(log, "Failed to finish transaction [commit=" + commit + ", tx=" + this + ']', e);
+ logTxFinishErrorSafe(log, commit, e);
+
+ // Treat heuristic exception as critical.
+ if (X.hasCause(e, IgniteTxHeuristicCheckedException.class))
+ cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
err = e;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index ffa383b..483990f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -30,6 +30,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
@@ -43,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTx
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
import org.apache.ignite.internal.processors.cache.transactions.TxCounters;
http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 741faee..c505677 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -36,6 +36,8 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteDiagnosticAware;
import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -740,8 +742,6 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
if (tx.commitOnPrepare()) {
if (tx.markFinalizing(IgniteInternalTx.FinalizationStatus.USER_FINISH)) {
- IgniteInternalFuture<IgniteInternalTx> fut = null;
-
CIX1<IgniteInternalFuture<IgniteInternalTx>> resClo =
new CIX1<IgniteInternalFuture<IgniteInternalTx>>() {
@Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) {
@@ -753,42 +753,43 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
}
};
- if (prepErr == null) {
- try {
- fut = tx.commitAsync();
- }
- catch (RuntimeException | Error e) {
- Exception hEx = new IgniteTxHeuristicCheckedException("Commit produced a runtime " +
- "exception: " + CU.txString(tx), e);
-
- res.error(hEx);
+ try {
+ if (prepErr == null) {
+ try {
+ tx.commitAsync().listen(resClo);
+ }
+ catch (Throwable e) {
+ res.error(e);
- tx.systemInvalidate(true);
+ tx.systemInvalidate(true);
- try {
- fut = tx.rollbackAsync();
+ try {
+ tx.rollbackAsync().listen(resClo);
+ }
+ catch (Throwable e1) {
+ e.addSuppressed(e1);
+ }
- fut.listen(resClo);
+ throw e;
}
- catch (Throwable e1) {
- e.addSuppressed(e1);
+ }
+ else if (!cctx.kernalContext().isStopping()) {
+ try {
+ tx.rollbackAsync().listen(resClo);
}
+ catch (Throwable e) {
+ if (err != null)
+ err.addSuppressed(e);
- throw e;
+ throw err;
+ }
}
-
}
- else if (!cctx.kernalContext().isStopping())
- try {
- fut = tx.rollbackAsync();
- }
- catch (Throwable e) {
- err.addSuppressed(e);
- fut = null;
- }
+ catch (Throwable e){
+ tx.logTxFinishErrorSafe(log, true, e);
- if (fut != null)
- fut.listen(resClo);
+ cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+ }
}
}
else {
http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 4a4d8e3..befa305 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -311,7 +311,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
if (err != null) {
tx.setRollbackOnly();
- nodeStop = err instanceof NodeStoppingException;
+ nodeStop = err instanceof NodeStoppingException || cctx.kernalContext().failure().nodeStopping();
}
if (commit) {
@@ -357,29 +357,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
}
if (super.onDone(tx0, err)) {
- if (error() instanceof IgniteTxHeuristicCheckedException && !nodeStop) {
- AffinityTopologyVersion topVer = tx.topologyVersion();
-
- for (IgniteTxEntry e : tx.writeMap().values()) {
- GridCacheContext cacheCtx = e.context();
-
- try {
- if (e.op() != NOOP && !cacheCtx.affinity().keyLocalNode(e.key(), topVer)) {
- GridCacheEntryEx entry = cacheCtx.cache().peekEx(e.key());
-
- if (entry != null)
- entry.invalidate(tx.xidVersion());
- }
- }
- catch (Throwable t) {
- U.error(log, "Failed to invalidate entry.", t);
-
- if (t instanceof Error)
- throw (Error)t;
- }
- }
- }
-
// Don't forget to clean up.
cctx.mvcc().removeFuture(futId);
@@ -402,8 +379,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
}
/** {@inheritDoc} */
- @Override @SuppressWarnings("ForLoopReplaceableByForEach")
- public void finish(final boolean commit, final boolean clearThreadMap, final boolean onTimeout) {
+ @Override public void finish(final boolean commit, final boolean clearThreadMap, final boolean onTimeout) {
if (!cctx.mvcc().addFuture(this, futureId()))
return;
@@ -490,18 +466,22 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
}
}
+ // Cleanup transaction if heuristic failure.
+ if (tx.state() == UNKNOWN)
+ cctx.tm().rollbackTx(tx, clearThreadMap, false);
+
if ((tx.onePhaseCommit() && needFinishOnePhase(commit)) || (!tx.onePhaseCommit() && mappings != null)) {
if (mappings.single()) {
GridDistributedTxMapping mapping = mappings.singleMapping();
if (mapping != null) {
- assert !hasFutures() || waitTxs != null : futures();
+ assert !hasFutures() || isDone() || waitTxs != null : futures();
finish(1, mapping, commit, !clearThreadMap);
}
}
else {
- assert !hasFutures() || waitTxs != null : futures();
+ assert !hasFutures() || isDone() || waitTxs != null : futures();
finish(mappings.mappings(), commit, !clearThreadMap);
}
@@ -762,7 +742,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
/**
* @param mappings Mappings.
* @param commit Commit flag.
- * @param {@code true} If need to add completed version on finish.
+ * @param useCompletedVer {@code True} if need to add completed version on finish.
*/
private void finish(Iterable<GridDistributedTxMapping> mappings, boolean commit, boolean useCompletedVer) {
int miniId = 0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 76d464e..f56d99b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -3955,7 +3955,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
assert rollbackFut.isDone() : rollbackFut;
}
- else
+ else // First finish attempt was unsuccessful. Try again.
rollbackFut.finish(false, clearThreadMap, onTimeout);
}
else {
http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index b091061..0d3ba75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -75,6 +75,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.processors.cluster.BaselineTopology;
+import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.GridSetWrapper;
@@ -764,6 +765,36 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
"[timeout=" + timeout() + ", tx=" + CU.txString(this) + ']');
}
+ /**
+ * @param ex Root cause.
+ */
+ public final IgniteCheckedException heuristicException(Throwable ex) {
+ return new IgniteTxHeuristicCheckedException("Committing a transaction has produced runtime exception", ex);
+ }
+
+ /**
+ * @param log Log.
+ * @param commit Commit.
+ * @param e Exception.
+ */
+ public void logTxFinishErrorSafe(@Nullable IgniteLogger log, boolean commit, Throwable e) {
+ assert e != null : "Exception is expected";
+
+ final String fmt = "Failed completing the transaction: [commit=%s, tx=%s, plc=%s]";
+
+ try {
+ // First try printing a full transaction. This is error prone.
+ U.error(log, String.format(fmt, commit, this,
+ cctx.gridConfig().getFailureHandler().getClass().getSimpleName()), e);
+ }
+ catch (Throwable e0) {
+ e.addSuppressed(e0);
+
+ U.error(log, String.format(fmt, commit, CU.txString(this),
+ cctx.gridConfig().getFailureHandler().getClass().getSimpleName()), e);
+ }
+ }
+
/** {@inheritDoc} */
@Override public GridCacheVersion xidVersion() {
return xidVer;
http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 4c7b65d..895a9d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -1030,45 +1030,34 @@ public class IgniteTxHandler {
}
catch (Throwable e) {
try {
- U.error(log, "Failed completing transaction [commit=" + req.commit() + ", tx=" + tx + ']', e);
- }
- catch (Throwable e0) {
- ClusterNode node0 = ctx.discovery().node(nodeId);
-
- U.error(log, "Failed completing transaction [commit=" + req.commit() + ", tx=" +
- CU.txString(tx) + ']', e);
-
- U.error(log, "Failed to log message due to an error: ", e0);
+ if (tx != null) {
+ tx.commitError(e);
- if (node0 != null && (!node0.isClient() || node0.isLocal())) {
- ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+ tx.systemInvalidate(true);
- throw e;
- }
- }
-
- if (tx != null) {
- tx.commitError(e);
-
- tx.systemInvalidate(true);
+ try {
+ IgniteInternalFuture<IgniteInternalTx> res = tx.rollbackDhtLocalAsync();
- try {
- IgniteInternalFuture<IgniteInternalTx> res = tx.rollbackDhtLocalAsync();
+ // Only for error logging.
+ res.listen(CU.errorLogger(log));
- // Only for error logging.
- res.listen(CU.errorLogger(log));
+ return res;
+ }
+ catch (Throwable e1) {
+ e.addSuppressed(e1);
+ }
- return res;
+ tx.logTxFinishErrorSafe(log, req.commit(), e);
}
- catch (Throwable e1) {
- e.addSuppressed(e1);
- }
- }
- if (e instanceof Error)
- throw (Error)e;
+ if (e instanceof Error)
+ throw (Error)e;
- return new GridFinishedFuture<>(e);
+ return new GridFinishedFuture<>(e);
+ }
+ finally {
+ ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+ }
}
}
@@ -1093,20 +1082,26 @@ public class IgniteTxHandler {
return tx.rollbackAsyncLocal();
}
catch (Throwable e) {
- U.error(log, "Failed completing transaction [commit=" + commit + ", tx=" + tx + ']', e);
-
- if (e instanceof Error)
- throw e;
+ try {
+ if (tx != null) {
+ try {
+ return tx.rollbackNearTxLocalAsync();
+ }
+ catch (Throwable e1) {
+ e.addSuppressed(e1);
+ }
- if (tx != null)
- try {
- return tx.rollbackNearTxLocalAsync();
- }
- catch (Throwable e1) {
- e.addSuppressed(e1);
+ tx.logTxFinishErrorSafe(log, commit, e);
}
- return new GridFinishedFuture<>(e);
+ if (e instanceof Error)
+ throw e;
+
+ return new GridFinishedFuture<>(e);
+ }
+ finally {
+ ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+ }
}
}
@@ -1193,10 +1188,6 @@ public class IgniteTxHandler {
if (log.isDebugEnabled())
log.debug("Optimistic failure for remote transaction (will rollback): " + req);
}
- else if (e instanceof IgniteTxHeuristicCheckedException) {
- U.warn(log, "Failed to commit transaction (all transaction entries were invalidated): " +
- CU.txString(dhtTx));
- }
else
U.error(log, "Failed to process prepare request: " + req, e);
@@ -1421,9 +1412,10 @@ public class IgniteTxHandler {
tx.rollbackRemoteTx();
}
}
+ catch (IgniteTxHeuristicCheckedException e) {
+ // Already uncommitted.
+ }
catch (Throwable e) {
- U.error(log, "Failed completing transaction [commit=" + req.commit() + ", tx=" + tx + ']', e);
-
// Mark transaction for invalidate.
tx.invalidate(true);
tx.systemInvalidate(true);
@@ -1441,6 +1433,8 @@ public class IgniteTxHandler {
}
/**
+ * Finish for one-phase distributed tx.
+ *
* @param tx Transaction.
* @param req Request.
*/
@@ -1464,22 +1458,27 @@ public class IgniteTxHandler {
throw e;
}
catch (Throwable e) {
- U.error(log, "Failed committing transaction [tx=" + tx + ']', e);
+ try {
+ // Mark transaction for invalidate.
+ tx.invalidate(true);
- // Mark transaction for invalidate.
- tx.invalidate(true);
- tx.systemInvalidate(true);
+ tx.systemInvalidate(true);
- try {
- tx.rollbackRemoteTx();
+ try {
+ tx.rollbackRemoteTx();
+ }
+ catch (Throwable e1) {
+ e.addSuppressed(e1);
+ }
+
+ tx.logTxFinishErrorSafe(log, true, e);
+
+ if (e instanceof Error)
+ throw (Error)e;
}
- catch (Throwable e1) {
- e.addSuppressed(e1);
- U.error(log, "Failed to automatically rollback transaction: " + tx, e1);
+ finally {
+ ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
}
-
- if (e instanceof Error)
- throw (Error)e;
}
}
[2/3] ignite git commit: IGNITE-9082 Throwing checked exception
during tx commit without node stopping leads to data corruption - Fixes
#4809.
Posted by ir...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 775b61c..7e04292 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -558,7 +558,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
/** {@inheritDoc} */
@SuppressWarnings({"CatchGenericClass"})
- @Override public final void userCommit() throws IgniteCheckedException {
+ @Override public void userCommit() throws IgniteCheckedException {
TransactionState state = state();
if (state != COMMITTING) {
@@ -590,7 +590,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
WALPointer ptr = null;
- Exception err = null;
+ IgniteCheckedException err = null;
cctx.database().checkpointReadLock();
@@ -609,176 +609,175 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
UUID nodeId = txEntry.nodeId() == null ? this.nodeId : txEntry.nodeId();
- try {
- while (true) {
- try {
- GridCacheEntryEx cached = txEntry.cached();
+ while (true) {
+ try {
+ GridCacheEntryEx cached = txEntry.cached();
- // Must try to evict near entries before committing from
- // transaction manager to make sure locks are held.
- if (!evictNearEntry(txEntry, false)) {
- if (cacheCtx.isNear() && cacheCtx.dr().receiveEnabled()) {
- cached.markObsolete(xidVer);
+ // Must try to evict near entries before committing from
+ // transaction manager to make sure locks are held.
+ if (!evictNearEntry(txEntry, false)) {
+ if (cacheCtx.isNear() && cacheCtx.dr().receiveEnabled()) {
+ cached.markObsolete(xidVer);
- break;
- }
+ break;
+ }
- if (cached.detached())
- break;
+ if (cached.detached())
+ break;
- boolean updateNearCache = updateNearCache(cacheCtx, txEntry.key(), topVer);
+ boolean updateNearCache = updateNearCache(cacheCtx, txEntry.key(), topVer);
- boolean metrics = true;
+ boolean metrics = true;
- if (!updateNearCache && cacheCtx.isNear() && txEntry.locallyMapped())
- metrics = false;
+ if (!updateNearCache && cacheCtx.isNear() && txEntry.locallyMapped())
+ metrics = false;
- boolean evt = !isNearLocallyMapped(txEntry, false);
+ boolean evt = !isNearLocallyMapped(txEntry, false);
- if (!F.isEmpty(txEntry.entryProcessors()) || !F.isEmpty(txEntry.filters()))
- txEntry.cached().unswap(false);
+ if (!F.isEmpty(txEntry.entryProcessors()) || !F.isEmpty(txEntry.filters()))
+ txEntry.cached().unswap(false);
- IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(txEntry,
- true, null);
+ IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(txEntry,
+ true, null);
- GridCacheVersion dhtVer = null;
+ GridCacheVersion dhtVer = null;
- // For near local transactions we must record DHT version
- // in order to keep near entries on backup nodes until
- // backup remote transaction completes.
- if (cacheCtx.isNear()) {
- if (txEntry.op() == CREATE || txEntry.op() == UPDATE ||
- txEntry.op() == DELETE || txEntry.op() == TRANSFORM)
- dhtVer = txEntry.dhtVersion();
+ // For near local transactions we must record DHT version
+ // in order to keep near entries on backup nodes until
+ // backup remote transaction completes.
+ if (cacheCtx.isNear()) {
+ if (txEntry.op() == CREATE || txEntry.op() == UPDATE ||
+ txEntry.op() == DELETE || txEntry.op() == TRANSFORM)
+ dhtVer = txEntry.dhtVersion();
- if ((txEntry.op() == CREATE || txEntry.op() == UPDATE) &&
- txEntry.conflictExpireTime() == CU.EXPIRE_TIME_CALCULATE) {
- ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry);
+ if ((txEntry.op() == CREATE || txEntry.op() == UPDATE) &&
+ txEntry.conflictExpireTime() == CU.EXPIRE_TIME_CALCULATE) {
+ ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry);
- if (expiry != null) {
- txEntry.cached().unswap(false);
+ if (expiry != null) {
+ txEntry.cached().unswap(false);
- Duration duration = cached.hasValue() ?
- expiry.getExpiryForUpdate() : expiry.getExpiryForCreation();
+ Duration duration = cached.hasValue() ?
+ expiry.getExpiryForUpdate() : expiry.getExpiryForCreation();
- txEntry.ttl(CU.toTtl(duration));
- }
+ txEntry.ttl(CU.toTtl(duration));
}
}
+ }
- GridCacheOperation op = res.get1();
- CacheObject val = res.get2();
+ GridCacheOperation op = res.get1();
+ CacheObject val = res.get2();
- // Deal with conflicts.
- GridCacheVersion explicitVer = txEntry.conflictVersion() != null ?
- txEntry.conflictVersion() : writeVersion();
+ // Deal with conflicts.
+ GridCacheVersion explicitVer = txEntry.conflictVersion() != null ?
+ txEntry.conflictVersion() : writeVersion();
- if ((op == CREATE || op == UPDATE) &&
- txEntry.conflictExpireTime() == CU.EXPIRE_TIME_CALCULATE) {
- ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry);
+ if ((op == CREATE || op == UPDATE) &&
+ txEntry.conflictExpireTime() == CU.EXPIRE_TIME_CALCULATE) {
+ ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry);
- if (expiry != null) {
- Duration duration = cached.hasValue() ?
- expiry.getExpiryForUpdate() : expiry.getExpiryForCreation();
+ if (expiry != null) {
+ Duration duration = cached.hasValue() ?
+ expiry.getExpiryForUpdate() : expiry.getExpiryForCreation();
- long ttl = CU.toTtl(duration);
+ long ttl = CU.toTtl(duration);
- txEntry.ttl(ttl);
+ txEntry.ttl(ttl);
- if (ttl == CU.TTL_ZERO)
- op = DELETE;
- }
+ if (ttl == CU.TTL_ZERO)
+ op = DELETE;
}
+ }
- boolean conflictNeedResolve = cacheCtx.conflictNeedResolve();
-
- GridCacheVersionConflictContext<?, ?> conflictCtx = null;
-
- if (conflictNeedResolve) {
- IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext> conflictRes =
- conflictResolve(op, txEntry, val, explicitVer, cached);
+ boolean conflictNeedResolve = cacheCtx.conflictNeedResolve();
- assert conflictRes != null;
+ GridCacheVersionConflictContext<?, ?> conflictCtx = null;
- conflictCtx = conflictRes.get2();
+ if (conflictNeedResolve) {
+ IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext> conflictRes =
+ conflictResolve(op, txEntry, val, explicitVer, cached);
- if (conflictCtx.isUseOld())
- op = NOOP;
- else if (conflictCtx.isUseNew()) {
- txEntry.ttl(conflictCtx.ttl());
- txEntry.conflictExpireTime(conflictCtx.expireTime());
- }
- else {
- assert conflictCtx.isMerge();
+ assert conflictRes != null;
- op = conflictRes.get1();
- val = txEntry.context().toCacheObject(conflictCtx.mergeValue());
- explicitVer = writeVersion();
+ conflictCtx = conflictRes.get2();
- txEntry.ttl(conflictCtx.ttl());
- txEntry.conflictExpireTime(conflictCtx.expireTime());
- }
+ if (conflictCtx.isUseOld())
+ op = NOOP;
+ else if (conflictCtx.isUseNew()) {
+ txEntry.ttl(conflictCtx.ttl());
+ txEntry.conflictExpireTime(conflictCtx.expireTime());
}
- else
- // Nullify explicit version so that innerSet/innerRemove will work as usual.
- explicitVer = null;
+ else {
+ assert conflictCtx.isMerge();
- if (sndTransformedVals || conflictNeedResolve) {
- assert sndTransformedVals && cacheCtx.isReplicated() || conflictNeedResolve;
+ op = conflictRes.get1();
+ val = txEntry.context().toCacheObject(conflictCtx.mergeValue());
+ explicitVer = writeVersion();
- txEntry.value(val, true, false);
- txEntry.op(op);
- txEntry.entryProcessors(null);
- txEntry.conflictVersion(explicitVer);
+ txEntry.ttl(conflictCtx.ttl());
+ txEntry.conflictExpireTime(conflictCtx.expireTime());
}
+ }
+ else
+ // Nullify explicit version so that innerSet/innerRemove will work as usual.
+ explicitVer = null;
- if (dhtVer == null)
- dhtVer = explicitVer != null ? explicitVer : writeVersion();
+ if (sndTransformedVals || conflictNeedResolve) {
+ assert sndTransformedVals && cacheCtx.isReplicated() || conflictNeedResolve;
- if (op == CREATE || op == UPDATE) {
- assert val != null : txEntry;
+ txEntry.value(val, true, false);
+ txEntry.op(op);
+ txEntry.entryProcessors(null);
+ txEntry.conflictVersion(explicitVer);
+ }
- GridCacheUpdateTxResult updRes = cached.innerSet(
- this,
- eventNodeId(),
- txEntry.nodeId(),
- val,
- false,
- false,
- txEntry.ttl(),
- evt,
- metrics,
- txEntry.keepBinary(),
- txEntry.hasOldValue(),
- txEntry.oldValue(),
- topVer,
- null,
- cached.detached() ? DR_NONE : drType,
- txEntry.conflictExpireTime(),
- cached.isNear() ? null : explicitVer,
- CU.subjectId(this, cctx),
- resolveTaskName(),
- dhtVer,
- null,
- mvccSnapshot());
-
- if (updRes.success()) {
- txEntry.updateCounter(updRes.updateCounter());
-
- GridLongList waitTxs = updRes.mvccWaitTransactions();
-
- updateWaitTxs(waitTxs);
- }
+ if (dhtVer == null)
+ dhtVer = explicitVer != null ? explicitVer : writeVersion();
+
+ if (op == CREATE || op == UPDATE) {
+ assert val != null : txEntry;
+
+ GridCacheUpdateTxResult updRes = cached.innerSet(
+ this,
+ eventNodeId(),
+ txEntry.nodeId(),
+ val,
+ false,
+ false,
+ txEntry.ttl(),
+ evt,
+ metrics,
+ txEntry.keepBinary(),
+ txEntry.hasOldValue(),
+ txEntry.oldValue(),
+ topVer,
+ null,
+ cached.detached() ? DR_NONE : drType,
+ txEntry.conflictExpireTime(),
+ cached.isNear() ? null : explicitVer,
+ CU.subjectId(this, cctx),
+ resolveTaskName(),
+ dhtVer,
+ null,
+ mvccSnapshot());
+
+ if (updRes.success()) {
+ txEntry.updateCounter(updRes.updateCounter());
+
+ GridLongList waitTxs = updRes.mvccWaitTransactions();
+
+ updateWaitTxs(waitTxs);
+ }
- if (updRes.loggedPointer() != null)
- ptr = updRes.loggedPointer();
+ if (updRes.loggedPointer() != null)
+ ptr = updRes.loggedPointer();
- if (updRes.success() && updateNearCache) {
- final CacheObject val0 = val;
- final boolean metrics0 = metrics;
- final GridCacheVersion dhtVer0 = dhtVer;
+ if (updRes.success() && updateNearCache) {
+ final CacheObject val0 = val;
+ final boolean metrics0 = metrics;
+ final GridCacheVersion dhtVer0 = dhtVer;
- updateNearEntrySafely(cacheCtx, txEntry.key(), entry -> entry.innerSet(
+ updateNearEntrySafely(cacheCtx, txEntry.key(), entry -> entry.innerSet(
null,
eventNodeId(),
nodeId,
@@ -801,46 +800,46 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
dhtVer0,
null,
mvccSnapshot())
- );
- }
+ );
+ }
+ }
+ else if (op == DELETE) {
+ GridCacheUpdateTxResult updRes = cached.innerRemove(
+ this,
+ eventNodeId(),
+ txEntry.nodeId(),
+ false,
+ evt,
+ metrics,
+ txEntry.keepBinary(),
+ txEntry.hasOldValue(),
+ txEntry.oldValue(),
+ topVer,
+ null,
+ cached.detached() ? DR_NONE : drType,
+ cached.isNear() ? null : explicitVer,
+ CU.subjectId(this, cctx),
+ resolveTaskName(),
+ dhtVer,
+ null,
+ mvccSnapshot());
+
+ if (updRes.success()) {
+ txEntry.updateCounter(updRes.updateCounter());
+
+ GridLongList waitTxs = updRes.mvccWaitTransactions();
+
+ updateWaitTxs(waitTxs);
}
- else if (op == DELETE) {
- GridCacheUpdateTxResult updRes = cached.innerRemove(
- this,
- eventNodeId(),
- txEntry.nodeId(),
- false,
- evt,
- metrics,
- txEntry.keepBinary(),
- txEntry.hasOldValue(),
- txEntry.oldValue(),
- topVer,
- null,
- cached.detached() ? DR_NONE : drType,
- cached.isNear() ? null : explicitVer,
- CU.subjectId(this, cctx),
- resolveTaskName(),
- dhtVer,
- null,
- mvccSnapshot());
-
- if (updRes.success()) {
- txEntry.updateCounter(updRes.updateCounter());
-
- GridLongList waitTxs = updRes.mvccWaitTransactions();
-
- updateWaitTxs(waitTxs);
- }
- if (updRes.loggedPointer() != null)
- ptr = updRes.loggedPointer();
+ if (updRes.loggedPointer() != null)
+ ptr = updRes.loggedPointer();
- if (updRes.success() && updateNearCache) {
- final boolean metrics0 = metrics;
- final GridCacheVersion dhtVer0 = dhtVer;
+ if (updRes.success() && updateNearCache) {
+ final boolean metrics0 = metrics;
+ final GridCacheVersion dhtVer0 = dhtVer;
- updateNearEntrySafely(cacheCtx, txEntry.key(), entry -> entry.innerRemove(
+ updateNearEntrySafely(cacheCtx, txEntry.key(), entry -> entry.innerRemove(
null,
eventNodeId(),
nodeId,
@@ -859,125 +858,78 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
dhtVer0,
null,
mvccSnapshot())
- );
- }
+ );
}
- else if (op == RELOAD) {
- cached.innerReload();
+ }
+ else if (op == RELOAD) {
+ cached.innerReload();
- if (updateNearCache)
- updateNearEntrySafely(cacheCtx, txEntry.key(), entry -> entry.innerReload());
+ if (updateNearCache)
+ updateNearEntrySafely(cacheCtx, txEntry.key(), entry -> entry.innerReload());
+ }
+ else if (op == READ) {
+ CacheGroupContext grp = cacheCtx.group();
+
+ if (grp.persistenceEnabled() && grp.walEnabled() &&
+ cctx.snapshot().needTxReadLogging()) {
+ ptr = cctx.wal().log(new DataRecord(new DataEntry(
+ cacheCtx.cacheId(),
+ txEntry.key(),
+ val,
+ op,
+ nearXidVersion(),
+ writeVersion(),
+ 0,
+ txEntry.key().partition(),
+ txEntry.updateCounter())));
}
- else if (op == READ) {
- CacheGroupContext grp = cacheCtx.group();
-
- if (grp.persistenceEnabled() && grp.walEnabled() &&
- cctx.snapshot().needTxReadLogging()) {
- ptr = cctx.wal().log(new DataRecord(new DataEntry(
- cacheCtx.cacheId(),
- txEntry.key(),
- val,
- op,
- nearXidVersion(),
- writeVersion(),
- 0,
- txEntry.key().partition(),
- txEntry.updateCounter())));
- }
- ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry);
+ ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry);
- if (expiry != null) {
- Duration duration = expiry.getExpiryForAccess();
+ if (expiry != null) {
+ Duration duration = expiry.getExpiryForAccess();
- if (duration != null)
- cached.updateTtl(null, CU.toTtl(duration));
- }
-
- if (log.isDebugEnabled())
- log.debug("Ignoring READ entry when committing: " + txEntry);
+ if (duration != null)
+ cached.updateTtl(null, CU.toTtl(duration));
}
- else {
- assert ownsLock(txEntry.cached()) :
- "Transaction does not own lock for group lock entry during commit [tx=" +
- this + ", txEntry=" + txEntry + ']';
- if (conflictCtx == null || !conflictCtx.isUseOld()) {
- if (txEntry.ttl() != CU.TTL_NOT_CHANGED)
- cached.updateTtl(null, txEntry.ttl());
- }
-
- if (log.isDebugEnabled())
- log.debug("Ignoring NOOP entry when committing: " + txEntry);
- }
+ if (log.isDebugEnabled())
+ log.debug("Ignoring READ entry when committing: " + txEntry);
}
+ else {
+ assert ownsLock(txEntry.cached()) :
+ "Transaction does not own lock for group lock entry during commit [tx=" +
+ this + ", txEntry=" + txEntry + ']';
+
+ if (conflictCtx == null || !conflictCtx.isUseOld()) {
+ if (txEntry.ttl() != CU.TTL_NOT_CHANGED)
+ cached.updateTtl(null, txEntry.ttl());
+ }
- // Check commit locks after set, to make sure that
- // we are not changing obsolete entries.
- // (innerSet and innerRemove will throw an exception
- // if an entry is obsolete).
- if (txEntry.op() != READ)
- checkCommitLocks(cached);
-
- // Break out of while loop.
- break;
- }
- // If entry cached within transaction got removed.
- catch (GridCacheEntryRemovedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry during transaction commit (will retry): " + txEntry);
-
- txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topologyVersion()));
+ if (log.isDebugEnabled())
+ log.debug("Ignoring NOOP entry when committing: " + txEntry);
+ }
}
- }
- }
- catch (Throwable ex) {
- // We are about to initiate transaction rollback when tx has started to committing.
- // Need to remove version from committed list.
- cctx.tm().removeCommittedTx(this);
- boolean isNodeStopping = X.hasCause(ex, NodeStoppingException.class);
- boolean hasInvalidEnvironmentIssue = X.hasCause(ex, InvalidEnvironmentException.class);
+ // Check commit locks after set, to make sure that
+ // we are not changing obsolete entries.
+ // (innerSet and innerRemove will throw an exception
+ // if an entry is obsolete).
+ if (txEntry.op() != READ)
+ checkCommitLocks(cached);
- IgniteCheckedException err0 = new IgniteTxHeuristicCheckedException("Failed to locally write to cache " +
- "(all transaction entries will be invalidated, however there was a window when " +
- "entries for this transaction were visible to others): " + this, ex);
-
- if (isNodeStopping) {
- U.warn(log, "Failed to commit transaction, node is stopping [tx=" + this +
- ", err=" + ex + ']');
- }
- else if (hasInvalidEnvironmentIssue) {
- U.warn(log, "Failed to commit transaction, node is in invalid state and will be stopped [tx=" + this +
- ", err=" + ex + ']');
+ // Break out of while loop.
+ break;
}
- else
- U.error(log, "Commit failed.", err0);
-
- COMMIT_ERR_UPD.compareAndSet(this, null, err0);
+ // If entry cached within transaction got removed.
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry during transaction commit (will retry): " + txEntry);
- state(UNKNOWN);
-
- if (hasInvalidEnvironmentIssue)
- cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, ex));
- else if (!isNodeStopping) { // Skip fair uncommit in case of node stopping or invalidation.
- try {
- // Courtesy to minimize damage.
- uncommit();
- }
- catch (Throwable ex1) {
- U.error(log, "Failed to uncommit transaction: " + this, ex1);
-
- if (ex1 instanceof Error)
- throw ex1;
- }
+ txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topologyVersion()));
}
-
- if (ex instanceof Error)
- throw ex;
-
- throw err0;
}
+
}
// Apply cache sizes only for primary nodes. Update counters were applied on prepare state.
@@ -988,11 +940,32 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
if (ptr != null && !cctx.tm().logTxRecords())
cctx.wal().flush(ptr, false);
}
- catch (StorageException e) {
- err = e;
+ catch (Throwable ex) {
+ // We are about to initiate transaction rollback when tx has started to committing.
+ // Need to remove version from committed list.
+ cctx.tm().removeCommittedTx(this);
+
+ if (X.hasCause(ex, NodeStoppingException.class)) {
+ U.warn(log, "Failed to commit transaction, node is stopping [tx=" + CU.txString(this) +
+ ", err=" + ex + ']');
+
+ return;
+ }
+
+ err = heuristicException(ex);
+
+ COMMIT_ERR_UPD.compareAndSet(this, null, err);
+
+ state(UNKNOWN);
+
+ try {
+ uncommit();
+ }
+ catch (Throwable e) {
+ err.addSuppressed(e);
+ }
- throw new IgniteCheckedException("Failed to log transaction record " +
- "(transaction will be rolled back): " + this, e);
+ throw err;
}
finally {
cctx.database().checkpointReadUnlock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java
index f31f0e9..ceeb4e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java
@@ -24,6 +24,7 @@ import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.NoOpFailureHandler;
import org.apache.ignite.failure.StopNodeOrHaltFailureHandler;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
@@ -79,6 +80,13 @@ public class FailureProcessor extends GridProcessorAdapter {
}
/**
+ * @return @{code True} if a node will be stopped by current handler in near time.
+ */
+ public boolean nodeStopping() {
+ return failureCtx != null && !(hnd instanceof NoOpFailureHandler);
+ }
+
+ /**
* This method is used to initialize local failure handler if {@link IgniteConfiguration} don't contain configured one.
*
* @return Default {@link FailureHandler} implementation.
http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index a7e6e8c..f68ecd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -41,7 +41,6 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.timeout.GridSpiTimeoutObject;
import org.apache.ignite.internal.util.IgniteExceptionRegistry;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
index facea69..e69aff8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
@@ -256,7 +256,7 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest {
cfg.setIndexedTypes(idxTypes);
if (cacheMode() == PARTITIONED)
- cfg.setBackups(1);
+ cfg.setBackups(backups());
return cfg;
}
@@ -362,6 +362,13 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest {
}
/**
+ * @return Backups.
+ */
+ protected int backups() {
+ return 1;
+ }
+
+ /**
* @param idx Index of grid.
* @return Default cache.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java
index 5f2e2ed..3e59c2a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java
@@ -26,12 +26,10 @@ import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import javax.cache.Cache;
-import junit.framework.TestCase;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.IgniteTransactions;
-import org.apache.ignite.Ignition;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.query.QueryCursor;
@@ -47,6 +45,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.apache.ignite.spi.indexing.IndexingSpi;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
@@ -57,25 +56,19 @@ import org.jetbrains.annotations.Nullable;
/**
* Indexing Spi query only test
*/
-public class IndexingSpiQuerySelfTest extends TestCase {
- public static final String CACHE_NAME = "test-cache";
+public class IndexingSpiQuerySelfTest extends GridCommonAbstractTest {
+ private IndexingSpi indexingSpi;
/** {@inheritDoc} */
- @Override public void tearDown() throws Exception {
- Ignition.stopAll(true);
- }
-
- /**
- * @return Configuration.
- */
- protected IgniteConfiguration configuration() {
- IgniteConfiguration cfg = new IgniteConfiguration();
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
TcpDiscoverySpi disco = new TcpDiscoverySpi();
disco.setIpFinder(ipFinder);
+ cfg.setIndexingSpi(indexingSpi);
cfg.setDiscoverySpi(disco);
return cfg;
@@ -86,17 +79,22 @@ public class IndexingSpiQuerySelfTest extends TestCase {
return new CacheConfiguration<>(cacheName);
}
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
/**
* @throws Exception If failed.
*/
public void testSimpleIndexingSpi() throws Exception {
- IgniteConfiguration cfg = configuration();
-
- cfg.setIndexingSpi(new MyIndexingSpi());
+ indexingSpi = new MyIndexingSpi();
- Ignite ignite = Ignition.start(cfg);
+ Ignite ignite = startGrid(0);
- CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(CACHE_NAME);
+ CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(DEFAULT_CACHE_NAME);
IgniteCache<Integer, Integer> cache = ignite.createCache(ccfg);
@@ -113,13 +111,11 @@ public class IndexingSpiQuerySelfTest extends TestCase {
* @throws Exception If failed.
*/
public void testIndexingSpiWithDisabledQueryProcessor() throws Exception {
- IgniteConfiguration cfg = configuration();
-
- cfg.setIndexingSpi(new MyIndexingSpi());
+ indexingSpi = new MyIndexingSpi();
- Ignite ignite = Ignition.start(cfg);
+ Ignite ignite = startGrid(0);
- CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(CACHE_NAME);
+ CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(DEFAULT_CACHE_NAME);
IgniteCache<Integer, Integer> cache = ignite.createCache(ccfg);
@@ -136,13 +132,11 @@ public class IndexingSpiQuerySelfTest extends TestCase {
* @throws Exception If failed.
*/
public void testBinaryIndexingSpi() throws Exception {
- IgniteConfiguration cfg = configuration();
+ indexingSpi = new MyBinaryIndexingSpi();
- cfg.setIndexingSpi(new MyBinaryIndexingSpi());
+ Ignite ignite = startGrid(0);
- Ignite ignite = Ignition.start(cfg);
-
- CacheConfiguration<PersonKey, Person> ccfg = cacheConfiguration(CACHE_NAME);
+ CacheConfiguration<PersonKey, Person> ccfg = cacheConfiguration(DEFAULT_CACHE_NAME);
IgniteCache<PersonKey, Person> cache = ignite.createCache(ccfg);
@@ -168,13 +162,11 @@ public class IndexingSpiQuerySelfTest extends TestCase {
public void testNonBinaryIndexingSpi() throws Exception {
System.setProperty(IgniteSystemProperties.IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI, "true");
- IgniteConfiguration cfg = configuration();
-
- cfg.setIndexingSpi(new MyIndexingSpi());
+ indexingSpi = new MyIndexingSpi();
- Ignite ignite = Ignition.start(cfg);
+ Ignite ignite = startGrid(0);
- CacheConfiguration<PersonKey, Person> ccfg = cacheConfiguration(CACHE_NAME);
+ CacheConfiguration<PersonKey, Person> ccfg = cacheConfiguration(DEFAULT_CACHE_NAME);
IgniteCache<PersonKey, Person> cache = ignite.createCache(ccfg);
@@ -198,13 +190,11 @@ public class IndexingSpiQuerySelfTest extends TestCase {
*/
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
public void testIndexingSpiFailure() throws Exception {
- IgniteConfiguration cfg = configuration();
-
- cfg.setIndexingSpi(new MyBrokenIndexingSpi());
+ indexingSpi = new MyBrokenIndexingSpi();
- Ignite ignite = Ignition.start(cfg);
+ Ignite ignite = startGrid(0);
- CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(CACHE_NAME);
+ CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(DEFAULT_CACHE_NAME);
ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java
index e59deed..ca80b13 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java
@@ -17,6 +17,11 @@
package org.apache.ignite.internal.processors.cache.query;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.cache.CacheAtomicityMode;
@@ -37,61 +42,64 @@ import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.Nullable;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.cache.Cache;
-
/**
* Indexing Spi transactional query test
*/
public class IndexingSpiQueryTxSelfTest extends GridCacheAbstractSelfTest {
- /** */
- private static AtomicInteger cnt;
-
/** {@inheritDoc} */
@Override protected int gridCount() {
return 4;
}
/** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- cnt = new AtomicInteger();
-
- super.beforeTestsStarted();
- }
-
- /** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
- if (cnt.getAndIncrement() == 0)
- cfg.setClientMode(true);
- else {
- cfg.setIndexingSpi(new MyBrokenIndexingSpi());
+ cfg.setClientMode("client".equals(igniteInstanceName));
+ cfg.setIndexingSpi(new MyBrokenIndexingSpi());
- CacheConfiguration ccfg = cacheConfiguration(igniteInstanceName);
- ccfg.setName("test-cache");
- ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+ CacheConfiguration ccfg = cacheConfiguration(igniteInstanceName);
+ ccfg.setName(DEFAULT_CACHE_NAME);
+ ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
- ccfg.setIndexedTypes(Integer.class, Integer.class);
+ ccfg.setIndexedTypes(Integer.class, Integer.class);
+
+ cfg.setCacheConfiguration(ccfg);
- cfg.setCacheConfiguration(ccfg);
- }
return cfg;
}
+ /** */
+ public void testIndexingSpiWithTxClient() throws Exception {
+ IgniteEx client = startGrid("client");
+
+ assertNotNull(client.cache(DEFAULT_CACHE_NAME));
+
+ doTestIndexingSpiWithTx(client, 0);
+ }
+
+ /** */
+ public void testIndexingSpiWithTxLocal() throws Exception {
+ IgniteEx ignite = (IgniteEx)primaryNode(0, DEFAULT_CACHE_NAME);
+
+ doTestIndexingSpiWithTx(ignite, 0);
+ }
+
+ /** */
+ public void testIndexingSpiWithTxNotLocal() throws Exception {
+ IgniteEx ignite = (IgniteEx)primaryNode(0, DEFAULT_CACHE_NAME);
+
+ doTestIndexingSpiWithTx(ignite, 1);
+ }
+
/**
* @throws Exception If failed.
*/
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- public void testIndexingSpiWithTx() throws Exception {
- IgniteEx ignite = grid(0);
-
- final IgniteCache<Integer, Integer> cache = ignite.cache("test-cache");
+ private void doTestIndexingSpiWithTx(IgniteEx ignite, int key) throws Exception {
+ final IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
final IgniteTransactions txs = ignite.transactions();
@@ -104,7 +112,7 @@ public class IndexingSpiQueryTxSelfTest extends GridCacheAbstractSelfTest {
Transaction tx;
try (Transaction tx0 = tx = txs.txStart(concurrency, isolation)) {
- cache.put(1, 1);
+ cache.put(key, key);
tx0.commit();
}
@@ -114,6 +122,8 @@ public class IndexingSpiQueryTxSelfTest extends GridCacheAbstractSelfTest {
return null;
}
}, IgniteTxHeuristicCheckedException.class);
+
+ checkFutures();
}
}
}
@@ -135,7 +145,7 @@ public class IndexingSpiQueryTxSelfTest extends GridCacheAbstractSelfTest {
/** {@inheritDoc} */
@Override public Iterator<Cache.Entry<?, ?>> query(@Nullable String cacheName, Collection<Object> params,
@Nullable IndexingQueryFilter filters) throws IgniteSpiException {
- return null;
+ return null;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractTransactionIntergrityTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractTransactionIntergrityTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractTransactionIntergrityTest.java
index fe27e6e..01db747 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractTransactionIntergrityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractTransactionIntergrityTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cluster.ClusterNode;
@@ -40,10 +41,10 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.failure.FailureHandler;
import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -73,7 +74,7 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
private static final int DFLT_ACCOUNTS_CNT = 32;
/** Count of threads and caches. */
- private static final int DFLT_TX_THREADS_CNT = 20;
+ private static final int DFLT_TX_THREADS_CNT = Runtime.getRuntime().availableProcessors();
/** Count of nodes to start. */
private static final int DFLT_NODES_CNT = 3;
@@ -126,16 +127,6 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
return true;
}
- /**
- * @return Flag enables cross-node transactions,
- * when primary partitions participating in transaction spreaded across several cluster nodes.
- */
- protected boolean crossNodeTransactions() {
- // Commit error during cross node transactions breaks transaction integrity
- // TODO: https://issues.apache.org/jira/browse/IGNITE-9086
- return false;
- }
-
/** {@inheritDoc} */
@Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
return new StopNodeFailureHandler();
@@ -148,14 +139,15 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
cfg.setConsistentId(name);
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
- cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
- cfg.setLocalHost("127.0.0.1");
cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
- .setMaxSize(256 * 1024 * 1024)
- .setPersistenceEnabled(persistent()))
- );
+ .setPersistenceEnabled(persistent())
+ .setMaxSize(50 * 1024 * 1024)
+ )
+ .setWalSegmentSize(16 * 1024 * 1024)
+ .setPageSize(1024)
+ .setWalMode(WALMode.LOG_ONLY));
CacheConfiguration[] cacheConfigurations = new CacheConfiguration[txThreadsCount()];
@@ -178,6 +170,8 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
cfg.setCacheConfiguration(cacheConfigurations);
+ cfg.setFailureDetectionTimeout(30_000);
+
return cfg;
}
@@ -219,8 +213,11 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
/**
* Test transfer amount.
+ *
+ * @param failoverScenario Scenario.
+ * @param colocatedAccounts {@code True} to use colocated on same primary node accounts.
*/
- public void doTestTransferAmount(FailoverScenario failoverScenario) throws Exception {
+ public void doTestTransferAmount(FailoverScenario failoverScenario, boolean colocatedAccounts) throws Exception {
failoverScenario.beforeNodesStarted();
//given: started some nodes with client.
@@ -230,26 +227,26 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
igniteClient.cluster().active(true);
- int[] initAmount = new int[txThreadsCount()];
+ int[] initAmounts = new int[txThreadsCount()];
completedTxs = new ConcurrentLinkedHashMap[txThreadsCount()];
//and: fill all accounts on all caches and calculate total amount for every cache.
for (int cachePrefixIdx = 0; cachePrefixIdx < txThreadsCount(); cachePrefixIdx++) {
IgniteCache<Integer, AccountState> cache = igniteClient.getOrCreateCache(cacheName(cachePrefixIdx));
- AtomicInteger coinsCounter = new AtomicInteger();
+ AtomicInteger coinsCntr = new AtomicInteger();
try (Transaction tx = igniteClient.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
for (int accountId = 0; accountId < accountsCount(); accountId++) {
- Set<Integer> initialAmount = generateCoins(coinsCounter, 5);
+ Set<Integer> initAmount = generateCoins(coinsCntr, 5);
- cache.put(accountId, new AccountState(accountId, tx.xid(), initialAmount));
+ cache.put(accountId, new AccountState(accountId, tx.xid(), initAmount));
}
tx.commit();
}
- initAmount[cachePrefixIdx] = coinsCounter.get();
+ initAmounts[cachePrefixIdx] = coinsCntr.get();
completedTxs[cachePrefixIdx] = new ConcurrentLinkedHashMap();
}
@@ -259,7 +256,8 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
ArrayList<Thread> transferThreads = new ArrayList<>();
for (int i = 0; i < txThreadsCount(); i++) {
- transferThreads.add(new TransferAmountTxThread(firstTransactionDone, igniteClient, cacheName(i), i));
+ transferThreads.add(new TransferAmountTxThread(firstTransactionDone,
+ igniteClient, cacheName(i), i, colocatedAccounts));
transferThreads.get(i).start();
}
@@ -268,13 +266,12 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
failoverScenario.afterFirstTransaction();
- for (Thread thread : transferThreads) {
+ for (Thread thread : transferThreads)
thread.join();
- }
failoverScenario.afterTransactionsFinished();
- consistencyCheck(initAmount);
+ consistencyCheck(initAmounts);
}
/**
@@ -385,11 +382,11 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
/**
* @param txId Transaction id.
- * @param coinsToRemove Coins to remove from current account.
+ * @param coinsToRmv Coins to remove from current account.
* @return Account state with removed coins.
*/
- public AccountState removeCoins(IgniteUuid txId, Set<Integer> coinsToRemove) {
- return new AccountState(accId, txId, Sets.difference(coins, coinsToRemove).immutableCopy());
+ public AccountState removeCoins(IgniteUuid txId, Set<Integer> coinsToRmv) {
+ return new AccountState(accId, txId, Sets.difference(coins, coinsToRmv).immutableCopy());
}
/** {@inheritDoc} */
@@ -418,11 +415,11 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
/**
* @param coinsNum Coins number.
*/
- private Set<Integer> generateCoins(AtomicInteger coinsCounter, int coinsNum) {
+ private Set<Integer> generateCoins(AtomicInteger coinsCntr, int coinsNum) {
Set<Integer> res = new HashSet<>();
for (int i = 0; i < coinsNum; i++)
- res.add(coinsCounter.incrementAndGet());
+ res.add(coinsCntr.incrementAndGet());
return res;
}
@@ -479,23 +476,35 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
private class TransferAmountTxThread extends Thread {
/** */
private CountDownLatch firstTransactionLatch;
+
/** */
private IgniteEx ignite;
+
/** */
private String cacheName;
+
/** */
- private int txIndex;
+ private int workerIdx;
+
/** */
private Random random = new Random();
+ /** */
+ private final boolean colocatedAccounts;
+
/**
* @param ignite Ignite.
*/
- private TransferAmountTxThread(CountDownLatch firstTransactionLatch, final IgniteEx ignite, String cacheName, int txIndex) {
+ private TransferAmountTxThread(CountDownLatch firstTransactionLatch,
+ final IgniteEx ignite,
+ String cacheName,
+ int workerIdx,
+ boolean colocatedAccounts) {
this.firstTransactionLatch = firstTransactionLatch;
this.ignite = ignite;
this.cacheName = cacheName;
- this.txIndex = txIndex;
+ this.workerIdx = workerIdx;
+ this.colocatedAccounts = colocatedAccounts;
}
/** {@inheritDoc} */
@@ -514,7 +523,6 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
/**
* @throws IgniteException if fails
*/
- @SuppressWarnings("unchecked")
private void updateInTransaction(IgniteCache<Integer, AccountState> cache) throws IgniteException {
int accIdFrom;
int accIdTo;
@@ -526,11 +534,16 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
if (accIdFrom == accIdTo)
continue;
- ClusterNode primaryForAccFrom = ignite.cachex(cacheName).affinity().mapKeyToNode(accIdFrom);
- ClusterNode primaryForAccTo = ignite.cachex(cacheName).affinity().mapKeyToNode(accIdTo);
+ Affinity<Object> affinity = ignite.affinity(cacheName);
+
+ ClusterNode primaryForAccFrom = affinity.mapKeyToNode(accIdFrom);
+ assertNotNull(primaryForAccFrom);
+
+ ClusterNode primaryForAccTo = affinity.mapKeyToNode(accIdTo);
+ assertNotNull(primaryForAccTo);
// Allows only transaction between accounts that primary on the same node if corresponding flag is enabled.
- if (!crossNodeTransactions() && !primaryForAccFrom.id().equals(primaryForAccTo.id()))
+ if (colocatedAccounts && !primaryForAccFrom.id().equals(primaryForAccTo.id()))
continue;
break;
@@ -541,7 +554,10 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
acctFrom = cache.get(accIdFrom);
+ assertNotNull(acctFrom);
+
acctTo = cache.get(accIdTo);
+ assertNotNull(acctTo);
Set<Integer> coinsToTransfer = acctFrom.coinsToTransfer(random);
@@ -553,23 +569,8 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
tx.commit();
- completedTxs[txIndex].put(tx.xid(), new TxState(acctFrom, acctTo, nextFrom, nextTo, coinsToTransfer));
- }
- }
-
- /**
- * @param curr current
- * @return random value
- */
- private long getNextAccountId(long curr) {
- long randomVal;
-
- do {
- randomVal = random.nextInt(accountsCount());
+ completedTxs[workerIdx].put(tx.xid(), new TxState(acctFrom, acctTo, nextFrom, nextTo, coinsToTransfer));
}
- while (curr == randomVal);
-
- return randomVal;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithPrimaryIndexCorruptionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithPrimaryIndexCorruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithPrimaryIndexCorruptionTest.java
index 3260607..473eaf5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithPrimaryIndexCorruptionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithPrimaryIndexCorruptionTest.java
@@ -17,20 +17,26 @@
package org.apache.ignite.internal.processors.cache.transactions;
+import java.util.Collection;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiFunction;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteIllegalStateException;
import org.apache.ignite.Ignition;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
import org.apache.ignite.internal.processors.cache.tree.SearchRow;
import org.apache.ignite.testframework.GridTestUtils;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
+
/**
* Test cases that check transaction data integrity after transaction commit failed.
*/
@@ -45,81 +51,96 @@ public class TransactionIntegrityWithPrimaryIndexCorruptionTest extends Abstract
super.afterTest();
}
- /** {@inheritDoc} */
- @Override protected long getTestTimeout() {
- return 60 * 1000L;
+ /** */
+ public void testPrimaryIndexCorruptionDuringCommitPrimaryColocatedThrowsError() throws Exception {
+ doTestTransferAmount0(true, true, () -> new AssertionError("Test"));
}
- /**
- * Throws a test {@link AssertionError} during tx commit from {@link BPlusTree} and checks after that data is consistent.
- */
- public void testPrimaryIndexCorruptionDuringCommitOnPrimaryNode1() throws Exception {
- doTestTransferAmount(new IndexCorruptionFailoverScenario(
- true,
- (hnd, tree) -> hnd instanceof BPlusTree.Search,
- failoverPredicate(true, () -> new AssertionError("Test")))
- );
+ /** */
+ public void testPrimaryIndexCorruptionDuringCommitPrimaryColocatedThrowsUnchecked() throws Exception {
+ doTestTransferAmount0(true, true, () -> new RuntimeException("Test"));
}
- /**
- * Throws a test {@link RuntimeException} during tx commit from {@link BPlusTree} and checks after that data is consistent.
- */
- public void testPrimaryIndexCorruptionDuringCommitOnPrimaryNode2() throws Exception {
- doTestTransferAmount(new IndexCorruptionFailoverScenario(
- true,
- (hnd, tree) -> hnd instanceof BPlusTree.Search,
- failoverPredicate(true, () -> new RuntimeException("Test")))
- );
+ /** */
+ public void testPrimaryIndexCorruptionDuringCommitPrimaryColocatedThrowsChecked() throws Exception {
+ doTestTransferAmount0(true, true, () -> new IgniteCheckedException("Test"));
}
- /**
- * Throws a test {@link AssertionError} during tx commit from {@link BPlusTree} and checks after that data is consistent.
- */
- public void testPrimaryIndexCorruptionDuringCommitOnBackupNode() throws Exception {
- doTestTransferAmount(new IndexCorruptionFailoverScenario(
- true,
- (hnd, tree) -> hnd instanceof BPlusTree.Search,
- failoverPredicate(false, () -> new AssertionError("Test")))
- );
+ /** */
+ public void testPrimaryIndexCorruptionDuringCommitPrimaryNonColocatedThrowsError() throws Exception {
+ doTestTransferAmount0(false, true, () -> new AssertionError("Test"));
}
- /**
- * Throws a test {@link IgniteCheckedException} during tx commit from {@link BPlusTree} and checks after that data is consistent.
- */
- public void testPrimaryIndexCorruptionDuringCommitOnPrimaryNode3() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-9082");
+ /** */
+ public void testPrimaryIndexCorruptionDuringCommitPrimaryNonColocatedThrowsUnchecked() throws Exception {
+ doTestTransferAmount0(false, true, () -> new RuntimeException("Test"));
+ }
- doTestTransferAmount(new IndexCorruptionFailoverScenario(
- false,
- (hnd, tree) -> hnd instanceof BPlusTree.Search,
- failoverPredicate(true, () -> new IgniteCheckedException("Test")))
- );
+ /** */
+ public void testPrimaryIndexCorruptionDuringCommitPrimaryNonColocatedThrowsChecked() throws Exception {
+ doTestTransferAmount0(false, true, () -> new IgniteCheckedException("Test"));
+ }
+
+ /** */
+ public void testPrimaryIndexCorruptionDuringCommitBackupColocatedThrowsError() throws Exception {
+ doTestTransferAmount0(true, false, () -> new AssertionError("Test"));
+ }
+
+ /** */
+ public void testPrimaryIndexCorruptionDuringCommitBackupColocatedThrowsUnchecked() throws Exception {
+ doTestTransferAmount0(true, false, () -> new RuntimeException("Test"));
+ }
+
+ /** */
+ public void testPrimaryIndexCorruptionDuringCommitBackupColocatedThrowsChecked() throws Exception {
+ doTestTransferAmount0(true, false, () -> new IgniteCheckedException("Test"));
+ }
+
+ /** */
+ public void testPrimaryIndexCorruptionDuringCommitBackupNonColocatedThrowsError() throws Exception {
+ doTestTransferAmount0(false, false, () -> new AssertionError("Test"));
+ }
+
+ /** */
+ public void testPrimaryIndexCorruptionDuringCommitBackupNonColocatedThrowsUnchecked() throws Exception {
+ doTestTransferAmount0(false, false, () -> new RuntimeException("Test"));
+ }
+
+ /** */
+ public void testPrimaryIndexCorruptionDuringCommitBackupNonColocatedThrowsChecked() throws Exception {
+ doTestTransferAmount0(false, false, () -> new IgniteCheckedException("Test"));
}
/**
* Creates failover predicate which generates error during transaction commmit.
*
- * @param failOnPrimary If {@code true} index should be failed on transaction primary node.
+ * @param failOnPrimary If {@code true} index should be failed on transaction primary node, otherwise on backup.
* @param errorSupplier Supplier to create various errors.
+ * @param errorConsumer Consumer to track unexpected errors while committing.
*/
private BiFunction<IgniteEx, SearchRow, Throwable> failoverPredicate(
boolean failOnPrimary,
- Supplier<Throwable> errorSupplier
+ Supplier<Throwable> errorSupplier,
+ Consumer<Throwable> errorConsumer
) {
return (ignite, row) -> {
- int cacheId = row.cacheId();
- int partId = row.key().partition();
-
- final ClusterNode locNode = ignite.localNode();
- final AffinityTopologyVersion curTopVer = ignite.context().discovery().topologyVersionEx();
-
- // Throw exception if current node is primary for given row.
- return ignite.cachesx(c -> c.context().cacheId() == cacheId)
- .stream()
- .filter(c -> c.context().affinity().primaryByPartition(locNode, partId, curTopVer) == failOnPrimary)
- .map(c -> errorSupplier.get())
- .findFirst()
- .orElse(null);
+ try {
+ int cacheId = row.cacheId();
+ int partId = row.key().partition();
+
+ GridDhtPartitionTopology top = ignite.context().cache().cacheGroup(cacheId).topology();
+
+ GridDhtLocalPartition part = top.localPartition(partId);
+
+ assertTrue("Illegal partition state for mapped tx: " + part, part != null && part.state() == OWNING);
+
+ return part.primary(top.readyTopologyVersion()) == failOnPrimary ? errorSupplier.get() : null;
+ }
+ catch (Throwable e) {
+ errorConsumer.accept(e);
+
+ throw e;
+ }
};
}
@@ -130,68 +151,68 @@ public class TransactionIntegrityWithPrimaryIndexCorruptionTest extends Abstract
/** Failed node index. */
static final int failedNodeIdx = 1;
- /** Is node stopping expected after failover. */
- private final boolean nodeStoppingExpected;
-
- /** Predicate that will choose an instance of {@link BPlusTree} and page operation
- * to make further failover in this tree using {@link #failoverPredicate}. */
- private final BiFunction<PageHandler, BPlusTree, Boolean> treeCorruptionPredicate;
+ /**
+ * Predicate that will choose an instance of {@link BPlusTree} and page operation to make further failover in
+ * this tree using {@link #failoverPred}.
+ */
+ private final BiFunction<PageHandler, BPlusTree, Boolean> treeCorruptionPred;
/** Function that may return error during row insertion into {@link BPlusTree}. */
- private final BiFunction<IgniteEx, SearchRow, Throwable> failoverPredicate;
+ private final BiFunction<IgniteEx, SearchRow, Throwable> failoverPred;
/**
- * @param nodeStoppingExpected Node stopping expected.
- * @param treeCorruptionPredicate Tree corruption predicate.
- * @param failoverPredicate Failover predicate.
+ * @param treeCorruptionPred Tree corruption predicate.
+ * @param failoverPred Failover predicate.
*/
IndexCorruptionFailoverScenario(
- boolean nodeStoppingExpected,
- BiFunction<PageHandler, BPlusTree, Boolean> treeCorruptionPredicate,
- BiFunction<IgniteEx, SearchRow, Throwable> failoverPredicate
+ BiFunction<PageHandler, BPlusTree, Boolean> treeCorruptionPred,
+ BiFunction<IgniteEx, SearchRow, Throwable> failoverPred
) {
- this.nodeStoppingExpected = nodeStoppingExpected;
- this.treeCorruptionPredicate = treeCorruptionPredicate;
- this.failoverPredicate = failoverPredicate;
+ this.treeCorruptionPred = treeCorruptionPred;
+ this.failoverPred = failoverPred;
}
/** {@inheritDoc} */
@Override public void beforeNodesStarted() {
BPlusTree.pageHndWrapper = (tree, hnd) -> {
- final IgniteEx locIgnite = (IgniteEx) Ignition.localIgnite();
+ final IgniteEx locIgnite = (IgniteEx)Ignition.localIgnite();
- if (!locIgnite.name().endsWith(String.valueOf(failedNodeIdx)))
+ if (getTestIgniteInstanceIndex(locIgnite.name()) != failedNodeIdx)
return hnd;
- if (treeCorruptionPredicate.apply(hnd, tree)) {
- log.info("Created corrupted tree handler for -> " + hnd + " " + tree);
+ if (treeCorruptionPred.apply(hnd, tree)) {
+ log.info("Created corrupted tree handler [nodeOrder=" + locIgnite.localNode().order() + ", hnd=" + hnd +
+ ", tree=" + tree + ']');
- PageHandler<Object, BPlusTree.Result> delegate = (PageHandler<Object, BPlusTree.Result>) hnd;
+ PageHandler<Object, BPlusTree.Result> delegate = (PageHandler<Object, BPlusTree.Result>)hnd;
return new PageHandler<BPlusTree.Get, BPlusTree.Result>() {
- @Override public BPlusTree.Result run(int cacheId, long pageId, long page, long pageAddr, PageIO io, Boolean walPlc, BPlusTree.Get arg, int lvl) throws IgniteCheckedException {
- log.info("Invoked " + " " + cacheId + " " + arg.toString() + " for BTree (" + corruptionEnabled + ") -> " + arg.row() + " / " + arg.row().getClass());
+ @Override public BPlusTree.Result run(int cacheId, long pageId, long page, long pageAddr, PageIO io,
+ Boolean walPlc, BPlusTree.Get arg, int lvl) throws IgniteCheckedException {
+ log.info("Invoked [cachedId=" + cacheId + ", hnd=" + arg.toString() +
+ ", corruption=" + corruptionEnabled + ", row=" + arg.row() + ", rowCls=" + arg.row().getClass() + ']');
if (corruptionEnabled && (arg.row() instanceof SearchRow)) {
- SearchRow row = (SearchRow) arg.row();
+ SearchRow row = (SearchRow)arg.row();
// Store cacheId to search row explicitly, as it can be zero if there is one cache in a group.
- Throwable res = failoverPredicate.apply(locIgnite, new SearchRow(cacheId, row.key()));
+ Throwable res = failoverPred.apply(locIgnite, new SearchRow(cacheId, row.key()));
if (res != null) {
if (res instanceof Error)
- throw (Error) res;
+ throw (Error)res;
else if (res instanceof RuntimeException)
- throw (RuntimeException) res;
+ throw (RuntimeException)res;
else if (res instanceof IgniteCheckedException)
- throw (IgniteCheckedException) res;
+ throw (IgniteCheckedException)res;
}
}
return delegate.run(cacheId, pageId, page, pageAddr, io, walPlc, arg, lvl);
}
- @Override public boolean releaseAfterWrite(int cacheId, long pageId, long page, long pageAddr, BPlusTree.Get g, int lvl) {
+ @Override public boolean releaseAfterWrite(int cacheId, long pageId, long page, long pageAddr,
+ BPlusTree.Get g, int lvl) {
return g.canRelease(pageId, lvl);
}
};
@@ -212,27 +233,68 @@ public class TransactionIntegrityWithPrimaryIndexCorruptionTest extends Abstract
// Disable index corruption.
BPlusTree.pageHndWrapper = (tree, hnd) -> hnd;
- if (nodeStoppingExpected) {
- // Wait until node with corrupted index will left cluster.
- GridTestUtils.waitForCondition(() -> {
- try {
- grid(failedNodeIdx);
- }
- catch (IgniteIllegalStateException e) {
- return true;
- }
+ // Wait until node with corrupted index will left cluster.
+ GridTestUtils.waitForCondition(() -> {
+ try {
+ grid(failedNodeIdx);
+ }
+ catch (IgniteIllegalStateException e) {
+ return true;
+ }
- return false;
- }, getTestTimeout());
+ return false;
+ }, getTestTimeout());
- // Failed node should be stopped.
- GridTestUtils.assertThrows(log, () -> grid(failedNodeIdx), IgniteIllegalStateException.class, "");
+ // Failed node should be stopped.
+ GridTestUtils.assertThrows(log, () -> grid(failedNodeIdx), IgniteIllegalStateException.class, null);
- // Re-start failed node.
- startGrid(failedNodeIdx);
+ // Re-start failed node.
+ startGrid(failedNodeIdx);
- awaitPartitionMapExchange();
- }
+ awaitPartitionMapExchange();
+ }
+ }
+
+ /**
+ * Test transfer amount with extended error recording.
+ *
+ * @param colocatedAccount Colocated account.
+ * @param failOnPrimary {@code True} if fail on primary, else on backup.
+ * @param supplier Fail reason supplier.
+ * @throws Exception If failover predicate execution is failed.
+ */
+ private void doTestTransferAmount0(boolean colocatedAccount, boolean failOnPrimary, Supplier<Throwable> supplier) throws Exception {
+ ErrorTracker errTracker = new ErrorTracker();
+
+ doTestTransferAmount(
+ new IndexCorruptionFailoverScenario(
+ (hnd, tree) -> hnd instanceof BPlusTree.Search,
+ failoverPredicate(failOnPrimary, supplier, errTracker)),
+ colocatedAccount
+ );
+
+ for (Throwable throwable : errTracker.errors())
+ log.error("Recorded error", throwable);
+
+ if (!errTracker.errors().isEmpty())
+ fail("Test run has error");
+ }
+
+ /** */
+ private static class ErrorTracker implements Consumer<Throwable> {
+ /** Queue. */
+ private final Queue<Throwable> q = new ConcurrentLinkedQueue<>();
+
+ /** {@inheritDoc} */
+ @Override public void accept(Throwable throwable) {
+ q.add(throwable);
+ }
+
+ /**
+ * @return Recorded errors.
+ */
+ public Collection<Throwable> errors() {
+ return q;
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithSystemWorkerDeathTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithSystemWorkerDeathTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithSystemWorkerDeathTest.java
index 25aae4b..551335f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithSystemWorkerDeathTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithSystemWorkerDeathTest.java
@@ -41,9 +41,7 @@ public class TransactionIntegrityWithSystemWorkerDeathTest extends AbstractTrans
return false;
}
- /**
- *
- */
+ /** */
public void testFailoverWithDiscoWorkerTermination() throws Exception {
doTestTransferAmount(new FailoverScenario() {
static final int failedNodeIdx = 1;
@@ -83,7 +81,7 @@ public class TransactionIntegrityWithSystemWorkerDeathTest extends AbstractTrans
awaitPartitionMapExchange();
}
- });
+ }, true);
}
/**