You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2020/12/15 17:50:23 UTC
[ignite] branch master updated: IGNITE-12666 Provide cluster
performance profiling tool (#7693)
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new c5c007f IGNITE-12666 Provide cluster performance profiling tool (#7693)
c5c007f is described below
commit c5c007f73359c1b3264e0e627afa9af9144017f4
Author: Nikita Amelchev <ns...@gmail.com>
AuthorDate: Tue Dec 15 20:49:41 2020 +0300
IGNITE-12666 Provide cluster performance profiling tool (#7693)
---
.../org/apache/ignite/IgniteSystemProperties.java | 37 ++
.../org/apache/ignite/internal/GridComponent.java | 5 +-
.../apache/ignite/internal/GridKernalContext.java | 9 +-
.../ignite/internal/GridKernalContextImpl.java | 13 +-
.../org/apache/ignite/internal/IgniteFeatures.java | 5 +-
.../org/apache/ignite/internal/IgniteKernal.java | 2 +
.../internal/managers/IgniteMBeansManager.java | 6 +
.../internal/metric/IoStatisticsHolderQuery.java | 21 +-
.../internal/metric/IoStatisticsQueryHelper.java | 6 +-
.../internal/processors/cache/CacheLockImpl.java | 21 +-
.../processors/cache/GridCacheAdapter.java | 171 ++++++-
.../processors/cache/GridCacheProcessor.java | 3 +
.../wal/reader/StandaloneGridKernalContext.java | 6 +
.../query/GridCacheDistributedQueryFuture.java | 5 +
.../query/GridCacheDistributedQueryManager.java | 52 +-
.../cache/query/GridCacheQueryManager.java | 20 +
.../cache/transactions/IgniteInternalTx.java | 7 +
.../cache/transactions/IgniteTxAdapter.java | 29 +-
.../cache/transactions/IgniteTxManager.java | 24 +-
.../internal/processors/job/GridJobProcessor.java | 8 +
.../FilePerformanceStatisticsReader.java | 529 +++++++++++++++++++++
.../FilePerformanceStatisticsWriter.java | 502 +++++++++++++++++++
.../performancestatistics/OperationType.java | 185 +++++++
.../PerformanceStatisticsHandler.java | 97 ++++
.../PerformanceStatisticsMBeanImpl.java | 50 ++
.../PerformanceStatisticsProcessor.java | 293 ++++++++++++
.../processors/query/GridRunningQueryInfo.java | 26 +
.../processors/query/RunningQueryManager.java | 32 ++
.../processors/task/GridTaskProcessor.java | 9 +
.../ignite/mxbean/PerformanceStatisticsMBean.java | 40 ++
.../AbstractPerformanceStatisticsTest.java | 194 ++++++++
.../performancestatistics/CacheStartTest.java | 152 ++++++
.../performancestatistics/ForwardReadTest.java | 147 ++++++
.../PerformanceStatisticsMultipleStartTest.java | 69 +++
.../PerformanceStatisticsPropertiesTest.java | 201 ++++++++
.../PerformanceStatisticsSelfTest.java | 318 +++++++++++++
.../PerformanceStatisticsThinClientTest.java | 244 ++++++++++
.../performancestatistics/StringCacheTest.java | 81 ++++
.../performancestatistics/TopologyChangesTest.java | 137 ++++++
.../IgniteBasicWithPersistenceTestSuite.java | 18 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 21 +
.../query/h2/twostep/GridReduceQueryExecutor.java | 4 +
.../PerformanceStatisticsQueryTest.java | 356 ++++++++++++++
.../testsuites/IgniteCacheQuerySelfTestSuite6.java | 4 +-
44 files changed, 4097 insertions(+), 62 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index aa12e54..0035f1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.client.GridClient;
import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
import org.apache.ignite.internal.processors.metric.GridMetricManager;
+import org.apache.ignite.internal.processors.performancestatistics.FilePerformanceStatisticsWriter;
import org.apache.ignite.internal.processors.rest.GridRestCommand;
import org.apache.ignite.internal.util.GridLogThrottle;
import org.apache.ignite.lang.IgniteExperimental;
@@ -118,6 +119,10 @@ import static org.apache.ignite.internal.processors.failure.FailureProcessor.DFL
import static org.apache.ignite.internal.processors.job.GridJobProcessor.DFLT_JOBS_HISTORY_SIZE;
import static org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor.DFLT_JOBS_METRICS_CONCURRENCY_LEVEL;
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl.DFLT_MAX_HISTORY_BYTES;
+import static org.apache.ignite.internal.processors.performancestatistics.FilePerformanceStatisticsWriter.DFLT_BUFFER_SIZE;
+import static org.apache.ignite.internal.processors.performancestatistics.FilePerformanceStatisticsWriter.DFLT_CACHED_STRINGS_THRESHOLD;
+import static org.apache.ignite.internal.processors.performancestatistics.FilePerformanceStatisticsWriter.DFLT_FILE_MAX_SIZE;
+import static org.apache.ignite.internal.processors.performancestatistics.FilePerformanceStatisticsWriter.DFLT_FLUSH_SIZE;
import static org.apache.ignite.internal.processors.query.QueryUtils.DFLT_INDEXING_DISCOVERY_HISTORY_SIZE;
import static org.apache.ignite.internal.processors.rest.GridRestProcessor.DFLT_SES_TIMEOUT;
import static org.apache.ignite.internal.processors.rest.GridRestProcessor.DFLT_SES_TOKEN_INVALIDATE_INTERVAL;
@@ -1963,6 +1968,38 @@ public final class IgniteSystemProperties {
"IGNITE_DEFRAGMENTATION_REGION_SIZE_PERCENTAGE";
/**
+ * Performance statistics maximum file size in bytes. Performance statistics will stop when the size is exceeded.
+ * The default value is {@link FilePerformanceStatisticsWriter#DFLT_FILE_MAX_SIZE}.
+ */
+ @SystemProperty(value = "Performance statistics maximum file size in bytes. Performance statistics will be " +
+ "stopped when the size exceeded", type = Long.class, defaults = "" + DFLT_FILE_MAX_SIZE)
+ public static final String IGNITE_PERF_STAT_FILE_MAX_SIZE = "IGNITE_PERF_STAT_FILE_MAX_SIZE";
+
+ /**
+ * Performance statistics off heap buffer size in bytes. The default value is
+ * {@link FilePerformanceStatisticsWriter#DFLT_BUFFER_SIZE}.
+ */
+ @SystemProperty(value = "Performance statistics off heap buffer size in bytes", type = Integer.class,
+ defaults = "" + DFLT_BUFFER_SIZE)
+ public static final String IGNITE_PERF_STAT_BUFFER_SIZE = "IGNITE_PERF_STAT_BUFFER_SIZE";
+
+ /**
+ * Performance statistics minimal batch size to flush in bytes. The default value is
+ * {@link FilePerformanceStatisticsWriter#DFLT_FLUSH_SIZE}.
+ */
+ @SystemProperty(value = "Performance statistics minimal batch size to flush in bytes", type = Integer.class,
+ defaults = "" + DFLT_FLUSH_SIZE)
+ public static final String IGNITE_PERF_STAT_FLUSH_SIZE = "IGNITE_PERF_STAT_FLUSH_SIZE";
+
+ /**
+ * Performance statistics maximum cached strings threshold. String caching stops when the threshold is exceeded.
+ * The default value is {@link FilePerformanceStatisticsWriter#DFLT_CACHED_STRINGS_THRESHOLD}.
+ */
+ @SystemProperty(value = "Performance statistics maximum cached strings threshold. String caching will stop on " +
+ "threshold excess", type = Integer.class, defaults = "" + DFLT_CACHED_STRINGS_THRESHOLD)
+ public static final String IGNITE_PERF_STAT_CACHED_STRINGS_THRESHOLD = "IGNITE_PERF_STAT_CACHED_STRINGS_THRESHOLD";
+
+ /**
* Enforces singleton.
*/
private IgniteSystemProperties() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
index 067f79b..60198749d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
@@ -76,7 +76,10 @@ public interface GridComponent {
SERVICE_PROC,
/** Distributed MetaStorage processor. */
- META_STORAGE;
+ META_STORAGE,
+
+ /** Performance statistics processor. */
+ PERFORMANCE_STAT_PROC;
/** Cached values array. */
public static final DiscoveryDataExchangeType[] VALUES = values();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 56f9765..a799f60 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -21,7 +21,6 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
-
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager;
@@ -58,6 +57,7 @@ import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingPro
import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
import org.apache.ignite.internal.processors.metric.GridMetricManager;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
+import org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsProcessor;
import org.apache.ignite.internal.processors.platform.PlatformProcessor;
import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
import org.apache.ignite.internal.processors.pool.PoolProcessor;
@@ -761,4 +761,11 @@ public interface GridKernalContext extends Iterable<GridComponent> {
* @return Thread pool for create/rebuild indexes.
*/
public ExecutorService buildIndexExecutorService();
+
+ /**
+ * Gets performance statistics processor.
+ *
+ * @return Performance statistics processor.
+ */
+ public PerformanceStatisticsProcessor performanceStatistics();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index ae589ad..092cf1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -31,7 +31,6 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
-
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -78,6 +77,7 @@ import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
import org.apache.ignite.internal.processors.metric.GridMetricManager;
import org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
+import org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsProcessor;
import org.apache.ignite.internal.processors.platform.PlatformProcessor;
import org.apache.ignite.internal.processors.platform.plugin.PlatformPluginProcessor;
import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
@@ -415,6 +415,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
@GridToStringExclude
private DurableBackgroundTasksProcessor durableBackgroundTasksProcessor;
+ /** Performance statistics processor. */
+ @GridToStringExclude
+ private PerformanceStatisticsProcessor perfStatProc;
+
/** */
private Thread.UncaughtExceptionHandler hnd;
@@ -704,6 +708,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
durableBackgroundTasksProcessor = (DurableBackgroundTasksProcessor)comp;
else if (comp instanceof MaintenanceProcessor)
maintenanceProc = (MaintenanceProcessor) comp;
+ else if (comp instanceof PerformanceStatisticsProcessor)
+ perfStatProc = (PerformanceStatisticsProcessor)comp;
else if (!(comp instanceof DiscoveryNodeValidationProcessor
|| comp instanceof PlatformPluginProcessor))
assert (comp instanceof GridPluginComponent) : "Unknown manager class: " + comp.getClass();
@@ -1297,4 +1303,9 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
@Override public ExecutorService buildIndexExecutorService() {
return buildIdxExecSvc;
}
+
+ /** {@inheritDoc} */
+ @Override public PerformanceStatisticsProcessor performanceStatistics() {
+ return perfStatProc;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
index f9704ae..fa487a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
@@ -136,7 +136,10 @@ public enum IgniteFeatures {
SPLITTED_CACHE_CONFIGURATIONS_V2(46),
/** Cache encryption key change. See {@link IgniteEncryption#changeCacheGroupKey(Collection)}. */
- CACHE_GROUP_KEY_CHANGE(47);
+ CACHE_GROUP_KEY_CHANGE(47),
+
+ /** Collecting performance statistics. */
+ PERFORMANCE_STATISTICS(48);
/**
* Unique feature identifier.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 1bb3e6c..7ee9d56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -167,6 +167,7 @@ import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor;
import org.apache.ignite.internal.processors.nodevalidation.OsDiscoveryNodeValidationProcessor;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
+import org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsProcessor;
import org.apache.ignite.internal.processors.platform.PlatformNoopProcessor;
import org.apache.ignite.internal.processors.platform.PlatformProcessor;
import org.apache.ignite.internal.processors.platform.plugin.PlatformPluginProcessor;
@@ -1266,6 +1267,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
startProcessor(new DistributedMetaStorageImpl(ctx));
startProcessor(new DistributedConfigurationProcessor(ctx));
startProcessor(new DurableBackgroundTasksProcessor(ctx));
+ startProcessor(new PerformanceStatisticsProcessor(ctx));
startTimer.finishGlobalStage("Start processors");
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java
index 550b60b..5c502e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.persistence.snapshot.Snapshot
import org.apache.ignite.internal.processors.cache.warmup.WarmUpMXBeanImpl;
import org.apache.ignite.internal.processors.cluster.BaselineAutoAdjustMXBeanImpl;
import org.apache.ignite.internal.processors.metric.MetricsMxBeanImpl;
+import org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsMBeanImpl;
import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.worker.FailureHandlingMxBeanImpl;
@@ -55,6 +56,7 @@ import org.apache.ignite.mxbean.EncryptionMXBean;
import org.apache.ignite.mxbean.FailureHandlingMxBean;
import org.apache.ignite.mxbean.IgniteMXBean;
import org.apache.ignite.mxbean.MetricsMxBean;
+import org.apache.ignite.mxbean.PerformanceStatisticsMBean;
import org.apache.ignite.mxbean.QueryMXBean;
import org.apache.ignite.mxbean.ServiceMXBean;
import org.apache.ignite.mxbean.SnapshotMXBean;
@@ -236,6 +238,10 @@ public class IgniteMBeansManager {
if (ctx.query().moduleEnabled())
ctx.query().getIndexing().registerMxBeans(this);
+
+ PerformanceStatisticsMBeanImpl performanceStatMbean = new PerformanceStatisticsMBeanImpl(ctx);
+ registerMBean("PerformanceStatistics", performanceStatMbean.getClass().getSimpleName(), performanceStatMbean,
+ PerformanceStatisticsMBean.class);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/metric/IoStatisticsHolderQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/metric/IoStatisticsHolderQuery.java
index 650e5e3..e9ce9ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/metric/IoStatisticsHolderQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/metric/IoStatisticsHolderQuery.java
@@ -40,16 +40,6 @@ public class IoStatisticsHolderQuery implements IoStatisticsHolder {
/** */
private LongAdder physicalReadCtr = new LongAdder();
- /** */
- private final String qryId;
-
- /**
- * @param qryId Query id.
- */
- public IoStatisticsHolderQuery(String qryId) {
- this.qryId = qryId;
- }
-
/** {@inheritDoc} */
@Override public void trackLogicalRead(long pageAddr) {
logicalReadCtr.increment();
@@ -78,13 +68,6 @@ public class IoStatisticsHolderQuery implements IoStatisticsHolder {
}
/**
- * @return Query id.
- */
- public String queryId() {
- return qryId;
- }
-
- /**
* Add given given statistics into this.
* Merge query statistics.
*
@@ -97,10 +80,10 @@ public class IoStatisticsHolderQuery implements IoStatisticsHolder {
physicalReadCtr.add(physicalReads);
}
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(IoStatisticsHolderQuery.class, this,
"logicalReadCtr", logicalReadCtr,
- "physicalReadCtr", physicalReadCtr,
- "qryId", qryId);
+ "physicalReadCtr", physicalReadCtr);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/metric/IoStatisticsQueryHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/metric/IoStatisticsQueryHelper.java
index 60f08f5..32a65ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/metric/IoStatisticsQueryHelper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/metric/IoStatisticsQueryHelper.java
@@ -28,15 +28,13 @@ public class IoStatisticsQueryHelper {
/**
* Start gathering IO statistics for query. Should be used together with {@code finishGatheringQueryStatistics}
* method.
- *
- * @param qryId Identifier of query.
*/
- public static void startGatheringQueryStatistics(String qryId) {
+ public static void startGatheringQueryStatistics() {
IoStatisticsHolderQuery currQryStatisticsHolder = CUR_QRY_STATS.get();
assert currQryStatisticsHolder == null : currQryStatisticsHolder;
- CUR_QRY_STATS.set(new IoStatisticsHolderQuery(qryId));
+ CUR_QRY_STATS.set(new IoStatisticsHolderQuery());
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLockImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLockImpl.java
index ae7b42e..040642a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLockImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLockImpl.java
@@ -25,7 +25,9 @@ import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.performancestatistics.OperationType;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.NotNull;
/**
@@ -50,6 +52,9 @@ class CacheLockImpl<K, V> implements Lock {
/** */
private volatile Thread lockedThread;
+ /** Lock start time in nanoseconds. */
+ private volatile long startTimeNanos;
+
/**
* @param gate Gate.
* @param delegate Delegate.
@@ -89,6 +94,9 @@ class CacheLockImpl<K, V> implements Lock {
private void incrementLockCounter() {
assert (lockedThread == null && cntr == 0) || (lockedThread == Thread.currentThread() && cntr > 0);
+ if (cntr == 0 && delegate.context().kernalContext().performanceStatistics().enabled())
+ startTimeNanos = System.nanoTime();
+
cntr++;
lockedThread = Thread.currentThread();
@@ -186,9 +194,20 @@ class CacheLockImpl<K, V> implements Lock {
cntr--;
- if (cntr == 0)
+ if (cntr == 0) {
lockedThread = null;
+ if (startTimeNanos > 0) {
+ delegate.context().kernalContext().performanceStatistics().cacheOperation(
+ OperationType.CACHE_LOCK,
+ delegate.context().cacheId(),
+ U.currentTimeMillis(),
+ System.nanoTime() - startTimeNanos);
+
+ startTimeNanos = 0;
+ }
+ }
+
delegate.unlockAll(keys);
}
catch (IgniteCheckedException e) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index a6fcee8..0fe7e44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -108,6 +108,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
import org.apache.ignite.internal.processors.dr.IgniteDrDataStreamerCacheUpdater;
+import org.apache.ignite.internal.processors.performancestatistics.OperationType;
import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
@@ -1468,8 +1469,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
A.notNull(key, "key");
boolean statsEnabled = ctx.statisticsEnabled();
+ boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
- long start = statsEnabled ? System.nanoTime() : 0L;
+ long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L;
boolean keepBinary = ctx.keepBinary();
@@ -1487,6 +1489,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (statsEnabled)
metrics0().addGetTimeNanos(System.nanoTime() - start);
+ if (performanceStatsEnabled)
+ writeStatistics(OperationType.CACHE_GET, start);
+
return val;
}
@@ -1495,8 +1500,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
A.notNull(key, "key");
boolean statsEnabled = ctx.statisticsEnabled();
+ boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
- long start = statsEnabled ? System.nanoTime() : 0L;
+ long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L;
boolean keepBinary = ctx.keepBinary();
@@ -1523,6 +1529,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (statsEnabled)
metrics0().addGetTimeNanos(System.nanoTime() - start);
+ if (performanceStatsEnabled)
+ writeStatistics(OperationType.CACHE_GET, start);
+
return val;
}
@@ -1531,8 +1540,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
A.notNull(key, "key");
final boolean statsEnabled = ctx.statisticsEnabled();
+ final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
- final long start = statsEnabled ? System.nanoTime() : 0L;
+ final long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L;
final boolean keepBinary = ctx.keepBinary();
@@ -1558,6 +1568,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (statsEnabled)
fut.listen(new UpdateGetTimeStatClosure<V>(metrics0(), start));
+ if (performanceStatsEnabled)
+ fut.listen(f -> writeStatistics(OperationType.CACHE_GET, start));
+
return fut;
}
@@ -1566,8 +1579,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
A.notNull(key, "key");
final boolean statsEnabled = ctx.statisticsEnabled();
+ final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
- final long start = statsEnabled ? System.nanoTime() : 0L;
+ final long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L;
final boolean keepBinary = ctx.keepBinary();
@@ -1611,6 +1625,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (statsEnabled)
fut.listen(new UpdateGetTimeStatClosure<EntryGetResult>(metrics0(), start));
+ if (performanceStatsEnabled)
+ fut.listen(f -> writeStatistics(OperationType.CACHE_GET, start));
+
return fr;
}
@@ -1619,8 +1636,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
A.notNull(keys, "keys");
boolean statsEnabled = ctx.statisticsEnabled();
+ boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
- long start = statsEnabled ? System.nanoTime() : 0L;
+ long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L;
final CacheOperationContext opCtx = ctx.operationContextPerCall();
@@ -1637,6 +1655,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (statsEnabled)
metrics0().addGetTimeNanos(System.nanoTime() - start);
+ if (performanceStatsEnabled)
+ writeStatistics(OperationType.CACHE_GET_ALL, start);
+
return map;
}
@@ -1646,8 +1667,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
A.notNull(keys, "keys");
boolean statsEnabled = ctx.statisticsEnabled();
+ boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
- long start = statsEnabled ? System.nanoTime() : 0L;
+ long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L;
final CacheOperationContext opCtx = ctx.operationContextPerCall();
@@ -1669,6 +1691,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (statsEnabled)
metrics0().addGetTimeNanos(System.nanoTime() - start);
+ if (performanceStatsEnabled)
+ writeStatistics(OperationType.CACHE_GET_ALL, start);
+
return res;
}
@@ -1677,8 +1702,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
A.notNull(keys, "keys");
final boolean statsEnabled = ctx.statisticsEnabled();
+ final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
- final long start = statsEnabled ? System.nanoTime() : 0L;
+ final long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L;
String taskName = ctx.kernalContext().job().currentTaskName();
@@ -1706,6 +1732,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (statsEnabled)
fut.listen(new UpdateGetTimeStatClosure<Map<K, V>>(metrics0(), start));
+ if (performanceStatsEnabled)
+ fut.listen(f -> writeStatistics(OperationType.CACHE_GET_ALL, start));
+
return fut;
}
@@ -1715,8 +1744,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
A.notNull(keys, "keys");
final boolean statsEnabled = ctx.statisticsEnabled();
+ final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
- final long start = statsEnabled ? System.nanoTime() : 0L;
+ final long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L;
CacheOperationContext opCtx = ctx.operationContextPerCall();
@@ -1758,6 +1788,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (statsEnabled)
fut.listen(new UpdateGetTimeStatClosure<Map<K, EntryGetResult>>(metrics0(), start));
+ if (performanceStatsEnabled)
+ fut.listen(f -> writeStatistics(OperationType.CACHE_GET_ALL, start));
+
return rf;
}
@@ -2457,8 +2490,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
@Nullable public V getAndPut(final K key, final V val, @Nullable final CacheEntryPredicate filter)
throws IgniteCheckedException {
boolean statsEnabled = ctx.statisticsEnabled();
+ boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
- long start = statsEnabled ? System.nanoTime() : 0L;
+ long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L;
A.notNull(key, "key", val, "val");
@@ -2470,6 +2504,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (statsEnabled)
metrics0().addPutAndGetTimeNanos(System.nanoTime() - start);
+ if (performanceStatsEnabled)
+ writeStatistics(OperationType.CACHE_GET_AND_PUT, start);
+
return prevVal;
}
@@ -2509,8 +2546,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
protected final IgniteInternalFuture<V> getAndPutAsync(K key, V val, @Nullable CacheEntryPredicate filter) {
final boolean statsEnabled = ctx.statisticsEnabled();
+ final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
- final long start = statsEnabled ? System.nanoTime() : 0L;
+ final long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L;
A.notNull(key, "key", val, "val");
@@ -2522,6 +2560,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (statsEnabled)
fut.listen(new UpdatePutAndGetTimeStatClosure<V>(metrics0(), start));
+ if (performanceStatsEnabled)
+ fut.listen(f -> writeStatistics(OperationType.CACHE_GET_AND_PUT, start));
+
return fut;
}
@@ -2566,8 +2607,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
public boolean put(final K key, final V val, final CacheEntryPredicate filter)
throws IgniteCheckedException {
boolean statsEnabled = ctx.statisticsEnabled();
+ boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
- long start = statsEnabled ? System.nanoTime() : 0L;
+ long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L;
A.notNull(key, "key", val, "val");
@@ -2579,6 +2621,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (statsEnabled && stored)
metrics0().addPutTimeNanos(System.nanoTime() - start);
+ if (performanceStatsEnabled)
+ writeStatistics(OperationType.CACHE_PUT, start);
+
return stored;
}
@@ -2692,8 +2737,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
tx.topologyVersion(topVer);
final boolean statsEnabled = ctx.statisticsEnabled();
+ final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
- final long start = statsEnabled ? System.nanoTime() : 0L;
+ long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L;
IgniteInternalFuture<GridCacheReturn> fut = tx.invokeAsync(ctx,
null,
@@ -2706,6 +2752,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (statsEnabled)
metrics0().addInvokeTimeNanos(System.nanoTime() - start);
+ if (performanceStatsEnabled)
+ writeStatistics(OperationType.CACHE_INVOKE, start);
+
EntryProcessorResult<T> res = null;
if (resMap != null) {
@@ -2731,8 +2780,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
warnIfUnordered(keys, BulkOperation.INVOKE);
final boolean statsEnabled = ctx.statisticsEnabled();
+ final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
- final long start = statsEnabled ? System.nanoTime() : 0L;
+ long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L;
return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(keys.size() == 1) {
@Override public Map<K, EntryProcessorResult<T>> op(GridNearTxLocal tx)
@@ -2751,6 +2801,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (statsEnabled)
metrics0().addInvokeTimeNanos(System.nanoTime() - start);
+ if (performanceStatsEnabled)
+ writeStatistics(OperationType.CACHE_INVOKE_ALL, start);
+
return res != null ? res : Collections.<K, EntryProcessorResult<T>>emptyMap();
}
});
@@ -2768,8 +2821,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
final boolean statsEnabled = ctx.statisticsEnabled();
+ final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
- final long start = statsEnabled ? System.nanoTime() : 0L;
+ long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L;
IgniteInternalFuture<?> fut = asyncOp(new AsyncOp() {
@Override public IgniteInternalFuture op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
@@ -2796,6 +2850,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (statsEnabled)
metrics0().addInvokeTimeNanos(System.nanoTime() - start);
+ if (performanceStatsEnabled)
+ writeStatistics(OperationType.CACHE_INVOKE, start);
+
Map<K, EntryProcessorResult<T>> resMap = ret.value();
if (resMap != null) {
@@ -2822,8 +2879,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
warnIfUnordered(keys, BulkOperation.INVOKE);
final boolean statsEnabled = ctx.statisticsEnabled();
+ final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
- final long start = statsEnabled ? System.nanoTime() : 0L;
+ long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L;
IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(keys) {
@Override public IgniteInternalFuture<GridCacheReturn> op(GridNearTxLocal tx,
@@ -2855,6 +2913,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (statsEnabled)
metrics0().addInvokeTimeNanos(System.nanoTime() - start);
+ if (performanceStatsEnabled)
+ writeStatistics(OperationType.CACHE_INVOKE_ALL, start);
+
assert ret != null;
return ret.value() != null ? ret.<Map<K, EntryProcessorResult<T>>>value() : Collections.<K, EntryProcessorResult<T>>emptyMap();
@@ -2874,8 +2935,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
warnIfUnordered(map, BulkOperation.INVOKE);
final boolean statsEnabled = ctx.statisticsEnabled();
+ final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
- final long start = statsEnabled ? System.nanoTime() : 0L;
+ long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L;
IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(map.keySet()) {
@Override public IgniteInternalFuture<GridCacheReturn> op(GridNearTxLocal tx,
@@ -2902,6 +2964,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (statsEnabled)
metrics0().addInvokeTimeNanos(System.nanoTime() - start);
+ if (performanceStatsEnabled)
+ writeStatistics(OperationType.CACHE_INVOKE_ALL, start);
+
assert ret != null;
return ret.value() != null ? ret.<Map<K, EntryProcessorResult<T>>>value() : Collections.<K, EntryProcessorResult<T>>emptyMap();
@@ -2921,8 +2986,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
warnIfUnordered(map, BulkOperation.INVOKE);
final boolean statsEnabled = ctx.statisticsEnabled();
+ final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
- final long start = statsEnabled ? System.nanoTime() : 0L;
+ long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L;
return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(map.size() == 1) {
@Nullable @Override public Map<K, EntryProcessorResult<T>> op(GridNearTxLocal tx)
@@ -2935,6 +3001,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (statsEnabled)
metrics0().addInvokeTimeNanos(System.nanoTime() - start);
+ if (performanceStatsEnabled)
+ writeStatistics(OperationType.CACHE_INVOKE_ALL, start);
+
return value;
}
});
@@ -2958,14 +3027,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
final boolean statsEnabled = ctx.statisticsEnabled();
+ final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
- final long start = statsEnabled ? System.nanoTime() : 0L;
+ final long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L;
IgniteInternalFuture<Boolean> fut = putAsync0(key, val, filter);
if (statsEnabled)
fut.listen(new UpdatePutTimeStatClosure<Boolean>(metrics0(), start));
+ if (performanceStatsEnabled)
+ fut.listen(f -> writeStatistics(OperationType.CACHE_PUT, start));
+
return fut;
}
@@ -3059,8 +3132,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return;
boolean statsEnabled = ctx.statisticsEnabled();
+ boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
- long start = statsEnabled ? System.nanoTime() : 0L;
+ long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L;
if (keyCheck)
validateCacheKeys(m.keySet());
@@ -3071,6 +3145,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (statsEnabled)
metrics0().addPutTimeNanos(System.nanoTime() - start);
+
+ if (performanceStatsEnabled)
+ writeStatistics(OperationType.CACHE_PUT_ALL, start);
}
/**
@@ -3096,8 +3173,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return new GridFinishedFuture<Object>();
boolean statsEnabled = ctx.statisticsEnabled();
+ boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
- long start = statsEnabled ? System.nanoTime() : 0L;
+ long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L;
if (keyCheck)
validateCacheKeys(m.keySet());
@@ -3109,6 +3187,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (statsEnabled)
fut.listen(new UpdatePutTimeStatClosure<Boolean>(metrics0(), start));
+ if (performanceStatsEnabled)
+ fut.listen(f -> writeStatistics(OperationType.CACHE_PUT_ALL, start));
+
return fut;
}
@@ -3135,8 +3216,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Nullable @Override public V getAndRemove(final K key) throws IgniteCheckedException {
boolean statsEnabled = ctx.statisticsEnabled();
+ boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
- long start = statsEnabled ? System.nanoTime() : 0L;
+ long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L;
A.notNull(key, "key");
@@ -3148,6 +3230,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (statsEnabled)
metrics0().addRemoveAndGetTimeNanos(System.nanoTime() - start);
+ if (performanceStatsEnabled)
+ writeStatistics(OperationType.CACHE_GET_AND_REMOVE, start);
+
return prevVal;
}
@@ -3191,8 +3276,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public IgniteInternalFuture<V> getAndRemoveAsync(final K key) {
final boolean statsEnabled = ctx.statisticsEnabled();
+ final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
- final long start = statsEnabled ? System.nanoTime() : 0L;
+ final long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L;
A.notNull(key, "key");
@@ -3204,6 +3290,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (statsEnabled)
fut.listen(new UpdateRemoveTimeStatClosure<V>(metrics0(), start));
+ if (performanceStatsEnabled)
+ fut.listen(f -> writeStatistics(OperationType.CACHE_GET_AND_REMOVE, start));
+
return fut;
}
@@ -3255,8 +3344,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public void removeAll(final Collection<? extends K> keys) throws IgniteCheckedException {
boolean statsEnabled = ctx.statisticsEnabled();
+ boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
- long start = statsEnabled ? System.nanoTime() : 0L;
+ long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L;
A.notNull(keys, "keys");
@@ -3272,6 +3362,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (statsEnabled)
metrics0().addRemoveTimeNanos(System.nanoTime() - start);
+
+ if (performanceStatsEnabled)
+ writeStatistics(OperationType.CACHE_REMOVE_ALL, start);
}
/**
@@ -3302,8 +3395,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return new GridFinishedFuture<Object>();
final boolean statsEnabled = ctx.statisticsEnabled();
+ final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
- final long start = statsEnabled ? System.nanoTime() : 0L;
+ final long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L;
if (keyCheck)
validateCacheKeys(keys);
@@ -3315,6 +3409,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (statsEnabled)
fut.listen(new UpdateRemoveTimeStatClosure<>(metrics0(), start));
+ if (performanceStatsEnabled)
+ fut.listen(f -> writeStatistics(OperationType.CACHE_REMOVE_ALL, start));
+
return fut;
}
@@ -3353,8 +3450,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
public boolean remove(final K key, @Nullable CacheEntryPredicate filter) throws IgniteCheckedException {
boolean statsEnabled = ctx.statisticsEnabled();
+ boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
- long start = statsEnabled ? System.nanoTime() : 0L;
+ long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L;
A.notNull(key, "key");
@@ -3366,6 +3464,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (statsEnabled && rmv)
metrics0().addRemoveTimeNanos(System.nanoTime() - start);
+ if (performanceStatsEnabled)
+ writeStatistics(OperationType.CACHE_REMOVE, start);
+
return rmv;
}
@@ -3411,8 +3512,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
public IgniteInternalFuture<Boolean> removeAsync(final K key, @Nullable final CacheEntryPredicate filter) {
final boolean statsEnabled = ctx.statisticsEnabled();
+ final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
- final long start = statsEnabled ? System.nanoTime() : 0L;
+ final long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L;
A.notNull(key, "key");
@@ -3424,6 +3526,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (statsEnabled)
fut.listen(new UpdateRemoveTimeStatClosure<Boolean>(metrics0(), start));
+ if (performanceStatsEnabled)
+ fut.listen(f -> writeStatistics(OperationType.CACHE_REMOVE, start));
+
return fut;
}
@@ -6793,6 +6898,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/**
+ * Writes cache operation performance statistics.
+ *
+ * @param op Operation type.
+ * @param start Start time in nanoseconds.
+ */
+ private void writeStatistics(OperationType op, long start) {
+ ctx.kernalContext().performanceStatistics().cacheOperation(
+ op,
+ ctx.cacheId(),
+ U.currentTimeMillis(),
+ System.nanoTime() - start);
+ }
+
+ /**
* Delayed callable class.
*/
public abstract static class TopologyVersionAwareJob extends ComputeJobAdapter {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index ae33bf8..7b3f9c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2293,6 +2293,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
grp.onCacheStarted(cacheCtx);
onKernalStart(cache);
+
+ if (ctx.performanceStatistics().enabled() && U.isLocalNodeCoordinator(ctx.discovery()))
+ ctx.performanceStatistics().cacheStart(cacheCtx.cacheId(), cfg.getName());
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index d5a9ed0..958053b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -74,6 +74,7 @@ import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingPro
import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
import org.apache.ignite.internal.processors.metric.GridMetricManager;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
+import org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsProcessor;
import org.apache.ignite.internal.processors.platform.PlatformProcessor;
import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
import org.apache.ignite.internal.processors.pool.PoolProcessor;
@@ -728,4 +729,9 @@ public class StandaloneGridKernalContext implements GridKernalContext {
@Override public ExecutorService buildIndexExecutorService() {
return null;
}
+
+ /** {@inheritDoc} */
+ @Override public PerformanceStatisticsProcessor performanceStatistics() {
+ return null;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
index 1380772..0019df0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
@@ -279,4 +279,9 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
if (qryMgr != null)
qryMgr.removeQueryFuture(reqId);
}
+
+ /** @return Request ID. */
+ long requestId() {
+ return reqId;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 3abebf4..5474fc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -36,6 +36,8 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.metric.IoStatisticsQueryHelper;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedSet;
@@ -57,6 +59,7 @@ import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SCAN;
/**
* Distributed query manager (for cache in REPLICATED / PARTITIONED cache mode).
@@ -597,6 +600,11 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
assert qry.type() == GridCacheQueryType.SCAN : qry;
assert qry.mvccSnapshot() != null || !cctx.mvccEnabled();
+ boolean performanceStatsEnabled = cctx.kernalContext().performanceStatistics().enabled();
+
+ long startTime = performanceStatsEnabled ? System.currentTimeMillis() : 0;
+ long startTimeNanos = performanceStatsEnabled ? System.nanoTime() : 0;
+
GridCloseableIterator locIter0 = null;
for (ClusterNode node : nodes) {
@@ -627,6 +635,12 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
/** */
private Object cur;
+ /** Logical reads. */
+ private long logicalReads;
+
+ /** Physical reads. */
+ private long physicalReads;
+
@Override protected Object onNext() throws IgniteCheckedException {
if (!onHasNext())
throw new NoSuchElementException();
@@ -642,8 +656,23 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
if (cur != null)
return true;
- if (locIter != null && locIter.hasNextX())
- cur = locIter.nextX();
+ if (locIter != null) {
+ if (performanceStatsEnabled)
+ IoStatisticsQueryHelper.startGatheringQueryStatistics();
+
+ try {
+ if (locIter.hasNextX())
+ cur = locIter.nextX();
+
+ } finally {
+ if (performanceStatsEnabled) {
+ IoStatisticsHolder stat = IoStatisticsQueryHelper.finishGatheringQueryStatistics();
+
+ logicalReads += stat.logicalReads();
+ physicalReads += stat.physicalReads();
+ }
+ }
+ }
return cur != null || (cur = convert(fut.next())) != null;
}
@@ -669,6 +698,25 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
if (fut != null)
fut.cancel();
+
+ if (performanceStatsEnabled) {
+ cctx.kernalContext().performanceStatistics().query(
+ SCAN,
+ cctx.name(),
+ ((GridCacheDistributedQueryFuture)fut).requestId(),
+ startTime,
+ System.nanoTime() - startTimeNanos,
+ true);
+
+ if (logicalReads > 0 || physicalReads > 0) {
+ cctx.kernalContext().performanceStatistics().queryReads(
+ SCAN,
+ cctx.localNodeId(),
+ ((GridCacheDistributedQueryFuture)fut).requestId(),
+ logicalReads,
+ physicalReads);
+ }
+ }
}
};
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index f36ebb2..07d906b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -61,6 +61,8 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.metric.IoStatisticsQueryHelper;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
@@ -1119,6 +1121,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
try {
+ boolean performanceStatsEnabled = cctx.kernalContext().performanceStatistics().enabled();
+
+ if (performanceStatsEnabled)
+ IoStatisticsQueryHelper.startGatheringQueryStatistics();
+
boolean loc = qryInfo.local();
QueryResult<K, V> res = null;
@@ -1376,6 +1383,19 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
else if (rmvIter)
removeQueryResult(qryInfo.senderId(), qryInfo.requestId());
+
+ if (performanceStatsEnabled) {
+ IoStatisticsHolder stat = IoStatisticsQueryHelper.finishGatheringQueryStatistics();
+
+ if (stat.logicalReads() > 0 || stat.physicalReads() > 0) {
+ cctx.kernalContext().performanceStatistics().queryReads(
+ res.type(),
+ qryInfo.senderId(),
+ qryInfo.requestId(),
+ stat.logicalReads(),
+ stat.physicalReads());
+ }
+ }
}
}
finally {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index ec44544..8d419fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -94,6 +94,13 @@ public interface IgniteInternalTx {
public long startTime();
/**
+ * Start time of this transaction in nanoseconds to measure duration.
+ *
+ * @return Start time of this transaction in nanoseconds.
+ */
+ public long startTimeNanos();
+
+ /**
* Cache transaction isolation level.
*
* @return Isolation level.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 5a552a9..9daddb5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -163,6 +163,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
@GridToStringInclude
protected long startTime = U.currentTimeMillis();
+ /** Transaction start time in nanoseconds to measure duration. */
+ protected long startTimeNanos;
+
/** Node ID. */
@GridToStringInclude
protected UUID nodeId;
@@ -343,6 +346,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_REMOVED);
taskName = needTaskName ? cctx.kernalContext().task().resolveTaskName(taskNameHash) : null;
+
+ if (cctx.kernalContext().performanceStatistics().enabled())
+ startTimeNanos = System.nanoTime();
}
/**
@@ -399,6 +405,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_REMOVED);
taskName = needTaskName ? cctx.kernalContext().task().resolveTaskName(taskNameHash) : null;
+
+ if (cctx.kernalContext().performanceStatistics().enabled())
+ startTimeNanos = System.nanoTime();
}
/**
@@ -724,6 +733,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
return startTime;
}
+ /** {@inheritDoc} */
+ @Override public long startTimeNanos() {
+ return startTimeNanos;
+ }
+
/**
* @return Flag indicating whether transaction needs return value.
*/
@@ -2027,6 +2041,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
/** Start time. */
private final long startTime;
+ /** Start time in nanoseconds. */
+ private final long startTimeNanos;
+
/** Transaction isolation. */
private final TransactionIsolation isolation;
@@ -2061,13 +2078,14 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
* @param state Transaction state.
* @param rollbackOnly Rollback-only flag.
*/
- TxShadow(IgniteUuid xid, UUID nodeId, long threadId, long startTime, TransactionIsolation isolation,
- TransactionConcurrency concurrency, boolean invalidate, boolean implicit, long timeout,
- TransactionState state, boolean rollbackOnly) {
+ TxShadow(IgniteUuid xid, UUID nodeId, long threadId, long startTime, long startTimeNanos,
+ TransactionIsolation isolation, TransactionConcurrency concurrency, boolean invalidate, boolean implicit,
+ long timeout, TransactionState state, boolean rollbackOnly) {
this.xid = xid;
this.nodeId = nodeId;
this.threadId = threadId;
this.startTime = startTime;
+ this.startTimeNanos = startTimeNanos;
this.isolation = isolation;
this.concurrency = concurrency;
this.invalidate = invalidate;
@@ -2113,6 +2131,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/** {@inheritDoc} */
+ @Override public long startTimeNanos() {
+ return startTimeNanos;
+ }
+
+ /** {@inheritDoc} */
@Override public TransactionIsolation isolation() {
return isolation;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index f57d276..34834ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
@@ -1676,6 +1675,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (!tx.system())
cctx.txMetrics().onTxCommit();
+ writeStatistics(tx, true);
+
tx.txState().onTxEnd(cctx, tx, true);
}
@@ -1745,6 +1746,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (!tx.system())
cctx.txMetrics().onTxRollback();
+ writeStatistics(tx, false);
+
tx.txState().onTxEnd(cctx, tx, false);
}
@@ -1798,6 +1801,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
cctx.txMetrics().onTxRollback();
}
+ writeStatistics(tx, commit);
+
tx.txState().onTxEnd(cctx, tx, commit);
}
}
@@ -3216,6 +3221,23 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
+ * Writes transaction performance statistics.
+ *
+ * @param tx Transaction.
+ * @param commited {@code True} if the transaction was committed.
+ */
+ private void writeStatistics(IgniteInternalTx tx, boolean commited) {
+ if (!cctx.kernalContext().performanceStatistics().enabled() || tx.startTimeNanos() == 0)
+ return;
+
+ cctx.kernalContext().performanceStatistics().transaction(
+ tx.txState().cacheIds(),
+ tx.startTime(),
+ System.nanoTime() - tx.startTimeNanos(),
+ commited);
+ }
+
+ /**
* Transactions recovery initialization runnable.
*/
private final class TxRecoveryInitRunnable implements Runnable {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 2833f38..e290286 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -2039,6 +2039,14 @@ public class GridJobProcessor extends GridProcessorAdapter {
}
}
}
+
+ if (ctx.performanceStatistics().enabled()) {
+ ctx.performanceStatistics().job(ses.getId(),
+ worker.getQueuedTime(),
+ worker.getStartTime(),
+ worker.getExecuteTime(),
+ worker.isTimedOut());
+ }
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsReader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsReader.java
new file mode 100644
index 0000000..499ff35
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsReader.java
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.performancestatistics;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.FileVisitOption;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+import org.apache.ignite.internal.util.GridIntList;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import static java.nio.ByteBuffer.allocateDirect;
+import static java.nio.ByteOrder.nativeOrder;
+import static java.nio.file.Files.walkFileTree;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_START;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.JOB;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.QUERY;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.QUERY_READS;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.TASK;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.TX_COMMIT;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.cacheOperation;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.cacheRecordSize;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.cacheStartRecordSize;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.jobRecordSize;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.queryReadsRecordSize;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.queryRecordSize;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.taskRecordSize;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.transactionOperation;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.transactionRecordSize;
+
+/**
+ * Walker over the performance statistics file.
+ *
+ * @see FilePerformanceStatisticsWriter
+ */
+public class FilePerformanceStatisticsReader {
+ /** Default file read buffer size. */
+ private static final int DFLT_READ_BUFFER_SIZE = (int)(8 * U.MB);
+
+ /** Uuid as string pattern. */
+ private static final String UUID_STR_PATTERN =
+ "[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}";
+
+ /**
+  * File name pattern: {@code node-<nodeId>.prf} or {@code node-<nodeId>-<index>.prf}.
+  * The dot before the extension is escaped so that only a literal {@code '.'} matches.
+  */
+ private static final Pattern FILE_PATTERN = Pattern.compile("^node-(" + UUID_STR_PATTERN + ")(-\\d+)?\\.prf$");
+
+ /** No-op handler. */
+ private static final PerformanceStatisticsHandler[] NOOP_HANDLER = {};
+
+ /** IO factory. */
+ private final RandomAccessFileIOFactory ioFactory = new RandomAccessFileIOFactory();
+
+ /** Current file I/O. */
+ private FileIO fileIo;
+
+ /** Buffer. */
+ private final ByteBuffer buf;
+
+ /** Handlers to process deserialized operations. */
+ private final PerformanceStatisticsHandler[] handlers;
+
+ /** Current handlers. */
+ private PerformanceStatisticsHandler[] curHnd;
+
+ /** Cached strings by hashcodes. */
+ private final Map<Integer, String> knownStrs = new HashMap<>();
+
+ /** Forward read mode. */
+ private ForwardRead forwardRead;
+
+ /**
+ * Creates a reader that uses the default read buffer size ({@link #DFLT_READ_BUFFER_SIZE}).
+ *
+ * @param handlers Handlers to process deserialized operations.
+ */
+ public FilePerformanceStatisticsReader(PerformanceStatisticsHandler... handlers) {
+ this(DFLT_READ_BUFFER_SIZE, handlers);
+ }
+
+ /**
+  * Creates a reader with an explicit read buffer size.
+  *
+  * @param bufSize Size in bytes of the direct buffer the files are read through.
+  * @param handlers Handlers to process deserialized operations. Must not be empty.
+  */
+ FilePerformanceStatisticsReader(int bufSize, PerformanceStatisticsHandler... handlers) {
+     A.notEmpty(handlers, "At least one handler expected.");
+
+     this.handlers = handlers;
+     this.curHnd = handlers;
+
+     // Records are decoded in the native byte order.
+     this.buf = allocateDirect(bufSize).order(nativeOrder());
+ }
+
+ /**
+ * Walks over performance statistics files.
+ * <p>
+ * Each resolved file is read through the shared buffer and every complete record is passed to
+ * {@link #deserialize(ByteBuffer, UUID)}. While forward read mode is on (see {@link #forwardRead(int)})
+ * records are scanned with no-op handlers until the searched string is read; reading is then rewound to
+ * the record that needed the string.
+ *
+ * @param filesOrDirs Files or directories.
+ * @throws IOException If read failed.
+ */
+ public void read(List<File> filesOrDirs) throws IOException {
+ List<File> files = resolveFiles(filesOrDirs);
+
+ if (files.isEmpty())
+ return;
+
+ for (File file : files) {
+ buf.clear();
+
+ UUID nodeId = nodeId(file);
+
+ try (FileIO io = ioFactory.create(file)) {
+ fileIo = io; // Stored in the field because forwardRead() needs the current file position.
+
+ while (true) {
+ if (io.read(buf) <= 0) { // Nothing was read: handled as end of file.
+ if (forwardRead == null)
+ break;
+ // The searched string was not found up to the end of the file: continue from the record that
+ // follows the one which started the search, with the real handlers again.
+ io.position(forwardRead.nextRecPos);
+
+ buf.clear();
+
+ curHnd = handlers;
+
+ forwardRead = null;
+
+ continue;
+ }
+
+ buf.flip();
+ // The mark is kept at the start of the record that is about to be deserialized.
+ buf.mark();
+
+ while (deserialize(buf, nodeId)) {
+ if (forwardRead != null && forwardRead.found) {
+ if (forwardRead.resetBuf) { // The buffer was refilled during the search: re-read from the file.
+ buf.limit(0); // Discards the buffered data.
+
+ io.position(forwardRead.curRecPos);
+ }
+ else
+ buf.position(forwardRead.bufPos); // The record is still in the buffer: step back to it.
+
+ curHnd = handlers;
+
+ forwardRead = null;
+ }
+
+ buf.mark();
+ }
+ // Not enough bytes for the next record: return to its start and keep the unread tail.
+ buf.reset();
+ // The next read changes the buffer content, so the saved buffer position will not be usable anymore.
+ if (forwardRead != null)
+ forwardRead.resetBuf = true;
+
+ buf.compact();
+ }
+ }
+
+ knownStrs.clear();
+ forwardRead = null;
+ }
+ }
+
+ /**
+  * Deserializes a single record from the buffer and passes it to the current handlers.
+  * <p>
+  * A record starts with a one-byte operation type id. If the buffer does not hold the complete record the
+  * method returns {@code false}; the buffer position may then be inside the record, so the caller has to
+  * restore it.
+  * <p>
+  * Query, task and cache start records may carry a hashcode instead of the string itself. If the string
+  * for that hashcode has not been read yet, forward read mode is turned on (see {@link #forwardRead(int)}).
+  *
+  * @param buf Buffer.
+  * @param nodeId Node id.
+  * @return {@code True} if operation deserialized. {@code False} if not enough bytes.
+  * @throws IOException If the file position could not be obtained when turning forward read mode on.
+  */
+ private boolean deserialize(ByteBuffer buf, UUID nodeId) throws IOException {
+     if (buf.remaining() < 1)
+         return false;
+
+     byte opTypeByte = buf.get();
+
+     OperationType opType = OperationType.of(opTypeByte);
+
+     if (cacheOperation(opType)) {
+         if (buf.remaining() < cacheRecordSize())
+             return false;
+
+         int cacheId = buf.getInt();
+         long startTime = buf.getLong();
+         long duration = buf.getLong();
+
+         for (PerformanceStatisticsHandler handler : curHnd)
+             handler.cacheOperation(nodeId, opType, cacheId, startTime, duration);
+
+         return true;
+     }
+     else if (transactionOperation(opType)) {
+         if (buf.remaining() < 4)
+             return false;
+
+         int cacheIdsCnt = buf.getInt();
+
+         // The cache ids counter (4 bytes) has been read already.
+         if (buf.remaining() < transactionRecordSize(cacheIdsCnt) - 4)
+             return false;
+
+         GridIntList cacheIds = new GridIntList(cacheIdsCnt);
+
+         for (int i = 0; i < cacheIdsCnt; i++)
+             cacheIds.add(buf.getInt());
+
+         long startTime = buf.getLong();
+         long duration = buf.getLong();
+
+         for (PerformanceStatisticsHandler handler : curHnd)
+             handler.transaction(nodeId, cacheIds, startTime, duration, opType == TX_COMMIT);
+
+         return true;
+     }
+     else if (opType == QUERY) {
+         if (buf.remaining() < 1)
+             return false;
+
+         boolean cached = buf.get() != 0;
+
+         String text;
+         int hash = 0;
+
+         if (cached) {
+             if (buf.remaining() < 4)
+                 return false;
+
+             hash = buf.getInt();
+
+             text = knownStrs.get(hash);
+
+             // The cached flag (1 byte) and the hashcode (4 bytes) have been read already.
+             if (buf.remaining() < queryRecordSize(0, true) - 1 - 4)
+                 return false;
+         }
+         else {
+             if (buf.remaining() < 4)
+                 return false;
+
+             int textLen = buf.getInt();
+
+             // The cached flag (1 byte) and the text length (4 bytes) have been read already.
+             if (buf.remaining() < queryRecordSize(textLen, false) - 1 - 4)
+                 return false;
+
+             text = readString(buf, textLen);
+         }
+
+         GridCacheQueryType queryType = GridCacheQueryType.fromOrdinal(buf.get());
+         long id = buf.getLong();
+         long startTime = buf.getLong();
+         long duration = buf.getLong();
+         boolean success = buf.get() != 0;
+
+         // The text was written by hashcode only and has not been read yet: search for it further in the file.
+         if (text == null)
+             forwardRead(hash);
+
+         for (PerformanceStatisticsHandler handler : curHnd)
+             handler.query(nodeId, queryType, text, id, startTime, duration, success);
+
+         return true;
+     }
+     else if (opType == QUERY_READS) {
+         if (buf.remaining() < queryReadsRecordSize())
+             return false;
+
+         GridCacheQueryType queryType = GridCacheQueryType.fromOrdinal(buf.get());
+         UUID uuid = readUuid(buf);
+         long id = buf.getLong();
+         long logicalReads = buf.getLong();
+         long physicalReads = buf.getLong();
+
+         for (PerformanceStatisticsHandler handler : curHnd)
+             handler.queryReads(nodeId, queryType, uuid, id, logicalReads, physicalReads);
+
+         return true;
+     }
+     else if (opType == TASK) {
+         if (buf.remaining() < 1)
+             return false;
+
+         boolean cached = buf.get() != 0;
+
+         String taskName;
+         int hash = 0;
+
+         if (cached) {
+             if (buf.remaining() < 4)
+                 return false;
+
+             hash = buf.getInt();
+
+             taskName = knownStrs.get(hash);
+
+             if (buf.remaining() < taskRecordSize(0, true) - 1 - 4)
+                 return false;
+         }
+         else {
+             if (buf.remaining() < 4)
+                 return false;
+
+             int nameLen = buf.getInt();
+
+             if (buf.remaining() < taskRecordSize(nameLen, false) - 1 - 4)
+                 return false;
+
+             taskName = readString(buf, nameLen);
+         }
+
+         IgniteUuid sesId = readIgniteUuid(buf);
+         long startTime = buf.getLong();
+         long duration = buf.getLong();
+         int affPartId = buf.getInt();
+
+         if (taskName == null)
+             forwardRead(hash);
+
+         for (PerformanceStatisticsHandler handler : curHnd)
+             handler.task(nodeId, sesId, taskName, startTime, duration, affPartId);
+
+         return true;
+     }
+     else if (opType == JOB) {
+         if (buf.remaining() < jobRecordSize())
+             return false;
+
+         IgniteUuid sesId = readIgniteUuid(buf);
+         long queuedTime = buf.getLong();
+         long startTime = buf.getLong();
+         long duration = buf.getLong();
+         boolean timedOut = buf.get() != 0;
+
+         for (PerformanceStatisticsHandler handler : curHnd)
+             handler.job(nodeId, sesId, queuedTime, startTime, duration, timedOut);
+
+         return true;
+     }
+     else if (opType == CACHE_START) {
+         if (buf.remaining() < 1)
+             return false;
+
+         boolean cached = buf.get() != 0;
+
+         String cacheName;
+         int hash = 0;
+
+         if (cached) {
+             if (buf.remaining() < 4)
+                 return false;
+
+             hash = buf.getInt();
+
+             cacheName = knownStrs.get(hash);
+
+             if (buf.remaining() < cacheStartRecordSize(0, true) - 1 - 4)
+                 return false;
+         }
+         else {
+             if (buf.remaining() < 4)
+                 return false;
+
+             int nameLen = buf.getInt();
+
+             if (buf.remaining() < cacheStartRecordSize(nameLen, false) - 1 - 4)
+                 return false;
+
+             cacheName = readString(buf, nameLen);
+         }
+
+         int cacheId = buf.getInt();
+
+         // Same as for queries and tasks: do not pass a null name to the handlers, search for the string
+         // further in the file first.
+         if (cacheName == null)
+             forwardRead(hash);
+
+         for (PerformanceStatisticsHandler handler : curHnd)
+             handler.cacheStart(nodeId, cacheId, cacheName);
+
+         return true;
+     }
+     else
+         throw new IgniteException("Unknown operation type id [typeId=" + opTypeByte + ']');
+ }
+
+ /**
+ * Turns on forward read mode: the handlers are replaced with no-op ones and the file positions of the
+ * current and the next record are remembered together with the hashcode of the string to look for.
+ * Does nothing if the mode is already on.
+ *
+ * @param hash Hashcode of the string to look for.
+ * @throws IOException If the current file position could not be obtained.
+ */
+ private void forwardRead(int hash) throws IOException {
+ if (forwardRead != null)
+ return;
+
+ int pos = buf.position();
+ // Callers invoke this after reading the whole record, so the buffer position is the start of the next one.
+ long nextRecPos = fileIo.position() - buf.remaining();
+ // Go back to the mark; it is expected to be at the start of the current record.
+ buf.reset();
+
+ int bufPos = buf.position();
+
+ long curRecPos = fileIo.position() - buf.remaining();
+ // Restore the position so that reading goes on from the next record.
+ buf.position(pos);
+
+ curHnd = NOOP_HANDLER;
+
+ forwardRead = new ForwardRead(hash, curRecPos, nextRecPos, bufPos);
+ }
+
+ /**
+  * Collects performance statistics files from the given files and directories.
+  * A file is accepted if a node id can be parsed from its name (see {@link #nodeId(File)}).
+  * Directories are walked with the maximum depth of 1.
+  *
+  * @param filesOrDirs Files or directories. May be {@code null} or empty.
+  * @return Accepted files, an empty list if nothing was passed.
+  * @throws IOException If a directory walk failed.
+  */
+ static List<File> resolveFiles(List<File> filesOrDirs) throws IOException {
+     if (filesOrDirs == null || filesOrDirs.isEmpty())
+         return Collections.emptyList();
+
+     List<File> res = new LinkedList<>();
+
+     for (File fileOrDir : filesOrDirs) {
+         if (!fileOrDir.isDirectory()) {
+             if (nodeId(fileOrDir) != null)
+                 res.add(fileOrDir);
+
+             continue;
+         }
+
+         walkFileTree(fileOrDir.toPath(), EnumSet.noneOf(FileVisitOption.class), 1,
+             new SimpleFileVisitor<Path>() {
+                 @Override public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) {
+                     File candidate = path.toFile();
+
+                     if (nodeId(candidate) != null)
+                         res.add(candidate);
+
+                     return FileVisitResult.CONTINUE;
+                 }
+             });
+     }
+
+     return res;
+ }
+
+ /**
+  * Extracts the node id from a statistics file name.
+  *
+  * @param file File to check.
+  * @return Node id parsed from the file name. {@code Null} if the name does not match {@link #FILE_PATTERN}.
+  */
+ @Nullable private static UUID nodeId(File file) {
+     Matcher m = FILE_PATTERN.matcher(file.getName());
+
+     return m.matches() ? UUID.fromString(m.group(1)) : null;
+ }
+
+ /**
+  * Reads a string of the given byte length from the buffer, remembers it by its hashcode and marks the
+  * forward read as successful if this is the string being searched for.
+  * <p>
+  * NOTE(review): the bytes are decoded with the platform default charset; presumably the file has to be read
+  * with the same default charset it was written with - confirm.
+  *
+  * @param buf Buffer.
+  * @param size String length in bytes.
+  * @return String.
+  */
+ private String readString(ByteBuffer buf, int size) {
+     byte[] raw = new byte[size];
+
+     buf.get(raw);
+
+     String res = new String(raw);
+
+     int hash = res.hashCode();
+
+     knownStrs.putIfAbsent(hash, res);
+
+     if (forwardRead != null && hash == forwardRead.hash)
+         forwardRead.found = true;
+
+     return res;
+ }
+
+ /**
+  * Reads {@link UUID} from buffer: the most significant bits first, then the least significant ones.
+  *
+  * @param buf Buffer.
+  * @return UUID.
+  */
+ private static UUID readUuid(ByteBuffer buf) {
+     long mostSigBits = buf.getLong();
+     long leastSigBits = buf.getLong();
+
+     return new UUID(mostSigBits, leastSigBits);
+ }
+
+ /**
+  * Reads {@link IgniteUuid} from buffer: two longs of the global id followed by the local id.
+  *
+  * @param buf Buffer.
+  * @return Ignite UUID.
+  */
+ private static IgniteUuid readIgniteUuid(ByteBuffer buf) {
+     long mostSigBits = buf.getLong();
+     long leastSigBits = buf.getLong();
+     long locId = buf.getLong();
+
+     return new IgniteUuid(new UUID(mostSigBits, leastSigBits), locId);
+ }
+
+ /** State of a forward read: a search for a string that was referenced by hashcode before it was read. */
+ private static class ForwardRead {
+     /** Hashcode of the string being searched for. */
+     final int hash;
+
+     /** Absolute file position of the record that started the search. */
+     final long curRecPos;
+
+     /** Absolute file position of the record that follows the one which started the search. */
+     final long nextRecPos;
+
+     /** Buffer position of the record that started the search. */
+     final int bufPos;
+
+     /** Set when a string with the searched hashcode has been read. */
+     boolean found;
+
+     /** {@code True} if the data in the buffer was overwritten during the search. */
+     boolean resetBuf;
+
+     /**
+      * @param hash Hashcode.
+      * @param curRecPos Absolute current record position.
+      * @param nextRecPos Absolute next record position.
+      * @param bufPos Buffer position.
+      */
+     private ForwardRead(int hash, long curRecPos, long nextRecPos, int bufPos) {
+         this.bufPos = bufPos;
+         this.nextRecPos = nextRecPos;
+         this.curRecPos = curRecPos;
+         this.hash = hash;
+     }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsWriter.java
new file mode 100644
index 0000000..d3e0472
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsWriter.java
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.performancestatistics;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.GridIntIterator;
+import org.apache.ignite.internal.util.GridIntList;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.thread.IgniteThread;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PERF_STAT_BUFFER_SIZE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PERF_STAT_CACHED_STRINGS_THRESHOLD;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PERF_STAT_FILE_MAX_SIZE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PERF_STAT_FLUSH_SIZE;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_START;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.JOB;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.QUERY;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.QUERY_READS;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.TASK;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.TX_COMMIT;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.TX_ROLLBACK;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.cacheRecordSize;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.cacheStartRecordSize;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.jobRecordSize;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.queryReadsRecordSize;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.queryRecordSize;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.taskRecordSize;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.transactionRecordSize;
+
+/**
+ * Performance statistics writer based on logging to a file.
+ * <p>
+ * Each node collects statistics to a file placed under {@link #PERF_STAT_DIR}.
+ * <p>
+ * To iterate over records use {@link FilePerformanceStatisticsReader}.
+ */
+public class FilePerformanceStatisticsWriter {
+ /** Directory to store performance statistics files. Placed under Ignite work directory. */
+ public static final String PERF_STAT_DIR = "perf_stat";
+
+ /** Default maximum file size in bytes. Performance statistics will be stopped when the size exceeded. */
+ public static final long DFLT_FILE_MAX_SIZE = 32 * U.GB;
+
+ /** Default off heap buffer size in bytes. */
+ public static final int DFLT_BUFFER_SIZE = (int)(32 * U.MB);
+
+ /** Default minimal batch size to flush in bytes. */
+ public static final int DFLT_FLUSH_SIZE = (int)(8 * U.MB);
+
+ /** Default maximum cached strings threshold. String caching will stop on threshold excess. */
+ public static final int DFLT_CACHED_STRINGS_THRESHOLD = 1024;
+
+ /** File writer thread name. */
+ static final String WRITER_THREAD_NAME = "performance-statistics-writer";
+
+ /** Minimal batch size to flush in bytes. */
+ private final int flushSize =
+ IgniteSystemProperties.getInteger(IGNITE_PERF_STAT_FLUSH_SIZE, DFLT_FLUSH_SIZE);
+
+ /** Maximum cached strings threshold. String caching will stop on threshold excess. */
+ private final int cachedStrsThreshold =
+ IgniteSystemProperties.getInteger(IGNITE_PERF_STAT_CACHED_STRINGS_THRESHOLD, DFLT_CACHED_STRINGS_THRESHOLD);
+
+ /** Factory to provide I/O interface. */
+ private final FileIOFactory fileIoFactory = new RandomAccessFileIOFactory();
+
+ /** Performance statistics file I/O. */
+ private final FileIO fileIo;
+
+ /** Performance statistics file writer worker. */
+ private final FileWriter fileWriter;
+
+ /** File writer thread started flag. */
+ private boolean started;
+
+ /** File write buffer. */
+ private final SegmentedRingByteBuffer ringByteBuf;
+
+ /** Count of written to buffer bytes. */
+ private final AtomicInteger writtenToBuf = new AtomicInteger();
+
+ /** {@code True} if the small buffer warning message logged. */
+ private final AtomicBoolean smallBufLogged = new AtomicBoolean();
+
+ /** {@code True} if worker stopped due to maximum file size reached. */
+ private final AtomicBoolean stopByMaxSize = new AtomicBoolean();
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /** Hashcodes of cached strings. */
+ private final Set<Integer> knownStrs = new GridConcurrentHashSet<>();
+
+ /** Count of cached strings. */
+ private volatile int knownStrsSz;
+
+ /**
+  * Opens the statistics file, allocates the ring buffer and creates (but does not start) the writer worker.
+  * The file is closed again if the rest of the initialization fails.
+  *
+  * @param ctx Kernal context.
+  * @throws IgniteCheckedException If the statistics file location could not be resolved.
+  * @throws IOException If the statistics file could not be opened.
+  */
+ public FilePerformanceStatisticsWriter(GridKernalContext ctx) throws IgniteCheckedException, IOException {
+     log = ctx.log(getClass());
+
+     File file = resolveStatisticsFile(ctx);
+
+     fileIo = fileIoFactory.create(file);
+
+     try {
+         log.info("Performance statistics file created [file=" + file.getAbsolutePath() + ']');
+
+         long fileMaxSize = IgniteSystemProperties.getLong(IGNITE_PERF_STAT_FILE_MAX_SIZE, DFLT_FILE_MAX_SIZE);
+         int bufSize = IgniteSystemProperties.getInteger(IGNITE_PERF_STAT_BUFFER_SIZE, DFLT_BUFFER_SIZE);
+
+         ringByteBuf = new SegmentedRingByteBuffer(bufSize, fileMaxSize, SegmentedRingByteBuffer.BufferMode.DIRECT);
+
+         fileWriter = new FileWriter(ctx, log);
+     }
+     catch (RuntimeException | Error e) {
+         // Nobody else holds a reference to the file yet, so it would stay open otherwise.
+         U.closeQuiet(fileIo);
+
+         throw e;
+     }
+ }
+
+ /** Starts collecting performance statistics: runs the file writer worker in a new thread. */
+ public synchronized void start() {
+     assert !started;
+
+     IgniteThread writerThread = new IgniteThread(fileWriter);
+
+     writerThread.start();
+
+     started = true;
+ }
+
+ /**
+ * Stops collecting performance statistics: closes the ring buffer, waits for the file writer worker to
+ * stop, frees the buffer, then syncs and closes the file. A failed sync is only logged.
+ */
+ public synchronized void stop() {
+ assert started;
+
+ // Stop accepting new records.
+ ringByteBuf.close();
+ // NOTE(review): the 'true' flag presumably cancels the worker before waiting - confirm against U.awaitForWorkersStop.
+ U.awaitForWorkersStop(Collections.singleton(fileWriter), true, log);
+
+ // Make sure that all producers released their buffers to safe deallocate memory (in case of worker
+ // stopped abnormally).
+ ringByteBuf.poll();
+
+ ringByteBuf.free();
+
+ try {
+ fileIo.force();
+ }
+ catch (IOException e) {
+ log.warning("Failed to fsync the performance statistics file.", e);
+ }
+
+ U.closeQuiet(fileIo);
+
+ knownStrs.clear();
+
+ started = false;
+ }
+
+ /**
+  * Writes a cache start record. The name is written by hashcode if it is already cached.
+  *
+  * @param cacheId Cache id.
+  * @param name Cache name.
+  */
+ public void cacheStart(int cacheId, String name) {
+     boolean cached = cacheIfPossible(name);
+
+     // A cached name takes no bytes of its own in the record.
+     int nameLen = cached ? 0 : name.getBytes().length;
+
+     doWrite(CACHE_START, cacheStartRecordSize(nameLen, cached), buf -> {
+         writeString(buf, name, cached);
+         buf.putInt(cacheId);
+     });
+ }
+
+ /**
+  * Writes a cache operation record: cache id, start time and duration.
+  *
+  * @param type Operation type.
+  * @param cacheId Cache id.
+  * @param startTime Start time in milliseconds.
+  * @param duration Duration in nanoseconds.
+  */
+ public void cacheOperation(OperationType type, int cacheId, long startTime, long duration) {
+     doWrite(type, cacheRecordSize(), buf -> buf.putInt(cacheId).putLong(startTime).putLong(duration));
+ }
+
+ /**
+  * Writes a transaction record: the number of cache ids, the ids themselves, start time and duration.
+  * The operation type of the record tells whether the transaction was committed or rolled back.
+  *
+  * @param cacheIds Cache IDs.
+  * @param startTime Start time in milliseconds.
+  * @param duration Duration in nanoseconds.
+  * @param commited {@code True} if committed.
+  */
+ public void transaction(GridIntList cacheIds, long startTime, long duration, boolean commited) {
+     OperationType op = commited ? TX_COMMIT : TX_ROLLBACK;
+
+     doWrite(op, transactionRecordSize(cacheIds.size()), buf -> {
+         buf.putInt(cacheIds.size());
+
+         for (GridIntIterator it = cacheIds.iterator(); it.hasNext(); )
+             buf.putInt(it.next());
+
+         buf.putLong(startTime).putLong(duration);
+     });
+ }
+
+ /**
+  * Writes a query record. The text is written by hashcode if it is already cached.
+  *
+  * @param type Cache query type.
+  * @param text Query text in case of SQL query. Cache name in case of SCAN query.
+  * @param id Query id.
+  * @param startTime Start time in milliseconds.
+  * @param duration Duration in nanoseconds.
+  * @param success Success flag.
+  */
+ public void query(GridCacheQueryType type, String text, long id, long startTime, long duration, boolean success) {
+     boolean cached = cacheIfPossible(text);
+
+     // A cached text takes no bytes of its own in the record.
+     int textLen = cached ? 0 : text.getBytes().length;
+
+     doWrite(QUERY, queryRecordSize(textLen, cached), buf -> {
+         writeString(buf, text, cached);
+
+         buf.put((byte)type.ordinal())
+             .putLong(id)
+             .putLong(startTime)
+             .putLong(duration)
+             .put((byte)(success ? 1 : 0));
+     });
+ }
+
+ /**
+  * Writes a query reads record: query type, originating node id, query id and the read counters.
+  *
+  * @param type Cache query type.
+  * @param queryNodeId Originating node id.
+  * @param id Query id.
+  * @param logicalReads Number of logical reads.
+  * @param physicalReads Number of physical reads.
+  */
+ public void queryReads(GridCacheQueryType type, UUID queryNodeId, long id, long logicalReads, long physicalReads) {
+     doWrite(QUERY_READS, queryReadsRecordSize(), buf -> {
+         buf.put((byte)type.ordinal());
+
+         writeUuid(buf, queryNodeId);
+
+         buf.putLong(id).putLong(logicalReads).putLong(physicalReads);
+     });
+ }
+
+ /**
+  * Writes a task record. The task name is written by hashcode if it is already cached.
+  *
+  * @param sesId Session id.
+  * @param taskName Task name.
+  * @param startTime Start time in milliseconds.
+  * @param duration Duration.
+  * @param affPartId Affinity partition id.
+  */
+ public void task(IgniteUuid sesId, String taskName, long startTime, long duration, int affPartId) {
+     boolean cached = cacheIfPossible(taskName);
+
+     // A cached name takes no bytes of its own in the record.
+     int nameLen = cached ? 0 : taskName.getBytes().length;
+
+     doWrite(TASK, taskRecordSize(nameLen, cached), buf -> {
+         writeString(buf, taskName, cached);
+         writeIgniteUuid(buf, sesId);
+
+         buf.putLong(startTime).putLong(duration).putInt(affPartId);
+     });
+ }
+
+ /**
+  * Writes a job record: session id, queued time, start time, duration and the timed out flag.
+  *
+  * @param sesId Session id.
+  * @param queuedTime Time job spent on waiting queue.
+  * @param startTime Start time in milliseconds.
+  * @param duration Job execution time.
+  * @param timedOut {@code True} if job is timed out.
+  */
+ public void job(IgniteUuid sesId, long queuedTime, long startTime, long duration, boolean timedOut) {
+     doWrite(JOB, jobRecordSize(), buf -> {
+         writeIgniteUuid(buf, sesId);
+
+         buf.putLong(queuedTime)
+             .putLong(startTime)
+             .putLong(duration)
+             .put((byte)(timedOut ? 1 : 0));
+     });
+ }
+
+ /**
+ * Writes a record (one byte of the operation type id followed by the data produced by {@code writer}) to
+ * the ring buffer and wakes up the file writer worker when the amount of written bytes crosses a
+ * {@code flushSize} boundary. The record is dropped if no segment or no segment buffer is available.
+ *
+ * @param op Operation type.
+ * @param recSize Record size.
+ * @param writer Record writer.
+ */
+ private void doWrite(OperationType op, int recSize, Consumer<ByteBuffer> writer) {
+ int size = recSize + /*type*/ 1;
+
+ SegmentedRingByteBuffer.WriteSegment seg = ringByteBuf.offer(size);
+
+ if (seg == null) { // No segment for this record: it is dropped, the warning is logged only once.
+ if (smallBufLogged.compareAndSet(false, true)) {
+ log.warning("The performance statistics in-memory buffer size is too small. Some operations " +
+ "will not be logged.");
+ }
+
+ return;
+ }
+
+ // Ring buffer closed (writer stopping) or maximum size reached.
+ if (seg.buffer() == null) {
+ seg.release();
+
+ if (!fileWriter.isCancelled() && stopByMaxSize.compareAndSet(false, true))
+ log.warning("The performance statistics file maximum size is reached.");
+
+ return;
+ }
+
+ ByteBuffer buf = seg.buffer();
+
+ buf.put(op.id());
+
+ writer.accept(buf);
+
+ seg.release();
+ // NOTE(review): writtenToBuf is an int counter and wraps around after 2^31 written bytes; the comparison
+ // below presumably misses one wake-up at that point - confirm this is acceptable.
+ int bufCnt = writtenToBuf.get() / flushSize;
+
+ if (writtenToBuf.addAndGet(size) / flushSize > bufCnt) {
+ // Wake up worker to start writing data to the file.
+ synchronized (fileWriter) {
+ fileWriter.notify();
+ }
+ }
+ }
+
+ /**
+  * Picks a statistics file that does not exist yet under {@link #PERF_STAT_DIR} in the Ignite work directory.
+  * The name is {@code node-<nodeId>.prf}; if it is taken, {@code node-<nodeId>-<index>.prf} with the first
+  * free index starting from 1.
+  *
+  * @param ctx Kernal context.
+  * @return Performance statistics file.
+  * @throws IgniteCheckedException If the work directory could not be resolved.
+  */
+ private static File resolveStatisticsFile(GridKernalContext ctx) throws IgniteCheckedException {
+     String igniteWorkDir = U.workDirectory(ctx.config().getWorkDirectory(), ctx.config().getIgniteHome());
+
+     File fileDir = U.resolveWorkDirectory(igniteWorkDir, PERF_STAT_DIR, false);
+
+     File file = new File(fileDir, "node-" + ctx.localNodeId() + ".prf");
+
+     for (int idx = 1; file.exists(); idx++)
+         file = new File(fileDir, "node-" + ctx.localNodeId() + '-' + idx + ".prf");
+
+     return file;
+ }
+
+ /**
+  * Writes {@link UUID} to buffer: the most significant bits first, then the least significant ones.
+  *
+  * @param buf Buffer to write to.
+  * @param uuid UUID to write.
+  */
+ private static void writeUuid(ByteBuffer buf, UUID uuid) {
+     buf.putLong(uuid.getMostSignificantBits()).putLong(uuid.getLeastSignificantBits());
+ }
+
+ /**
+  * Writes {@link IgniteUuid} to buffer: two longs of the global id followed by the local id.
+  *
+  * @param buf Buffer to write to.
+  * @param uuid Ignite UUID to write.
+  */
+ static void writeIgniteUuid(ByteBuffer buf, IgniteUuid uuid) {
+     buf.putLong(uuid.globalId().getMostSignificantBits())
+         .putLong(uuid.globalId().getLeastSignificantBits())
+         .putLong(uuid.localId());
+ }
+
+ /**
+  * Writes a string to the buffer. A cached string is written as a flag byte {@code 1} and its hashcode,
+  * a non-cached one as a flag byte {@code 0}, the length of its bytes and the bytes themselves.
+  * <p>
+  * NOTE(review): the bytes are produced with the platform default charset; presumably the file has to be
+  * read with the same default charset - confirm.
+  *
+  * @param buf Buffer to write to.
+  * @param str String to write.
+  * @param cached {@code True} if string cached.
+  */
+ static void writeString(ByteBuffer buf, String str, boolean cached) {
+     if (cached) {
+         buf.put((byte)1);
+         buf.putInt(str.hashCode());
+
+         return;
+     }
+
+     buf.put((byte)0);
+
+     byte[] bytes = str.getBytes();
+
+     buf.putInt(bytes.length);
+     buf.put(bytes);
+ }
+
+ /**
+  * Registers the hashcode of the string unless the cached strings threshold is reached.
+  * Strings are tracked by hashcode only.
+  *
+  * @param str String.
+  * @return {@code True} if the hashcode was registered before this call, so the string can be written as
+  *      hashcode. {@code False} if it was registered by this call or the threshold is reached.
+  */
+ private boolean cacheIfPossible(String str) {
+     if (knownStrsSz < cachedStrsThreshold) {
+         int hash = str.hashCode();
+
+         // We can cache slightly more strings than threshold value.
+         // Don't implement solution with synchronization here, because our primary goal is avoid any contention.
+         boolean alreadyKnown = knownStrs.contains(hash) || !knownStrs.add(hash);
+
+         if (alreadyKnown)
+             return true;
+
+         knownStrsSz = knownStrs.size();
+     }
+
+     return false;
+ }
+
+ /**
+  * Worker to write to performance statistics file. It sleeps until the producers have written enough data
+  * to cross a {@code flushSize} boundary (or until it is woken up), then flushes the ring buffer to the file.
+  */
+ private class FileWriter extends GridWorker {
+     /**
+      * @param ctx Kernal context.
+      * @param log Logger.
+      */
+     FileWriter(GridKernalContext ctx, IgniteLogger log) {
+         super(ctx.igniteInstanceName(), WRITER_THREAD_NAME, log, ctx.workersRegistry());
+     }
+
+     /** {@inheritDoc} */
+     @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+         try {
+             // Deliberately an int: it is compared with the writtenToBuf counter, which is an int and wraps
+             // around after 2^31 bytes. A wider counter would not match it after the wrap, so the worker
+             // would skip wait() and spin.
+             int writtenToFile = 0;
+
+             while (!isCancelled()) {
+                 blockingSectionBegin();
+
+                 try {
+                     synchronized (this) {
+                         // Nothing new worth a flush: wait for a producer to notify.
+                         if (writtenToFile / flushSize == writtenToBuf.get() / flushSize)
+                             wait();
+                     }
+                 }
+                 finally {
+                     blockingSectionEnd();
+                 }
+
+                 writtenToFile += flush();
+             }
+
+             // Write what is left in the buffer after cancellation.
+             flush();
+         }
+         catch (InterruptedException e) {
+             // Best effort: try to write what is left in the buffer before exiting.
+             try {
+                 flush();
+             }
+             catch (IOException ignored) {
+                 // No-op.
+             }
+         }
+         catch (ClosedByInterruptException ignored) {
+             // No-op.
+         }
+         catch (IOException e) {
+             log.error("Unable to write to the performance statistics file.", e);
+         }
+     }
+
+     /**
+      * Flushes to disk available bytes from the ring buffer.
+      *
+      * @return Count of written bytes.
+      * @throws IOException If write to the file failed.
+      */
+     private int flush() throws IOException {
+         List<SegmentedRingByteBuffer.ReadSegment> segs = ringByteBuf.poll();
+
+         if (segs == null)
+             return 0;
+
+         int written = 0;
+
+         for (SegmentedRingByteBuffer.ReadSegment seg : segs) {
+             updateHeartbeat();
+
+             try {
+                 written += fileIo.writeFully(seg.buffer());
+             }
+             finally {
+                 // The segment is released even if the write failed.
+                 seg.release();
+             }
+         }
+
+         return written;
+     }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java
new file mode 100644
index 0000000..d287f45
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.performancestatistics;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Performance statistics operation type.
+ */
+public enum OperationType {
+ /** Cache get. */
+ CACHE_GET(0),
+
+ /** Cache put. */
+ CACHE_PUT(1),
+
+ /** Cache remove. */
+ CACHE_REMOVE(2),
+
+ /** Cache get and put. */
+ CACHE_GET_AND_PUT(3),
+
+ /** Cache get and remove. */
+ CACHE_GET_AND_REMOVE(4),
+
+ /** Cache invoke. */
+ CACHE_INVOKE(5),
+
+ /** Cache lock. */
+ CACHE_LOCK(6),
+
+ /** Cache get all. */
+ CACHE_GET_ALL(7),
+
+ /** Cache put all. */
+ CACHE_PUT_ALL(8),
+
+ /** Cache remove all. */
+ CACHE_REMOVE_ALL(9),
+
+ /** Cache invoke all. */
+ CACHE_INVOKE_ALL(10),
+
+ /** Transaction commit. */
+ TX_COMMIT(11),
+
+ /** Transaction rollback. */
+ TX_ROLLBACK(12),
+
+ /** Query. */
+ QUERY(13),
+
+ /** Query reads. */
+ QUERY_READS(14),
+
+ /** Task. */
+ TASK(15),
+
+ /** Job. */
+ JOB(16),
+
+ /** Cache start. */
+ CACHE_START(17);
+
+ /** Cache operations. */
+ public static final EnumSet<OperationType> CACHE_OPS = EnumSet.of(CACHE_GET, CACHE_PUT, CACHE_REMOVE,
+ CACHE_GET_AND_PUT, CACHE_GET_AND_REMOVE, CACHE_INVOKE, CACHE_LOCK, CACHE_GET_ALL, CACHE_PUT_ALL,
+ CACHE_REMOVE_ALL, CACHE_INVOKE_ALL);
+
+ /** Transaction operations. */
+ public static final EnumSet<OperationType> TX_OPS = EnumSet.of(TX_COMMIT, TX_ROLLBACK);
+
+ /** Value by identifier. */
+ private static final Map<Byte, OperationType> VALS;
+
+ /** Unique operation identifier. */
+ private final byte id;
+
+ /** Static initializer. */
+ static {
+ Map<Byte, OperationType> vals = new HashMap<>();
+
+ for (OperationType op : values()) {
+ OperationType old = vals.put(op.id(), op);
+
+ assert old == null : "Duplicate operation ID found [op=" + op + ']';
+ }
+
+ VALS = Collections.unmodifiableMap(vals);
+ }
+
+ /** @param id Unique operation identifier. */
+ OperationType(int id) {
+ this.id = (byte)id;
+ }
+
+ /** @return Unique operation identifier. */
+ public byte id() {
+ return id;
+ }
+
+ /** @return Operation type of given identifier. */
+ @Nullable public static OperationType of(byte id) {
+ return VALS.get(id);
+ }
+
+ /** @return {@code True} if cache operation. */
+ public static boolean cacheOperation(OperationType op) {
+ return CACHE_OPS.contains(op);
+ }
+
+ /** @return {@code True} if transaction operation. */
+ public static boolean transactionOperation(OperationType op) {
+ return TX_OPS.contains(op);
+ }
+
+ /**
+ * @param nameLen Cache name length.
+ * @param cached {@code True} if cache name cached.
+ * @return Cache start record size.
+ */
+ public static int cacheStartRecordSize(int nameLen, boolean cached) {
+ return 1 + 4 + (cached ? 4 : 4 + nameLen);
+ }
+
+ /** @return Cache record size. */
+ public static int cacheRecordSize() {
+ return 4 + 8 + 8;
+ }
+
+ /**
+ * @param cachesIdsCnt Cache identifiers size.
+ * @return Transaction record size.
+ */
+ public static int transactionRecordSize(int cachesIdsCnt) {
+ return 4 + cachesIdsCnt * 4 + 8 + 8;
+ }
+
+ /**
+ * @param textLen Query text length.
+ * @param cached {@code True} if query text cached.
+ * @return Query record size.
+ */
+ public static int queryRecordSize(int textLen, boolean cached) {
+ return 1 + (cached ? 4 : 4 + textLen) + 1 + 8 + 8 + 8 + 1;
+ }
+
+ /** @return Query reads record size. */
+ public static int queryReadsRecordSize() {
+ return 1 + 16 + 8 + 8 + 8;
+ }
+
+ /**
+ * @param nameLen Task name length.
+ * @param cached {@code True} if task name cached.
+ * @return Task record size.
+ */
+ public static int taskRecordSize(int nameLen, boolean cached) {
+ return 1 + (cached ? 4 : 4 + nameLen) + 24 + 8 + 8 + 4;
+ }
+
+ /** @return Job record size. */
+ public static int jobRecordSize() {
+ return 24 + 8 + 8 + 8 + 1;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsHandler.java
new file mode 100644
index 0000000..3962db4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsHandler.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.performancestatistics;
+
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+import org.apache.ignite.internal.util.GridIntList;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ * The interface represents performance statistics operations collection for purposes of troubleshooting and
+ * performance analysis.
+ */
+public interface PerformanceStatisticsHandler {
+ /**
+ * @param nodeId Node id.
+ * @param cacheId Cache id.
+ * @param name Cache name.
+ */
+ void cacheStart(UUID nodeId, int cacheId, String name);
+
+ /**
+ * @param nodeId Node id.
+ * @param type Operation type.
+ * @param cacheId Cache id.
+ * @param startTime Start time in milliseconds.
+ * @param duration Duration in nanoseconds.
+ */
+ void cacheOperation(UUID nodeId, OperationType type, int cacheId, long startTime, long duration);
+
+ /**
+ * @param nodeId Node id.
+ * @param cacheIds Cache IDs.
+ * @param startTime Start time in milliseconds.
+ * @param duration Duration in nanoseconds.
+ * @param commited {@code True} if commited.
+ */
+ void transaction(UUID nodeId, GridIntList cacheIds, long startTime, long duration, boolean commited);
+
+ /**
+ * @param nodeId Node id.
+ * @param type Cache query type.
+ * @param text Query text in case of SQL query. Cache name in case of SCAN query.
+ * @param id Query id.
+ * @param startTime Start time in milliseconds.
+ * @param duration Duration in nanoseconds.
+ * @param success Success flag.
+ */
+ void query(UUID nodeId, GridCacheQueryType type, String text, long id, long startTime, long duration,
+ boolean success);
+
+ /**
+ * @param nodeId Node id.
+ * @param type Cache query type.
+ * @param queryNodeId Originating node id.
+ * @param id Query id.
+ * @param logicalReads Number of logical reads.
+ * @param physicalReads Number of physical reads.
+ */
+ void queryReads(UUID nodeId, GridCacheQueryType type, UUID queryNodeId, long id, long logicalReads,
+ long physicalReads);
+
+ /**
+ * @param nodeId Node id.
+ * @param sesId Session id.
+ * @param taskName Task name.
+ * @param startTime Start time in milliseconds.
+ * @param duration Duration.
+ * @param affPartId Affinity partition id.
+ */
+ void task(UUID nodeId, IgniteUuid sesId, String taskName, long startTime, long duration, int affPartId);
+
+ /**
+ * @param nodeId Node id.
+ * @param sesId Session id.
+ * @param queuedTime Time job spent on waiting queue.
+ * @param startTime Start time in milliseconds.
+ * @param duration Job execution time.
+ * @param timedOut {@code True} if job is timed out.
+ */
+ void job(UUID nodeId, IgniteUuid sesId, long queuedTime, long startTime, long duration, boolean timedOut);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsMBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsMBeanImpl.java
new file mode 100644
index 0000000..8b6a236
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsMBeanImpl.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.performancestatistics;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.mxbean.PerformanceStatisticsMBean;
+
+/**
+ * {@link PerformanceStatisticsMBean} implementation.
+ */
+public class PerformanceStatisticsMBeanImpl implements PerformanceStatisticsMBean {
+ /** Kernal context. */
+ private final GridKernalContext ctx;
+
+ /** @param ctx Kernal context. */
+ public PerformanceStatisticsMBeanImpl(GridKernalContext ctx) {
+ this.ctx = ctx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteCheckedException {
+ ctx.performanceStatistics().startCollectStatistics();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteCheckedException {
+ ctx.performanceStatistics().stopCollectStatistics();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean started() {
+ return ctx.performanceStatistics().enabled();
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java
new file mode 100644
index 0000000..f648114
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.performancestatistics;
+
+import java.util.ArrayList;
+import java.util.EventListener;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteFeatures;
+import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
+import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageLifecycleListener;
+import org.apache.ignite.internal.processors.metastorage.ReadableDistributedMetaStorage;
+import org.apache.ignite.internal.util.GridIntList;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.IgniteFeatures.allNodesSupports;
+import static org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage.IGNITE_INTERNAL_KEY_PREFIX;
+
+/**
+ * Performance statistics processor.
+ * <p>
+ * Manages collecting performance statistics.
+ *
+ * @see FilePerformanceStatisticsWriter
+ * @see FilePerformanceStatisticsReader
+ */
+public class PerformanceStatisticsProcessor extends GridProcessorAdapter {
+ /** Prefix for performance statistics enabled key. */
+ private static final String PERF_STAT_KEY = IGNITE_INTERNAL_KEY_PREFIX + "performanceStatistics.enabled";
+
+ /** Performance statistics writer. {@code Null} if collecting statistics disabled. */
+ @Nullable private volatile FilePerformanceStatisticsWriter writer;
+
+ /** Metastorage with the write access. */
+ @Nullable private volatile DistributedMetaStorage metastorage;
+
+ /** Synchronization mutex for start/stop collecting performance statistics operations. */
+ private final Object mux = new Object();
+
+ /** Performance statistics state listeners. */
+ private final ArrayList<PerformanceStatisticsStateListener> lsnrs = new ArrayList<>();
+
+ /** @param ctx Kernal context. */
+ public PerformanceStatisticsProcessor(GridKernalContext ctx) {
+ super(ctx);
+
+ ctx.internalSubscriptionProcessor().registerDistributedMetastorageListener(
+ new DistributedMetastorageLifecycleListener() {
+ @Override public void onReadyForRead(ReadableDistributedMetaStorage metastorage) {
+ metastorage.listen(PERF_STAT_KEY::equals, (key, oldVal, newVal) -> {
+ // Skip history on local join.
+ if (!ctx.discovery().localJoinFuture().isDone())
+ return;
+
+ onMetastorageUpdate((boolean)newVal);
+ });
+ }
+
+ @Override public void onReadyForWrite(DistributedMetaStorage metastorage) {
+ PerformanceStatisticsProcessor.this.metastorage = metastorage;
+
+ try {
+ Boolean performanceStatsEnabled = metastorage.read(PERF_STAT_KEY);
+
+ if (performanceStatsEnabled == null)
+ return;
+
+ onMetastorageUpdate(performanceStatsEnabled);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ });
+
+ registerStateListener(() -> {
+ if (U.isLocalNodeCoordinator(ctx.discovery()))
+ ctx.cache().cacheDescriptors().values().forEach(desc -> cacheStart(desc.cacheId(), desc.cacheName()));
+ });
+ }
+
+ /** Registers state listener. */
+ public void registerStateListener(PerformanceStatisticsStateListener lsnr) {
+ lsnrs.add(lsnr);
+ }
+
+ /**
+ * @param cacheId Cache id.
+ * @param name Cache name.
+ */
+ public void cacheStart(int cacheId, String name) {
+ write(writer -> writer.cacheStart(cacheId, name));
+ }
+
+ /**
+ * @param type Operation type.
+ * @param cacheId Cache id.
+ * @param startTime Start time in milliseconds.
+ * @param duration Duration in nanoseconds.
+ */
+ public void cacheOperation(OperationType type, int cacheId, long startTime, long duration) {
+ write(writer -> writer.cacheOperation(type, cacheId, startTime, duration));
+ }
+
+ /**
+ * @param cacheIds Cache IDs.
+ * @param startTime Start time in milliseconds.
+ * @param duration Duration in nanoseconds.
+ * @param commited {@code True} if commited.
+ */
+ public void transaction(GridIntList cacheIds, long startTime, long duration, boolean commited) {
+ write(writer -> writer.transaction(cacheIds, startTime, duration, commited));
+ }
+
+ /**
+ * @param type Cache query type.
+ * @param text Query text in case of SQL query. Cache name in case of SCAN query.
+ * @param id Query id.
+ * @param startTime Start time in milliseconds.
+ * @param duration Duration in nanoseconds.
+ * @param success Success flag.
+ */
+ public void query(GridCacheQueryType type, String text, long id, long startTime, long duration, boolean success) {
+ write(writer -> writer.query(type, text, id, startTime, duration, success));
+ }
+
+ /**
+ * @param type Cache query type.
+ * @param queryNodeId Originating node id.
+ * @param id Query id.
+ * @param logicalReads Number of logical reads.
+ * @param physicalReads Number of physical reads.
+ */
+ public void queryReads(GridCacheQueryType type, UUID queryNodeId, long id, long logicalReads, long physicalReads) {
+ write(writer -> writer.queryReads(type, queryNodeId, id, logicalReads, physicalReads));
+ }
+
+ /**
+ * @param sesId Session id.
+ * @param taskName Task name.
+ * @param startTime Start time in milliseconds.
+ * @param duration Duration.
+ * @param affPartId Affinity partition id.
+ */
+ public void task(IgniteUuid sesId, String taskName, long startTime, long duration, int affPartId) {
+ write(writer -> writer.task(sesId, taskName, startTime, duration, affPartId));
+ }
+
+ /**
+ * @param sesId Session id.
+ * @param queuedTime Time job spent on waiting queue.
+ * @param startTime Start time in milliseconds.
+ * @param duration Job execution time.
+ * @param timedOut {@code True} if job is timed out.
+ */
+ public void job(IgniteUuid sesId, long queuedTime, long startTime, long duration, boolean timedOut) {
+ write(writer -> writer.job(sesId, queuedTime, startTime, duration, timedOut));
+ }
+
+ /**
+ * Starts collecting performance statistics.
+ *
+ * @throws IgniteCheckedException If starting failed.
+ */
+ public void startCollectStatistics() throws IgniteCheckedException {
+ A.notNull(metastorage, "Metastorage not ready. Node not started?");
+
+ if (!allNodesSupports(ctx.discovery().allNodes(), IgniteFeatures.PERFORMANCE_STATISTICS))
+ throw new IllegalStateException("Not all nodes in the cluster support collecting performance statistics.");
+
+ if (ctx.isStopping())
+ throw new NodeStoppingException("Operation has been cancelled (node is stopping)");
+
+ metastorage.write(PERF_STAT_KEY, true);
+ }
+
+ /**
+ * Stops collecting performance statistics.
+ *
+ * @throws IgniteCheckedException If stopping failed.
+ */
+ public void stopCollectStatistics() throws IgniteCheckedException {
+ A.notNull(metastorage, "Metastorage not ready. Node not started?");
+
+ if (ctx.isStopping())
+ throw new NodeStoppingException("Operation has been cancelled (node is stopping)");
+
+ metastorage.write(PERF_STAT_KEY, false);
+ }
+
+ /** @return {@code True} if collecting performance statistics is enabled. */
+ public boolean enabled() {
+ return writer != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStop(boolean cancel) {
+ if (enabled())
+ stopWriter();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) {
+ if (enabled())
+ stopWriter();
+ }
+
+ /** Starts or stops collecting statistics on metastorage update. */
+ private void onMetastorageUpdate(boolean start) {
+ ctx.closure().runLocalSafe(() -> {
+ if (start)
+ startWriter();
+ else
+ stopWriter();
+ });
+ }
+
+ /** Starts performance statistics writer. */
+ private void startWriter() {
+ try {
+ synchronized (mux) {
+ if (writer != null)
+ return;
+
+ writer = new FilePerformanceStatisticsWriter(ctx);
+
+ writer.start();
+ }
+
+ lsnrs.forEach(PerformanceStatisticsStateListener::onStarted);
+
+ log.info("Performance statistics writer started.");
+ }
+ catch (Exception e) {
+ log.error("Failed to start performance statistics writer.", e);
+ }
+ }
+
+ /** Stops performance statistics writer. */
+ private void stopWriter() {
+ synchronized (mux) {
+ if (writer == null)
+ return;
+
+ FilePerformanceStatisticsWriter writer = this.writer;
+
+ this.writer = null;
+
+ writer.stop();
+ }
+
+ log.info("Performance statistics writer stopped.");
+ }
+
+ /** Writes statistics through passed writer. */
+ private void write(Consumer<FilePerformanceStatisticsWriter> c) {
+ FilePerformanceStatisticsWriter writer = this.writer;
+
+ if (writer != null)
+ c.accept(writer);
+ }
+
+ /** Performance statistics state listener. */
+ public interface PerformanceStatisticsStateListener extends EventListener {
+ /** This method is called whenever the performance statistics collecting is started. */
+ public void onStarted();
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
index 974d850..e8906500 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
@@ -44,6 +44,9 @@ public class GridRunningQueryInfo {
/** */
private final long startTime;
+ /** Query start time in nanoseconds to measure duration. */
+ private final long startTimeNanos;
+
/** */
private final GridQueryCancel cancel;
@@ -56,6 +59,9 @@ public class GridRunningQueryInfo {
/** Span of the running query. */
private final Span span;
+ /** Request ID. */
+ private long reqId;
+
/**
* Constructor.
*
@@ -65,6 +71,7 @@ public class GridRunningQueryInfo {
* @param qryType Query type.
* @param schemaName Schema name.
* @param startTime Query start time.
+ * @param startTimeNanos Query start time in nanoseconds.
* @param cancel Query cancel.
* @param loc Local query flag.
*/
@@ -75,6 +82,7 @@ public class GridRunningQueryInfo {
GridCacheQueryType qryType,
String schemaName,
long startTime,
+ long startTimeNanos,
GridQueryCancel cancel,
boolean loc
) {
@@ -84,6 +92,7 @@ public class GridRunningQueryInfo {
this.qryType = qryType;
this.schemaName = schemaName;
this.startTime = startTime;
+ this.startTimeNanos = startTimeNanos;
this.cancel = cancel;
this.loc = loc;
this.span = MTC.span();
@@ -132,6 +141,13 @@ public class GridRunningQueryInfo {
}
/**
+ * @return Query start time in nanoseconds, as passed to the constructor; used to measure the query duration.
+ */
+ public long startTimeNanos() {
+ return startTimeNanos;
+ }
+
+ /**
* @param curTime Current time.
* @param duration Duration of long query.
* @return {@code true} if this query should be considered as long running query.
@@ -182,4 +198,14 @@ public class GridRunningQueryInfo {
public Span span() {
return span;
}
+
+ /** @return Request ID, {@code 0} if it has not been set via {@link #requestId(long)}. */
+ public long requestId() {
+ return reqId;
+ }
+
+ /** @param reqId Request ID; replaces any previously set value. */
+ public void requestId(long reqId) {
+ this.reqId = reqId;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
index 8072378..36159f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
@@ -91,12 +91,20 @@ public class RunningQueryManager {
*/
private final AtomicLongMetric canceledQrsCnt;
+ /** Kernal context. */
+ private final GridKernalContext ctx;
+
+ /** Current running query info. */
+ private final ThreadLocal<GridRunningQueryInfo> currQryInfo = new ThreadLocal<>();
+
/**
* Constructor.
*
* @param ctx Context.
*/
public RunningQueryManager(GridKernalContext ctx) {
+ this.ctx = ctx;
+
localNodeId = ctx.localNodeId();
histSz = ctx.config().getSqlConfiguration().getSqlQueryHistorySize();
@@ -146,12 +154,16 @@ public class RunningQueryManager {
qryType,
schemaName,
System.currentTimeMillis(),
+ ctx.performanceStatistics().enabled() ? System.nanoTime() : 0,
cancel,
loc
);
GridRunningQueryInfo preRun = runs.putIfAbsent(qryId, run);
+ if (ctx.performanceStatistics().enabled())
+ currQryInfo.set(run);
+
assert preRun == null : "Running query already registered [prev_qry=" + preRun + ", newQry=" + run + ']';
return qryId;
@@ -199,12 +211,32 @@ public class RunningQueryManager {
canceledQrsCnt.increment();
}
}
+
+ if (ctx.performanceStatistics().enabled() && qry.startTimeNanos() > 0) {
+ ctx.performanceStatistics().query(
+ qry.queryType(),
+ qry.query(),
+ qry.requestId(),
+ qry.startTime(),
+ System.nanoTime() - qry.startTimeNanos(),
+ !failed);
+ }
}
finally {
qrySpan.end();
}
}
+ /**
+  * Stores the request ID in the query info kept for the current thread. Does nothing if performance statistics
+  * is disabled or the current thread has no query info.
+  *
+  * @param reqId Request ID of query to track.
+  */
+ public void trackRequestId(long reqId) {
+     if (!ctx.performanceStatistics().enabled())
+         return;
+
+     GridRunningQueryInfo qryInfo = currQryInfo.get();
+
+     if (qryInfo == null)
+         return;
+
+     qryInfo.requestId(reqId);
+ }
+
/**
* Return SQL queries which executing right now.
*
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 9a8cdbe..34032a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -1363,6 +1363,15 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
U.error(log, "Failed to unregister job communication message listeners and counters.", e);
}
}
+
+ if (ctx.performanceStatistics().enabled()) {
+ ctx.performanceStatistics().task(
+ ses.getId(),
+ ses.getTaskName(),
+ ses.getStartTime(),
+ U.currentTimeMillis() - ses.getStartTime(),
+ worker.affPartId());
+ }
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/PerformanceStatisticsMBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/PerformanceStatisticsMBean.java
new file mode 100644
index 0000000..29d7c2d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/PerformanceStatisticsMBean.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mxbean;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.lang.IgniteExperimental;
+
+/**
+ * MBean that provides access to performance statistics management.
+ */
+@IgniteExperimental
+@MXBeanDescription("MBean provide access to performance statistics management.")
+public interface PerformanceStatisticsMBean {
+ /** Start collecting performance statistics in the cluster. */
+ @MXBeanDescription("Start collecting performance statistics in the cluster.")
+ public void start() throws IgniteCheckedException;
+
+ /** Stop collecting performance statistics in the cluster. */
+ @MXBeanDescription("Stop collecting performance statistics in the cluster.")
+ public void stop() throws IgniteCheckedException;
+
+ /** @return {@code True} if collecting performance statistics is started. */
+ @MXBeanDescription("True if collecting performance statistics is started.")
+ public boolean started();
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/AbstractPerformanceStatisticsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/AbstractPerformanceStatisticsTest.java
new file mode 100644
index 0000000..e72794c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/AbstractPerformanceStatisticsTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.performancestatistics;
+
+import java.io.File;
+import java.lang.management.ThreadInfo;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+import org.apache.ignite.internal.util.GridIntList;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.mxbean.PerformanceStatisticsMBean;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static java.util.Collections.singletonList;
+import static org.apache.ignite.internal.processors.performancestatistics.FilePerformanceStatisticsWriter.PERF_STAT_DIR;
+import static org.apache.ignite.internal.processors.performancestatistics.FilePerformanceStatisticsWriter.WRITER_THREAD_NAME;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Base class for performance statistics tests: cleans the statistics directory and starts/stops collection.
+ */
+public abstract class AbstractPerformanceStatisticsTest extends GridCommonAbstractTest {
+ /** Timeout passed to {@code waitForCondition} when waiting for statistics to start or stop. */
+ public static final long TIMEOUT = 30_000;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ cleanPerformanceStatisticsDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+
+ cleanPerformanceStatisticsDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ cleanPerformanceStatisticsDir();
+ }
+
+ /** Cleans performance statistics directory located under the default work directory. */
+ protected static void cleanPerformanceStatisticsDir() throws Exception {
+ U.resolveWorkDirectory(U.defaultWorkDirectory(), PERF_STAT_DIR, true);
+ }
+
+ /** Starts collecting statistics via the MBean of the first grid and waits until all grids report started. */
+ protected static void startCollectStatistics() throws Exception {
+ List<Ignite> grids = G.allGrids();
+
+ assertFalse(grids.isEmpty());
+
+ statisticsMBean(grids.get(0).name()).start();
+
+ waitForStatisticsEnabled(true);
+ }
+
+ /** Stops collecting statistics, waits until all grids report stopped and reads the data with the handlers. */
+ protected static void stopCollectStatisticsAndRead(TestHandler... handlers) throws Exception {
+ List<Ignite> grids = G.allGrids();
+
+ assertFalse(grids.isEmpty());
+
+ statisticsMBean(grids.get(0).name()).stop();
+
+ waitForStatisticsEnabled(false);
+
+ File dir = U.resolveWorkDirectory(U.defaultWorkDirectory(), PERF_STAT_DIR, false);
+
+ new FilePerformanceStatisticsReader(handlers).read(singletonList(dir));
+ }
+
+ /** Waits until statistics are started/stopped on all grids and, on stop, until no writer thread is alive. */
+ public static void waitForStatisticsEnabled(boolean performanceStatsEnabled) throws Exception {
+ assertTrue(waitForCondition(() -> {
+ List<Ignite> grids = G.allGrids();
+
+ for (Ignite grid : grids)
+ if (performanceStatsEnabled != statisticsMBean(grid.name()).started())
+ return false;
+
+ // Make sure that writer flushed data and stopped.
+ if (!performanceStatsEnabled) {
+ for (long id : U.getThreadMx().getAllThreadIds()) {
+ ThreadInfo info = U.getThreadMx().getThreadInfo(id);
+
+ if (info != null && info.getThreadState() != Thread.State.TERMINATED &&
+ info.getThreadName().startsWith(WRITER_THREAD_NAME))
+ return false;
+ }
+ }
+
+ return true;
+ }, TIMEOUT));
+ }
+
+ /**
+ * @param igniteInstanceName Ignite instance name.
+ * @return Ignite performance statistics MBean.
+ */
+ protected static PerformanceStatisticsMBean statisticsMBean(String igniteInstanceName) {
+ return getMxBean(igniteInstanceName, "PerformanceStatistics", PerformanceStatisticsMBeanImpl.class,
+ PerformanceStatisticsMBean.class);
+ }
+
+ /** @return Performance statistics files resolved from the statistics directory. */
+ protected static List<File> statisticsFiles() throws Exception {
+ File perfStatDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), PERF_STAT_DIR, false);
+
+ return FilePerformanceStatisticsReader.resolveFiles(singletonList(perfStatDir));
+ }
+
+ /** No-op performance statistics handler; tests override only the callbacks they check. */
+ public static class TestHandler implements PerformanceStatisticsHandler {
+ /** {@inheritDoc} */
+ @Override public void cacheStart(UUID nodeId, int cacheId, String name) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cacheOperation(UUID nodeId, OperationType type, int cacheId, long startTime,
+ long duration) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void transaction(UUID nodeId, GridIntList cacheIds, long startTime, long duration,
+ boolean commited) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void query(UUID nodeId, GridCacheQueryType type, String text, long id, long startTime,
+ long duration, boolean success) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void queryReads(UUID nodeId, GridCacheQueryType type, UUID queryNodeId, long id,
+ long logicalReads, long physicalReads) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void task(UUID nodeId, IgniteUuid sesId, String taskName, long startTime, long duration,
+ int affPartId) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void job(UUID nodeId, IgniteUuid sesId, long queuedTime, long startTime, long duration,
+ boolean timedOut) {
+ // No-op.
+ }
+ }
+
+ /** Client type to run load from. */
+ enum ClientType {
+ /** Server node. */
+ SERVER,
+
+ /** Client node. */
+ CLIENT,
+
+ /** Thin client. */
+ THIN_CLIENT;
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/CacheStartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/CacheStartTest.java
new file mode 100644
index 0000000..5230901
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/CacheStartTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.performancestatistics;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Tests that cache start operations are written to performance statistics.
+ */
+@RunWith(Parameterized.class)
+public class CacheStartTest extends AbstractPerformanceStatisticsTest {
+ /** Static configured cache name. */
+ private static final String STATIC_CACHE_NAME = "static-cache";
+
+ /** Persistence enabled flag. */
+ @Parameterized.Parameter
+ public boolean persistence;
+
+ /** @return Test parameters. */
+ @Parameterized.Parameters(name = "persistence={0}")
+ public static Collection<?> parameters() {
+ return Arrays.asList(new Object[][] {{false}, {true}});
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setConsistentId(igniteInstanceName);
+
+ cfg.setCacheConfiguration(new CacheConfiguration<>(STATIC_CACHE_NAME));
+
+ if (persistence) {
+ cfg.setDataStorageConfiguration(
+ new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration()
+ .setPersistenceEnabled(true)
+ )
+ );
+ }
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ if (persistence)
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ if (persistence)
+ cleanPersistenceDir();
+
+ cleanPerformanceStatisticsDir();
+ }
+
+ /** @throws Exception If failed. */
+ @Test
+ public void testCacheStart() throws Exception {
+ IgniteEx srv = startGrids(2);
+
+ if (persistence)
+ srv.cluster().state(ClusterState.ACTIVE);
+
+ startClientGrid("client");
+
+ startCollectStatistics();
+
+ stopCollectStatisticsAndCheckCaches(srv);
+
+ cleanPerformanceStatisticsDir();
+
+ startCollectStatistics();
+
+ srv.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ stopCollectStatisticsAndCheckCaches(srv);
+
+ if (persistence) {
+ stopAllGrids();
+
+ srv = startGrids(2);
+
+ cleanPerformanceStatisticsDir();
+
+ startCollectStatistics();
+
+ stopCollectStatisticsAndCheckCaches(srv);
+ }
+ }
+
+ /** Stops collecting and reads statistics. Checks that exactly the node's cache descriptors were reported. */
+ private void stopCollectStatisticsAndCheckCaches(IgniteEx ignite) throws Exception {
+ Map<Integer, String> expCaches = new HashMap<>();
+
+ ignite.context().cache().cacheDescriptors().values().forEach(
+ desc -> expCaches.put(desc.cacheId(), desc.cacheName()));
+
+ ClusterNode coord = U.oldest(ignite.cluster().nodes(), null);
+
+ Set<Integer> caches = new HashSet<>();
+
+ stopCollectStatisticsAndRead(new TestHandler() {
+ @Override public void cacheStart(UUID nodeId, int cacheId, String name) {
+ caches.add(cacheId);
+
+ assertEquals(coord.id(), nodeId);
+ assertTrue(expCaches.containsKey(cacheId));
+ assertEquals(expCaches.get(cacheId), name);
+ }
+ });
+
+ assertEquals(expCaches.keySet(), caches);
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/ForwardReadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/ForwardReadTest.java
new file mode 100644
index 0000000..2719bc7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/ForwardReadTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.performancestatistics;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static com.google.common.collect.Collections2.permutations;
+import static com.google.common.collect.Lists.cartesianProduct;
+import static java.util.Collections.singletonList;
+import static java.util.UUID.randomUUID;
+import static org.apache.ignite.internal.processors.performancestatistics.FilePerformanceStatisticsWriter.PERF_STAT_DIR;
+import static org.apache.ignite.internal.processors.performancestatistics.FilePerformanceStatisticsWriter.writeIgniteUuid;
+import static org.apache.ignite.internal.processors.performancestatistics.FilePerformanceStatisticsWriter.writeString;
+
+/**
+ * Tests reading task records whose names are written with every ordering of the {@code cached} flag.
+ */
+@RunWith(Parameterized.class)
+public class ForwardReadTest extends AbstractPerformanceStatisticsTest {
+ /** Read buffer size. */
+ private static final int BUFFER_SIZE = 100;
+
+ /** {@code True} if testing with strings that can't be found during forward read. */
+ @Parameterized.Parameter
+ public boolean unknownStrs;
+
+ /** @return Test parameters. */
+ @Parameterized.Parameters(name = "unknownStrs={0}")
+ public static Collection<?> parameters() {
+ return Arrays.asList(new Object[][] {{false}, {true}});
+ }
+
+ /** @throws Exception If failed. */
+ @Test
+ public void testStringForwardRead() throws Exception {
+ File dir = U.resolveWorkDirectory(U.defaultWorkDirectory(), PERF_STAT_DIR, false);
+
+ Map<String, Integer> expTasks = createStatistics(dir);
+
+ new FilePerformanceStatisticsReader(BUFFER_SIZE, new TestHandler() {
+ @Override public void task(UUID nodeId, IgniteUuid sesId, String taskName, long startTime, long duration,
+ int affPartId) {
+ assertNotNull(taskName);
+
+ assertTrue(expTasks.containsKey(taskName));
+
+ expTasks.computeIfPresent(taskName, (name, cnt) -> --cnt);
+ }
+ }).read(singletonList(dir));
+
+ assertTrue(expTasks.values().stream().allMatch(cnt -> cnt == 0));
+ }
+
+ /** Creates test performance statistics file in the given directory and returns expected task counts by name. */
+ private Map<String, Integer> createStatistics(File dir) throws Exception {
+ Map<String, Integer> expTasks;
+
+ File file = new File(dir, "node-" + randomUUID() + ".prf");
+
+ try (FileIO fileIo = new RandomAccessFileIOFactory().create(file)) {
+ ByteBuffer buf = ByteBuffer.allocate(10 * 1024).order(ByteOrder.nativeOrder());
+
+ expTasks = writeData(buf);
+
+ buf.flip();
+
+ fileIo.write(buf);
+
+ fileIo.force();
+ }
+
+ return expTasks;
+ }
+
+ /** Generates task permutations, writes them to the buffer and returns expected task counts by name. */
+ private Map<String, Integer> writeData(ByteBuffer buf) {
+ Map<String, Integer> expTasks = new HashMap<>();
+
+ List<List<Object>> lists = cartesianProduct(F.asList("task1", "task2"), F.asList(false, true));
+
+ int setIdx = 0;
+
+ for (List<List<Object>> permute : permutations(lists)) {
+ for (List<Object> t2 : permute) {
+ String taskName = "dataSet-" + setIdx + "-" + t2.get(0);
+ Boolean cached = (Boolean)t2.get(1);
+
+ expTasks.compute(taskName, (name, cnt) -> cnt == null ? 1 : ++cnt);
+
+ writeTask(buf, taskName, cached);
+ }
+
+ if (unknownStrs) {
+ String unknownTask = "dataSet-" + setIdx + "-unknownTask";
+
+ expTasks.put(unknownTask, 0); // The handler must never be called with this name.
+
+ writeTask(buf, unknownTask, true);
+ }
+
+ setIdx++;
+ }
+
+ return expTasks;
+ }
+
+ /** Writes test task record (type byte, name, session id and zeroed numeric fields) to the buffer. */
+ private static void writeTask(ByteBuffer buf, String taskName, boolean cached) {
+ buf.put((byte)OperationType.TASK.ordinal());
+ writeString(buf, taskName, cached);
+ writeIgniteUuid(buf, IgniteUuid.randomUuid());
+ buf.putLong(0);
+ buf.putLong(0);
+ buf.putInt(0);
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsMultipleStartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsMultipleStartTest.java
new file mode 100644
index 0000000..e104b66
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsMultipleStartTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.performancestatistics;
+
+import java.io.File;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.junit.Test;
+
+/**
+ * Tests that every start of performance statistics collection creates a new statistics file.
+ */
+public class PerformanceStatisticsMultipleStartTest extends AbstractPerformanceStatisticsTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setCacheConfiguration(defaultCacheConfiguration());
+
+ return cfg;
+ }
+
+ /** @throws Exception If failed. */
+ @Test
+ public void testStartCreateNewFile() throws Exception {
+ IgniteEx srv = startGrid(0);
+
+ int cnt = 5;
+
+ for (int i = 1; i <= cnt; i++) {
+ startCollectStatistics();
+
+ srv.cache(DEFAULT_CACHE_NAME).get(0);
+
+ AtomicInteger ops = new AtomicInteger();
+
+ stopCollectStatisticsAndRead(new TestHandler() {
+ @Override public void cacheOperation(UUID nodeId, OperationType type, int cacheId, long startTime,
+ long duration) {
+ ops.incrementAndGet();
+ }
+ });
+
+ assertEquals(i, ops.get()); // Files of previous iterations are kept and read again.
+
+ List<File> files = statisticsFiles();
+
+ assertEquals(i, files.size()); // One file per start.
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsPropertiesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsPropertiesTest.java
new file mode 100644
index 0000000..1b7a35b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsPropertiesTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.performancestatistics;
+
+import java.io.File;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.junit.Test;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PERF_STAT_CACHED_STRINGS_THRESHOLD;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PERF_STAT_FILE_MAX_SIZE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PERF_STAT_FLUSH_SIZE;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.cacheStartRecordSize;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.jobRecordSize;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.taskRecordSize;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Tests public performance statistics properties.
+ */
+public class PerformanceStatisticsPropertiesTest extends AbstractPerformanceStatisticsTest {
+ /** Test value of {@link IgniteSystemProperties#IGNITE_PERF_STAT_FILE_MAX_SIZE}. */
+ private static final long TEST_FILE_MAX_SIZE = 1024;
+
+ /** Test value of {@link IgniteSystemProperties#IGNITE_PERF_STAT_FLUSH_SIZE}. */
+ private static final int TEST_FLUSH_SIZE = 1024;
+
+ /** Test value of {@link IgniteSystemProperties#IGNITE_PERF_STAT_CACHED_STRINGS_THRESHOLD}. */
+ private static final int TEST_CACHED_STRINGS_THRESHOLD = 5;
+
+ /** Server node started once before all tests of the class. */
+ private static IgniteEx srv;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setCacheConfiguration(defaultCacheConfiguration());
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ srv = startGrid(0);
+ }
+
+ /** @throws Exception If failed. */
+ @Test
+ @WithSystemProperty(key = IGNITE_PERF_STAT_FILE_MAX_SIZE, value = "" + TEST_FILE_MAX_SIZE)
+ public void testFileMaxSize() throws Exception {
+ long initLen = srv.context().cache().cacheDescriptors().values().stream().mapToInt(
+ desc -> 1 + cacheStartRecordSize(desc.cacheName().getBytes().length, false)).sum();
+
+ long expOpsCnt = (TEST_FILE_MAX_SIZE - initLen) / (/*typeOp*/1 + OperationType.cacheRecordSize());
+
+ startCollectStatistics();
+
+ for (int i = 0; i < expOpsCnt * 2; i++) // Twice as many operations as expected to fit into the file.
+ srv.cache(DEFAULT_CACHE_NAME).get(i);
+
+ AtomicInteger opsCnt = new AtomicInteger();
+
+ stopCollectStatisticsAndRead(new TestHandler() {
+ @Override public void cacheOperation(UUID nodeId, OperationType type, int cacheId, long startTime,
+ long duration) {
+ opsCnt.incrementAndGet();
+ }
+ });
+
+ assertEquals(expOpsCnt, opsCnt.get());
+
+ long expLen = initLen + opsCnt.get() * (/*typeOp*/1 + OperationType.cacheRecordSize());
+
+ List<File> files = statisticsFiles();
+
+ assertEquals(1, files.size());
+
+ long statFileLen = files.get(0).length();
+
+ assertEquals(expLen, statFileLen);
+
+ assertTrue(statFileLen <= TEST_FILE_MAX_SIZE);
+ }
+
+ /** @throws Exception If failed. */
+ @Test
+ @WithSystemProperty(key = IGNITE_PERF_STAT_FLUSH_SIZE, value = "" + TEST_FLUSH_SIZE)
+ public void testFlushSize() throws Exception {
+ long initLen = srv.context().cache().cacheDescriptors().values().stream().mapToInt(
+ desc -> 1 + cacheStartRecordSize(desc.cacheName().getBytes().length, false)).sum();
+
+ long opsCnt = (TEST_FLUSH_SIZE - initLen) / (/*typeOp*/1 + OperationType.cacheRecordSize());
+
+ startCollectStatistics();
+
+ for (int i = 0; i < opsCnt; i++)
+ srv.cache(DEFAULT_CACHE_NAME).get(i);
+
+ List<File> files = statisticsFiles();
+
+ assertEquals(1, files.size());
+ assertEquals(0, files.get(0).length()); // Records still fit into the flush size: file must be empty.
+
+ srv.cache(DEFAULT_CACHE_NAME).get(0); // One more record on top of those that fit into the flush size.
+
+ assertTrue(waitForCondition(
+ () -> {
+ try {
+ List<File> statFiles = statisticsFiles();
+
+ assertEquals(1, statFiles.size());
+
+ return statFiles.get(0).length() > 0;
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ },
+ getTestTimeout()));
+
+ stopCollectStatisticsAndRead(new TestHandler());
+ }
+
+ /** @throws Exception If failed. */
+ @Test
+ @WithSystemProperty(key = IGNITE_PERF_STAT_CACHED_STRINGS_THRESHOLD, value = "" + TEST_CACHED_STRINGS_THRESHOLD)
+ public void testCachedStringsThreshold() throws Exception {
+ int tasksCnt = TEST_CACHED_STRINGS_THRESHOLD * 2;
+ int executions = 2;
+
+ startCollectStatistics();
+
+ int expLen = 0;
+
+ for (int i = 0; i < tasksCnt; i++) {
+ String taskName = "TestTask-" + i;
+
+ if (i < TEST_CACHED_STRINGS_THRESHOLD - 1 - srv.context().cache().cacheDescriptors().values().size()) {
+ expLen += taskRecordSize(taskName.getBytes().length, false) + jobRecordSize() +
+ (taskRecordSize(0, true) + jobRecordSize()) * (executions - 1);
+ }
+ else
+ expLen += (taskRecordSize(taskName.getBytes().length, false) + jobRecordSize()) * executions;
+
+ expLen += /*typeOp*/2 * executions; // One type byte for the task record and one for the job record.
+
+ for (int j = 0; j < executions; j++) {
+ srv.compute().withName(taskName).run(new IgniteRunnable() {
+ @Override public void run() {
+ // No-op.
+ }
+ });
+ }
+ }
+
+ AtomicInteger tasks = new AtomicInteger();
+
+ stopCollectStatisticsAndRead(new TestHandler() {
+ @Override public void task(UUID nodeId, IgniteUuid sesId, String taskName, long startTime, long duration,
+ int affPartId) {
+ tasks.incrementAndGet();
+ }
+ });
+
+ assertEquals(tasksCnt * executions, tasks.get());
+
+ // Started caches.
+ expLen += srv.context().cache().cacheDescriptors().values().stream().mapToInt(
+ desc -> 1 + cacheStartRecordSize(desc.cacheName().getBytes().length, false)).sum();
+
+ List<File> files = statisticsFiles();
+
+ assertEquals(1, files.size());
+ assertEquals(expLen, files.get(0).length());
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsSelfTest.java
new file mode 100644
index 0000000..396f5ee
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsSelfTest.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.performancestatistics;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.function.Consumer;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.GridIntList;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.ClientType.CLIENT;
+import static org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.ClientType.SERVER;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET_ALL;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET_AND_PUT;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET_AND_REMOVE;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_INVOKE;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_INVOKE_ALL;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_LOCK;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_PUT;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_PUT_ALL;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_REMOVE;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_REMOVE_ALL;
+
+/**
+ * Tests that compute tasks and jobs, cache operations and transactions are written to the performance statistics
+ * when the load is run from a server node and from a client node.
+ * <p>
+ * The grids are started once for the whole class, while the node that generates the load is resolved before every
+ * test from the {@link #clientType} parameter. The parameter is injected by the {@link Parameterized} runner into
+ * test instances, so the once-per-class {@link #beforeTestsStarted()} hook must not depend on it.
+ */
+@RunWith(Parameterized.class)
+@SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
+public class PerformanceStatisticsSelfTest extends AbstractPerformanceStatisticsTest {
+    /** Nodes count. */
+    private static final int NODES_CNT = 2;
+
+    /** Cache entry count. */
+    private static final int ENTRY_COUNT = 100;
+
+    /** Test entry processor. Does nothing and returns {@code null}. */
+    private static final EntryProcessor<Object, Object, Object> ENTRY_PROC =
+        new EntryProcessor<Object, Object, Object>() {
+            @Override public Object process(MutableEntry<Object, Object> entry, Object... arguments)
+                throws EntryProcessorException {
+                return null;
+            }
+        };
+
+    /** Test cache entry processor. Does nothing and returns {@code null}. */
+    private static final CacheEntryProcessor<Object, Object, Object> CACHE_ENTRY_PROC =
+        new CacheEntryProcessor<Object, Object, Object>() {
+            @Override public Object process(MutableEntry<Object, Object> entry, Object... arguments)
+                throws EntryProcessorException {
+                return null;
+            }
+        };
+
+    /** Client type to run operations from. */
+    @Parameterized.Parameter
+    public ClientType clientType;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "clientType={0}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {{SERVER}, {CLIENT}});
+    }
+
+    /** Server node. */
+    private static IgniteEx srv;
+
+    /** Client node. */
+    private static IgniteEx client;
+
+    /** Ignite node to run load from. Resolved in {@link #beforeTest()} from {@link #clientType}. */
+    private static IgniteEx node;
+
+    /** Test cache obtained from {@link #node}. */
+    private static IgniteCache<Object, Object> cache;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setCacheConfiguration(defaultCacheConfiguration());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        // Delegates to the parent hook the same way PerformanceStatisticsThinClientTest does.
+        super.beforeTestsStarted();
+
+        srv = startGrid(NODES_CNT - 1);
+
+        client = startClientGrid(NODES_CNT);
+
+        // The cache is populated once, before any statistics are collected.
+        IgniteCache<Object, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < ENTRY_COUNT; i++)
+            srvCache.put(i, i);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        // The load node is selected per test: a choice made once per class cannot follow the test parameter.
+        node = clientType == SERVER ? srv : client;
+
+        cache = node.cache(DEFAULT_CACHE_NAME);
+    }
+
+    /**
+     * Runs a named no-op task several times and checks the recorded tasks and jobs.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCompute() throws Exception {
+        String testTaskName = "testTask";
+        int executions = 5;
+        long startTime = U.currentTimeMillis();
+
+        startCollectStatistics();
+
+        IgniteRunnable task = new IgniteRunnable() {
+            @Override public void run() {
+                // No-op.
+            }
+        };
+
+        for (int i = 0; i < executions; i++)
+            node.compute().withName(testTaskName).run(task);
+
+        // Number of records (tasks and jobs) read for every session.
+        HashMap<IgniteUuid, Integer> sessions = new HashMap<>();
+        AtomicInteger tasks = new AtomicInteger();
+        AtomicInteger jobs = new AtomicInteger();
+
+        stopCollectStatisticsAndRead(new TestHandler() {
+            @Override public void task(UUID nodeId, IgniteUuid sesId, String taskName, long taskStartTime,
+                long duration, int affPartId) {
+                sessions.merge(sesId, 1, Integer::sum);
+
+                tasks.incrementAndGet();
+
+                // The task record is expected from the node that started the task.
+                assertEquals(node.context().localNodeId(), nodeId);
+                assertEquals(testTaskName, taskName);
+                assertTrue(taskStartTime >= startTime);
+                assertTrue(duration >= 0);
+                assertEquals(-1, affPartId);
+            }
+
+            @Override public void job(UUID nodeId, IgniteUuid sesId, long queuedTime, long jobStartTime, long duration,
+                boolean timedOut) {
+                sessions.merge(sesId, 1, Integer::sum);
+
+                jobs.incrementAndGet();
+
+                // The job record is expected from the server node.
+                assertEquals(srv.context().localNodeId(), nodeId);
+                assertTrue(queuedTime >= 0);
+                assertTrue(jobStartTime >= startTime);
+                assertTrue(duration >= 0);
+                assertFalse(timedOut);
+            }
+        });
+
+        assertEquals(executions, tasks.get());
+        assertEquals(executions, jobs.get());
+
+        Collection<Integer> vals = sessions.values();
+
+        // One session per execution, each seen in exactly two records: one task and one job.
+        assertEquals(executions, vals.size());
+        assertTrue("Invalid sessions: " + sessions, vals.stream().allMatch(cnt -> cnt == NODES_CNT));
+    }
+
+    /**
+     * Checks that every supported cache operation, synchronous and asynchronous, is recorded with its own type.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCacheOperation() throws Exception {
+        checkCacheOperation(CACHE_PUT, cache -> cache.put(1, 1));
+        checkCacheOperation(CACHE_PUT, cache -> cache.putAsync(2, 2).get());
+
+        checkCacheOperation(CACHE_PUT_ALL, cache -> cache.putAll(Collections.singletonMap(3, 3)));
+        checkCacheOperation(CACHE_PUT_ALL, cache -> cache.putAllAsync(Collections.singletonMap(4, 4)).get());
+
+        checkCacheOperation(CACHE_GET, cache -> cache.get(1));
+        checkCacheOperation(CACHE_GET, cache -> cache.getAsync(2).get());
+
+        checkCacheOperation(CACHE_GET_AND_PUT, cache -> cache.getAndPut(1, 1));
+        checkCacheOperation(CACHE_GET_AND_PUT, cache -> cache.getAndPutAsync(2, 2).get());
+
+        checkCacheOperation(CACHE_GET_ALL, cache -> cache.getAll(Collections.singleton(1)));
+        checkCacheOperation(CACHE_GET_ALL, cache -> cache.getAllAsync(Collections.singleton(2)).get());
+
+        checkCacheOperation(CACHE_REMOVE, cache -> cache.remove(1));
+        checkCacheOperation(CACHE_REMOVE, cache -> cache.removeAsync(2).get());
+
+        checkCacheOperation(CACHE_REMOVE_ALL, cache -> cache.removeAll(Collections.singleton(3)));
+        checkCacheOperation(CACHE_REMOVE_ALL, cache -> cache.removeAllAsync(Collections.singleton(4)).get());
+
+        checkCacheOperation(CACHE_GET_AND_REMOVE, cache -> cache.getAndRemove(5));
+        checkCacheOperation(CACHE_GET_AND_REMOVE, cache -> cache.getAndRemoveAsync(6).get());
+
+        checkCacheOperation(CACHE_LOCK, cache -> {
+            Lock lock = cache.lock(7);
+
+            lock.lock();
+            lock.unlock();
+        });
+
+        checkCacheOperation(CACHE_LOCK, cache -> {
+            Lock lock = cache.lockAll(Collections.singleton(8));
+
+            lock.lock();
+            lock.unlock();
+        });
+
+        checkCacheOperation(CACHE_INVOKE, cache -> cache.invoke(10, ENTRY_PROC));
+        checkCacheOperation(CACHE_INVOKE, cache -> cache.invokeAsync(10, ENTRY_PROC).get());
+
+        checkCacheOperation(CACHE_INVOKE, cache -> cache.invoke(10, CACHE_ENTRY_PROC));
+        checkCacheOperation(CACHE_INVOKE, cache -> cache.invokeAsync(10, CACHE_ENTRY_PROC).get());
+
+        checkCacheOperation(CACHE_INVOKE_ALL, cache -> cache.invokeAll(Collections.singleton(10), ENTRY_PROC));
+        checkCacheOperation(CACHE_INVOKE_ALL,
+            cache -> cache.invokeAllAsync(Collections.singleton(10), ENTRY_PROC).get());
+
+        checkCacheOperation(CACHE_INVOKE_ALL, cache -> cache.invokeAll(Collections.singleton(10), CACHE_ENTRY_PROC));
+        checkCacheOperation(CACHE_INVOKE_ALL,
+            cache -> cache.invokeAllAsync(Collections.singleton(10), CACHE_ENTRY_PROC).get());
+    }
+
+    /**
+     * Runs the given cache operation while the statistics are collected and checks that exactly one record
+     * of the expected type was written by the load node.
+     *
+     * @param op Expected operation type.
+     * @param clo Closure that performs the operation on the test cache.
+     * @throws Exception If failed.
+     */
+    private void checkCacheOperation(OperationType op, Consumer<IgniteCache<Object, Object>> clo) throws Exception {
+        long startTime = U.currentTimeMillis();
+
+        // The method is called many times per test, so the files of the previous call are removed first.
+        cleanPerformanceStatisticsDir();
+
+        startCollectStatistics();
+
+        clo.accept(cache);
+
+        AtomicInteger ops = new AtomicInteger();
+
+        stopCollectStatisticsAndRead(new TestHandler() {
+            @Override public void cacheOperation(UUID nodeId, OperationType type, int cacheId, long opStartTime,
+                long duration) {
+                ops.incrementAndGet();
+
+                assertEquals(node.context().localNodeId(), nodeId);
+                assertEquals(op, type);
+                assertEquals(CU.cacheId(DEFAULT_CACHE_NAME), cacheId);
+                assertTrue(opStartTime >= startTime);
+                assertTrue(duration >= 0);
+            }
+        });
+
+        assertEquals(1, ops.get());
+    }
+
+    /**
+     * Checks that both committed and rolled back transactions are recorded.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testTransaction() throws Exception {
+        checkTx(true);
+
+        checkTx(false);
+    }
+
+    /**
+     * Runs a transaction with several puts while the statistics are collected and checks that exactly one
+     * transaction record with the expected outcome was written by the load node.
+     *
+     * @param committed {@code True} to commit the transaction, {@code false} to roll it back.
+     * @throws Exception If failed.
+     */
+    private void checkTx(boolean committed) throws Exception {
+        long startTime = U.currentTimeMillis();
+
+        cleanPerformanceStatisticsDir();
+
+        startCollectStatistics();
+
+        try (Transaction tx = node.transactions().txStart()) {
+            for (int i = 0; i < 10; i++)
+                cache.put(i, i * 2);
+
+            if (committed)
+                tx.commit();
+            else
+                tx.rollback();
+        }
+
+        AtomicInteger txs = new AtomicInteger();
+
+        stopCollectStatisticsAndRead(new TestHandler() {
+            @Override public void transaction(UUID nodeId, GridIntList cacheIds, long txStartTime, long duration,
+                boolean txCommitted) {
+                txs.incrementAndGet();
+
+                assertEquals(node.context().localNodeId(), nodeId);
+                assertEquals(1, cacheIds.size());
+                assertEquals(CU.cacheId(DEFAULT_CACHE_NAME), cacheIds.get(0));
+                assertTrue(txStartTime >= startTime);
+                assertTrue(duration >= 0);
+                assertEquals(committed, txCommitted);
+            }
+        });
+
+        assertEquals(1, txs.get());
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsThinClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsThinClientTest.java
new file mode 100644
index 0000000..978e237
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsThinClientTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.performancestatistics;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.ClientTransaction;
+import org.apache.ignite.client.Config;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.ClientConnectorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.ThinClientConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.client.thin.TestTask;
+import org.apache.ignite.internal.util.GridIntList;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET_ALL;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET_AND_PUT;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET_AND_REMOVE;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_PUT;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_PUT_ALL;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_REMOVE;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_REMOVE_ALL;
+
+/**
+ * Tests thin client performance statistics.
+ */
+public class PerformanceStatisticsThinClientTest extends AbstractPerformanceStatisticsTest {
+ /** Test task name. */
+ public static final String TEST_TASK_NAME = "TestTask";
+
+ /** Active tasks limit. */
+ private static final int ACTIVE_TASKS_LIMIT = 50;
+
+ /** Grids count. */
+ private static final int GRIDS_CNT = 2;
+
+ /** Thin client. */
+ private static IgniteClient thinClient;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setCacheConfiguration(defaultCacheConfiguration());
+
+ cfg.setClientConnectorConfiguration(
+ new ClientConnectorConfiguration().setThinClientConfiguration(
+ new ThinClientConfiguration().setMaxActiveComputeTasksPerConnection(ACTIVE_TASKS_LIMIT)));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ IgniteEx ignite = startGrids(GRIDS_CNT);
+
+ ignite.compute().localDeployTask(TestTask.class, TestTask.class.getClassLoader());
+
+ thinClient = Ignition.startClient(new ClientConfiguration().setAddresses(Config.SERVER));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ thinClient.close();
+ }
+
+ /** @throws Exception If failed. */
+ @Test
+ public void testCompute() throws Exception {
+ int executions = 5;
+ long startTime = U.currentTimeMillis();
+
+ startCollectStatistics();
+
+ for (int i = 0; i < executions; i++)
+ thinClient.compute().execute(TEST_TASK_NAME, null);
+
+ HashMap<IgniteUuid, Integer> sessions = new HashMap<>();
+ AtomicInteger tasks = new AtomicInteger();
+ AtomicInteger jobs = new AtomicInteger();
+
+ stopCollectStatisticsAndRead(new TestHandler() {
+ @Override public void task(UUID nodeId, IgniteUuid sesId, String taskName, long taskStartTime,
+ long duration, int affPartId) {
+ sessions.compute(sesId, (uuid, cnt) -> cnt == null ? 1 : ++cnt);
+
+ tasks.incrementAndGet();
+
+ assertTrue(F.nodeIds(grid(0).cluster().forServers().nodes()).contains(nodeId));
+ assertEquals(TEST_TASK_NAME, taskName);
+ assertTrue(taskStartTime >= startTime);
+ assertTrue(duration >= 0);
+ assertEquals(-1, affPartId);
+ }
+
+ @Override public void job(UUID nodeId, IgniteUuid sesId, long queuedTime, long jobStartTime, long duration,
+ boolean timedOut) {
+ sessions.compute(sesId, (uuid, cnt) -> cnt == null ? 1 : ++cnt);
+
+ jobs.incrementAndGet();
+
+ assertTrue(F.nodeIds(grid(0).cluster().forServers().nodes()).contains(nodeId));
+ assertTrue(queuedTime >= 0);
+ assertTrue(jobStartTime >= startTime);
+ assertTrue(duration >= 0);
+ assertFalse(timedOut);
+ }
+ });
+
+ assertEquals(executions, tasks.get());
+ assertEquals(executions * GRIDS_CNT, jobs.get());
+
+ Collection<Integer> vals = sessions.values();
+
+ assertEquals(executions, vals.size());
+ assertTrue("Invalid sessions: " + sessions, vals.stream().allMatch(val -> val == GRIDS_CNT + 1));
+ }
+
+ /** @throws Exception If failed. */
+ @Test
+ public void testCacheOperation() throws Exception {
+ checkCacheOperation(CACHE_PUT, cache -> cache.put(1, 1));
+
+ checkCacheOperation(CACHE_PUT_ALL, cache -> cache.putAll(Collections.singletonMap(3, 3)));
+
+ checkCacheOperation(CACHE_GET, cache -> cache.get(1));
+
+ checkCacheOperation(CACHE_GET_AND_PUT, cache -> cache.getAndPut(1, 1));
+
+ checkCacheOperation(CACHE_GET_ALL, cache -> cache.getAll(Collections.singleton(1)));
+
+ checkCacheOperation(CACHE_REMOVE, cache -> cache.remove(1));
+
+ checkCacheOperation(CACHE_REMOVE_ALL, cache -> cache.removeAll(Collections.singleton(3)));
+
+ checkCacheOperation(CACHE_GET_AND_REMOVE, cache -> cache.getAndRemove(5));
+ }
+
+ /** Checks cache operation. */
+ private void checkCacheOperation(OperationType op, Consumer<ClientCache<Object, Object>> clo) throws Exception {
+ long startTime = U.currentTimeMillis();
+
+ cleanPerformanceStatisticsDir();
+
+ startCollectStatistics();
+
+ clo.accept(thinClient.cache(DEFAULT_CACHE_NAME));
+
+ AtomicInteger ops = new AtomicInteger();
+
+ stopCollectStatisticsAndRead(new TestHandler() {
+ @Override public void cacheOperation(UUID nodeId, OperationType type, int cacheId, long opStartTime,
+ long duration) {
+ ops.incrementAndGet();
+
+ assertTrue(F.nodeIds(grid(0).cluster().forServers().nodes()).contains(nodeId));
+ assertEquals(op, type);
+ assertEquals(CU.cacheId(DEFAULT_CACHE_NAME), cacheId);
+ assertTrue(opStartTime >= startTime);
+ assertTrue(duration >= 0);
+ }
+ });
+
+ assertEquals(1, ops.get());
+ }
+
+ /** @throws Exception If failed. */
+ @Test
+ public void testTransaction() throws Exception {
+ checkTx(true);
+
+ checkTx(false);
+ }
+
+ /** @param commited {@code True} if check transaction commited. */
+ private void checkTx(boolean commited) throws Exception {
+ long startTime = U.currentTimeMillis();
+
+ cleanPerformanceStatisticsDir();
+
+ startCollectStatistics();
+
+ try (ClientTransaction tx = thinClient.transactions().txStart()) {
+ for (int i = 0; i < 10; i++)
+ thinClient.cache(DEFAULT_CACHE_NAME).put(i, i * 2);
+
+ if (commited)
+ tx.commit();
+ else
+ tx.rollback();
+ }
+
+ AtomicInteger txs = new AtomicInteger();
+
+ stopCollectStatisticsAndRead(new TestHandler() {
+ @Override public void transaction(UUID nodeId, GridIntList cacheIds, long txStartTime, long duration,
+ boolean txCommited) {
+ txs.incrementAndGet();
+
+ assertTrue(F.nodeIds(grid(0).cluster().forServers().nodes()).contains(nodeId));
+ assertEquals(1, cacheIds.size());
+ assertEquals(CU.cacheId(DEFAULT_CACHE_NAME), cacheIds.get(0));
+ assertTrue(txStartTime >= startTime);
+ assertTrue(duration >= 0);
+ assertEquals(commited, txCommited);
+ }
+ });
+
+ assertEquals(1, txs.get());
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/StringCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/StringCacheTest.java
new file mode 100644
index 0000000..d423bf2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/StringCacheTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.performancestatistics;
+
+import java.io.File;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.lang.IgniteUuid;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.cacheStartRecordSize;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.jobRecordSize;
+import static org.apache.ignite.internal.processors.performancestatistics.OperationType.taskRecordSize;
+
+/**
+ * Checks that a repeated task name is written in full only once, by comparing the size of the statistics
+ * file with the size computed from the record sizes.
+ */
+public class StringCacheTest extends AbstractPerformanceStatisticsTest {
+    /**
+     * Runs the same named task several times and checks the number of task records and the file length.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCacheTaskName() throws Exception {
+        IgniteEx ignite = startGrid(0);
+
+        int runs = 5;
+        String name = "TestTask";
+
+        startCollectStatistics();
+
+        for (int run = 0; run < runs; run++) {
+            ignite.compute().withName(name).run(new IgniteRunnable() {
+                @Override public void run() {
+                    // No-op.
+                }
+            });
+        }
+
+        AtomicInteger taskCnt = new AtomicInteger();
+
+        stopCollectStatisticsAndRead(new TestHandler() {
+            @Override public void task(UUID nodeId, IgniteUuid sesId, String taskName, long startTime, long duration,
+                int affPartId) {
+                if (name.equals(taskName))
+                    taskCnt.incrementAndGet();
+            }
+        });
+
+        assertEquals(runs, taskCnt.get());
+
+        // The first task record carries the name, the following ones are sized as cached.
+        long expLen = taskRecordSize(name.getBytes().length, false);
+
+        expLen += taskRecordSize(0, true) * (runs - 1);
+
+        // One job record per execution.
+        expLen += jobRecordSize() * runs;
+
+        expLen += /*opType*/ 2 * runs;
+
+        // Started caches.
+        expLen += ignite.context().cache().cacheDescriptors().values().stream()
+            .mapToInt(desc -> cacheStartRecordSize(desc.cacheName().getBytes().length, false) + 1)
+            .sum();
+
+        List<File> statFiles = statisticsFiles();
+
+        assertEquals(1, statFiles.size());
+        assertEquals(expLen, statFiles.get(0).length());
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/TopologyChangesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/TopologyChangesTest.java
new file mode 100644
index 0000000..856b3f2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/TopologyChangesTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.performancestatistics;
+
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Checks the state of the performance statistics collection after node join, cluster restart and
+ * client reconnect, with persistence disabled and enabled.
+ */
+@RunWith(Parameterized.class)
+public class TopologyChangesTest extends AbstractPerformanceStatisticsTest {
+    /** Whether the default data region is persistent. */
+    @Parameterized.Parameter
+    public boolean persistence;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "persistence={0}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {{false}, {true}});
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        DataRegionConfiguration regionCfg = new DataRegionConfiguration().setPersistenceEnabled(persistence);
+
+        DataStorageConfiguration storageCfg =
+            new DataStorageConfiguration().setDefaultDataRegionConfiguration(regionCfg);
+
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDataStorageConfiguration(storageCfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopGridsAndCleanPersistence();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopGridsAndCleanPersistence();
+    }
+
+    /**
+     * Checks that the collection is enabled after a node joins the cluster where it was started.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNodeJoin() throws Exception {
+        startGrid(0);
+
+        startCollectStatistics();
+
+        startGrid(1);
+
+        waitForStatisticsEnabled(true);
+    }
+
+    /**
+     * Checks that after a full cluster restart the collection is enabled only when persistence is enabled.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testClusterRestartWithPersistence() throws Exception {
+        startGrids(2);
+
+        activateFirstGridIfPersistent();
+
+        startCollectStatistics();
+
+        stopAllGrids(false);
+
+        startGrids(2);
+
+        activateFirstGridIfPersistent();
+
+        waitForStatisticsEnabled(persistence);
+    }
+
+    /**
+     * Checks the collection state after the only server is restarted while a client node stays alive.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testClientReconnected() throws Exception {
+        startGrid(0);
+
+        activateFirstGridIfPersistent();
+
+        startCollectStatistics();
+
+        startClientGrid(1);
+
+        waitForStatisticsEnabled(true);
+
+        // Restart the server.
+        stopGrid(0);
+
+        startGrid(0);
+
+        activateFirstGridIfPersistent();
+
+        waitForTopology(2);
+
+        waitForStatisticsEnabled(persistence);
+    }
+
+    /**
+     * Stops all grids and, when persistence is enabled, cleans the persistence directory.
+     *
+     * @throws Exception If failed.
+     */
+    private void stopGridsAndCleanPersistence() throws Exception {
+        stopAllGrids();
+
+        if (!persistence)
+            return;
+
+        cleanPersistenceDir();
+    }
+
+    /** Activates the cluster through the grid with index {@code 0} when persistence is enabled. */
+    private void activateFirstGridIfPersistent() {
+        if (!persistence)
+            return;
+
+        grid(0).cluster().state(ClusterState.ACTIVE);
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
index 6addefe..2f1ba8f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
@@ -41,6 +41,13 @@ import org.apache.ignite.internal.processors.cache.persistence.SingleNodePersist
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotSelfTest;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotMXBeanTest;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManagerSelfTest;
+import org.apache.ignite.internal.processors.performancestatistics.ForwardReadTest;
+import org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsMultipleStartTest;
+import org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsPropertiesTest;
+import org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsSelfTest;
+import org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsThinClientTest;
+import org.apache.ignite.internal.processors.performancestatistics.StringCacheTest;
+import org.apache.ignite.internal.processors.performancestatistics.TopologyChangesTest;
import org.apache.ignite.marshaller.GridMarshallerMappingConsistencyTest;
import org.apache.ignite.util.GridInternalTaskUnusedWalSegmentsTest;
import org.junit.runner.RunWith;
@@ -85,7 +92,16 @@ import org.junit.runners.Suite;
IgniteClusterSnapshotSelfTest.class,
IgniteSnapshotMXBeanTest.class,
- IgniteClusterIdTagTest.class
+ IgniteClusterIdTagTest.class,
+
+ // Performance statistics.
+ PerformanceStatisticsSelfTest.class,
+ PerformanceStatisticsThinClientTest.class,
+ TopologyChangesTest.class,
+ StringCacheTest.class,
+ PerformanceStatisticsPropertiesTest.class,
+ PerformanceStatisticsMultipleStartTest.class,
+ ForwardReadTest.class
})
public class IgniteBasicWithPersistenceTestSuite {
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index c5dcc68..aa79094 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -44,10 +44,13 @@ import org.apache.ignite.events.CacheQueryExecutedEvent;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.metric.IoStatisticsQueryHelper;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.h2.H2PooledConnection;
@@ -320,6 +323,11 @@ public class GridMapQueryExecutor {
@Nullable final MvccSnapshot mvccSnapshot,
Boolean dataPageScanEnabled
) {
+ boolean performanceStatsEnabled = ctx.performanceStatistics().enabled();
+
+ if (performanceStatsEnabled)
+ IoStatisticsQueryHelper.startGatheringQueryStatistics();
+
// Prepare to run queries.
GridCacheContext<?, ?> mainCctx = mainCacheContext(cacheIds);
@@ -555,6 +563,19 @@ public class GridMapQueryExecutor {
if (trace != null)
trace.close();
+
+ if (performanceStatsEnabled) {
+ IoStatisticsHolder stat = IoStatisticsQueryHelper.finishGatheringQueryStatistics();
+
+ if (stat.logicalReads() > 0 || stat.physicalReads() > 0) {
+ ctx.performanceStatistics().queryReads(
+ GridCacheQueryType.SQL_FIELDS,
+ node.id(),
+ reqId,
+ stat.logicalReads(),
+ stat.physicalReads());
+ }
+ }
}
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index cd4910d..ea02bc3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -419,6 +419,8 @@ public class GridReduceQueryExecutor {
final long qryReqId = qryIdGen.incrementAndGet();
+ h2.runningQueryManager().trackRequestId(qryReqId);
+
boolean retry = false;
boolean release = true;
@@ -877,6 +879,8 @@ public class GridReduceQueryExecutor {
final long reqId = qryIdGen.incrementAndGet();
+ h2.runningQueryManager().trackRequestId(reqId);
+
final DmlDistributedUpdateRun r = new DmlDistributedUpdateRun(nodes.size());
int flags = enforceJoinOrder ? GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER : 0;
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsQueryTest.java
new file mode 100644
index 0000000..2cd63db
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsQueryTest.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.performancestatistics;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.Query;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.client.Config;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.INACTIVE;
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SCAN;
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
+import static org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.ClientType.CLIENT;
+import static org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.ClientType.SERVER;
+import static org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.ClientType.THIN_CLIENT;
+import static org.apache.ignite.internal.processors.query.QueryUtils.DFLT_SCHEMA;
+import static org.junit.Assume.assumeFalse;
+
+/** Tests query performance statistics. */
+@RunWith(Parameterized.class)
+public class PerformanceStatisticsQueryTest extends AbstractPerformanceStatisticsTest {
+ /** Number of entries put into each test cache before the tests start. */
+ private static final int ENTRY_COUNT = 100;
+
+ /** Name of the second test cache; it is also used as the SQL table name of that cache. */
+ private static final String CACHE_2 = "cache2";
+
+ /** Name of the SQL table created by the DDL tests and dropped after each test. */
+ private static final String SQL_TABLE = "test";
+
+ /** Query page size (first test parameter). */
+ @Parameterized.Parameter
+ public int pageSize;
+
+ /** Client type to run queries from (second test parameter). */
+ @Parameterized.Parameter(1)
+ public ClientType clientType;
+
+ /** @return Test parameters. */
+ @Parameterized.Parameters(name = "pageSize={0}, clientType={1}")
+ public static Collection<?> parameters() {
+ List<Object[]> res = new ArrayList<>();
+
+ for (Integer pageSize : new Integer[] {ENTRY_COUNT, ENTRY_COUNT / 10}) {
+ for (ClientType clientType : new ClientType[] {SERVER, CLIENT, THIN_CLIENT})
+ res.add(new Object[] {pageSize, clientType});
+ }
+
+ return res;
+ }
+
+ /** Server node returned by {@code startGrids(2)}; shared by all tests of the class. */
+ private static IgniteEx srv;
+
+ /** Client node; shared by all tests of the class. */
+ private static IgniteEx client;
+
+ /** Thin client connected to {@code Config.SERVER}; closed after all tests have finished. */
+ private static IgniteClient thinClient;
+
+ /** Default test cache with {@code Integer} keys and values, obtained through the client node. */
+ private static IgniteCache<Integer, Integer> cache;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        // Enable persistence for the default data region.
+        DataRegionConfiguration dfltRegionCfg = new DataRegionConfiguration();
+
+        dfltRegionCfg.setPersistenceEnabled(true);
+
+        DataStorageConfiguration storageCfg = new DataStorageConfiguration();
+
+        storageCfg.setDefaultDataRegionConfiguration(dfltRegionCfg);
+
+        cfg.setDataStorageConfiguration(storageCfg);
+
+        return cfg;
+    }
+
+    /**
+     * Starts two server nodes, a thin client and a client node, activates the cluster and fills both
+     * SQL-enabled test caches with {@link #ENTRY_COUNT} entries.
+     *
+     * @throws Exception If failed.
+     */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        cleanPersistenceDir();
+
+        srv = startGrids(2);
+
+        thinClient = Ignition.startClient(new ClientConfiguration().setAddresses(Config.SERVER));
+
+        client = startClientGrid("client");
+
+        client.cluster().state(ACTIVE);
+
+        cache = client.getOrCreateCache(new CacheConfiguration<Integer, Integer>()
+            .setName(DEFAULT_CACHE_NAME)
+            .setSqlSchema(DFLT_SCHEMA)
+            .setQueryEntities(Collections.singletonList(
+                new QueryEntity(Integer.class, Integer.class)
+                    .setTableName(DEFAULT_CACHE_NAME)))
+        );
+
+        IgniteCache<Long, Long> cache2 = client.getOrCreateCache(new CacheConfiguration<Long, Long>()
+            .setName(CACHE_2)
+            .setSqlSchema(DFLT_SCHEMA)
+            .setQueryEntities(Collections.singletonList(
+                new QueryEntity(Long.class, Long.class)
+                    .setTableName(CACHE_2)))
+        );
+
+        for (int i = 0; i < ENTRY_COUNT; i++) {
+            cache.put(i, i);
+
+            // Box as Long so that the entries match the key and value types declared by the query entity
+            // of the second cache (an autoboxed int would be stored as Integer).
+            cache2.put((long)i, 2L * i);
+        }
+    }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ cleanPersistenceDir();
+
+ thinClient.close();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ cache.query(new SqlFieldsQuery("drop table if exists " + SQL_TABLE));
+ }
+
+    /**
+     * Checks statistics of a scan query over the default cache.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testScanQuery() throws Exception {
+        checkQuery(SCAN, new ScanQuery<>().setPageSize(pageSize), DEFAULT_CACHE_NAME);
+    }
+
+    /**
+     * Checks statistics of a single-table SQL fields query.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSqlFieldsQuery() throws Exception {
+        String selectAll = "select * from " + DEFAULT_CACHE_NAME;
+
+        checkQuery(SQL_FIELDS, new SqlFieldsQuery(selectAll).setPageSize(pageSize), selectAll);
+    }
+
+    /**
+     * Checks statistics of an SQL fields query that joins both test tables by key.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSqlFieldsJoinQuery() throws Exception {
+        String joinSql = "select * from " + DEFAULT_CACHE_NAME +
+            " a inner join " + CACHE_2 +
+            " b on a._key = b._key";
+
+        checkQuery(SQL_FIELDS, new SqlFieldsQuery(joinSql).setPageSize(pageSize), joinSql);
+    }
+
+    /**
+     * Re-activates the cluster and runs the query twice: the first run is expected to report physical reads,
+     * the second run is expected to report logical reads only.
+     *
+     * @param type Expected query type.
+     * @param qry Query to run.
+     * @param text Expected query text.
+     * @throws Exception If failed.
+     */
+    private void checkQuery(GridCacheQueryType type, Query<?> qry, String text) throws Exception {
+        // Presumably the re-activation drops cached pages so that the first run has to read them from disk.
+        client.cluster().state(INACTIVE);
+        client.cluster().state(ACTIVE);
+
+        for (boolean expPhysicalReads : new boolean[] {true, false})
+            runQueryAndCheck(type, qry, text, true, expPhysicalReads);
+    }
+
+    /**
+     * Checks statistics of DDL and DML statements: only the update is expected to report logical reads.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testDdlAndDmlQueries() throws Exception {
+        String createSql = "create table " + SQL_TABLE + " (id int, val varchar, primary key (id))";
+        String insertSql = "insert into " + SQL_TABLE + " (id) values (1)";
+        String updateSql = "update " + SQL_TABLE + " set val = 'abc'";
+
+        runQueryAndCheck(SQL_FIELDS, new SqlFieldsQuery(createSql), createSql, false, false);
+
+        runQueryAndCheck(SQL_FIELDS, new SqlFieldsQuery(insertSql), insertSql, false, false);
+
+        runQueryAndCheck(SQL_FIELDS, new SqlFieldsQuery(updateSql), updateSql, true, false);
+    }
+
+    /**
+     * Runs the query from the node selected by {@link #clientType} while statistics are being collected and
+     * checks the recorded query and query reads events.
+     *
+     * @param expType Expected query type.
+     * @param qry Query to run.
+     * @param expText Expected query text.
+     * @param hasLogicalReads Whether query reads are expected to be reported by every server node.
+     * @param hasPhysicalReads Whether the reported reads are expected to include physical reads.
+     * @throws Exception If failed.
+     */
+    private void runQueryAndCheck(GridCacheQueryType expType, Query<?> qry, String expText, boolean hasLogicalReads,
+        boolean hasPhysicalReads)
+        throws Exception {
+        long startTime = U.currentTimeMillis();
+
+        cleanPerformanceStatisticsDir();
+
+        startCollectStatistics();
+
+        // IDs of the nodes that are allowed to be reported as the query originator.
+        Collection<UUID> expNodeIds = new ArrayList<>();
+
+        if (clientType == SERVER) {
+            srv.cache(DEFAULT_CACHE_NAME).query(qry).getAll();
+
+            expNodeIds.add(srv.localNode().id());
+        }
+        else if (clientType == CLIENT) {
+            client.cache(DEFAULT_CACHE_NAME).query(qry).getAll();
+
+            expNodeIds.add(client.localNode().id());
+        }
+        else if (clientType == THIN_CLIENT) {
+            thinClient.cache(DEFAULT_CACHE_NAME).query(qry).getAll();
+
+            // Any server node is accepted as the originator of a thin client query.
+            expNodeIds.addAll(F.nodeIds(client.cluster().forServers().nodes()));
+        }
+
+        // Server nodes that still have to report query reads; emptied while the statistics are read.
+        Set<UUID> readsNodes = new HashSet<>();
+
+        if (hasLogicalReads)
+            srv.cluster().forServers().nodes().forEach(node -> readsNodes.add(node.id()));
+
+        AtomicInteger queryCnt = new AtomicInteger();
+        Set<Long> qryIds = new HashSet<>();
+
+        stopCollectStatisticsAndRead(new TestHandler() {
+            @Override public void query(UUID nodeId, GridCacheQueryType type, String text, long id, long queryStartTime,
+                long duration, boolean success) {
+                queryCnt.incrementAndGet();
+                qryIds.add(id);
+
+                assertTrue(expNodeIds.contains(nodeId));
+                assertEquals(expType, type);
+                assertEquals(expText, text);
+                assertTrue(queryStartTime >= startTime);
+                assertTrue(duration >= 0);
+                assertTrue(success);
+            }
+
+            @Override public void queryReads(UUID nodeId, GridCacheQueryType type, UUID queryNodeId, long id,
+                long logicalReads, long physicalReads) {
+                qryIds.add(id);
+                readsNodes.remove(nodeId);
+
+                assertTrue(expNodeIds.contains(queryNodeId));
+                assertEquals(expType, type);
+                assertTrue(logicalReads > 0);
+                assertTrue(hasPhysicalReads ? physicalReads > 0 : physicalReads == 0);
+            }
+        });
+
+        assertEquals(1, queryCnt.get());
+        assertTrue("Query reads expected on nodes: " + readsNodes, readsNodes.isEmpty());
+
+        // The query record and all of the reads records must share a single query ID.
+        assertEquals(1, qryIds.size());
+    }
+
+    /**
+     * Checks that every statement of a multi-statement SQL query is recorded as a separate query and that
+     * query reads are reported only for the statements expected to read data.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testMultipleStatementsSql() throws Exception {
+        assumeFalse("Multiple statements queries are not supported by thin client.",
+            clientType == THIN_CLIENT);
+
+        long startTime = U.currentTimeMillis();
+
+        // Statements that are expected to report page reads.
+        List<String> stmtsWithReads = new LinkedList<>();
+
+        stmtsWithReads.add("update " + SQL_TABLE + " set val = 'd' where id = 1");
+        stmtsWithReads.add("select * from " + SQL_TABLE);
+
+        // All statements in execution order; a statement is removed once its query record is read.
+        List<String> pendingStmts = new LinkedList<>();
+
+        pendingStmts.add("create table " + SQL_TABLE + " (id int primary key, val varchar)");
+        pendingStmts.add("insert into " + SQL_TABLE + " (id, val) values (1, 'a')");
+        pendingStmts.add("insert into " + SQL_TABLE + " (id, val) values (2, 'b'), (3, 'c')");
+        pendingStmts.addAll(stmtsWithReads);
+
+        int stmtCnt = pendingStmts.size();
+
+        startCollectStatistics();
+
+        IgniteEx loadNode = clientType == SERVER ? srv : client;
+
+        SqlFieldsQuery multiQry = new SqlFieldsQuery(F.concat(pendingStmts, ";"));
+
+        List<FieldsQueryCursor<List<?>>> cursors = loadNode.context().query().querySqlFields(multiQry, true, false);
+
+        assertEquals("Unexpected cursors count: " + cursors.size(), stmtCnt, cursors.size());
+
+        // Fetch all rows of the trailing SELECT.
+        cursors.get(stmtCnt - 1).getAll();
+
+        UUID loadNodeId = loadNode.localNode().id();
+
+        Set<Long> qryIds = new HashSet<>();
+
+        stopCollectStatisticsAndRead(new TestHandler() {
+            @Override public void query(UUID nodeId, GridCacheQueryType type, String text, long id, long queryStartTime,
+                long duration, boolean success) {
+                if (stmtsWithReads.contains(text))
+                    qryIds.add(id);
+
+                assertEquals(loadNodeId, nodeId);
+                assertEquals(SQL_FIELDS, type);
+                assertTrue("Unexpected query: " + text, pendingStmts.remove(text));
+                assertTrue(queryStartTime >= startTime);
+                assertTrue(duration >= 0);
+                assertTrue(success);
+            }
+
+            @Override public void queryReads(UUID nodeId, GridCacheQueryType type, UUID queryNodeId, long id,
+                long logicalReads, long physicalReads) {
+                qryIds.add(id);
+
+                assertEquals(SQL_FIELDS, type);
+                assertEquals(loadNodeId, queryNodeId);
+                assertTrue(logicalReads > 0);
+                assertEquals(0, physicalReads);
+            }
+        });
+
+        assertTrue("Queries was not handled: " + pendingStmts, pendingStmts.isEmpty());
+        assertEquals("Unexpected IDs: " + qryIds, stmtsWithReads.size(), qryIds.size());
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java
index c2b7106..da2b696 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedAtomicOneNodeTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedAtomicSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
+import org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsQueryTest;
import org.apache.ignite.internal.processors.query.MemLeakOnSqlWithClientReconnectTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -68,7 +69,8 @@ import org.junit.runners.Suite;
StaticCacheDdlTest.class,
StaticCacheDdlKeepStaticConfigurationTest.class,
MemLeakOnSqlWithClientReconnectTest.class,
- CacheContinuousQueryFilterDeploymentFailedTest.class
+ CacheContinuousQueryFilterDeploymentFailedTest.class,
+ PerformanceStatisticsQueryTest.class
})
public class IgniteCacheQuerySelfTestSuite6 {
}