You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2019/01/23 10:17:29 UTC
[ignite] branch master updated: IGNITE-10754: SQL: Query history
tracking. This closes #5805.
This is an automated email from the ASF dual-hosted git repository.
vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new faf4f88 IGNITE-10754: SQL: Query history tracking. This closes #5805.
faf4f88 is described below
commit faf4f88936537c960b0b806a207d2efbd318025a
Author: Yuriy Gerzhedovich <yg...@gridgain.com>
AuthorDate: Wed Jan 23 13:17:19 2019 +0300
IGNITE-10754: SQL: Query history tracking. This closes #5805.
---
.../ignite/configuration/IgniteConfiguration.java | 31 +
.../processors/bulkload/BulkLoadProcessor.java | 9 +-
.../internal/processors/cache/QueryCursorImpl.java | 2 +-
.../platform/utils/PlatformConfigurationUtils.java | 16 +-
.../processors/query/QueryHistoryMetrics.java | 175 ++++++
.../processors/query/QueryHistoryMetricsKey.java | 95 +++
.../processors/query/QueryHistoryMetricsValue.java | 99 ++++
.../processors/query/QueryHistoryTracker.java | 150 +++++
.../processors/query/RunningQueryManager.java | 87 ++-
.../IgniteClientCacheInitializationFailTest.java | 1 -
.../cache/query/RegisteredQueryCursor.java | 17 +-
.../query/h2/DmlStatementsProcessor.java | 22 +-
.../processors/query/h2/IgniteH2Indexing.java | 70 ++-
.../processors/query/RunningQueriesTest.java | 17 +-
.../query/SqlQueryHistoryFromClientSelfTest.java | 55 ++
.../processors/query/SqlQueryHistorySelfTest.java | 644 +++++++++++++++++++++
.../IgniteBinaryCacheQueryTestSuite.java | 6 +
.../Config/full-config.xml | 3 +-
.../IgniteConfigurationSerializerTest.cs | 6 +-
.../IgniteConfigurationTest.cs | 2 +
.../Apache.Ignite.Core/IgniteConfiguration.cs | 27 +-
.../IgniteConfigurationSection.xsd | 6 +-
22 files changed, 1468 insertions(+), 72 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 00178e9..7da24fbcc 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -237,6 +237,9 @@ public class IgniteConfiguration {
/** Initial value of time which we would wait from the first discovery event in the chain(node join/exit). */
public static final long DFLT_INIT_BASELINE_AUTO_ADJUST_MAX_TIMEOUT = 0;
+ /** Default SQL query history size. */
+ public static final int DFLT_SQL_QUERY_HISTORY_SIZE = 1000;
+
/** Optional local Ignite instance name. */
private String igniteInstanceName;
@@ -285,6 +288,9 @@ public class IgniteConfiguration {
/** Query pool size. */
private int qryPoolSize = DFLT_QUERY_THREAD_POOL_SIZE;
+ /** SQL query history size. */
+ private int sqlQryHistSize = DFLT_SQL_QUERY_HISTORY_SIZE;
+
/** Ignite installation folder. */
private String igniteHome;
@@ -650,6 +656,7 @@ public class IgniteConfiguration {
sndRetryCnt = cfg.getNetworkSendRetryCount();
sndRetryDelay = cfg.getNetworkSendRetryDelay();
sqlConnCfg = cfg.getSqlConnectorConfiguration();
+ sqlQryHistSize = cfg.getSqlQueryHistorySize();
sqlSchemas = cfg.getSqlSchemas();
sslCtxFactory = cfg.getSslContextFactory();
storeSesLsnrs = cfg.getCacheStoreSessionListenerFactories();
@@ -1026,6 +1033,30 @@ public class IgniteConfiguration {
}
/**
+ * Number of SQL query history elements to keep in memory. If not provided, then default value {@link
+ * #DFLT_SQL_QUERY_HISTORY_SIZE} is used. If the provided value is less than or equal to 0, SQL query history
+ * gathering is switched off.
+ *
+ * @return SQL query history size.
+ */
+ public int getSqlQueryHistorySize() {
+ return sqlQryHistSize;
+ }
+
+ /**
+ * Sets number of SQL query history elements kept in memory. If not explicitly set, then default value is {@link
+ * #DFLT_SQL_QUERY_HISTORY_SIZE}.
+ *
+ * @param size Number of SQL query history elements kept in memory.
+ * @return {@code this} for chaining.
+ */
+ public IgniteConfiguration setSqlQueryHistorySize(int size) {
+ sqlQryHistSize = size;
+
+ return this;
+ }
+
+ /**
* Sets thread pool size to use within grid.
*
* @param poolSize Thread pool size to use within grid.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java
index 9dba60b..3bca845 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java
@@ -107,13 +107,20 @@ public class BulkLoadProcessor implements AutoCloseable {
if (isClosed)
return;
+ boolean failed = false;
+
try {
isClosed = true;
outputStreamer.close();
}
+ catch (Exception e) {
+ failed = true;
+
+ throw e;
+ }
finally {
- runningQryMgr.unregister(qryId);
+ runningQryMgr.unregister(qryId, failed);
}
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
index a17a155..23d36c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
@@ -95,7 +95,7 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T>, FieldsQueryCursor<T
/**
* @return An simple iterator.
*/
- private Iterator<T> iter() {
+ protected Iterator<T> iter() {
if (!STATE_UPDATER.compareAndSet(this, IDLE, EXECUTION))
throw new IgniteException("Iterator is already fetched or query was cancelled.");
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
index 11ff87e..a316b59 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
@@ -523,7 +523,7 @@ public class PlatformConfigurationUtils {
Object defVal = in.readObject();
if (defVal != null)
defVals.put(fieldName, defVal);
-
+
if (ver.compareTo(VER_1_2_0) >= 0) {
int precision = in.readInt();
@@ -616,7 +616,7 @@ public class PlatformConfigurationUtils {
* @param ver Client version.
*/
@SuppressWarnings("deprecation")
- public static void readIgniteConfiguration(BinaryRawReaderEx in, IgniteConfiguration cfg,
+ public static void readIgniteConfiguration(BinaryRawReaderEx in, IgniteConfiguration cfg,
ClientListenerProtocolVersion ver) {
if (in.readBoolean())
cfg.setClientMode(in.readBoolean());
@@ -667,6 +667,8 @@ public class PlatformConfigurationUtils {
cfg.setInitBaselineAutoAdjustTimeout(in.readLong());
if (in.readBoolean())
cfg.setInitBaselineAutoAdjustMaxTimeout(in.readLong());
+ if (in.readBoolean())
+ cfg.setSqlQueryHistorySize(in.readInt());
int sqlSchemasCnt = in.readInt();
@@ -854,7 +856,7 @@ public class PlatformConfigurationUtils {
* @param in Reader.
* @param ver Client version.
*/
- private static void readCacheConfigurations(BinaryRawReaderEx in, IgniteConfiguration cfg,
+ private static void readCacheConfigurations(BinaryRawReaderEx in, IgniteConfiguration cfg,
ClientListenerProtocolVersion ver) {
int len = in.readInt();
@@ -989,7 +991,7 @@ public class PlatformConfigurationUtils {
* @param ccfg Configuration.
* @param ver Client version.
*/
- public static void writeCacheConfiguration(BinaryRawWriter writer, CacheConfiguration ccfg,
+ public static void writeCacheConfiguration(BinaryRawWriter writer, CacheConfiguration ccfg,
ClientListenerProtocolVersion ver) {
assert writer != null;
assert ccfg != null;
@@ -1110,7 +1112,7 @@ public class PlatformConfigurationUtils {
* @param qryEntity Query entity.
* @param ver Client version.
*/
- public static void writeQueryEntity(BinaryRawWriter writer, QueryEntity qryEntity,
+ public static void writeQueryEntity(BinaryRawWriter writer, QueryEntity qryEntity,
ClientListenerProtocolVersion ver) {
assert qryEntity != null;
@@ -1210,7 +1212,7 @@ public class PlatformConfigurationUtils {
* @param ver Client version.
*/
@SuppressWarnings("deprecation")
- public static void writeIgniteConfiguration(BinaryRawWriter w, IgniteConfiguration cfg,
+ public static void writeIgniteConfiguration(BinaryRawWriter w, IgniteConfiguration cfg,
ClientListenerProtocolVersion ver) {
assert w != null;
assert cfg != null;
@@ -1262,6 +1264,8 @@ public class PlatformConfigurationUtils {
w.writeLong(cfg.getInitBaselineAutoAdjustTimeout());
w.writeBoolean(true);
w.writeLong(cfg.getInitBaselineAutoAdjustMaxTimeout());
+ w.writeBoolean(true);
+ w.writeInt(cfg.getSqlQueryHistorySize());
if (cfg.getSqlSchemas() == null)
w.writeInt(-1);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryHistoryMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryHistoryMetrics.java
new file mode 100644
index 0000000..0f8a07d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryHistoryMetrics.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentLinkedDeque8;
+
+/**
+ * Query history metrics.
+ */
+public class QueryHistoryMetrics {
+ /** Link to internal node in eviction deque. */
+ @GridToStringExclude
+ private final AtomicReference<ConcurrentLinkedDeque8.Node<QueryHistoryMetrics>> linkRef;
+
+ /** Query history metrics immutable wrapper. */
+ private volatile QueryHistoryMetricsValue val;
+
+ /** Query history metrics group key. */
+ private final QueryHistoryMetricsKey key;
+
+ /**
+ * Constructor with metrics.
+ *
+ * @param qry Textual query representation.
+ * @param schema Schema name.
+ * @param loc {@code true} for local query.
+ * @param startTime Start time of query execution.
+ * @param duration Duration of query execution.
+ * @param failed {@code true} if the query executed unsuccessfully, {@code false} otherwise.
+ */
+ public QueryHistoryMetrics(String qry, String schema, boolean loc, long startTime, long duration, boolean failed) {
+ key = new QueryHistoryMetricsKey(qry, schema, loc);
+
+ if (failed)
+ val = new QueryHistoryMetricsValue(1, 1, 0, 0, startTime);
+ else
+ val = new QueryHistoryMetricsValue(1, 0, duration, duration, startTime);
+
+ linkRef = new AtomicReference<>();
+ }
+
+ /**
+ * @return Metrics group key.
+ */
+ public QueryHistoryMetricsKey key() {
+ return key;
+ }
+
+ /**
+ * Aggregates new metrics with the already existing ones.
+ *
+ * @param m Other metrics to take into account.
+ * @return Aggregated metrics.
+ */
+ public QueryHistoryMetrics aggregateWithNew(QueryHistoryMetrics m) {
+ val = new QueryHistoryMetricsValue(
+ val.execs() + m.executions(),
+ val.failures() + m.failures(),
+ Math.min(val.minTime(), m.minimumTime()),
+ Math.max(val.maxTime(), m.maximumTime()),
+ Math.max(val.lastStartTime(), m.lastStartTime()));
+
+ return this;
+ }
+
+ /**
+ * @return Textual representation of query.
+ */
+ public String query() {
+ return key.query();
+ }
+
+ /**
+ * @return Schema.
+ */
+ public String schema() {
+ return key.schema();
+ }
+
+ /**
+ * @return {@code true} For query with enabled local flag.
+ */
+ public boolean local() {
+ return key.local();
+ }
+
+ /**
+ * Gets total number of executions of the query.
+ *
+ * @return Number of executions.
+ */
+ public int executions() {
+ return val.execs();
+ }
+
+ /**
+ * Gets number of times a query execution failed.
+ *
+ * @return Number of times a query execution failed.
+ */
+ public int failures() {
+ return val.failures();
+ }
+
+ /**
+ * Gets minimum execution time of query.
+ *
+ * @return Minimum execution time of query.
+ */
+ public long minimumTime() {
+ return val.minTime();
+ }
+
+ /**
+ * Gets maximum execution time of query.
+ *
+ * @return Maximum execution time of query.
+ */
+ public long maximumTime() {
+ return val.maxTime();
+ }
+
+ /**
+ * Gets latest query start time.
+ *
+ * @return Latest time the query was started.
+ */
+ public long lastStartTime() {
+ return val.lastStartTime();
+ }
+
+ /**
+ * @return Link to internal node in eviction deque.
+ */
+ @Nullable public ConcurrentLinkedDeque8.Node<QueryHistoryMetrics> link() {
+ return linkRef.get();
+ }
+
+ /**
+ * Atomically replace link to new.
+ *
+ * @param expLink Link which should be replaced.
+ * @param updatedLink New link which should be set.
+ * @return {@code true} If link has been updated.
+ */
+ public boolean replaceLink(ConcurrentLinkedDeque8.Node<QueryHistoryMetrics> expLink,
+ ConcurrentLinkedDeque8.Node<QueryHistoryMetrics> updatedLink) {
+ return linkRef.compareAndSet(expLink, updatedLink);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(QueryHistoryMetrics.class, this);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryHistoryMetricsKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryHistoryMetricsKey.java
new file mode 100644
index 0000000..890c574
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryHistoryMetricsKey.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Immutable query metrics key used to group metrics.
+ */
+public class QueryHistoryMetricsKey {
+ /** Textual query representation. */
+ private final String qry;
+
+ /** Schema. */
+ private final String schema;
+
+ /** Local flag. */
+ private final boolean loc;
+
+ /** Pre-calculated hash code. */
+ private final int hash;
+
+ /**
+ * Constructor.
+ *
+ * @param qry Textual query representation.
+ * @param schema Schema.
+ * @param loc Local flag of execution query.
+ */
+ public QueryHistoryMetricsKey(String qry, String schema, boolean loc) {
+ assert qry != null;
+ assert schema != null;
+
+ this.qry = qry;
+ this.schema = schema;
+ this.loc = loc;
+
+ hash = 31 * (31 * qry.hashCode() + schema.hashCode()) + (loc ? 1 : 0);
+ }
+
+ /**
+ * @return Textual representation of query.
+ */
+ public String query() {
+ return qry;
+ }
+
+ /**
+ * @return Textual representation of schema.
+ */
+ public String schema() {
+ return schema;
+ }
+
+ /**
+ * @return Local query flag.
+ */
+ public boolean local() {
+ return loc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return hash;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ QueryHistoryMetricsKey key = (QueryHistoryMetricsKey)o;
+
+ return F.eq(qry, key.qry) && F.eq(schema, key.schema) && F.eq(loc, key.loc);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryHistoryMetricsValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryHistoryMetricsValue.java
new file mode 100644
index 0000000..c37e141
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryHistoryMetricsValue.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+/**
+ * Immutable query metrics.
+ */
+class QueryHistoryMetricsValue {
+ /** Number of executions. */
+ private final int execs;
+
+ /** Number of failures. */
+ private final int failures;
+
+ /** Minimum time of execution. */
+ private final long minTime;
+
+ /** Maximum time of execution. */
+ private final long maxTime;
+
+ /** Last start time of execution. */
+ private final long lastStartTime;
+
+ /**
+ * @param execs Number of executions.
+ * @param failures Number of failure.
+ * @param minTime Min time of execution.
+ * @param maxTime Max time of execution.
+ * @param lastStartTime Last start time of execution.
+ */
+ public QueryHistoryMetricsValue(int execs, int failures, long minTime, long maxTime, long lastStartTime) {
+ this.execs = execs;
+ this.failures = failures;
+ this.minTime = minTime;
+ this.maxTime = maxTime;
+ this.lastStartTime = lastStartTime;
+ }
+
+ /**
+ * Gets total number of executions of the query.
+ *
+ * @return Number of executions.
+ */
+ public int execs() {
+ return execs;
+ }
+
+ /**
+ * Gets number of times a query execution failed.
+ *
+ * @return Number of times a query execution failed.
+ */
+ public int failures() {
+ return failures;
+ }
+
+ /**
+ * Gets minimum execution time of query.
+ *
+ * @return Minimum execution time of query.
+ */
+ public long minTime() {
+ return minTime;
+ }
+
+ /**
+ * Gets maximum execution time of query.
+ *
+ * @return Maximum execution time of query.
+ */
+ public long maxTime() {
+ return maxTime;
+ }
+
+ /**
+ * Gets latest query start time.
+ *
+ * @return Latest time the query was started.
+ */
+ public long lastStartTime() {
+ return lastStartTime;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryHistoryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryHistoryTracker.java
new file mode 100644
index 0000000..a8bf796
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryHistoryTracker.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.jsr166.ConcurrentLinkedDeque8;
+
+/**
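+ * Tracks SQL query history metrics keyed by query text, schema and local flag, bounded by the history size.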
+ */
+class QueryHistoryTracker {
+ /** Query metrics. */
+ private final ConcurrentHashMap<QueryHistoryMetricsKey, QueryHistoryMetrics> qryMetrics;
+
+ /** Queue. */
+ private final ConcurrentLinkedDeque8<QueryHistoryMetrics> evictionQueue = new ConcurrentLinkedDeque8<>();
+
+ /** History size. */
+ private final int histSz;
+
+ /**
+ * @param histSz History size.
+ */
+ QueryHistoryTracker(int histSz) {
+ this.histSz = histSz;
+
+ qryMetrics = histSz > 0 ? new ConcurrentHashMap<>(histSz) : null;
+ }
+
+ /**
+ * @param failed {@code True} if query execution failed.
+ */
+ void collectMetrics(GridRunningQueryInfo runningQryInfo, boolean failed) {
+ if (histSz <= 0)
+ return;
+
+ String qry = runningQryInfo.query();
+ String schema = runningQryInfo.schemaName();
+ boolean loc = runningQryInfo.local();
+ long startTime = runningQryInfo.startTime();
+ long duration = System.currentTimeMillis() - startTime;
+
+ QueryHistoryMetrics m = new QueryHistoryMetrics(qry, schema, loc, startTime, duration, failed);
+
+ QueryHistoryMetrics mergedMetrics = qryMetrics.merge(m.key(), m, QueryHistoryMetrics::aggregateWithNew);
+
+ if (touch(mergedMetrics) && qryMetrics.size() > histSz)
+ shrink();
+ }
+
+ /**
+ * @param entry Entry which was updated.
+ * @return {@code true} In case entry is new and has been added, {@code false} otherwise.
+ */
+ private boolean touch(QueryHistoryMetrics entry) {
+ ConcurrentLinkedDeque8.Node<QueryHistoryMetrics> node = entry.link();
+
+ // Entry has not been enqueued yet.
+ if (node == null) {
+ node = evictionQueue.offerLastx(entry);
+
+ if (!entry.replaceLink(null, node)) {
+ // Was concurrently added, need to clear it from queue.
+ removeLink(node);
+
+ return false;
+ }
+
+ if (node.item() == null) {
+ // Was concurrently shrunk.
+ entry.replaceLink(node, null);
+
+ return false;
+ }
+
+ return true;
+ }
+ else if (removeLink(node)) {
+ // Move node to tail.
+ ConcurrentLinkedDeque8.Node<QueryHistoryMetrics> newNode = evictionQueue.offerLastx(entry);
+
+ if (!entry.replaceLink(node, newNode)) {
+ // Was concurrently added, need to clear it from queue.
+ removeLink(newNode);
+ }
+ }
+
+ // Entry is already in queue.
+ return false;
+ }
+
+ /**
+ * Tries to remove one item from queue.
+ */
+ private void shrink() {
+ while (true) {
+ QueryHistoryMetrics entry = evictionQueue.poll();
+
+ if (entry == null)
+ return;
+
+ // If we can't remove the metric entry, its metrics have been changed concurrently.
+ // In this case the entry has already been offered to the eviction queue again, so we don't put it back.
+ // Just make a new attempt to remove the oldest entry.
+ if (qryMetrics.remove(entry.key(), entry))
+ return;
+ }
+ }
+
+ /**
+ * @param node Node which should be unlinked from eviction queue.
+ * @return {@code true} If node was unlinked.
+ */
+ private boolean removeLink(ConcurrentLinkedDeque8.Node<QueryHistoryMetrics> node) {
+ return evictionQueue.unlinkx(node);
+ }
+
+ /**
+ * Gets SQL query history. Size of history could be configured via {@link
+ * IgniteConfiguration#setSqlQueryHistorySize(int)}
+ *
+ * @return SQL queries history aggregated by query text, schema and local flag.
+ */
+ Map<QueryHistoryMetricsKey, QueryHistoryMetrics> queryHistoryMetrics() {
+ if (histSz <= 0)
+ return Collections.emptyMap();
+
+ return new HashMap<>(qryMetrics);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
index 17b7894..7c908be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
@@ -20,13 +20,20 @@ package org.apache.ignite.internal.processors.query;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL;
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
+
/**
* Keep information about all running queries.
*/
@@ -37,6 +44,23 @@ public class RunningQueryManager {
/** Unique id for queries on single node. */
private final AtomicLong qryIdGen = new AtomicLong();
+ /** History size. */
+ private final int histSz;
+
+ /** Query history tracker. */
+ private volatile QueryHistoryTracker qryHistTracker;
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Context.
+ */
+ public RunningQueryManager(GridKernalContext ctx) {
+ histSz = ctx.config().getSqlQueryHistorySize();
+
+ qryHistTracker = new QueryHistoryTracker(histSz);
+ }
+
/**
* Register running query.
*
@@ -45,11 +69,11 @@ public class RunningQueryManager {
* @param schemaName Schema name.
* @param loc Local query flag.
* @param cancel Query cancel. Should be passed in case query is cancelable, or {@code null} otherwise.
- * @return Registered RunningQueryInfo.
+ * @return Id of registered query.
*/
- public GridRunningQueryInfo register(String qry, GridCacheQueryType qryType, String schemaName,
- boolean loc, @Nullable GridQueryCancel cancel) {
- long qryId = qryIdGen.incrementAndGet();
+ public Long register(String qry, GridCacheQueryType qryType, String schemaName, boolean loc,
+ @Nullable GridQueryCancel cancel) {
+ Long qryId = qryIdGen.incrementAndGet();
GridRunningQueryInfo run = new GridRunningQueryInfo(
qryId,
@@ -65,30 +89,34 @@ public class RunningQueryManager {
assert preRun == null : "Running query already registered [prev_qry=" + preRun + ", newQry=" + run + ']';
- return run;
+ return qryId;
}
/**
* Unregister running query.
*
- * @param runningQryInfo Running query info..
- * @return Unregistered running query info. {@code null} in case running query is not registered.
+ * @param qryId Query id.
+ * @param failed {@code true} if the query failed.
*/
- @Nullable public GridRunningQueryInfo unregister(@Nullable GridRunningQueryInfo runningQryInfo) {
- return (runningQryInfo != null) ? unregister(runningQryInfo.id()) : null;
+ public void unregister(Long qryId, boolean failed) {
+ if (qryId == null)
+ return;
+
+ GridRunningQueryInfo unregRunninigQry = runs.remove(qryId);
+
+ //We need to collect query history only for SQL queries.
+ if (unregRunninigQry != null && isSqlQuery(unregRunninigQry))
+ qryHistTracker.collectMetrics(unregRunninigQry, failed);
}
/**
- * Unregister running query.
+ * Checks whether the running query is of an SQL type.
*
- * @param qryId Query id.
- * @return Unregistered running query info. {@code null} in case running query with give id wasn't found.
+ * @param runningQryInfo Running query info object.
+ * @return {@code true} For SQL or SQL_FIELDS query type.
*/
- @Nullable public GridRunningQueryInfo unregister(Long qryId) {
- if (qryId == null)
- return null;
-
- return runs.remove(qryId);
+ private boolean isSqlQuery(GridRunningQueryInfo runningQryInfo){
+ return runningQryInfo.queryType() == SQL_FIELDS || runningQryInfo.queryType() == SQL;
}
/**
@@ -126,9 +154,13 @@ public class RunningQueryManager {
* Cancel all executing queries and deregistering all of them.
*/
public void stop() {
- for (GridRunningQueryInfo r : runs.values()) {
+ Iterator<GridRunningQueryInfo> iter = runs.values().iterator();
+
+ while (iter.hasNext()) {
try {
- unregister(r.id());
+ GridRunningQueryInfo r = iter.next();
+
+ iter.remove();
r.cancel();
}
@@ -138,6 +170,23 @@ public class RunningQueryManager {
}
}
+ /**
+ * Gets query history statistics. Size of history could be configured via {@link
+ * IgniteConfiguration#setSqlQueryHistorySize(int)}
+ *
+ * @return Queries history statistics aggregated by query text, schema and local flag.
+ */
+ public Map<QueryHistoryMetricsKey, QueryHistoryMetrics> queryHistoryMetrics() {
+ return qryHistTracker.queryHistoryMetrics();
+ }
+
+ /**
+ * Reset query history metrics.
+ */
+ public void resetQueryHistoryMetrics() {
+ qryHistTracker = new QueryHistoryTracker(histSz);
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(RunningQueryManager.class, this);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
index cc0bee8..35359d8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
@@ -53,7 +53,6 @@ import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.QueryField;
import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl;
-import org.apache.ignite.internal.processors.query.RunningQueryManager;
import org.apache.ignite.internal.processors.query.SqlClientContext;
import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/RegisteredQueryCursor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/RegisteredQueryCursor.java
index 3e08c7d..667c713 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/RegisteredQueryCursor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/RegisteredQueryCursor.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.query;
+import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
@@ -38,6 +39,9 @@ public class RegisteredQueryCursor<T> extends QueryCursorImpl<T> {
/** */
private Long qryId;
+ /** Flag to indicate error. */
+ private boolean failed;
+
/**
* @param iterExec Query executor.
* @param cancel Cancellation closure.
@@ -55,10 +59,21 @@ public class RegisteredQueryCursor<T> extends QueryCursorImpl<T> {
this.qryId = qryId;
}
+ /**
+ * Obtains the iterator from the parent cursor. If that throws, this cursor is flagged as failed before
+ * the exception is rethrown; {@link #close()} passes the flag on when it unregisters the query.
+ */
+ @Override protected Iterator<T> iter() {
+ try {
+ return super.iter();
+ }
+ catch (Exception e) {
+ failed = true;
+
+ throw e;
+ }
+ }
+
/** {@inheritDoc} */
@Override public void close() {
if (unregistered.compareAndSet(false, true))
- runningQryMgr.unregister(qryId);
+ runningQryMgr.unregister(qryId, failed);
super.close();
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index a51a058..05d3e6ec 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -67,7 +67,6 @@ import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter;
-import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
@@ -427,8 +426,9 @@ public class DmlStatementsProcessor {
@SuppressWarnings({"unchecked"})
long streamUpdateQuery(String qry, String schemaName, IgniteDataStreamer streamer, PreparedStatement stmt,
final Object[] args) throws IgniteCheckedException {
- GridRunningQueryInfo runningQryInfo = idx.runningQueryManager().register(qry,
- GridCacheQueryType.SQL_FIELDS, schemaName, true, null);
+ Long qryId = idx.runningQueryManager().register(qry, GridCacheQueryType.SQL_FIELDS, schemaName, true, null);
+
+ boolean fail = false;
try {
idx.checkStatementStreamable(stmt);
@@ -498,8 +498,13 @@ public class DmlStatementsProcessor {
return rows.size();
}
+ catch (IgniteCheckedException e) {
+ fail = true;
+
+ throw e;
+ }
finally {
- idx.runningQueryManager().unregister(runningQryInfo);
+ idx.runningQueryManager().unregister(qryId, fail);
}
}
@@ -1216,24 +1221,23 @@ public class DmlStatementsProcessor {
* @throws IgniteSQLException If failed.
*/
public FieldsQueryCursor<List<?>> runNativeDmlStatement(String schemaName, String sql, SqlCommand cmd) {
- GridRunningQueryInfo runningQryInfo = idx.runningQueryManager().register(sql,
- GridCacheQueryType.SQL_FIELDS, schemaName, true, null);
+ Long qryId = idx.runningQueryManager().register(sql, GridCacheQueryType.SQL_FIELDS, schemaName, true, null);
try {
if (cmd instanceof SqlBulkLoadCommand)
- return processBulkLoadCommand((SqlBulkLoadCommand)cmd, runningQryInfo.id());
+ return processBulkLoadCommand((SqlBulkLoadCommand)cmd, qryId);
else
throw new IgniteSQLException("Unsupported DML operation: " + sql,
IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
}
catch (IgniteSQLException e) {
- idx.runningQueryManager().unregister(runningQryInfo);
+ idx.runningQueryManager().unregister(qryId, true);
throw e;
}
catch (Exception e) {
- idx.runningQueryManager().unregister(runningQryInfo);
+ idx.runningQueryManager().unregister(qryId, true);
throw new IgniteSQLException("Unexpected DML operation failure: " + e.getMessage(), e);
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index abc24ad..a1b7ff9 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -89,6 +89,8 @@ import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.NestedTxMode;
import org.apache.ignite.internal.processors.query.QueryField;
+import org.apache.ignite.internal.processors.query.QueryHistoryMetrics;
+import org.apache.ignite.internal.processors.query.QueryHistoryMetricsKey;
import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl;
import org.apache.ignite.internal.processors.query.RunningQueryManager;
import org.apache.ignite.internal.processors.query.SqlClientContext;
@@ -257,7 +259,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
private PartitionExtractor partExtractor;
/** */
- private final RunningQueryManager runningQueryMgr = new RunningQueryManager();
+ private RunningQueryManager runningQueryMgr;
/** */
private volatile GridBoundedConcurrentLinkedHashMap<H2TwoStepCachedQueryKey, H2TwoStepCachedQuery> twoStepCache =
@@ -465,14 +467,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
H2TableDescriptor tbl = schemaMgr.tableForType(schemaName, cacheName, typeName);
if (tbl != null && tbl.luceneIndex() != null) {
- GridRunningQueryInfo runningQryInfo = runningQueryManager().register(qry,
- TEXT, schemaName, true, null);
+ Long qryId = runningQueryManager().register(qry, TEXT, schemaName, true, null);
try {
return tbl.luceneIndex().query(qry.toUpperCase(), filters);
}
finally {
- runningQueryManager().unregister(runningQryInfo);
+ runningQueryManager().unregister(qryId, false);
}
}
@@ -1353,16 +1354,16 @@ public class IgniteH2Indexing implements GridQueryIndexing {
*/
private List<FieldsQueryCursor<List<?>>> queryDistributedSqlFieldsNative(String schemaName, SqlFieldsQuery qry,
SqlCommand cmd, @Nullable SqlClientContext cliCtx) {
- Long qryId = null;
+ boolean fail = false;
// Execute.
- try {
- if (cmd instanceof SqlBulkLoadCommand)
- return Collections.singletonList(dmlProc.runNativeDmlStatement(schemaName, qry.getSql(), cmd));
+ if (cmd instanceof SqlBulkLoadCommand)
+ return Collections.singletonList(dmlProc.runNativeDmlStatement(schemaName, qry.getSql(), cmd));
- //Always registry new running query for native commands except COPY. Currently such operations don't support cancellation.
- qryId = registerRunningQuery(schemaName, null, qry.getSql(), qry.isLocal(), true);
+ //Always register a new running query for native commands except COPY. Currently such operations don't support cancellation.
+ Long qryId = registerRunningQuery(schemaName, null, qry.getSql(), qry.isLocal(), true);
+ try {
if (cmd instanceof SqlCreateIndexCommand
|| cmd instanceof SqlDropIndexCommand
|| cmd instanceof SqlAlterTableCommand
@@ -1388,11 +1389,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
return Collections.singletonList(H2Utils.zeroCursor());
}
catch (IgniteCheckedException e) {
+ fail = true;
+
throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + qry.getSql() +
", err=" + e.getMessage() + ']', e);
}
finally {
- runningQueryMgr.unregister(qryId);
+ runningQueryMgr.unregister(qryId, fail);
}
}
@@ -1658,6 +1661,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (!prepared.isQuery()) {
Long qryId = registerRunningQuery(schemaName, cancel, sqlQry, loc, registerAsNewQry);
+ boolean fail = false;
+
try {
if (DmlStatementsProcessor.isDmlStatement(prepared)) {
try {
@@ -1684,15 +1689,20 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
}
catch (IgniteCheckedException e) {
+ fail = true;
+
throw new IgniteSQLException("Failed to execute DML statement [stmt=" + sqlQry +
", params=" + Arrays.deepToString(qry.getArgs()) + "]", e);
}
}
if (DdlStatementsProcessor.isDdlStatement(prepared)) {
- if (loc)
+ if (loc) {
+ fail = true;
+
throw new IgniteSQLException("DDL statements are not supported for LOCAL caches",
IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+ }
return Collections.singletonList(ddlProc.runDdlStatement(sqlQry, prepared));
}
@@ -1700,11 +1710,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (prepared instanceof NoOperation)
return Collections.singletonList(H2Utils.zeroCursor());
+ fail = true;
+
throw new IgniteSQLException("Unsupported DDL/DML operation: " + prepared.getClass().getName(),
IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
}
finally {
- runningQueryMgr.unregister(qryId);
+ runningQueryMgr.unregister(qryId, fail);
}
}
@@ -1729,7 +1741,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
return Collections.singletonList(queryLocalSqlFields(schemaName, qry, keepBinary, filter, cancel, qryId));
}
catch (IgniteCheckedException e) {
- runningQueryMgr.unregister(qryId);
+ runningQueryMgr.unregister(qryId, true);
throw new IgniteSQLException("Failed to execute local statement [stmt=" + sqlQry +
", params=" + Arrays.deepToString(qry.getArgs()) + "]", e);
@@ -1746,12 +1758,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
*/
private Long registerRunningQuery(String schemaName, GridQueryCancel cancel, String qry, boolean loc,
boolean registerAsNewQry) {
- if (registerAsNewQry) {
- GridRunningQueryInfo runningQryInfo = runningQueryMgr.register(qry,
- GridCacheQueryType.SQL_FIELDS, schemaName, loc, cancel);
-
- return runningQryInfo.id();
- }
+ if (registerAsNewQry)
+ return runningQueryMgr.register(qry, GridCacheQueryType.SQL_FIELDS, schemaName, loc, cancel);
return null;
}
@@ -2064,6 +2072,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
Long qryId = registerRunningQuery(schemaName, cancel, qry.getSql(), qry.isLocal(), registerAsNewQry);
boolean cursorCreated = false;
+ boolean failed = true;
try {
// When explicit partitions are set, there must be an owning cache they should be applied to.
@@ -2073,6 +2082,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
int parts[] = calculatePartitions(explicitParts, derivedParts, qry.getArgs());
if (parts != null && parts.length == 0) {
+ failed = false;
+
return new QueryCursorImpl<>(new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
return new Iterator<List<?>>() {
@@ -2116,7 +2127,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
finally {
if (!cursorCreated)
- runningQueryMgr.unregister(qryId);
+ runningQueryMgr.unregister(qryId, failed);
}
}
@@ -2455,6 +2466,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
ddlProc = new DdlStatementsProcessor(ctx, schemaMgr);
partExtractor = new PartitionExtractor(this);
+ runningQueryMgr = new RunningQueryManager(ctx);
if (JdbcUtils.serializer != null)
U.warn(log, "Custom H2 serialization is already configured, will override.");
@@ -2721,6 +2733,22 @@ public class IgniteH2Indexing implements GridQueryIndexing {
return runningQueryMgr.longRunningQueries(duration);
}
+ /**
+ * Gets query history metrics.
+ *
+ * @return Queries history metrics.
+ */
+ public Map<QueryHistoryMetricsKey, QueryHistoryMetrics> queryHistoryMetrics() {
+ return runningQueryMgr.queryHistoryMetrics();
+ }
+
+ /**
+ * Reset query history metrics.
+ */
+ public void resetQueryHistoryMetrics() {
+ runningQueryMgr.resetQueryHistoryMetrics();
+ }
+
/** {@inheritDoc} */
@Override public void cancelQueries(Collection<Long> queries) {
if (!F.isEmpty(queries)) {
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java
index a5d8e2c..c9c5704 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java
@@ -197,13 +197,18 @@ public class RunningQueriesTest extends AbstractIndexingCommonTest {
}
/**
- * Check clenup running queries on node stop.
+ * Check cleanup running queries on node stop.
*
* @throws Exception Exception in case of failure.
*/
@Test
- public void tesctCloseRunningQueriesOnNodeStop() throws Exception {
- IgniteCache<Object, Object> cache = ignite.cache(DEFAULT_CACHE_NAME);
+ public void testCloseRunningQueriesOnNodeStop() throws Exception {
+ IgniteEx ign = startGrid(super.getConfiguration("TST"));
+
+ IgniteCache<Integer, Integer> cache = ign.getOrCreateCache(new CacheConfiguration<Integer, Integer>()
+ .setName("TST")
+ .setQueryEntities(Collections.singletonList(new QueryEntity(Integer.class, Integer.class)))
+ );
for (int i = 0; i < 10000; i++)
cache.put(i, i);
@@ -212,11 +217,11 @@ public class RunningQueriesTest extends AbstractIndexingCommonTest {
Assert.assertEquals("Should be one running query",
1,
- ignite.context().query().runningQueries(-1).size());
+ ign.context().query().runningQueries(-1).size());
- ignite.close();
+ ign.close();
- assertNoRunningQueries();
+ Assert.assertEquals(0, ign.context().query().runningQueries(-1).size());
}
/**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlQueryHistoryFromClientSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlQueryHistoryFromClientSelfTest.java
new file mode 100644
index 0000000..35a5ab9
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlQueryHistoryFromClientSelfTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+
+/**
+ * Check query history metrics from client node.
+ * <p>
+ * Runs the tests inherited from {@link SqlQueryHistorySelfTest} on a grid of three nodes, issuing
+ * queries from {@code grid(2)}, which is asserted to be a client node.
+ */
+public class SqlQueryHistoryFromClientSelfTest extends SqlQueryHistorySelfTest {
+
+ /** Number of {@link #getConfiguration(String)} calls made so far on this test instance. */
+ private int idx;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ // The call that observes counter value 2 (i.e. the third call) gets a client configuration.
+ // NOTE(review): assumes this method is invoked exactly once per started node, in start order,
+ // so that the third configuration belongs to grid(2) - confirm against the test framework.
+ if (idx++ == 2)
+ cfg.setClientMode(true);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteEx queryNode() {
+ IgniteEx node = grid(2);
+
+ // Guards against the counter-based selection above having configured a different node as client.
+ assertTrue(node.context().clientNode());
+
+ return node;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void startTestGrid() throws Exception {
+ startGrids(3);
+ }
+
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlQueryHistorySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlQueryHistorySelfTest.java
new file mode 100644
index 0000000..8b78ef9
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlQueryHistorySelfTest.java
@@ -0,0 +1,644 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.Query;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cache.query.TextQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Check query history metrics from server node.
+ */
+@RunWith(JUnit4.class)
+public class SqlQueryHistorySelfTest extends GridCommonAbstractTest {
+ /** */
+ private static final int QUERY_HISTORY_SIZE = 3;
+
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startTestGrid();
+
+ IgniteCache<Integer, String> cacheA = grid(0).cache("A");
+ IgniteCache<Integer, String> cacheB = grid(0).cache("B");
+
+ for (int i = 0; i < 100; i++) {
+ cacheA.put(i, String.valueOf(i));
+ cacheB.put(i, String.valueOf(i));
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ ((IgniteH2Indexing)queryNode().context().query().getIndexing()).resetQueryHistoryMetrics();
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<Integer, String> configureCache(String cacheName) {
+ CacheConfiguration<Integer, String> ccfg = defaultCacheConfiguration();
+
+ ccfg.setName(cacheName);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setIndexedTypes(Integer.class, String.class);
+ ccfg.setSqlFunctionClasses(Functions.class);
+
+ return ccfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(ipFinder);
+
+ cfg.setDiscoverySpi(disco);
+
+ cfg.setCacheConfiguration(configureCache("A"), configureCache("B"));
+
+ cfg.setSqlQueryHistorySize(QUERY_HISTORY_SIZE);
+
+ return cfg;
+ }
+
+ /**
+ * Test metrics for JDBC.
+ *
+ * @throws Exception In case of error.
+ */
+ @Test
+ public void testJdbcSelectQueryHistory() throws Exception {
+ String qry = "select * from A.String";
+ checkQueryMetrics(qry);
+ }
+
+ /**
+ * Test metrics for JDBC in case the result set is not fully read.
+ *
+ * @throws Exception In case of error.
+ */
+ @Test
+ public void testJdbcSelectNotFullyFetchedQueryHistory() throws Exception {
+ String qry = "select * from A.String";
+
+ try (Connection conn = GridTestUtils.connect(queryNode(), null); Statement stmt = conn.createStatement()) {
+ stmt.setFetchSize(1);
+
+ ResultSet rs = stmt.executeQuery(qry);
+
+ assertTrue(rs.next());
+
+ checkMetrics(0, 0, 0, 0, true);
+ }
+ }
+
+ /**
+ * Test metrics for failed SQL queries.
+ */
+ @Test
+ public void testJdbcQueryHistoryFailed() {
+ try (Connection conn = GridTestUtils.connect(queryNode(), null); Statement stmt = conn.createStatement()) {
+ stmt.executeQuery("select * from A.String where A.fail()=1");
+
+ fail("Query should be failed.");
+ }
+ catch (Exception ignore) {
+ //No-Op
+ }
+
+ checkMetrics(1, 0, 1, 1, true);
+ }
+
+ /**
+ * Test metrics for JDBC in case of DDL and DML.
+ *
+ * @throws Exception In case of error.
+ */
+ @Test
+ public void testJdbcQueryHistoryForDmlAndDdl() throws Exception {
+ List<String> cmds = Arrays.asList(
+ "create table TST(id int PRIMARY KEY, name varchar)",
+ "insert into TST(id) values(1)",
+ "commit"
+ );
+
+ try (Connection conn = GridTestUtils.connect(queryNode(), null); Statement stmt = conn.createStatement()) {
+ for (String cmd : cmds)
+ stmt.execute(cmd);
+ }
+
+ checkSeriesCommand(cmds);
+ }
+
+ /**
+ * Checks that the query history holds exactly {@code QUERY_HISTORY_SIZE} entries, each executed once
+ * without failures, and that the most recently executed commands are among them.
+ *
+ * @param cmds List of SQL commands, in execution order.
+ * @throws IgniteInterruptedCheckedException In case of failure.
+ */
+ private void checkSeriesCommand(List<String> cmds) throws IgniteInterruptedCheckedException {
+ waitingFor("size", QUERY_HISTORY_SIZE);
+
+ for (int i = 0; i < QUERY_HISTORY_SIZE; i++)
+ checkMetrics(QUERY_HISTORY_SIZE, i, 1, 0, false);
+
+ // Check that collected metrics contains correct items: metrics for last N queries.
+
+ Collection<QueryHistoryMetrics> metrics = ((IgniteH2Indexing)queryNode().context().query().getIndexing()).queryHistoryMetrics().values();
+
+ assertEquals(QUERY_HISTORY_SIZE, metrics.size());
+
+ Set<String> qries = metrics.stream().map(QueryHistoryMetrics::query).collect(Collectors.toSet());
+
+ // Walk the tail of the command list: index from cmds.size(), not from QUERY_HISTORY_SIZE, so the
+ // check stays in bounds and still targets the last commands when the two sizes differ.
+ int expCnt = Math.min(QUERY_HISTORY_SIZE, cmds.size());
+
+ for (int i = 0; i < expCnt; i++)
+ assertTrue(qries.contains(cmds.get(cmds.size() - 1 - i)));
+ }
+
+ /**
+ * Test metrics for SQL fields queries.
+ */
+ @Test
+ public void testSqlFieldsQueryHistory() {
+ SqlFieldsQuery qry = new SqlFieldsQuery("select * from String");
+
+ checkQueryMetrics(qry);
+ }
+
+ /**
+ * Test metrics for SQL fields queries that are not fully fetched.
+ *
+ * @throws Exception In case of error.
+ */
+ @Test
+ public void testSqlFieldsQueryHistoryNotFullyFetched() throws Exception {
+ SqlFieldsQuery qry = new SqlFieldsQuery("select * from String");
+ qry.setPageSize(10);
+
+ checkQueryNotFullyFetchedMetrics(qry, false);
+ }
+
+ /**
+ * Test metrics for failed SQL queries.
+ */
+ @Test
+ public void testSqlFieldsQueryHistoryFailed() {
+ SqlFieldsQuery qry = new SqlFieldsQuery("select * from String where fail()=1");
+
+ checkQueryFailedMetrics(qry);
+ }
+
+ /**
+ * Test metrics for DDL and DML statements executed as SQL fields queries.
+ *
+ * @throws Exception In case of error.
+ */
+ @Test
+ public void testQueryHistoryForDmlAndDdl() throws Exception {
+ IgniteCache<Integer, String> cache = queryNode().context().cache().jcache("A");
+
+ List<String> cmds = Arrays.asList(
+ "create table TST(id int PRIMARY KEY, name varchar)",
+ "insert into TST(id) values(1)",
+ "commit"
+ );
+
+ cmds.forEach((cmd) ->
+ cache.query(new SqlFieldsQuery(cmd)).getAll()
+ );
+
+ checkSeriesCommand(cmds);
+ }
+
+ /**
+ * Test metrics eviction.
+ *
+ * @throws Exception In case of error.
+ */
+ @Test
+ public void testQueryHistoryEviction() throws Exception {
+ IgniteCache<Integer, String> cache = queryNode().context().cache().jcache("A");
+
+ cache.query(new SqlFieldsQuery("select * from String")).getAll();
+
+ cache.query(new SqlFieldsQuery("select count(*) from String")).getAll();
+
+ cache.query(new SqlFieldsQuery("select * from String limit 1")).getAll();
+
+ cache.query(new SqlFieldsQuery("select * from String limit 2")).getAll();
+
+ cache.query(new SqlQuery<>("String", "from String")).getAll();
+
+ waitingFor("size", QUERY_HISTORY_SIZE);
+
+ for (int i = 0; i < QUERY_HISTORY_SIZE; i++)
+ checkMetrics(QUERY_HISTORY_SIZE, i, 1, 0, false);
+
+ // Check that collected metrics contains correct items: metrics for last N queries.
+ Collection<QueryHistoryMetrics> metrics = ((IgniteH2Indexing)queryNode().context().query().getIndexing()).queryHistoryMetrics().values();
+
+ assertEquals(QUERY_HISTORY_SIZE, metrics.size());
+
+ Set<String> qries = metrics.stream().map(QueryHistoryMetrics::query).collect(Collectors.toSet());
+
+ assertTrue(qries.contains("SELECT \"A\".\"STRING\"._KEY, \"A\".\"STRING\"._VAL from String"));
+ assertTrue(qries.contains("select * from String limit 2"));
+ assertTrue(qries.contains("select * from String limit 1"));
+ }
+
+ /**
+ * Test metrics if queries executed from several threads.
+ *
+ * @throws Exception In case of error.
+ */
+ @Test
+ public void testQueryHistoryMultithreaded() throws Exception {
+ IgniteCache<Integer, String> cache = queryNode().context().cache().jcache("A");
+
+ Collection<Worker> workers = new ArrayList<>();
+
+ int repeat = 10;
+
+ for (int k = 0; k < repeat; k++) {
+ // Execute as many queries as history size to avoid eviction.
+ for (int i = 1; i <= QUERY_HISTORY_SIZE; i++)
+ workers.add(new Worker(cache, new SqlFieldsQuery("select * from String limit " + i)));
+ }
+
+ for (Worker worker : workers)
+ worker.start();
+
+ for (Worker worker : workers)
+ worker.join();
+
+ for (int i = 0; i < QUERY_HISTORY_SIZE; i++)
+ checkMetrics(QUERY_HISTORY_SIZE, i, repeat, 0, false);
+ }
+
+ /**
+ * Test metrics for Scan queries.
+ */
+ @Test
+ public void testScanQueryHistory() {
+ ScanQuery<Integer, String> qry = new ScanQuery<>();
+
+ checkNoQueryMetrics(qry);
+ }
+
+ /**
+ * Test metrics for SQL queries.
+ */
+ @Test
+ public void testSqlQueryHistory() {
+ SqlQuery<Integer, String> qry = new SqlQuery<>("String", "from String");
+
+ checkQueryMetrics(qry);
+ }
+
+ /**
+ * Test metrics for SQL queries that are not fully fetched.
+ *
+ * @throws Exception In case of error.
+ */
+ @Test
+ public void testSqlQueryHistoryNotFullyFetched() throws Exception {
+ SqlQuery<Integer, String> qry = new SqlQuery<>("String", "from String");
+ qry.setPageSize(10);
+
+ checkQueryNotFullyFetchedMetrics(qry, true);
+ }
+
+ /**
+ * Test metrics for text queries.
+ */
+ @Test
+ public void testTextQueryMetrics() {
+ TextQuery qry = new TextQuery<>("String", "1");
+
+ checkNoQueryMetrics(qry);
+ }
+
+ /**
+ * Test metrics for text queries that are not fully fetched.
+ *
+ * @throws Exception In case of error.
+ */
+ @Test
+ public void testTextQueryHistoryNotFullyFetched() throws Exception {
+ TextQuery qry = new TextQuery<>("String", "1");
+ qry.setPageSize(10);
+
+ checkQueryNotFullyFetchedMetrics(qry, true);
+ }
+
+ /**
+ * Test metrics for SQL cross cache queries.
+ */
+ @Test
+ public void testSqlFieldsCrossCacheQueryHistory() {
+ SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"B\".String");
+
+ checkQueryMetrics(qry);
+ }
+
+ /**
+ * Test metrics for SQL cross cache queries that are not fully fetched.
+ *
+ * @throws Exception In case of error.
+ */
+ @Test
+ public void testSqlFieldsQueryHistoryCrossCacheQueryNotFullyFetched() throws Exception {
+ SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"B\".String");
+ qry.setPageSize(10);
+
+ checkQueryNotFullyFetchedMetrics(qry, false);
+ }
+
+ /**
+ * Check metrics.
+ *
+ * @param sz Expected size of metrics.
+ * @param idx Index of metrics to check.
+ * @param execs Expected number of executions.
+ * @param failures Expected number of failures.
+ * @param first {@code true} if metrics checked for first query only.
+ */
+ private void checkMetrics(int sz, int idx, int execs, int failures,
+ boolean first) {
+
+ Collection<QueryHistoryMetrics> metrics = ((IgniteH2Indexing)queryNode().context().query().getIndexing()).queryHistoryMetrics().values();
+
+ assertNotNull(metrics);
+ assertEquals(sz, metrics.size());
+
+ if (sz == 0)
+ return;
+
+ QueryHistoryMetrics m = new ArrayList<>(metrics).get(idx);
+
+ info("Metrics: " + m);
+
+ assertEquals("Executions", execs, m.executions());
+ assertEquals("Failures", failures, m.failures());
+ assertTrue(m.maximumTime() >= 0);
+ assertTrue(m.minimumTime() >= 0);
+ assertTrue(m.lastStartTime() > 0 && m.lastStartTime() <= System.currentTimeMillis());
+
+ if (first)
+ assertEquals("On first execution minTime == maxTime", m.minimumTime(), m.maximumTime());
+ }
+
+ /**
+ * @param qry Query.
+ * @throws SQLException In case of failure.
+ */
+ private void checkQueryMetrics(String qry) throws SQLException {
+
+ runJdbcQuery(qry);
+
+ checkMetrics(1, 0, 1, 0, true);
+
+ // Execute again with the same parameters.
+ runJdbcQuery(qry);
+
+ checkMetrics(1, 0, 2, 0, false);
+ }
+
+ /**
+ * @param qry SQL query.
+ * @throws SQLException In case of failure.
+ */
+ private void runJdbcQuery(String qry) throws SQLException {
+ try (Connection conn = GridTestUtils.connect(queryNode(), null); Statement stmt = conn.createStatement()) {
+ stmt.execute(qry);
+ }
+ }
+
+ /**
+ * @param qry Query.
+ */
+ private void checkQueryMetrics(Query qry) {
+ IgniteCache<Integer, String> cache = queryNode().context().cache().jcache("A");
+
+ // Execute query.
+ cache.query(qry).getAll();
+
+ checkMetrics(1, 0, 1, 0, true);
+
+ // Execute again with the same parameters.
+ cache.query(qry).getAll();
+
+ checkMetrics(1, 0, 2, 0, false);
+ }
+
+ /**
+ * @param qry Query.
+ */
+ private void checkNoQueryMetrics(Query qry) {
+ IgniteCache<Integer, String> cache = queryNode().context().cache().jcache("A");
+
+ // Execute query.
+ cache.query(qry).getAll();
+
+ checkMetrics(0, 0, 0, 0, true);
+
+ // Execute again with the same parameters.
+ cache.query(qry).getAll();
+
+ checkMetrics(0, 0, 0, 0, false);
+ }
+
+ /**
+ * @param qry Query.
+ * @param waitingForCompletion Waiting for query completion.
+ */
+ private void checkQueryNotFullyFetchedMetrics(Query qry, boolean waitingForCompletion)
+ throws IgniteInterruptedCheckedException {
+ IgniteCache<Integer, String> cache = queryNode().context().cache().jcache("A");
+
+ // Execute query.
+ cache.query(qry).iterator().next();
+
+ if (waitingForCompletion)
+ waitingFor("executions", 1);
+
+ checkMetrics(0, 0, 0, 0, true);
+
+ // Execute again with the same parameters.
+ cache.query(qry).iterator().next();
+
+ if (waitingForCompletion)
+ waitingFor("executions", 2);
+
+ checkMetrics(0, 0, 0, 0, false);
+ }
+
+ /**
+ * @param qry Query.
+ */
+ private void checkQueryFailedMetrics(Query qry) {
+ IgniteCache<Integer, String> cache = queryNode().context().cache().jcache("A");
+
+ try {
+ // Execute invalid query.
+ cache.query(qry).getAll();
+ }
+ catch (Exception ignored) {
+ // No-op.
+ }
+
+ checkMetrics(1, 0, 1, 1, true);
+
+ try {
+ // Execute invalid query again with the same parameters.
+ cache.query(qry).getAll();
+ }
+ catch (Exception ignored) {
+ // No-op.
+ }
+
+ checkMetrics(1, 0, 2, 2, true);
+ }
+
+ /**
+ * Waits, via {@code GridTestUtils.waitForCondition}, for the query history of {@link #queryNode()} to
+ * reach an expected state. Whatever the wait call returns is ignored, so this method completes normally
+ * even if the state was never reached; callers follow up with their own assertions.
+ *
+ * @param cond Condition to check: {@code "size"} compares the number of history entries with {@code exp},
+ * {@code "executions"} compares the sum of executions over all entries with {@code exp}; any other
+ * value is treated as immediately satisfied.
+ * @param exp Expected value.
+ * @throws IgniteInterruptedCheckedException Declared for the underlying wait call.
+ */
+ private void waitingFor(final String cond, final int exp) throws IgniteInterruptedCheckedException {
+ GridTestUtils.waitForCondition(() -> {
+ // History is re-read on every evaluation of the condition.
+ Collection<QueryHistoryMetrics> metrics = ((IgniteH2Indexing)queryNode().context().query().getIndexing()).queryHistoryMetrics().values();
+
+ switch (cond) {
+ case "size":
+ return metrics.size() == exp;
+
+ case "executions":
+ int executions = 0;
+
+ for (QueryHistoryMetrics m : metrics)
+ executions += m.executions();
+
+ return executions == exp;
+
+ default:
+ // Unknown condition names do not wait at all.
+ return true;
+ }
+ }, 2000); // NOTE(review): presumably a timeout in milliseconds - confirm against GridTestUtils.
+ }
+
+ /**
+ * @return Ignite instance for querying.
+ */
+ protected IgniteEx queryNode() {
+ IgniteEx node = grid(0);
+
+ assertFalse(node.context().clientNode());
+
+ return node;
+ }
+
+ /**
+ * @throws Exception In case of failure.
+ */
+ protected void startTestGrid() throws Exception {
+ startGrids(2);
+ }
+
+ /**
+ *
+ */
+ public static class Functions {
+ /**
+ *
+ */
+ @QuerySqlFunction
+ public static int fail() {
+ throw new IgniteSQLException("SQL function fail for test purpuses");
+ }
+ }
+
+ /**
+ *
+ */
+ private static class Worker extends Thread {
+ /**
+ *
+ */
+ private final IgniteCache cache;
+
+ /**
+ *
+ */
+ private final Query qry;
+
+ /**
+ *
+ */
+ Worker(IgniteCache cache, Query qry) {
+ this.cache = cache;
+ this.qry = qry;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ cache.query(qry).getAll();
+ }
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index 9a40309..0584c8d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -197,6 +197,8 @@ import org.apache.ignite.internal.processors.query.RunningQueriesTest;
import org.apache.ignite.internal.processors.query.SqlIllegalSchemaSelfTest;
import org.apache.ignite.internal.processors.query.SqlNestedQuerySelfTest;
import org.apache.ignite.internal.processors.query.SqlPushDownFunctionTest;
+import org.apache.ignite.internal.processors.query.SqlQueryHistoryFromClientSelfTest;
+import org.apache.ignite.internal.processors.query.SqlQueryHistorySelfTest;
import org.apache.ignite.internal.processors.query.SqlSchemaSelfTest;
import org.apache.ignite.internal.processors.query.SqlSystemViewsSelfTest;
import org.apache.ignite.internal.processors.query.h2.GridH2IndexingInMemSelfTest;
@@ -536,6 +538,10 @@ import org.junit.runners.Suite;
GridCacheDynamicLoadOnClientTest.class,
GridCacheDynamicLoadOnClientPersistentTest.class,
+
+ //Query history.
+ SqlQueryHistorySelfTest.class,
+ SqlQueryHistoryFromClientSelfTest.class,
})
public class IgniteBinaryCacheQueryTestSuite {
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml
index 4aa5190..6438fdd 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml
@@ -23,7 +23,8 @@
workDirectory='c:' JvmMaxMemoryMb='1024' MetricsLogFrequency='0:0:10' isDaemon='true'
isLateAffinityAssignment='false' springConfigUrl='c:\myconfig.xml' autoGenerateIgniteInstanceName='true'
peerAssemblyLoadingMode='CurrentAppDomain' longQueryWarningTimeout='1:2:3' isActiveOnStart='false'
- consistentId='someId012' redirectJavaConsoleOutput='false' authenticationEnabled='true' mvccVacuumFrequency='10000' mvccVacuumThreadCount='4'>
+ consistentId='someId012' redirectJavaConsoleOutput='false' authenticationEnabled='true' mvccVacuumFrequency='10000' mvccVacuumThreadCount='4'
+ sqlQueryHistorySize='123'>
<localhost>127.1.1.1</localhost>
<binaryConfiguration compactFooter='false' keepDeserialized='true'>
<nameMapper type='Apache.Ignite.Core.Tests.IgniteConfigurationSerializerTest+NameMapper' bar='testBar' />
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
index ec9d4fd..f919718 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
@@ -102,6 +102,7 @@ namespace Apache.Ignite.Core.Tests
Assert.AreEqual(10000, cfg.MvccVacuumFrequency);
Assert.AreEqual(4, cfg.MvccVacuumThreadCount);
+ Assert.AreEqual(123, cfg.SqlQueryHistorySize);
Assert.IsNotNull(cfg.SqlSchemas);
Assert.AreEqual(2, cfg.SqlSchemas.Count);
@@ -1020,11 +1021,12 @@ namespace Apache.Ignite.Core.Tests
}
},
SslContextFactory = new SslContextFactory(),
- FailureHandler = new StopNodeOrHaltFailureHandler()
+ FailureHandler = new StopNodeOrHaltFailureHandler
{
TryStop = false,
Timeout = TimeSpan.FromSeconds(10)
- }
+ },
+ SqlQueryHistorySize = 345
};
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
index 2665c25..ae33a42 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
@@ -261,6 +261,7 @@ namespace Apache.Ignite.Core.Tests
Assert.AreEqual(cfg.MvccVacuumFrequency, resCfg.MvccVacuumFrequency);
Assert.AreEqual(cfg.MvccVacuumThreadCount, resCfg.MvccVacuumThreadCount);
+ Assert.AreEqual(cfg.SqlQueryHistorySize, resCfg.SqlQueryHistorySize);
Assert.AreEqual(cfg.InitBaselineAutoAdjustEnabled, resCfg.InitBaselineAutoAdjustEnabled);
Assert.AreEqual(cfg.InitBaselineAutoAdjustTimeout, resCfg.InitBaselineAutoAdjustTimeout);
@@ -879,6 +880,7 @@ namespace Apache.Ignite.Core.Tests
AuthenticationEnabled = false,
MvccVacuumFrequency = 20000,
MvccVacuumThreadCount = 8,
+ SqlQueryHistorySize = 99,
SqlSchemas = new List<string> { "SCHEMA_3", "schema_4" }
};
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs
index 987fc21..f293267 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs
@@ -225,6 +225,9 @@ namespace Apache.Ignite.Core
/** Initial value of time which we would wait from the first discovery event in the chain(node join/exit). */
private long? _initBaselineAutoAdjustMaxTimeout;
+ /** SQL query history size. */
+ private int? _sqlQueryHistorySize;
+
/// <summary>
/// Default network retry count.
/// </summary>
@@ -276,6 +279,11 @@ namespace Apache.Ignite.Core
public const long DefaultInitBaselineAutoAdjustMaxTimeout = 0;
/// <summary>
+ /// Default value for <see cref="SqlQueryHistorySize"/> property.
+ /// </summary>
+ public const int DefaultSqlQueryHistorySize = 1000;
+
+ /// <summary>
/// Initializes a new instance of the <see cref="IgniteConfiguration"/> class.
/// </summary>
public IgniteConfiguration()
@@ -360,6 +368,7 @@ namespace Apache.Ignite.Core
writer.WriteBooleanNullable(_initBaselineAutoAdjustEnabled);
writer.WriteLongNullable(_initBaselineAutoAdjustTimeout);
writer.WriteLongNullable(_initBaselineAutoAdjustMaxTimeout);
+ writer.WriteIntNullable(_sqlQueryHistorySize);
if (SqlSchemas == null)
writer.WriteInt(-1);
@@ -413,7 +422,7 @@ namespace Apache.Ignite.Core
writer.WriteBoolean(true);
var keystoreEnc = enc as KeystoreEncryptionSpi;
-
+
if (keystoreEnc == null)
throw new InvalidOperationException("Unsupported encryption SPI: " + enc.GetType());
@@ -752,6 +761,7 @@ namespace Apache.Ignite.Core
_initBaselineAutoAdjustEnabled = r.ReadBooleanNullable();
_initBaselineAutoAdjustTimeout = r.ReadLongNullable();
_initBaselineAutoAdjustMaxTimeout = r.ReadLongNullable();
+ _sqlQueryHistorySize = r.ReadIntNullable();
int sqlSchemasCnt = r.ReadInt();
@@ -784,7 +794,7 @@ namespace Apache.Ignite.Core
// Discovery config
DiscoverySpi = r.ReadBoolean() ? new TcpDiscoverySpi(r) : null;
- EncryptionSpi = (srvVer.CompareTo(ClientSocket.Ver120) >= 0 && r.ReadBoolean()) ?
+ EncryptionSpi = (srvVer.CompareTo(ClientSocket.Ver120) >= 0 && r.ReadBoolean()) ?
new KeystoreEncryptionSpi(r) : null;
// Communication config
@@ -1115,7 +1125,7 @@ namespace Apache.Ignite.Core
/// Null for default communication.
/// </summary>
public ICommunicationSpi CommunicationSpi { get; set; }
-
+
/// <summary>
/// Gets or sets the encryption service provider.
/// Null for disabled encryption.
@@ -1665,6 +1675,17 @@ namespace Apache.Ignite.Core
}
/// <summary>
+ /// Gets or sets the value indicating the number of SQL query history elements to keep in memory.
+ /// Zero or negative value disables the history.
+ /// </summary>
+ [DefaultValue(DefaultSqlQueryHistorySize)]
+ public int SqlQueryHistorySize
+ {
+ get { return _sqlQueryHistorySize ?? DefaultSqlQueryHistorySize; }
+ set { _sqlQueryHistorySize = value; }
+ }
+
+ /// <summary>
/// Gets or sets predefined failure handlers implementation.
/// A failure handler handles critical failures of Ignite instance accordingly:
/// <para><see cref="NoOpFailureHandler"/> -- do nothing.</para>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
index efde394..b023c3b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
@@ -2321,12 +2321,16 @@
<xs:documentation>Time interval between MVCC vacuum runs in milliseconds.</xs:documentation>
</xs:annotation>
</xs:attribute>
-
<xs:attribute name="mvccVacuumThreadCount" type="xs:int">
<xs:annotation>
<xs:documentation>Number of MVCC vacuum threads.</xs:documentation>
</xs:annotation>
</xs:attribute>
+ <xs:attribute name="sqlQueryHistorySize" type="xs:int">
+ <xs:annotation>
+ <xs:documentation>Number of SQL query history elements to keep in memory.</xs:documentation>
+ </xs:annotation>
+ </xs:attribute>
<xs:attribute name="clientConnectorConfigurationEnabled" type="xs:boolean">
<xs:annotation>
<xs:documentation>Whether client connector should be enabled (allow thin clients, ODBC and JDBC drivers to work with Ignite).</xs:documentation>