You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ir...@apache.org on 2020/05/29 17:37:05 UTC
[ignite] branch master updated: IGNITE-13044 Additional possibility
to check for high contending keys generated by the transaction payload. -
Fixes #7824
This is an automated email from the ASF dual-hosted git repository.
irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 9479377 IGNITE-13044 Additional possibility to check for high contending keys generated by the transaction payload. - Fixes #7824
9479377 is described below
commit 947937733cc7da8d9dde81c38570aded6b57008a
Author: zstan <st...@gmail.com>
AuthorDate: Fri May 29 20:35:56 2020 +0300
IGNITE-13044 Additional possibility to check for high contending keys generated by the transaction payload. - Fixes #7824
Signed-off-by: Ivan Rakov <iv...@gmail.com>
---
.../org/apache/ignite/IgniteSystemProperties.java | 8 +
.../java/org/apache/ignite/cache/CacheMetrics.java | 12 +
.../org/apache/ignite/internal/IgniteFeatures.java | 10 +-
.../org/apache/ignite/internal/IgniteKernal.java | 43 ---
.../ignite/internal/TransactionsMXBeanImpl.java | 10 +
.../cache/CacheClusterMetricsMXBeanImpl.java | 5 +
.../cache/CacheLocalMetricsMXBeanImpl.java | 5 +
.../processors/cache/CacheMetricsImpl.java | 61 ++++
.../processors/cache/CacheMetricsSnapshot.java | 5 +
.../processors/cache/CacheMetricsSnapshotV2.java | 12 +
.../cache/LongOperationsDumpSettingsClosure.java | 8 +-
.../distributed/GridDistributedCacheEntry.java | 4 +
.../cache/transactions/IgniteTxManager.java | 291 +++++++++++++++---
.../TxCollisionsDumpSettingsClosure.java} | 33 +-
.../apache/ignite/internal/util/IgniteUtils.java | 30 ++
.../apache/ignite/mxbean/CacheMetricsMXBean.java | 17 +-
.../apache/ignite/mxbean/TransactionsMXBean.java | 20 ++
.../processors/cache/CacheMetricsManageTest.java | 342 ++++++++++++++++++++-
.../transactions/TxWithKeyContentionSelfTest.java | 338 ++++++++++++++++++++
.../TxWithSmallTimeoutAndContentionOneKeyTest.java | 9 +
.../platform/PlatformCacheWriteMetricsTask.java | 5 +
.../ignite/testsuites/IgniteCacheTestSuite7.java | 3 +
22 files changed, 1156 insertions(+), 115 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index ee5b2d1..b362a36 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -1319,6 +1319,14 @@ public final class IgniteSystemProperties {
public static final String IGNITE_TCP_COMM_SET_ATTR_HOST_NAMES = "IGNITE_TCP_COMM_SET_ATTR_HOST_NAMES";
/**
+ * When above zero, prints tx key collisions once per interval.
+ * Each transaction besides OPTIMISTIC SERIALIZABLE captures locks on all enlisted keys; for various reasons the
+ * per-key lock queue may grow. This property sets the interval during which statistics are collected.
+ * Default is 1000 ms.
+ */
+ public static final String IGNITE_DUMP_TX_COLLISIONS_INTERVAL = "IGNITE_DUMP_TX_COLLISIONS_INTERVAL";
+
+ /**
* Enforces singleton.
*/
private IgniteSystemProperties() {
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
index 1921dbb..09bf550 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
@@ -21,6 +21,9 @@ import javax.cache.Cache;
import javax.cache.integration.CacheLoader;
import javax.cache.integration.CacheWriter;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.mxbean.TransactionsMXBean;
+import org.jetbrains.annotations.NotNull;
/**
* Cache metrics used to obtain statistics on cache itself.
@@ -708,4 +711,13 @@ public interface CacheMetrics {
* @return {@code true} when cache topology is valid for writing.
*/
public boolean isValidForWriting();
+
+ /**
+ * Checks if there were any tx key collisions last time.
+ * The check interval is specified through {@link IgniteSystemProperties#IGNITE_DUMP_TX_COLLISIONS_INTERVAL} or
+ * {@link TransactionsMXBean#setTxKeyCollisionsInterval(int)}.
+ *
+ * @return Key collisions and appropriate queue size string representation.
+ */
+ @NotNull public String getTxKeyCollisions();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
index 0bba3f2..904d93e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
@@ -98,8 +98,14 @@ public enum IgniteFeatures {
/** Persistence caches can be snapshot. */
PERSISTENCE_CACHE_SNAPSHOT(23),
- /** Long operations dump timeout. */
- LONG_OPERATIONS_DUMP_TIMEOUT(30);
+ /** Distributed change timeout for dump long operations. */
+ DISTRIBUTED_CHANGE_LONG_OPERATIONS_DUMP_TIMEOUT(30),
+
+ /** Check secondary indexes inline size on join/by control utility request. */
+ CHECK_INDEX_INLINE_SIZES(36),
+
+ /** Distributed propagation of tx collisions dump interval. */
+ DISTRIBUTED_TX_COLLISIONS_DUMP(37);
/**
* Unique feature identifier.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 7138214..a0a9311 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -234,7 +234,6 @@ import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
-import static java.util.Objects.nonNull;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONFIG_URL;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DAEMON;
@@ -415,13 +414,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
@GridToStringExclude
private GridTimeoutProcessor.CancelableTask metricsLogTask;
- /**
- * The instance of scheduled long operation checker. {@code null} means that the operations checker is disabled
- * by the value of {@link IgniteSystemProperties#IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT} system property.
- */
- @GridToStringExclude
- private GridTimeoutProcessor.CancelableTask longOpDumpTask;
-
/** {@code true} if an error occurs at Ignite instance stop. */
@GridToStringExclude
private boolean errOnStop;
@@ -1525,8 +1517,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
}, metricsLogFreq, metricsLogFreq);
}
- scheduleLongOperationsDumpTask(ctx.cache().context().tm().longOperationsDumpTimeout());
-
ctx.performance().add("Disable assertions (remove '-ea' from JVM options)", !U.assertionsEnabled());
ctx.performance().logSuggestions(log, igniteInstanceName);
@@ -1551,36 +1541,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
}
/**
- * Scheduling tasks for dumping long operations. Closes current task
- * (if any) and if the {@code longOpDumpTimeout > 0} schedules a new task
- * with a new timeout, delay and start period equal to
- * {@code longOpDumpTimeout}, otherwise task is deleted.
- *
- * @param longOpDumpTimeout Long operations dump timeout.
- */
- public void scheduleLongOperationsDumpTask(long longOpDumpTimeout) {
- if (isStopping())
- return;
-
- synchronized (this) {
- GridTimeoutProcessor.CancelableTask task = longOpDumpTask;
-
- if (nonNull(task))
- task.close();
-
- if (longOpDumpTimeout > 0) {
- longOpDumpTask = ctx.timeout().schedule(
- () -> ctx.cache().context().exchange().dumpLongRunningOperations(longOpDumpTimeout),
- longOpDumpTimeout,
- longOpDumpTimeout
- );
- }
- else
- longOpDumpTask = null;
- }
- }
-
- /**
* @return Ignite security processor. See {@link IgniteSecurity} for details.
*/
private GridProcessor securityProcessor() throws IgniteCheckedException {
@@ -2639,9 +2599,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
if (metricsLogTask != null)
metricsLogTask.close();
- if (longOpDumpTask != null)
- longOpDumpTask.close();
-
if (longJVMPauseDetector != null)
longJVMPauseDetector.stop();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java
index 5ca0a90..2e40c52 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java
@@ -194,6 +194,16 @@ public class TransactionsMXBeanImpl implements TransactionsMXBean {
}
/** {@inheritDoc} */
+ @Override public void setTxKeyCollisionsInterval(int timeout) {
+ ctx.cache().context().tm().collisionsDumpIntervalDistributed(timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getTxKeyCollisionsInterval() {
+ return ctx.cache().context().tm().collisionsDumpInterval();
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(TransactionsMXBeanImpl.class, this);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
index cbd0b57..9403e5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
@@ -495,6 +495,11 @@ class CacheClusterMetricsMXBeanImpl implements CacheMetricsMXBean {
}
/** {@inheritDoc} */
+ @Override public String getTxKeyCollisions() {
+ return cache.clusterMetrics().getTxKeyCollisions();
+ }
+
+ /** {@inheritDoc} */
@Override public void enableStatistics() {
try {
cache.context().shared().cache().enableStatistics(Collections.singleton(cache.name()), true);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
index fe67968..89c7c04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
@@ -511,4 +511,9 @@ class CacheLocalMetricsMXBeanImpl implements CacheMetricsMXBean {
throw new RuntimeException(e.getMessage());
}
}
+
+ /** {@inheritDoc} */
+ @Override public String getTxKeyCollisions() {
+ return cache.metrics0().getTxKeyCollisions();
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index 23a8629..c120993 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -35,8 +35,15 @@ import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
import org.apache.ignite.internal.util.collection.ImmutableIntSet;
import org.apache.ignite.internal.util.collection.IntSet;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -203,6 +210,9 @@ public class CacheMetricsImpl implements CacheMetrics {
/** Write-behind store, if configured. */
private GridCacheWriteBehindStore store;
+ /** Tx collisions info. */
+ private volatile Supplier<List<Map.Entry</* Colliding keys. */ GridCacheMapEntry, /* Collisions queue size. */ Integer>>> txKeyCollisionInfo;
+
/**
* Creates cache metrics.
*
@@ -355,6 +365,10 @@ public class CacheMetricsImpl implements CacheMetrics {
commitTime = mreg.histogram("CommitTime", HISTOGRAM_BUCKETS, "Commit time in nanoseconds.");
rollbackTime = mreg.histogram("RollbackTime", HISTOGRAM_BUCKETS, "Rollback time in nanoseconds.");
+
+ mreg.register("TxKeyCollisions", this::getTxKeyCollisions, String.class, "Tx key collisions. " +
+ "Show keys and collisions queue size. Due transactional payload some keys become hot. Metric shows " +
+ "corresponding keys.");
}
/**
@@ -668,6 +682,8 @@ public class CacheMetricsImpl implements CacheMetrics {
if (delegate != null)
delegate.clear();
+
+ txKeyCollisionInfo = null;
}
/** {@inheritDoc} */
@@ -852,6 +868,51 @@ public class CacheMetricsImpl implements CacheMetrics {
delegate.onRead(isHit);
}
+ /** Set callback for tx key collisions detection.
+ *
+ * @param coll Key collisions info holder.
+ */
+ public void keyCollisionsInfo(Supplier<List<Map.Entry</* Colliding keys. */ GridCacheMapEntry, /* Collisions queue size. */ Integer>>> coll) {
+ txKeyCollisionInfo = coll;
+
+ if (delegate != null)
+ delegate.keyCollisionsInfo(coll);
+ }
+
+ /** Callback representing current key collisions state.
+ *
+ * @return Key collisions info holder.
+ */
+ public @Nullable Supplier<List<Map.Entry<GridCacheMapEntry, Integer>>> keyCollisionsInfo() {
+ return txKeyCollisionInfo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getTxKeyCollisions() {
+ SB sb = null;
+
+ Supplier<List<Map.Entry<GridCacheMapEntry, Integer>>> collInfo = keyCollisionsInfo();
+
+ if (collInfo != null) {
+ List<Map.Entry<GridCacheMapEntry, Integer>> result = collInfo.get();
+
+ if (!F.isEmpty(result)) {
+ sb = new SB();
+
+ for (Map.Entry<GridCacheMapEntry, Integer> info : result) {
+ if (sb.length() > 0)
+ sb.a(U.nl());
+ sb.a("key=");
+ sb.a(info.getKey().key());
+ sb.a(", queueSize=");
+ sb.a(info.getValue());
+ }
+ }
+ }
+
+ return sb != null ? sb.toString() : "";
+ }
+
/**
* Cache invocations caused update callback.
*
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
index f6357a8..e4809db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
@@ -1020,6 +1020,11 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
return isValidForWriting;
}
+ /** No need to snapshot this metric; only the local metric is acceptable. */
+ @Override public String getTxKeyCollisions() {
+ return "";
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheMetricsSnapshot.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshotV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshotV2.java
index c3a223d..d1dd6b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshotV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshotV2.java
@@ -24,6 +24,7 @@ import java.util.Collection;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.internal.dto.IgniteDataTransferObject;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
/**
* Metrics snapshot.
@@ -319,6 +320,9 @@ public class CacheMetricsSnapshotV2 extends IgniteDataTransferObject implements
*/
private boolean isValidForWriting;
+ /** Tx key collisions with appropriate queue size string representation. */
+ private String txKeyCollisions;
+
/**
* Default constructor.
*/
@@ -428,6 +432,7 @@ public class CacheMetricsSnapshotV2 extends IgniteDataTransferObject implements
rebalanceStartTime = m.rebalancingStartTime();
rebalanceFinishTime = m.estimateRebalancingFinishTime();
rebalanceClearingPartitionsLeft = m.getRebalanceClearingPartitionsLeft();
+ txKeyCollisions = m.getTxKeyCollisions();
}
/**
@@ -1044,6 +1049,11 @@ public class CacheMetricsSnapshotV2 extends IgniteDataTransferObject implements
}
/** {@inheritDoc} */
+ @Override public String getTxKeyCollisions() {
+ return txKeyCollisions;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheMetricsSnapshotV2.class, this);
}
@@ -1124,6 +1134,7 @@ public class CacheMetricsSnapshotV2 extends IgniteDataTransferObject implements
out.writeBoolean(isEmpty);
out.writeInt(size);
out.writeInt(keySize);
+ U.writeLongString(out, txKeyCollisions);
}
/** {@inheritDoc} */
@@ -1202,5 +1213,6 @@ public class CacheMetricsSnapshotV2 extends IgniteDataTransferObject implements
isEmpty = in.readBoolean();
size = in.readInt();
keySize = in.readInt();
+ txKeyCollisions = U.readLongString(in);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LongOperationsDumpSettingsClosure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LongOperationsDumpSettingsClosure.java
index edd6ebc..ed1c418 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LongOperationsDumpSettingsClosure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LongOperationsDumpSettingsClosure.java
@@ -17,9 +17,7 @@
package org.apache.ignite.internal.processors.cache;
-import org.apache.ignite.Ignite;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.IgniteInstanceResource;
@@ -37,7 +35,7 @@ public class LongOperationsDumpSettingsClosure implements IgniteRunnable {
/** Auto-inject Ignite instance. */
@IgniteInstanceResource
- private Ignite ignite;
+ private IgniteEx ignite;
/**
* Constructor.
@@ -50,8 +48,6 @@ public class LongOperationsDumpSettingsClosure implements IgniteRunnable {
/** {@inheritDoc} */
@Override public void run() {
- ((IgniteEx)ignite).context().cache().context().tm().longOperationsDumpTimeout(longOpsDumpTimeout);
-
- ((IgniteKernal)ignite).scheduleLongOperationsDumpTask(longOpsDumpTimeout);
+ ignite.context().cache().context().tm().longOperationsDumpTimeout(longOpsDumpTimeout);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
index 3f0300f..7a8bab4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
@@ -312,6 +312,8 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
CacheObject val;
+ cctx.tm().detectPossibleCollidingKeys(this);
+
lockEntry();
try {
@@ -372,6 +374,8 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
CacheObject val;
+ cctx.tm().detectPossibleCollidingKeys(this);
+
lockEntry();
try {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 189328d..3219aa1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -21,7 +21,9 @@ import java.io.Externalizable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -29,15 +31,15 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
-import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.binary.BinaryObjectException;
-import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
@@ -57,6 +59,7 @@ import org.apache.ignite.internal.pagemem.wal.record.MvccTxRecord;
import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObjectsReleaseFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
@@ -73,6 +76,7 @@ import org.apache.ignite.internal.processors.cache.TxOwnerDumpRequestAllowedSett
import org.apache.ignite.internal.processors.cache.TxTimeoutOnPartitionMapExchangeChangeMessage;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture;
+import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
@@ -90,6 +94,7 @@ import org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetect
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cluster.BaselineTopology;
import org.apache.ignite.internal.processors.metric.impl.HitRateMetric;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
@@ -109,7 +114,6 @@ import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteReducer;
-import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.spi.systemview.view.TransactionView;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
@@ -118,8 +122,10 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedHashMap;
+import static java.util.Objects.nonNull;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DUMP_TX_COLLISIONS_INTERVAL;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_TRANSACTION_TIME_DUMP_THRESHOLD;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_COMPLETED_TX_COUNT;
@@ -136,9 +142,10 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.events.EventType.EVT_TX_STARTED;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR;
import static org.apache.ignite.internal.GridTopic.TOPIC_TX;
-import static org.apache.ignite.internal.IgniteFeatures.LONG_OPERATIONS_DUMP_TIMEOUT;
+import static org.apache.ignite.internal.IgniteFeatures.DISTRIBUTED_CHANGE_LONG_OPERATIONS_DUMP_TIMEOUT;
import static org.apache.ignite.internal.IgniteFeatures.LRT_SYSTEM_USER_TIME_DUMP_SETTINGS;
import static org.apache.ignite.internal.IgniteFeatures.TRANSACTION_OWNER_THREAD_DUMP_PROVIDING;
+import static org.apache.ignite.internal.IgniteFeatures.DISTRIBUTED_TX_COLLISIONS_DUMP;
import static org.apache.ignite.internal.IgniteKernal.DFLT_LONG_OPERATIONS_DUMP_TIMEOUT;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ;
@@ -147,6 +154,7 @@ import static org.apache.ignite.internal.processors.cache.transactions.IgniteInt
import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.USER_FINISH;
import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.internal.util.IgniteUtils.broadcastToNodesSupportingFeature;
import static org.apache.ignite.transactions.TransactionState.ACTIVE;
import static org.apache.ignite.transactions.TransactionState.COMMITTED;
import static org.apache.ignite.transactions.TransactionState.COMMITTING;
@@ -187,6 +195,13 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
static int DEADLOCK_MAX_ITERS =
IgniteSystemProperties.getInteger(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS, 1000);
+ /** Collisions dump interval. */
+ private volatile int collisionsDumpInterval =
+ IgniteSystemProperties.getInteger(IGNITE_DUMP_TX_COLLISIONS_INTERVAL, 1000);
+
+ /** Lower tx collisions queue size threshold. */
+ private static final int COLLISIONS_QUEUE_THRESHOLD = 100;
+
/** Committing transactions. */
private final ThreadLocal<IgniteInternalTx> threadCtx = new ThreadLocal<>();
@@ -291,6 +306,16 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
private ConcurrentMap<UUID, TxTimeoutOnPartitionMapExchangeChangeFuture> txTimeoutOnPartitionMapExchangeFuts =
new ConcurrentHashMap<>();
+ /** Timeout operations. */
+ private final Map<String, GridTimeoutProcessor.CancelableTask> timeoutOperations =
+ new HashMap<String, GridTimeoutProcessor.CancelableTask>() {{
+ put(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, null);
+ put(IGNITE_DUMP_TX_COLLISIONS_INTERVAL, null);
+ }};
+
+ /** Key collisions info holder. */
+ private volatile KeyCollisionsHolder keyCollisionsInfo;
+
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
cctx.gridIO().removeMessageListener(TOPIC_TX);
@@ -373,6 +398,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
new TransactionViewWalker(),
new ReadOnlyCollectionView2X<>(idMap.values(), nearIdMap.values()),
TransactionView::new);
+
+ keyCollisionsInfo = new KeyCollisionsHolder();
+
+ longOperationsDumpTimeout(longOperationsDumpTimeout());
+
+ txCollisionsDumpInterval(collisionsDumpInterval());
}
/** {@inheritDoc} */
@@ -2120,6 +2151,55 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
*/
public void longOperationsDumpTimeout(long longOpsDumpTimeout) {
this.longOpsDumpTimeout = longOpsDumpTimeout;
+
+ scheduleDumpTask(
+ IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT,
+ () -> cctx.kernalContext().cache().context().exchange().dumpLongRunningOperations(longOpsDumpTimeout),
+ longOpsDumpTimeout);
+ }
+
+ /**
+ * Schedule tx collisions collection task.
+ *
+ * @param timeout Sets tx key collisions analysis interval.
+ **/
+ void txCollisionsDumpInterval(int timeout) {
+ collisionsDumpInterval = timeout;
+
+ scheduleDumpTask(
+ IGNITE_DUMP_TX_COLLISIONS_INTERVAL,
+ this::collectTxCollisionsInfo,
+ collisionsDumpInterval());
+ }
+
+ /**
+ * Schedules a periodic dump task under the given key. Closes the current task
+ * registered for that key (if any) and, if {@code timeout > 0}, schedules a new task
+ * with delay and period both equal to
+ * {@code timeout}; otherwise no task is scheduled for the key.
+ *
+ * @param taskKey Appropriate key in {@link IgniteTxManager#timeoutOperations}.
+ * @param r Task.
+ * @param timeout Task delay and period; a value of {@code 0} or less disables the task.
+ */
+ void scheduleDumpTask(String taskKey, Runnable r, long timeout) {
+ if (isStopping())
+ return;
+
+ GridTimeoutProcessor.CancelableTask longOpDumpTask;
+
+ GridTimeoutProcessor timeoutProc = cctx.kernalContext().timeout();
+
+ synchronized (timeoutOperations) {
+ GridTimeoutProcessor.CancelableTask task = timeoutOperations.get(taskKey);
+
+ if (nonNull(task))
+ task.close();
+
+ longOpDumpTask = timeout > 0 ? timeoutProc.schedule(r, timeout, timeout) : null;
+
+ timeoutOperations.put(taskKey, longOpDumpTask);
+ }
}
/**
@@ -2803,20 +2883,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
- * Setting (for all nodes) a timeout (in millis) for printing long-running
- * transactions as well as transactions that cannot receive locks for all
- * their keys for a long time. Set less than or equal {@code 0} to disable.
- *
- * @param longOpsDumpTimeout Long operations dump timeout.
- */
- public void longOperationsDumpTimeoutDistributed(long longOpsDumpTimeout) {
- broadcastToNodesSupportingFeature(
- new LongOperationsDumpSettingsClosure(longOpsDumpTimeout),
- LONG_OPERATIONS_DUMP_TIMEOUT
- );
- }
-
- /**
* Sets transaction timeout on partition map exchange.
*
* @param timeout Transaction timeout on partition map exchange in milliseconds.
@@ -2894,14 +2960,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @param allowed whether allowed
*/
public void setTxOwnerDumpRequestsAllowedDistributed(boolean allowed) {
- ClusterGroup grp = cctx.kernalContext().grid()
- .cluster()
- .forServers()
- .forPredicate(node -> IgniteFeatures.nodeSupports(node, TRANSACTION_OWNER_THREAD_DUMP_PROVIDING));
-
- IgniteCompute compute = cctx.kernalContext().grid().compute(grp);
-
- compute.broadcast(new TxOwnerDumpRequestAllowedSettingClosure(allowed));
+ broadcastToNodesSupportingFeature(
+ cctx.kernalContext(),
+ new TxOwnerDumpRequestAllowedSettingClosure(allowed),
+ true,
+ TRANSACTION_OWNER_THREAD_DUMP_PROVIDING
+ );
}
/**
@@ -2917,7 +2981,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
assert threshold >= 0 : "Threshold timeout must be greater than or equal to 0.";
broadcastToNodesSupportingFeature(
+ cctx.kernalContext(),
new LongRunningTxTimeDumpSettingsClosure(threshold, null, null),
+ false,
LRT_SYSTEM_USER_TIME_DUMP_SETTINGS
);
}
@@ -2932,7 +2998,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
assert coefficient >= 0.0 && coefficient <= 1.0 : "Percentage value must be between 0.0 and 1.0 inclusively.";
broadcastToNodesSupportingFeature(
+ cctx.kernalContext(),
new LongRunningTxTimeDumpSettingsClosure(null, coefficient, null),
+ false,
LRT_SYSTEM_USER_TIME_DUMP_SETTINGS
);
}
@@ -2948,25 +3016,178 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
assert limit > 0 : "Limit value must be greater than 0.";
broadcastToNodesSupportingFeature(
+ cctx.kernalContext(),
new LongRunningTxTimeDumpSettingsClosure(null, null, limit),
+ false,
LRT_SYSTEM_USER_TIME_DUMP_SETTINGS
);
}
+ /** {@inheritDoc} */
+ @Override protected void stop0(boolean cancel) {
+ super.stop0(cancel);
+
+ synchronized (timeoutOperations) {
+ timeoutOperations.forEach((k, v) -> {
+ if (v != null)
+ v.close();
+ });
+ }
+ }
+
/**
- * Broadcasts given job to nodes that support ignite feature.
+ * Setting (for all nodes) a timeout (in millis) for printing long-running
+ * transactions as well as transactions that cannot receive locks for all
+ * their keys for a long time. Set less than or equal {@code 0} to disable.
*
- * @param job Ignite job.
- * @param feature Ignite feature.
+ * @param longOpsDumpTimeout Long operations dump timeout.
*/
- private void broadcastToNodesSupportingFeature(IgniteRunnable job, IgniteFeatures feature) {
- ClusterGroup grp = cctx.kernalContext().grid()
- .cluster()
- .forPredicate(node -> IgniteFeatures.nodeSupports(node, feature));
+ public void longOperationsDumpTimeoutDistributed(long longOpsDumpTimeout) {
+ broadcastToNodesSupportingFeature(
+ cctx.kernalContext(),
+ new LongOperationsDumpSettingsClosure(longOpsDumpTimeout),
+ false,
+ DISTRIBUTED_CHANGE_LONG_OPERATIONS_DUMP_TIMEOUT
+ );
+ }
- IgniteCompute compute = cctx.kernalContext().grid().compute(grp);
+ /**
+ * Returns tx keys collisions dump interval, for additional info check
+ * {@link IgniteSystemProperties#IGNITE_DUMP_TX_COLLISIONS_INTERVAL} description.
+ *
+ * @return Collisions dump interval.
+ */
+ public int collisionsDumpInterval() {
+ return collisionsDumpInterval;
+ }
+
+ /**
+ * Changes tx key collisions dump interval.
+ * For additional info check {@link IgniteSystemProperties#IGNITE_DUMP_TX_COLLISIONS_INTERVAL} description.
+ *
+ * @param collisionsDumpInterval New collisions dump interval or negative for disabling.
+ */
+ public void collisionsDumpIntervalDistributed(int collisionsDumpInterval) {
+ broadcastToNodesSupportingFeature(
+ cctx.kernalContext(),
+ new TxCollisionsDumpSettingsClosure(collisionsDumpInterval),
+ true,
+ DISTRIBUTED_TX_COLLISIONS_DUMP
+ );
+ }
+
+ /**
+ * Collect queue size per key collisions info.
+ *
+ * @param key Key.
+ * @param queueSize Collisions queue size
+ */
+ public void pushCollidingKeysWithQueueSize(GridCacheMapEntry key, int queueSize) {
+ keyCollisionsInfo.put(key, queueSize);
+ }
+
+ /** Wrapper for inner collect logic. */
+ private void collectTxCollisionsInfo() {
+ keyCollisionsInfo.collectInfo();
+ }
+
+ /**
+ * Records the entry as a colliding key if its remote plus local candidates queue size reaches the threshold.
+ *
+ * @param entry Cache entry whose candidate queues are measured.
+ */
+ public void detectPossibleCollidingKeys(GridDistributedCacheEntry entry) {
+ int qSize = entry.remoteMvccSnapshot().size();
+
+ try {
+ qSize += entry.localCandidates().size();
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ // No-op: the entry is obsolete (removed), so only the remote candidates are counted.
+ }
+
+ if (qSize >= COLLISIONS_QUEUE_THRESHOLD)
+ pushCollidingKeysWithQueueSize(entry, qSize);
+ }
+
+ /** Tx key collisions info holder. */
+ private final class KeyCollisionsHolder {
+ /** Stripes count. */
+ private final int stripesCnt = cctx.kernalContext().config().getSystemThreadPoolSize();
+
+ /** Max objects per store. */
+ private static final int MAX_OBJS = 5;
+
+ /** Store for keys and collisions queue sizes. */
+ private final Map<GridCacheMapEntry, Integer> stores[] = new LinkedHashMap[stripesCnt];
- compute.broadcast(job);
+ /** Metric per cache store. */
+ private final Map<GridCacheAdapter<?, ?>, List<Map.Entry<GridCacheMapEntry, Integer>>> metricPerCacheStore =
+ new ConcurrentHashMap<>();
+
+ /** Guard. */
+ private final AtomicBoolean alreadyRun = new AtomicBoolean();
+
+ /** Constructor. */
+ private KeyCollisionsHolder() {
+ for (int i = 0; i < stripesCnt; ++i) {
+ stores[i] = new LinkedHashMap<GridCacheMapEntry, Integer>() {
+ /** {@inheritDoc} */
+ @Override protected boolean removeEldestEntry(Map.Entry<GridCacheMapEntry, Integer> eldest) {
+ return size() > MAX_OBJS;
+ }
+ };
+ }
+ }
+
+ /**
+ * Stores a key with its queue size in the stripe picked by key hash modulo stripes count (any count works).
+ *
+ * @param key Key to store.
+ * @param val Collisions queue size to store.
+ */
+ public void put(GridCacheMapEntry key, Integer val) {
+ int stripeIdx = Math.floorMod(key.hashCode(), stripesCnt);
+
+ synchronized (stores[stripeIdx]) {
+ stores[stripeIdx].put(key, val);
+ }
+ }
+
+ /** Drains per-stripe hot keys info, groups it by cache and exposes it through the cache metrics. */
+ private void collectInfo() {
+ if (!alreadyRun.compareAndSet(false, true))
+ return;
+
+ metricPerCacheStore.clear();
+
+ for (int i = 0; i < stripesCnt; ++i) {
+ synchronized (stores[i]) {
+ Map<GridCacheMapEntry, Integer> store = stores[i];
+
+ if (store.isEmpty())
+ continue;
+
+ for (Map.Entry<GridCacheMapEntry, Integer> info : store.entrySet()) {
+ GridCacheAdapter<Object, Object> cacheCtx = info.getKey().context().cache();
+
+ metricPerCacheStore.computeIfAbsent(cacheCtx, k -> new ArrayList<>()).add(info);
+ }
+
+ store.clear();
+ }
+ }
+
+ metricPerCacheStore.forEach((k, v) -> {
+ if (k.metrics0().keyCollisionsInfo() == null) {
+ k.metrics0().keyCollisionsInfo(
+ () -> metricPerCacheStore.get(k)
+ );
+ }
+ });
+
+ alreadyRun.getAndSet(false);
+ }
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LongOperationsDumpSettingsClosure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCollisionsDumpSettingsClosure.java
similarity index 57%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LongOperationsDumpSettingsClosure.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCollisionsDumpSettingsClosure.java
index edd6ebc..9c72ca8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LongOperationsDumpSettingsClosure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCollisionsDumpSettingsClosure.java
@@ -15,43 +15,40 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.cache;
+package org.apache.ignite.internal.processors.cache.transactions;
-import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.IgniteInstanceResource;
/**
- * Closure that is sent to the node in order to change
- * "Long operations dump timeout" parameter and also reschedule the task for
- * dumping long operations.
+ * Change tx collisions interval or negative for disabling.
*/
-public class LongOperationsDumpSettingsClosure implements IgniteRunnable {
+public class TxCollisionsDumpSettingsClosure implements IgniteRunnable {
/** Serialization ID. */
private static final long serialVersionUID = 0L;
- /** Long operations dump timeout. */
- private final long longOpsDumpTimeout;
-
/** Auto-inject Ignite instance. */
@IgniteInstanceResource
- private Ignite ignite;
+ private IgniteEx ignite;
/**
- * Constructor.
+ * Tx key collision dump interval.
+ * Check {@link IgniteSystemProperties#IGNITE_DUMP_TX_COLLISIONS_INTERVAL} for additional info.
+ */
+ private final int interval;
+
+ /** Constructor.
*
- * @param longOpsDumpTimeout Long operations dump timeout.
+ * @param timeout New interval for key collisions collection.
*/
- public LongOperationsDumpSettingsClosure(long longOpsDumpTimeout) {
- this.longOpsDumpTimeout = longOpsDumpTimeout;
+ TxCollisionsDumpSettingsClosure(int timeout) {
+ interval = timeout;
}
/** {@inheritDoc} */
@Override public void run() {
- ((IgniteEx)ignite).context().cache().context().tm().longOperationsDumpTimeout(longOpsDumpTimeout);
-
- ((IgniteKernal)ignite).scheduleLongOperationsDumpTask(longOpsDumpTimeout);
+ ignite.context().cache().context().tm().txCollisionsDumpInterval(interval);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 8d608f1..222e0a4 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -171,6 +171,7 @@ import javax.net.ssl.X509TrustManager;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
+import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteDeploymentException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteIllegalStateException;
@@ -179,6 +180,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.binary.BinaryRawReader;
import org.apache.ignite.binary.BinaryRawWriter;
+import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterGroupEmptyException;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
@@ -195,6 +197,7 @@ import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteDeploymentCheckedException;
+import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -248,6 +251,7 @@ import org.apache.ignite.lang.IgniteFutureTimeoutException;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.marshaller.Marshaller;
@@ -11888,4 +11892,30 @@ public abstract class IgniteUtils {
public static int utfBytes(char c) {
return (c >= 0x0001 && c <= 0x007F) ? 1 : (c > 0x07FF) ? 3 : 2;
}
+
+ /**
+ * Broadcasts given job to nodes that support ignite feature.
+ *
+ * @param kctx Kernal context.
+ * @param job Ignite job.
+ * @param srvrsOnly Broadcast only on server nodes.
+ * @param feature Ignite feature.
+ */
+ public static void broadcastToNodesSupportingFeature(
+ GridKernalContext kctx,
+ IgniteRunnable job,
+ boolean srvrsOnly,
+ IgniteFeatures feature
+ ) {
+ ClusterGroup cl = kctx.grid().cluster();
+
+ if (srvrsOnly)
+ cl = cl.forServers();
+
+ ClusterGroup grp = cl.forPredicate(node -> IgniteFeatures.nodeSupports(node, feature));
+
+ IgniteCompute compute = kctx.grid().compute(grp);
+
+ compute.broadcast(job);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java
index fc3cacb..e6fce4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java
@@ -316,16 +316,21 @@ public interface CacheMetricsMXBean extends CacheStatisticsMXBean, CacheMXBean,
public boolean isReadThrough();
/** {@inheritDoc} */
- @Override @MXBeanDescription("True when a cache is in write-through mode.")
- public boolean isWriteThrough();
+ @MXBeanDescription("True when a cache is in write-through mode.")
+ @Override public boolean isWriteThrough();
/** {@inheritDoc} */
- @Override @MXBeanDescription("True when a cache topology is valid for read operations.")
- public boolean isValidForReading();
+ @MXBeanDescription("True when a cache topology is valid for read operations.")
+ @Override public boolean isValidForReading();
/** {@inheritDoc} */
- @Override @MXBeanDescription("True when a cache topology is valid for write operations.")
- public boolean isValidForWriting();
+ @MXBeanDescription("True when a cache topology is valid for write operations.")
+ @Override public boolean isValidForWriting();
+
+ /** {@inheritDoc} */
+ @MXBeanDescription("Tx key collisions. Show key and appropriate collisions queue size for the last " +
+ "IGNITE_DUMP_TX_COLLISIONS_INTERVAL.")
+ @Override public String getTxKeyCollisions();
/**
* Enable statistic collection for the cache.
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/TransactionsMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/TransactionsMXBean.java
index fca905c..e6f322a 100644
--- a/modules/core/src/main/java/org/apache/ignite/mxbean/TransactionsMXBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/TransactionsMXBean.java
@@ -245,4 +245,24 @@ public interface TransactionsMXBean {
"receive locks for all their keys for a long time. Returns {@code 0} or less if not set."
)
long getLongOperationsDumpTimeout();
+
+ /**
+ * Set timeout interval for tx key contention analysis.
+ * @param timeout Interval in millis.
+ */
+ @MXBeanParametersNames("timeout")
+ @MXBeanDescription("Timeout interval (in millis) for printing tx key contention queue size info. Each transaction " +
+ "besides OPTIMISTIC SERIALIZABLE capture locks on all enlisted keys, for some reasons per key lock queue may " +
+ "rise. This property sets the interval during which keys and appropriate queue size statistics has been " +
+ "collected.")
+ void setTxKeyCollisionsInterval(int timeout);
+
+ /**
+ * @return Current interval in millis.
+ */
+ @MXBeanDescription("Returns a timeout (in millis) for printing tx key contention queue size info. Each transaction " +
+ "besides OPTIMISTIC SERIALIZABLE capture locks on all enlisted keys, for some reasons per key lock queue may " +
+ "rise. Returns the interval during which keys and appropriate queue size statistics has been " +
+ "collected.")
+ int getTxKeyCollisionsInterval();
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsManageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsManageTest.java
index 271768a..c2c297c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsManageTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsManageTest.java
@@ -17,37 +17,64 @@
package org.apache.ignite.internal.processors.cache;
+import java.lang.management.ManagementFactory;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.CacheManager;
import javax.cache.Caching;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.ObjectName;
+
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.TransactionsMXBeanImpl;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.mxbean.CacheMetricsMXBean;
+import org.apache.ignite.mxbean.TransactionsMXBean;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.MvccFeatureChecker;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
import org.junit.Assume;
import org.junit.Test;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DUMP_TX_COLLISIONS_INTERVAL;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+
/**
*
*/
@@ -68,6 +95,15 @@ public class CacheMetricsManageTest extends GridCommonAbstractTest {
/** Persistence. */
private boolean persistence;
+ /** Use test spi flag. */
+ private boolean useTestCommSpi;
+
+ /** Backups count. */
+ private int backups = -1;
+
+ /** Client flag. */
+ private boolean client;
+
/**
* @throws Exception If failed.
*/
@@ -478,19 +514,35 @@ public class CacheMetricsManageTest extends GridCommonAbstractTest {
return getMxBean(getTestIgniteInstanceName(nodeIdx), cacheName, clazz.getName(), CacheMetricsMXBean.class);
}
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
-
- CacheConfiguration cacheCfg = new CacheConfiguration()
+ /** Default cache config; applies the test's {@code backups} count unless it is left at {@code -1}. */
+ private CacheConfiguration<?, ?> getCacheConfiguration() {
+ CacheConfiguration<?, ?> cacheCfg = new CacheConfiguration<>()
.setName(CACHE1)
.setGroupName(GROUP)
.setCacheMode(CacheMode.PARTITIONED)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ if (backups != -1)
+ cacheCfg.setBackups(backups);
+
+ return cacheCfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ CacheConfiguration<?, ?> cacheCfg = getCacheConfiguration();
+
cfg.setCacheConfiguration(cacheCfg);
+ if (useTestCommSpi)
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+ if (client)
+ cfg.setClientMode(client);
+
if (persistence)
cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(
@@ -504,6 +556,286 @@ public class CacheMetricsManageTest extends GridCommonAbstractTest {
return cfg;
}
+ /** Test correct metric for tx key contention. */
+ @Test
+ @WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
+ public void testTxContentionMetric() throws Exception {
+ Assume.assumeFalse("https://issues.apache.org/jira/browse/IGNITE-9224", MvccFeatureChecker.forcedMvcc());
+
+ backups = 1;
+
+ useTestCommSpi = true;
+
+ Ignite ig = startGridsMultiThreaded(2);
+
+ int contCnt = (int)U.staticField(IgniteTxManager.class, "COLLISIONS_QUEUE_THRESHOLD") * 20;
+
+ CountDownLatch txLatch = new CountDownLatch(contCnt * 2);
+
+ CountDownLatch txLatch0 = new CountDownLatch(contCnt * 2);
+
+ ig.cluster().active(true);
+
+ client = true;
+
+ Ignite cl = startGrid();
+
+ CacheConfiguration<?, ?> dfltCacheCfg = getCacheConfiguration();
+
+ dfltCacheCfg.setStatisticsEnabled(true);
+
+ String cacheName = dfltCacheCfg.getName();
+
+ IgniteCache<Integer, Integer> cache = ig.cache(cacheName);
+
+ IgniteCache<Integer, Integer> cache0 = cl.cache(cacheName);
+
+ CacheMetricsMXBean mxBeanCache = mxBean(0, cacheName, CacheLocalMetricsMXBeanImpl.class);
+
+ final List<Integer> priKeys = primaryKeys(cache, 3, 1);
+
+ final Integer backKey = backupKey(cache);
+
+ IgniteTransactions txMgr = cl.transactions();
+
+ CountDownLatch blockOnce = new CountDownLatch(1);
+
+ for (Ignite ig0 : G.allGrids()) {
+ TestRecordingCommunicationSpi commSpi0 =
+ (TestRecordingCommunicationSpi)ig0.configuration().getCommunicationSpi();
+
+ commSpi0.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ if (msg instanceof GridNearTxPrepareResponse && blockOnce.getCount() > 0) {
+ blockOnce.countDown();
+
+ return true;
+ }
+
+ return false;
+ }
+ });
+ }
+
+ IgniteInternalFuture f = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = txMgr.txStart(PESSIMISTIC, READ_COMMITTED)) {
+ cache0.put(priKeys.get(0), 0);
+ cache0.put(priKeys.get(2), 0);
+ tx.commit();
+ }
+ });
+
+ blockOnce.await();
+
+ GridCompoundFuture<?, ?> finishFut = new GridCompoundFuture<>();
+
+ for (int i = 0; i < contCnt; ++i) {
+ IgniteInternalFuture f0 = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = txMgr.txStart(PESSIMISTIC, READ_COMMITTED)) {
+ cache0.put(priKeys.get(0), 0);
+ cache0.put(priKeys.get(1), 0);
+
+ txLatch0.countDown();
+
+ tx.commit();
+
+ txLatch.countDown();
+ }
+
+ try (Transaction tx = txMgr.txStart(PESSIMISTIC, READ_COMMITTED)) {
+ cache0.put(priKeys.get(2), 0);
+ cache0.put(backKey, 0);
+
+ txLatch0.countDown();
+
+ tx.commit();
+
+ txLatch.countDown();
+ }
+ });
+
+ finishFut.add(f0);
+ }
+
+ finishFut.markInitialized();
+
+ txLatch0.await();
+
+ for (Ignite ig0 : G.allGrids()) {
+ TestRecordingCommunicationSpi commSpi0 =
+ (TestRecordingCommunicationSpi)ig0.configuration().getCommunicationSpi();
+
+ commSpi0.stopBlock();
+ }
+
+ IgniteTxManager txManager = ((IgniteEx) ig).context().cache().context().tm();
+
+ assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ try {
+ U.invoke(IgniteTxManager.class, txManager, "collectTxCollisionsInfo");
+ }
+ catch (IgniteCheckedException e) {
+ fail(e.toString());
+ }
+
+ String coll = mxBeanCache.getTxKeyCollisions();
+
+ // Keep polling until one of the contended keys is actually reported by the metric.
+ return coll.contains("val=" + priKeys.get(2)) || coll.contains("val=" + priKeys.get(0));
+ }
+ }, 10_000));
+
+ f.get();
+
+ finishFut.get();
+
+ txLatch.await();
+ }
+
+ /** Tests metric change interval. */
+ @Test
+ public void testKeyCollisionsMetricDifferentTimeout() throws Exception {
+ Assume.assumeFalse("https://issues.apache.org/jira/browse/IGNITE-9224", MvccFeatureChecker.forcedMvcc());
+
+ backups = 2;
+
+ useTestCommSpi = true;
+
+ Ignite ig = startGridsMultiThreaded(3);
+
+ int contCnt = (int)U.staticField(IgniteTxManager.class, "COLLISIONS_QUEUE_THRESHOLD") * 5;
+
+ CountDownLatch txLatch = new CountDownLatch(contCnt);
+
+ ig.cluster().active(true);
+
+ client = true;
+
+ Ignite cl = startGrid();
+
+ IgniteTransactions txMgr = cl.transactions();
+
+ CacheConfiguration<?, ?> dfltCacheCfg = getCacheConfiguration();
+
+ dfltCacheCfg.setStatisticsEnabled(true);
+
+ String cacheName = dfltCacheCfg.getName();
+
+ IgniteCache<Integer, Integer> cache = ig.cache(cacheName);
+
+ IgniteCache<Integer, Integer> cache0 = cl.cache(cacheName);
+
+ final Integer keyId = primaryKey(cache);
+
+ CountDownLatch blockOnce = new CountDownLatch(1);
+
+ for (Ignite ig0 : G.allGrids()) {
+ if (ig0.configuration().isClientMode())
+ continue;
+
+ TestRecordingCommunicationSpi commSpi0 =
+ (TestRecordingCommunicationSpi)ig0.configuration().getCommunicationSpi();
+
+ commSpi0.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ if (msg instanceof GridNearTxFinishResponse && blockOnce.getCount() > 0) {
+ blockOnce.countDown();
+
+ return true;
+ }
+
+ return false;
+ }
+ });
+ }
+
+ IgniteInternalFuture f = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = txMgr.txStart(PESSIMISTIC, READ_COMMITTED)) {
+ cache0.put(keyId, 0);
+ tx.commit();
+ }
+ });
+
+ blockOnce.await();
+
+ GridCompoundFuture<?, ?> finishFut = new GridCompoundFuture<>();
+
+ for (int i = 0; i < contCnt; ++i) {
+ IgniteInternalFuture f0 = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = txMgr.txStart(PESSIMISTIC, READ_COMMITTED)) {
+ cache0.put(keyId, 0);
+
+ tx.commit();
+
+ txLatch.countDown();
+ }
+ });
+
+ finishFut.add(f0);
+ }
+
+ finishFut.markInitialized();
+
+ for (Ignite ig0 : G.allGrids()) {
+ TestRecordingCommunicationSpi commSpi0 =
+ (TestRecordingCommunicationSpi)ig0.configuration().getCommunicationSpi();
+
+ if (ig0.configuration().isClientMode())
+ continue;
+
+ commSpi0.stopBlock();
+ }
+
+ CacheMetricsMXBean mxBeanCache = mxBean(0, cacheName, CacheLocalMetricsMXBeanImpl.class);
+
+ IgniteTxManager txManager = ((IgniteEx) ig).context().cache().context().tm();
+
+ final TransactionsMXBean txMXBean1 = txMXBean(0);
+
+ final TransactionsMXBean txMXBean2 = txMXBean(1);
+
+ for (int i = 0; i < 10; ++i) {
+ txMXBean1.setTxKeyCollisionsInterval(ThreadLocalRandom.current().nextInt(1000, 1100));
+
+ txMXBean2.setTxKeyCollisionsInterval(ThreadLocalRandom.current().nextInt(1000, 1100));
+
+ mxBeanCache.getTxKeyCollisions();
+
+ mxBeanCache.clear();
+
+ try {
+ U.invoke(IgniteTxManager.class, txManager, "collectTxCollisionsInfo");
+ }
+ catch (IgniteCheckedException e) {
+ fail(e.toString());
+ }
+
+ U.sleep(500);
+ }
+
+ f.get();
+
+ finishFut.get();
+
+ txLatch.await();
+ }
+
+ /**
+ *
+ */
+ private TransactionsMXBean txMXBean(int igniteInt) throws Exception {
+ ObjectName mbeanName = U.makeMBeanName(getTestIgniteInstanceName(igniteInt), "Transactions",
+ TransactionsMXBeanImpl.class.getSimpleName());
+
+ MBeanServer mbeanSrv = ManagementFactory.getPlatformMBeanServer();
+
+ if (!mbeanSrv.isRegistered(mbeanName))
+ fail("MBean is not registered: " + mbeanName.getCanonicalName());
+
+ return MBeanServerInvocationHandler.newProxyInstance(mbeanSrv, mbeanName, TransactionsMXBean.class, true);
+ }
+
/**
* Check cache statistics enabled/disabled flag for all nodes
*
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithKeyContentionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithKeyContentionSelfTest.java
new file mode 100644
index 0000000..deb4795
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithKeyContentionSelfTest.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMetrics;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.MvccFeatureChecker;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.junit.Test;
+import java.util.concurrent.CountDownLatch;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DUMP_TX_COLLISIONS_INTERVAL;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+ /** Tests the transaction key contention detection functionality. */
+public class TxWithKeyContentionSelfTest extends GridCommonAbstractTest {
+ /** Client flag. */
+ private boolean client;
+
+ /** Near cache flag. */
+ private boolean nearCache;
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * On top of the base configuration this sets a default data region limited to 20 MB, a
+ * {@link TestRecordingCommunicationSpi} and the cache built by {@link #getCacheConfiguration(String)}.
+ * The consistent id is {@code NODE_} followed by the last character of the instance name; when
+ * {@link #client} is set the node is switched to client mode and gets the consistent id {@code Client}.
+ */
+ @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(name);
+
+ cfg.setConsistentId("NODE_" + name.substring(name.length() - 1));
+
+ cfg.setDataStorageConfiguration(
+ new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration()
+ .setMaxSize(20 * 1024 * 1024)
+ )
+ );
+
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+ cfg.setCacheConfiguration(getCacheConfiguration(DEFAULT_CACHE_NAME));
+
+ // Client-specific settings live in this single branch; it overrides the consistent id assigned above.
+ if (client) {
+ cfg.setConsistentId("Client");
+
+ cfg.setClientMode(true);
+ }
+
+ return cfg;
+ }
+
+ /**
+ * Creates the configuration of the cache the tests operate on.
+ *
+ * @param name Cache name.
+ * @return Transactional, {@code FULL_SYNC} cache configuration using a rendezvous affinity function
+ * created with {@code (false, 16)}, 2 backups and statistics enabled; a near cache configuration is
+ * added when {@link #nearCache} is set.
+ */
+ protected CacheConfiguration<?, ?> getCacheConfiguration(String name) {
+ CacheConfiguration<Object, Object> cacheCfg = new CacheConfiguration<>(name);
+
+ cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+ cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ cacheCfg.setAffinity(new RendezvousAffinityFunction(false, 16));
+ cacheCfg.setBackups(2);
+ cacheCfg.setStatisticsEnabled(true);
+
+ if (nearCache) {
+ NearCacheConfiguration<Object, Object> nearCfg = new NearCacheConfiguration<>();
+
+ cacheCfg.setNearConfiguration(nearCfg);
+ }
+
+ return cacheCfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ // Every test starts its own set of grids, so stop any that are still running at this point.
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ // Stop the grids that were started by the test.
+ stopAllGrids();
+ }
+
+ /**
+ * Checks the tx key collisions metric for {@code PESSIMISTIC} / {@code REPEATABLE_READ} transactions.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ @WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
+ public void testPessimisticRepeatableReadCheckContentionTxMetric() throws Exception {
+ runKeyCollisionsMetric(PESSIMISTIC, REPEATABLE_READ);
+ }
+
+ /**
+ * Checks the tx key collisions metric for {@code PESSIMISTIC} / {@code REPEATABLE_READ} transactions
+ * with the near cache flag set.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ @WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
+ public void testPessimisticRepeatableReadCheckContentionTxMetricNear() throws Exception {
+ // Read by getCacheConfiguration() when the grids are started.
+ nearCache = true;
+
+ runKeyCollisionsMetric(PESSIMISTIC, REPEATABLE_READ);
+ }
+
+ /**
+ * Checks the tx key collisions metric for {@code PESSIMISTIC} / {@code READ_COMMITTED} transactions.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ @WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
+ public void testPessimisticReadCommitedCheckContentionTxMetric() throws Exception {
+ runKeyCollisionsMetric(PESSIMISTIC, READ_COMMITTED);
+ }
+
+ /**
+ * Checks the tx key collisions metric for {@code PESSIMISTIC} / {@code READ_COMMITTED} transactions
+ * with the near cache flag set.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ @WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
+ public void testPessimisticReadCommitedCheckContentionTxMetricNear() throws Exception {
+ // Read by getCacheConfiguration() when the grids are started.
+ nearCache = true;
+
+ runKeyCollisionsMetric(PESSIMISTIC, READ_COMMITTED);
+ }
+
+ /**
+ * Checks the tx key collisions metric for {@code OPTIMISTIC} / {@code READ_COMMITTED} transactions.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ @WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
+ public void testOptimisticReadCommittedCheckContentionTxMetric() throws Exception {
+ runKeyCollisionsMetric(OPTIMISTIC, READ_COMMITTED);
+ }
+
+ /**
+ * Checks the tx key collisions metric for {@code OPTIMISTIC} / {@code READ_COMMITTED} transactions
+ * with the near cache flag set.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ @WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
+ public void testOptimisticReadCommittedCheckContentionTxMetricNear() throws Exception {
+ // Read by getCacheConfiguration() when the grids are started.
+ nearCache = true;
+
+ runKeyCollisionsMetric(OPTIMISTIC, READ_COMMITTED);
+ }
+
+ /**
+ * Checks the tx key collisions metric for {@code OPTIMISTIC} / {@code REPEATABLE_READ} transactions.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ @WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
+ public void testOptimisticRepeatableReadCheckContentionTxMetric() throws Exception {
+ runKeyCollisionsMetric(OPTIMISTIC, REPEATABLE_READ);
+ }
+
+ /**
+ * Checks the tx key collisions metric for {@code OPTIMISTIC} / {@code REPEATABLE_READ} transactions
+ * with the near cache flag set.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ @WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000")
+ public void testOptimisticRepeatableReadCheckContentionTxMetricNear() throws Exception {
+ // Read by getCacheConfiguration() when the grids are started.
+ nearCache = true;
+
+ runKeyCollisionsMetric(OPTIMISTIC, REPEATABLE_READ);
+ }
+
+ /**
+ * Checks that the tx key collisions metric reports results when many transactions contend for one key.
+ * <p>
+ * Scenario: three grids and one client are started; a first transaction puts the key from the client and
+ * the test waits until a {@link GridNearTxFinishResponse} has been caught by the blocking predicate
+ * installed on the non-client nodes. A batch of further transactions on the same key is then started,
+ * message blocking is stopped, and the local cache metrics of the node returned by
+ * {@code startGridsMultiThreaded} are polled until {@code getTxKeyCollisions()} is non-empty.
+ * The method returns immediately when MVCC is forced.
+ *
+ * @param concurrency Concurrency level.
+ * @param isolation Isolation level.
+ * @throws Exception If failed.
+ */
+ private void runKeyCollisionsMetric(TransactionConcurrency concurrency, TransactionIsolation isolation) throws Exception {
+ if (MvccFeatureChecker.forcedMvcc())
+ return; // Not supported.
+
+ // The client flag is still unset here, so these three grids are not configured as clients.
+ Ignite ig = startGridsMultiThreaded(3);
+
+ // Number of contending transactions: five times IgniteTxManager's COLLISIONS_QUEUE_THRESHOLD static field.
+ int contCnt = (int)U.staticField(IgniteTxManager.class, "COLLISIONS_QUEUE_THRESHOLD") * 5;
+
+ // Counted down once by every contending transaction after its commit.
+ CountDownLatch txLatch = new CountDownLatch(contCnt);
+
+ ig.cluster().active(true);
+
+ // From now on getConfiguration() produces a client node configuration.
+ client = true;
+
+ Ignite cl = startGrid();
+
+ IgniteTransactions txMgr = cl.transactions();
+
+ IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME);
+
+ IgniteCache<Integer, Integer> cache0 = cl.cache(DEFAULT_CACHE_NAME);
+
+ // The single key all transactions write to; presumably a key whose primary copy is on ig - confirm
+ // against the primaryKey() helper.
+ final Integer keyId = primaryKey(cache);
+
+ // Opened as soon as the first GridNearTxFinishResponse has been seen by a blocking predicate.
+ CountDownLatch blockOnce = new CountDownLatch(1);
+
+ for (Ignite ig0 : G.allGrids()) {
+ if (ig0.configuration().isClientMode())
+ continue;
+
+ TestRecordingCommunicationSpi commSpi0 =
+ (TestRecordingCommunicationSpi)ig0.configuration().getCommunicationSpi();
+
+ commSpi0.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ // Select finish responses only while the latch is still closed. The check and the
+ // count-down are two separate steps, so more than one message may be selected.
+ if (msg instanceof GridNearTxFinishResponse && blockOnce.getCount() > 0) {
+ blockOnce.countDown();
+
+ return true;
+ }
+
+ return false;
+ }
+ });
+ }
+
+ // First transaction on the key, executed asynchronously from the client.
+ IgniteInternalFuture f = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = txMgr.txStart(concurrency, isolation)) {
+ cache0.put(keyId, 0);
+ tx.commit();
+ }
+ });
+
+ // Proceed only after a finish response has been caught by the predicate above.
+ blockOnce.await();
+
+ GridCompoundFuture<?, ?> finishFut = new GridCompoundFuture<>();
+
+ // Start the contending transactions: each one puts the same key from the client.
+ for (int i = 0; i < contCnt; ++i) {
+ IgniteInternalFuture f0 = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = txMgr.txStart(concurrency, isolation)) {
+ cache0.put(keyId, 0);
+
+ tx.commit();
+
+ txLatch.countDown();
+ }
+ });
+
+ finishFut.add(f0);
+ }
+
+ finishFut.markInitialized();
+
+ // Stop message blocking on every non-client node.
+ for (Ignite ig0 : G.allGrids()) {
+ TestRecordingCommunicationSpi commSpi0 =
+ (TestRecordingCommunicationSpi)ig0.configuration().getCommunicationSpi();
+
+ if (ig0.configuration().isClientMode())
+ continue;
+
+ commSpi0.stopBlock();
+ }
+
+ IgniteTxManager txManager = ((IgniteEx) ig).context().cache().context().tm();
+
+ // Poll with a timeout argument of 10_000 until the metric becomes non-empty.
+ assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ try {
+ // Invoke the collection method by name on the tx manager; presumably this avoids waiting
+ // for the periodic dump configured through IGNITE_DUMP_TX_COLLISIONS_INTERVAL - confirm.
+ U.invoke(IgniteTxManager.class, txManager, "collectTxCollisionsInfo");
+ }
+ catch (IgniteCheckedException e) {
+ fail(e.toString());
+ }
+
+ CacheMetrics metrics = ig.cache(DEFAULT_CACHE_NAME).localMetrics();
+
+ String coll1 = metrics.getTxKeyCollisions();
+
+ if (!coll1.isEmpty()) {
+ String coll2 = metrics.getTxKeyCollisions();
+
+ // Reading the metric twice from the same metrics object must give equal strings.
+ assertEquals(coll1, coll2);
+
+ assertTrue(coll1.contains("queueSize"));
+
+ return true;
+ }
+ else
+ return false;
+ }
+ }, 10_000));
+
+ // Wait for the first transaction, then for all contending ones.
+ f.get();
+
+ finishFut.get();
+
+ txLatch.await();
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithSmallTimeoutAndContentionOneKeyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithSmallTimeoutAndContentionOneKeyTest.java
index b1e75e3..5394f80 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithSmallTimeoutAndContentionOneKeyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithSmallTimeoutAndContentionOneKeyTest.java
@@ -94,6 +94,15 @@ public class TxWithSmallTimeoutAndContentionOneKeyTest extends GridCommonAbstrac
cleanPersistenceDir();
}
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ // Stop all grids first, then clean the persistence directory.
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
/**
* @return Random transaction type.
*/
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java
index faca537..5eb14b6 100644
--- a/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java
+++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java
@@ -415,6 +415,11 @@ public class PlatformCacheWriteMetricsTask extends ComputeTaskAdapter<Long, Obje
}
/** {@inheritDoc} */
+ @Override public String getTxKeyCollisions() {
+ // Stub value: always an empty string.
+ return "";
+ }
+
+ /** {@inheritDoc} */
@Override public int getTotalPartitionsCount() {
return 54;
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
index 1824be5..0b71420 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.transactions.TxCrossCacheMapO
import org.apache.ignite.internal.processors.cache.transactions.TxCrossCacheRemoteMultiplePartitionReservationTest;
import org.apache.ignite.internal.processors.cache.transactions.TxRecoveryWithConcurrentRollbackTest;
import org.apache.ignite.internal.processors.cache.transactions.TxRollbackAsyncWithPersistenceTest;
+import org.apache.ignite.internal.processors.cache.transactions.TxWithKeyContentionSelfTest;
import org.apache.ignite.internal.processors.cache.transactions.TxWithSmallTimeoutAndContentionOneKeyTest;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.DynamicSuite;
@@ -137,6 +138,8 @@ public class IgniteCacheTestSuite7 {
GridTestUtils.addTestIfNeeded(suite, AtomicPartitionCounterStateConsistencyTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, AtomicPartitionCounterStateConsistencyHistoryRebalanceTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, TxWithKeyContentionSelfTest.class, ignoredTests);
+
return suite;
}
}