You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2019/04/01 11:21:07 UTC
[ignite] branch master updated: IGNITE-11143: SQL: Improved
printout of long-running queries. This closes #6353.
This is an automated email from the ASF dual-hosted git repository.
vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 963a40b IGNITE-11143: SQL: Improved printout of long-running queries. This closes #6353.
963a40b is described below
commit 963a40b7aad623f3b56b41e40d61894b9e15a6b6
Author: tledkov <tl...@gridgain.com>
AuthorDate: Mon Apr 1 14:20:32 2019 +0300
IGNITE-11143: SQL: Improved printout of long-running queries. This closes #6353.
---
.../org/apache/ignite/internal/IgniteKernal.java | 227 +----------------
.../internal/StripedExecutorMXBeanAdapter.java | 2 +-
.../internal/TransactionMetricsMxBeanImpl.java | 2 +-
.../ignite/internal/TransactionsMXBeanImpl.java | 2 +-
.../internal/managers/IgniteMBeansManager.java | 283 +++++++++++++++++++++
.../processors/query/GridQueryIndexing.java | 9 +
.../IgniteClientCacheInitializationFailTest.java | 6 +
.../ignite/internal/mxbean/SqlQueryMXBean.java | 71 ++++++
.../ignite/internal/mxbean/SqlQueryMXBeanImpl.java | 56 ++++
.../internal/processors/query/h2/H2QueryInfo.java | 159 ++++++++++++
.../processors/query/h2/IgniteH2Indexing.java | 76 +++---
.../query/h2/LongRunningQueryManager.java | 204 +++++++++++++++
.../processors/query/h2/MapH2QueryInfo.java | 58 +++++
.../processors/query/h2/ReduceH2QueryInfo.java | 44 ++++
.../query/h2/twostep/GridMapQueryExecutor.java | 8 +-
.../query/h2/twostep/GridReduceQueryExecutor.java | 20 +-
.../processors/query/LongRunningQueryTest.java | 194 ++++++++++++++
.../processors/query/h2/QueryDataPageScanTest.java | 5 +-
18 files changed, 1162 insertions(+), 264 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 3b8604f..e1564c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -38,13 +38,11 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
@@ -52,7 +50,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.CacheException;
import javax.management.JMException;
-import javax.management.ObjectName;
import org.apache.ignite.DataRegionMetrics;
import org.apache.ignite.DataRegionMetricsAdapter;
import org.apache.ignite.DataStorageMetrics;
@@ -105,6 +102,7 @@ import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
import org.apache.ignite.internal.cluster.IgniteClusterEx;
import org.apache.ignite.internal.managers.GridManager;
+import org.apache.ignite.internal.managers.IgniteMBeansManager;
import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager;
import org.apache.ignite.internal.managers.collision.GridCollisionManager;
import org.apache.ignite.internal.managers.communication.GridIoManager;
@@ -132,11 +130,9 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessorImpl;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
-import org.apache.ignite.internal.processors.cache.persistence.DataStorageMXBeanImpl;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsConsistentIdProcessor;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
-import org.apache.ignite.internal.processors.cluster.BaselineConfigurationMXBeanImpl;
import org.apache.ignite.internal.processors.cluster.ClusterProcessor;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
@@ -174,7 +170,6 @@ import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.processors.task.GridTaskProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
-import org.apache.ignite.internal.stat.IoStatisticsMetricsLocalMXBeanImpl;
import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
import org.apache.ignite.internal.suggestions.JvmConfigurationSuggestions;
import org.apache.ignite.internal.suggestions.OsConfigurationSuggestions;
@@ -195,8 +190,6 @@ import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.worker.FailureHandlingMxBeanImpl;
-import org.apache.ignite.internal.worker.WorkersControlMXBeanImpl;
import org.apache.ignite.internal.worker.WorkersRegistry;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
@@ -209,17 +202,7 @@ import org.apache.ignite.lifecycle.LifecycleEventType;
import org.apache.ignite.marshaller.MarshallerExclusions;
import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
-import org.apache.ignite.mxbean.BaselineConfigurationMXBean;
-import org.apache.ignite.mxbean.ClusterMetricsMXBean;
-import org.apache.ignite.mxbean.DataStorageMXBean;
-import org.apache.ignite.mxbean.FailureHandlingMxBean;
import org.apache.ignite.mxbean.IgniteMXBean;
-import org.apache.ignite.mxbean.IoStatisticsMetricsMXBean;
-import org.apache.ignite.mxbean.StripedExecutorMXBean;
-import org.apache.ignite.mxbean.ThreadPoolMXBean;
-import org.apache.ignite.mxbean.TransactionMetricsMxBean;
-import org.apache.ignite.mxbean.TransactionsMXBean;
-import org.apache.ignite.mxbean.WorkersControlMXBean;
import org.apache.ignite.plugin.IgnitePlugin;
import org.apache.ignite.plugin.PluginNotFoundException;
import org.apache.ignite.plugin.PluginProvider;
@@ -334,7 +317,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
/** Helper that registers MBeans */
@GridToStringExclude
- private final MBeansManager mBeansMgr = new MBeansManager();
+ private IgniteMBeansManager mBeansMgr;
/** Configuration. */
private IgniteConfiguration cfg;
@@ -933,6 +916,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
longJVMPauseDetector
);
+ mBeansMgr = new IgniteMBeansManager(this);
+
cfg.getMarshaller().setContext(ctx.marshallerContext());
GridInternalSubscriptionProcessor subscriptionProc = new GridInternalSubscriptionProcessor(ctx);
@@ -4335,210 +4320,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
}
}
- /**
- * Class that registers and unregisters MBeans for kernal.
- */
- private class MBeansManager {
- /** MBean names stored to be unregistered later. */
- private final Set<ObjectName> mBeanNames = new HashSet<>();
-
- /**
- * Registers all kernal MBeans (for kernal, metrics, thread pools).
- *
- * @param utilityCachePool Utility cache pool
- * @param execSvc Executor service
- * @param sysExecSvc System executor service
- * @param stripedExecSvc Striped executor
- * @param p2pExecSvc P2P executor service
- * @param mgmtExecSvc Management executor service
- * @param igfsExecSvc IGFS executor service
- * @param dataStreamExecSvc data stream executor service
- * @param restExecSvc Reset executor service
- * @param affExecSvc Affinity executor service
- * @param idxExecSvc Indexing executor service
- * @param callbackExecSvc Callback executor service
- * @param qryExecSvc Query executor service
- * @param schemaExecSvc Schema executor service
- * @param customExecSvcs Custom named executors
- * @throws IgniteCheckedException if fails to register any of the MBeans
- */
- private void registerAllMBeans(
- ExecutorService utilityCachePool,
- final ExecutorService execSvc,
- final ExecutorService svcExecSvc,
- final ExecutorService sysExecSvc,
- final StripedExecutor stripedExecSvc,
- ExecutorService p2pExecSvc,
- ExecutorService mgmtExecSvc,
- ExecutorService igfsExecSvc,
- StripedExecutor dataStreamExecSvc,
- ExecutorService restExecSvc,
- ExecutorService affExecSvc,
- @Nullable ExecutorService idxExecSvc,
- IgniteStripedThreadPoolExecutor callbackExecSvc,
- ExecutorService qryExecSvc,
- ExecutorService schemaExecSvc,
- @Nullable final Map<String, ? extends ExecutorService> customExecSvcs,
- WorkersRegistry workersRegistry
- ) throws IgniteCheckedException {
- if (U.IGNITE_MBEANS_DISABLED)
- return;
-
- // Kernal
- registerMBean("Kernal", IgniteKernal.class.getSimpleName(), IgniteKernal.this, IgniteMXBean.class);
-
- // Metrics
- ClusterMetricsMXBean locMetricsBean = new ClusterLocalNodeMetricsMXBeanImpl(ctx.discovery());
- registerMBean("Kernal", locMetricsBean.getClass().getSimpleName(), locMetricsBean, ClusterMetricsMXBean.class);
- ClusterMetricsMXBean metricsBean = new ClusterMetricsMXBeanImpl(cluster());
- registerMBean("Kernal", metricsBean.getClass().getSimpleName(), metricsBean, ClusterMetricsMXBean.class);
-
- //IO metrics
- IoStatisticsMetricsMXBean ioStatMetricsBean = new IoStatisticsMetricsLocalMXBeanImpl(ctx.ioStats());
- registerMBean("IOMetrics", ioStatMetricsBean.getClass().getSimpleName(), ioStatMetricsBean, IoStatisticsMetricsMXBean.class);
-
- // Transaction metrics
- TransactionMetricsMxBean txMetricsMXBean = new TransactionMetricsMxBeanImpl(ctx.cache().transactions().metrics());
- registerMBean("TransactionMetrics", txMetricsMXBean.getClass().getSimpleName(), txMetricsMXBean, TransactionMetricsMxBean.class);
-
- // Transactions
- TransactionsMXBean txMXBean = new TransactionsMXBeanImpl(ctx);
- registerMBean("Transactions", txMXBean.getClass().getSimpleName(), txMXBean, TransactionsMXBean.class);
-
- // Data storage
- DataStorageMXBean dataStorageMXBean = new DataStorageMXBeanImpl(ctx);
- registerMBean("DataStorage", dataStorageMXBean.getClass().getSimpleName(), dataStorageMXBean, DataStorageMXBean.class);
-
- // Baseline configuration
- BaselineConfigurationMXBean baselineConfigurationMXBean = new BaselineConfigurationMXBeanImpl(ctx);
- registerMBean("Baseline", baselineConfigurationMXBean.getClass().getSimpleName(), baselineConfigurationMXBean, BaselineConfigurationMXBean.class);
-
- // Executors
- registerExecutorMBean("GridUtilityCacheExecutor", utilityCachePool);
- registerExecutorMBean("GridExecutionExecutor", execSvc);
- registerExecutorMBean("GridServicesExecutor", svcExecSvc);
- registerExecutorMBean("GridSystemExecutor", sysExecSvc);
- registerExecutorMBean("GridClassLoadingExecutor", p2pExecSvc);
- registerExecutorMBean("GridManagementExecutor", mgmtExecSvc);
- registerExecutorMBean("GridIgfsExecutor", igfsExecSvc);
- registerExecutorMBean("GridDataStreamExecutor", dataStreamExecSvc);
- registerExecutorMBean("GridAffinityExecutor", affExecSvc);
- registerExecutorMBean("GridCallbackExecutor", callbackExecSvc);
- registerExecutorMBean("GridQueryExecutor", qryExecSvc);
- registerExecutorMBean("GridSchemaExecutor", schemaExecSvc);
-
- if (idxExecSvc != null)
- registerExecutorMBean("GridIndexingExecutor", idxExecSvc);
-
- if (cfg.getConnectorConfiguration() != null)
- registerExecutorMBean("GridRestExecutor", restExecSvc);
-
- if (stripedExecSvc != null) {
- // striped executor uses a custom adapter
- registerMBean("Thread Pools",
- "StripedExecutor",
- new StripedExecutorMXBeanAdapter(stripedExecSvc),
- StripedExecutorMXBean.class);
- }
-
- if (customExecSvcs != null) {
- for (Map.Entry<String, ? extends ExecutorService> entry : customExecSvcs.entrySet())
- registerExecutorMBean(entry.getKey(), entry.getValue());
- }
-
- if (U.IGNITE_TEST_FEATURES_ENABLED) {
- WorkersControlMXBean workerCtrlMXBean = new WorkersControlMXBeanImpl(workersRegistry);
-
- registerMBean("Kernal", workerCtrlMXBean.getClass().getSimpleName(),
- workerCtrlMXBean, WorkersControlMXBean.class);
- }
-
- FailureHandlingMxBean blockOpCtrlMXBean = new FailureHandlingMxBeanImpl(workersRegistry,
- ctx.cache().context().database());
-
- registerMBean("Kernal", blockOpCtrlMXBean.getClass().getSimpleName(), blockOpCtrlMXBean,
- FailureHandlingMxBean.class);
- }
-
- /**
- * Registers a {@link ThreadPoolMXBean} for an executor.
- *
- * @param name name of the bean to register
- * @param exec executor to register a bean for
- * @throws IgniteCheckedException if registration fails.
- */
- private void registerExecutorMBean(String name, ExecutorService exec) throws IgniteCheckedException {
- registerMBean("Thread Pools", name, new ThreadPoolMXBeanAdapter(exec), ThreadPoolMXBean.class);
- }
-
- /**
- * Register an Ignite MBean.
- *
- * @param grp bean group name
- * @param name bean name
- * @param impl bean implementation
- * @param itf bean interface
- * @param <T> bean type
- * @throws IgniteCheckedException if registration fails
- */
- private <T> void registerMBean(String grp, String name, T impl, Class<T> itf) throws IgniteCheckedException {
- assert !U.IGNITE_MBEANS_DISABLED;
-
- try {
- ObjectName objName = U.registerMBean(
- cfg.getMBeanServer(),
- cfg.getIgniteInstanceName(),
- grp, name, impl, itf);
-
- if (log.isDebugEnabled())
- log.debug("Registered MBean: " + objName);
-
- mBeanNames.add(objName);
- }
- catch (JMException e) {
- throw new IgniteCheckedException("Failed to register MBean " + name, e);
- }
- }
-
- /**
- * Unregisters all previously registered MBeans.
- *
- * @return {@code true} if all mbeans were unregistered successfully; {@code false} otherwise.
- */
- private boolean unregisterAllMBeans() {
- boolean success = true;
-
- for (ObjectName name : mBeanNames)
- success = success && unregisterMBean(name);
-
- return success;
- }
-
- /**
- * Unregisters given MBean.
- *
- * @param mbean MBean to unregister.
- * @return {@code true} if successfully unregistered, {@code false} otherwise.
- */
- private boolean unregisterMBean(ObjectName mbean) {
- assert !U.IGNITE_MBEANS_DISABLED;
-
- try {
- cfg.getMBeanServer().unregisterMBean(mbean);
-
- if (log.isDebugEnabled())
- log.debug("Unregistered MBean: " + mbean);
-
- return true;
- }
- catch (JMException e) {
- U.error(log, "Failed to unregister MBean.", e);
-
- return false;
- }
- }
- }
-
/** {@inheritDoc} */
@Override public void runIoTest(
long warmup,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/StripedExecutorMXBeanAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/StripedExecutorMXBeanAdapter.java
index 0659492..f565d4e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/StripedExecutorMXBeanAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/StripedExecutorMXBeanAdapter.java
@@ -32,7 +32,7 @@ public class StripedExecutorMXBeanAdapter implements StripedExecutorMXBean {
/**
* @param exec Executor service
*/
- StripedExecutorMXBeanAdapter(StripedExecutor exec) {
+ public StripedExecutorMXBeanAdapter(StripedExecutor exec) {
assert exec != null;
this.exec = exec;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/TransactionMetricsMxBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/TransactionMetricsMxBeanImpl.java
index 715bd27..58b5867 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/TransactionMetricsMxBeanImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/TransactionMetricsMxBeanImpl.java
@@ -41,7 +41,7 @@ public class TransactionMetricsMxBeanImpl implements TransactionMetricsMxBean {
/**
* @param transactionMetrics Transaction metrics.
*/
- TransactionMetricsMxBeanImpl(TransactionMetrics transactionMetrics) {
+ public TransactionMetricsMxBeanImpl(TransactionMetrics transactionMetrics) {
this.transactionMetrics = transactionMetrics;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java
index a8a3c88..a488cb3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java
@@ -46,7 +46,7 @@ public class TransactionsMXBeanImpl implements TransactionsMXBean {
/**
* @param ctx Context.
*/
- TransactionsMXBeanImpl(GridKernalContextImpl ctx) {
+ public TransactionsMXBeanImpl(GridKernalContextImpl ctx) {
this.ctx = ctx;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java
new file mode 100644
index 0000000..7bf42c1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import javax.management.JMException;
+import javax.management.ObjectName;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.ClusterLocalNodeMetricsMXBeanImpl;
+import org.apache.ignite.internal.ClusterMetricsMXBeanImpl;
+import org.apache.ignite.internal.GridKernalContextImpl;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.StripedExecutorMXBeanAdapter;
+import org.apache.ignite.internal.ThreadPoolMXBeanAdapter;
+import org.apache.ignite.internal.TransactionMetricsMxBeanImpl;
+import org.apache.ignite.internal.TransactionsMXBeanImpl;
+import org.apache.ignite.internal.processors.cache.persistence.DataStorageMXBeanImpl;
+import org.apache.ignite.internal.processors.cluster.BaselineConfigurationMXBeanImpl;
+import org.apache.ignite.internal.stat.IoStatisticsMetricsLocalMXBeanImpl;
+import org.apache.ignite.internal.util.StripedExecutor;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.worker.FailureHandlingMxBeanImpl;
+import org.apache.ignite.internal.worker.WorkersControlMXBeanImpl;
+import org.apache.ignite.internal.worker.WorkersRegistry;
+import org.apache.ignite.mxbean.BaselineConfigurationMXBean;
+import org.apache.ignite.mxbean.ClusterMetricsMXBean;
+import org.apache.ignite.mxbean.DataStorageMXBean;
+import org.apache.ignite.mxbean.FailureHandlingMxBean;
+import org.apache.ignite.mxbean.IgniteMXBean;
+import org.apache.ignite.mxbean.IoStatisticsMetricsMXBean;
+import org.apache.ignite.mxbean.StripedExecutorMXBean;
+import org.apache.ignite.mxbean.ThreadPoolMXBean;
+import org.apache.ignite.mxbean.TransactionMetricsMxBean;
+import org.apache.ignite.mxbean.TransactionsMXBean;
+import org.apache.ignite.mxbean.WorkersControlMXBean;
+import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class that registers and unregisters MBeans for kernal.
+ */
+public class IgniteMBeansManager {
+ /** Ignite kernal */
+ private final IgniteKernal kernal;
+
+ /** Ignite kernal context. */
+ private final GridKernalContextImpl ctx;
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /** MBean names stored to be unregistered later. */
+ private final Set<ObjectName> mBeanNames = new HashSet<>();
+
+ /**
+ * @param kernal Grid kernal.
+ */
+ public IgniteMBeansManager(IgniteKernal kernal) {
+ this.kernal = kernal;
+ ctx = (GridKernalContextImpl)kernal.context();
+ log = ctx.log(IgniteMBeansManager.class);
+ }
+
+ /**
+ * Registers all kernal MBeans (for kernal, metrics, thread pools).
+ *
+ * @param utilityCachePool Utility cache pool.
+ * @param execSvc Executor service.
+ * @param svcExecSvc Services' executor service.
+ * @param sysExecSvc System executor service.
+ * @param stripedExecSvc Striped executor.
+ * @param p2pExecSvc P2P executor service.
+ * @param mgmtExecSvc Management executor service.
+ * @param igfsExecSvc IGFS executor service.
+ * @param dataStreamExecSvc Data stream executor service.
+ * @param restExecSvc REST executor service.
+ * @param affExecSvc Affinity executor service.
+ * @param idxExecSvc Indexing executor service.
+ * @param callbackExecSvc Callback executor service.
+ * @param qryExecSvc Query executor service.
+ * @param schemaExecSvc Schema executor service.
+ * @param customExecSvcs Custom named executors.
+ * @param workersRegistry Worker registry.
+ * @throws IgniteCheckedException if fails to register any of the MBeans.
+ */
+ public void registerAllMBeans(
+ ExecutorService utilityCachePool,
+ final ExecutorService execSvc,
+ final ExecutorService svcExecSvc,
+ final ExecutorService sysExecSvc,
+ final StripedExecutor stripedExecSvc,
+ ExecutorService p2pExecSvc,
+ ExecutorService mgmtExecSvc,
+ ExecutorService igfsExecSvc,
+ StripedExecutor dataStreamExecSvc,
+ ExecutorService restExecSvc,
+ ExecutorService affExecSvc,
+ @Nullable ExecutorService idxExecSvc,
+ IgniteStripedThreadPoolExecutor callbackExecSvc,
+ ExecutorService qryExecSvc,
+ ExecutorService schemaExecSvc,
+ @Nullable final Map<String, ? extends ExecutorService> customExecSvcs,
+ WorkersRegistry workersRegistry
+ ) throws IgniteCheckedException {
+ if (U.IGNITE_MBEANS_DISABLED)
+ return;
+
+ // Kernal
+ registerMBean("Kernal", IgniteKernal.class.getSimpleName(), kernal, IgniteMXBean.class);
+
+ // Metrics
+ ClusterMetricsMXBean locMetricsBean = new ClusterLocalNodeMetricsMXBeanImpl(ctx.discovery());
+ registerMBean("Kernal", locMetricsBean.getClass().getSimpleName(), locMetricsBean, ClusterMetricsMXBean.class);
+ ClusterMetricsMXBean metricsBean = new ClusterMetricsMXBeanImpl(kernal.cluster());
+ registerMBean("Kernal", metricsBean.getClass().getSimpleName(), metricsBean, ClusterMetricsMXBean.class);
+
+ // IO metrics
+ IoStatisticsMetricsMXBean ioStatMetricsBean = new IoStatisticsMetricsLocalMXBeanImpl(ctx.ioStats());
+ registerMBean("IOMetrics", ioStatMetricsBean.getClass().getSimpleName(), ioStatMetricsBean, IoStatisticsMetricsMXBean.class);
+
+ // Transaction metrics
+ TransactionMetricsMxBean txMetricsMXBean = new TransactionMetricsMxBeanImpl(ctx.cache().transactions().metrics());
+ registerMBean("TransactionMetrics", txMetricsMXBean.getClass().getSimpleName(), txMetricsMXBean, TransactionMetricsMxBean.class);
+
+ // Transactions
+ TransactionsMXBean txMXBean = new TransactionsMXBeanImpl(ctx);
+ registerMBean("Transactions", txMXBean.getClass().getSimpleName(), txMXBean, TransactionsMXBean.class);
+
+ // Data storage
+ DataStorageMXBean dataStorageMXBean = new DataStorageMXBeanImpl(ctx);
+ registerMBean("DataStorage", dataStorageMXBean.getClass().getSimpleName(), dataStorageMXBean, DataStorageMXBean.class);
+
+ // Baseline configuration
+ BaselineConfigurationMXBean baselineConfigurationMXBean = new BaselineConfigurationMXBeanImpl(ctx);
+ registerMBean("Baseline", baselineConfigurationMXBean.getClass().getSimpleName(), baselineConfigurationMXBean, BaselineConfigurationMXBean.class);
+
+ // Executors
+ registerExecutorMBean("GridUtilityCacheExecutor", utilityCachePool);
+ registerExecutorMBean("GridExecutionExecutor", execSvc);
+ registerExecutorMBean("GridServicesExecutor", svcExecSvc);
+ registerExecutorMBean("GridSystemExecutor", sysExecSvc);
+ registerExecutorMBean("GridClassLoadingExecutor", p2pExecSvc);
+ registerExecutorMBean("GridManagementExecutor", mgmtExecSvc);
+ registerExecutorMBean("GridIgfsExecutor", igfsExecSvc);
+ registerExecutorMBean("GridDataStreamExecutor", dataStreamExecSvc);
+ registerExecutorMBean("GridAffinityExecutor", affExecSvc);
+ registerExecutorMBean("GridCallbackExecutor", callbackExecSvc);
+ registerExecutorMBean("GridQueryExecutor", qryExecSvc);
+ registerExecutorMBean("GridSchemaExecutor", schemaExecSvc);
+
+ if (idxExecSvc != null)
+ registerExecutorMBean("GridIndexingExecutor", idxExecSvc);
+
+ if (ctx.config().getConnectorConfiguration() != null)
+ registerExecutorMBean("GridRestExecutor", restExecSvc);
+
+ if (stripedExecSvc != null) {
+ // striped executor uses a custom adapter
+ registerMBean("Thread Pools",
+ "StripedExecutor",
+ new StripedExecutorMXBeanAdapter(stripedExecSvc),
+ StripedExecutorMXBean.class);
+ }
+
+ if (customExecSvcs != null) {
+ for (Map.Entry<String, ? extends ExecutorService> entry : customExecSvcs.entrySet())
+ registerExecutorMBean(entry.getKey(), entry.getValue());
+ }
+
+ if (U.IGNITE_TEST_FEATURES_ENABLED) {
+ WorkersControlMXBean workerCtrlMXBean = new WorkersControlMXBeanImpl(workersRegistry);
+
+ registerMBean("Kernal", workerCtrlMXBean.getClass().getSimpleName(),
+ workerCtrlMXBean, WorkersControlMXBean.class);
+ }
+
+ FailureHandlingMxBean blockOpCtrlMXBean = new FailureHandlingMxBeanImpl(workersRegistry,
+ ctx.cache().context().database());
+
+ registerMBean("Kernal", blockOpCtrlMXBean.getClass().getSimpleName(), blockOpCtrlMXBean,
+ FailureHandlingMxBean.class);
+
+ if (ctx.query().moduleEnabled())
+ ctx.query().getIndexing().registerMxBeans(this);
+ }
+
+ /**
+ * Registers a {@link ThreadPoolMXBean} for an executor.
+ *
+ * @param name name of the bean to register
+ * @param exec executor to register a bean for
+ * @throws IgniteCheckedException if registration fails.
+ */
+ private void registerExecutorMBean(String name, ExecutorService exec) throws IgniteCheckedException {
+ registerMBean("Thread Pools", name, new ThreadPoolMXBeanAdapter(exec), ThreadPoolMXBean.class);
+ }
+
+ /**
+ * Register an Ignite MBean.
+ *
+ * @param grp bean group name
+ * @param name bean name
+ * @param impl bean implementation
+ * @param itf bean interface
+ * @param <T> bean type
+ * @throws IgniteCheckedException if registration fails
+ */
+ public <T> void registerMBean(String grp, String name, T impl, Class<T> itf) throws IgniteCheckedException {
+ assert !U.IGNITE_MBEANS_DISABLED;
+
+ try {
+ ObjectName objName = U.registerMBean(
+ ctx.config().getMBeanServer(),
+ ctx.config().getIgniteInstanceName(),
+ grp, name, impl, itf);
+
+ if (log.isDebugEnabled())
+ log.debug("Registered MBean: " + objName);
+
+ mBeanNames.add(objName);
+ }
+ catch (JMException e) {
+ throw new IgniteCheckedException("Failed to register MBean " + name, e);
+ }
+ }
+
+ /**
+ * Unregisters all previously registered MBeans; a failure on one bean does not stop the rest.
+ *
+ * @return {@code true} if all mbeans were unregistered successfully; {@code false} otherwise.
+ */
+ public boolean unregisterAllMBeans() {
+ boolean success = true;
+
+ for (ObjectName name : mBeanNames)
+ success = unregisterMBean(name) && success; // Call first: '&&' must not skip remaining beans.
+
+ return success;
+ }
+
+ /**
+ * Unregisters given MBean.
+ *
+ * @param mbean MBean to unregister.
+ * @return {@code true} if successfully unregistered, {@code false} otherwise.
+ */
+ private boolean unregisterMBean(ObjectName mbean) {
+ assert !U.IGNITE_MBEANS_DISABLED;
+
+ try {
+ ctx.config().getMBeanServer().unregisterMBean(mbean);
+
+ if (log.isDebugEnabled())
+ log.debug("Unregistered MBean: " + mbean);
+
+ return true;
+ }
+ catch (JMException e) {
+ U.error(log, "Failed to unregister MBean.", e);
+
+ return false;
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 74ab8e9..01f9502 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -29,6 +29,7 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.IgniteMBeansManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
@@ -385,4 +386,12 @@ public interface GridQueryIndexing {
* @return {@code true} If context has been initialized.
*/
public boolean initCacheContext(GridCacheContext ctx) throws IgniteCheckedException;
+
+ /**
+ * Register SQL JMX beans.
+ *
+ * @param mbMgr Ignite MXBean manager.
+ * @throws IgniteCheckedException On bean registration error.
+ */
+ void registerMxBeans(IgniteMBeansManager mbMgr) throws IgniteCheckedException;
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
index 3f8009a..2af2086 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
@@ -41,6 +41,7 @@ import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.managers.IgniteMBeansManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
@@ -447,5 +448,10 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT
return true;
}
+
+ /** {@inheritDoc} */
+ @Override public void registerMxBeans(IgniteMBeansManager mbMgr) {
+ // No-op.
+ }
}
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/mxbean/SqlQueryMXBean.java b/modules/indexing/src/main/java/org/apache/ignite/internal/mxbean/SqlQueryMXBean.java
new file mode 100644
index 0000000..6380393
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/mxbean/SqlQueryMXBean.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.mxbean;
+
+import org.apache.ignite.mxbean.MXBeanDescription;
+import org.apache.ignite.mxbean.MXBeanParametersDescriptions;
+import org.apache.ignite.mxbean.MXBeanParametersNames;
+
+/**
+ * An MX bean allowing to monitor and tune SQL queries.
+ *
+ * @deprecated Temporary monitoring solution.
+ */
+@Deprecated
+public interface SqlQueryMXBean {
+ /**
+ * @return Timeout in milliseconds after which long query warning will be printed.
+ */
+ @MXBeanDescription("Timeout in milliseconds after which long query warning will be printed.")
+ long getLongQueryWarningTimeout();
+
+ /**
+ * Sets timeout in milliseconds after which long query warning will be printed.
+ *
+ * @param longQueryWarningTimeout Timeout in milliseconds after which long query warning will be printed.
+ */
+ @MXBeanDescription("Sets timeout in milliseconds after which long query warning will be printed.")
+ @MXBeanParametersNames("longQueryWarningTimeout")
+ @MXBeanParametersDescriptions("Timeout in milliseconds after which long query warning will be printed.")
+ void setLongQueryWarningTimeout(long longQueryWarningTimeout);
+
+ /**
+ * @return Long query timeout multiplier.
+ */
+ @MXBeanDescription("Long query timeout multiplier. The warning will be printed after: timeout, " +
+ "timeout * multiplier, timeout * multiplier * multiplier, etc. " +
+ "If the multiplier <= 1, the warning message is printed once.")
+ int getLongQueryTimeoutMultiplier();
+
+ /**
+ * Sets long query timeout multiplier. The warning will be printed after:
+ * - timeout;
+ * - timeout * multiplier;
+ * - timeout * multiplier * multiplier;
+ * - etc.
+ * If the multiplier <= 1, the warning message is printed once.
+ *
+ * @param longQueryTimeoutMultiplier Long query timeout multiplier.
+ */
+ @MXBeanDescription("Sets long query timeout multiplier. The warning will be printed after: timeout, " +
+ "timeout * multiplier, timeout * multiplier * multiplier, etc. " +
+ "If the multiplier <= 1, the warning message is printed once.")
+ @MXBeanParametersNames("longQueryTimeoutMultiplier")
+ @MXBeanParametersDescriptions("Long query timeout multiplier.")
+ void setLongQueryTimeoutMultiplier(int longQueryTimeoutMultiplier);
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/mxbean/SqlQueryMXBeanImpl.java b/modules/indexing/src/main/java/org/apache/ignite/internal/mxbean/SqlQueryMXBeanImpl.java
new file mode 100644
index 0000000..3dc615a
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/mxbean/SqlQueryMXBeanImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.mxbean;
+
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+
+/**
+ * {@link SqlQueryMXBean} implementation.
+ */
+public class SqlQueryMXBeanImpl implements SqlQueryMXBean {
+ /** H2 indexing. */
+ private final IgniteH2Indexing h2idx;
+
+ /**
+ * @param ctx Context.
+ */
+ public SqlQueryMXBeanImpl(GridKernalContext ctx) {
+ h2idx = (IgniteH2Indexing)ctx.query().getIndexing();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getLongQueryWarningTimeout() {
+ return h2idx.longRunningQueries().getTimeout();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setLongQueryWarningTimeout(long longQryWarningTimeout) {
+ h2idx.longRunningQueries().setTimeout(longQryWarningTimeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getLongQueryTimeoutMultiplier() {
+ return h2idx.longRunningQueries().getTimeoutMultiplier();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setLongQueryTimeoutMultiplier(int longQryTimeoutMultiplier) {
+ h2idx.longRunningQueries().setTimeoutMultiplier(longQryTimeoutMultiplier);
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
new file mode 100644
index 0000000..32ce756
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.h2.engine.Session;
+
+/**
+ * Base H2 query info with commons for MAP, LOCAL, REDUCE queries.
+ */
+public class H2QueryInfo {
+ /** Type. */
+ private final QueryType type;
+
+ /** Begin timestamp. */
+ private final long beginTs;
+
+ /** Query schema. */
+ private final String schema;
+
+ /** Query SQL. */
+ private final String sql;
+
+ /** Enforce join order. */
+ private final boolean enforceJoinOrder;
+
+ /** Join batch enabled (distributed join). */
+ private final boolean distributedJoin;
+
+ /** Lazy mode. */
+ private final boolean lazy;
+
+ /**
+ * @param type Query type.
+ * @param stmt Query statement.
+ * @param sql Query SQL.
+ */
+ public H2QueryInfo(QueryType type, PreparedStatement stmt, String sql) {
+ try {
+ assert stmt != null;
+
+ this.type = type;
+ this.sql = sql;
+
+ beginTs = U.currentTimeMillis();
+
+ schema = stmt.getConnection().getSchema();
+
+ Session s = H2Utils.session(stmt.getConnection());
+
+ enforceJoinOrder = s.isForceJoinOrder();
+ distributedJoin = s.isJoinBatchEnabled();
+ lazy = s.isLazyQueryExecution();
+ }
+ catch (SQLException e) {
+ throw new IgniteSQLException("Cannot collect query info", IgniteQueryErrorCode.UNKNOWN, e);
+ }
+ }
+
+ /**
+ * Print info specified by children.
+ *
+ * @param msg Message string builder.
+ */
+ protected void printInfo(StringBuilder msg) {
+ // No-op.
+ }
+
+ /**
+ * @return Query execution time.
+ */
+ public long time() {
+ return U.currentTimeMillis() - beginTs;
+ }
+
+ /**
+ * Logs the query details as a warning; the plan is appended for every query type except REDUCE.
+ *
+ * @param log Logger.
+ * @param connMgr Connection manager.
+ * @param msg Log message.
+ */
+ public void printLogMessage(IgniteLogger log, ConnectionManager connMgr, String msg) {
+ StringBuilder msgSb = new StringBuilder(msg + " [");
+
+ msgSb.append("time=").append(time()).append("ms")
+ .append(", type=").append(type).append(", distributedJoin=").append(distributedJoin)
+ .append(", enforceJoinOrder=").append(enforceJoinOrder)
+ .append(", lazy=").append(lazy);
+
+ printInfo(msgSb);
+
+ msgSb.append(", sql='").append(sql).append('\''); // Quote is closed here so REDUCE output is balanced too.
+
+ if (type != QueryType.REDUCE)
+ msgSb.append(", plan=").append(queryPlan(log, connMgr));
+
+ msgSb.append(']');
+
+ LT.warn(log, msgSb.toString());
+ }
+
+ /**
+ * @param log Logger.
+ * @param connMgr Connection manager.
+ * @return Query plan.
+ */
+ protected String queryPlan(IgniteLogger log, ConnectionManager connMgr) {
+ Connection c = connMgr.connectionForThread().connection(schema);
+
+ H2Utils.setupConnection(c, distributedJoin, enforceJoinOrder);
+
+ try (PreparedStatement pstmt = c.prepareStatement("EXPLAIN " + sql)) {
+
+ try (ResultSet plan = pstmt.executeQuery()) {
+ plan.next();
+
+ return plan.getString(1) + U.nl();
+ }
+ }
+ catch (Exception e) {
+ log.warning("Cannot get plan for long query: " + sql, e);
+
+ return "[error on calculate plan: " + e.getMessage() + ']';
+ }
+ }
+
+ /**
+ * Query type.
+ */
+ public enum QueryType {
+ LOCAL,
+ MAP,
+ REDUCE
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index cad0a4b..7598bb0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -49,6 +49,9 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.managers.IgniteMBeansManager;
+import org.apache.ignite.internal.mxbean.SqlQueryMXBean;
+import org.apache.ignite.internal.mxbean.SqlQueryMXBeanImpl;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
@@ -134,7 +137,6 @@ import org.apache.ignite.internal.util.lang.IgniteSingletonIterator;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.internal.util.worker.GridWorkerFuture;
@@ -259,6 +261,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** Schema manager. */
private SchemaManager schemaMgr;
+ /** Long running query manager. */
+ private LongRunningQueryManager longRunningQryMgr;
+
+
/**
* @return Kernal context.
*/
@@ -553,7 +559,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
args,
timeout0,
cancel,
- qryParams.dataPageScanEnabled()
+ qryParams.dataPageScanEnabled(),
+ new H2QueryInfo(H2QueryInfo.QueryType.LOCAL, stmt, qry0)
);
if (sfuFut0 != null) {
@@ -798,7 +805,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @return Prepared statement with set parameters.
* @throws IgniteCheckedException If failed.
*/
- private PreparedStatement preparedStatementWithParams(Connection conn, String sql, Collection<Object> params,
+ public PreparedStatement preparedStatementWithParams(Connection conn, String sql, Collection<Object> params,
boolean useStmtCache) throws IgniteCheckedException {
final PreparedStatement stmt;
@@ -880,10 +887,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
@Nullable Collection<Object> params,
int timeoutMillis,
@Nullable GridQueryCancel cancel,
- Boolean dataPageScanEnabled
+ Boolean dataPageScanEnabled,
+ final H2QueryInfo qryInfo
) throws IgniteCheckedException {
return executeSqlQueryWithTimer(preparedStatementWithParams(conn, sql, params, false),
- conn, sql, params, timeoutMillis, cancel, dataPageScanEnabled);
+ conn, sql, params, timeoutMillis, cancel, dataPageScanEnabled, qryInfo);
}
/**
@@ -914,44 +922,33 @@ public class IgniteH2Indexing implements GridQueryIndexing {
@Nullable Collection<Object> params,
int timeoutMillis,
@Nullable GridQueryCancel cancel,
- Boolean dataPageScanEnabled
+ Boolean dataPageScanEnabled,
+ final H2QueryInfo qryInfo
) throws IgniteCheckedException {
- long start = U.currentTimeMillis();
+ if (qryInfo != null)
+ longRunningQryMgr.registerQuery(qryInfo);
enableDataPageScan(dataPageScanEnabled);
try {
ResultSet rs = executeSqlQuery(conn, stmt, timeoutMillis, cancel);
- long time = U.currentTimeMillis() - start;
-
- if (time > ctx.config().getLongQueryWarningTimeout()) {
- // In lazy mode we have to use separate connection to gather plan to print warning.
- // Otherwise the all tables are unlocked by this query.
- try (Connection planConn = connMgr.connectionNoCache(conn.getSchema())) {
- ResultSet plan = executeSqlQuery(planConn, preparedStatementWithParams(planConn, "EXPLAIN " + sql,
- params, false), 0, null);
-
- plan.next();
-
- // Add SQL explain result message into log.
- String msg = "Query execution is too long [time=" + time + " ms, sql='" + sql + '\'' +
- ", plan=" + U.nl() + plan.getString(1) + U.nl() + ", parameters=" +
- (params == null ? "[]" : Arrays.deepToString(params.toArray())) + "]";
-
- LT.warn(log, msg);
- }
- }
-
- return rs;
+ if (qryInfo != null && qryInfo.time() > longRunningQryMgr.getTimeout())
+ qryInfo.printLogMessage(log, connMgr, "Long running query is finished");
+
+ return rs;
}
- catch (SQLException e) {
- connMgr.onSqlException(conn);
+ catch (Throwable e) {
+ if (qryInfo != null && qryInfo.time() > longRunningQryMgr.getTimeout()) {
+ qryInfo.printLogMessage(log, connMgr, "Long running query is finished with error: "
+ + e.getMessage());
+ }
- throw new IgniteCheckedException(e);
+ throw e;
}
finally {
CacheDataTree.setDataPageScanEnabled(false);
+
+ if (qryInfo != null)
+ longRunningQryMgr.unregisterQuery(qryInfo);
}
}
@@ -1945,6 +1942,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
connMgr = new ConnectionManager(ctx);
+ longRunningQryMgr = new LongRunningQueryManager(ctx);
+
parser = new QueryParser(this, connMgr);
schemaMgr = new SchemaManager(ctx, connMgr);
@@ -2101,6 +2100,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
runningQryMgr.stop();
schemaMgr.stop();
+ longRunningQryMgr.stop();
connMgr.stop();
if (log.isDebugEnabled())
@@ -2749,4 +2749,18 @@ public class IgniteH2Indexing implements GridQueryIndexing {
cctx.tm().resetContext();
}
}
+
+ /** {@inheritDoc} */
+ @Override public void registerMxBeans(IgniteMBeansManager mbMgr) throws IgniteCheckedException {
+ SqlQueryMXBean qryMXBean = new SqlQueryMXBeanImpl(ctx);
+
+ mbMgr.registerMBean("SQL Query", qryMXBean.getClass().getSimpleName(), qryMXBean, SqlQueryMXBean.class);
+ }
+
+ /**
+ * @return Long running queries manager.
+ */
+ public LongRunningQueryManager longRunningQueries() {
+ return longRunningQryMgr;
+ }
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/LongRunningQueryManager.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/LongRunningQueryManager.java
new file mode 100644
index 0000000..24b078f
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/LongRunningQueryManager.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+
+/**
+ * Long running query manager.
+ */
+public final class LongRunningQueryManager {
+ /** Check period in ms. */
+ private static final long CHECK_PERIOD = 1_000;
+
+ /** Connection manager. */
+ private final ConnectionManager connMgr;
+
+ /** Queries collection. Sorted collection isn't used to reduce 'put' time. */
+ private final ConcurrentHashMap<H2QueryInfo, TimeoutChecker> qrys = new ConcurrentHashMap<>();
+
+ /** Check worker. */
+ private final GridWorker checkWorker;
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /** Long query timeout milliseconds. */
+ private volatile long timeout;
+
+ /**
+ * Long query timeout multiplier. The warning will be printed after:
+ * - timeout;
+ * - timeout * multiplier;
+ * - timeout * multiplier * multiplier;
+ * - etc...
+ *
+ * If the multiplier <= 1, the warning message is printed once.
+ */
+ private volatile int timeoutMult = 2;
+
+ /**
+ * @param ctx Kernal context.
+ */
+ public LongRunningQueryManager(GridKernalContext ctx) {
+ connMgr = ((IgniteH2Indexing)ctx.query().getIndexing()).connections();
+
+ log = ctx.log(LongRunningQueryManager.class);
+
+ checkWorker = new GridWorker(ctx.igniteInstanceName(), "long-qry", log) {
+ @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+ while (true) {
+ checkLongRunning();
+
+ U.sleep(CHECK_PERIOD);
+ }
+ }
+ };
+
+ timeout = ctx.config().getLongQueryWarningTimeout();
+
+ Thread thread = new Thread(checkWorker);
+
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ /**
+ * Stops the check worker and drops all registered queries.
+ */
+ public void stop() {
+ checkWorker.cancel();
+
+ qrys.clear();
+ }
+
+ /**
+ * @param qryInfo Query info to register.
+ */
+ public void registerQuery(H2QueryInfo qryInfo) {
+ assert qryInfo != null;
+
+ final long timeout0 = timeout;
+
+ if (timeout0 > 0)
+ qrys.put(qryInfo, new TimeoutChecker(timeout0, timeoutMult));
+ }
+
+ /**
+ * @param qryInfo Query info to remove.
+ */
+ public void unregisterQuery(H2QueryInfo qryInfo) {
+ assert qryInfo != null;
+
+ qrys.remove(qryInfo);
+ }
+
+ /**
+ * Prints a warning for each registered query whose execution time exceeds its current timeout.
+ */
+ private void checkLongRunning() {
+ for (Map.Entry<H2QueryInfo, TimeoutChecker> e : qrys.entrySet()) {
+ H2QueryInfo qinfo = e.getKey();
+
+ if (e.getValue().checkTimeout(qinfo.time())) {
+ qinfo.printLogMessage(log, connMgr, "Query execution is too long");
+
+ if (e.getValue().timeoutMult <= 1)
+ qrys.remove(qinfo);
+ }
+ }
+ }
+
+ /**
+ * @return Timeout in milliseconds after which long query warning will be printed.
+ */
+ public long getTimeout() {
+ return timeout;
+ }
+
+ /**
+ * Sets timeout in milliseconds after which long query warning will be printed.
+ *
+ * @param timeout Timeout in milliseconds after which long query warning will be printed.
+ */
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
+ /**
+ * @return Long query timeout multiplier.
+ */
+ public int getTimeoutMultiplier() {
+ return timeoutMult;
+ }
+
+ /**
+ * Sets long query timeout multiplier. The warning will be printed after:
+ * - timeout;
+ * - timeout * multiplier;
+ * - timeout * multiplier * multiplier;
+ * - etc...
+ * If the multiplier <= 1, the warning message is printed once.
+ *
+ * @param timeoutMult Long query timeout multiplier.
+ */
+ public void setTimeoutMultiplier(int timeoutMult) {
+ this.timeoutMult = timeoutMult;
+ }
+
+ /**
+ * Holds timeout settings for the specified query.
+ */
+ private static class TimeoutChecker {
+ /** Current warning threshold in milliseconds. */
+ private long timeout;
+
+ /** Timeout multiplier. */
+ private int timeoutMult;
+
+ /**
+ * @param timeout Initial timeout.
+ * @param timeoutMult Timeout multiplier.
+ */
+ public TimeoutChecker(long timeout, int timeoutMult) {
+ this.timeout = timeout;
+ this.timeoutMult = timeoutMult;
+ }
+
+ /**
+ * @param time Query execution time.
+ * @return {@code true} if timeout occurred.
+ */
+ public boolean checkTimeout(long time) {
+ if (time > timeout) {
+ if (timeoutMult > 1)
+ timeout *= timeoutMult;
+
+ return true;
+ }
+ else
+ return false;
+ }
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/MapH2QueryInfo.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/MapH2QueryInfo.java
new file mode 100644
index 0000000..4c8cca5
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/MapH2QueryInfo.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import java.sql.PreparedStatement;
+import org.apache.ignite.cluster.ClusterNode;
+
+/**
+ * Map query info.
+ */
+public class MapH2QueryInfo extends H2QueryInfo {
+ /** Node. */
+ private final ClusterNode node;
+
+ /** Request id. */
+ private final long reqId;
+
+ /** Segment. */
+ private final int segment;
+
+ /**
+ * @param stmt Query statement.
+ * @param sql Query SQL.
+ * @param node Originator node.
+ * @param reqId Request ID.
+ * @param segment Segment.
+ */
+ public MapH2QueryInfo(PreparedStatement stmt, String sql,
+ ClusterNode node, long reqId, int segment) {
+ super(QueryType.MAP, stmt, sql);
+
+ this.node = node;
+ this.reqId= reqId;
+ this.segment = segment;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void printInfo(StringBuilder msg) {
+ msg.append(", node=").append(node)
+ .append(", reqId=").append(reqId)
+ .append(", segment=").append(segment);
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ReduceH2QueryInfo.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ReduceH2QueryInfo.java
new file mode 100644
index 0000000..8346707
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ReduceH2QueryInfo.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import java.sql.PreparedStatement;
+
+/**
+ * Reduce query info.
+ */
+public class ReduceH2QueryInfo extends H2QueryInfo {
+ /** Request id. */
+ private final long reqId;
+
+ /**
+ * @param stmt Query statement.
+ * @param sql Query SQL.
+ * @param reqId Request ID.
+ */
+ public ReduceH2QueryInfo(PreparedStatement stmt, String sql, long reqId) {
+ super(QueryType.REDUCE, stmt, sql);
+
+ this.reqId= reqId;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void printInfo(StringBuilder msg) {
+ msg.append(", reqId=").append(reqId);
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index e132b1a..7314a4e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.h2.H2ConnectionWrapper;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.MapH2QueryInfo;
import org.apache.ignite.internal.processors.query.h2.ResultSetEnlistFuture;
import org.apache.ignite.internal.processors.query.h2.UpdateResult;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
@@ -560,6 +561,8 @@ public class GridMapQueryExecutor {
boolean removeMapping = false;
ResultSet rs = null;
+ MapH2QueryInfo qryInfo = null;
+
// If we are not the target node for this replicated query, just ignore it.
if (qry.node() == null || (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) {
String sql = qry.query();
@@ -585,6 +588,8 @@ public class GridMapQueryExecutor {
int opTimeout = IgniteH2Indexing.operationTimeout(timeout, tx);
+ qryInfo = new MapH2QueryInfo(stmt, qry.query(), node, reqId, segmentId);
+
rs = h2.executeSqlQueryWithTimer(
stmt,
connWrp.connection(),
@@ -592,7 +597,8 @@ public class GridMapQueryExecutor {
params0,
opTimeout,
qryResults.queryCancel(qryIdx),
- dataPageScanEnabled);
+ dataPageScanEnabled,
+ qryInfo);
if (inTx) {
ResultSetEnlistFuture enlistFut = ResultSetEnlistFuture.future(
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index b2c5170..5aab4b5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -70,6 +71,7 @@ import org.apache.ignite.internal.processors.query.h2.H2ConnectionWrapper;
import org.apache.ignite.internal.processors.query.h2.H2FieldsIterator;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.ReduceH2QueryInfo;
import org.apache.ignite.internal.processors.query.h2.ThreadLocalObjectPool;
import org.apache.ignite.internal.processors.query.h2.UpdateResult;
import org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedUpdateRun;
@@ -819,12 +821,21 @@ public class GridReduceQueryExecutor {
GridCacheSqlQuery rdc = qry.reduceQuery();
- ResultSet res = h2.executeSqlQueryWithTimer(r.connection(),
+ Collection<Object> params0 = F.asList(rdc.parameters(params));
+
+ final PreparedStatement stmt = h2.preparedStatementWithParams(r.connection(), rdc.query(),
+ params0, false);
+
+ ReduceH2QueryInfo qryInfo = new ReduceH2QueryInfo(stmt, qry.originalSql(), qryReqId);
+
+ ResultSet res = h2.executeSqlQueryWithTimer(stmt, r.connection(),
rdc.query(),
F.asList(rdc.parameters(params)),
timeoutMillis,
cancel,
- dataPageScanEnabled);
+ dataPageScanEnabled,
+ qryInfo
+ );
resIter = new H2FieldsIterator(res, mvccTracker, false, detachedConn);
@@ -1104,6 +1115,7 @@ public class GridReduceQueryExecutor {
* @param nodes Nodes to check periodically if they alive.
* @param cancel Query cancel.
* @throws IgniteInterruptedCheckedException If interrupted.
+ * @throws QueryCancelledException On query cancel.
*/
private void awaitAllReplies(ReduceQueryRun r, Collection<ClusterNode> nodes, GridQueryCancel cancel)
throws IgniteInterruptedCheckedException, QueryCancelledException {
@@ -1171,7 +1183,7 @@ public class GridReduceQueryExecutor {
for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++) {
ResultSet rs =
- h2.executeSqlQueryWithTimer(c, "SELECT PLAN FROM " + mergeTableIdentifier(i), null, 0, null, null);
+ h2.executeSqlQueryWithTimer(c, "SELECT PLAN FROM " + mergeTableIdentifier(i), null, 0, null, null, null);
lists.add(F.asList(getPlan(rs)));
}
@@ -1191,7 +1203,7 @@ public class GridReduceQueryExecutor {
F.asList(rdc.parameters(params)),
0,
null,
- null);
+ null, null);
lists.add(F.asList(getPlan(rs)));
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
new file mode 100644
index 0000000..a5827c3
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.junit.Test;
+
+/**
+ * Tests the log output for long-running queries.
+ */
+public class LongRunningQueryTest extends AbstractIndexingCommonTest {
+ /** Keys count. */
+ private static final int KEY_CNT = 1000;
+
+ /** Local query mode. */
+ private boolean local;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ startGrid();
+
+ IgniteCache c = grid().createCache(new CacheConfiguration<Long, Long>()
+ .setName("test")
+ .setSqlSchema("TEST")
+ .setQueryEntities(Collections.singleton(new QueryEntity(Long.class, Long.class)
+ .setTableName("test")
+ .addQueryField("id", Long.class.getName(), null)
+ .addQueryField("val", Long.class.getName(), null)
+ .setKeyFieldName("id")
+ .setValueFieldName("val")
+ ))
+ .setAffinity(new RendezvousAffinityFunction(false, 10))
+ .setSqlFunctionClasses(TestSQLFunctions.class));
+
+ for (long i = 0; i < KEY_CNT; ++i)
+ c.put(i, i);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testLongDistributed() {
+ local = false;
+
+ checkLongRunning();
+ checkFastQueries();
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testLongLocal() {
+ local = true;
+
+ checkLongRunning();
+ checkFastQueries();
+ }
+
+ /**
+ * Runs several fast queries.
+ * Log messages must not contain info about a long-running query.
+ */
+ private void checkFastQueries() {
+ ListeningTestLogger testLog = testLog();
+
+ LogListener lsnr = LogListener
+ .matches(Pattern.compile("Query execution is too long"))
+ .build();
+
+ testLog.registerListener(lsnr);
+
+ // Several fast queries.
+ for (int i = 0; i < 10; ++i)
+ sql("SELECT * FROM test").getAll();
+
+ assertFalse(lsnr.check());
+ }
+
+ /**
+ * Runs a long-running query that is canceled by timeout and checks the log output.
+ * Log messages must contain info about the long-running query.
+ */
+ private void checkLongRunning() {
+ ListeningTestLogger testLog = testLog();
+
+ LogListener lsnr = LogListener
+ .matches("Query execution is too long")
+ .build();
+
+ testLog.registerListener(lsnr);
+
+ sqlCheckLongRunning("SELECT T0.id FROM test AS T0, test AS T1, test AS T2 where T0.id > ?", 0);
+
+ assertTrue(lsnr.check());
+ }
+
+ /**
+ * @param sql SQL query.
+ * @param args Query parameters.
+ */
+ private void sqlCheckLongRunning(String sql, Object ... args) {
+ GridTestUtils.assertThrowsAnyCause(log, () -> sql(sql, args).getAll(), QueryCancelledException.class, "");
+ }
+
+ /**
+ * @param sql SQL query.
+ * @param args Query parameters.
+ * @return Results cursor.
+ */
+ private FieldsQueryCursor<List<?>> sql(String sql, Object ... args) {
+ return grid().context().query().querySqlFields(new SqlFieldsQuery(sql)
+ .setTimeout(10, TimeUnit.SECONDS)
+ .setLocal(local)
+ .setSchema("TEST")
+ .setArgs(args), false);
+ }
+
+ /**
+ * Utility class with custom SQL functions.
+ */
+ public static class TestSQLFunctions {
+ /**
+ * @param v Number of milliseconds to sleep.
+ * @return The same value {@code v} that was passed in.
+ */
+ @SuppressWarnings("unused")
+ @QuerySqlFunction
+ public static int sleep_func(int v) {
+ try {
+ Thread.sleep(v);
+ }
+ catch (InterruptedException ignored) {
+ // No-op
+ }
+ return v;
+ }
+ }
+
+ /**
+ * Setup and return test log.
+ *
+ * @return Test logger.
+ */
+ private ListeningTestLogger testLog() {
+ ListeningTestLogger testLog = new ListeningTestLogger(false, log);
+
+ GridTestUtils.setFieldValue(((IgniteH2Indexing)grid().context().query().getIndexing()).longRunningQueries(),
+ "log", testLog);
+
+ return testLog;
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java
index c536d79..3b7a8d7 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java
@@ -517,13 +517,14 @@ public class QueryDataPageScanTest extends GridCommonAbstractTest {
@Nullable Collection<Object> params,
int timeoutMillis,
@Nullable GridQueryCancel cancel,
- Boolean dataPageScanEnabled
+ Boolean dataPageScanEnabled,
+ final H2QueryInfo qryInfo
) throws IgniteCheckedException {
callsCnt.incrementAndGet();
assertEquals(expectedDataPageScanEnabled, dataPageScanEnabled);
return super.executeSqlQueryWithTimer(stmt, conn, sql, params, timeoutMillis,
- cancel, dataPageScanEnabled);
+ cancel, dataPageScanEnabled, qryInfo);
}
}