You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ko...@apache.org on 2021/11/12 07:45:47 UTC
[ignite] branch sql-calcite updated: IGNITE-15289 Global statistics
collection (#9392)
This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/sql-calcite by this push:
new e25c7db IGNITE-15289 Global statistics collection (#9392)
e25c7db is described below
commit e25c7db727a13f1076b0f9a26330fbd5b2a0e9f4
Author: Berkof <sa...@mail.ru>
AuthorDate: Fri Nov 12 14:44:15 2021 +0700
IGNITE-15289 Global statistics collection (#9392)
---
.../query/calcite/CalciteQueryProcessor.java | 2 +-
.../integration/TableDdlIntegrationTest.java | 1 -
.../query/calcite/planner/AbstractPlannerTest.java | 15 +-
.../ignite/testsuites/IntegrationTestSuite.java | 2 +
.../SystemViewRowAttributeWalkerGenerator.java | 2 +
.../java/org/apache/ignite/internal/GridTopic.java | 5 +-
.../cache/persistence/metastorage/MetaStorage.java | 46 +-
.../query/stat/IgniteStatisticsManager.java | 8 +
.../processors/query/stat/StatisticsKey.java | 12 +-
.../stat/config/StatisticsColumnConfiguration.java | 24 +-
.../stat/config/StatisticsObjectConfiguration.java | 2 +-
.../StatisticsColumnGlobalDataViewWalker.java | 90 ++
.../h2/twostep/msg/GridH2ValueMessageFactory.java | 11 +
.../query/stat/IgniteGlobalStatisticsManager.java | 1021 ++++++++++++++++++++
.../stat/IgniteStatisticsConfigurationManager.java | 53 +-
.../query/stat/IgniteStatisticsHelper.java | 60 ++
.../query/stat/IgniteStatisticsManagerImpl.java | 104 +-
.../query/stat/IgniteStatisticsRepository.java | 86 +-
.../query/stat/ObjectStatisticsEvent.java | 75 ++
.../query/stat/ObjectStatisticsImpl.java | 16 +
...csType.java => StatisticsAddressedRequest.java} | 44 +-
.../processors/query/stat/StatisticsProcessor.java | 4 +-
.../processors/query/stat/StatisticsType.java | 5 +-
.../processors/query/stat/StatisticsUtils.java | 63 +-
.../query/stat/messages/StatisticsKeyMessage.java | 6 +
.../query/stat/messages/StatisticsRequest.java | 243 +++++
...ticsKeyMessage.java => StatisticsResponse.java} | 107 +-
.../stat/view/ColumnLocalDataViewSupplier.java | 4 +-
...ew.java => StatisticsColumnGlobalDataView.java} | 7 +-
.../stat/view/StatisticsColumnLocalDataView.java | 2 +-
.../stat/IgniteStatisticsRepositoryStaticTest.java | 84 ++
.../query/stat/IgniteStatisticsRepositoryTest.java | 13 +-
...cValueDistributionTableStatisticsUsageTest.java | 2 +-
.../PSUCompositeIndexTableStatisticsUsageTest.java | 10 +-
.../stat/PSUStatisticPartialGatheringTest.java | 4 +-
.../query/stat/PSUStatisticsStorageTest.java | 12 +-
...UValueDistributionTableStatisticsUsageTest.java | 2 +-
.../query/stat/SqlStatisticsCommandTests.java | 5 +-
.../query/stat/StatisticsAbstractTest.java | 274 +++++-
.../processors/query/stat/StatisticsClearTest.java | 2 +-
.../query/stat/StatisticsConfigurationTest.java | 22 +-
.../query/stat/StatisticsGatheringTest.java | 70 +-
....java => StatisticsGlobalViewInMemoryTest.java} | 15 +-
...va => StatisticsGlobalViewPersistenceTest.java} | 15 +-
.../query/stat/StatisticsGlobalViewTest.java | 160 +++
.../query/stat/StatisticsObsolescenceTest.java | 2 +-
.../query/stat/StatisticsRestartAbstractTest.java | 5 +-
.../query/stat/StatisticsStorageTest.java | 10 +-
.../query/stat/StatisticsTypesAbstractTest.java | 2 +-
.../processors/query/stat/StatisticsUtilsTest.java | 158 +++
.../query/stat/StatisticsViewsInMemoryTest.java | 11 +-
.../query/stat/StatisticsViewsPersistenceTest.java | 11 +-
.../processors/query/stat/StatisticsViewsTest.java | 141 ++-
.../testsuites/IgniteStatisticsTestSuite.java | 10 +-
54 files changed, 2723 insertions(+), 437 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 28746f2..508d50e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -71,9 +71,9 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.MappingServi
import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
import org.apache.ignite.internal.processors.query.calcite.prepare.CacheKey;
import org.apache.ignite.internal.processors.query.calcite.prepare.IgniteConvertletTable;
+import org.apache.ignite.internal.processors.query.calcite.prepare.IgniteTypeCoercion;
import org.apache.ignite.internal.processors.query.calcite.prepare.PrepareServiceImpl;
import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlan;
-import org.apache.ignite.internal.processors.query.calcite.prepare.IgniteTypeCoercion;
import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlanCache;
import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlanCacheImpl;
import org.apache.ignite.internal.processors.query.calcite.schema.SchemaHolder;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
index 57ac900..da8f83f 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
@@ -43,7 +43,6 @@ import org.hamcrest.Matcher;
import org.junit.Test;
import static org.apache.ignite.internal.util.IgniteUtils.map;
-import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
import static org.apache.ignite.testframework.GridTestUtils.hasSize;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
index c3499c9..8ffaa06 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -363,13 +363,14 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
clearTraits(expected);
clearTraits(deserialized);
- if (!expected.deepEquals(deserialized))
- assertTrue(
- "Invalid serialization / deserialization.\n" +
- "Expected:\n" + RelOptUtil.toString(expected) +
- "Deserialized:\n" + RelOptUtil.toString(deserialized),
- expected.deepEquals(deserialized)
- );
+ if (!expected.deepEquals(deserialized)) {
+ assertTrue(
+ "Invalid serialization / deserialization.\n" +
+ "Expected:\n" + RelOptUtil.toString(expected) +
+ "Deserialized:\n" + RelOptUtil.toString(deserialized),
+ expected.deepEquals(deserialized)
+ );
+ }
}
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index 50d2979..0be1471 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.query.calcite.integration.IntervalT
import org.apache.ignite.internal.processors.query.calcite.integration.JoinIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.KillCommandDdlIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.MetadataIntegrationTest;
+import org.apache.ignite.internal.processors.query.calcite.integration.RunningQueriesIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.ServerStatisticsIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.SetOpIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.SortAggregateIntegrationTest;
@@ -64,6 +65,7 @@ import org.junit.runners.Suite;
SqlFieldsQueryUsageTest.class,
AggregatesIntegrationTest.class,
MetadataIntegrationTest.class,
+ RunningQueriesIntegrationTest.class,
SortAggregateIntegrationTest.class,
TableDdlIntegrationTest.class,
IndexDdlIntegrationTest.class,
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/SystemViewRowAttributeWalkerGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/SystemViewRowAttributeWalkerGenerator.java
index 6beb0fa..70f88c5 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/SystemViewRowAttributeWalkerGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/SystemViewRowAttributeWalkerGenerator.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.managers.systemview.SystemViewMBean;
import org.apache.ignite.internal.managers.systemview.walker.Filtrable;
import org.apache.ignite.internal.managers.systemview.walker.Order;
import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnConfigurationView;
+import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnGlobalDataView;
import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnLocalDataView;
import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnPartitionDataView;
import org.apache.ignite.internal.util.typedef.F;
@@ -150,6 +151,7 @@ public class SystemViewRowAttributeWalkerGenerator {
gen.generateAndWrite(StatisticsColumnConfigurationView.class, INDEXING_SRC_DIR);
gen.generateAndWrite(StatisticsColumnLocalDataView.class, INDEXING_SRC_DIR);
+ gen.generateAndWrite(StatisticsColumnGlobalDataView.class, INDEXING_SRC_DIR);
gen.generateAndWrite(StatisticsColumnPartitionDataView.class, INDEXING_SRC_DIR);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index 0c1be03..7cc43e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -155,7 +155,10 @@ public enum GridTopic {
TOPIC_DISTRIBUTED_PROCESS,
/** */
- TOPIC_COMM_SYSTEM;
+ TOPIC_COMM_SYSTEM,
+
+ /** Statistics related messages topic. */
+ TOPIC_STATISTICS;
/** Enum values. */
private static final GridTopic[] VALS = values();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
index f036546..435fbf0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
@@ -342,39 +342,43 @@ public class MetaStorage implements CheckpointListener, ReadWriteMetastorage {
}
}
- Map.Entry<String, byte[]> curUpdatesEntry = null;
+ // TODO rewrite synchronized after https://issues.apache.org/jira/browse/IGNITE-15472
+ synchronized (this) {
- if (updatesIter != null) {
- assert updatesIter.hasNext();
+ Map.Entry<String, byte[]> curUpdatesEntry = null;
- curUpdatesEntry = updatesIter.next();
- }
+ if (updatesIter != null) {
+ assert updatesIter.hasNext();
- MetastorageSearchRow lower = new MetastorageSearchRow(keyPrefix);
+ curUpdatesEntry = updatesIter.next();
+ }
- MetastorageSearchRow upper = new MetastorageSearchRow(keyPrefix + "\uFFFF");
+ MetastorageSearchRow lower = new MetastorageSearchRow(keyPrefix);
- GridCursor<MetastorageDataRow> cur = tree.find(lower, upper);
+ MetastorageSearchRow upper = new MetastorageSearchRow(keyPrefix + "\uFFFF");
- while (cur.next()) {
- MetastorageDataRow row = cur.get();
+ GridCursor<MetastorageDataRow> cur = tree.find(lower, upper);
- String key = row.key();
- byte[] valBytes = partStorage.readRow(row.link());
+ while (cur.next()) {
+ MetastorageDataRow row = cur.get();
- int c = 0;
+ String key = row.key();
+ byte[] valBytes = partStorage.readRow(row.link());
- while (curUpdatesEntry != null && (c = curUpdatesEntry.getKey().compareTo(key)) < 0)
- curUpdatesEntry = advanceCurrentUpdatesEntry(cb, unmarshal, updatesIter, curUpdatesEntry);
+ int c = 0;
+
+ while (curUpdatesEntry != null && (c = curUpdatesEntry.getKey().compareTo(key)) < 0)
+ curUpdatesEntry = advanceCurrentUpdatesEntry(cb, unmarshal, updatesIter, curUpdatesEntry);
- if (curUpdatesEntry != null && c == 0)
+ if (curUpdatesEntry != null && c == 0)
+ curUpdatesEntry = advanceCurrentUpdatesEntry(cb, unmarshal, updatesIter, curUpdatesEntry);
+ else
+ applyCallback(cb, unmarshal, key, valBytes);
+ }
+
+ while (curUpdatesEntry != null)
curUpdatesEntry = advanceCurrentUpdatesEntry(cb, unmarshal, updatesIter, curUpdatesEntry);
- else
- applyCallback(cb, unmarshal, key, valBytes);
}
-
- while (curUpdatesEntry != null)
- curUpdatesEntry = advanceCurrentUpdatesEntry(cb, unmarshal, updatesIter, curUpdatesEntry);
}
/** */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManager.java
index 59e73f4..233f2fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManager.java
@@ -61,6 +61,14 @@ public interface IgniteStatisticsManager {
public ObjectStatistics getLocalStatistics(StatisticsKey key);
/**
+ * Get global statistics by object.
+ *
+ * @param key Statistic key.
+ * @return Object statistics of {@code null} if there are no available global statistics by specified object.
+ */
+ public ObjectStatistics getGlobalStatistics(StatisticsKey key);
+
+ /**
* Stop statistic manager.
*/
public void stop();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsKey.java
index d061320..b7a7037 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsKey.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsKey.java
@@ -61,11 +61,15 @@ public class StatisticsKey implements Serializable {
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
StatisticsKey statsKey = (StatisticsKey) o;
- return Objects.equals(schema, statsKey.schema) &&
- Objects.equals(obj, statsKey.obj);
+
+ return Objects.equals(schema, statsKey.schema) && Objects.equals(obj, statsKey.obj);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/config/StatisticsColumnConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/config/StatisticsColumnConfiguration.java
index f1ab02f..b025aab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/config/StatisticsColumnConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/config/StatisticsColumnConfiguration.java
@@ -128,7 +128,7 @@ public class StatisticsColumnConfiguration implements Serializable {
*
* @param oldCfg Previous configuration. May be {@code null} when new configuration is created.
* @param newCfg New configuration.
- * @return merged configuration.
+ * @return Merged configuration.
*/
public static StatisticsColumnConfiguration merge(
StatisticsColumnConfiguration oldCfg,
@@ -137,32 +137,10 @@ public class StatisticsColumnConfiguration implements Serializable {
if (oldCfg == null)
return newCfg;
- if (oldCfg.collectionAwareEqual(newCfg))
- return new StatisticsColumnConfiguration(newCfg, oldCfg.ver, oldCfg.tombstone, newCfg.overrides);
-
return new StatisticsColumnConfiguration(newCfg, oldCfg.ver + 1, false, newCfg.overrides);
}
/**
- * Compare only collection or gathering related fields of config.
- *
- * @param o StatisticsColumnConfiguration to compare with.
- * @return {@code true} if configurations are equal from the gathering point of view, {@code false} - otherwise.
- */
- public boolean collectionAwareEqual(StatisticsColumnConfiguration o) {
- if (this == o)
- return true;
-
- if (o == null || getClass() != o.getClass())
- return false;
-
- StatisticsColumnConfiguration that = (StatisticsColumnConfiguration)o;
-
- return ver == that.ver && tombstone == that.tombstone
- && Objects.equals(name, that.name);
- }
-
- /**
* Create configuration for dropped statistic column.
*
* @return Tombstone column configuration.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/config/StatisticsObjectConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/config/StatisticsObjectConfiguration.java
index 32d1ddd..d39706f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/config/StatisticsObjectConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/config/StatisticsObjectConfiguration.java
@@ -245,7 +245,7 @@ public class StatisticsObjectConfiguration implements Serializable, Comparable<S
/**
* Compare only configuration from the same branch. I.e. can't correctly compare
* Cfg(A=v1,B=v3) vs Cfg(A=v2,B=v1)
- * Cfg(A=v1,B=v3) vs Cfg(A=v1m C=v2)
+ * Cfg(A=v1,B=v3) vs Cfg(A=v1,C=v2)
* because there is no changes chain to get one from another.
*
* @param o Other configuration to compare.
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/managers/systemview/walker/StatisticsColumnGlobalDataViewWalker.java b/modules/indexing/src/main/java/org/apache/ignite/internal/managers/systemview/walker/StatisticsColumnGlobalDataViewWalker.java
new file mode 100644
index 0000000..c5d9cce
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/managers/systemview/walker/StatisticsColumnGlobalDataViewWalker.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.systemview.walker;
+
+import java.sql.Timestamp;
+import java.util.Collections;
+import java.util.List;
+import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnGlobalDataView;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.systemview.view.SystemViewRowAttributeWalker;
+
+/**
+ * Generated by {@code org.apache.ignite.codegen.SystemViewRowAttributeWalkerGenerator}.
+ * {@link StatisticsColumnGlobalDataView} attributes walker.
+ *
+ * @see StatisticsColumnGlobalDataView
+ */
+public class StatisticsColumnGlobalDataViewWalker implements SystemViewRowAttributeWalker<StatisticsColumnGlobalDataView> {
+ /** Filter key for attribute "schema" */
+ public static final String SCHEMA_FILTER = "schema";
+
+ /** Filter key for attribute "type" */
+ public static final String TYPE_FILTER = "type";
+
+ /** Filter key for attribute "name" */
+ public static final String NAME_FILTER = "name";
+
+ /** Filter key for attribute "column" */
+ public static final String COLUMN_FILTER = "column";
+
+ /** List of filtrable attributes. */
+ private static final List<String> FILTRABLE_ATTRS = Collections.unmodifiableList(F.asList(
+ "schema", "type", "name", "column"
+ ));
+
+ /** {@inheritDoc} */
+ @Override public List<String> filtrableAttributes() {
+ return FILTRABLE_ATTRS;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void visitAll(AttributeVisitor v) {
+ v.accept(0, "schema", String.class);
+ v.accept(1, "type", String.class);
+ v.accept(2, "name", String.class);
+ v.accept(3, "column", String.class);
+ v.accept(4, "rowsCount", long.class);
+ v.accept(5, "distinct", long.class);
+ v.accept(6, "nulls", long.class);
+ v.accept(7, "total", long.class);
+ v.accept(8, "size", int.class);
+ v.accept(9, "version", long.class);
+ v.accept(10, "lastUpdateTime", Timestamp.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void visitAll(StatisticsColumnGlobalDataView row, AttributeWithValueVisitor v) {
+ v.accept(0, "schema", String.class, row.schema());
+ v.accept(1, "type", String.class, row.type());
+ v.accept(2, "name", String.class, row.name());
+ v.accept(3, "column", String.class, row.column());
+ v.acceptLong(4, "rowsCount", row.rowsCount());
+ v.acceptLong(5, "distinct", row.distinct());
+ v.acceptLong(6, "nulls", row.nulls());
+ v.acceptLong(7, "total", row.total());
+ v.acceptInt(8, "size", row.size());
+ v.acceptLong(9, "version", row.version());
+ v.accept(10, "lastUpdateTime", Timestamp.class, row.lastUpdateTime());
+ }
+
+ /** {@inheritDoc} */
+ @Override public int count() {
+ return 11;
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
index e4aae7d..a306f7b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
@@ -23,6 +23,11 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.query.QueryTable;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsColumnData;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsKeyMessage;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsObjectData;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsRequest;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsResponse;
import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
@@ -64,6 +69,12 @@ public class GridH2ValueMessageFactory implements MessageFactoryProvider {
factory.register((short)-55, GridH2DmlRequest::new);
factory.register((short)-56, GridH2DmlResponse::new);
factory.register((short)-57, GridH2SelectForUpdateTxDetails::new);
+
+ factory.register(StatisticsKeyMessage.TYPE_CODE, StatisticsKeyMessage::new);
+ factory.register(StatisticsObjectData.TYPE_CODE, StatisticsObjectData::new);
+ factory.register(StatisticsColumnData.TYPE_CODE, StatisticsColumnData::new);
+ factory.register(StatisticsRequest.TYPE_CODE, StatisticsRequest::new);
+ factory.register(StatisticsResponse.TYPE_CODE, StatisticsResponse::new);
}
/** {@inheritDoc} */
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteGlobalStatisticsManager.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteGlobalStatisticsManager.java
new file mode 100644
index 0000000..88a555b
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteGlobalStatisticsManager.java
@@ -0,0 +1,1021 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.stat;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.managers.systemview.GridSystemViewManager;
+import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnGlobalDataViewWalker;
+import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnLocalDataViewWalker;
+import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnPartitionDataViewWalker;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnConfiguration;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsKeyMessage;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsObjectData;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsRequest;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsResponse;
+import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnConfigurationView;
+import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnGlobalDataView;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+
+/**
+ * Global statistics manager. Cache global statistics and collect it.
+ */
+public class IgniteGlobalStatisticsManager implements GridMessageListener {
+ /** */
+ private static final String STAT_GLOBAL_VIEW_NAME = "statisticsGlobalData";
+
+ /** */
+ private static final String STAT_GLOBAL_VIEW_DESCRIPTION = "Global statistics.";
+
+ /** Statistics manager. */
+ private final IgniteStatisticsManagerImpl statMgr;
+
+ /** Pool to process statistics requests. */
+ private final IgniteThreadPoolExecutor mgmtPool;
+
+ /** Discovery manager to get server node list to statistics master calculation. */
+ private final GridDiscoveryManager discoMgr;
+
+ /** Cluster state processor. */
+ private final GridClusterStateProcessor cluster;
+
+ /** Cache partition exchange manager. */
+ private final GridCachePartitionExchangeManager<?, ?> exchange;
+
+ /** Helper to transform or generate statistics related messages. */
+ private final IgniteStatisticsHelper helper;
+
+ /** Grid io manager to exchange global and local statistics. */
+ private final GridIoManager ioMgr;
+
+ /** Cache for global statistics. */
+ private final ConcurrentMap<StatisticsKey, CacheEntry<ObjectStatisticsImpl>> globalStatistics =
+ new ConcurrentHashMap<>();
+
+ /** Incoming requests which should be served after local statistics collection finish. */
+ private final ConcurrentMap<StatisticsKey, Collection<StatisticsAddressedRequest>> inLocalRequests =
+ new ConcurrentHashMap<>();
+
+ /** Incoming requests which should be served after global statistics collection finish. */
+ private final ConcurrentMap<StatisticsKey, Collection<StatisticsAddressedRequest>> inGloblaRequests =
+ new ConcurrentHashMap<>();
+
+ /** Outcoming global collection requests. */
+ private final ConcurrentMap<StatisticsKey, StatisticsGatheringContext> curCollections = new ConcurrentHashMap<>();
+
+ /** Outcoming global statistics requests to request id. */
+ private final ConcurrentMap<StatisticsKey, UUID> outGlobalStatisticsRequests = new ConcurrentHashMap<>();
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /** Started flag. */
+ private boolean started;
+
+ /** Exchange listener: clean inbound requests and restart outbount. */
+ private final PartitionsExchangeAware exchAwareLsnr = new PartitionsExchangeAware() {
+ @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+
+ // Skip join/left client nodes.
+ if (fut.exchangeType() != GridDhtPartitionsExchangeFuture.ExchangeType.ALL ||
+ cluster.clusterState().lastState() != ClusterState.ACTIVE)
+ return;
+
+ DiscoveryEvent evt = fut.firstEvent();
+
+ // Skip create/destroy caches.
+ if (evt.type() == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
+ DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)evt).customMessage();
+
+ if (msg instanceof DynamicCacheChangeBatch)
+ return;
+
+ // Just clear all activities and update topology version.
+ if (log.isDebugEnabled())
+ log.debug("Resetting all global statistics activities due to new topology " +
+ fut.topologyVersion());
+
+ inLocalRequests.clear();
+ inGloblaRequests.clear();
+
+ Set<StatisticsKey> curColls = curCollections.keySet();
+
+ for (StatisticsKey key : curColls) {
+ curCollections.remove(key);
+
+ mgmtPool.submit(() -> collectGlobalStatistics(key));
+ }
+
+ Set<StatisticsKey> outReqs = outGlobalStatisticsRequests.keySet();
+
+ for (StatisticsKey key : outReqs) {
+ outGlobalStatisticsRequests.remove(key);
+
+ mgmtPool.submit(() -> collectGlobalStatistics(key));
+ }
+ }
+ }
+ };
+
+ /**
+ * Constructor.
+ *
+ * @param statMgr Statistics manager.
+ * @param sysViewMgr System view manager.
+ * @param mgmtPool Statistics management pool.
+ * @param discoMgr Grid discovery manager.
+ * @param cluster Cluster state processor.
+ * @param exchange Partition exchange manager.
+ * @param helper Statistics helper.
+ * @param ioMgr Communication manager.
+ * @param logSupplier Log supplier.
+ */
+ public IgniteGlobalStatisticsManager(
+ IgniteStatisticsManagerImpl statMgr,
+ GridSystemViewManager sysViewMgr,
+ IgniteThreadPoolExecutor mgmtPool,
+ GridDiscoveryManager discoMgr,
+ GridClusterStateProcessor cluster,
+ GridCachePartitionExchangeManager<?, ?> exchange,
+ IgniteStatisticsHelper helper,
+ GridIoManager ioMgr,
+ Function<Class<?>, IgniteLogger> logSupplier
+ ) {
+ this.statMgr = statMgr;
+ this.mgmtPool = mgmtPool;
+ this.discoMgr = discoMgr;
+ this.cluster = cluster;
+ this.exchange = exchange;
+ this.helper = helper;
+ this.ioMgr = ioMgr;
+ log = logSupplier.apply(IgniteGlobalStatisticsManager.class);
+
+ statMgr.subscribeToLocalStatistics(nls -> onLocalStatisticsAggregated(nls.key(), nls.statistics(),
+ nls.topologyVersion()));
+ statMgr.subscribeToStatisticsConfig(this::onConfigChanged);
+ ioMgr.addMessageListener(GridTopic.TOPIC_STATISTICS, this);
+
+ sysViewMgr.registerFiltrableView(STAT_GLOBAL_VIEW_NAME, STAT_GLOBAL_VIEW_DESCRIPTION,
+ new StatisticsColumnGlobalDataViewWalker(), this::columnGlobalStatisticsViewSupplier, Function.identity());
+ }
+
+ /**
+ * Statistics column global data view filterable supplier.
+ *
+ * @param filter Filter.
+ * @return Iterable with statistics column global data views.
+ */
+ private Iterable<StatisticsColumnGlobalDataView> columnGlobalStatisticsViewSupplier(Map<String, Object> filter) {
+ String type = (String)filter.get(StatisticsColumnPartitionDataViewWalker.TYPE_FILTER);
+ if (type != null && !StatisticsColumnConfigurationView.TABLE_TYPE.equalsIgnoreCase(type))
+ return Collections.emptyList();
+
+ String schema = (String)filter.get(StatisticsColumnLocalDataViewWalker.SCHEMA_FILTER);
+ String name = (String)filter.get(StatisticsColumnLocalDataViewWalker.NAME_FILTER);
+ String column = (String)filter.get(StatisticsColumnPartitionDataViewWalker.COLUMN_FILTER);
+
+ Map<StatisticsKey, ObjectStatisticsImpl> globalStatsMap;
+ if (!F.isEmpty(schema) && !F.isEmpty(name)) {
+ StatisticsKey key = new StatisticsKey(schema, name);
+
+ CacheEntry<ObjectStatisticsImpl> objLocStat = globalStatistics.get(key);
+
+ if (objLocStat == null || objLocStat.obj == null)
+ return Collections.emptyList();
+
+ globalStatsMap = Collections.singletonMap(key, objLocStat.object());
+ }
+ else
+ globalStatsMap = globalStatistics.entrySet().stream()
+ .filter(e -> e.getValue().object() != null && (F.isEmpty(schema) || schema.equals(e.getKey().schema())))
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().object()));
+
+ List<StatisticsColumnGlobalDataView> res = new ArrayList<>();
+
+ for (Map.Entry<StatisticsKey, ObjectStatisticsImpl> localStatsEntry : globalStatsMap.entrySet()) {
+ StatisticsKey key = localStatsEntry.getKey();
+ ObjectStatisticsImpl stat = localStatsEntry.getValue();
+
+ if (column == null) {
+ for (Map.Entry<String, ColumnStatistics> colStat : localStatsEntry.getValue().columnsStatistics()
+ .entrySet()) {
+ StatisticsColumnGlobalDataView colStatView = new StatisticsColumnGlobalDataView(key,
+ colStat.getKey(), stat);
+
+ res.add(colStatView);
+ }
+ }
+ else {
+ ColumnStatistics colStat = localStatsEntry.getValue().columnStatistics(column);
+
+ if (colStat != null) {
+ StatisticsColumnGlobalDataView colStatView = new StatisticsColumnGlobalDataView(key, column, stat);
+
+ res.add(colStatView);
+ }
+ }
+ }
+
+ return res;
+ }
+
+ /**
+ * Start operations.
+ * Shouldn't be called twice.
+ */
+ public synchronized void start() {
+ if (started) {
+ if (log.isDebugEnabled())
+ log.debug("IgniteGlobalStatisticsManager already started.");
+
+ return;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Global statistics manager starting...");
+
+ globalStatistics.clear();
+ exchange.registerExchangeAwareComponent(exchAwareLsnr);
+
+ started = true;
+
+ if (log.isDebugEnabled())
+ log.debug("Global statistics manager started.");
+ }
+
+ /**
+ * Stop operations.
+ * Shouldn't be called twice.
+ */
+ public synchronized void stop() {
+ if (!started) {
+ if (log.isDebugEnabled())
+ log.debug("IgniteGlobalStatisticsManager already stopped.");
+
+ return;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Global statistics manager stopping...");
+
+ globalStatistics.clear();
+
+ inGloblaRequests.clear();
+ inLocalRequests.clear();
+ outGlobalStatisticsRequests.clear();
+ curCollections.clear();
+
+ exchange.unregisterExchangeAwareComponent(exchAwareLsnr);
+
+ started = false;
+
+ if (log.isDebugEnabled())
+ log.debug("Global statistics manager stopped.");
+ }
+
+ /**
+ * Get global statistics for the given key. If there is no cached statistics, but there is cache record with
+ * empty object - no additional collection will be started.
+ *
+ * @param key Statistics key.
+ * @return Global object statistics or {@code null} if there is no global statistics available.
+ */
+ public ObjectStatisticsImpl getGlobalStatistics(StatisticsKey key) {
+ CacheEntry<ObjectStatisticsImpl> res = globalStatistics.computeIfAbsent(key, k -> {
+ if (log.isDebugEnabled())
+ log.debug("Scheduling global statistics collection by key " + key);
+
+ mgmtPool.submit(() -> collectGlobalStatistics(key));
+
+ return new CacheEntry<>(null);
+ });
+
+ return res.object();
+ }
+
+ /**
+ * Either send local or global statistics request to get global statistics.
+ *
+ * @param key Statistics key to get global statistics by.
+ */
+ private void collectGlobalStatistics(StatisticsKey key) {
+ try {
+ StatisticsObjectConfiguration statCfg = statMgr.statisticConfiguration().config(key);
+
+ if (statCfg != null && !statCfg.columns().isEmpty()) {
+ UUID statMaster = getStatisticsMasterNode(key);
+
+ if (discoMgr.localNode().id().equals(statMaster))
+ gatherGlobalStatistics(statCfg);
+ else {
+ StatisticsKeyMessage keyMsg = new StatisticsKeyMessage(key.schema(), key.obj(),
+ Collections.emptyList());
+
+ Map<String, Long> versions = statCfg.columns().entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().version()));
+
+ StatisticsRequest globalReq = new StatisticsRequest(UUID.randomUUID(), keyMsg,
+ StatisticsType.GLOBAL, null, versions);
+
+ outGlobalStatisticsRequests.put(key, globalReq.reqId());
+
+ if (log.isDebugEnabled())
+ log.debug("Send global statistics request by configuration " + statCfg);
+
+ send(statMaster, globalReq);
+
+ }
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Unable to start global statistics collection due to lack of configuration by key "
+ + key);
+ }
+
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isInfoEnabled())
+ log.info("Unable to get statistics configuration due to " + e.getMessage());
+ }
+ }
+
+ /**
+ * Collect global statistics on master node.
+ *
+ * @param statCfg Statistics config to gather global statistics by.
+ */
+ private void gatherGlobalStatistics(StatisticsObjectConfiguration statCfg) throws IgniteCheckedException {
+ if (log.isDebugEnabled())
+ log.debug("Start global statistics collection by configuration " + statCfg);
+
+ StatisticsTarget target = new StatisticsTarget(statCfg.key());
+
+ List<StatisticsAddressedRequest> locRequests = helper.generateGatheringRequests(target, statCfg);
+ UUID reqId = locRequests.get(0).req().reqId();
+
+ StatisticsGatheringContext gatCtx = new StatisticsGatheringContext(locRequests.size(), reqId, statCfg);
+
+ curCollections.put(statCfg.key(), gatCtx);
+
+ for (StatisticsAddressedRequest addReq : locRequests) {
+ if (log.isDebugEnabled())
+ log.debug("Sending local request " + addReq.req().reqId() + " to node " + addReq.nodeId());
+
+ send(addReq.nodeId(), addReq.req());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+ mgmtPool.submit(() -> {
+ try {
+ if (msg instanceof StatisticsRequest) {
+ StatisticsRequest req = (StatisticsRequest)msg;
+ switch (req.type()) {
+ case LOCAL:
+ processLocalRequest(nodeId, req);
+
+ break;
+
+ case GLOBAL:
+ processGlobalRequest(nodeId, req);
+
+ break;
+
+ default:
+ log.warning("Unexpected type " + req.type() + " in statistics request message " + req);
+ }
+ }
+ else if (msg instanceof StatisticsResponse) {
+ StatisticsResponse resp = (StatisticsResponse)msg;
+
+ switch (resp.data().type()) {
+ case LOCAL:
+ processLocalResponse(nodeId, resp);
+
+ break;
+
+ case GLOBAL:
+ processGlobalResponse(nodeId, resp);
+
+ break;
+
+ default:
+ log.warning("Unexpected type " + resp.data().type() +
+ " in statistics reposonse message " + resp);
+ }
+
+ }
+ else
+ log.warning("Unknown msg " + msg + " in statistics topic " + GridTopic.TOPIC_STATISTICS +
+ " from node " + nodeId);
+ }
+ catch (Throwable e) {
+ log.warning("Unable to process statistics message: " + e.getMessage(), e);
+ }
+ });
+ }
+
+ /**
+ * Process request for local statistics.
+ * 1) If there are local statistics for the given key - send response.
+ * 2) If there is no such statistics - add request to incoming queue.
+ *
+ * @param nodeId Sender node id.
+ * @param req Request to process.
+ * @throws IgniteCheckedException
+ */
+ private void processLocalRequest(UUID nodeId, StatisticsRequest req) throws IgniteCheckedException {
+ if (log.isDebugEnabled())
+ log.debug("Got local statistics request from node " + nodeId + " : " + req);
+
+ StatisticsKey key = new StatisticsKey(req.key().schema(), req.key().obj());
+
+ ObjectStatisticsImpl objectStatistics = statMgr.getLocalStatistics(key, req.topVer());
+
+ if (StatisticsUtils.compareVersions(objectStatistics, req.versions()) == 0)
+ sendResponse(nodeId, req.reqId(), key, StatisticsType.LOCAL, objectStatistics);
+ else {
+ addToRequests(inLocalRequests, key, new StatisticsAddressedRequest(req, nodeId));
+
+ objectStatistics = statMgr.getLocalStatistics(key, req.topVer());
+
+ if (StatisticsUtils.compareVersions(objectStatistics, req.versions()) == 0) {
+ StatisticsAddressedRequest removed = removeFromRequests(inLocalRequests, key, req.reqId());
+
+ if (removed != null)
+ sendResponse(nodeId, req.reqId(), key, StatisticsType.LOCAL, objectStatistics);
+ }
+ }
+ }
+
+ /**
+ * Test if statistics configuration is fit to all required versions.
+ * @param cfg Statistics configuration to check.
+ * @param versions Map of column name to required version.
+ * @return {@code true} if it is, {@code false} otherwise.
+ */
+ private boolean checkStatisticsCfg(StatisticsObjectConfiguration cfg, Map<String, Long> versions) {
+ if (cfg == null)
+ return false;
+
+ for (Map.Entry<String, Long> version : versions.entrySet()) {
+ StatisticsColumnConfiguration colCfg = cfg.columns().get(version.getKey());
+
+ if (colCfg == null || colCfg.version() < version.getValue())
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Process incoming request for global statistics. Either response (if it exists), or collect and response
+ * (if current node is master node for the given key) or ignore (if current node is no more master node for
+ * the given key.
+ *
+ * @param nodeId Sender node id.
+ * @param req Request.
+ */
+ private void processGlobalRequest(UUID nodeId, StatisticsRequest req) throws IgniteCheckedException {
+ if (log.isDebugEnabled())
+ log.debug("Got global statistics request from node " + nodeId + " : " + req);
+
+ StatisticsKey key = new StatisticsKey(req.key().schema(), req.key().obj());
+
+ ObjectStatisticsImpl objStatistics = getGlobalStatistics(key, req.versions());
+
+ if (objStatistics == null) {
+ if (discoMgr.localNode().id().equals(getStatisticsMasterNode(key))) {
+
+ addToRequests(inGloblaRequests, key, new StatisticsAddressedRequest(req, nodeId));
+ globalStatistics.computeIfAbsent(key, k -> new CacheEntry<>(null));
+
+ if (!hasCurrentCollection(key, req.versions())) {
+ StatisticsObjectConfiguration objCfg = statMgr.statisticConfiguration().config(key);
+
+ if (StatisticsUtils.compareVersions(objCfg, req.versions()) >= 0)
+ gatherGlobalStatistics(objCfg);
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Wait for statistics configuration to process global statistics request " +
+ req.reqId());
+ }
+ }
+ }
+
+ objStatistics = getGlobalStatistics(key, req.versions());
+
+ if (objStatistics != null) {
+ StatisticsAddressedRequest removed = removeFromRequests(inGloblaRequests, key, req.reqId());
+
+ if (removed != null)
+ sendResponse(nodeId, req.reqId(), key, StatisticsType.GLOBAL, objStatistics);
+ }
+ }
+ else
+ sendResponse(nodeId, req.reqId(), key, StatisticsType.GLOBAL, objStatistics);
+ }
+
+ /**
+ * Check if there are already started current collection with specified parameters.
+ *
+ * @param key Statistics key.
+ * @param versions Reuqired versions.
+ * @return {@code true} if there are already current collection with specifie parameters, {@code false} - otherwise.
+ */
+ private boolean hasCurrentCollection(StatisticsKey key, Map<String, Long> versions) {
+ StatisticsGatheringContext ctx = curCollections.get(key);
+
+ if (ctx == null)
+ return false;
+
+ return StatisticsUtils.compareVersions(ctx.configuration(), versions) == 0;
+ }
+
+ /**
+ * Get apptopriate global statistics from cache.
+ *
+ * @param key Statistics key.
+ * @param versions Required versions.
+ * @return Global statistics or {@code null} if there are no such global statistics.
+ */
+ private ObjectStatisticsImpl getGlobalStatistics(StatisticsKey key, Map<String, Long> versions) {
+ CacheEntry<ObjectStatisticsImpl> objStatEntry = globalStatistics.get(key);
+
+ if (objStatEntry == null || StatisticsUtils.compareVersions(objStatEntry.object(), versions) != 0)
+ return null;
+
+ return objStatEntry.object();
+ }
+
+ /**
+ * Build statistics response and send it to specified node.
+ *
+ * @param nodeId Target node id.
+ * @param reqId Request id.
+ * @param key Statistics key.
+ * @param type Statistics type.
+ * @param data Statitsics data.
+ */
+ private void sendResponse(
+ UUID nodeId,
+ UUID reqId,
+ StatisticsKey key,
+ StatisticsType type,
+ ObjectStatisticsImpl data
+ ) throws IgniteCheckedException {
+ StatisticsKeyMessage keyMsg = new StatisticsKeyMessage(key.schema(), key.obj(), null);
+ StatisticsObjectData dataMsg = StatisticsUtils.toObjectData(keyMsg, type, data);
+
+ send(nodeId, new StatisticsResponse(reqId, dataMsg));
+ }
+
+ /**
+ * Add to addressed requests map.
+ *
+ * @param map Map to add into.
+ * @param key Request statistics key.
+ * @param req Request to add.
+ */
+ private void addToRequests(
+ ConcurrentMap<StatisticsKey, Collection<StatisticsAddressedRequest>> map,
+ StatisticsKey key,
+ StatisticsAddressedRequest req
+ ) {
+ map.compute(key, (k, v) -> {
+ if (v == null)
+ v = new ArrayList<>();
+
+ v.add(req);
+
+ return v;
+ });
+ }
+
+ /**
+ * Check if specified map contains request with specified key and id, remove and return it.
+ *
+ * @param key Request statistics key.
+ * @param reqId Request id.
+ * @return Removed request.
+ */
+ private StatisticsAddressedRequest removeFromRequests(
+ ConcurrentMap<StatisticsKey, Collection<StatisticsAddressedRequest>> map,
+ StatisticsKey key,
+ UUID reqId
+ ) {
+ StatisticsAddressedRequest[] res = new StatisticsAddressedRequest[1];
+
+ map.compute(key, (k, v) -> {
+ if (v != null)
+ res[0] = v.stream().filter(e -> reqId.equals(e.req().reqId())).findAny().orElse(null);
+
+ if (res[0] != null)
+ v = v.stream().filter(e -> !reqId.equals(e.req().reqId())).collect(Collectors.toList());
+
+ return v;
+ });
+
+ return res[0];
+ }
+
+ /**
+ * Process statistics configuration changes:
+ *
+ * 1) Remove all outbound activity by specified key, inbound may be suspended due to lack of
+ * requested configuration.
+ * 2) Remove all inbound activity by changed key if reqiest col cfg versions lower than configuration col cfg versions.
+ * 3.1) If there are no live column's config - remove cached global statistics.
+ * 3.2) If there are some live columns config and global statistics cache contains statistics for the given key -
+ * start to collect it again.
+ */
+ public void onConfigChanged(StatisticsObjectConfiguration cfg) {
+ StatisticsKey key = cfg.key();
+
+ curCollections.remove(key);
+ outGlobalStatisticsRequests.remove(key);
+
+ inLocalRequests.computeIfPresent(key, (k, v) -> {
+ // Current config newer than income request - request should be invalidated.
+ v.removeIf(req -> StatisticsUtils.compareVersions(cfg, req.req().versions()) > 0);
+
+ return (v.isEmpty()) ? null : v;
+ });
+
+ inGloblaRequests.computeIfPresent(key, (k, v) -> {
+ v.removeIf(req -> StatisticsUtils.compareVersions(cfg, req.req().versions()) > 0);
+
+ return (v.isEmpty()) ? null : v;
+ });
+
+ if (cfg.columns().isEmpty())
+ globalStatistics.remove(key);
+ else {
+ globalStatistics.computeIfPresent(key, (k, v) -> {
+ if (v != null) {
+ if (log.isDebugEnabled())
+ log.debug("Scheduling global statistics recollection by key " + key);
+
+ mgmtPool.submit(() -> collectGlobalStatistics(key));
+ }
+ return v;
+ });
+ }
+ }
+
+ /**
+ * Clear global object statistics.
+ *
+ * @param key Object key to clear global statistics by.
+ * @param colNames Only statistics by specified columns will be cleared.
+ */
+ public void clearGlobalStatistics(StatisticsKey key, Set<String> colNames) {
+ globalStatistics.computeIfPresent(key, (k, v) -> {
+ ObjectStatisticsImpl globStatOld = v.object();
+ ObjectStatisticsImpl globStatNew = (globStatOld == null) ? null : globStatOld.subtract(colNames);
+
+ return (globStatNew == null || globStatNew.columnsStatistics().isEmpty()) ? null :
+ new CacheEntry<>(globStatNew);
+ });
+
+ outGlobalStatisticsRequests.remove(key);
+ }
+
+ /**
+ * Process response with local statistics. Try to finish collecting operation and send pending requests.
+ *
+ * @param nodeId Sender node id.
+ * @param resp Statistics response to process.
+ * @throws IgniteCheckedException In case of error.
+ */
+ private void processLocalResponse(UUID nodeId, StatisticsResponse resp) throws IgniteCheckedException {
+ StatisticsKeyMessage keyMsg = resp.data().key();
+ StatisticsKey key = new StatisticsKey(keyMsg.schema(), resp.data().key().obj());
+
+ if (log.isDebugEnabled())
+ log.debug("Got local statistics response " + resp.reqId() + " from node " + nodeId + " by key " + key);
+
+ StatisticsGatheringContext curCtx = curCollections.get(key);
+
+ if (curCtx != null) {
+ if (!curCtx.reqId().equals(resp.reqId())) {
+ if (log.isDebugEnabled())
+ log.debug("Got outdated local statistics response " + resp + " instead of " + curCtx.reqId());
+
+ return;
+ }
+
+ ObjectStatisticsImpl data = StatisticsUtils.toObjectStatistics(null, resp.data());
+
+ if (curCtx.registerResponse(data)) {
+ StatisticsObjectConfiguration cfg = statMgr.statisticConfiguration().config(key);
+
+ if (cfg != null) {
+ if (log.isDebugEnabled())
+ log.debug("Aggregating global statistics for key " + key + " by request " + curCtx.reqId());
+
+ ObjectStatisticsImpl globalStat = helper.aggregateLocalStatistics(cfg, curCtx.collectedData());
+
+ globalStatistics.put(key, new CacheEntry<>(globalStat));
+
+ if (log.isDebugEnabled())
+ log.debug("Global statistics for key " + key + " collected.");
+
+ Collection<StatisticsAddressedRequest> globalRequests = inGloblaRequests.remove(key);
+
+ if (globalRequests != null) {
+ StatisticsObjectData globalStatData = StatisticsUtils.toObjectData(keyMsg,
+ StatisticsType.GLOBAL, globalStat);
+
+ for (StatisticsAddressedRequest req : globalRequests) {
+ StatisticsResponse outResp = new StatisticsResponse(req.req().reqId(), globalStatData);
+
+ send(req.nodeId(), outResp);
+ }
+ }
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Dropping collected statistics due to lack of configuration for key " + key);
+ }
+
+ curCollections.remove(key);
+ }
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Got outdated local statistics response " + resp);
+ }
+ }
+
+ /**
+ * Process response of global statistics.
+ *
+ * @param resp Response.
+ * @throws IgniteCheckedException In case of error.
+ */
+ private void processGlobalResponse(UUID nodeId, StatisticsResponse resp) throws IgniteCheckedException {
+ StatisticsKeyMessage keyMsg = resp.data().key();
+ StatisticsKey key = new StatisticsKey(keyMsg.schema(), keyMsg.obj());
+
+ if (log.isDebugEnabled())
+ log.debug("Got global statistics response " + resp.reqId() + " from node " + nodeId + " by key " + key);
+
+ UUID reqId = outGlobalStatisticsRequests.get(key);
+
+ if (reqId != null) {
+ if (!resp.reqId().equals(reqId)) {
+ if (log.isDebugEnabled())
+ log.debug("Got outdated global statistics response " + resp + " instead of " + reqId);
+
+ return;
+ }
+
+ ObjectStatisticsImpl data = StatisticsUtils.toObjectStatistics(null, resp.data());
+
+ globalStatistics.put(key, new CacheEntry(data));
+ outGlobalStatisticsRequests.remove(key);
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Got outdated global statistics response " + resp);
+ }
+ }
+
+ /**
+ * Calculate id of statistics master node for the given key.
+ *
+ * @param key Statistics key to calculate master node for.
+ * @return UUID of statistics master node.
+ */
+ private UUID getStatisticsMasterNode(StatisticsKey key) {
+ UUID[] nodes = discoMgr.aliveServerNodes().stream().map(ClusterNode::id).sorted().toArray(UUID[]::new);
+ int idx = IgniteUtils.hashToIndex(key.obj().hashCode(), nodes.length);
+
+ return nodes[idx];
+ }
+
+ /**
+ * After collecting local statistics - check if there are some pending request for it and send responces.
+ *
+ * @param key Statistics key on which local statistics was aggregated.
+ * @param statistics Collected statistics by key.
+ * @param topVer Topology version which aggregated statistics stands for.
+ */
+ public void onLocalStatisticsAggregated(
+ StatisticsKey key,
+ ObjectStatisticsImpl statistics,
+ AffinityTopologyVersion topVer
+ ) {
+ List<StatisticsAddressedRequest> inReqs = new ArrayList<>();
+
+ inLocalRequests.computeIfPresent(key, (k, v) -> {
+ List<StatisticsAddressedRequest> left = new ArrayList<>();
+
+ for (StatisticsAddressedRequest req : v) {
+ if (topVer.equals(req.req().topVer()) &&
+ StatisticsUtils.compareVersions(statistics, req.req().versions()) == 0)
+ inReqs.add(req);
+ else
+ left.add(req);
+ }
+
+ return (left.isEmpty()) ? null : left;
+ });
+
+ if (inReqs.isEmpty())
+ return;
+
+ for (StatisticsAddressedRequest req : inReqs) {
+ try {
+ sendResponse(req.nodeId(), req.req().reqId(), key, StatisticsType.LOCAL, statistics);
+ }
+ catch (IgniteCheckedException e) {
+ log.info("Unable to send local object statistics for key " + key + " due to " + e.getMessage());
+ }
+ }
+ }
+
+ /**
+ * Send statistics related message.
+ *
+ * @param nodeId Target node id.
+ * @param msg Message to send.
+ * @throws IgniteCheckedException In case of error.
+ */
+ private void send(UUID nodeId, StatisticsRequest msg) throws IgniteCheckedException {
+ if (discoMgr.localNode().id().equals(nodeId)) {
+ switch (msg.type()) {
+ case LOCAL:
+ processLocalRequest(nodeId, msg);
+
+ break;
+
+ default:
+ log.warning("Unexpected type " + msg.type() + " in statistics request message " + msg);
+ }
+ }
+ else
+ ioMgr.sendToGridTopic(nodeId, GridTopic.TOPIC_STATISTICS, msg, GridIoPolicy.MANAGEMENT_POOL);
+ }
+
+ /**
+ * Send statistics response or process it locally.
+ *
+ * @param nodeId Target node id. If equals to local node - corresponding method will be called directly.
+ * @param msg Statistics response to send.
+ * @throws IgniteCheckedException In case of error.
+ */
+ private void send(UUID nodeId, StatisticsResponse msg) throws IgniteCheckedException {
+ if (discoMgr.localNode().id().equals(nodeId)) {
+ switch (msg.data().type()) {
+ case LOCAL:
+ processLocalResponse(nodeId, msg);
+
+ break;
+
+ case GLOBAL:
+ processGlobalResponse(nodeId, msg);
+
+ break;
+
+ default:
+ log.warning("Unexpected type " + msg.data().type() + " in statistics response message " + msg);
+ }
+ }
+ else
+ ioMgr.sendToGridTopic(nodeId, GridTopic.TOPIC_STATISTICS, msg, GridIoPolicy.MANAGEMENT_POOL);
+ }
+
+ /** Cache entry. */
+ private static class CacheEntry<T> {
+ /** Cached object. */
+ private final T obj;
+
+ /**
+ * Constructor.
+ *
+ * @param obj Cached object.
+ */
+ public CacheEntry(T obj) {
+ this.obj = obj;
+ }
+
+ /**
+ * @return Cached object.
+ */
+ public T object() {
+ return obj;
+ }
+ }
+
+ /** Context of global statistics gathering. */
+ private static class StatisticsGatheringContext {
+ /** Number of remaining requests. */
+ private int remainingResponses;
+
+ /** Requests id. */
+ private final UUID reqId;
+
+ /** Local object statistics from responses. */
+ private final Collection<ObjectStatisticsImpl> responses = new ArrayList<>();
+
+ /** Configuration, used to collect statistics. */
+ private final StatisticsObjectConfiguration cfg;
+
+ /**
+ * Constructor.
+ *
+ * @param responseCont Expectiong response count.
+ * @param reqId Requests id.
+ * @param cfg Configuration, used to collect statistics.
+ */
+ public StatisticsGatheringContext(int responseCont, UUID reqId, StatisticsObjectConfiguration cfg) {
+ remainingResponses = responseCont;
+ this.reqId = reqId;
+ this.cfg = cfg;
+ }
+
+ /**
+ * Register response.
+ *
+ * @param data Object statistics from response.
+ * @return {@code true} if all respones collected, {@code false} otherwise.
+ */
+ public synchronized boolean registerResponse(ObjectStatisticsImpl data) {
+ responses.add(data);
+ return --remainingResponses == 0;
+ }
+
+ /**
+ * @return Requests id.
+ */
+ public UUID reqId() {
+ return reqId;
+ }
+
+ /**
+ * Get collected local object statistics.
+ * @return Local object statistics.
+ */
+ public Collection<ObjectStatisticsImpl> collectedData() {
+ assert remainingResponses == 0;
+
+ return responses;
+ }
+
+ /**
+ * @return Statistics configuration, used to start gathering.
+ */
+ public StatisticsObjectConfiguration configuration() {
+ return cfg;
+ }
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java
index 326b028..fab6bcb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java
@@ -23,7 +23,9 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
@@ -100,30 +102,34 @@ public class IgniteStatisticsConfigurationManager {
/** Is server node flag. */
private final boolean isServerNode;
+ /** Configuration change subscribers. */
+ private List<Consumer<StatisticsObjectConfiguration>> subscribers = new CopyOnWriteArrayList<>();
+
/** Change statistics configuration listener to update particular object statistics. */
private final DistributedMetastorageLifecycleListener distrMetaStoreLsnr =
new DistributedMetastorageLifecycleListener() {
- @Override public void onReadyForWrite(DistributedMetaStorage metastorage) {
- distrMetaStorage = metastorage;
-
- distrMetaStorage.listen(
- (metaKey) -> metaKey.startsWith(STAT_OBJ_PREFIX),
- (k, oldV, newV) -> {
- // Skip invoke on start node (see 'ReadableDistributedMetaStorage#listen' the second case)
- // The update statistics on start node is handled by 'scanAndCheckLocalStatistic' method
- // called on exchange done.
- if (topVer == null)
- return;
+ @Override public void onReadyForWrite(DistributedMetaStorage metastorage) {
+ distrMetaStorage = metastorage;
+
+ distrMetaStorage.listen(
+ (metaKey) -> metaKey.startsWith(STAT_OBJ_PREFIX),
+ (k, oldV, newV) -> {
+ // Skip invoke on start node (see 'ReadableDistributedMetaStorage#listen' the second case)
+ // The update statistics on start node is handled by 'scanAndCheckLocalStatistic' method
+ // called on exchange done.
+ if (topVer == null)
+ return;
- mgmtBusyExecutor.execute(() -> {
StatisticsObjectConfiguration newStatCfg = (StatisticsObjectConfiguration)newV;
- updateLocalStatistics(newStatCfg);
- });
- }
- );
- }
- };
+ for (Consumer<StatisticsObjectConfiguration> subscriber : subscribers)
+ subscriber.accept(newStatCfg);
+
+ mgmtBusyExecutor.execute(() -> updateLocalStatistics(newStatCfg));
+ }
+ );
+ }
+ };
/**
* Constructor.
@@ -204,7 +210,7 @@ public class IgniteStatisticsConfigurationManager {
*/
@Override public void accept(GridH2Table tbl, List<String> cols) {
assert !F.isEmpty(cols);
- dropStatistics(Collections.singletonList(
+ dropStatistics(Collections.singletonList(
new StatisticsTarget(
tbl.identifier().schema(),
tbl.getName(),
@@ -603,4 +609,13 @@ public class IgniteStatisticsConfigurationManager {
return sb.toString();
}
+
+ /**
+ * Subscribe to statistics configuration changed.
+ *
+ * @param subscriber Subscriber.
+ */
+ public void subscribe(Consumer<StatisticsObjectConfiguration> subscriber) {
+ subscribers.add(subscriber);
+ }
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsHelper.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsHelper.java
index 8f1893a..44b6cde 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsHelper.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsHelper.java
@@ -30,13 +30,18 @@ import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.h2.SchemaManager;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnConfiguration;
import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
import org.apache.ignite.internal.processors.query.stat.messages.StatisticsKeyMessage;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsRequest;
import org.apache.ignite.internal.util.typedef.F;
import org.h2.table.Column;
import org.jetbrains.annotations.Nullable;
@@ -68,6 +73,61 @@ public class IgniteStatisticsHelper {
}
/**
+ * Get cache group context by specified statistics key.
+ *
+ * @param key Statistics key to get context by.
+ * @return Cache group context for the given key.
+ * @throws IgniteCheckedException If unable to find table by specified key.
+ */
+ public CacheGroupContext groupContext(StatisticsKey key) throws IgniteCheckedException {
+ GridH2Table tbl = schemaMgr.dataTable(key.schema(), key.obj());
+
+ if (tbl == null)
+ throw new IgniteCheckedException(String.format("Can't find object %s.%s", key.schema(), key.obj()));
+
+ return tbl.cacheContext().group();
+ }
+
+ /**
+ * Generate local statistics requests.
+ *
+ * @param target Statistics target to request local statistics by.
+ * @param cfg Statistics configuration.
+ * @return Collection of statistics request.
+ */
+ public List<StatisticsAddressedRequest> generateGatheringRequests(
+ StatisticsTarget target,
+ StatisticsObjectConfiguration cfg
+ ) throws IgniteCheckedException {
+ List<String> cols = (target.columns() == null) ? null : Arrays.asList(target.columns());
+ StatisticsKeyMessage keyMsg = new StatisticsKeyMessage(target.schema(), target.obj(), cols);
+
+ Map<String, Long> versions = cfg.columns().entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().version()));
+ CacheGroupContext grpCtx = groupContext(target.key());
+ AffinityTopologyVersion topVer = grpCtx.affinity().lastVersion();
+
+ StatisticsRequest req = new StatisticsRequest(UUID.randomUUID(), keyMsg, StatisticsType.LOCAL, topVer, versions);
+
+ List<List<ClusterNode>> assignments = grpCtx.affinity().assignments(topVer);
+ Set<UUID> nodes = new HashSet<>();
+
+ for (List<ClusterNode> partNodes : assignments) {
+ if (F.isEmpty(partNodes))
+ continue;
+
+ nodes.add(partNodes.get(0).id());
+ }
+
+ List<StatisticsAddressedRequest> res = new ArrayList<>(nodes.size());
+
+ for (UUID nodeId : nodes)
+ res.add(new StatisticsAddressedRequest(req, nodeId));
+
+ return res;
+ }
+
+ /**
* Aggregate specified partition level statistics to local level statistics.
*
* @param cfg Statistics object configuration.
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java
index f68b5b0..07ff549 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java
@@ -22,6 +22,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Consumer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -30,6 +31,7 @@ import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
@@ -72,6 +74,9 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager {
/** Statistics repository. */
private final IgniteStatisticsRepository statsRepos;
+ /** Global statistics repository. */
+ private final IgniteGlobalStatisticsManager globalStatsMgr;
+
/** Ignite statistics helper. */
private final IgniteStatisticsHelper helper;
@@ -144,31 +149,25 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager {
IgniteCacheDatabaseSharedManager db = (GridCacheUtils.isPersistenceEnabled(ctx.config())) ?
ctx.cache().context().database() : null;
- if (serverNode) {
- gatherPool = new IgniteThreadPoolExecutor("stat-gather",
- ctx.igniteInstanceName(),
- 0,
- STATS_POOL_SIZE,
- IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME,
- new LinkedBlockingQueue<>(),
- GridIoPolicy.UNDEFINED,
- ctx.uncaughtExceptionHandler()
- );
-
- mgmtPool = new IgniteThreadPoolExecutor("stat-mgmt",
- ctx.igniteInstanceName(),
- 0,
- 1,
- IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME,
- new LinkedBlockingQueue<>(),
- GridIoPolicy.UNDEFINED,
- ctx.uncaughtExceptionHandler()
- );
- }
- else {
- gatherPool = null;
- mgmtPool = null;
- }
+ gatherPool = (serverNode) ? new IgniteThreadPoolExecutor("stat-gather",
+ ctx.igniteInstanceName(),
+ 0,
+ STATS_POOL_SIZE,
+ IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME,
+ new LinkedBlockingQueue<>(),
+ GridIoPolicy.UNDEFINED,
+ ctx.uncaughtExceptionHandler()
+ ) : null;
+
+ mgmtPool = new IgniteThreadPoolExecutor("stat-mgmt",
+ ctx.igniteInstanceName(),
+ 0,
+ 1,
+ IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME,
+ new LinkedBlockingQueue<>(),
+ GridIoPolicy.UNDEFINED,
+ ctx.uncaughtExceptionHandler()
+ );
obsolescenceBusyExecutor = new BusyExecutor("obsolescence", mgmtPool, ctx::log);
@@ -201,6 +200,17 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager {
serverNode
);
+ globalStatsMgr = new IgniteGlobalStatisticsManager(
+ this,
+ ctx.systemView(),
+ mgmtPool,
+ ctx.discovery(),
+ ctx.state(),
+ ctx.cache().context().exchange(),
+ helper,
+ ctx.io(),
+ ctx::log);
+
ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener(dispatcher -> {
usageState.addListener((name, oldVal, newVal) -> {
if (log.isInfoEnabled())
@@ -253,6 +263,7 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager {
if (log.isDebugEnabled())
log.debug("Stopping statistics subsystem");
+ globalStatsMgr.stop();
statCfgMgr.stop();
if (statProc != null)
@@ -292,6 +303,7 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager {
statProc.start();
statCfgMgr.start();
+ globalStatsMgr.start();
started = true;
}
@@ -307,7 +319,25 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager {
@Override public ObjectStatistics getLocalStatistics(StatisticsKey key) {
StatisticsUsageState currState = usageState();
- return (currState == ON || currState == NO_UPDATE) ? statsRepos.getLocalStatistics(key) : null;
+ return (currState == ON || currState == NO_UPDATE) ? statsRepos.getLocalStatistics(key, null) : null;
+ }
+
+ /**
+ * Get local statitsics with specified topology version if exists.
+ *
+ * @param key Key to get statistics by.
+ * @param topVer Required topology version.
+ * @return Local object statistics or {@code null} if there are no statistics with requested topology version.
+ */
+ public ObjectStatisticsImpl getLocalStatistics(StatisticsKey key, AffinityTopologyVersion topVer) {
+ return statsRepos.getLocalStatistics(key, topVer);
+ }
+
+ /** {@inheritDoc} */
+ @Override public ObjectStatistics getGlobalStatistics(StatisticsKey key) {
+ StatisticsUsageState currState = usageState();
+
+ return (currState == ON || currState == NO_UPDATE) ? globalStatsMgr.getGlobalStatistics(key) : null;
}
/** {@inheritDoc} */
@@ -440,12 +470,14 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager {
for (StatisticsKey key : keys) {
StatisticsObjectConfiguration cfg = null;
+
try {
- cfg = statCfgMgr.config(key);
+ cfg = statCfgMgr.config(key);
}
catch (IgniteCheckedException e) {
// No-op/
}
+
Set<Integer> tasksParts = calculateObsolescencedPartitions(cfg, statsRepos.getObsolescence(key));
GridH2Table tbl = schemaMgr.dataTable(key.schema(), key.obj());
@@ -489,6 +521,24 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager {
}
/**
+ * Subscribe to all local statistics changes.
+ *
+ * @param subscriber Local statitics subscriber.
+ */
+ public void subscribeToLocalStatistics(Consumer<ObjectStatisticsEvent> subscriber) {
+ statsRepos.subscribeToLocalStatistics(subscriber);
+ }
+
+ /**
+ * Subscribe to all statistics configuration changed.
+ *
+ * @param subscriber Statistics configuration subscriber.
+ */
+ public void subscribeToStatisticsConfig(Consumer<StatisticsObjectConfiguration> subscriber) {
+ statCfgMgr.subscribe(subscriber);
+ }
+
+ /**
* Check that cluster is active.
*
* @param op Operation name.
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepository.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepository.java
index 9dac793..ea4360d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepository.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepository.java
@@ -23,12 +23,15 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.managers.systemview.GridSystemViewManager;
import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnLocalDataViewWalker;
import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnPartitionDataViewWalker;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
import org.apache.ignite.internal.processors.query.stat.view.ColumnLocalDataViewSupplier;
import org.apache.ignite.internal.processors.query.stat.view.ColumnPartitionDataViewSupplier;
@@ -60,7 +63,7 @@ public class IgniteStatisticsRepository {
private final IgniteStatisticsStore store;
/** Local (for current node) object statistics. */
- private final Map<StatisticsKey, ObjectStatisticsImpl> locStats = new ConcurrentHashMap<>();
+ private final Map<StatisticsKey, VersionedStatistics> locStats = new ConcurrentHashMap<>();
/** Obsolescence for each partition. */
private final Map<StatisticsKey, IntMap<ObjectPartitionStatisticsObsolescence>> statObs = new ConcurrentHashMap<>();
@@ -68,6 +71,9 @@ public class IgniteStatisticsRepository {
/** Statistics helper (msg converter). */
private final IgniteStatisticsHelper helper;
+ /** Local statistics subscribers. */
+ private final List<Consumer<ObjectStatisticsEvent>> subscribers = new CopyOnWriteArrayList<>();
+
/**
* Constructor.
*
@@ -178,9 +184,19 @@ public class IgniteStatisticsRepository {
*
* @param key Object key.
* @param statistics Statistics to save.
+ * @param topVer Topology version.
*/
- public void saveLocalStatistics(StatisticsKey key, ObjectStatisticsImpl statistics) {
- locStats.put(key, statistics);
+ public void saveLocalStatistics(
+ StatisticsKey key,
+ ObjectStatisticsImpl statistics,
+ AffinityTopologyVersion topVer
+ ) {
+ locStats.put(key, new VersionedStatistics(topVer, statistics));
+
+ ObjectStatisticsEvent newLocalStat = new ObjectStatisticsEvent(key, statistics, topVer);
+
+ for (Consumer<ObjectStatisticsEvent> subscriber : subscribers)
+ subscriber.accept(newLocalStat);
}
/**
@@ -213,10 +229,16 @@ public class IgniteStatisticsRepository {
* Get local statistics.
*
* @param key Object key to load statistics by.
+ * @param topVer Required topology version.
* @return Object local statistics or {@code null} if there are no statistics collected for such object.
*/
- public ObjectStatisticsImpl getLocalStatistics(StatisticsKey key) {
- return locStats.get(key);
+ public ObjectStatisticsImpl getLocalStatistics(StatisticsKey key, AffinityTopologyVersion topVer) {
+ VersionedStatistics stat = locStats.get(key);
+
+ if (stat == null)
+ return null;
+
+ return (topVer != null && !stat.topologyVersion().equals(topVer)) ? null : stat.statistics();
}
/**
@@ -224,7 +246,7 @@ public class IgniteStatisticsRepository {
*
* @return Local (for current node) object statistics.
*/
- public Map<StatisticsKey, ObjectStatisticsImpl> localStatisticsMap() {
+ public Map<StatisticsKey, VersionedStatistics> localStatisticsMap() {
return locStats;
}
@@ -240,11 +262,13 @@ public class IgniteStatisticsRepository {
*
* @param stats Partitions statistics to aggregate.
* @param cfg Statistic configuration to specify statistic object to aggregate.
+ * @param topVer Topology version to which specified partition set actual for.
* @return aggregated local statistic.
*/
public ObjectStatisticsImpl aggregatedLocalStatistics(
Collection<ObjectPartitionStatisticsImpl> stats,
- StatisticsObjectConfiguration cfg
+ StatisticsObjectConfiguration cfg,
+ AffinityTopologyVersion topVer
) {
if (log.isDebugEnabled()) {
log.debug("Refresh local aggregated statistic [cfg=" + cfg +
@@ -254,7 +278,7 @@ public class IgniteStatisticsRepository {
ObjectStatisticsImpl locStat = helper.aggregateLocalStatistics(cfg, stats);
if (locStat != null)
- saveLocalStatistics(cfg.key(), locStat);
+ saveLocalStatistics(cfg.key(), locStat, topVer);
return locStat;
}
@@ -320,4 +344,50 @@ public class IgniteStatisticsRepository {
}
});
}
+
+ /**
+ * Subscribe to all local statistics changes.
+ *
+ * @param subscriber Local statitics subscriber.
+ */
+ public void subscribeToLocalStatistics(Consumer<ObjectStatisticsEvent> subscriber
+ ) {
+ subscribers.add(subscriber);
+ }
+
+ /**
+ * Object statistics with topology version to which it is actual for.
+ */
+ public static class VersionedStatistics {
+ /** Topology version. */
+ private final AffinityTopologyVersion topVer;
+
+ /** Statistics. */
+ private final ObjectStatisticsImpl statistics;
+
+ /**
+ * Constructor.
+ *
+ * @param topVer Topology version.
+ * @param statistics Statistics.
+ */
+ public VersionedStatistics(AffinityTopologyVersion topVer, ObjectStatisticsImpl statistics) {
+ this.topVer = topVer;
+ this.statistics = statistics;
+ }
+
+ /**
+ * @return Topology version.
+ */
+ public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /**
+ * @return Statistics.
+ */
+ public ObjectStatisticsImpl statistics() {
+ return statistics;
+ }
+ }
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/ObjectStatisticsEvent.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/ObjectStatisticsEvent.java
new file mode 100644
index 0000000..56450b4
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/ObjectStatisticsEvent.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.stat;
+
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+
+/**
+ * Local object statistics change event.
+ */
+public class ObjectStatisticsEvent {
+ /** Statistics key. */
+ private final StatisticsKey key;
+
+ /** Local object statistics. */
+ private final ObjectStatisticsImpl statistics;
+
+ /** Local statistics topology version. */
+ private final AffinityTopologyVersion topVer;
+
+ /**
+ * Constructor.
+ *
+ * @param key Statistics key.
+ * @param statistics Local object statistics.
+ * @param topVer Affinity topology version for whick local object statistics were calculated.
+ */
+ public ObjectStatisticsEvent(StatisticsKey key,
+ ObjectStatisticsImpl statistics, AffinityTopologyVersion topVer) {
+ this.key = key;
+ this.statistics = statistics;
+ this.topVer = topVer;
+ }
+
+ /**
+ * Get statistics key.
+ *
+ * @return Statistics key.
+ */
+ public StatisticsKey key() {
+ return key;
+ }
+
+ /**
+ * Get object statistics.
+ *
+ * @return Object statistics.
+ */
+ public ObjectStatisticsImpl statistics() {
+ return statistics;
+ }
+
+ /**
+ * Get affinity topology version.
+ *
+ * @return topology version for which statisics were calculated.
+ */
+ public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/ObjectStatisticsImpl.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/ObjectStatisticsImpl.java
index 8325f88..4b8952c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/ObjectStatisticsImpl.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/ObjectStatisticsImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.query.stat;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -104,4 +105,19 @@ public class ObjectStatisticsImpl implements Cloneable, ObjectStatistics {
@Override public String toString() {
return S.toString(ObjectStatisticsImpl.class, this);
}
+
+ /**
+ * Remove specified columns from clone of current ObjectStatistics object.
+ *
+ * @param cols Columns to remove.
+ * @return Cloned object without specified columns statistics.
+ */
+ public <T extends ObjectStatisticsImpl> T subtract(Set<String> cols) {
+ T res = (T)clone();
+
+ for (String col : cols)
+ res.columnsStatistics().remove(col);
+
+ return res;
+ }
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsType.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsAddressedRequest.java
similarity index 53%
copy from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsType.java
copy to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsAddressedRequest.java
index 38f3f67a..8a7c5fd 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsType.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsAddressedRequest.java
@@ -17,28 +17,42 @@
package org.apache.ignite.internal.processors.query.stat;
-import org.jetbrains.annotations.Nullable;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsRequest;
/**
- * Types of statistics width.
+ * Statistics request with target/sender node id.
*/
-public enum StatisticsType {
- /** Statistics by some particular partition. */
- PARTITION,
+public class StatisticsAddressedRequest {
+ /** Statistics request */
+ private final StatisticsRequest req;
- /** Statistics by some data node. */
- LOCAL;
-
- /** Enumerated values. */
- private static final StatisticsType[] VALUES = values();
+ /** Target/sender node id. */
+ private final UUID nodeId;
/**
- * Efficiently gets enumerated value from its ordinal.
+ * Constructor.
*
- * @param ord Ordinal value.
- * @return Enumerated value or {@code null} if ordinal out of range.
+ * @param req Statistics request.
+ * @param nodeId Target/sender node id.
+ */
+ public StatisticsAddressedRequest(StatisticsRequest req, UUID nodeId) {
+ this.req = req;
+ this.nodeId = nodeId;
+ }
+
+ /**
+ * @return Statistics request.
*/
- @Nullable public static StatisticsType fromOrdinal(int ord) {
- return ord >= 0 && ord < VALUES.length ? VALUES[ord] : null;
+ public StatisticsRequest req() {
+ return req;
+ }
+
+ /**
+ * @return Target/sender node id.
+ */
+ public UUID nodeId() {
+ return nodeId;
}
}
+
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsProcessor.java
index 34fd6bd..84c7cb0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsProcessor.java
@@ -235,7 +235,7 @@ public class StatisticsProcessor {
Collection<ObjectPartitionStatisticsImpl> allParts = statRepo.getLocalPartitionsStatistics(key);
if (ctx.forceRecollect())
- statRepo.aggregatedLocalStatistics(allParts, ctx.configuration());
+ statRepo.aggregatedLocalStatistics(allParts, ctx.configuration(), ctx.topologyVersion());
else {
Set<Integer> partsToRemove = new HashSet<>();
Collection<ObjectPartitionStatisticsImpl> partsToAggregate = new ArrayList<>();
@@ -251,7 +251,7 @@ public class StatisticsProcessor {
statRepo.clearLocalPartitionsStatistics(ctx.configuration().key(), partsToRemove);
if (!partsToAggregate.isEmpty())
- statRepo.aggregatedLocalStatistics(partsToAggregate, ctx.configuration());
+ statRepo.aggregatedLocalStatistics(partsToAggregate, ctx.configuration(), ctx.topologyVersion());
}
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsType.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsType.java
index 38f3f67a..29900bb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsType.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsType.java
@@ -27,7 +27,10 @@ public enum StatisticsType {
PARTITION,
/** Statistics by some data node. */
- LOCAL;
+ LOCAL,
+
+ /** Statistics by the whole object. */
+ GLOBAL,;
/** Enumerated values. */
private static final StatisticsType[] VALUES = values();
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsUtils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsUtils.java
index a210944..99d2624 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsUtils.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsUtils.java
@@ -24,6 +24,8 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessage;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnConfiguration;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
import org.apache.ignite.internal.processors.query.stat.messages.StatisticsColumnData;
import org.apache.ignite.internal.processors.query.stat.messages.StatisticsKeyMessage;
import org.apache.ignite.internal.processors.query.stat.messages.StatisticsObjectData;
@@ -31,7 +33,7 @@ import org.apache.ignite.internal.util.typedef.F;
import org.h2.value.Value;
/**
- * Utilities to convert statistics from/to messages.
+ * Utilities to convert statistics from/to messages, validate configurations with statistics and so on.
*/
public class StatisticsUtils {
/**
@@ -172,4 +174,63 @@ public class StatisticsUtils {
String[] cols = (msg.colNames() == null) ? null : msg.colNames().toArray(new String[0]);
return new StatisticsTarget(msg.schema(), msg.obj(), cols);
}
+
+ /**
+ * Test if specified statistics are fit to all required versions.
+ * It means that specified statistics contains all columns with at least versions
+ * from specified map.
+ *
+ * @param stat Statistics to check. Can be {@code null}.
+ * @param versions Map of column name to required version.
+ * @return positive value if statistics versions bigger than specified in versions map,
+ * negative value if statistics version smaller than specified in version map,
+ * zero it they are equals.
+ */
+ public static int compareVersions(
+ ObjectStatisticsImpl stat,
+ Map<String, Long> versions
+ ) {
+ if (stat == null)
+ return -1;
+
+ for (Map.Entry<String, Long> version : versions.entrySet()) {
+ ColumnStatistics colStat = stat.columnsStatistics().get(version.getKey());
+
+ if (colStat == null || colStat.version() < version.getValue())
+ return -1;
+
+ if (colStat.version() > version.getValue())
+ return 1;
+ }
+
+ return 0;
+ }
+
+ /**
+ * Test if secified statistics configuration is fit to all required versions.
+ * It means that specified statistics configuraion contains all columns with at least versions from specified map.
+ *
+ * @param cfg Statistics configuraion to check. Can be {@code null}.
+ * @param versions Map of column name to required version.
+ * @return {@code true} if it is, {@code talse} otherwise.
+ */
+ public static int compareVersions(
+ StatisticsObjectConfiguration cfg,
+ Map<String, Long> versions
+ ) {
+ if (cfg == null)
+ return -1;
+
+ for (Map.Entry<String, Long> colVersion : versions.entrySet()) {
+ StatisticsColumnConfiguration colCfg = cfg.columns().get(colVersion.getKey());
+
+ if (colCfg == null || colCfg.version() < colVersion.getValue())
+ return -1;
+
+ if (colCfg.version() > colVersion.getValue())
+ return 1;
+ }
+
+ return 0;
+ }
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/messages/StatisticsKeyMessage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/messages/StatisticsKeyMessage.java
index 481c39d..7beab33 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/messages/StatisticsKeyMessage.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/messages/StatisticsKeyMessage.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Objects;
import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -177,6 +178,11 @@ public class StatisticsKeyMessage implements Message {
}
/** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(StatisticsKeyMessage.class, this);
+ }
+
+ /** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/messages/StatisticsRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/messages/StatisticsRequest.java
new file mode 100644
index 0000000..ea7c38d
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/messages/StatisticsRequest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.stat.messages;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.query.stat.StatisticsType;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Request for statistics.
+ */
+public class StatisticsRequest implements Message {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ public static final short TYPE_CODE = 186;
+
+ /** Gathering id. */
+ private UUID reqId;
+
+ /** Key to supply statistics by. */
+ private StatisticsKeyMessage key;
+
+ /** Type of required statistcs. */
+ private StatisticsType type;
+
+ /** For local statistics request - column config versions map: name to version. */
+ @GridDirectMap(keyType = String.class, valueType = Long.class)
+ private Map<String, Long> versions;
+
+ /** For local statistics request - version to gather statistics by. */
+ private AffinityTopologyVersion topVer;
+
+ /**
+ * Constructor.
+ */
+ public StatisticsRequest() {
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param reqId Request id.
+ * @param key Statistics key to get statistics by.
+ * @param type Required statistics type.
+ * @param topVer Topology version to get statistics by.
+ * @param versions Map of statistics version to column name to ensure actual statistics aquired.
+ */
+ public StatisticsRequest(
+ UUID reqId,
+ StatisticsKeyMessage key,
+ StatisticsType type,
+ AffinityTopologyVersion topVer,
+ Map<String, Long> versions
+ ) {
+ this.reqId = reqId;
+ this.key = key;
+ this.type = type;
+ this.topVer = topVer;
+ this.versions = versions;
+ }
+
+ /** Request id. */
+ public UUID reqId() {
+ return reqId;
+ }
+
+ /**
+ * @return Required statistics type.
+ */
+ public StatisticsType type() {
+ return type;
+ }
+
+ /**
+ * @return Key for required statitics.
+ */
+ public StatisticsKeyMessage key() {
+ return key;
+ }
+
+ /**
+ * @return Topology version.
+ */
+ public AffinityTopologyVersion topVer() {
+ return topVer;
+ }
+
+ /**
+ * @return Column name to version map.
+ */
+ public Map<String, Long> versions() {
+ return versions;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(StatisticsRequest.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeMessage("key", key))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeUuid("reqId", reqId))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
+ if (!writer.writeAffinityTopologyVersion("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ case 3:
+ if (!writer.writeByte("type", type != null ? (byte)type.ordinal() : -1))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeMap("versions", versions, MessageCollectionItemType.STRING, MessageCollectionItemType.LONG))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ key = reader.readMessage("key");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ reqId = reader.readUuid("reqId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 2:
+ topVer = reader.readAffinityTopologyVersion("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 3:
+ byte typeOrd;
+
+ typeOrd = reader.readByte("type");
+
+ if (!reader.isLastRead())
+ return false;
+
+ type = StatisticsType.fromOrdinal(typeOrd);
+
+ reader.incrementState();
+
+ case 4:
+ versions = reader.readMap("versions", MessageCollectionItemType.STRING, MessageCollectionItemType.LONG, false);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(StatisticsRequest.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TYPE_CODE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 5;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/messages/StatisticsKeyMessage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/messages/StatisticsResponse.java
similarity index 52%
copy from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/messages/StatisticsKeyMessage.java
copy to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/messages/StatisticsResponse.java
index 481c39d..fcf7063 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/messages/StatisticsKeyMessage.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/messages/StatisticsResponse.java
@@ -17,76 +17,66 @@
package org.apache.ignite.internal.processors.query.stat.messages;
-import java.io.Externalizable;
import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.ignite.internal.GridDirectCollection;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
- * Key, describing the object of statistics. For example: table with some columns.
+ * Response for statistics request.
*/
-public class StatisticsKeyMessage implements Message {
+public class StatisticsResponse implements Message {
/** */
private static final long serialVersionUID = 0L;
/** */
- public static final short TYPE_CODE = 183;
-
- /** Object schema. */
- private String schema;
+ public static final short TYPE_CODE = 187;
- /** Object name. */
- private String obj;
+ /** Request id. */
+ private UUID reqId;
- /** Optional list of columns to collect statistics by. */
- @GridDirectCollection(String.class)
- private List<String> colNames;
+ /** Requested statistics. */
+ private StatisticsObjectData data;
/**
- * {@link Externalizable} support.
+ * Constructor.
*/
- public StatisticsKeyMessage() {
- // No-op.
+ public StatisticsResponse() {
}
/**
* Constructor.
*
- * @param schema Schema name.
- * @param obj Object name.
- * @param colNames Column names.
+ * @param reqId Request id
+ * @param data Statistics data.
*/
- public StatisticsKeyMessage(String schema, String obj, List<String> colNames) {
- this.schema = schema;
- this.obj = obj;
- this.colNames = colNames;
+ public StatisticsResponse(
+ UUID reqId,
+ StatisticsObjectData data
+ ) {
+ this.reqId = reqId;
+ this.data = data;
}
/**
- * @return Schema name.
+ * @return Request id.
*/
- public String schema() {
- return schema;
+ public UUID reqId() {
+ return reqId;
}
/**
- * @return Object name.
+ * @return Statitics data.
*/
- public String obj() {
- return obj;
+ public StatisticsObjectData data() {
+ return data;
}
- /**
- * @return Column names.
- */
- public List<String> colNames() {
- return colNames;
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(StatisticsResponse.class, this);
}
/** {@inheritDoc} */
@@ -102,19 +92,13 @@ public class StatisticsKeyMessage implements Message {
switch (writer.state()) {
case 0:
- if (!writer.writeCollection("colNames", colNames, MessageCollectionItemType.STRING))
+ if (!writer.writeMessage("data", data))
return false;
writer.incrementState();
case 1:
- if (!writer.writeString("obj", obj))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeString("schema", schema))
+ if (!writer.writeUuid("reqId", reqId))
return false;
writer.incrementState();
@@ -133,7 +117,7 @@ public class StatisticsKeyMessage implements Message {
switch (reader.state()) {
case 0:
- colNames = reader.readCollection("colNames", MessageCollectionItemType.STRING);
+ data = reader.readMessage("data");
if (!reader.isLastRead())
return false;
@@ -141,15 +125,7 @@ public class StatisticsKeyMessage implements Message {
reader.incrementState();
case 1:
- obj = reader.readString("obj");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 2:
- schema = reader.readString("schema");
+ reqId = reader.readUuid("reqId");
if (!reader.isLastRead())
return false;
@@ -158,7 +134,7 @@ public class StatisticsKeyMessage implements Message {
}
- return reader.afterMessageRead(StatisticsKeyMessage.class);
+ return reader.afterMessageRead(StatisticsResponse.class);
}
/** {@inheritDoc} */
@@ -168,26 +144,11 @@ public class StatisticsKeyMessage implements Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 3;
+ return 2;
}
/** {@inheritDoc} */
@Override public void onAckReceived() {
}
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- StatisticsKeyMessage that = (StatisticsKeyMessage) o;
- return Objects.equals(schema, that.schema) &&
- Objects.equals(obj, that.obj) &&
- Objects.equals(colNames, that.colNames);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return Objects.hash(schema, obj, colNames);
- }
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/ColumnLocalDataViewSupplier.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/ColumnLocalDataViewSupplier.java
index ba13e00..040d771 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/ColumnLocalDataViewSupplier.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/ColumnLocalDataViewSupplier.java
@@ -67,7 +67,7 @@ public class ColumnLocalDataViewSupplier {
if (!F.isEmpty(schema) && !F.isEmpty(name)) {
StatisticsKey key = new StatisticsKey(schema, name);
- ObjectStatisticsImpl objLocStat = repo.getLocalStatistics(key);
+ ObjectStatisticsImpl objLocStat = repo.getLocalStatistics(key, null);
if (objLocStat == null)
return Collections.emptyList();
@@ -77,7 +77,7 @@ public class ColumnLocalDataViewSupplier {
else
locStatsMap = repo.localStatisticsMap().entrySet().stream()
.filter(e -> F.isEmpty(schema) || schema.equals(e.getKey().schema()))
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().statistics()));
List<StatisticsColumnLocalDataView> res = new ArrayList<>();
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/StatisticsColumnLocalDataView.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/StatisticsColumnGlobalDataView.java
similarity index 94%
copy from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/StatisticsColumnLocalDataView.java
copy to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/StatisticsColumnGlobalDataView.java
index 20c96dc..81e1a6a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/StatisticsColumnLocalDataView.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/StatisticsColumnGlobalDataView.java
@@ -18,16 +18,15 @@
package org.apache.ignite.internal.processors.query.stat.view;
import java.sql.Timestamp;
-
import org.apache.ignite.internal.managers.systemview.walker.Filtrable;
import org.apache.ignite.internal.managers.systemview.walker.Order;
import org.apache.ignite.internal.processors.query.stat.ObjectStatisticsImpl;
import org.apache.ignite.internal.processors.query.stat.StatisticsKey;
/**
- * Statistics partition data view.
+ * Statistics global data view.
*/
-public class StatisticsColumnLocalDataView {
+public class StatisticsColumnGlobalDataView {
/** Statistics key */
private final StatisticsKey key;
@@ -44,7 +43,7 @@ public class StatisticsColumnLocalDataView {
* @param column Column name.
* @param statistics Object local statistics.
*/
- public StatisticsColumnLocalDataView(StatisticsKey key, String column, ObjectStatisticsImpl statistics) {
+ public StatisticsColumnGlobalDataView(StatisticsKey key, String column, ObjectStatisticsImpl statistics) {
this.key = key;
this.column = column;
this.statistics = statistics;
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/StatisticsColumnLocalDataView.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/StatisticsColumnLocalDataView.java
index 20c96dc..8db1888 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/StatisticsColumnLocalDataView.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/StatisticsColumnLocalDataView.java
@@ -25,7 +25,7 @@ import org.apache.ignite.internal.processors.query.stat.ObjectStatisticsImpl;
import org.apache.ignite.internal.processors.query.stat.StatisticsKey;
/**
- * Statistics partition data view.
+ * Statistics local data view.
*/
public class StatisticsColumnLocalDataView {
/** Statistics key */
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepositoryStaticTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepositoryStaticTest.java
new file mode 100644
index 0000000..4d0a2ae
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepositoryStaticTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.stat;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.h2.value.ValueInt;
+import org.junit.Test;
+
+/**
+ * Test of static methods of Ignite statistics repository.
+ */
+public class IgniteStatisticsRepositoryStaticTest extends StatisticsAbstractTest {
+ /** First default key. */
+ protected static final StatisticsKey K1 = new StatisticsKey(SCHEMA, "tab1");
+
+ /** Second default key. */
+ protected static final StatisticsKey K2 = new StatisticsKey(SCHEMA, "tab2");
+
+ /** Column statistics with 100 nulls. */
+ protected ColumnStatistics cs1 = new ColumnStatistics(null, null, 100, 0, 100,
+ 0, new byte[0], 0, U.currentTimeMillis());
+
+ /** Column statistics with 100 integers 0-100. */
+ protected ColumnStatistics cs2 = new ColumnStatistics(ValueInt.get(0), ValueInt.get(100), 0, 100, 100,
+ 4, new byte[0], 0, U.currentTimeMillis());
+
+ /** Column statistics with 0 rows. */
+ protected ColumnStatistics cs3 = new ColumnStatistics(null, null, 0, 0, 0, 0,
+ new byte[0], 0, U.currentTimeMillis());
+
+ /** Column statistics with 100 integers 0-10. */
+ protected ColumnStatistics cs4 = new ColumnStatistics(ValueInt.get(0), ValueInt.get(10), 0, 10, 100,
+ 4, new byte[0], 0, U.currentTimeMillis());
+
+ /**
+ * 1) Remove not existing column.
+ * 2) Remove some columns.
+ * 3) Remove all columns.
+ */
+ @Test
+ public void subtractTest() {
+ HashMap<String, ColumnStatistics> colStat1 = new HashMap<>();
+ colStat1.put("col1", cs1);
+ colStat1.put("col2", cs2);
+
+ ObjectStatisticsImpl os = new ObjectStatisticsImpl(100, colStat1);
+
+ // 1) Remove not existing column.
+ ObjectStatisticsImpl os1 = os.subtract(Collections.singleton("col0"));
+
+ assertEquals(os, os1);
+
+ // 2) Remove some columns.
+ ObjectStatisticsImpl os2 = os.subtract(Collections.singleton("col1"));
+
+ assertEquals(1, os2.columnsStatistics().size());
+ assertEquals(cs2, os2.columnStatistics("col2"));
+
+ // 3) Remove all columns.
+ ObjectStatisticsImpl os3 = os.subtract(Arrays.stream(new String[] {"col2", "col1"}).collect(Collectors.toSet()));
+
+ assertTrue(os3.columnsStatistics().isEmpty());
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepositoryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepositoryTest.java
index b47f075..c8b64eb 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepositoryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepositoryTest.java
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.managers.systemview.GridSystemViewManager;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
import org.apache.ignite.internal.processors.metastorage.persistence.ReadWriteMetaStorageMock;
@@ -195,16 +196,16 @@ public class IgniteStatisticsRepositoryTest extends StatisticsAbstractTest {
ObjectStatisticsImpl stat1 = getStatistics();
ObjectStatisticsImpl stat2 = getStatistics();
- repo.saveLocalStatistics(K1, stat1);
- repo.saveLocalStatistics(K2, stat2);
+ repo.saveLocalStatistics(K1, stat1, AffinityTopologyVersion.ZERO);
+ repo.saveLocalStatistics(K2, stat2, AffinityTopologyVersion.ZERO);
- assertEquals(stat1, repo.getLocalStatistics(K1));
- assertEquals(stat1, repo.getLocalStatistics(K2));
+ assertEquals(stat1, repo.getLocalStatistics(K1, null));
+ assertEquals(stat1, repo.getLocalStatistics(K2, AffinityTopologyVersion.ZERO));
repo.clearLocalPartitionsStatistics(K1, null);
- assertNull(repo.getLocalStatistics(K1));
- assertEquals(stat2, repo.getLocalStatistics(K2));
+ assertNull(repo.getLocalStatistics(K1, AffinityTopologyVersion.ZERO));
+ assertEquals(stat2, repo.getLocalStatistics(K2, null));
}
/**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUBasicValueDistributionTableStatisticsUsageTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUBasicValueDistributionTableStatisticsUsageTest.java
index 4a39535..b757565 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUBasicValueDistributionTableStatisticsUsageTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUBasicValueDistributionTableStatisticsUsageTest.java
@@ -93,7 +93,7 @@ public class PSUBasicValueDistributionTableStatisticsUsageTest extends Statistic
sql("CREATE INDEX empty_distribution_no_stat_col_a ON empty_distribution_no_stat(col_a)");
- collectStatistics("digital_distribution", "empty_distribution");
+ collectStatistics(StatisticsType.GLOBAL, "digital_distribution", "empty_distribution");
}
/**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUCompositeIndexTableStatisticsUsageTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUCompositeIndexTableStatisticsUsageTest.java
index c44e350..391ee07 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUCompositeIndexTableStatisticsUsageTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUCompositeIndexTableStatisticsUsageTest.java
@@ -72,7 +72,7 @@ public class PSUCompositeIndexTableStatisticsUsageTest extends StatisticsAbstrac
sql(sql);
}
- collectStatistics("ci_table");
+ collectStatistics(StatisticsType.GLOBAL, "ci_table");
}
/**
@@ -92,12 +92,12 @@ public class PSUCompositeIndexTableStatisticsUsageTest extends StatisticsAbstrac
checkOptimalPlanChosenForDifferentIndexes(grid(0), new String[]{"CI_TABLE_ABC"}, sql, new String[1][]);
sql("CREATE INDEX ci_table_c ON ci_table(col_c)");
- updateStatistics("ci_table");
+ updateStatistics(StatisticsType.GLOBAL, "ci_table");
checkOptimalPlanChosenForDifferentIndexes(grid(0), new String[]{"CI_TABLE_ABC"}, sql, new String[1][]);
sql("DROP INDEX IF EXISTS ci_table_c");
- updateStatistics("ci_table");
+ updateStatistics(StatisticsType.GLOBAL, "ci_table");
}
/**
@@ -117,12 +117,12 @@ public class PSUCompositeIndexTableStatisticsUsageTest extends StatisticsAbstrac
checkOptimalPlanChosenForDifferentIndexes(grid(0), new String[]{}, sql, new String[1][]);
sql("CREATE INDEX ci_table_c ON ci_table(col_c)");
- updateStatistics("ci_table");
+ updateStatistics(StatisticsType.GLOBAL, "ci_table");
checkOptimalPlanChosenForDifferentIndexes(grid(0), new String[]{"CI_TABLE_C"}, sql, new String[1][]);
sql("DROP INDEX IF EXISTS ci_table_c");
- updateStatistics("ci_table");
+ updateStatistics(StatisticsType.GLOBAL, "ci_table");
}
/**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUStatisticPartialGatheringTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUStatisticPartialGatheringTest.java
index bd3029c..d30ef38 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUStatisticPartialGatheringTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUStatisticPartialGatheringTest.java
@@ -52,7 +52,7 @@ public class PSUStatisticPartialGatheringTest extends StatisticsAbstractTest {
sql(String.format("insert into tbl_select(id, lo_select, med_select, hi_select) values(%d, %d, %d, %d)",
i, i % 10, i % 100, i % 1000));
- collectStatistics("tbl_select");
+ collectStatistics(StatisticsType.GLOBAL, "tbl_select");
}
/**
@@ -82,7 +82,7 @@ public class PSUStatisticPartialGatheringTest extends StatisticsAbstractTest {
String.format(SQL, 6, 6), NO_HINTS);
// All columns set up before also will be updated
- updateStatistics(new StatisticsTarget(SCHEMA, "TBL_SELECT", "LO_SELECT"));
+ updateStatistics(StatisticsType.GLOBAL, new StatisticsTarget(SCHEMA, "TBL_SELECT", "LO_SELECT"));
checkOptimalPlanChosenForDifferentIndexes(grid(0), new String[]{"TBL_SELECT_LO_IDX"},
String.format(SQL, 8, 8), NO_HINTS);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUStatisticsStorageTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUStatisticsStorageTest.java
index ad16f14..19d97ed 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUStatisticsStorageTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUStatisticsStorageTest.java
@@ -43,7 +43,7 @@ public class PSUStatisticsStorageTest extends StatisticsStorageAbstractTest {
*/
@Test
public void testPartialDeletionCollection() throws Exception {
- collectStatistics(SMALL_TARGET);
+ collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
IgniteEx ign = grid(0);
@@ -51,6 +51,8 @@ public class PSUStatisticsStorageTest extends StatisticsStorageAbstractTest {
checkOptimalPlanChosenForDifferentIndexes(ign, new String[]{"SMALL_B"}, SQL, NO_HINTS);
// 2) partially remove statistics for one extra column and check chat the rest statistics still can be used
+ log.info("Dropping statistics by A column...");
+
statisticsMgr(0).dropStatistics(new StatisticsTarget("PUBLIC", "SMALL", "A"));
assertTrue(GridTestUtils.waitForCondition(
@@ -61,6 +63,8 @@ public class PSUStatisticsStorageTest extends StatisticsStorageAbstractTest {
checkOptimalPlanChosenForDifferentIndexes(ign, new String[]{"SMALL_B"}, SQL, NO_HINTS);
// 3) partially remove necessarily for the query statistics and check that query plan will be changed
+ log.info("Dropping statistics by B column...");
+
statisticsMgr(0).dropStatistics(new StatisticsTarget(SCHEMA, "SMALL", "B"));
assertTrue(GridTestUtils.waitForCondition(
@@ -82,13 +86,15 @@ public class PSUStatisticsStorageTest extends StatisticsStorageAbstractTest {
// 4) partially collect statistics for extra column and check that query plan still unable to get all statistics
// it wants
- collectStatistics(new StatisticsTarget(SCHEMA, "SMALL", "A"));
+ log.info("Recollecting statistics by A column...");
+
+ collectStatistics(StatisticsType.GLOBAL, new StatisticsTarget(SCHEMA, "SMALL", "A"));
checkOptimalPlanChosenForDifferentIndexes(ign, new String[]{"SMALL_C"}, SQL, NO_HINTS);
// 5) partially collect statistics for the necessarily column
// and check that the query plan will restore to optimal
- collectStatistics(new StatisticsTarget(SCHEMA, "SMALL", "B"));
+ collectStatistics(StatisticsType.GLOBAL, new StatisticsTarget(SCHEMA, "SMALL", "B"));
checkOptimalPlanChosenForDifferentIndexes(ign, new String[]{"SMALL_B"}, SQL, NO_HINTS);
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUValueDistributionTableStatisticsUsageTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUValueDistributionTableStatisticsUsageTest.java
index 3be7eb2..f163156 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUValueDistributionTableStatisticsUsageTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUValueDistributionTableStatisticsUsageTest.java
@@ -89,7 +89,7 @@ public class PSUValueDistributionTableStatisticsUsageTest extends StatisticsAbst
}
sql("INSERT INTO sized(id, small, big) VALUES(" + BIG_SIZE + ", null, null)");
- collectStatistics("sized");
+ collectStatistics(StatisticsType.GLOBAL, "sized");
}
/**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/SqlStatisticsCommandTests.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/SqlStatisticsCommandTests.java
index 4d75e01..5c41193 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/SqlStatisticsCommandTests.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/SqlStatisticsCommandTests.java
@@ -28,13 +28,13 @@ import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
+import org.junit.Ignore;
import org.junit.Test;
/**
* Integration tests for statistics collection.
*/
+@Ignore("https://issues.apache.org/jira/browse/IGNITE-15455")
public class SqlStatisticsCommandTests extends StatisticsAbstractTest {
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
@@ -158,7 +158,6 @@ public class SqlStatisticsCommandTests extends StatisticsAbstractTest {
*/
@Test
public void testDropStatistics() throws IgniteInterruptedCheckedException {
- Logger.getLogger(StatisticsProcessor.class).setLevel(Level.TRACE);
sql("ANALYZE PUBLIC.TEST, test2");
testStatistics(SCHEMA, "TEST", false);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java
index f434eec..1de7857 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -35,8 +36,12 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
@@ -78,7 +83,7 @@ public abstract class StatisticsAbstractTest extends GridCommonAbstractTest {
static final StatisticsTarget SMALL_TARGET = new StatisticsTarget(SMALL_KEY, null);
/** Async operation timeout for test */
- static final int TIMEOUT = 3_000;
+ static final int TIMEOUT = 5_000;
static {
assertTrue(SMALL_SIZE < MED_SIZE && MED_SIZE < BIG_SIZE);
@@ -353,79 +358,70 @@ public abstract class StatisticsAbstractTest extends GridCommonAbstractTest {
/**
* Update statistics on specified objects in PUBLIC schema.
*
+ * @param type Statistics type to collect statistics by.
* @param tables Tables where to update statistics.
*/
- protected void updateStatistics(@NotNull String... tables) {
+ protected void updateStatistics(StatisticsType type, @NotNull String... tables) {
StatisticsTarget[] targets = Arrays.stream(tables).map(tbl -> new StatisticsTarget(SCHEMA, tbl.toUpperCase()))
.toArray(StatisticsTarget[]::new);
- updateStatistics(targets);
+ updateStatistics(type, targets);
}
/**
* Collect statistics for specified objects in PUBLIC schema.
*
+ * @param type Statistics type to collect statistics by.
* @param tables Tables where to collect statistics.
*/
- protected void collectStatistics(@NotNull String... tables) {
+ protected void collectStatistics(StatisticsType type, @NotNull String... tables) {
StatisticsTarget[] targets = Arrays.stream(tables).map(tbl -> new StatisticsTarget(SCHEMA, tbl.toUpperCase()))
.toArray(StatisticsTarget[]::new);
- collectStatistics(targets);
+ collectStatistics(type, targets);
}
/**
* Update statistics on specified objects.
*
+ * @param type Statistics type to ubdate statistics by.
* @param targets Targets to refresh statistics by.
*/
- protected void updateStatistics(StatisticsTarget... targets) {
- makeStatistics(false, targets);
+ protected void updateStatistics(StatisticsType type, StatisticsTarget... targets) {
+ makeStatistics(false, type, targets);
}
/**
* Update statistics on specified objects.
*
+ * @param type Statistics type to update statistics by.
* @param targets Targets to collect statistics by.
*/
- protected void collectStatistics(StatisticsTarget... targets) {
- makeStatistics(true, targets);
+ protected void collectStatistics(StatisticsType type, StatisticsTarget... targets) {
+ makeStatistics(true, type, targets);
}
/**
* Collect or refresh statistics.
*
* @param collect If {@code true} - collect new statistics, if {@code false} - update existing.
- * @param targets
+ * @param type Statistics type to get statistics by.
+ * @param targets Targets to process statistics by.
*/
- private void makeStatistics(boolean collect, StatisticsTarget... targets) {
+ private void makeStatistics(boolean collect, StatisticsType type, StatisticsTarget... targets) {
try {
- Map<StatisticsTarget, Long> expectedVersion = new HashMap<>();
+ Map<StatisticsTarget, Long> expectedVer = new HashMap<>();
IgniteStatisticsManagerImpl statMgr = statisticsMgr(0);
- for (StatisticsTarget target : targets) {
- StatisticsObjectConfiguration currCfg = statMgr.statisticConfiguration().config(target.key());
- Predicate<StatisticsColumnConfiguration> pred;
- if (F.isEmpty(target.columns()))
- pred = c -> true;
- else {
- Set<String> cols = Arrays.stream(target.columns()).collect(Collectors.toSet());
-
- pred = c -> cols.contains(c.name());
- }
-
- Long expVer = (currCfg == null) ? 1L : currCfg.columnsAll().values().stream().filter(pred)
- .mapToLong(StatisticsColumnConfiguration::version).min().orElse(0L) + 1;
-
- expectedVersion.put(target, expVer);
- }
+ for (StatisticsTarget target : targets)
+ expectedVer.put(target, minStatVer(statMgr, target) + 1);
if (collect)
statisticsMgr(0).collectStatistics(buildDefaultConfigurations(targets));
else
statisticsMgr(0).refreshStatistics(targets);
- awaitStatistics(TIMEOUT, expectedVersion);
+ awaitStatistics(TIMEOUT, expectedVer, type);
}
catch (Exception ex) {
throw new IgniteException(ex);
@@ -433,6 +429,32 @@ public abstract class StatisticsAbstractTest extends GridCommonAbstractTest {
}
/**
+ * Get minimum statistics version for the given target.
+ *
+ * param statMgr Statistics manager to get configuration from.
+ * @param target Statistics target to get the minimum version by.
+ * @return Minimum statistics configuraion version for the given target
+ * or 0 if there are no configuration for the given targer.
+ * @throws IgniteCheckedException In case of configuration read errors.
+ */
+ public Long minStatVer(IgniteStatisticsManagerImpl statMgr, StatisticsTarget target) throws IgniteCheckedException {
+ StatisticsObjectConfiguration currCfg = statMgr.statisticConfiguration().config(target.key());
+
+ Predicate<StatisticsColumnConfiguration> pred;
+
+ if (F.isEmpty(target.columns()))
+ pred = c -> true;
+ else {
+ Set<String> cols = Arrays.stream(target.columns()).collect(Collectors.toSet());
+
+ pred = c -> cols.contains(c.name());
+ }
+
+ return (currCfg == null) ? 0L : currCfg.columnsAll().values().stream().filter(pred)
+ .mapToLong(StatisticsColumnConfiguration::version).min().orElse(0L);
+ }
+
+ /**
* Get object statistics.
*
* @param rowsCnt Rows count.
@@ -475,11 +497,15 @@ public abstract class StatisticsAbstractTest extends GridCommonAbstractTest {
/** Check that all statistics collections related tasks is empty in specified node. */
protected void checkStatisticTasksEmpty(IgniteEx ign) {
+ if (ign.localNode().isClient())
+ return;
+
Map<StatisticsKey, LocalStatisticsGatheringContext> currColls = GridTestUtils.getFieldValue(
statisticsMgr(ign), "statProc", "gatheringInProgress"
);
- assertTrue(currColls.toString(), currColls.isEmpty());
+ assertTrue("Has statistics collection tasks on node " + ign.localNode().id() + ":" + currColls.toString(),
+ currColls.isEmpty());
IgniteThreadPoolExecutor mgmtPool = GridTestUtils.getFieldValue(statisticsMgr(ign), "mgmtPool");
@@ -495,12 +521,17 @@ public abstract class StatisticsAbstractTest extends GridCommonAbstractTest {
*
* @param timeout Timeout.
* @param expectedVersions Expected versions for specified targets.
+ * @param type Type to get statistics by.
* @throws Exception In case of errors.
*/
- protected void awaitStatistics(long timeout, Map<StatisticsTarget, Long> expectedVersions) throws Exception {
+ protected void awaitStatistics(
+ long timeout,
+ Map<StatisticsTarget, Long> expectedVersions,
+ StatisticsType type
+ ) throws Exception {
for (Ignite ign : G.allGrids()) {
- if (!((IgniteEx)ign).context().clientNode())
- awaitStatistics(timeout, expectedVersions, (IgniteEx)ign);
+ if (StatisticsType.GLOBAL == type || !((IgniteEx)ign).context().clientNode())
+ awaitStatistics(timeout, expectedVersions, (IgniteEx)ign, type);
}
}
@@ -510,10 +541,15 @@ public abstract class StatisticsAbstractTest extends GridCommonAbstractTest {
* @param timeout Timeout.
* @param expectedVersions Expected versions for specified targets.
* @param ign Node to await.
+ * @param type Statistics type.
* @throws Exception In case of errors.
*/
- protected void awaitStatistics(long timeout, Map<StatisticsTarget, Long> expectedVersions, IgniteEx ign)
- throws Exception {
+ protected void awaitStatistics(
+ long timeout,
+ Map<StatisticsTarget, Long> expectedVersions,
+ IgniteEx ign,
+ StatisticsType type
+ ) throws Exception {
long t0 = U.currentTimeMillis();
IgniteH2Indexing indexing = (IgniteH2Indexing)ign.context().query().getIndexing();
@@ -524,30 +560,31 @@ public abstract class StatisticsAbstractTest extends GridCommonAbstractTest {
for (Map.Entry<StatisticsTarget, Long> targetVersionEntry : expectedVersions.entrySet()) {
StatisticsTarget target = targetVersionEntry.getKey();
+
+ // Statistics configuration manager could not get fresh enough configuration version till now so we
+ // need to request global statistics again to force it's collection
+ // TODO: remove me statisticsMgr(ign).getGlobalStatistics(target.key());
+
Long ver = targetVersionEntry.getValue();
- ObjectStatisticsImpl s = (ObjectStatisticsImpl)indexing.statsManager().getLocalStatistics(target.key());
- assertNotNull(s);
+ ObjectStatisticsImpl s;
- long minVer = Long.MAX_VALUE;
+ switch (type) {
+ case LOCAL:
+ s = (ObjectStatisticsImpl)indexing.statsManager().getLocalStatistics(target.key());
- Set<String> cols;
+ break;
- if (F.isEmpty(target.columns()))
- cols = s.columnsStatistics().keySet();
- else
- cols = Arrays.stream(target.columns()).collect(Collectors.toSet());
+ case GLOBAL:
+ s = (ObjectStatisticsImpl)indexing.statsManager().getGlobalStatistics(target.key());
- for (String col : cols) {
- if (s.columnStatistics(col).version() < minVer)
- minVer = s.columnStatistics(col).version();
- }
+ break;
- if (minVer == Long.MAX_VALUE)
- minVer = -1;
+ default:
+ throw new IllegalArgumentException("Unexpected statistics type " + type);
+ }
- assertEquals(String.format("Expected minimum statistics version %d but found %d", ver, minVer),
- (long)ver, minVer);
+ checkStatisticsVersion(ign.localNode().id(), s, target, ver);
}
return;
@@ -562,6 +599,32 @@ public abstract class StatisticsAbstractTest extends GridCommonAbstractTest {
}
/**
+ * Check specified statistics.
+ *
+ * @param stat Object statistics to check.
+ * @param target Statistics target to check only some columns.
+ * @param ver Mininum allowed version.
+ */
+ private void checkStatisticsVersion(UUID nodeId, ObjectStatisticsImpl stat, StatisticsTarget target, long ver) {
+ assertFalse("No column statistics found: " + stat + " on node " + nodeId,
+ stat == null || stat.columnsStatistics().isEmpty());
+
+ Set<String> cols;
+
+ if (F.isEmpty(target.columns()))
+ cols = stat.columnsStatistics().keySet();
+ else
+ cols = Arrays.stream(target.columns()).collect(Collectors.toSet());
+
+ for (String col : cols) {
+ ColumnStatistics colStat = stat.columnStatistics(col);
+
+ assertFalse(String.format("Expect minVer %d but column %s has %s version on node %s", ver, col,
+ (colStat == null) ? null : colStat.version(), nodeId), colStat == null || colStat.version() < ver);
+ }
+ }
+
+ /**
* Get nodes StatisticsGatheringRequestCrawlerImpl.msgMgmtPool lock.
* Put additional task into it and return lock to complete these task.
*
@@ -640,9 +703,17 @@ public abstract class StatisticsAbstractTest extends GridCommonAbstractTest {
* @return Local table statistics or {@code null} if there are no such statistics in specified node.
*/
protected ObjectStatisticsImpl getStatsFromNode(int nodeIdx, String tblName, StatisticsType type) {
+ StatisticsKey key = new StatisticsKey(SCHEMA, tblName);
+
switch (type) {
+ case GLOBAL:
+
+ return (ObjectStatisticsImpl)statisticsMgr(nodeIdx).getGlobalStatistics(key);
+
case LOCAL:
- return (ObjectStatisticsImpl)statisticsMgr(nodeIdx).getLocalStatistics(new StatisticsKey(SCHEMA, tblName));
+
+ return (ObjectStatisticsImpl)statisticsMgr(nodeIdx).getLocalStatistics(key);
+
case PARTITION:
default:
throw new UnsupportedOperationException();
@@ -719,4 +790,103 @@ public abstract class StatisticsAbstractTest extends GridCommonAbstractTest {
}, 1000));
}
}
+
+ /**
+ * Add persistence region to default data region specified ignite configuration.
+ *
+ * @param cfg Base configuration to add persistence into.
+ * @param igniteInstanceName Instance name.
+ * @return Same configuration with persistence enabled.
+ */
+ protected IgniteConfiguration addPersistenceRegion(IgniteConfiguration cfg, String igniteInstanceName) {
+ cfg.setConsistentId(igniteInstanceName);
+
+ DataStorageConfiguration memCfg = new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true));
+
+ cfg.setDataStorageConfiguration(memCfg);
+
+ return cfg;
+ }
+
+ /**
+ * Add in memory region to default data region specified ignite configuration.
+ *
+ * @param cfg Base configuration to add in memory into.
+ * @param igniteInstanceName Instance name.
+ * @return Same configuration with persistence enabled.
+ */
+ protected IgniteConfiguration addImMemoryRegion(IgniteConfiguration cfg, String igniteInstanceName) {
+ cfg.setConsistentId(igniteInstanceName);
+
+ DataStorageConfiguration memCfg = new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(false));
+
+ cfg.setDataStorageConfiguration(memCfg);
+
+ return cfg;
+ }
+
+ /**
+ * Check that all expected lines exist in actual. If not - fail.
+ *
+ * @param expected Expected lines, nulls mean any value.
+ * @param actual Actual lines.
+ */
+ protected void checkContains(List<List<Object>> expected, List<List<?>> actual) {
+ List<Object> notExisting = testContains(expected, actual);
+
+ if (notExisting != null)
+ fail("Unable to found " + notExisting + " in specified dataset");
+ }
+
+ /**
+ * Test that all expected lines exist in actual.
+ *
+ * @param expected Expected lines, nulls mean any value.
+ * @param actual Actual lines.
+ * @return First not existing line or {@code null} if all lines presented.
+ */
+ protected List<Object> testContains(List<List<Object>> expected, List<List<?>> actual) {
+ assertTrue(expected.size() <= actual.size());
+
+ assertTrue("Test may take too long with such datasets of actual = " + actual.size(), actual.size() <= 1024);
+
+ for (List<Object> exp : expected) {
+ boolean found = false;
+
+ for (List<?> act : actual) {
+ found = checkEqualWithNull(exp, act);
+
+ if (found)
+ break;
+ }
+
+ if (!found)
+ return exp;
+ }
+
+ return null;
+ }
+
+ /**
+ * Compare expected line with actual one.
+ *
+ * @param expected Expected line, {@code null} value mean any value.
+ * @param actual Actual line.
+ * @return {@code true} if line are equal, {@code false} - otherwise.
+ */
+ protected boolean checkEqualWithNull(List<Object> expected, List<?> actual) {
+ assertEquals(expected.size(), actual.size());
+
+ for (int i = 0; i < expected.size(); i++) {
+ Object exp = expected.get(i);
+ Object act = actual.get(i);
+
+ if (exp != null && !exp.equals(act) && act != null)
+ return false;
+ }
+
+ return true;
+ }
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsClearTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsClearTest.java
index eee869c..5b6eb15 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsClearTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsClearTest.java
@@ -48,7 +48,7 @@ public class StatisticsClearTest extends StatisticsRestartAbstractTest {
*/
@Test
public void testStatisticsClear() throws Exception {
- updateStatistics(SMALL_TARGET);
+ updateStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
Assert.assertNotNull(statisticsMgr(0).getLocalStatistics(SMALL_KEY));
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsConfigurationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsConfigurationTest.java
index c51f898..e397929 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsConfigurationTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsConfigurationTest.java
@@ -181,7 +181,7 @@ public class StatisticsConfigurationTest extends StatisticsAbstractTest {
createSmallTable(null);
- collectStatistics(SMALL_TARGET);
+ collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
waitForStats(SCHEMA, "SMALL", TIMEOUT, checkTotalRows, checkColumStats);
@@ -209,7 +209,7 @@ public class StatisticsConfigurationTest extends StatisticsAbstractTest {
createSmallTable(null);
- collectStatistics(SMALL_TARGET);
+ collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
waitForStats(SCHEMA, "SMALL", TIMEOUT, checkTotalRows, checkColumStats);
@@ -231,7 +231,9 @@ public class StatisticsConfigurationTest extends StatisticsAbstractTest {
IgniteEx client = startClientGrid("cli");
- collectStatistics(SMALL_TARGET);
+ awaitPartitionMapExchange();
+
+ collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
sql("delete from small");
@@ -269,7 +271,7 @@ public class StatisticsConfigurationTest extends StatisticsAbstractTest {
createSmallTable(null);
- collectStatistics(SMALL_TARGET);
+ collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
waitForStats(SCHEMA, "SMALL", TIMEOUT, checkTotalRows, checkColumStats);
@@ -330,7 +332,7 @@ public class StatisticsConfigurationTest extends StatisticsAbstractTest {
createSmallTable(null);
// 1. Create statistic for a table;
- collectStatistics(SMALL_TARGET);
+ collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
// 2. Check statistics on all nodes of the cluster;
waitForStats(SCHEMA, "SMALL", TIMEOUT, checkTotalRows, checkColumStats);
@@ -343,7 +345,7 @@ public class StatisticsConfigurationTest extends StatisticsAbstractTest {
(stats) -> stats.forEach(s -> assertNull(s.columnStatistics("A"))));
// 5. Re-create statistics;
- collectStatistics(new StatisticsTarget(SCHEMA, "SMALL", "A"));
+ collectStatistics(StatisticsType.GLOBAL, new StatisticsTarget(SCHEMA, "SMALL", "A"));
// 6. Check statistics on all nodes of the cluster;
waitForStats(SCHEMA, "SMALL", TIMEOUT, checkTotalRows, checkColumStats);
@@ -369,7 +371,7 @@ public class StatisticsConfigurationTest extends StatisticsAbstractTest {
createSmallTable(null);
- collectStatistics(SMALL_TARGET);
+ collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
waitForStats(SCHEMA, "SMALL", TIMEOUT, checkTotalRows, checkColumStats);
@@ -406,7 +408,7 @@ public class StatisticsConfigurationTest extends StatisticsAbstractTest {
createSmallTable(null);
createSmallTable("_A");
- collectStatistics(
+ collectStatistics(StatisticsType.GLOBAL,
new StatisticsTarget(SCHEMA, "SMALL"),
new StatisticsTarget(SCHEMA, "SMALL_A"));
@@ -440,7 +442,7 @@ public class StatisticsConfigurationTest extends StatisticsAbstractTest {
createSmallTable(null);
- collectStatistics(SMALL_TARGET);
+ collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
waitForStats(SCHEMA, "SMALL", TIMEOUT, checkTotalRows, checkColumStats);
@@ -475,7 +477,7 @@ public class StatisticsConfigurationTest extends StatisticsAbstractTest {
createSmallTable(null);
- collectStatistics(SMALL_TARGET);
+ collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
waitForStats(SCHEMA, "SMALL", TIMEOUT, checkTotalRows, checkColumStats);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGatheringTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGatheringTest.java
index f60d907..d0ef1ff 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGatheringTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGatheringTest.java
@@ -18,8 +18,10 @@
package org.apache.ignite.internal.processors.query.stat;
import java.util.Collections;
+import java.util.Map;
import java.util.Objects;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -34,7 +36,7 @@ import static org.apache.ignite.internal.processors.query.stat.IgniteStatisticsH
public class StatisticsGatheringTest extends StatisticsRestartAbstractTest {
/** {@inheritDoc} */
@Override public int nodes() {
- return 1;
+ return 2;
}
/**
@@ -52,7 +54,10 @@ public class StatisticsGatheringTest extends StatisticsRestartAbstractTest {
grid(0).cluster().state(ClusterState.INACTIVE);
- GridTestUtils.assertThrows(null, () -> collectStatistics(t101), IgniteException.class,
+ GridTestUtils.assertThrows(null, () -> collectStatistics(StatisticsType.LOCAL, t101), IgniteException.class,
+ "Unable to perform collect statistics due to cluster state [state=INACTIVE]");
+
+ GridTestUtils.assertThrows(null, () -> collectStatistics(StatisticsType.GLOBAL, t101), IgniteException.class,
"Unable to perform collect statistics due to cluster state [state=INACTIVE]");
}
@@ -62,14 +67,63 @@ public class StatisticsGatheringTest extends StatisticsRestartAbstractTest {
* 2) Get global statistics (with delay) and check its equality in all nodes.
*/
@Test
- public void testGathering() throws Exception {
- ObjectStatisticsImpl stats[] = getStats("SMALL", StatisticsType.LOCAL);
+ public void testGathering() throws InterruptedException, IgniteCheckedException {
+ ObjectStatisticsImpl localStats[] = getStats("SMALL", StatisticsType.LOCAL);
+
+ testCond(Objects::nonNull, localStats);
+
+ testCond(stat -> stat.columnsStatistics().size() == localStats[0].columnsStatistics().size(), localStats);
+
+ testCond(this::checkStat, localStats);
+
+ ObjectStatisticsImpl globalStat = getStatsFromNode(0, "SMALL", StatisticsType.GLOBAL);
+
+ assertNotNull(globalStat);
+ }
+
+ /**
+ * Test that all node contains the same global statistics.
+ *
+ * @throws Exception In case of errors.
+ */
+ @Test
+ public void testGlobalIsEqual() throws Exception {
+ ObjectStatisticsImpl globalStats[] = getStats("SMALL", StatisticsType.GLOBAL);
+
+ testCond(Objects::nonNull, globalStats);
+ testCond(this::checkStat, globalStats);
- testCond(Objects::nonNull, stats);
+ ObjectStatisticsImpl globalStat = globalStats[0];
- testCond(stat -> stat.columnsStatistics().size() == stats[0].columnsStatistics().size(), stats);
+ assertTrue(globalStats.length > 1);
- testCond(this::checkStat, stats);
+ for (int i = 1; i < globalStats.length; i++)
+ testEquaData(globalStat, globalStats[i]);
+ }
+
+ /**
+ * Check specified statistics contains equal data (all, except collection time and versions).
+ *
+ * @param expected Expected statistics.
+ * @param actual Actual statistics.
+ */
+ private static void testEquaData(ObjectStatisticsImpl expected, ObjectStatisticsImpl actual) {
+ assertEquals(expected.rowCount(), actual.rowCount());
+
+ assertEquals(expected.columnsStatistics().size(), actual.columnsStatistics().size());
+
+ for (Map.Entry<String, ColumnStatistics> expectedColStatEntry : expected.columnsStatistics().entrySet()) {
+ ColumnStatistics expColStat = expectedColStatEntry.getValue();
+ ColumnStatistics actColStat = actual.columnStatistics(expectedColStatEntry.getKey());
+
+ assertNotNull(actColStat);
+ assertEquals(expColStat.min(), actColStat.min());
+ assertEquals(expColStat.max(), actColStat.max());
+ assertEquals(expColStat.size(), actColStat.size());
+ assertEquals(expColStat.distinct(), actColStat.distinct());
+ assertEquals(expColStat.total(), actColStat.total());
+ assertEquals(expColStat.nulls(), actColStat.nulls());
+ }
}
/**
@@ -90,7 +144,7 @@ public class StatisticsGatheringTest extends StatisticsRestartAbstractTest {
"Table doesn't exist [schema=PUBLIC, table=SMALL101wrong]"
);
- updateStatistics(t100, t101);
+ updateStatistics(StatisticsType.GLOBAL, t100, t101);
ObjectStatisticsImpl[] stats100 = getStats(t100.obj(), StatisticsType.LOCAL);
ObjectStatisticsImpl[] stats101 = getStats(t101.obj(), StatisticsType.LOCAL);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsPersistenceTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGlobalViewInMemoryTest.java
similarity index 68%
copy from modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsPersistenceTest.java
copy to modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGlobalViewInMemoryTest.java
index d4e5365..f93d4ad 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsPersistenceTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGlobalViewInMemoryTest.java
@@ -17,25 +17,16 @@
package org.apache.ignite.internal.processors.query.stat;
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
/**
- * Test views with persistence.
+ * Test global view without persistence.
*/
-public class StatisticsViewsPersistenceTest extends StatisticsViewsTest {
+public class StatisticsGlobalViewInMemoryTest extends StatisticsGlobalViewTest {
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
- cfg.setConsistentId(igniteInstanceName);
-
- DataStorageConfiguration memCfg = new DataStorageConfiguration()
- .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true));
-
- cfg.setDataStorageConfiguration(memCfg);
-
- return cfg;
+ return addImMemoryRegion(cfg, igniteInstanceName);
}
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsPersistenceTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGlobalViewPersistenceTest.java
similarity index 68%
copy from modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsPersistenceTest.java
copy to modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGlobalViewPersistenceTest.java
index d4e5365..07a0adb 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsPersistenceTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGlobalViewPersistenceTest.java
@@ -17,25 +17,16 @@
package org.apache.ignite.internal.processors.query.stat;
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
/**
- * Test views with persistence.
+ * Test global view with persistence.
*/
-public class StatisticsViewsPersistenceTest extends StatisticsViewsTest {
+public class StatisticsGlobalViewPersistenceTest extends StatisticsGlobalViewTest {
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
- cfg.setConsistentId(igniteInstanceName);
-
- DataStorageConfiguration memCfg = new DataStorageConfiguration()
- .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true));
-
- cfg.setDataStorageConfiguration(memCfg);
-
- return cfg;
+ return addPersistenceRegion(cfg, igniteInstanceName);
}
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGlobalViewTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGlobalViewTest.java
new file mode 100644
index 0000000..7cf0c07
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGlobalViewTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.stat;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+/**
+ * Tests for global statistics view.
+ */
+public abstract class StatisticsGlobalViewTest extends StatisticsAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTestsStarted();
+ cleanPersistenceDir();
+
+ startGridsMultiThreaded(2);
+ grid(0).cluster().state(ClusterState.ACTIVE);
+
+ grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ createSmallTable(null);
+ collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /**
+ * Start additional node and try to collect statistics without adding it into baseline topology.
+ * Check that statistics awailable, but no local statistics exists and no additional gathering tasks
+ * stick in such node.
+ *
+ * @throws Exception In case of errors.
+ */
+ @Test
+ public void testStatisticsCollectionOutsideBaseline() throws Exception {
+ List<List<Object>> partLines = Arrays.asList(
+ Arrays.asList(SCHEMA, "TABLE", "SMALL", "A", (long)SMALL_SIZE, (long)SMALL_SIZE, 0L, 100L, 4, null, null),
+ Arrays.asList(SCHEMA, "TABLE", "SMALL", "B", (long)SMALL_SIZE, (long)SMALL_SIZE, 0L, 100L, 4, null, null),
+ Arrays.asList(SCHEMA, "TABLE", "SMALL", "C", (long)SMALL_SIZE, 10L, 0L, 100L, 4, null, null)
+ );
+
+ checkSqlResult("select * from SYS.STATISTICS_GLOBAL_DATA where NAME = 'SMALL'", null, act -> {
+ checkContains(partLines, act);
+
+ return true;
+ });
+
+ ignite(0).cluster().baselineAutoAdjustEnabled(false);
+
+ IgniteEx ign2 = startGrid(2);
+ awaitPartitionMapExchange();
+
+ requestGlobalStatistics(SMALL_KEY);
+
+ assertTrue(GridTestUtils.waitForCondition(() -> statisticsMgr(ign2).getGlobalStatistics(SMALL_KEY) != null,
+ TIMEOUT));
+
+ checkSqlResult("select * from SYS.STATISTICS_GLOBAL_DATA where NAME = 'SMALL'", null, act -> {
+ checkContains(partLines, act);
+
+ return true;
+ });
+
+ stopGrid(2);
+ }
+
+ /**
+ * Check that global stats equals on each node in cluster:
+ * 1) Check global statistics on each grid nodes.
+ * 2) Start new node and check global statistics.
+ * 3) Collect statistics configuration and check it on each node.
+ */
+ @Test
+ public void testGlobalStatEquals() throws Exception {
+ List<List<Object>> partLines = Arrays.asList(
+ Arrays.asList(SCHEMA, "TABLE", "SMALL", "A", (long)SMALL_SIZE, (long)SMALL_SIZE, 0L, 100L, 4, null, null),
+ Arrays.asList(SCHEMA, "TABLE", "SMALL", "B", (long)SMALL_SIZE, (long)SMALL_SIZE, 0L, 100L, 4, null, null),
+ Arrays.asList(SCHEMA, "TABLE", "SMALL", "C", (long)SMALL_SIZE, 10L, 0L, 100L, 4, null, null)
+ );
+
+ checkSqlResult("select * from SYS.STATISTICS_GLOBAL_DATA where NAME = 'SMALL'", null, act -> {
+ checkContains(partLines, act);
+
+ return true;
+ });
+
+ startGrid(2);
+ awaitPartitionMapExchange();
+
+ requestGlobalStatistics(SMALL_KEY);
+
+ long minVer = minStatVer(statisticsMgr(0), SMALL_TARGET);
+
+ checkSqlResult("select * from SYS.STATISTICS_GLOBAL_DATA where NAME = 'SMALL' and VERSION >= " + minVer,
+ null, list -> !list.isEmpty());
+
+ ignite(0).cluster().baselineAutoAdjustEnabled(false);
+ ignite(0).cluster().setBaselineTopology(ignite(1).cluster().topologyVersion());
+ awaitPartitionMapExchange(true, true, null);
+
+ for (Ignite ign : G.allGrids()) {
+
+ System.out.println("node = " + ign.cluster().localNode().id() +
+ " is Server = " + !((IgniteEx)ign).localNode().isClient() + " local stat = " +
+ statisticsMgr((IgniteEx)ign).getLocalStatistics(SMALL_KEY));
+ }
+
+ collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
+
+ minVer++;
+
+ checkSqlResult("select * from SYS.STATISTICS_GLOBAL_DATA where NAME = 'SMALL' and VERSION >= " + minVer,
+ null, act -> {
+ checkContains(partLines, act);
+ return true;
+ });
+
+ stopGrid(2);
+ }
+
+ /**
+ * Request global statistics by specified key from each node.
+ *
+ * @param key Key to request global statistics by.
+ */
+ private void requestGlobalStatistics(StatisticsKey key) {
+ for (Ignite ign : G.allGrids()) {
+ IgniteStatisticsManagerImpl nodeMgr = statisticsMgr((IgniteEx)ign);
+
+ nodeMgr.getGlobalStatistics(key);
+ }
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java
index e4e726f..c9769be 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java
@@ -90,7 +90,7 @@ public class StatisticsObsolescenceTest extends StatisticsAbstractTest {
ignite.cluster().state(ClusterState.ACTIVE);
createSmallTable(null);
- sql("ANALYZE SMALL");
+ collectStatistics(StatisticsType.GLOBAL, "SMALL");
ignite.cluster().state(ClusterState.INACTIVE);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsRestartAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsRestartAbstractTest.java
index 250afe2..d954185 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsRestartAbstractTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsRestartAbstractTest.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
import org.apache.ignite.internal.util.typedef.internal.U;
/**
@@ -61,7 +62,7 @@ public class StatisticsRestartAbstractTest extends StatisticsAbstractTest {
createStatisticTarget(null);
- collectStatistics(SMALL_TARGET);
+ statisticsMgr(0).collectStatistics(new StatisticsObjectConfiguration(SMALL_KEY));
}
/**
@@ -80,7 +81,7 @@ public class StatisticsRestartAbstractTest extends StatisticsAbstractTest {
/** {@inheritDoc} */
@Override protected void beforeTest() throws IgniteCheckedException {
- updateStatistics(SMALL_TARGET);
+ updateStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
}
/**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsStorageTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsStorageTest.java
index c2511ef..ef0ad44 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsStorageTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsStorageTest.java
@@ -27,7 +27,7 @@ import org.junit.Test;
public abstract class StatisticsStorageTest extends StatisticsStorageAbstractTest {
/** {@inheritDoc} */
@Override public void beforeTest() throws Exception {
- collectStatistics(SMALL_TARGET);
+ collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
}
/**
@@ -53,11 +53,11 @@ public abstract class StatisticsStorageTest extends StatisticsStorageAbstractTes
*/
@Test
public void testRecollection() throws Exception {
- updateStatistics(SMALL_TARGET);
+ updateStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
ObjectStatisticsImpl locStat = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY);
- updateStatistics(SMALL_TARGET);
+ updateStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
ObjectStatisticsImpl locStat2 = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY);
@@ -85,11 +85,11 @@ public abstract class StatisticsStorageTest extends StatisticsStorageAbstractTes
*/
@Test
public void testPartialRecollection() throws Exception {
- updateStatistics(new StatisticsTarget(SCHEMA, "SMALL", "B"));
+ updateStatistics(StatisticsType.GLOBAL, new StatisticsTarget(SCHEMA, "SMALL", "B"));
ObjectStatisticsImpl locStat = (ObjectStatisticsImpl)statisticsMgr(0)
.getLocalStatistics(new StatisticsKey(SCHEMA, "SMALL"));
- updateStatistics(new StatisticsTarget(SCHEMA, "SMALL", "B"));
+ updateStatistics(StatisticsType.GLOBAL, new StatisticsTarget(SCHEMA, "SMALL", "B"));
ObjectStatisticsImpl locStat2 = (ObjectStatisticsImpl)statisticsMgr(0)
.getLocalStatistics(new StatisticsKey(SCHEMA, "SMALL"));
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsTypesAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsTypesAbstractTest.java
index 8cb82c9..d9969aa 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsTypesAbstractTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsTypesAbstractTest.java
@@ -83,7 +83,7 @@ public abstract class StatisticsTypesAbstractTest extends StatisticsAbstractTest
for (int i = 0; i > -SMALL_SIZE / 2; i--)
sql(insertNulls(i));
- collectStatistics("dtypes");
+ collectStatistics(StatisticsType.GLOBAL, "dtypes");
}
/**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsUtilsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsUtilsTest.java
new file mode 100644
index 0000000..ac810cf
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsUtilsTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.stat;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnConfiguration;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Tests for StatisticsUtils methods
+ */
+public class StatisticsUtilsTest extends GridCommonAbstractTest {
+ /** Test key. */
+ private static final StatisticsKey KEY = new StatisticsKey("SCHEMA", "TABLE");
+
+ /** Test COL1 statistics. */
+ private static final ColumnStatistics COL_1_STAT =
+ new ColumnStatistics(null, null, 0, 0, 0, 0, null, 1, 0);
+
+ /** Test COL1 statistics configuration. */
+ private static final StatisticsColumnConfiguration COL_1_CFG =
+ new StatisticsColumnConfiguration("COL1", null);
+
+ /** Test COL2 statistics. */
+ private static final ColumnStatistics COL_2_STAT =
+ new ColumnStatistics(null, null, 0, 0, 0, 0, null, 2, 0);
+
+ /** Test COL2 statistics configuration. */
+ private static final StatisticsColumnConfiguration COL_2_CFG =
+ new StatisticsColumnConfiguration("COL2", null).refresh();
+
+ /** Test COL3 statistics. */
+ private static final ColumnStatistics COL_3_STAT =
+ new ColumnStatistics(null, null, 0, 0, 0, 0, null, 3, 0);
+
+ /** Test COL3 statistics configuration. */
+ private static final StatisticsColumnConfiguration COL_3_CFG =
+ new StatisticsColumnConfiguration("COL3", null).refresh().refresh();
+
+ /** Test required version. */
+ private static final Map<String, Long> VERSIONS = F.asMap("COL1", 1L, "COL2", 2L);
+
+ /**
+ * Test compareVersions with stat equals to {@code null} returns {@code false}.
+ */
+ @Test
+ public void testCompareStatisticsNullStat() {
+ assertTrue(StatisticsUtils.compareVersions((ObjectStatisticsImpl)null, VERSIONS) < 0);
+ }
+
+ /**
+ * Test compareVersions with stat contains no columns returns {@code false}.
+ */
+ @Test
+ public void testCompareStatisticsVersionsEmptyStat() {
+ ObjectStatisticsImpl objStat = new ObjectStatisticsImpl(100, Collections.emptyMap());
+
+ assertTrue(StatisticsUtils.compareVersions(objStat, VERSIONS) < 0);
+ }
+
+ /**
+ * Test compareVersions with stat contains all columns returns {@code true}.
+ */
+ @Test
+ public void testCompareVersionsAllStat() {
+ Map<String, ColumnStatistics> colStats = F.asMap("COL1", COL_1_STAT, "COL2", COL_2_STAT);
+ ObjectStatisticsImpl objStat = new ObjectStatisticsImpl(100, colStats);
+
+ assertEquals(0, StatisticsUtils.compareVersions(objStat, VERSIONS));
+ }
+
+ /**
+ * Test compareVersions with stat with never columns returns {@code true}.
+ */
+ @Test
+ public void testCompareVersionsNeverStat() {
+ Map<String, ColumnStatistics> colStats = F.asMap("COL1", COL_1_STAT, "COL2", COL_3_STAT);
+ ObjectStatisticsImpl objStat = new ObjectStatisticsImpl(100, colStats);
+
+ assertTrue(StatisticsUtils.compareVersions(objStat, VERSIONS) > 0);
+ }
+
+ /**
+ * Test compareVersions with stat with extra columns returns {@code true}.
+ */
+ @Test
+ public void testCheckStatisticsVersionsExtraColumn() {
+ Map<String, ColumnStatistics> colStats =
+ F.asMap("COL1", COL_1_STAT, "COL2", COL_3_STAT, "COL3", COL_2_STAT);
+ ObjectStatisticsImpl objStat = new ObjectStatisticsImpl(100, colStats);
+
+ assertTrue(StatisticsUtils.compareVersions(objStat, VERSIONS) > 0);
+ }
+
+ /**
+ * Test compareVersions with stat contains no columns returns {@code false}.
+ */
+ @Test
+ public void testCompareObjVersionsEmptyStat() {
+ StatisticsObjectConfiguration objCfg = new StatisticsObjectConfiguration(KEY);
+
+ assertTrue(StatisticsUtils.compareVersions(objCfg, VERSIONS) < 0);
+ }
+
+ /**
+ * Test checkStatisticsConfigurationVersions with stat contains all columns returns {@code true}.
+ */
+ @Test
+ public void testCompareObjVersionsAllStat() {
+ List<StatisticsColumnConfiguration> colCfgs = Arrays.asList(COL_1_CFG, COL_2_CFG);
+ StatisticsObjectConfiguration objCfg = new StatisticsObjectConfiguration(KEY, colCfgs, (byte)50);
+
+ assertTrue(StatisticsUtils.compareVersions(objCfg, VERSIONS) == 0);
+ }
+
+ /**
+ * Test checkStatisticsConfigurationVersions with stat with never columns returns {@code true}.
+ */
+ @Test
+ public void testCompareObjVersionsNewerStat() {
+ List<StatisticsColumnConfiguration> colCfgs = Arrays.asList(COL_1_CFG, COL_2_CFG.refresh());
+ StatisticsObjectConfiguration objCfg = new StatisticsObjectConfiguration(KEY, colCfgs, (byte)50);
+
+ assertTrue(StatisticsUtils.compareVersions(objCfg, VERSIONS) > 0);
+ }
+
+ /**
+ * Test checkStatisticsConfigurationVersions with stat with extra columns returns {@code true}.
+ */
+ @Test
+ public void testCompareObjVersionsExtraColumn() {
+ List<StatisticsColumnConfiguration> colCfgs = Arrays.asList(COL_1_CFG, COL_2_CFG, COL_3_CFG);
+ StatisticsObjectConfiguration objCfg = new StatisticsObjectConfiguration(KEY, colCfgs, (byte)50);
+
+ assertEquals(0, StatisticsUtils.compareVersions(objCfg, VERSIONS));
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsInMemoryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsInMemoryTest.java
index 9feb117..5788118 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsInMemoryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsInMemoryTest.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.processors.query.stat;
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
/**
@@ -29,13 +27,6 @@ public class StatisticsViewsInMemoryTest extends StatisticsViewsTest {
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
- cfg.setConsistentId(igniteInstanceName);
-
- DataStorageConfiguration memCfg = new DataStorageConfiguration()
- .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(false));
-
- cfg.setDataStorageConfiguration(memCfg);
-
- return cfg;
+ return addImMemoryRegion(cfg, igniteInstanceName);
}
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsPersistenceTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsPersistenceTest.java
index d4e5365..34aef75 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsPersistenceTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsPersistenceTest.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.processors.query.stat;
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
/**
@@ -29,13 +27,6 @@ public class StatisticsViewsPersistenceTest extends StatisticsViewsTest {
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
- cfg.setConsistentId(igniteInstanceName);
-
- DataStorageConfiguration memCfg = new DataStorageConfiguration()
- .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true));
-
- cfg.setDataStorageConfiguration(memCfg);
-
- return cfg;
+ return addPersistenceRegion(cfg, igniteInstanceName);
}
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsTest.java
index c69bb51..c31c269 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsTest.java
@@ -20,10 +20,13 @@ package org.apache.ignite.internal.processors.query.stat;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
-import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnConfiguration;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnOverrides;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -44,21 +47,30 @@ public abstract class StatisticsViewsTest extends StatisticsAbstractTest {
grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
createSmallTable(null);
- collectStatistics(SMALL_TARGET);
+ collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
}
+
/**
* Check small table configuration in statistics column configuration view.
*/
@Test
public void testConfigurationView() throws Exception {
+ collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
+
+ ObjectStatisticsImpl smallStat = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY);
+
+ long aVer = smallStat.columnStatistics("A").version();
+ long bVer = smallStat.columnStatistics("B").version();
+ long cVer = smallStat.columnStatistics("C").version();
+
List<List<Object>> config = Arrays.asList(
- Arrays.asList(SCHEMA, "TABLE", "SMALL", "A", (byte)15, null, null, null, null, 1L),
- Arrays.asList(SCHEMA, "TABLE", "SMALL", "B", (byte)15, null, null, null, null, 1L),
- Arrays.asList(SCHEMA, "TABLE", "SMALL", "C", (byte)15, null, null, null, null, 1L)
+ Arrays.asList(SCHEMA, "TABLE", "SMALL", "A", (byte)15, null, null, null, null, aVer),
+ Arrays.asList(SCHEMA, "TABLE", "SMALL", "B", (byte)15, null, null, null, null, bVer),
+ Arrays.asList(SCHEMA, "TABLE", "SMALL", "C", (byte)15, null, null, null, null, cVer)
);
- checkSqlResult("select * from SYS.STATISTICS_CONFIGURATION", null, config::equals);
+ checkSqlResult("select * from SYS.STATISTICS_CONFIGURATION where name = 'SMALL'", null, config::equals);
}
/**
@@ -80,7 +92,9 @@ public abstract class StatisticsViewsTest extends StatisticsAbstractTest {
name = name.toUpperCase();
// 2) Create statistics for new table.
- grid(0).cache(DEFAULT_CACHE_NAME).query(new SqlFieldsQuery("ANALYZE " + name)).getAll();
+ // TODO: revert after IGNITE-15455
+ //grid(0).cache(DEFAULT_CACHE_NAME).query(new SqlFieldsQuery("ANALYZE " + name)).getAll();
+ collectStatistics(StatisticsType.GLOBAL, name);
// 3) Check statistics configuration presence.
List<List<Object>> config = new ArrayList<>();
@@ -91,7 +105,9 @@ public abstract class StatisticsViewsTest extends StatisticsAbstractTest {
checkSqlResult("select * from SYS.STATISTICS_CONFIGURATION where NAME = '" + name + "'", null, config::equals);
// 4) Drop statistics for some column of new table.
- grid(0).cache(DEFAULT_CACHE_NAME).query(new SqlFieldsQuery("DROP STATISTICS " + name + "(A);")).getAll();
+ //grid(0).cache(DEFAULT_CACHE_NAME).query(new SqlFieldsQuery("DROP STATISTICS " + name + "(A);")).getAll();
+ statisticsMgr(0).statisticConfiguration().dropStatistics(
+ Collections.singletonList(new StatisticsTarget(SCHEMA, name, "A")), true);
// 5) Check statistics configuration without dropped column.
List<Object> removed = config.remove(0);
@@ -99,7 +115,10 @@ public abstract class StatisticsViewsTest extends StatisticsAbstractTest {
act -> testContains(config, act) == null && testContains(Arrays.asList(removed), act) != null);
// 6) Drop statistics for new table.
- grid(0).cache(DEFAULT_CACHE_NAME).query(new SqlFieldsQuery("DROP STATISTICS " + name)).getAll();
+ // TODO: revert after IGNITE-15455
+ //grid(0).cache(DEFAULT_CACHE_NAME).query(new SqlFieldsQuery("DROP STATISTICS " + name)).getAll();
+ statisticsMgr(0).statisticConfiguration().dropStatistics(
+ Collections.singletonList(new StatisticsTarget(SCHEMA, name)), true);
// 7) Check statistics configuration without it.
checkSqlResult("select * from SYS.STATISTICS_CONFIGURATION where NAME = '" + name + "'", null, List::isEmpty);
@@ -121,67 +140,6 @@ public abstract class StatisticsViewsTest extends StatisticsAbstractTest {
}
/**
- * Check that all expected lines exist in actual. If not - fail.
- *
- * @param expected Expected lines, nulls mean any value.
- * @param actual Actual lines.
- */
- private void checkContains(List<List<Object>> expected, List<List<?>> actual) {
- List<Object> notExisting = testContains(expected, actual);
- if (notExisting != null)
- fail("Unable to found " + notExisting + " in specified dataset");
- }
-
- /**
- * Test that all expected lines exist in actual.
- *
- * @param expected Expected lines, nulls mean any value.
- * @param actual Actual lines.
- * @return First not existing line or {@code null} if all lines presented.
- */
- private List<Object> testContains(List<List<Object>> expected, List<List<?>> actual) {
- assertTrue(expected.size() <= actual.size());
-
- assertTrue("Test may take too long with such datasets of actual = " + actual.size(), actual.size() <= 1024);
-
- for (List<Object> exp : expected) {
- boolean found = false;
-
- for (List<?> act : actual) {
- found = checkEqualWithNull(exp, act);
-
- if (found)
- break;
- }
-
- if (!found)
- return exp;
- }
-
- return null;
- }
-
- /**
- * Compare expected line with actual one.
- *
- * @param expected Expected line, {@code null} value mean any value.
- * @param actual Actual line.
- * @return {@code true} if line are equal, {@code false} - otherwise.
- */
- private boolean checkEqualWithNull(List<Object> expected, List<?> actual) {
- assertEquals(expected.size(), actual.size());
-
- for (int i = 0; i < expected.size(); i++) {
- Object exp = expected.get(i);
- Object act = actual.get(i);
- if (exp != null && !exp.equals(act) && act != null)
- return false;
- }
-
- return true;
- }
-
- /**
* Check small table local data in statistics local data view.
*/
@Test
@@ -219,14 +177,35 @@ public abstract class StatisticsViewsTest extends StatisticsAbstractTest {
assertNotNull(smallStat);
assertEquals(size, smallStat.rowCount());
- sql("DROP STATISTICS SMALL");
+ // TODO: revert after IGNITE-15455
+ // sql("DROP STATISTICS SMALL");
- checkSqlResult("select * from SYS.STATISTICS_LOCAL_DATA where NAME = 'SMALL'", null,
- list -> list.isEmpty());
+ // sql("ANALYZE SMALL (A) WITH \"DISTINCT=5,NULLS=6,TOTAL=7,SIZE=8\"");
+ // sql("ANALYZE SMALL (B) WITH \"DISTINCT=6,NULLS=7,TOTAL=8\"");
+ // sql("ANALYZE SMALL (C)");
+ IgniteStatisticsConfigurationManager cfgMgr = statisticsMgr(0).statisticConfiguration();
+
+ cfgMgr.dropStatistics(Collections.singletonList(SMALL_TARGET), true);
+
+ StatisticsColumnConfiguration aCfg = new StatisticsColumnConfiguration("A",
+ new StatisticsColumnOverrides(6L, 5L, 7L, 8));
+ StatisticsObjectConfiguration smallACfg = new StatisticsObjectConfiguration(SMALL_KEY,
+ Collections.singleton(aCfg), StatisticsObjectConfiguration.DEFAULT_OBSOLESCENCE_MAX_PERCENT);
+
+ cfgMgr.updateStatistics(smallACfg);
- sql("ANALYZE SMALL (A) WITH \"DISTINCT=5,NULLS=6,TOTAL=7,SIZE=8\"");
- sql("ANALYZE SMALL (B) WITH \"DISTINCT=6,NULLS=7,TOTAL=8\"");
- sql("ANALYZE SMALL (C)");
+ StatisticsColumnConfiguration bCfg = new StatisticsColumnConfiguration("B",
+ new StatisticsColumnOverrides(7L, 6L, 8L, null));
+ StatisticsObjectConfiguration smallBCfg = new StatisticsObjectConfiguration(SMALL_KEY,
+ Collections.singleton(bCfg), StatisticsObjectConfiguration.DEFAULT_OBSOLESCENCE_MAX_PERCENT);
+
+ cfgMgr.updateStatistics(smallBCfg);
+
+ StatisticsColumnConfiguration cCfg = new StatisticsColumnConfiguration("C", null);
+ StatisticsObjectConfiguration smallCCfg = new StatisticsObjectConfiguration(SMALL_KEY,
+ Collections.singleton(cCfg), StatisticsObjectConfiguration.DEFAULT_OBSOLESCENCE_MAX_PERCENT);
+
+ cfgMgr.updateStatistics(smallCCfg);
checkSqlResult("select * from SYS.STATISTICS_LOCAL_DATA where NAME = 'SMALL' and COLUMN = 'C'", null,
list -> !list.isEmpty());
@@ -243,10 +222,14 @@ public abstract class StatisticsViewsTest extends StatisticsAbstractTest {
Timestamp tsB = new Timestamp(smallStat.columnStatistics("B").createdAt());
Timestamp tsC = new Timestamp(smallStat.columnStatistics("C").createdAt());
+ long aVer = smallStat.columnStatistics("A").version();
+ long bVer = smallStat.columnStatistics("B").version();
+ long cVer = smallStat.columnStatistics("C").version();
+
List<List<Object>> localData = Arrays.asList(
- Arrays.asList(SCHEMA, "TABLE", "SMALL", "A", 8L, 5L, 6L, 7L, 8, 3L, tsA.toString()),
- Arrays.asList(SCHEMA, "TABLE", "SMALL", "B", 8L, 6L, 7L, 8L, 4, 3L, tsB.toString()),
- Arrays.asList(SCHEMA, "TABLE", "SMALL", "C", 8L, 10L, 0L, size, 4, 3L, tsC.toString())
+ Arrays.asList(SCHEMA, "TABLE", "SMALL", "A", 8L, 5L, 6L, 7L, 8, aVer, tsA.toString()),
+ Arrays.asList(SCHEMA, "TABLE", "SMALL", "B", 8L, 6L, 7L, 8L, 4, bVer, tsB.toString()),
+ Arrays.asList(SCHEMA, "TABLE", "SMALL", "C", 8L, 10L, 0L, size, 4, cVer, tsC.toString())
);
checkSqlResult("select * from SYS.STATISTICS_LOCAL_DATA where NAME = 'SMALL'", null,
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteStatisticsTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteStatisticsTestSuite.java
index ef338cd0..312c014 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteStatisticsTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteStatisticsTestSuite.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.query.stat.BusyExecutorTest;
import org.apache.ignite.internal.processors.query.stat.ColumnStatisticsCollectorAggregationTest;
import org.apache.ignite.internal.processors.query.stat.ColumnStatisticsCollectorTest;
import org.apache.ignite.internal.processors.query.stat.HasherSelfTest;
+import org.apache.ignite.internal.processors.query.stat.IgniteStatisticsRepositoryStaticTest;
import org.apache.ignite.internal.processors.query.stat.IgniteStatisticsRepositoryTest;
import org.apache.ignite.internal.processors.query.stat.ManagerStatisticsTypesTest;
import org.apache.ignite.internal.processors.query.stat.PSUBasicValueDistributionTableStatisticsUsageTest;
@@ -35,11 +36,14 @@ import org.apache.ignite.internal.processors.query.stat.SqlStatisticsCommandTest
import org.apache.ignite.internal.processors.query.stat.StatisticsClearTest;
import org.apache.ignite.internal.processors.query.stat.StatisticsConfigurationTest;
import org.apache.ignite.internal.processors.query.stat.StatisticsGatheringTest;
+import org.apache.ignite.internal.processors.query.stat.StatisticsGlobalViewInMemoryTest;
+import org.apache.ignite.internal.processors.query.stat.StatisticsGlobalViewPersistenceTest;
import org.apache.ignite.internal.processors.query.stat.StatisticsObsolescenceTest;
import org.apache.ignite.internal.processors.query.stat.StatisticsStorageInMemoryTest;
import org.apache.ignite.internal.processors.query.stat.StatisticsStoragePersistenceTest;
import org.apache.ignite.internal.processors.query.stat.StatisticsStorageRestartTest;
import org.apache.ignite.internal.processors.query.stat.StatisticsStorageUnitTest;
+import org.apache.ignite.internal.processors.query.stat.StatisticsUtilsTest;
import org.apache.ignite.internal.processors.query.stat.StatisticsViewsInMemoryTest;
import org.apache.ignite.internal.processors.query.stat.StatisticsViewsPersistenceTest;
import org.apache.ignite.internal.processors.query.stat.hll.FullHLLTest;
@@ -63,6 +67,7 @@ import org.junit.runners.Suite;
StatisticsStorageRestartTest.class,
StatisticsGatheringTest.class,
StatisticsClearTest.class,
+ IgniteStatisticsRepositoryStaticTest.class,
// Table statistics usage.
RowCountTableStatisticsUsageTest.class,
@@ -80,6 +85,7 @@ import org.junit.runners.Suite;
StatisticsStorageInMemoryTest.class,
StatisticsStoragePersistenceTest.class,
StatisticsStorageUnitTest.class,
+ StatisticsUtilsTest.class,
// Statistics SQL commands
SqlParserAnalyzeSelfTest.class,
@@ -93,7 +99,9 @@ import org.junit.runners.Suite;
// Views
StatisticsViewsPersistenceTest.class,
- StatisticsViewsInMemoryTest.class
+ StatisticsViewsInMemoryTest.class,
+ StatisticsGlobalViewPersistenceTest.class,
+ StatisticsGlobalViewInMemoryTest.class
})
public class IgniteStatisticsTestSuite {
}