You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ko...@apache.org on 2021/11/12 07:45:47 UTC

[ignite] branch sql-calcite updated: IGNITE-15289 Global statistics collection (#9392)

This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/sql-calcite by this push:
     new e25c7db  IGNITE-15289 Global statistics collection (#9392)
e25c7db is described below

commit e25c7db727a13f1076b0f9a26330fbd5b2a0e9f4
Author: Berkof <sa...@mail.ru>
AuthorDate: Fri Nov 12 14:44:15 2021 +0700

    IGNITE-15289 Global statistics collection (#9392)
---
 .../query/calcite/CalciteQueryProcessor.java       |    2 +-
 .../integration/TableDdlIntegrationTest.java       |    1 -
 .../query/calcite/planner/AbstractPlannerTest.java |   15 +-
 .../ignite/testsuites/IntegrationTestSuite.java    |    2 +
 .../SystemViewRowAttributeWalkerGenerator.java     |    2 +
 .../java/org/apache/ignite/internal/GridTopic.java |    5 +-
 .../cache/persistence/metastorage/MetaStorage.java |   46 +-
 .../query/stat/IgniteStatisticsManager.java        |    8 +
 .../processors/query/stat/StatisticsKey.java       |   12 +-
 .../stat/config/StatisticsColumnConfiguration.java |   24 +-
 .../stat/config/StatisticsObjectConfiguration.java |    2 +-
 .../StatisticsColumnGlobalDataViewWalker.java      |   90 ++
 .../h2/twostep/msg/GridH2ValueMessageFactory.java  |   11 +
 .../query/stat/IgniteGlobalStatisticsManager.java  | 1021 ++++++++++++++++++++
 .../stat/IgniteStatisticsConfigurationManager.java |   53 +-
 .../query/stat/IgniteStatisticsHelper.java         |   60 ++
 .../query/stat/IgniteStatisticsManagerImpl.java    |  104 +-
 .../query/stat/IgniteStatisticsRepository.java     |   86 +-
 .../query/stat/ObjectStatisticsEvent.java          |   75 ++
 .../query/stat/ObjectStatisticsImpl.java           |   16 +
 ...csType.java => StatisticsAddressedRequest.java} |   44 +-
 .../processors/query/stat/StatisticsProcessor.java |    4 +-
 .../processors/query/stat/StatisticsType.java      |    5 +-
 .../processors/query/stat/StatisticsUtils.java     |   63 +-
 .../query/stat/messages/StatisticsKeyMessage.java  |    6 +
 .../query/stat/messages/StatisticsRequest.java     |  243 +++++
 ...ticsKeyMessage.java => StatisticsResponse.java} |  107 +-
 .../stat/view/ColumnLocalDataViewSupplier.java     |    4 +-
 ...ew.java => StatisticsColumnGlobalDataView.java} |    7 +-
 .../stat/view/StatisticsColumnLocalDataView.java   |    2 +-
 .../stat/IgniteStatisticsRepositoryStaticTest.java |   84 ++
 .../query/stat/IgniteStatisticsRepositoryTest.java |   13 +-
 ...cValueDistributionTableStatisticsUsageTest.java |    2 +-
 .../PSUCompositeIndexTableStatisticsUsageTest.java |   10 +-
 .../stat/PSUStatisticPartialGatheringTest.java     |    4 +-
 .../query/stat/PSUStatisticsStorageTest.java       |   12 +-
 ...UValueDistributionTableStatisticsUsageTest.java |    2 +-
 .../query/stat/SqlStatisticsCommandTests.java      |    5 +-
 .../query/stat/StatisticsAbstractTest.java         |  274 +++++-
 .../processors/query/stat/StatisticsClearTest.java |    2 +-
 .../query/stat/StatisticsConfigurationTest.java    |   22 +-
 .../query/stat/StatisticsGatheringTest.java        |   70 +-
 ....java => StatisticsGlobalViewInMemoryTest.java} |   15 +-
 ...va => StatisticsGlobalViewPersistenceTest.java} |   15 +-
 .../query/stat/StatisticsGlobalViewTest.java       |  160 +++
 .../query/stat/StatisticsObsolescenceTest.java     |    2 +-
 .../query/stat/StatisticsRestartAbstractTest.java  |    5 +-
 .../query/stat/StatisticsStorageTest.java          |   10 +-
 .../query/stat/StatisticsTypesAbstractTest.java    |    2 +-
 .../processors/query/stat/StatisticsUtilsTest.java |  158 +++
 .../query/stat/StatisticsViewsInMemoryTest.java    |   11 +-
 .../query/stat/StatisticsViewsPersistenceTest.java |   11 +-
 .../processors/query/stat/StatisticsViewsTest.java |  141 ++-
 .../testsuites/IgniteStatisticsTestSuite.java      |   10 +-
 54 files changed, 2723 insertions(+), 437 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 28746f2..508d50e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -71,9 +71,9 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.MappingServi
 import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
 import org.apache.ignite.internal.processors.query.calcite.prepare.CacheKey;
 import org.apache.ignite.internal.processors.query.calcite.prepare.IgniteConvertletTable;
+import org.apache.ignite.internal.processors.query.calcite.prepare.IgniteTypeCoercion;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PrepareServiceImpl;
 import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlan;
-import org.apache.ignite.internal.processors.query.calcite.prepare.IgniteTypeCoercion;
 import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlanCache;
 import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlanCacheImpl;
 import org.apache.ignite.internal.processors.query.calcite.schema.SchemaHolder;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
index 57ac900..da8f83f 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
@@ -43,7 +43,6 @@ import org.hamcrest.Matcher;
 import org.junit.Test;
 
 import static org.apache.ignite.internal.util.IgniteUtils.map;
-import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
 import static org.apache.ignite.testframework.GridTestUtils.hasSize;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItem;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
index c3499c9..8ffaa06 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -363,13 +363,14 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
             clearTraits(expected);
             clearTraits(deserialized);
 
-            if (!expected.deepEquals(deserialized))
-            assertTrue(
-                "Invalid serialization / deserialization.\n" +
-                    "Expected:\n" + RelOptUtil.toString(expected) +
-                    "Deserialized:\n" + RelOptUtil.toString(deserialized),
-                expected.deepEquals(deserialized)
-            );
+            if (!expected.deepEquals(deserialized)) {
+                assertTrue(
+                    "Invalid serialization / deserialization.\n" +
+                        "Expected:\n" + RelOptUtil.toString(expected) +
+                        "Deserialized:\n" + RelOptUtil.toString(deserialized),
+                    expected.deepEquals(deserialized)
+                );
+            }
         }
     }
 
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index 50d2979..0be1471 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.query.calcite.integration.IntervalT
 import org.apache.ignite.internal.processors.query.calcite.integration.JoinIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.KillCommandDdlIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.MetadataIntegrationTest;
+import org.apache.ignite.internal.processors.query.calcite.integration.RunningQueriesIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.ServerStatisticsIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.SetOpIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.SortAggregateIntegrationTest;
@@ -64,6 +65,7 @@ import org.junit.runners.Suite;
     SqlFieldsQueryUsageTest.class,
     AggregatesIntegrationTest.class,
     MetadataIntegrationTest.class,
+    RunningQueriesIntegrationTest.class,
     SortAggregateIntegrationTest.class,
     TableDdlIntegrationTest.class,
     IndexDdlIntegrationTest.class,
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/SystemViewRowAttributeWalkerGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/SystemViewRowAttributeWalkerGenerator.java
index 6beb0fa..70f88c5 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/SystemViewRowAttributeWalkerGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/SystemViewRowAttributeWalkerGenerator.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.managers.systemview.SystemViewMBean;
 import org.apache.ignite.internal.managers.systemview.walker.Filtrable;
 import org.apache.ignite.internal.managers.systemview.walker.Order;
 import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnConfigurationView;
+import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnGlobalDataView;
 import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnLocalDataView;
 import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnPartitionDataView;
 import org.apache.ignite.internal.util.typedef.F;
@@ -150,6 +151,7 @@ public class SystemViewRowAttributeWalkerGenerator {
 
         gen.generateAndWrite(StatisticsColumnConfigurationView.class, INDEXING_SRC_DIR);
         gen.generateAndWrite(StatisticsColumnLocalDataView.class, INDEXING_SRC_DIR);
+        gen.generateAndWrite(StatisticsColumnGlobalDataView.class, INDEXING_SRC_DIR);
         gen.generateAndWrite(StatisticsColumnPartitionDataView.class, INDEXING_SRC_DIR);
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index 0c1be03..7cc43e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -155,7 +155,10 @@ public enum GridTopic {
     TOPIC_DISTRIBUTED_PROCESS,
 
     /** */
-    TOPIC_COMM_SYSTEM;
+    TOPIC_COMM_SYSTEM,
+
+    /** Statistics related messages topic. */
+    TOPIC_STATISTICS;
 
     /** Enum values. */
     private static final GridTopic[] VALS = values();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
index f036546..435fbf0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
@@ -342,39 +342,43 @@ public class MetaStorage implements CheckpointListener, ReadWriteMetastorage {
             }
         }
 
-        Map.Entry<String, byte[]> curUpdatesEntry = null;
+        // TODO rewrite synchronized after https://issues.apache.org/jira/browse/IGNITE-15472
+        synchronized (this) {
 
-        if (updatesIter != null) {
-            assert updatesIter.hasNext();
+            Map.Entry<String, byte[]> curUpdatesEntry = null;
 
-            curUpdatesEntry = updatesIter.next();
-        }
+            if (updatesIter != null) {
+                assert updatesIter.hasNext();
 
-        MetastorageSearchRow lower = new MetastorageSearchRow(keyPrefix);
+                curUpdatesEntry = updatesIter.next();
+            }
 
-        MetastorageSearchRow upper = new MetastorageSearchRow(keyPrefix + "\uFFFF");
+            MetastorageSearchRow lower = new MetastorageSearchRow(keyPrefix);
 
-        GridCursor<MetastorageDataRow> cur = tree.find(lower, upper);
+            MetastorageSearchRow upper = new MetastorageSearchRow(keyPrefix + "\uFFFF");
 
-        while (cur.next()) {
-            MetastorageDataRow row = cur.get();
+            GridCursor<MetastorageDataRow> cur = tree.find(lower, upper);
 
-            String key = row.key();
-            byte[] valBytes = partStorage.readRow(row.link());
+            while (cur.next()) {
+                MetastorageDataRow row = cur.get();
 
-            int c = 0;
+                String key = row.key();
+                byte[] valBytes = partStorage.readRow(row.link());
 
-            while (curUpdatesEntry != null && (c = curUpdatesEntry.getKey().compareTo(key)) < 0)
-                curUpdatesEntry = advanceCurrentUpdatesEntry(cb, unmarshal, updatesIter, curUpdatesEntry);
+                int c = 0;
+
+                while (curUpdatesEntry != null && (c = curUpdatesEntry.getKey().compareTo(key)) < 0)
+                    curUpdatesEntry = advanceCurrentUpdatesEntry(cb, unmarshal, updatesIter, curUpdatesEntry);
 
-            if (curUpdatesEntry != null && c == 0)
+                if (curUpdatesEntry != null && c == 0)
+                    curUpdatesEntry = advanceCurrentUpdatesEntry(cb, unmarshal, updatesIter, curUpdatesEntry);
+                else
+                    applyCallback(cb, unmarshal, key, valBytes);
+            }
+
+            while (curUpdatesEntry != null)
                 curUpdatesEntry = advanceCurrentUpdatesEntry(cb, unmarshal, updatesIter, curUpdatesEntry);
-            else
-                applyCallback(cb, unmarshal, key, valBytes);
         }
-
-        while (curUpdatesEntry != null)
-            curUpdatesEntry = advanceCurrentUpdatesEntry(cb, unmarshal, updatesIter, curUpdatesEntry);
     }
 
     /** */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManager.java
index 59e73f4..233f2fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManager.java
@@ -61,6 +61,14 @@ public interface IgniteStatisticsManager {
     public ObjectStatistics getLocalStatistics(StatisticsKey key);
 
     /**
+     * Get global statistics by object.
+     *
+     * @param key Statistic key.
+     * @return Object statistics of {@code null} if there are no available global statistics by specified object.
+     */
+    public ObjectStatistics getGlobalStatistics(StatisticsKey key);
+
+    /**
      * Stop statistic manager.
      */
     public void stop();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsKey.java
index d061320..b7a7037 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsKey.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsKey.java
@@ -61,11 +61,15 @@ public class StatisticsKey implements Serializable {
 
     /** {@inheritDoc} */
     @Override public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
         StatisticsKey statsKey = (StatisticsKey) o;
-        return Objects.equals(schema, statsKey.schema) &&
-            Objects.equals(obj, statsKey.obj);
+
+        return Objects.equals(schema, statsKey.schema) && Objects.equals(obj, statsKey.obj);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/config/StatisticsColumnConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/config/StatisticsColumnConfiguration.java
index f1ab02f..b025aab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/config/StatisticsColumnConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/config/StatisticsColumnConfiguration.java
@@ -128,7 +128,7 @@ public class StatisticsColumnConfiguration implements Serializable {
      *
      * @param oldCfg Previous configuration. May be {@code null} when new configuration is created.
      * @param newCfg New configuration.
-     * @return merged configuration.
+     * @return Merged configuration.
      */
     public static StatisticsColumnConfiguration merge(
         StatisticsColumnConfiguration oldCfg,
@@ -137,32 +137,10 @@ public class StatisticsColumnConfiguration implements Serializable {
         if (oldCfg == null)
             return newCfg;
 
-        if (oldCfg.collectionAwareEqual(newCfg))
-            return new StatisticsColumnConfiguration(newCfg, oldCfg.ver, oldCfg.tombstone, newCfg.overrides);
-
         return new StatisticsColumnConfiguration(newCfg, oldCfg.ver + 1, false, newCfg.overrides);
     }
 
     /**
-     * Compare only collection or gathering related fields of config.
-     *
-     * @param o StatisticsColumnConfiguration to compare with.
-     * @return {@code true} if configurations are equal from the gathering point of view, {@code false} - otherwise.
-     */
-    public boolean collectionAwareEqual(StatisticsColumnConfiguration o) {
-        if (this == o)
-            return true;
-
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        StatisticsColumnConfiguration that = (StatisticsColumnConfiguration)o;
-
-        return ver == that.ver && tombstone == that.tombstone
-            && Objects.equals(name, that.name);
-    }
-
-    /**
      * Create configuration for dropped statistic column.
      *
      * @return Tombstone column configuration.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/config/StatisticsObjectConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/config/StatisticsObjectConfiguration.java
index 32d1ddd..d39706f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/config/StatisticsObjectConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/config/StatisticsObjectConfiguration.java
@@ -245,7 +245,7 @@ public class StatisticsObjectConfiguration implements Serializable, Comparable<S
     /**
      * Compare only configuration from the same branch. I.e. can't correctly compare
      * Cfg(A=v1,B=v3) vs Cfg(A=v2,B=v1)
-     * Cfg(A=v1,B=v3) vs Cfg(A=v1m C=v2)
+     * Cfg(A=v1,B=v3) vs Cfg(A=v1,C=v2)
      * because there is no changes chain to get one from another.
      *
      * @param o Other configuration to compare.
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/managers/systemview/walker/StatisticsColumnGlobalDataViewWalker.java b/modules/indexing/src/main/java/org/apache/ignite/internal/managers/systemview/walker/StatisticsColumnGlobalDataViewWalker.java
new file mode 100644
index 0000000..c5d9cce
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/managers/systemview/walker/StatisticsColumnGlobalDataViewWalker.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.systemview.walker;
+
+import java.sql.Timestamp;
+import java.util.Collections;
+import java.util.List;
+import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnGlobalDataView;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.systemview.view.SystemViewRowAttributeWalker;
+
+/**
+ * Generated by {@code org.apache.ignite.codegen.SystemViewRowAttributeWalkerGenerator}.
+ * {@link StatisticsColumnGlobalDataView} attributes walker.
+ * 
+ * @see StatisticsColumnGlobalDataView
+ */
+public class StatisticsColumnGlobalDataViewWalker implements SystemViewRowAttributeWalker<StatisticsColumnGlobalDataView> {
+    /** Filter key for attribute "schema" */
+    public static final String SCHEMA_FILTER = "schema";
+
+    /** Filter key for attribute "type" */
+    public static final String TYPE_FILTER = "type";
+
+    /** Filter key for attribute "name" */
+    public static final String NAME_FILTER = "name";
+
+    /** Filter key for attribute "column" */
+    public static final String COLUMN_FILTER = "column";
+
+    /** List of filtrable attributes. */
+    private static final List<String> FILTRABLE_ATTRS = Collections.unmodifiableList(F.asList(
+        "schema", "type", "name", "column"
+    ));
+
+    /** {@inheritDoc} */
+    @Override public List<String> filtrableAttributes() {
+        return FILTRABLE_ATTRS;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void visitAll(AttributeVisitor v) {
+        v.accept(0, "schema", String.class);
+        v.accept(1, "type", String.class);
+        v.accept(2, "name", String.class);
+        v.accept(3, "column", String.class);
+        v.accept(4, "rowsCount", long.class);
+        v.accept(5, "distinct", long.class);
+        v.accept(6, "nulls", long.class);
+        v.accept(7, "total", long.class);
+        v.accept(8, "size", int.class);
+        v.accept(9, "version", long.class);
+        v.accept(10, "lastUpdateTime", Timestamp.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void visitAll(StatisticsColumnGlobalDataView row, AttributeWithValueVisitor v) {
+        v.accept(0, "schema", String.class, row.schema());
+        v.accept(1, "type", String.class, row.type());
+        v.accept(2, "name", String.class, row.name());
+        v.accept(3, "column", String.class, row.column());
+        v.acceptLong(4, "rowsCount", row.rowsCount());
+        v.acceptLong(5, "distinct", row.distinct());
+        v.acceptLong(6, "nulls", row.nulls());
+        v.acceptLong(7, "total", row.total());
+        v.acceptInt(8, "size", row.size());
+        v.acceptLong(9, "version", row.version());
+        v.accept(10, "lastUpdateTime", Timestamp.class, row.lastUpdateTime());
+    }
+
+    /** {@inheritDoc} */
+    @Override public int count() {
+        return 11;
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
index e4aae7d..a306f7b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
@@ -23,6 +23,11 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.query.QueryTable;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsColumnData;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsKeyMessage;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsObjectData;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsRequest;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsResponse;
 import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
@@ -64,6 +69,12 @@ public class GridH2ValueMessageFactory implements MessageFactoryProvider {
         factory.register((short)-55, GridH2DmlRequest::new);
         factory.register((short)-56, GridH2DmlResponse::new);
         factory.register((short)-57, GridH2SelectForUpdateTxDetails::new);
+
+        factory.register(StatisticsKeyMessage.TYPE_CODE, StatisticsKeyMessage::new);
+        factory.register(StatisticsObjectData.TYPE_CODE, StatisticsObjectData::new);
+        factory.register(StatisticsColumnData.TYPE_CODE, StatisticsColumnData::new);
+        factory.register(StatisticsRequest.TYPE_CODE, StatisticsRequest::new);
+        factory.register(StatisticsResponse.TYPE_CODE, StatisticsResponse::new);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteGlobalStatisticsManager.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteGlobalStatisticsManager.java
new file mode 100644
index 0000000..88a555b
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteGlobalStatisticsManager.java
@@ -0,0 +1,1021 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.stat;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.managers.systemview.GridSystemViewManager;
+import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnGlobalDataViewWalker;
+import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnLocalDataViewWalker;
+import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnPartitionDataViewWalker;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnConfiguration;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsKeyMessage;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsObjectData;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsRequest;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsResponse;
+import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnConfigurationView;
+import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnGlobalDataView;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+
+/**
+ * Global statistics manager. Cache global statistics and collect it.
+ */
+public class IgniteGlobalStatisticsManager implements GridMessageListener {
+    /** */
+    private static final String STAT_GLOBAL_VIEW_NAME = "statisticsGlobalData";
+
+    /** */
+    private static final String STAT_GLOBAL_VIEW_DESCRIPTION = "Global statistics.";
+
+    /** Statistics manager. */
+    private final IgniteStatisticsManagerImpl statMgr;
+
+    /** Pool to process statistics requests. */
+    private final IgniteThreadPoolExecutor mgmtPool;
+
+    /** Discovery manager to get server node list to statistics master calculation. */
+    private final GridDiscoveryManager discoMgr;
+
+    /** Cluster state processor. */
+    private final GridClusterStateProcessor cluster;
+
+    /** Cache partition exchange manager. */
+    private final GridCachePartitionExchangeManager<?, ?> exchange;
+
+    /** Helper to transform or generate statistics related messages. */
+    private final IgniteStatisticsHelper helper;
+
+    /** Grid io manager to exchange global and local statistics. */
+    private final GridIoManager ioMgr;
+
+    /** Cache for global statistics. */
+    private final ConcurrentMap<StatisticsKey, CacheEntry<ObjectStatisticsImpl>> globalStatistics =
+        new ConcurrentHashMap<>();
+
+    /** Incoming requests which should be served after local statistics collection finish. */
+    private final ConcurrentMap<StatisticsKey, Collection<StatisticsAddressedRequest>> inLocalRequests =
+        new ConcurrentHashMap<>();
+
+    /** Incoming requests which should be served after global statistics collection finish. */
+    private final ConcurrentMap<StatisticsKey, Collection<StatisticsAddressedRequest>> inGloblaRequests =
+        new ConcurrentHashMap<>();
+
+    /** Outcoming global collection requests. */
+    private final ConcurrentMap<StatisticsKey, StatisticsGatheringContext> curCollections = new ConcurrentHashMap<>();
+
+    /** Outcoming global statistics requests to request id. */
+    private final ConcurrentMap<StatisticsKey, UUID> outGlobalStatisticsRequests = new ConcurrentHashMap<>();
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Started flag. */
+    private boolean started;
+
+    /** Exchange listener: clean inbound requests and restart outbount. */
+    private final PartitionsExchangeAware exchAwareLsnr = new PartitionsExchangeAware() {
+        @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+
+            // Skip join/left client nodes.
+            if (fut.exchangeType() != GridDhtPartitionsExchangeFuture.ExchangeType.ALL ||
+                cluster.clusterState().lastState() != ClusterState.ACTIVE)
+                return;
+
+            DiscoveryEvent evt = fut.firstEvent();
+
+            // Skip create/destroy caches.
+            if (evt.type() == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
+                DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)evt).customMessage();
+
+                if (msg instanceof DynamicCacheChangeBatch)
+                    return;
+
+                // Just clear all activities and update topology version.
+                if (log.isDebugEnabled())
+                    log.debug("Resetting all global statistics activities due to new topology " +
+                        fut.topologyVersion());
+
+                inLocalRequests.clear();
+                inGloblaRequests.clear();
+
+                Set<StatisticsKey> curColls = curCollections.keySet();
+
+                for (StatisticsKey key : curColls) {
+                    curCollections.remove(key);
+
+                    mgmtPool.submit(() -> collectGlobalStatistics(key));
+                }
+
+                Set<StatisticsKey> outReqs = outGlobalStatisticsRequests.keySet();
+
+                for (StatisticsKey key : outReqs) {
+                    outGlobalStatisticsRequests.remove(key);
+
+                    mgmtPool.submit(() -> collectGlobalStatistics(key));
+                }
+            }
+        }
+    };
+
+    /**
+     * Constructor.
+     *
+     * @param statMgr Statistics manager.
+     * @param sysViewMgr System view manager.
+     * @param mgmtPool Statistics management pool.
+     * @param discoMgr Grid discovery manager.
+     * @param cluster Cluster state processor.
+     * @param exchange Partition exchange manager.
+     * @param helper Statistics helper.
+     * @param ioMgr Communication manager.
+     * @param logSupplier Log supplier.
+     */
+    public IgniteGlobalStatisticsManager(
+        IgniteStatisticsManagerImpl statMgr,
+        GridSystemViewManager sysViewMgr,
+        IgniteThreadPoolExecutor mgmtPool,
+        GridDiscoveryManager discoMgr,
+        GridClusterStateProcessor cluster,
+        GridCachePartitionExchangeManager<?, ?> exchange,
+        IgniteStatisticsHelper helper,
+        GridIoManager ioMgr,
+        Function<Class<?>, IgniteLogger> logSupplier
+    ) {
+        this.statMgr = statMgr;
+        this.mgmtPool = mgmtPool;
+        this.discoMgr = discoMgr;
+        this.cluster = cluster;
+        this.exchange = exchange;
+        this.helper = helper;
+        this.ioMgr = ioMgr;
+        log = logSupplier.apply(IgniteGlobalStatisticsManager.class);
+
+        statMgr.subscribeToLocalStatistics(nls -> onLocalStatisticsAggregated(nls.key(), nls.statistics(),
+            nls.topologyVersion()));
+        statMgr.subscribeToStatisticsConfig(this::onConfigChanged);
+        ioMgr.addMessageListener(GridTopic.TOPIC_STATISTICS, this);
+
+        sysViewMgr.registerFiltrableView(STAT_GLOBAL_VIEW_NAME, STAT_GLOBAL_VIEW_DESCRIPTION,
+            new StatisticsColumnGlobalDataViewWalker(), this::columnGlobalStatisticsViewSupplier, Function.identity());
+    }
+
+    /**
+     * Statistics column global data view filterable supplier.
+     *
+     * @param filter Filter.
+     * @return Iterable with statistics column global data views.
+     */
+    private Iterable<StatisticsColumnGlobalDataView> columnGlobalStatisticsViewSupplier(Map<String, Object> filter) {
+        String type = (String)filter.get(StatisticsColumnPartitionDataViewWalker.TYPE_FILTER);
+        if (type != null && !StatisticsColumnConfigurationView.TABLE_TYPE.equalsIgnoreCase(type))
+            return Collections.emptyList();
+
+        String schema = (String)filter.get(StatisticsColumnLocalDataViewWalker.SCHEMA_FILTER);
+        String name = (String)filter.get(StatisticsColumnLocalDataViewWalker.NAME_FILTER);
+        String column = (String)filter.get(StatisticsColumnPartitionDataViewWalker.COLUMN_FILTER);
+
+        Map<StatisticsKey, ObjectStatisticsImpl> globalStatsMap;
+        if (!F.isEmpty(schema) && !F.isEmpty(name)) {
+            StatisticsKey key = new StatisticsKey(schema, name);
+
+            CacheEntry<ObjectStatisticsImpl> objLocStat = globalStatistics.get(key);
+
+            if (objLocStat == null || objLocStat.obj == null)
+                return Collections.emptyList();
+
+            globalStatsMap = Collections.singletonMap(key, objLocStat.object());
+        }
+        else
+            globalStatsMap = globalStatistics.entrySet().stream()
+                .filter(e -> e.getValue().object() != null && (F.isEmpty(schema) || schema.equals(e.getKey().schema())))
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().object()));
+
+        List<StatisticsColumnGlobalDataView> res = new ArrayList<>();
+
+        for (Map.Entry<StatisticsKey, ObjectStatisticsImpl> localStatsEntry : globalStatsMap.entrySet()) {
+            StatisticsKey key = localStatsEntry.getKey();
+            ObjectStatisticsImpl stat = localStatsEntry.getValue();
+
+            if (column == null) {
+                for (Map.Entry<String, ColumnStatistics> colStat : localStatsEntry.getValue().columnsStatistics()
+                    .entrySet()) {
+                    StatisticsColumnGlobalDataView colStatView = new StatisticsColumnGlobalDataView(key,
+                        colStat.getKey(), stat);
+
+                    res.add(colStatView);
+                }
+            }
+            else {
+                ColumnStatistics colStat = localStatsEntry.getValue().columnStatistics(column);
+
+                if (colStat != null) {
+                    StatisticsColumnGlobalDataView colStatView = new StatisticsColumnGlobalDataView(key, column, stat);
+
+                    res.add(colStatView);
+                }
+            }
+        }
+
+        return res;
+    }
+
+    /**
+     * Start operations.
+     * Shouldn't be called twice.
+     */
+    public synchronized void start() {
+        if (started) {
+            if (log.isDebugEnabled())
+                log.debug("IgniteGlobalStatisticsManager already started.");
+
+            return;
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Global statistics manager starting...");
+
+        globalStatistics.clear();
+        exchange.registerExchangeAwareComponent(exchAwareLsnr);
+
+        started = true;
+
+        if (log.isDebugEnabled())
+            log.debug("Global statistics manager started.");
+    }
+
+    /**
+     * Stop operations.
+     * Shouldn't be called twice.
+     */
+    public synchronized void stop() {
+        if (!started) {
+            if (log.isDebugEnabled())
+                log.debug("IgniteGlobalStatisticsManager already stopped.");
+
+            return;
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Global statistics manager stopping...");
+
+        globalStatistics.clear();
+
+        inGloblaRequests.clear();
+        inLocalRequests.clear();
+        outGlobalStatisticsRequests.clear();
+        curCollections.clear();
+
+        exchange.unregisterExchangeAwareComponent(exchAwareLsnr);
+
+        started = false;
+
+        if (log.isDebugEnabled())
+            log.debug("Global statistics manager stopped.");
+    }
+
+    /**
+     * Get global statistics for the given key. If there is no cached statistics, but there is cache record with
+     * empty object - no additional collection will be started.
+     *
+     * @param key Statistics key.
+     * @return Global object statistics or {@code null} if there is no global statistics available.
+     */
+    public ObjectStatisticsImpl getGlobalStatistics(StatisticsKey key) {
+        CacheEntry<ObjectStatisticsImpl> res = globalStatistics.computeIfAbsent(key, k -> {
+            if (log.isDebugEnabled())
+                log.debug("Scheduling global statistics collection by key " + key);
+
+            mgmtPool.submit(() -> collectGlobalStatistics(key));
+
+            return new CacheEntry<>(null);
+        });
+
+        return res.object();
+    }
+
+    /**
+     * Either send local or global statistics request to get global statistics.
+     *
+     * @param key Statistics key to get global statistics by.
+     */
+    private void collectGlobalStatistics(StatisticsKey key) {
+        try {
+            StatisticsObjectConfiguration statCfg = statMgr.statisticConfiguration().config(key);
+
+            if (statCfg != null && !statCfg.columns().isEmpty()) {
+                UUID statMaster = getStatisticsMasterNode(key);
+
+                if (discoMgr.localNode().id().equals(statMaster))
+                    gatherGlobalStatistics(statCfg);
+                else {
+                    StatisticsKeyMessage keyMsg = new StatisticsKeyMessage(key.schema(), key.obj(),
+                        Collections.emptyList());
+
+                    Map<String, Long> versions = statCfg.columns().entrySet().stream()
+                        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().version()));
+
+                    StatisticsRequest globalReq = new StatisticsRequest(UUID.randomUUID(), keyMsg,
+                        StatisticsType.GLOBAL, null, versions);
+
+                    outGlobalStatisticsRequests.put(key, globalReq.reqId());
+
+                    if (log.isDebugEnabled())
+                        log.debug("Send global statistics request by configuration " + statCfg);
+
+                    send(statMaster, globalReq);
+
+                }
+            }
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("Unable to start global statistics collection due to lack of configuration by key "
+                        + key);
+            }
+
+        }
+        catch (IgniteCheckedException e) {
+            if (log.isInfoEnabled())
+                log.info("Unable to get statistics configuration due to " + e.getMessage());
+        }
+    }
+
+    /**
+     * Collect global statistics on master node.
+     *
+     * @param statCfg Statistics config to gather global statistics by.
+     */
+    private void gatherGlobalStatistics(StatisticsObjectConfiguration statCfg) throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Start global statistics collection by configuration " + statCfg);
+
+        StatisticsTarget target = new StatisticsTarget(statCfg.key());
+
+        List<StatisticsAddressedRequest> locRequests = helper.generateGatheringRequests(target, statCfg);
+        UUID reqId = locRequests.get(0).req().reqId();
+
+        StatisticsGatheringContext gatCtx = new StatisticsGatheringContext(locRequests.size(), reqId, statCfg);
+
+        curCollections.put(statCfg.key(), gatCtx);
+
+        for (StatisticsAddressedRequest addReq : locRequests) {
+            if (log.isDebugEnabled())
+                log.debug("Sending local request " + addReq.req().reqId() + " to node " + addReq.nodeId());
+
+            send(addReq.nodeId(), addReq.req());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+        mgmtPool.submit(() -> {
+            try {
+                if (msg instanceof StatisticsRequest) {
+                    StatisticsRequest req = (StatisticsRequest)msg;
+                    switch (req.type()) {
+                        case LOCAL:
+                            processLocalRequest(nodeId, req);
+
+                            break;
+
+                        case GLOBAL:
+                            processGlobalRequest(nodeId, req);
+
+                            break;
+
+                        default:
+                            log.warning("Unexpected type " + req.type() + " in statistics request message " + req);
+                    }
+                }
+                else if (msg instanceof StatisticsResponse) {
+                    StatisticsResponse resp = (StatisticsResponse)msg;
+
+                    switch (resp.data().type()) {
+                        case LOCAL:
+                            processLocalResponse(nodeId, resp);
+
+                            break;
+
+                        case GLOBAL:
+                            processGlobalResponse(nodeId, resp);
+
+                            break;
+
+                        default:
+                            log.warning("Unexpected type " + resp.data().type() +
+                                " in statistics reposonse message " + resp);
+                    }
+
+                }
+                else
+                    log.warning("Unknown msg " + msg + " in statistics topic " + GridTopic.TOPIC_STATISTICS +
+                        " from node " + nodeId);
+            }
+            catch (Throwable e) {
+                log.warning("Unable to process statistics message: " + e.getMessage(), e);
+            }
+        });
+    }
+
+    /**
+     * Process request for local statistics.
+     * 1) If there are local statistics for the given key - send response.
+     * 2) If there is no such statistics - add request to incoming queue.
+     *
+     * @param nodeId Sender node id.
+     * @param req Request to process.
+     * @throws IgniteCheckedException
+     */
+    private void processLocalRequest(UUID nodeId, StatisticsRequest req) throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Got local statistics request from node " + nodeId + " : " + req);
+
+        StatisticsKey key = new StatisticsKey(req.key().schema(), req.key().obj());
+
+        ObjectStatisticsImpl objectStatistics = statMgr.getLocalStatistics(key, req.topVer());
+
+        if (StatisticsUtils.compareVersions(objectStatistics, req.versions()) == 0)
+            sendResponse(nodeId, req.reqId(), key, StatisticsType.LOCAL, objectStatistics);
+        else {
+            addToRequests(inLocalRequests, key, new StatisticsAddressedRequest(req, nodeId));
+
+            objectStatistics = statMgr.getLocalStatistics(key, req.topVer());
+
+            if (StatisticsUtils.compareVersions(objectStatistics, req.versions()) == 0) {
+                StatisticsAddressedRequest removed = removeFromRequests(inLocalRequests, key, req.reqId());
+
+                if (removed != null)
+                    sendResponse(nodeId, req.reqId(), key, StatisticsType.LOCAL, objectStatistics);
+            }
+        }
+    }
+
+    /**
+     * Test if statistics configuration is fit to all required versions.
+     * @param cfg Statistics configuration to check.
+     * @param versions Map of column name to required version.
+     * @return {@code true} if it is, {@code false} otherwise.
+     */
+    private boolean checkStatisticsCfg(StatisticsObjectConfiguration cfg, Map<String, Long> versions) {
+        if (cfg == null)
+            return false;
+
+        for (Map.Entry<String, Long> version : versions.entrySet()) {
+            StatisticsColumnConfiguration colCfg = cfg.columns().get(version.getKey());
+
+            if (colCfg == null || colCfg.version() < version.getValue())
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Process incoming request for global statistics. Either response (if it exists), or collect and response
+     * (if current node is master node for the given key) or ignore (if current node is no more master node for
+     * the given key.
+     *
+     * @param nodeId Sender node id.
+     * @param req Request.
+     */
+    private void processGlobalRequest(UUID nodeId, StatisticsRequest req) throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Got global statistics request from node " + nodeId + " : " + req);
+
+        StatisticsKey key = new StatisticsKey(req.key().schema(), req.key().obj());
+
+        ObjectStatisticsImpl objStatistics = getGlobalStatistics(key, req.versions());
+
+        if (objStatistics == null) {
+            if (discoMgr.localNode().id().equals(getStatisticsMasterNode(key))) {
+
+                addToRequests(inGloblaRequests, key, new StatisticsAddressedRequest(req, nodeId));
+                globalStatistics.computeIfAbsent(key, k -> new CacheEntry<>(null));
+
+                if (!hasCurrentCollection(key, req.versions())) {
+                    StatisticsObjectConfiguration objCfg = statMgr.statisticConfiguration().config(key);
+
+                    if (StatisticsUtils.compareVersions(objCfg, req.versions()) >= 0)
+                        gatherGlobalStatistics(objCfg);
+                    else {
+                        if (log.isDebugEnabled())
+                            log.debug("Wait for statistics configuration to process global statistics request " +
+                                req.reqId());
+                    }
+                }
+            }
+
+            objStatistics = getGlobalStatistics(key, req.versions());
+
+            if (objStatistics != null) {
+                StatisticsAddressedRequest removed = removeFromRequests(inGloblaRequests, key, req.reqId());
+
+                if (removed != null)
+                    sendResponse(nodeId, req.reqId(), key, StatisticsType.GLOBAL, objStatistics);
+            }
+        }
+        else
+            sendResponse(nodeId, req.reqId(), key, StatisticsType.GLOBAL, objStatistics);
+    }
+
+    /**
+     * Check if there are already started current collection with specified parameters.
+     *
+     * @param key Statistics key.
+     * @param versions Reuqired versions.
+     * @return {@code true} if there are already current collection with specifie parameters, {@code false} - otherwise.
+     */
+    private boolean hasCurrentCollection(StatisticsKey key, Map<String, Long> versions) {
+        StatisticsGatheringContext ctx = curCollections.get(key);
+
+        if (ctx == null)
+            return false;
+
+        return StatisticsUtils.compareVersions(ctx.configuration(), versions) == 0;
+    }
+
+    /**
+     * Get apptopriate global statistics from cache.
+     *
+     * @param key Statistics key.
+     * @param versions Required versions.
+     * @return Global statistics or {@code null} if there are no such global statistics.
+     */
+    private ObjectStatisticsImpl getGlobalStatistics(StatisticsKey key, Map<String, Long> versions) {
+        CacheEntry<ObjectStatisticsImpl> objStatEntry = globalStatistics.get(key);
+
+        if (objStatEntry == null || StatisticsUtils.compareVersions(objStatEntry.object(), versions) != 0)
+            return null;
+
+        return objStatEntry.object();
+    }
+
+    /**
+     * Build statistics response and send it to specified node.
+     *
+     * @param nodeId Target node id.
+     * @param reqId Request id.
+     * @param key Statistics key.
+     * @param type Statistics type.
+     * @param data Statitsics data.
+     */
+    private void sendResponse(
+        UUID nodeId,
+        UUID reqId,
+        StatisticsKey key,
+        StatisticsType type,
+        ObjectStatisticsImpl data
+    ) throws IgniteCheckedException {
+        StatisticsKeyMessage keyMsg = new StatisticsKeyMessage(key.schema(), key.obj(), null);
+        StatisticsObjectData dataMsg = StatisticsUtils.toObjectData(keyMsg, type, data);
+
+        send(nodeId, new StatisticsResponse(reqId, dataMsg));
+    }
+
+    /**
+     * Add to addressed requests map.
+     *
+     * @param map Map to add into.
+     * @param key Request statistics key.
+     * @param req Request to add.
+     */
+    private void addToRequests(
+        ConcurrentMap<StatisticsKey, Collection<StatisticsAddressedRequest>> map,
+        StatisticsKey key,
+        StatisticsAddressedRequest req
+    ) {
+        map.compute(key, (k, v) -> {
+            if (v == null)
+                v = new ArrayList<>();
+
+            v.add(req);
+
+            return v;
+        });
+    }
+
+    /**
+     * Check if specified map contains request with specified key and id, remove and return it.
+     *
+     * @param key Request statistics key.
+     * @param reqId Request id.
+     * @return Removed request.
+     */
+    private StatisticsAddressedRequest removeFromRequests(
+        ConcurrentMap<StatisticsKey, Collection<StatisticsAddressedRequest>> map,
+        StatisticsKey key,
+        UUID reqId
+    ) {
+        StatisticsAddressedRequest[] res = new StatisticsAddressedRequest[1];
+
+        map.compute(key, (k, v) -> {
+            if (v != null)
+                res[0] = v.stream().filter(e -> reqId.equals(e.req().reqId())).findAny().orElse(null);
+
+            if (res[0] != null)
+                v = v.stream().filter(e -> !reqId.equals(e.req().reqId())).collect(Collectors.toList());
+
+            return v;
+        });
+
+        return res[0];
+    }
+
+    /**
+     * Process statistics configuration changes:
+     *
+     * 1) Remove all outbound activity by specified key, inbound may be suspended due to lack of
+     *  requested configuration.
+     * 2) Remove all inbound activity by changed key if reqiest col cfg versions lower than configuration col cfg versions.
+     * 3.1) If there are no live column's config - remove cached global statistics.
+     * 3.2) If there are some live columns config and global statistics cache contains statistics for the given key -
+     * start to collect it again.
+     */
+    public void onConfigChanged(StatisticsObjectConfiguration cfg) {
+       StatisticsKey key = cfg.key();
+
+       curCollections.remove(key);
+       outGlobalStatisticsRequests.remove(key);
+
+       inLocalRequests.computeIfPresent(key, (k, v) -> {
+           // Current config newer than income request - request should be invalidated.
+           v.removeIf(req -> StatisticsUtils.compareVersions(cfg, req.req().versions()) > 0);
+
+           return (v.isEmpty()) ? null : v;
+       });
+
+       inGloblaRequests.computeIfPresent(key, (k, v) -> {
+           v.removeIf(req -> StatisticsUtils.compareVersions(cfg, req.req().versions()) > 0);
+
+           return (v.isEmpty()) ? null : v;
+       });
+
+       if (cfg.columns().isEmpty())
+           globalStatistics.remove(key);
+       else {
+           globalStatistics.computeIfPresent(key, (k, v) -> {
+               if (v != null) {
+                   if (log.isDebugEnabled())
+                       log.debug("Scheduling global statistics recollection by key " + key);
+
+                   mgmtPool.submit(() -> collectGlobalStatistics(key));
+               }
+               return v;
+           });
+       }
+    }
+
+    /**
+     * Clear global object statistics.
+     *
+     * @param key Object key to clear global statistics by.
+     * @param colNames Only statistics by specified columns will be cleared.
+     */
+    public void clearGlobalStatistics(StatisticsKey key, Set<String> colNames) {
+        globalStatistics.computeIfPresent(key, (k, v) -> {
+            ObjectStatisticsImpl globStatOld = v.object();
+            ObjectStatisticsImpl globStatNew = (globStatOld == null) ? null : globStatOld.subtract(colNames);
+
+            return (globStatNew == null || globStatNew.columnsStatistics().isEmpty()) ? null :
+                new CacheEntry<>(globStatNew);
+        });
+
+        outGlobalStatisticsRequests.remove(key);
+    }
+
+    /**
+     * Process response with local statistics. Try to finish collecting operation and send pending requests.
+     *
+     * @param nodeId Sender node id.
+     * @param resp Statistics response to process.
+     * @throws IgniteCheckedException In case of error.
+     */
+    private void processLocalResponse(UUID nodeId, StatisticsResponse resp) throws IgniteCheckedException {
+        StatisticsKeyMessage keyMsg = resp.data().key();
+        StatisticsKey key = new StatisticsKey(keyMsg.schema(), resp.data().key().obj());
+
+        if (log.isDebugEnabled())
+            log.debug("Got local statistics response " + resp.reqId() + " from node " + nodeId + " by key " + key);
+
+        StatisticsGatheringContext curCtx = curCollections.get(key);
+
+        if (curCtx != null) {
+            if (!curCtx.reqId().equals(resp.reqId())) {
+                if (log.isDebugEnabled())
+                    log.debug("Got outdated local statistics response " + resp + " instead of " + curCtx.reqId());
+
+                return;
+            }
+
+            ObjectStatisticsImpl data = StatisticsUtils.toObjectStatistics(null, resp.data());
+
+            if (curCtx.registerResponse(data)) {
+                StatisticsObjectConfiguration cfg = statMgr.statisticConfiguration().config(key);
+
+                if (cfg != null) {
+                    if (log.isDebugEnabled())
+                        log.debug("Aggregating global statistics for key " + key + " by request " + curCtx.reqId());
+
+                    ObjectStatisticsImpl globalStat = helper.aggregateLocalStatistics(cfg, curCtx.collectedData());
+
+                    globalStatistics.put(key, new CacheEntry<>(globalStat));
+
+                    if (log.isDebugEnabled())
+                        log.debug("Global statistics for key " + key + " collected.");
+
+                    Collection<StatisticsAddressedRequest> globalRequests = inGloblaRequests.remove(key);
+
+                    if (globalRequests != null) {
+                        StatisticsObjectData globalStatData = StatisticsUtils.toObjectData(keyMsg,
+                            StatisticsType.GLOBAL, globalStat);
+
+                        for (StatisticsAddressedRequest req : globalRequests) {
+                            StatisticsResponse outResp = new StatisticsResponse(req.req().reqId(), globalStatData);
+
+                            send(req.nodeId(), outResp);
+                        }
+                    }
+                }
+                else {
+                    if (log.isDebugEnabled())
+                        log.debug("Dropping collected statistics due to lack of configuration for key " + key);
+                }
+
+                curCollections.remove(key);
+            }
+        }
+        else {
+            if (log.isDebugEnabled())
+                log.debug("Got outdated local statistics response " + resp);
+        }
+    }
+
+    /**
+     * Process response of global statistics.
+     *
+     * @param resp Response.
+     * @throws IgniteCheckedException In case of error.
+     */
+    private void processGlobalResponse(UUID nodeId, StatisticsResponse resp) throws IgniteCheckedException {
+        StatisticsKeyMessage keyMsg = resp.data().key();
+        StatisticsKey key = new StatisticsKey(keyMsg.schema(), keyMsg.obj());
+
+        if (log.isDebugEnabled())
+            log.debug("Got global statistics response " + resp.reqId() + " from node " + nodeId + " by key " + key);
+
+        UUID reqId = outGlobalStatisticsRequests.get(key);
+
+        if (reqId != null) {
+            if (!resp.reqId().equals(reqId)) {
+                if (log.isDebugEnabled())
+                    log.debug("Got outdated global statistics response " + resp + " instead of " + reqId);
+
+                return;
+            }
+
+            ObjectStatisticsImpl data = StatisticsUtils.toObjectStatistics(null, resp.data());
+
+            globalStatistics.put(key, new CacheEntry(data));
+            outGlobalStatisticsRequests.remove(key);
+        }
+        else {
+            if (log.isDebugEnabled())
+                log.debug("Got outdated global statistics response " + resp);
+        }
+    }
+
+    /**
+     * Calculate id of statistics master node for the given key.
+     *
+     * @param key Statistics key to calculate master node for.
+     * @return UUID of statistics master node.
+     */
+    private UUID getStatisticsMasterNode(StatisticsKey key) {
+        UUID[] nodes = discoMgr.aliveServerNodes().stream().map(ClusterNode::id).sorted().toArray(UUID[]::new);
+        int idx = IgniteUtils.hashToIndex(key.obj().hashCode(), nodes.length);
+
+        return nodes[idx];
+    }
+
+    /**
+     * After collecting local statistics - check if there are some pending request for it and send responces.
+     *
+     * @param key Statistics key on which local statistics was aggregated.
+     * @param statistics Collected statistics by key.
+     * @param topVer Topology version which aggregated statistics stands for.
+     */
+    public void onLocalStatisticsAggregated(
+        StatisticsKey key,
+        ObjectStatisticsImpl statistics,
+        AffinityTopologyVersion topVer
+    ) {
+        List<StatisticsAddressedRequest> inReqs = new ArrayList<>();
+
+        inLocalRequests.computeIfPresent(key, (k, v) -> {
+            List<StatisticsAddressedRequest> left = new ArrayList<>();
+
+            for (StatisticsAddressedRequest req : v) {
+                if (topVer.equals(req.req().topVer()) &&
+                    StatisticsUtils.compareVersions(statistics, req.req().versions()) == 0)
+                    inReqs.add(req);
+                else
+                    left.add(req);
+            }
+
+            return (left.isEmpty()) ? null : left;
+        });
+
+        if (inReqs.isEmpty())
+            return;
+
+        for (StatisticsAddressedRequest req : inReqs) {
+            try {
+                sendResponse(req.nodeId(), req.req().reqId(), key, StatisticsType.LOCAL, statistics);
+            }
+            catch (IgniteCheckedException e) {
+                log.info("Unable to send local object statistics for key " + key + " due to " + e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Send statistics related message.
+     *
+     * @param nodeId Target node id.
+     * @param msg Message to send.
+     * @throws IgniteCheckedException In case of error.
+     */
+    private void send(UUID nodeId, StatisticsRequest msg) throws IgniteCheckedException {
+        if (discoMgr.localNode().id().equals(nodeId)) {
+            switch (msg.type()) {
+                case LOCAL:
+                    processLocalRequest(nodeId, msg);
+
+                    break;
+
+                default:
+                    log.warning("Unexpected type " + msg.type() + " in statistics request message " + msg);
+            }
+        }
+        else
+            ioMgr.sendToGridTopic(nodeId, GridTopic.TOPIC_STATISTICS, msg, GridIoPolicy.MANAGEMENT_POOL);
+    }
+
+    /**
+     * Send statistics response or process it locally.
+     *
+     * @param nodeId Target node id. If equals to local node - corresponding method will be called directly.
+     * @param msg Statistics response to send.
+     * @throws IgniteCheckedException In case of error.
+     */
+    private void send(UUID nodeId, StatisticsResponse msg) throws IgniteCheckedException {
+        if (discoMgr.localNode().id().equals(nodeId)) {
+            switch (msg.data().type()) {
+                case LOCAL:
+                    processLocalResponse(nodeId, msg);
+
+                    break;
+
+                case GLOBAL:
+                    processGlobalResponse(nodeId, msg);
+
+                    break;
+
+                default:
+                    log.warning("Unexpected type " + msg.data().type() + " in statistics response message " + msg);
+            }
+        }
+        else
+            ioMgr.sendToGridTopic(nodeId, GridTopic.TOPIC_STATISTICS, msg, GridIoPolicy.MANAGEMENT_POOL);
+    }
+
+    /** Cache entry. */
+    private static class CacheEntry<T> {
+        /** Cached object. */
+        private final T obj;
+
+        /**
+         * Constructor.
+         *
+         * @param obj Cached object.
+         */
+        public CacheEntry(T obj) {
+            this.obj = obj;
+        }
+
+        /**
+         * @return Cached object.
+         */
+        public T object() {
+            return obj;
+        }
+    }
+
+    /** Context of global statistics gathering. */
+    private static class StatisticsGatheringContext {
+        /** Number of remaining requests. */
+        private int remainingResponses;
+
+        /** Requests id. */
+        private final UUID reqId;
+
+        /** Local object statistics from responses. */
+        private final Collection<ObjectStatisticsImpl> responses = new ArrayList<>();
+
+        /** Configuration, used to collect statistics. */
+        private final StatisticsObjectConfiguration cfg;
+
+        /**
+         * Constructor.
+         *
+         * @param responseCont Expectiong response count.
+         * @param reqId Requests id.
+         * @param cfg Configuration, used to collect statistics.
+         */
+        public StatisticsGatheringContext(int responseCont, UUID reqId, StatisticsObjectConfiguration cfg) {
+            remainingResponses = responseCont;
+            this.reqId = reqId;
+            this.cfg = cfg;
+        }
+
+        /**
+         * Register response.
+         *
+         * @param data Object statistics from response.
+         * @return {@code true} if all respones collected, {@code false} otherwise.
+         */
+        public synchronized boolean registerResponse(ObjectStatisticsImpl data) {
+            responses.add(data);
+            return --remainingResponses == 0;
+        }
+
+        /**
+         * @return Requests id.
+         */
+        public UUID reqId() {
+            return reqId;
+        }
+
+        /**
+         * Get collected local object statistics.
+         * @return Local object statistics.
+         */
+        public Collection<ObjectStatisticsImpl> collectedData() {
+            assert remainingResponses == 0;
+
+            return responses;
+        }
+
+        /**
+         * @return Statistics configuration, used to start gathering.
+         */
+        public StatisticsObjectConfiguration configuration() {
+            return cfg;
+        }
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java
index 326b028..fab6bcb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java
@@ -23,7 +23,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
@@ -100,30 +102,34 @@ public class IgniteStatisticsConfigurationManager {
     /** Is server node flag. */
     private final boolean isServerNode;
 
+    /** Configuration change subscribers. */
+    private List<Consumer<StatisticsObjectConfiguration>> subscribers = new CopyOnWriteArrayList<>();
+
     /** Change statistics configuration listener to update particular object statistics. */
     private final DistributedMetastorageLifecycleListener distrMetaStoreLsnr =
         new DistributedMetastorageLifecycleListener() {
-        @Override public void onReadyForWrite(DistributedMetaStorage metastorage) {
-            distrMetaStorage = metastorage;
-
-            distrMetaStorage.listen(
-                (metaKey) -> metaKey.startsWith(STAT_OBJ_PREFIX),
-                (k, oldV, newV) -> {
-                    // Skip invoke on start node (see 'ReadableDistributedMetaStorage#listen' the second case)
-                    // The update statistics on start node is handled by 'scanAndCheckLocalStatistic' method
-                    // called on exchange done.
-                    if (topVer == null)
-                        return;
+            @Override public void onReadyForWrite(DistributedMetaStorage metastorage) {
+                distrMetaStorage = metastorage;
+
+                distrMetaStorage.listen(
+                    (metaKey) -> metaKey.startsWith(STAT_OBJ_PREFIX),
+                    (k, oldV, newV) -> {
+                        // Skip invoke on start node (see 'ReadableDistributedMetaStorage#listen' the second case)
+                        // The update statistics on start node is handled by 'scanAndCheckLocalStatistic' method
+                        // called on exchange done.
+                        if (topVer == null)
+                            return;
 
-                    mgmtBusyExecutor.execute(() -> {
                         StatisticsObjectConfiguration newStatCfg = (StatisticsObjectConfiguration)newV;
 
-                        updateLocalStatistics(newStatCfg);
-                    });
-                }
-            );
-        }
-    };
+                        for (Consumer<StatisticsObjectConfiguration> subscriber : subscribers)
+                            subscriber.accept(newStatCfg);
+
+                        mgmtBusyExecutor.execute(() -> updateLocalStatistics(newStatCfg));
+                    }
+                );
+            }
+        };
 
     /**
      * Constructor.
@@ -204,7 +210,7 @@ public class IgniteStatisticsConfigurationManager {
          */
         @Override public void accept(GridH2Table tbl, List<String> cols) {
             assert !F.isEmpty(cols);
-                dropStatistics(Collections.singletonList(
+            dropStatistics(Collections.singletonList(
                     new StatisticsTarget(
                         tbl.identifier().schema(),
                         tbl.getName(),
@@ -603,4 +609,13 @@ public class IgniteStatisticsConfigurationManager {
 
         return sb.toString();
     }
+
+    /**
+     * Subscribe to statistics configuration changed.
+     *
+     * @param subscriber Subscriber.
+     */
+    public void subscribe(Consumer<StatisticsObjectConfiguration> subscriber) {
+        subscribers.add(subscriber);
+    }
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsHelper.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsHelper.java
index 8f1893a..44b6cde 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsHelper.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsHelper.java
@@ -30,13 +30,18 @@ import java.util.UUID;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.query.h2.SchemaManager;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnConfiguration;
 import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
 import org.apache.ignite.internal.processors.query.stat.messages.StatisticsKeyMessage;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsRequest;
 import org.apache.ignite.internal.util.typedef.F;
 import org.h2.table.Column;
 import org.jetbrains.annotations.Nullable;
@@ -68,6 +73,61 @@ public class IgniteStatisticsHelper {
     }
 
     /**
+     * Get cache group context by specified statistics key.
+     *
+     * @param key Statistics key to get context by.
+     * @return Cache group context for the given key.
+     * @throws IgniteCheckedException If unable to find table by specified key.
+     */
+    public CacheGroupContext groupContext(StatisticsKey key) throws IgniteCheckedException {
+        GridH2Table tbl = schemaMgr.dataTable(key.schema(), key.obj());
+
+        if (tbl == null)
+            throw new IgniteCheckedException(String.format("Can't find object %s.%s", key.schema(), key.obj()));
+
+        return tbl.cacheContext().group();
+    }
+
+    /**
+     * Generate local statistics requests.
+     *
+     * @param target Statistics target to request local statistics by.
+     * @param cfg Statistics configuration.
+     * @return Collection of statistics request.
+     */
+    public List<StatisticsAddressedRequest> generateGatheringRequests(
+        StatisticsTarget target,
+        StatisticsObjectConfiguration cfg
+    ) throws IgniteCheckedException {
+        List<String> cols = (target.columns() == null) ? null : Arrays.asList(target.columns());
+        StatisticsKeyMessage keyMsg = new StatisticsKeyMessage(target.schema(), target.obj(), cols);
+
+        Map<String, Long> versions = cfg.columns().entrySet().stream()
+            .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().version()));
+        CacheGroupContext grpCtx = groupContext(target.key());
+        AffinityTopologyVersion topVer = grpCtx.affinity().lastVersion();
+
+        StatisticsRequest req = new StatisticsRequest(UUID.randomUUID(), keyMsg, StatisticsType.LOCAL, topVer, versions);
+
+        List<List<ClusterNode>> assignments = grpCtx.affinity().assignments(topVer);
+        Set<UUID> nodes = new HashSet<>();
+
+        for (List<ClusterNode> partNodes : assignments) {
+            if (F.isEmpty(partNodes))
+                continue;
+
+            nodes.add(partNodes.get(0).id());
+        }
+
+        List<StatisticsAddressedRequest> res = new ArrayList<>(nodes.size());
+
+        for (UUID nodeId : nodes)
+            res.add(new StatisticsAddressedRequest(req, nodeId));
+
+        return res;
+    }
+
+    /**
      * Aggregate specified partition level statistics to local level statistics.
      *
      * @param cfg Statistics object configuration.
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java
index f68b5b0..07ff549 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java
@@ -22,6 +22,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Consumer;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -30,6 +31,7 @@ import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheUtils;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
@@ -72,6 +74,9 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager {
     /** Statistics repository. */
     private final IgniteStatisticsRepository statsRepos;
 
+    /** Global statistics repository. */
+    private final IgniteGlobalStatisticsManager globalStatsMgr;
+
     /** Ignite statistics helper. */
     private final IgniteStatisticsHelper helper;
 
@@ -144,31 +149,25 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager {
         IgniteCacheDatabaseSharedManager db = (GridCacheUtils.isPersistenceEnabled(ctx.config())) ?
             ctx.cache().context().database() : null;
 
-        if (serverNode) {
-            gatherPool = new IgniteThreadPoolExecutor("stat-gather",
-                ctx.igniteInstanceName(),
-                0,
-                STATS_POOL_SIZE,
-                IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME,
-                new LinkedBlockingQueue<>(),
-                GridIoPolicy.UNDEFINED,
-                ctx.uncaughtExceptionHandler()
-            );
-
-            mgmtPool = new IgniteThreadPoolExecutor("stat-mgmt",
-                ctx.igniteInstanceName(),
-                0,
-                1,
-                IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME,
-                new LinkedBlockingQueue<>(),
-                GridIoPolicy.UNDEFINED,
-                ctx.uncaughtExceptionHandler()
-            );
-        }
-        else {
-            gatherPool = null;
-            mgmtPool = null;
-        }
+        gatherPool = (serverNode) ? new IgniteThreadPoolExecutor("stat-gather",
+            ctx.igniteInstanceName(),
+            0,
+            STATS_POOL_SIZE,
+            IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME,
+            new LinkedBlockingQueue<>(),
+            GridIoPolicy.UNDEFINED,
+            ctx.uncaughtExceptionHandler()
+            ) : null;
+
+        mgmtPool = new IgniteThreadPoolExecutor("stat-mgmt",
+            ctx.igniteInstanceName(),
+            0,
+            1,
+            IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME,
+            new LinkedBlockingQueue<>(),
+            GridIoPolicy.UNDEFINED,
+            ctx.uncaughtExceptionHandler()
+        );
 
         obsolescenceBusyExecutor = new BusyExecutor("obsolescence", mgmtPool, ctx::log);
 
@@ -201,6 +200,17 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager {
             serverNode
         );
 
+        globalStatsMgr = new IgniteGlobalStatisticsManager(
+            this,
+            ctx.systemView(),
+            mgmtPool,
+            ctx.discovery(),
+            ctx.state(),
+            ctx.cache().context().exchange(),
+            helper,
+            ctx.io(),
+            ctx::log);
+
         ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener(dispatcher -> {
             usageState.addListener((name, oldVal, newVal) -> {
                 if (log.isInfoEnabled())
@@ -253,6 +263,7 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager {
         if (log.isDebugEnabled())
             log.debug("Stopping statistics subsystem");
 
+        globalStatsMgr.stop();
         statCfgMgr.stop();
 
         if (statProc != null)
@@ -292,6 +303,7 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager {
             statProc.start();
 
         statCfgMgr.start();
+        globalStatsMgr.start();
 
         started = true;
     }
@@ -307,7 +319,25 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager {
     @Override public ObjectStatistics getLocalStatistics(StatisticsKey key) {
         StatisticsUsageState currState = usageState();
 
-        return (currState == ON || currState == NO_UPDATE) ? statsRepos.getLocalStatistics(key) : null;
+        return (currState == ON || currState == NO_UPDATE) ? statsRepos.getLocalStatistics(key, null) : null;
+    }
+
+    /**
+     * Get local statitsics with specified topology version if exists.
+     *
+     * @param key Key to get statistics by.
+     * @param topVer Required topology version.
+     * @return Local object statistics or {@code null} if there are no statistics with requested topology version.
+     */
+    public ObjectStatisticsImpl getLocalStatistics(StatisticsKey key, AffinityTopologyVersion topVer) {
+        return statsRepos.getLocalStatistics(key, topVer);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ObjectStatistics getGlobalStatistics(StatisticsKey key) {
+        StatisticsUsageState currState = usageState();
+
+        return (currState == ON || currState == NO_UPDATE) ? globalStatsMgr.getGlobalStatistics(key) : null;
     }
 
     /** {@inheritDoc} */
@@ -440,12 +470,14 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager {
 
         for (StatisticsKey key : keys) {
             StatisticsObjectConfiguration cfg = null;
+
             try {
-                 cfg = statCfgMgr.config(key);
+                cfg = statCfgMgr.config(key);
             }
             catch (IgniteCheckedException e) {
                 // No-op/
             }
+
             Set<Integer> tasksParts = calculateObsolescencedPartitions(cfg, statsRepos.getObsolescence(key));
 
             GridH2Table tbl = schemaMgr.dataTable(key.schema(), key.obj());
@@ -489,6 +521,24 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager {
     }
 
     /**
+     * Subscribe to all local statistics changes.
+     *
+     * @param subscriber Local statitics subscriber.
+     */
+    public void subscribeToLocalStatistics(Consumer<ObjectStatisticsEvent> subscriber) {
+        statsRepos.subscribeToLocalStatistics(subscriber);
+    }
+
+    /**
+     * Subscribe to all statistics configuration changed.
+     *
+     * @param subscriber Statistics configuration subscriber.
+     */
+    public void subscribeToStatisticsConfig(Consumer<StatisticsObjectConfiguration> subscriber) {
+        statCfgMgr.subscribe(subscriber);
+    }
+
+    /**
      * Check that cluster is active.
      *
      * @param op Operation name.
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepository.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepository.java
index 9dac793..ea4360d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepository.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepository.java
@@ -23,12 +23,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Consumer;
 import java.util.function.Function;
 
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.managers.systemview.GridSystemViewManager;
 import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnLocalDataViewWalker;
 import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnPartitionDataViewWalker;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
 import org.apache.ignite.internal.processors.query.stat.view.ColumnLocalDataViewSupplier;
 import org.apache.ignite.internal.processors.query.stat.view.ColumnPartitionDataViewSupplier;
@@ -60,7 +63,7 @@ public class IgniteStatisticsRepository {
     private final IgniteStatisticsStore store;
 
     /** Local (for current node) object statistics. */
-    private final Map<StatisticsKey, ObjectStatisticsImpl> locStats = new ConcurrentHashMap<>();
+    private final Map<StatisticsKey, VersionedStatistics> locStats = new ConcurrentHashMap<>();
 
     /** Obsolescence for each partition. */
     private final Map<StatisticsKey, IntMap<ObjectPartitionStatisticsObsolescence>> statObs = new ConcurrentHashMap<>();
@@ -68,6 +71,9 @@ public class IgniteStatisticsRepository {
     /** Statistics helper (msg converter). */
     private final IgniteStatisticsHelper helper;
 
+    /** Local statistics subscribers. */
+    private final List<Consumer<ObjectStatisticsEvent>> subscribers = new CopyOnWriteArrayList<>();
+
     /**
      * Constructor.
      *
@@ -178,9 +184,19 @@ public class IgniteStatisticsRepository {
      *
      * @param key Object key.
      * @param statistics Statistics to save.
+     * @param topVer Topology version.
      */
-    public void saveLocalStatistics(StatisticsKey key, ObjectStatisticsImpl statistics) {
-        locStats.put(key, statistics);
+    public void saveLocalStatistics(
+        StatisticsKey key,
+        ObjectStatisticsImpl statistics,
+        AffinityTopologyVersion topVer
+    ) {
+        locStats.put(key, new VersionedStatistics(topVer, statistics));
+
+        ObjectStatisticsEvent newLocalStat = new ObjectStatisticsEvent(key, statistics, topVer);
+
+        for (Consumer<ObjectStatisticsEvent> subscriber : subscribers)
+            subscriber.accept(newLocalStat);
     }
 
     /**
@@ -213,10 +229,16 @@ public class IgniteStatisticsRepository {
      * Get local statistics.
      *
      * @param key Object key to load statistics by.
+     * @param topVer Required topology version.
      * @return Object local statistics or {@code null} if there are no statistics collected for such object.
      */
-    public ObjectStatisticsImpl getLocalStatistics(StatisticsKey key) {
-        return locStats.get(key);
+    public ObjectStatisticsImpl getLocalStatistics(StatisticsKey key, AffinityTopologyVersion topVer) {
+        VersionedStatistics stat = locStats.get(key);
+
+        if (stat == null)
+            return null;
+
+        return (topVer != null && !stat.topologyVersion().equals(topVer)) ? null : stat.statistics();
     }
 
     /**
@@ -224,7 +246,7 @@ public class IgniteStatisticsRepository {
      *
      * @return Local (for current node) object statistics.
      */
-    public Map<StatisticsKey, ObjectStatisticsImpl> localStatisticsMap() {
+    public Map<StatisticsKey, VersionedStatistics> localStatisticsMap() {
         return locStats;
     }
 
@@ -240,11 +262,13 @@ public class IgniteStatisticsRepository {
      *
      * @param stats Partitions statistics to aggregate.
      * @param cfg Statistic configuration to specify statistic object to aggregate.
+     * @param topVer Topology version to which specified partition set actual for.
      * @return aggregated local statistic.
      */
     public ObjectStatisticsImpl aggregatedLocalStatistics(
         Collection<ObjectPartitionStatisticsImpl> stats,
-        StatisticsObjectConfiguration cfg
+        StatisticsObjectConfiguration cfg,
+        AffinityTopologyVersion topVer
     ) {
         if (log.isDebugEnabled()) {
             log.debug("Refresh local aggregated statistic [cfg=" + cfg +
@@ -254,7 +278,7 @@ public class IgniteStatisticsRepository {
         ObjectStatisticsImpl locStat = helper.aggregateLocalStatistics(cfg, stats);
 
         if (locStat != null)
-            saveLocalStatistics(cfg.key(), locStat);
+            saveLocalStatistics(cfg.key(), locStat, topVer);
 
         return locStat;
     }
@@ -320,4 +344,50 @@ public class IgniteStatisticsRepository {
             }
         });
     }
+
+    /**
+     * Subscribe to all local statistics changes.
+     *
+     * @param subscriber Local statitics subscriber.
+     */
+    public void subscribeToLocalStatistics(Consumer<ObjectStatisticsEvent> subscriber
+    ) {
+        subscribers.add(subscriber);
+    }
+
+    /**
+     * Object statistics with topology version to which it is actual for.
+     */
+    public static class VersionedStatistics {
+        /** Topology version. */
+        private final AffinityTopologyVersion topVer;
+
+        /** Statistics. */
+        private final ObjectStatisticsImpl statistics;
+
+        /**
+         * Constructor.
+         *
+         * @param topVer Topology version.
+         * @param statistics Statistics.
+         */
+        public VersionedStatistics(AffinityTopologyVersion topVer, ObjectStatisticsImpl statistics) {
+            this.topVer = topVer;
+            this.statistics = statistics;
+        }
+
+        /**
+         * @return Topology version.
+         */
+        public AffinityTopologyVersion topologyVersion() {
+            return topVer;
+        }
+
+        /**
+         * @return Statistics.
+         */
+        public ObjectStatisticsImpl statistics() {
+            return statistics;
+        }
+    }
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/ObjectStatisticsEvent.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/ObjectStatisticsEvent.java
new file mode 100644
index 0000000..56450b4
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/ObjectStatisticsEvent.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.stat;
+
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+
+/**
+ * Local object statistics change event.
+ */
+public class ObjectStatisticsEvent {
+    /** Statistics key. */
+    private final StatisticsKey key;
+
+    /** Local object statistics. */
+    private final ObjectStatisticsImpl statistics;
+
+    /** Local statistics topology version. */
+    private final AffinityTopologyVersion topVer;
+
+    /**
+     * Constructor.
+     *
+     * @param key Statistics key.
+     * @param statistics Local object statistics.
+     * @param topVer Affinity topology version for whick local object statistics were calculated.
+     */
+    public ObjectStatisticsEvent(StatisticsKey key,
+        ObjectStatisticsImpl statistics, AffinityTopologyVersion topVer) {
+        this.key = key;
+        this.statistics = statistics;
+        this.topVer = topVer;
+    }
+
+    /**
+     * Get statistics key.
+     *
+     * @return Statistics key.
+     */
+    public StatisticsKey key() {
+        return key;
+    }
+
+    /**
+     * Get object statistics.
+     *
+     * @return Object statistics.
+     */
+    public ObjectStatisticsImpl statistics() {
+        return statistics;
+    }
+
+    /**
+     * Get affinity topology version.
+     *
+     * @return topology version for which statisics were calculated.
+     */
+    public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/ObjectStatisticsImpl.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/ObjectStatisticsImpl.java
index 8325f88..4b8952c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/ObjectStatisticsImpl.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/ObjectStatisticsImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.query.stat;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -104,4 +105,19 @@ public class ObjectStatisticsImpl implements Cloneable, ObjectStatistics {
     @Override public String toString() {
         return S.toString(ObjectStatisticsImpl.class, this);
     }
+
+    /**
+     * Remove specified columns from clone of current ObjectStatistics object.
+     *
+     * @param cols Columns to remove.
+     * @return Cloned object without specified columns statistics.
+     */
+    public <T extends ObjectStatisticsImpl> T subtract(Set<String> cols) {
+        T res = (T)clone();
+
+        for (String col : cols)
+            res.columnsStatistics().remove(col);
+
+        return res;
+    }
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsType.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsAddressedRequest.java
similarity index 53%
copy from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsType.java
copy to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsAddressedRequest.java
index 38f3f67a..8a7c5fd 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsType.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsAddressedRequest.java
@@ -17,28 +17,42 @@
 
 package org.apache.ignite.internal.processors.query.stat;
 
-import org.jetbrains.annotations.Nullable;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsRequest;
 
 /**
- * Types of statistics width.
+ * Statistics request with target/sender node id.
  */
-public enum StatisticsType {
-    /** Statistics by some particular partition. */
-    PARTITION,
+public class StatisticsAddressedRequest {
+    /** Statistics request */
+    private final StatisticsRequest req;
 
-    /** Statistics by some data node. */
-    LOCAL;
-
-    /** Enumerated values. */
-    private static final StatisticsType[] VALUES = values();
+    /** Target/sender node id. */
+    private final UUID nodeId;
 
     /**
-     * Efficiently gets enumerated value from its ordinal.
+     * Constructor.
      *
-     * @param ord Ordinal value.
-     * @return Enumerated value or {@code null} if ordinal out of range.
+     * @param req Statistics request.
+     * @param nodeId Target/sender node id.
+     */
+    public StatisticsAddressedRequest(StatisticsRequest req, UUID nodeId) {
+        this.req = req;
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * @return Statistics request.
      */
-    @Nullable public static StatisticsType fromOrdinal(int ord) {
-        return ord >= 0 && ord < VALUES.length ? VALUES[ord] : null;
+    public StatisticsRequest req() {
+        return req;
+    }
+
+    /**
+     * @return Target/sender node id.
+     */
+    public UUID nodeId() {
+        return nodeId;
     }
 }
+
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsProcessor.java
index 34fd6bd..84c7cb0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsProcessor.java
@@ -235,7 +235,7 @@ public class StatisticsProcessor {
         Collection<ObjectPartitionStatisticsImpl> allParts = statRepo.getLocalPartitionsStatistics(key);
 
         if (ctx.forceRecollect())
-            statRepo.aggregatedLocalStatistics(allParts, ctx.configuration());
+            statRepo.aggregatedLocalStatistics(allParts, ctx.configuration(), ctx.topologyVersion());
         else {
             Set<Integer> partsToRemove = new HashSet<>();
             Collection<ObjectPartitionStatisticsImpl> partsToAggregate = new ArrayList<>();
@@ -251,7 +251,7 @@ public class StatisticsProcessor {
                 statRepo.clearLocalPartitionsStatistics(ctx.configuration().key(), partsToRemove);
 
             if (!partsToAggregate.isEmpty())
-                statRepo.aggregatedLocalStatistics(partsToAggregate, ctx.configuration());
+                statRepo.aggregatedLocalStatistics(partsToAggregate, ctx.configuration(), ctx.topologyVersion());
         }
     }
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsType.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsType.java
index 38f3f67a..29900bb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsType.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsType.java
@@ -27,7 +27,10 @@ public enum StatisticsType {
     PARTITION,
 
     /** Statistics by some data node. */
-    LOCAL;
+    LOCAL,
+
+    /** Statistics by the whole object. */
+    GLOBAL,;
 
     /** Enumerated values. */
     private static final StatisticsType[] VALUES = values();
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsUtils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsUtils.java
index a210944..99d2624 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsUtils.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsUtils.java
@@ -24,6 +24,8 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessage;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnConfiguration;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
 import org.apache.ignite.internal.processors.query.stat.messages.StatisticsColumnData;
 import org.apache.ignite.internal.processors.query.stat.messages.StatisticsKeyMessage;
 import org.apache.ignite.internal.processors.query.stat.messages.StatisticsObjectData;
@@ -31,7 +33,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.h2.value.Value;
 
 /**
- * Utilities to convert statistics from/to messages.
+ * Utilities to convert statistics from/to messages, validate configurations with statistics and so on.
  */
 public class StatisticsUtils {
     /**
@@ -172,4 +174,63 @@ public class StatisticsUtils {
         String[] cols = (msg.colNames() == null) ? null : msg.colNames().toArray(new String[0]);
         return new StatisticsTarget(msg.schema(), msg.obj(), cols);
     }
+
+    /**
+     * Test if specified statistics are fit to all required versions.
+     * It means that specified statistics contains all columns with at least versions
+     * from specified map.
+     *
+     * @param stat Statistics to check. Can be {@code null}.
+     * @param versions Map of column name to required version.
+     * @return positive value if statistics versions bigger than specified in versions map,
+     *         negative value if statistics version smaller than specified in version map,
+     *         zero it they are equals.
+     */
+    public static int compareVersions(
+        ObjectStatisticsImpl stat,
+        Map<String, Long> versions
+    ) {
+        if (stat == null)
+            return -1;
+
+        for (Map.Entry<String, Long> version : versions.entrySet()) {
+            ColumnStatistics colStat = stat.columnsStatistics().get(version.getKey());
+
+            if (colStat == null || colStat.version() < version.getValue())
+                return -1;
+
+            if (colStat.version() > version.getValue())
+                return 1;
+        }
+
+        return 0;
+    }
+
+    /**
+     * Test if secified statistics configuration is fit to all required versions.
+     * It means that specified statistics configuraion contains all columns with at least versions from specified map.
+     *
+     * @param cfg Statistics configuraion to check. Can be {@code null}.
+     * @param versions Map of column name to required version.
+     * @return {@code true} if it is, {@code talse} otherwise.
+     */
+    public static int compareVersions(
+        StatisticsObjectConfiguration cfg,
+        Map<String, Long> versions
+    ) {
+        if (cfg == null)
+            return -1;
+
+        for (Map.Entry<String, Long> colVersion : versions.entrySet()) {
+            StatisticsColumnConfiguration colCfg = cfg.columns().get(colVersion.getKey());
+
+            if (colCfg == null || colCfg.version() < colVersion.getValue())
+                return -1;
+
+            if (colCfg.version() > colVersion.getValue())
+                return 1;
+        }
+
+        return 0;
+    }
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/messages/StatisticsKeyMessage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/messages/StatisticsKeyMessage.java
index 481c39d..7beab33 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/messages/StatisticsKeyMessage.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/messages/StatisticsKeyMessage.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Objects;
 
 import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -177,6 +178,11 @@ public class StatisticsKeyMessage implements Message {
     }
 
     /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(StatisticsKeyMessage.class, this);
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/messages/StatisticsRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/messages/StatisticsRequest.java
new file mode 100644
index 0000000..ea7c38d
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/messages/StatisticsRequest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.stat.messages;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.query.stat.StatisticsType;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Request for statistics.
+ */
+public class StatisticsRequest implements Message {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    public static final short TYPE_CODE = 186;
+
+    /** Gathering id. */
+    private UUID reqId;
+
+    /** Key to supply statistics by. */
+    private StatisticsKeyMessage key;
+
+    /** Type of required statistcs. */
+    private StatisticsType type;
+
+    /** For local statistics request - column config versions map: name to version. */
+    @GridDirectMap(keyType = String.class, valueType = Long.class)
+    private Map<String, Long> versions;
+
+    /** For local statistics request - version to gather statistics by. */
+    private AffinityTopologyVersion topVer;
+
+    /**
+     * Constructor.
+     */
+    public StatisticsRequest() {
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param reqId Request id.
+     * @param key Statistics key to get statistics by.
+     * @param type Required statistics type.
+     * @param topVer Topology version to get statistics by.
+     * @param versions Map of statistics version to column name to ensure actual statistics aquired.
+     */
+    public StatisticsRequest(
+        UUID reqId,
+        StatisticsKeyMessage key,
+        StatisticsType type,
+        AffinityTopologyVersion topVer,
+        Map<String, Long> versions
+    ) {
+        this.reqId = reqId;
+        this.key = key;
+        this.type = type;
+        this.topVer = topVer;
+        this.versions = versions;
+    }
+
+    /** Request id. */
+    public UUID reqId() {
+        return reqId;
+    }
+
+    /**
+     * @return Required statistics type.
+     */
+    public StatisticsType type() {
+        return type;
+    }
+
+    /**
+     * @return Key for required statitics.
+     */
+    public StatisticsKeyMessage key() {
+        return key;
+    }
+
+    /**
+     * @return Topology version.
+     */
+    public AffinityTopologyVersion topVer() {
+        return topVer;
+    }
+
+    /**
+     * @return Column name to version map.
+     */
+    public Map<String, Long> versions() {
+        return versions;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(StatisticsRequest.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeMessage("key", key))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeUuid("reqId", reqId))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeAffinityTopologyVersion("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
+                if (!writer.writeByte("type", type != null ? (byte)type.ordinal() : -1))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeMap("versions", versions, MessageCollectionItemType.STRING, MessageCollectionItemType.LONG))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                key = reader.readMessage("key");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                reqId = reader.readUuid("reqId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                topVer = reader.readAffinityTopologyVersion("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 3:
+                byte typeOrd;
+
+                typeOrd = reader.readByte("type");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                type = StatisticsType.fromOrdinal(typeOrd);
+
+                reader.incrementState();
+
+            case 4:
+                versions = reader.readMap("versions", MessageCollectionItemType.STRING, MessageCollectionItemType.LONG, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(StatisticsRequest.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return TYPE_CODE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 5;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/messages/StatisticsKeyMessage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/messages/StatisticsResponse.java
similarity index 52%
copy from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/messages/StatisticsKeyMessage.java
copy to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/messages/StatisticsResponse.java
index 481c39d..fcf7063 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/messages/StatisticsKeyMessage.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/messages/StatisticsResponse.java
@@ -17,76 +17,66 @@
 
 package org.apache.ignite.internal.processors.query.stat.messages;
 
-import java.io.Externalizable;
 import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.ignite.internal.GridDirectCollection;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 
 /**
- * Key, describing the object of statistics. For example: table with some columns.
+ * Response for statistics request.
  */
-public class StatisticsKeyMessage implements Message {
+public class StatisticsResponse implements Message {
     /** */
     private static final long serialVersionUID = 0L;
 
     /** */
-    public static final short TYPE_CODE = 183;
-
-    /** Object schema. */
-    private String schema;
+    public static final short TYPE_CODE = 187;
 
-    /** Object name. */
-    private String obj;
+    /** Request id. */
+    private UUID reqId;
 
-    /** Optional list of columns to collect statistics by. */
-    @GridDirectCollection(String.class)
-    private List<String> colNames;
+    /** Requested statistics. */
+    private StatisticsObjectData data;
 
     /**
-     * {@link Externalizable} support.
+     * Constructor.
      */
-    public StatisticsKeyMessage() {
-        // No-op.
+    public StatisticsResponse() {
     }
 
     /**
      * Constructor.
      *
-     * @param schema Schema name.
-     * @param obj Object name.
-     * @param colNames Column names.
+     * @param reqId Request id
+     * @param data Statistics data.
      */
-    public StatisticsKeyMessage(String schema, String obj, List<String> colNames) {
-        this.schema = schema;
-        this.obj = obj;
-        this.colNames = colNames;
+    public StatisticsResponse(
+        UUID reqId,
+        StatisticsObjectData data
+    ) {
+        this.reqId = reqId;
+        this.data = data;
     }
 
     /**
-     * @return Schema name.
+     * @return Request id.
      */
-    public String schema() {
-        return schema;
+    public UUID reqId() {
+        return reqId;
     }
 
     /**
-     * @return Object name.
+     * @return Statitics data.
      */
-    public String obj() {
-        return obj;
+    public StatisticsObjectData data() {
+        return data;
     }
 
-    /**
-     * @return Column names.
-     */
-    public List<String> colNames() {
-        return colNames;
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(StatisticsResponse.class, this);
     }
 
     /** {@inheritDoc} */
@@ -102,19 +92,13 @@ public class StatisticsKeyMessage implements Message {
 
         switch (writer.state()) {
             case 0:
-                if (!writer.writeCollection("colNames", colNames, MessageCollectionItemType.STRING))
+                if (!writer.writeMessage("data", data))
                     return false;
 
                 writer.incrementState();
 
             case 1:
-                if (!writer.writeString("obj", obj))
-                    return false;
-
-                writer.incrementState();
-
-            case 2:
-                if (!writer.writeString("schema", schema))
+                if (!writer.writeUuid("reqId", reqId))
                     return false;
 
                 writer.incrementState();
@@ -133,7 +117,7 @@ public class StatisticsKeyMessage implements Message {
 
         switch (reader.state()) {
             case 0:
-                colNames = reader.readCollection("colNames", MessageCollectionItemType.STRING);
+                data = reader.readMessage("data");
 
                 if (!reader.isLastRead())
                     return false;
@@ -141,15 +125,7 @@ public class StatisticsKeyMessage implements Message {
                 reader.incrementState();
 
             case 1:
-                obj = reader.readString("obj");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 2:
-                schema = reader.readString("schema");
+                reqId = reader.readUuid("reqId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -158,7 +134,7 @@ public class StatisticsKeyMessage implements Message {
 
         }
 
-        return reader.afterMessageRead(StatisticsKeyMessage.class);
+        return reader.afterMessageRead(StatisticsResponse.class);
     }
 
     /** {@inheritDoc} */
@@ -168,26 +144,11 @@ public class StatisticsKeyMessage implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 3;
+        return 2;
     }
 
     /** {@inheritDoc} */
     @Override public void onAckReceived() {
 
     }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        StatisticsKeyMessage that = (StatisticsKeyMessage) o;
-        return Objects.equals(schema, that.schema) &&
-            Objects.equals(obj, that.obj) &&
-            Objects.equals(colNames, that.colNames);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        return Objects.hash(schema, obj, colNames);
-    }
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/ColumnLocalDataViewSupplier.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/ColumnLocalDataViewSupplier.java
index ba13e00..040d771 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/ColumnLocalDataViewSupplier.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/ColumnLocalDataViewSupplier.java
@@ -67,7 +67,7 @@ public class ColumnLocalDataViewSupplier {
         if (!F.isEmpty(schema) && !F.isEmpty(name)) {
             StatisticsKey key = new StatisticsKey(schema, name);
 
-            ObjectStatisticsImpl objLocStat = repo.getLocalStatistics(key);
+            ObjectStatisticsImpl objLocStat = repo.getLocalStatistics(key, null);
 
             if (objLocStat == null)
                 return Collections.emptyList();
@@ -77,7 +77,7 @@ public class ColumnLocalDataViewSupplier {
         else
             locStatsMap = repo.localStatisticsMap().entrySet().stream()
                 .filter(e -> F.isEmpty(schema) || schema.equals(e.getKey().schema()))
-                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().statistics()));
 
         List<StatisticsColumnLocalDataView> res = new ArrayList<>();
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/StatisticsColumnLocalDataView.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/StatisticsColumnGlobalDataView.java
similarity index 94%
copy from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/StatisticsColumnLocalDataView.java
copy to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/StatisticsColumnGlobalDataView.java
index 20c96dc..81e1a6a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/StatisticsColumnLocalDataView.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/StatisticsColumnGlobalDataView.java
@@ -18,16 +18,15 @@
 package org.apache.ignite.internal.processors.query.stat.view;
 
 import java.sql.Timestamp;
-
 import org.apache.ignite.internal.managers.systemview.walker.Filtrable;
 import org.apache.ignite.internal.managers.systemview.walker.Order;
 import org.apache.ignite.internal.processors.query.stat.ObjectStatisticsImpl;
 import org.apache.ignite.internal.processors.query.stat.StatisticsKey;
 
 /**
- * Statistics partition data view.
+ * Statistics global data view.
  */
-public class StatisticsColumnLocalDataView {
+public class StatisticsColumnGlobalDataView {
     /** Statistics key */
     private final StatisticsKey key;
 
@@ -44,7 +43,7 @@ public class StatisticsColumnLocalDataView {
      * @param column Column name.
      * @param statistics Object local statistics.
      */
-    public StatisticsColumnLocalDataView(StatisticsKey key, String column, ObjectStatisticsImpl statistics) {
+    public StatisticsColumnGlobalDataView(StatisticsKey key, String column, ObjectStatisticsImpl statistics) {
         this.key = key;
         this.column = column;
         this.statistics = statistics;
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/StatisticsColumnLocalDataView.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/StatisticsColumnLocalDataView.java
index 20c96dc..8db1888 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/StatisticsColumnLocalDataView.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/view/StatisticsColumnLocalDataView.java
@@ -25,7 +25,7 @@ import org.apache.ignite.internal.processors.query.stat.ObjectStatisticsImpl;
 import org.apache.ignite.internal.processors.query.stat.StatisticsKey;
 
 /**
- * Statistics partition data view.
+ * Statistics local data view.
  */
 public class StatisticsColumnLocalDataView {
     /** Statistics key */
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepositoryStaticTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepositoryStaticTest.java
new file mode 100644
index 0000000..4d0a2ae
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepositoryStaticTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.stat;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.h2.value.ValueInt;
+import org.junit.Test;
+
+/**
+ * Test of static methods of Ignite statistics repository.
+ */
+public class IgniteStatisticsRepositoryStaticTest extends StatisticsAbstractTest {
+    /** First default key. */
+    protected static final StatisticsKey K1 = new StatisticsKey(SCHEMA, "tab1");
+
+    /** Second default key. */
+    protected static final StatisticsKey K2 = new StatisticsKey(SCHEMA, "tab2");
+
+    /** Column statistics with 100 nulls. */
+    protected ColumnStatistics cs1 = new ColumnStatistics(null, null, 100, 0, 100,
+        0, new byte[0], 0, U.currentTimeMillis());
+
+    /** Column statistics with 100 integers 0-100. */
+    protected ColumnStatistics cs2 = new ColumnStatistics(ValueInt.get(0), ValueInt.get(100), 0, 100, 100,
+        4, new byte[0], 0, U.currentTimeMillis());
+
+    /** Column statistics with 0 rows. */
+    protected ColumnStatistics cs3 = new ColumnStatistics(null, null, 0, 0, 0, 0,
+        new byte[0], 0, U.currentTimeMillis());
+
+    /** Column statistics with 100 integers 0-10. */
+    protected ColumnStatistics cs4 = new ColumnStatistics(ValueInt.get(0), ValueInt.get(10), 0, 10, 100,
+        4, new byte[0], 0, U.currentTimeMillis());
+
+    /**
+     * 1) Remove not existing column.
+     * 2) Remove some columns.
+     * 3) Remove all columns.
+     */
+    @Test
+    public void subtractTest() {
+        HashMap<String, ColumnStatistics> colStat1 = new HashMap<>();
+        colStat1.put("col1", cs1);
+        colStat1.put("col2", cs2);
+
+        ObjectStatisticsImpl os = new ObjectStatisticsImpl(100, colStat1);
+
+        // 1) Remove not existing column.
+        ObjectStatisticsImpl os1 = os.subtract(Collections.singleton("col0"));
+
+        assertEquals(os, os1);
+
+        // 2) Remove some columns.
+        ObjectStatisticsImpl os2 = os.subtract(Collections.singleton("col1"));
+
+        assertEquals(1, os2.columnsStatistics().size());
+        assertEquals(cs2, os2.columnStatistics("col2"));
+
+        // 3) Remove all columns.
+        ObjectStatisticsImpl os3 = os.subtract(Arrays.stream(new String[] {"col2", "col1"}).collect(Collectors.toSet()));
+
+        assertTrue(os3.columnsStatistics().isEmpty());
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepositoryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepositoryTest.java
index b47f075..c8b64eb 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepositoryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepositoryTest.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.managers.systemview.GridSystemViewManager;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
 import org.apache.ignite.internal.processors.metastorage.persistence.ReadWriteMetaStorageMock;
@@ -195,16 +196,16 @@ public class IgniteStatisticsRepositoryTest extends StatisticsAbstractTest {
         ObjectStatisticsImpl stat1 = getStatistics();
         ObjectStatisticsImpl stat2 = getStatistics();
 
-        repo.saveLocalStatistics(K1, stat1);
-        repo.saveLocalStatistics(K2, stat2);
+        repo.saveLocalStatistics(K1, stat1, AffinityTopologyVersion.ZERO);
+        repo.saveLocalStatistics(K2, stat2, AffinityTopologyVersion.ZERO);
 
-        assertEquals(stat1, repo.getLocalStatistics(K1));
-        assertEquals(stat1, repo.getLocalStatistics(K2));
+        assertEquals(stat1, repo.getLocalStatistics(K1, null));
+        assertEquals(stat1, repo.getLocalStatistics(K2, AffinityTopologyVersion.ZERO));
 
         repo.clearLocalPartitionsStatistics(K1, null);
 
-        assertNull(repo.getLocalStatistics(K1));
-        assertEquals(stat2, repo.getLocalStatistics(K2));
+        assertNull(repo.getLocalStatistics(K1, AffinityTopologyVersion.ZERO));
+        assertEquals(stat2, repo.getLocalStatistics(K2, null));
     }
 
     /**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUBasicValueDistributionTableStatisticsUsageTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUBasicValueDistributionTableStatisticsUsageTest.java
index 4a39535..b757565 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUBasicValueDistributionTableStatisticsUsageTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUBasicValueDistributionTableStatisticsUsageTest.java
@@ -93,7 +93,7 @@ public class PSUBasicValueDistributionTableStatisticsUsageTest extends Statistic
 
         sql("CREATE INDEX empty_distribution_no_stat_col_a ON empty_distribution_no_stat(col_a)");
 
-        collectStatistics("digital_distribution", "empty_distribution");
+        collectStatistics(StatisticsType.GLOBAL, "digital_distribution", "empty_distribution");
     }
 
     /**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUCompositeIndexTableStatisticsUsageTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUCompositeIndexTableStatisticsUsageTest.java
index c44e350..391ee07 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUCompositeIndexTableStatisticsUsageTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUCompositeIndexTableStatisticsUsageTest.java
@@ -72,7 +72,7 @@ public class PSUCompositeIndexTableStatisticsUsageTest extends StatisticsAbstrac
             sql(sql);
         }
 
-        collectStatistics("ci_table");
+        collectStatistics(StatisticsType.GLOBAL, "ci_table");
     }
 
     /**
@@ -92,12 +92,12 @@ public class PSUCompositeIndexTableStatisticsUsageTest extends StatisticsAbstrac
         checkOptimalPlanChosenForDifferentIndexes(grid(0), new String[]{"CI_TABLE_ABC"}, sql, new String[1][]);
 
         sql("CREATE INDEX ci_table_c ON ci_table(col_c)");
-        updateStatistics("ci_table");
+        updateStatistics(StatisticsType.GLOBAL, "ci_table");
 
         checkOptimalPlanChosenForDifferentIndexes(grid(0), new String[]{"CI_TABLE_ABC"}, sql, new String[1][]);
 
         sql("DROP INDEX IF EXISTS ci_table_c");
-        updateStatistics("ci_table");
+        updateStatistics(StatisticsType.GLOBAL, "ci_table");
     }
 
     /**
@@ -117,12 +117,12 @@ public class PSUCompositeIndexTableStatisticsUsageTest extends StatisticsAbstrac
         checkOptimalPlanChosenForDifferentIndexes(grid(0), new String[]{}, sql, new String[1][]);
 
         sql("CREATE INDEX ci_table_c ON ci_table(col_c)");
-        updateStatistics("ci_table");
+        updateStatistics(StatisticsType.GLOBAL, "ci_table");
 
         checkOptimalPlanChosenForDifferentIndexes(grid(0), new String[]{"CI_TABLE_C"}, sql, new String[1][]);
 
         sql("DROP INDEX IF EXISTS ci_table_c");
-        updateStatistics("ci_table");
+        updateStatistics(StatisticsType.GLOBAL, "ci_table");
     }
 
     /**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUStatisticPartialGatheringTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUStatisticPartialGatheringTest.java
index bd3029c..d30ef38 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUStatisticPartialGatheringTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUStatisticPartialGatheringTest.java
@@ -52,7 +52,7 @@ public class PSUStatisticPartialGatheringTest extends StatisticsAbstractTest {
             sql(String.format("insert into tbl_select(id, lo_select, med_select, hi_select) values(%d, %d, %d, %d)",
                 i, i % 10, i % 100, i % 1000));
 
-        collectStatistics("tbl_select");
+        collectStatistics(StatisticsType.GLOBAL, "tbl_select");
     }
 
     /**
@@ -82,7 +82,7 @@ public class PSUStatisticPartialGatheringTest extends StatisticsAbstractTest {
             String.format(SQL, 6, 6), NO_HINTS);
 
         // All columns set up before also will be updated
-        updateStatistics(new StatisticsTarget(SCHEMA, "TBL_SELECT", "LO_SELECT"));
+        updateStatistics(StatisticsType.GLOBAL, new StatisticsTarget(SCHEMA, "TBL_SELECT", "LO_SELECT"));
 
         checkOptimalPlanChosenForDifferentIndexes(grid(0), new String[]{"TBL_SELECT_LO_IDX"},
             String.format(SQL, 8, 8), NO_HINTS);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUStatisticsStorageTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUStatisticsStorageTest.java
index ad16f14..19d97ed 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUStatisticsStorageTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUStatisticsStorageTest.java
@@ -43,7 +43,7 @@ public class PSUStatisticsStorageTest extends StatisticsStorageAbstractTest {
      */
     @Test
     public void testPartialDeletionCollection() throws Exception {
-        collectStatistics(SMALL_TARGET);
+        collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
 
         IgniteEx ign = grid(0);
 
@@ -51,6 +51,8 @@ public class PSUStatisticsStorageTest extends StatisticsStorageAbstractTest {
         checkOptimalPlanChosenForDifferentIndexes(ign, new String[]{"SMALL_B"}, SQL, NO_HINTS);
 
         // 2) partially remove statistics for one extra column and check chat the rest statistics still can be used
+        log.info("Dropping statistics by A column...");
+
         statisticsMgr(0).dropStatistics(new StatisticsTarget("PUBLIC", "SMALL", "A"));
 
         assertTrue(GridTestUtils.waitForCondition(
@@ -61,6 +63,8 @@ public class PSUStatisticsStorageTest extends StatisticsStorageAbstractTest {
         checkOptimalPlanChosenForDifferentIndexes(ign, new String[]{"SMALL_B"}, SQL, NO_HINTS);
 
         // 3) partially remove necessarily for the query statistics and check that query plan will be changed
+        log.info("Dropping statistics by B column...");
+
         statisticsMgr(0).dropStatistics(new StatisticsTarget(SCHEMA, "SMALL", "B"));
 
         assertTrue(GridTestUtils.waitForCondition(
@@ -82,13 +86,15 @@ public class PSUStatisticsStorageTest extends StatisticsStorageAbstractTest {
         // 4) partially collect statistics for extra column and check that query plan still unable to get all statistics
         // it wants
 
-        collectStatistics(new StatisticsTarget(SCHEMA, "SMALL", "A"));
+        log.info("Recollecting statistics by A column...");
+
+        collectStatistics(StatisticsType.GLOBAL, new StatisticsTarget(SCHEMA, "SMALL", "A"));
 
         checkOptimalPlanChosenForDifferentIndexes(ign, new String[]{"SMALL_C"}, SQL, NO_HINTS);
 
         // 5) partially collect statistics for the necessarily column
         // and check that the query plan will restore to optimal
-        collectStatistics(new StatisticsTarget(SCHEMA, "SMALL", "B"));
+        collectStatistics(StatisticsType.GLOBAL, new StatisticsTarget(SCHEMA, "SMALL", "B"));
 
         checkOptimalPlanChosenForDifferentIndexes(ign, new String[]{"SMALL_B"}, SQL, NO_HINTS);
     }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUValueDistributionTableStatisticsUsageTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUValueDistributionTableStatisticsUsageTest.java
index 3be7eb2..f163156 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUValueDistributionTableStatisticsUsageTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUValueDistributionTableStatisticsUsageTest.java
@@ -89,7 +89,7 @@ public class PSUValueDistributionTableStatisticsUsageTest extends StatisticsAbst
         }
         sql("INSERT INTO sized(id, small, big) VALUES(" + BIG_SIZE + ", null, null)");
 
-        collectStatistics("sized");
+        collectStatistics(StatisticsType.GLOBAL, "sized");
     }
 
     /**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/SqlStatisticsCommandTests.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/SqlStatisticsCommandTests.java
index 4d75e01..5c41193 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/SqlStatisticsCommandTests.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/SqlStatisticsCommandTests.java
@@ -28,13 +28,13 @@ import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
+import org.junit.Ignore;
 import org.junit.Test;
 
 /**
  * Integration tests for statistics collection.
  */
+@Ignore("https://issues.apache.org/jira/browse/IGNITE-15455")
 public class SqlStatisticsCommandTests extends StatisticsAbstractTest {
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
@@ -158,7 +158,6 @@ public class SqlStatisticsCommandTests extends StatisticsAbstractTest {
      */
     @Test
     public void testDropStatistics() throws IgniteInterruptedCheckedException {
-        Logger.getLogger(StatisticsProcessor.class).setLevel(Level.TRACE);
         sql("ANALYZE PUBLIC.TEST, test2");
 
         testStatistics(SCHEMA, "TEST", false);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java
index f434eec..1de7857 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -35,8 +36,12 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
@@ -78,7 +83,7 @@ public abstract class StatisticsAbstractTest extends GridCommonAbstractTest {
     static final StatisticsTarget SMALL_TARGET = new StatisticsTarget(SMALL_KEY, null);
 
     /** Async operation timeout for test */
-    static final int TIMEOUT = 3_000;
+    static final int TIMEOUT = 5_000;
 
     static {
         assertTrue(SMALL_SIZE < MED_SIZE && MED_SIZE < BIG_SIZE);
@@ -353,79 +358,70 @@ public abstract class StatisticsAbstractTest extends GridCommonAbstractTest {
     /**
      * Update statistics on specified objects in PUBLIC schema.
      *
+     * @param type Statistics type to collect statistics by.
      * @param tables Tables where to update statistics.
      */
-    protected void updateStatistics(@NotNull String... tables) {
+    protected void updateStatistics(StatisticsType type, @NotNull String... tables) {
         StatisticsTarget[] targets = Arrays.stream(tables).map(tbl -> new StatisticsTarget(SCHEMA, tbl.toUpperCase()))
             .toArray(StatisticsTarget[]::new);
 
-        updateStatistics(targets);
+        updateStatistics(type, targets);
     }
 
     /**
      * Collect statistics for specified objects in PUBLIC schema.
      *
+     * @param type Statistics type to collect statistics by.
      * @param tables Tables where to collect statistics.
      */
-    protected void collectStatistics(@NotNull String... tables) {
+    protected void collectStatistics(StatisticsType type, @NotNull String... tables) {
         StatisticsTarget[] targets = Arrays.stream(tables).map(tbl -> new StatisticsTarget(SCHEMA, tbl.toUpperCase()))
             .toArray(StatisticsTarget[]::new);
 
-        collectStatistics(targets);
+        collectStatistics(type, targets);
     }
 
     /**
      * Update statistics on specified objects.
      *
+     * @param type Statistics type to ubdate statistics by.
      * @param targets Targets to refresh statistics by.
      */
-    protected void updateStatistics(StatisticsTarget... targets) {
-        makeStatistics(false, targets);
+    protected void updateStatistics(StatisticsType type, StatisticsTarget... targets) {
+        makeStatistics(false, type, targets);
     }
 
     /**
      * Update statistics on specified objects.
      *
+     * @param type Statistics type to update statistics by.
      * @param targets Targets to collect statistics by.
      */
-    protected void collectStatistics(StatisticsTarget... targets) {
-        makeStatistics(true, targets);
+    protected void collectStatistics(StatisticsType type, StatisticsTarget... targets) {
+        makeStatistics(true, type, targets);
     }
 
     /**
      * Collect or refresh statistics.
      *
      * @param collect If {@code true} - collect new statistics, if {@code false} - update existing.
-     * @param targets
+     * @param type Statistics type to get statistics by.
+     * @param targets Targets to process statistics by.
      */
-    private void makeStatistics(boolean collect, StatisticsTarget... targets) {
+    private void makeStatistics(boolean collect, StatisticsType type, StatisticsTarget... targets) {
         try {
-            Map<StatisticsTarget, Long> expectedVersion = new HashMap<>();
+            Map<StatisticsTarget, Long> expectedVer = new HashMap<>();
             IgniteStatisticsManagerImpl statMgr = statisticsMgr(0);
-            for (StatisticsTarget target : targets) {
-                StatisticsObjectConfiguration currCfg = statMgr.statisticConfiguration().config(target.key());
 
-                Predicate<StatisticsColumnConfiguration> pred;
-                if (F.isEmpty(target.columns()))
-                    pred = c -> true;
-                else {
-                    Set<String> cols = Arrays.stream(target.columns()).collect(Collectors.toSet());
-
-                    pred = c -> cols.contains(c.name());
-                }
-
-                Long expVer = (currCfg == null) ? 1L : currCfg.columnsAll().values().stream().filter(pred)
-                    .mapToLong(StatisticsColumnConfiguration::version).min().orElse(0L) + 1;
-
-                expectedVersion.put(target, expVer);
-            }
+            for (StatisticsTarget target : targets)
+                expectedVer.put(target, minStatVer(statMgr, target) + 1);
 
             if (collect)
                 statisticsMgr(0).collectStatistics(buildDefaultConfigurations(targets));
             else
                 statisticsMgr(0).refreshStatistics(targets);
 
-            awaitStatistics(TIMEOUT, expectedVersion);
+            awaitStatistics(TIMEOUT, expectedVer, type);
         }
         catch (Exception ex) {
             throw new IgniteException(ex);
@@ -433,6 +429,32 @@ public abstract class StatisticsAbstractTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Get minimum statistics version for the given target.
+     *
+     * param statMgr Statistics manager to get configuration from.
+     * @param target Statistics target to get the minimum version by.
+     * @return Minimum statistics configuraion version for the given target
+     * or 0 if there are no configuration for the given targer.
+     * @throws IgniteCheckedException In case of configuration read errors.
+     */
+    public Long minStatVer(IgniteStatisticsManagerImpl statMgr, StatisticsTarget target) throws IgniteCheckedException {
+        StatisticsObjectConfiguration currCfg = statMgr.statisticConfiguration().config(target.key());
+
+        Predicate<StatisticsColumnConfiguration> pred;
+
+        if (F.isEmpty(target.columns()))
+            pred = c -> true;
+        else {
+            Set<String> cols = Arrays.stream(target.columns()).collect(Collectors.toSet());
+
+            pred = c -> cols.contains(c.name());
+        }
+
+        return (currCfg == null) ? 0L : currCfg.columnsAll().values().stream().filter(pred)
+            .mapToLong(StatisticsColumnConfiguration::version).min().orElse(0L);
+    }
+
+    /**
      * Get object statistics.
      *
      * @param rowsCnt Rows count.
@@ -475,11 +497,15 @@ public abstract class StatisticsAbstractTest extends GridCommonAbstractTest {
 
     /** Check that all statistics collections related tasks is empty in specified node. */
     protected void checkStatisticTasksEmpty(IgniteEx ign) {
+        if (ign.localNode().isClient())
+            return;
+
         Map<StatisticsKey, LocalStatisticsGatheringContext> currColls = GridTestUtils.getFieldValue(
             statisticsMgr(ign), "statProc", "gatheringInProgress"
         );
 
-        assertTrue(currColls.toString(), currColls.isEmpty());
+        assertTrue("Has statistics collection tasks on node " + ign.localNode().id() + ":" + currColls.toString(),
+            currColls.isEmpty());
 
         IgniteThreadPoolExecutor mgmtPool = GridTestUtils.getFieldValue(statisticsMgr(ign), "mgmtPool");
 
@@ -495,12 +521,17 @@ public abstract class StatisticsAbstractTest extends GridCommonAbstractTest {
      *
      * @param timeout Timeout.
      * @param expectedVersions Expected versions for specified targets.
+     * @param type Type to get statistics by.
      * @throws Exception In case of errors.
      */
-    protected void awaitStatistics(long timeout, Map<StatisticsTarget, Long> expectedVersions) throws Exception {
+    protected void awaitStatistics(
+        long timeout,
+        Map<StatisticsTarget, Long> expectedVersions,
+        StatisticsType type
+    ) throws Exception {
         for (Ignite ign : G.allGrids()) {
-            if (!((IgniteEx)ign).context().clientNode())
-                awaitStatistics(timeout, expectedVersions, (IgniteEx)ign);
+            if (StatisticsType.GLOBAL == type || !((IgniteEx)ign).context().clientNode())
+                awaitStatistics(timeout, expectedVersions, (IgniteEx)ign, type);
         }
     }
 
@@ -510,10 +541,15 @@ public abstract class StatisticsAbstractTest extends GridCommonAbstractTest {
      * @param timeout Timeout.
      * @param expectedVersions Expected versions for specified targets.
      * @param ign Node to await.
+     * @param type Statistics type.
      * @throws Exception In case of errors.
      */
-    protected void awaitStatistics(long timeout, Map<StatisticsTarget, Long> expectedVersions, IgniteEx ign)
-        throws Exception {
+    protected void awaitStatistics(
+        long timeout,
+        Map<StatisticsTarget, Long> expectedVersions,
+        IgniteEx ign,
+        StatisticsType type
+    ) throws Exception {
         long t0 = U.currentTimeMillis();
 
         IgniteH2Indexing indexing = (IgniteH2Indexing)ign.context().query().getIndexing();
@@ -524,30 +560,31 @@ public abstract class StatisticsAbstractTest extends GridCommonAbstractTest {
 
                 for (Map.Entry<StatisticsTarget, Long> targetVersionEntry : expectedVersions.entrySet()) {
                     StatisticsTarget target = targetVersionEntry.getKey();
+
+                    // Statistics configuration manager could not get fresh enough configuration version till now so we
+                    // need to request global statistics again to force it's collection
+                    // TODO: remove me statisticsMgr(ign).getGlobalStatistics(target.key());
+
                     Long ver = targetVersionEntry.getValue();
 
-                    ObjectStatisticsImpl s = (ObjectStatisticsImpl)indexing.statsManager().getLocalStatistics(target.key());
-                    assertNotNull(s);
+                    ObjectStatisticsImpl s;
 
-                    long minVer = Long.MAX_VALUE;
+                    switch (type) {
+                        case LOCAL:
+                            s = (ObjectStatisticsImpl)indexing.statsManager().getLocalStatistics(target.key());
 
-                    Set<String> cols;
+                            break;
 
-                    if (F.isEmpty(target.columns()))
-                        cols = s.columnsStatistics().keySet();
-                    else
-                        cols = Arrays.stream(target.columns()).collect(Collectors.toSet());
+                        case GLOBAL:
+                            s = (ObjectStatisticsImpl)indexing.statsManager().getGlobalStatistics(target.key());
 
-                    for (String col : cols) {
-                        if (s.columnStatistics(col).version() < minVer)
-                            minVer = s.columnStatistics(col).version();
-                    }
+                            break;
 
-                    if (minVer == Long.MAX_VALUE)
-                        minVer = -1;
+                        default:
+                            throw new IllegalArgumentException("Unexpected statistics type " + type);
+                    }
 
-                    assertEquals(String.format("Expected minimum statistics version %d but found %d", ver, minVer),
-                        (long)ver, minVer);
+                    checkStatisticsVersion(ign.localNode().id(), s, target, ver);
                 }
 
                 return;
@@ -562,6 +599,32 @@ public abstract class StatisticsAbstractTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Check specified statistics.
+     *
+     * @param stat Object statistics to check.
+     * @param target Statistics target to check only some columns.
+     * @param ver Mininum allowed version.
+     */
+    private void checkStatisticsVersion(UUID nodeId, ObjectStatisticsImpl stat, StatisticsTarget target, long ver) {
+        assertFalse("No column statistics found: " + stat + " on node " + nodeId,
+            stat == null || stat.columnsStatistics().isEmpty());
+
+        Set<String> cols;
+
+        if (F.isEmpty(target.columns()))
+            cols = stat.columnsStatistics().keySet();
+        else
+            cols = Arrays.stream(target.columns()).collect(Collectors.toSet());
+
+        for (String col : cols) {
+            ColumnStatistics colStat = stat.columnStatistics(col);
+
+            assertFalse(String.format("Expect minVer %d but column %s has %s version on node %s", ver, col,
+                (colStat == null) ? null : colStat.version(), nodeId), colStat == null || colStat.version() < ver);
+        }
+    }
+
+    /**
      * Get nodes StatisticsGatheringRequestCrawlerImpl.msgMgmtPool lock.
      * Put additional task into it and return lock to complete these task.
      *
@@ -640,9 +703,17 @@ public abstract class StatisticsAbstractTest extends GridCommonAbstractTest {
      * @return Local table statistics or {@code null} if there are no such statistics in specified node.
      */
     protected ObjectStatisticsImpl getStatsFromNode(int nodeIdx, String tblName, StatisticsType type) {
+        StatisticsKey key = new StatisticsKey(SCHEMA, tblName);
+
         switch (type) {
+            case GLOBAL:
+
+                return (ObjectStatisticsImpl)statisticsMgr(nodeIdx).getGlobalStatistics(key);
+
             case LOCAL:
-                return (ObjectStatisticsImpl)statisticsMgr(nodeIdx).getLocalStatistics(new StatisticsKey(SCHEMA, tblName));
+
+                return (ObjectStatisticsImpl)statisticsMgr(nodeIdx).getLocalStatistics(key);
+
             case PARTITION:
             default:
                 throw new UnsupportedOperationException();
@@ -719,4 +790,103 @@ public abstract class StatisticsAbstractTest extends GridCommonAbstractTest {
             }, 1000));
         }
     }
+
+    /**
+     * Add persistence region to default data region specified ignite configuration.
+     *
+     * @param cfg Base configuration to add persistence into.
+     * @param igniteInstanceName Instance name.
+     * @return Same configuration with persistence enabled.
+     */
+    protected IgniteConfiguration addPersistenceRegion(IgniteConfiguration cfg, String igniteInstanceName) {
+        cfg.setConsistentId(igniteInstanceName);
+
+        DataStorageConfiguration memCfg = new DataStorageConfiguration()
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true));
+
+        cfg.setDataStorageConfiguration(memCfg);
+
+        return cfg;
+    }
+
+    /**
+     * Add in memory region to default data region specified ignite configuration.
+     *
+     * @param cfg Base configuration to add in memory into.
+     * @param igniteInstanceName Instance name.
+     * @return Same configuration with persistence enabled.
+     */
+    protected IgniteConfiguration addImMemoryRegion(IgniteConfiguration cfg, String igniteInstanceName) {
+        cfg.setConsistentId(igniteInstanceName);
+
+        DataStorageConfiguration memCfg = new DataStorageConfiguration()
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(false));
+
+        cfg.setDataStorageConfiguration(memCfg);
+
+        return cfg;
+    }
+
+    /**
+     * Check that all expected lines exist in actual. If not - fail.
+     *
+     * @param expected Expected lines, nulls mean any value.
+     * @param actual Actual lines.
+     */
+    protected void checkContains(List<List<Object>> expected, List<List<?>> actual) {
+        List<Object> notExisting = testContains(expected, actual);
+
+        if (notExisting != null)
+            fail("Unable to found " + notExisting + " in specified dataset");
+    }
+
+    /**
+     * Test that all expected lines exist in actual.
+     *
+     * @param expected Expected lines, nulls mean any value.
+     * @param actual Actual lines.
+     * @return First not existing line or {@code null} if all lines presented.
+     */
+    protected List<Object> testContains(List<List<Object>> expected, List<List<?>> actual) {
+        assertTrue(expected.size() <= actual.size());
+
+        assertTrue("Test may take too long with such datasets of actual = " + actual.size(), actual.size() <= 1024);
+
+        for (List<Object> exp : expected) {
+            boolean found = false;
+
+            for (List<?> act : actual) {
+                found = checkEqualWithNull(exp, act);
+
+                if (found)
+                    break;
+            }
+
+            if (!found)
+                return exp;
+        }
+
+        return null;
+    }
+
+    /**
+     * Compare expected line with actual one.
+     *
+     * @param expected Expected line, {@code null} value mean any value.
+     * @param actual Actual line.
+     * @return {@code true} if line are equal, {@code false} - otherwise.
+     */
+    protected boolean checkEqualWithNull(List<Object> expected, List<?> actual) {
+        assertEquals(expected.size(), actual.size());
+
+        for (int i = 0; i < expected.size(); i++) {
+            Object exp = expected.get(i);
+            Object act = actual.get(i);
+
+            if (exp != null && !exp.equals(act) && act != null)
+                return false;
+        }
+
+        return true;
+    }
 }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsClearTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsClearTest.java
index eee869c..5b6eb15 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsClearTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsClearTest.java
@@ -48,7 +48,7 @@ public class StatisticsClearTest extends StatisticsRestartAbstractTest {
      */
     @Test
     public void testStatisticsClear() throws Exception {
-        updateStatistics(SMALL_TARGET);
+        updateStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
 
         Assert.assertNotNull(statisticsMgr(0).getLocalStatistics(SMALL_KEY));
 
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsConfigurationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsConfigurationTest.java
index c51f898..e397929 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsConfigurationTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsConfigurationTest.java
@@ -181,7 +181,7 @@ public class StatisticsConfigurationTest extends StatisticsAbstractTest {
 
         createSmallTable(null);
 
-        collectStatistics(SMALL_TARGET);
+        collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
 
         waitForStats(SCHEMA, "SMALL", TIMEOUT, checkTotalRows, checkColumStats);
 
@@ -209,7 +209,7 @@ public class StatisticsConfigurationTest extends StatisticsAbstractTest {
 
         createSmallTable(null);
 
-        collectStatistics(SMALL_TARGET);
+        collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
 
         waitForStats(SCHEMA, "SMALL", TIMEOUT, checkTotalRows, checkColumStats);
 
@@ -231,7 +231,9 @@ public class StatisticsConfigurationTest extends StatisticsAbstractTest {
 
         IgniteEx client = startClientGrid("cli");
 
-        collectStatistics(SMALL_TARGET);
+        awaitPartitionMapExchange();
+
+        collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
 
         sql("delete from small");
 
@@ -269,7 +271,7 @@ public class StatisticsConfigurationTest extends StatisticsAbstractTest {
 
         createSmallTable(null);
 
-        collectStatistics(SMALL_TARGET);
+        collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
 
         waitForStats(SCHEMA, "SMALL", TIMEOUT, checkTotalRows, checkColumStats);
 
@@ -330,7 +332,7 @@ public class StatisticsConfigurationTest extends StatisticsAbstractTest {
         createSmallTable(null);
 
         // 1. Create statistic for a table;
-        collectStatistics(SMALL_TARGET);
+        collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
 
         // 2. Check statistics on all nodes of the cluster;
         waitForStats(SCHEMA, "SMALL", TIMEOUT, checkTotalRows, checkColumStats);
@@ -343,7 +345,7 @@ public class StatisticsConfigurationTest extends StatisticsAbstractTest {
             (stats) -> stats.forEach(s -> assertNull(s.columnStatistics("A"))));
 
         // 5. Re-create statistics;
-        collectStatistics(new StatisticsTarget(SCHEMA, "SMALL", "A"));
+        collectStatistics(StatisticsType.GLOBAL, new StatisticsTarget(SCHEMA, "SMALL", "A"));
 
         // 6. Check statistics on all nodes of the cluster;
         waitForStats(SCHEMA, "SMALL", TIMEOUT, checkTotalRows, checkColumStats);
@@ -369,7 +371,7 @@ public class StatisticsConfigurationTest extends StatisticsAbstractTest {
 
         createSmallTable(null);
 
-        collectStatistics(SMALL_TARGET);
+        collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
 
         waitForStats(SCHEMA, "SMALL", TIMEOUT, checkTotalRows, checkColumStats);
 
@@ -406,7 +408,7 @@ public class StatisticsConfigurationTest extends StatisticsAbstractTest {
         createSmallTable(null);
         createSmallTable("_A");
 
-        collectStatistics(
+        collectStatistics(StatisticsType.GLOBAL,
             new StatisticsTarget(SCHEMA, "SMALL"),
             new StatisticsTarget(SCHEMA, "SMALL_A"));
 
@@ -440,7 +442,7 @@ public class StatisticsConfigurationTest extends StatisticsAbstractTest {
 
         createSmallTable(null);
 
-        collectStatistics(SMALL_TARGET);
+        collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
 
         waitForStats(SCHEMA, "SMALL", TIMEOUT, checkTotalRows, checkColumStats);
 
@@ -475,7 +477,7 @@ public class StatisticsConfigurationTest extends StatisticsAbstractTest {
 
         createSmallTable(null);
 
-        collectStatistics(SMALL_TARGET);
+        collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
 
         waitForStats(SCHEMA, "SMALL", TIMEOUT, checkTotalRows, checkColumStats);
 
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGatheringTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGatheringTest.java
index f60d907..d0ef1ff 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGatheringTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGatheringTest.java
@@ -18,8 +18,10 @@
 package org.apache.ignite.internal.processors.query.stat;
 
 import java.util.Collections;
+import java.util.Map;
 import java.util.Objects;
 
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -34,7 +36,7 @@ import static org.apache.ignite.internal.processors.query.stat.IgniteStatisticsH
 public class StatisticsGatheringTest extends StatisticsRestartAbstractTest {
     /** {@inheritDoc} */
     @Override public int nodes() {
-        return 1;
+        return 2;
     }
 
     /**
@@ -52,7 +54,10 @@ public class StatisticsGatheringTest extends StatisticsRestartAbstractTest {
 
         grid(0).cluster().state(ClusterState.INACTIVE);
 
-        GridTestUtils.assertThrows(null, () -> collectStatistics(t101), IgniteException.class,
+        GridTestUtils.assertThrows(null, () -> collectStatistics(StatisticsType.LOCAL, t101), IgniteException.class,
+            "Unable to perform collect statistics due to cluster state [state=INACTIVE]");
+
+        GridTestUtils.assertThrows(null, () -> collectStatistics(StatisticsType.GLOBAL, t101), IgniteException.class,
             "Unable to perform collect statistics due to cluster state [state=INACTIVE]");
     }
 
@@ -62,14 +67,63 @@ public class StatisticsGatheringTest extends StatisticsRestartAbstractTest {
      * 2) Get global statistics (with delay) and check its equality in all nodes.
      */
     @Test
-    public void testGathering() throws Exception {
-        ObjectStatisticsImpl stats[] = getStats("SMALL", StatisticsType.LOCAL);
+    public void testGathering() throws InterruptedException, IgniteCheckedException {
+        ObjectStatisticsImpl localStats[] = getStats("SMALL", StatisticsType.LOCAL);
+
+        testCond(Objects::nonNull, localStats);
+
+        testCond(stat -> stat.columnsStatistics().size() == localStats[0].columnsStatistics().size(), localStats);
+
+        testCond(this::checkStat, localStats);
+
+        ObjectStatisticsImpl globalStat = getStatsFromNode(0, "SMALL", StatisticsType.GLOBAL);
+
+        assertNotNull(globalStat);
+    }
+
+    /**
+     * Test that all node contains the same global statistics.
+     *
+     * @throws Exception In case of errors.
+     */
+    @Test
+    public void testGlobalIsEqual() throws Exception {
+        ObjectStatisticsImpl globalStats[] = getStats("SMALL", StatisticsType.GLOBAL);
+
+        testCond(Objects::nonNull, globalStats);
+        testCond(this::checkStat, globalStats);
 
-        testCond(Objects::nonNull, stats);
+        ObjectStatisticsImpl globalStat = globalStats[0];
 
-        testCond(stat -> stat.columnsStatistics().size() == stats[0].columnsStatistics().size(), stats);
+        assertTrue(globalStats.length > 1);
 
-        testCond(this::checkStat, stats);
+        for (int i = 1; i < globalStats.length; i++)
+            testEquaData(globalStat, globalStats[i]);
+    }
+
+    /**
+     * Check specified statistics contains equal data (all, except collection time and versions).
+     *
+     * @param expected Expected statistics.
+     * @param actual Actual statistics.
+     */
+    private static void testEquaData(ObjectStatisticsImpl expected, ObjectStatisticsImpl actual) {
+        assertEquals(expected.rowCount(), actual.rowCount());
+
+        assertEquals(expected.columnsStatistics().size(), actual.columnsStatistics().size());
+
+        for (Map.Entry<String, ColumnStatistics> expectedColStatEntry : expected.columnsStatistics().entrySet()) {
+            ColumnStatistics expColStat = expectedColStatEntry.getValue();
+            ColumnStatistics actColStat = actual.columnStatistics(expectedColStatEntry.getKey());
+
+            assertNotNull(actColStat);
+            assertEquals(expColStat.min(), actColStat.min());
+            assertEquals(expColStat.max(), actColStat.max());
+            assertEquals(expColStat.size(), actColStat.size());
+            assertEquals(expColStat.distinct(), actColStat.distinct());
+            assertEquals(expColStat.total(), actColStat.total());
+            assertEquals(expColStat.nulls(), actColStat.nulls());
+        }
     }
 
     /**
@@ -90,7 +144,7 @@ public class StatisticsGatheringTest extends StatisticsRestartAbstractTest {
             "Table doesn't exist [schema=PUBLIC, table=SMALL101wrong]"
         );
 
-        updateStatistics(t100, t101);
+        updateStatistics(StatisticsType.GLOBAL, t100, t101);
 
         ObjectStatisticsImpl[] stats100 = getStats(t100.obj(), StatisticsType.LOCAL);
         ObjectStatisticsImpl[] stats101 = getStats(t101.obj(), StatisticsType.LOCAL);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsPersistenceTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGlobalViewInMemoryTest.java
similarity index 68%
copy from modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsPersistenceTest.java
copy to modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGlobalViewInMemoryTest.java
index d4e5365..f93d4ad 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsPersistenceTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGlobalViewInMemoryTest.java
@@ -17,25 +17,16 @@
 
 package org.apache.ignite.internal.processors.query.stat;
 
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 
 /**
- * Test views with persistence.
+ * Test global view without persistence.
  */
-public class StatisticsViewsPersistenceTest extends StatisticsViewsTest {
+public class StatisticsGlobalViewInMemoryTest extends StatisticsGlobalViewTest {
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
-        cfg.setConsistentId(igniteInstanceName);
-
-        DataStorageConfiguration memCfg = new DataStorageConfiguration()
-            .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true));
-
-        cfg.setDataStorageConfiguration(memCfg);
-
-        return cfg;
+        return addImMemoryRegion(cfg, igniteInstanceName);
     }
 }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsPersistenceTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGlobalViewPersistenceTest.java
similarity index 68%
copy from modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsPersistenceTest.java
copy to modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGlobalViewPersistenceTest.java
index d4e5365..07a0adb 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsPersistenceTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGlobalViewPersistenceTest.java
@@ -17,25 +17,16 @@
 
 package org.apache.ignite.internal.processors.query.stat;
 
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 
 /**
- * Test views with persistence.
+ * Test global view with persistence.
  */
-public class StatisticsViewsPersistenceTest extends StatisticsViewsTest {
+public class StatisticsGlobalViewPersistenceTest extends StatisticsGlobalViewTest {
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
-        cfg.setConsistentId(igniteInstanceName);
-
-        DataStorageConfiguration memCfg = new DataStorageConfiguration()
-            .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true));
-
-        cfg.setDataStorageConfiguration(memCfg);
-
-        return cfg;
+        return addPersistenceRegion(cfg, igniteInstanceName);
     }
 }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGlobalViewTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGlobalViewTest.java
new file mode 100644
index 0000000..7cf0c07
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGlobalViewTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.stat;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+/**
+ * Tests for global statistics view.
+ */
+public abstract class StatisticsGlobalViewTest extends StatisticsAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTestsStarted();
+        cleanPersistenceDir();
+
+        startGridsMultiThreaded(2);
+        grid(0).cluster().state(ClusterState.ACTIVE);
+
+        grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        createSmallTable(null);
+        collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * Start additional node and try to collect statistics without adding it into baseline topology.
+     * Check that statistics awailable, but no local statistics exists and no additional gathering tasks
+     * stick in such node.
+     *
+     * @throws Exception In case of errors.
+     */
+    @Test
+    public void testStatisticsCollectionOutsideBaseline() throws Exception {
+        List<List<Object>> partLines = Arrays.asList(
+            Arrays.asList(SCHEMA, "TABLE", "SMALL", "A", (long)SMALL_SIZE, (long)SMALL_SIZE, 0L, 100L, 4, null, null),
+            Arrays.asList(SCHEMA, "TABLE", "SMALL", "B", (long)SMALL_SIZE, (long)SMALL_SIZE, 0L, 100L, 4, null, null),
+            Arrays.asList(SCHEMA, "TABLE", "SMALL", "C", (long)SMALL_SIZE, 10L, 0L, 100L, 4, null, null)
+        );
+
+        checkSqlResult("select * from SYS.STATISTICS_GLOBAL_DATA where NAME = 'SMALL'", null, act -> {
+            checkContains(partLines, act);
+
+            return true;
+        });
+
+        ignite(0).cluster().baselineAutoAdjustEnabled(false);
+
+        IgniteEx ign2 = startGrid(2);
+        awaitPartitionMapExchange();
+
+        requestGlobalStatistics(SMALL_KEY);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> statisticsMgr(ign2).getGlobalStatistics(SMALL_KEY) != null,
+            TIMEOUT));
+
+        checkSqlResult("select * from SYS.STATISTICS_GLOBAL_DATA where NAME = 'SMALL'", null, act -> {
+            checkContains(partLines, act);
+
+            return true;
+        });
+
+        stopGrid(2);
+    }
+
+    /**
+     * Check that global stats equals on each node in cluster:
+     * 1) Check global statistics on each grid nodes.
+     * 2) Start new node and check global statistics.
+     * 3) Collect statistics configuration and check it on each node.
+     */
+    @Test
+    public void testGlobalStatEquals() throws Exception {
+        List<List<Object>> partLines = Arrays.asList(
+            Arrays.asList(SCHEMA, "TABLE", "SMALL", "A", (long)SMALL_SIZE, (long)SMALL_SIZE, 0L, 100L, 4, null, null),
+            Arrays.asList(SCHEMA, "TABLE", "SMALL", "B", (long)SMALL_SIZE, (long)SMALL_SIZE, 0L, 100L, 4, null, null),
+            Arrays.asList(SCHEMA, "TABLE", "SMALL", "C", (long)SMALL_SIZE, 10L, 0L, 100L, 4, null, null)
+        );
+
+        checkSqlResult("select * from SYS.STATISTICS_GLOBAL_DATA where NAME = 'SMALL'", null, act -> {
+            checkContains(partLines, act);
+
+            return true;
+        });
+
+        startGrid(2);
+        awaitPartitionMapExchange();
+
+        requestGlobalStatistics(SMALL_KEY);
+
+        long minVer = minStatVer(statisticsMgr(0), SMALL_TARGET);
+
+        checkSqlResult("select * from SYS.STATISTICS_GLOBAL_DATA where NAME = 'SMALL' and VERSION >= " + minVer,
+            null, list -> !list.isEmpty());
+
+        ignite(0).cluster().baselineAutoAdjustEnabled(false);
+        ignite(0).cluster().setBaselineTopology(ignite(1).cluster().topologyVersion());
+        awaitPartitionMapExchange(true, true, null);
+
+        for (Ignite ign : G.allGrids()) {
+
+            System.out.println("node = " + ign.cluster().localNode().id() +
+                " is Server = " + !((IgniteEx)ign).localNode().isClient() + " local stat = " +
+                statisticsMgr((IgniteEx)ign).getLocalStatistics(SMALL_KEY));
+        }
+
+        collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
+
+        minVer++;
+
+        checkSqlResult("select * from SYS.STATISTICS_GLOBAL_DATA where NAME = 'SMALL' and VERSION >= " + minVer,
+            null, act -> {
+            checkContains(partLines, act);
+            return true;
+        });
+
+        stopGrid(2);
+    }
+
+    /**
+     * Request global statistics by specified key from each node.
+     *
+     * @param key Key to request global statistics by.
+     */
+    private void requestGlobalStatistics(StatisticsKey key) {
+        for (Ignite ign : G.allGrids()) {
+            IgniteStatisticsManagerImpl nodeMgr = statisticsMgr((IgniteEx)ign);
+
+            nodeMgr.getGlobalStatistics(key);
+        }
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java
index e4e726f..c9769be 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java
@@ -90,7 +90,7 @@ public class StatisticsObsolescenceTest extends StatisticsAbstractTest {
         ignite.cluster().state(ClusterState.ACTIVE);
 
         createSmallTable(null);
-        sql("ANALYZE SMALL");
+        collectStatistics(StatisticsType.GLOBAL, "SMALL");
 
         ignite.cluster().state(ClusterState.INACTIVE);
 
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsRestartAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsRestartAbstractTest.java
index 250afe2..d954185 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsRestartAbstractTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsRestartAbstractTest.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
@@ -61,7 +62,7 @@ public class StatisticsRestartAbstractTest extends StatisticsAbstractTest {
 
         createStatisticTarget(null);
 
-        collectStatistics(SMALL_TARGET);
+        statisticsMgr(0).collectStatistics(new StatisticsObjectConfiguration(SMALL_KEY));
     }
 
     /**
@@ -80,7 +81,7 @@ public class StatisticsRestartAbstractTest extends StatisticsAbstractTest {
 
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws IgniteCheckedException {
-        updateStatistics(SMALL_TARGET);
+        updateStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
     }
 
     /**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsStorageTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsStorageTest.java
index c2511ef..ef0ad44 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsStorageTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsStorageTest.java
@@ -27,7 +27,7 @@ import org.junit.Test;
 public abstract class StatisticsStorageTest extends StatisticsStorageAbstractTest {
     /** {@inheritDoc} */
     @Override public void beforeTest() throws Exception {
-        collectStatistics(SMALL_TARGET);
+        collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
     }
 
     /**
@@ -53,11 +53,11 @@ public abstract class StatisticsStorageTest extends StatisticsStorageAbstractTes
      */
     @Test
     public void testRecollection() throws Exception {
-        updateStatistics(SMALL_TARGET);
+        updateStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
 
         ObjectStatisticsImpl locStat = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY);
 
-        updateStatistics(SMALL_TARGET);
+        updateStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
 
         ObjectStatisticsImpl locStat2 = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY);
 
@@ -85,11 +85,11 @@ public abstract class StatisticsStorageTest extends StatisticsStorageAbstractTes
      */
     @Test
     public void testPartialRecollection() throws Exception {
-        updateStatistics(new StatisticsTarget(SCHEMA, "SMALL", "B"));
+        updateStatistics(StatisticsType.GLOBAL, new StatisticsTarget(SCHEMA, "SMALL", "B"));
         ObjectStatisticsImpl locStat = (ObjectStatisticsImpl)statisticsMgr(0)
             .getLocalStatistics(new StatisticsKey(SCHEMA, "SMALL"));
 
-        updateStatistics(new StatisticsTarget(SCHEMA, "SMALL", "B"));
+        updateStatistics(StatisticsType.GLOBAL, new StatisticsTarget(SCHEMA, "SMALL", "B"));
         ObjectStatisticsImpl locStat2 = (ObjectStatisticsImpl)statisticsMgr(0)
             .getLocalStatistics(new StatisticsKey(SCHEMA, "SMALL"));
 
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsTypesAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsTypesAbstractTest.java
index 8cb82c9..d9969aa 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsTypesAbstractTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsTypesAbstractTest.java
@@ -83,7 +83,7 @@ public abstract class StatisticsTypesAbstractTest extends StatisticsAbstractTest
         for (int i = 0; i > -SMALL_SIZE / 2; i--)
             sql(insertNulls(i));
 
-        collectStatistics("dtypes");
+        collectStatistics(StatisticsType.GLOBAL, "dtypes");
     }
 
     /**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsUtilsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsUtilsTest.java
new file mode 100644
index 0000000..ac810cf
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsUtilsTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.stat;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnConfiguration;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Tests for StatisticsUtils methods
+ */
+public class StatisticsUtilsTest extends GridCommonAbstractTest {
+    /** Test key. */
+    private static final StatisticsKey KEY = new StatisticsKey("SCHEMA", "TABLE");
+
+    /** Test COL1 statistics. */
+    private static final ColumnStatistics COL_1_STAT =
+        new ColumnStatistics(null, null, 0, 0, 0, 0, null, 1, 0);
+
+    /** Test COL1 statistics configuration. */
+    private static final StatisticsColumnConfiguration COL_1_CFG =
+        new StatisticsColumnConfiguration("COL1", null);
+
+    /** Test COL2 statistics. */
+    private static final ColumnStatistics COL_2_STAT =
+        new ColumnStatistics(null, null, 0, 0, 0, 0, null, 2, 0);
+
+    /** Test COL2 statistics configuration. */
+    private static final StatisticsColumnConfiguration COL_2_CFG =
+        new StatisticsColumnConfiguration("COL2", null).refresh();
+
+    /** Test COL3 statistics. */
+    private static final ColumnStatistics COL_3_STAT =
+        new ColumnStatistics(null, null, 0, 0, 0, 0, null, 3, 0);
+
+    /** Test COL3 statistics configuration. */
+    private static final StatisticsColumnConfiguration COL_3_CFG =
+        new StatisticsColumnConfiguration("COL3", null).refresh().refresh();
+
+    /** Test required version. */
+    private static final Map<String, Long> VERSIONS = F.asMap("COL1", 1L, "COL2", 2L);
+
+    /**
+     * Test compareVersions with stat equals to {@code null} returns {@code false}.
+     */
+    @Test
+    public void testCompareStatisticsNullStat() {
+        assertTrue(StatisticsUtils.compareVersions((ObjectStatisticsImpl)null, VERSIONS) < 0);
+    }
+
+    /**
+     * Test compareVersions with stat contains no columns returns {@code false}.
+     */
+    @Test
+    public void testCompareStatisticsVersionsEmptyStat() {
+        ObjectStatisticsImpl objStat = new ObjectStatisticsImpl(100, Collections.emptyMap());
+
+        assertTrue(StatisticsUtils.compareVersions(objStat, VERSIONS) < 0);
+    }
+
+    /**
+     * Test compareVersions with stat contains all columns returns {@code true}.
+     */
+    @Test
+    public void testCompareVersionsAllStat() {
+        Map<String, ColumnStatistics> colStats = F.asMap("COL1", COL_1_STAT, "COL2", COL_2_STAT);
+        ObjectStatisticsImpl objStat = new ObjectStatisticsImpl(100, colStats);
+
+        assertEquals(0, StatisticsUtils.compareVersions(objStat, VERSIONS));
+    }
+
+    /**
+     * Test compareVersions with stat with never columns returns {@code true}.
+     */
+    @Test
+    public void testCompareVersionsNeverStat() {
+        Map<String, ColumnStatistics> colStats = F.asMap("COL1", COL_1_STAT, "COL2", COL_3_STAT);
+        ObjectStatisticsImpl objStat = new ObjectStatisticsImpl(100, colStats);
+
+        assertTrue(StatisticsUtils.compareVersions(objStat, VERSIONS) > 0);
+    }
+
+    /**
+     * Test compareVersions with stat with extra columns returns {@code true}.
+     */
+    @Test
+    public void testCheckStatisticsVersionsExtraColumn() {
+        Map<String, ColumnStatistics> colStats =
+            F.asMap("COL1", COL_1_STAT, "COL2", COL_3_STAT, "COL3", COL_2_STAT);
+        ObjectStatisticsImpl objStat = new ObjectStatisticsImpl(100, colStats);
+
+        assertTrue(StatisticsUtils.compareVersions(objStat, VERSIONS) > 0);
+    }
+
+    /**
+     * Test compareVersions with stat contains no columns returns {@code false}.
+     */
+    @Test
+    public void testCompareObjVersionsEmptyStat() {
+        StatisticsObjectConfiguration objCfg = new StatisticsObjectConfiguration(KEY);
+
+        assertTrue(StatisticsUtils.compareVersions(objCfg, VERSIONS) < 0);
+    }
+
+    /**
+     * Test checkStatisticsConfigurationVersions with stat contains all columns returns {@code true}.
+     */
+    @Test
+    public void testCompareObjVersionsAllStat() {
+        List<StatisticsColumnConfiguration> colCfgs = Arrays.asList(COL_1_CFG, COL_2_CFG);
+        StatisticsObjectConfiguration objCfg = new StatisticsObjectConfiguration(KEY, colCfgs, (byte)50);
+
+        assertTrue(StatisticsUtils.compareVersions(objCfg, VERSIONS) == 0);
+    }
+
+    /**
+     * Test checkStatisticsConfigurationVersions with stat with never columns returns {@code true}.
+     */
+    @Test
+    public void testCompareObjVersionsNewerStat() {
+        List<StatisticsColumnConfiguration> colCfgs = Arrays.asList(COL_1_CFG, COL_2_CFG.refresh());
+        StatisticsObjectConfiguration objCfg = new StatisticsObjectConfiguration(KEY, colCfgs, (byte)50);
+
+        assertTrue(StatisticsUtils.compareVersions(objCfg, VERSIONS) > 0);
+    }
+
+    /**
+     * Test checkStatisticsConfigurationVersions with stat with extra columns returns {@code true}.
+     */
+    @Test
+    public void testCompareObjVersionsExtraColumn() {
+        List<StatisticsColumnConfiguration> colCfgs = Arrays.asList(COL_1_CFG, COL_2_CFG, COL_3_CFG);
+        StatisticsObjectConfiguration objCfg = new StatisticsObjectConfiguration(KEY, colCfgs, (byte)50);
+
+        assertEquals(0, StatisticsUtils.compareVersions(objCfg, VERSIONS));
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsInMemoryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsInMemoryTest.java
index 9feb117..5788118 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsInMemoryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsInMemoryTest.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.internal.processors.query.stat;
 
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 
 /**
@@ -29,13 +27,6 @@ public class StatisticsViewsInMemoryTest extends StatisticsViewsTest {
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
-        cfg.setConsistentId(igniteInstanceName);
-
-        DataStorageConfiguration memCfg = new DataStorageConfiguration()
-            .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(false));
-
-        cfg.setDataStorageConfiguration(memCfg);
-
-        return cfg;
+        return addImMemoryRegion(cfg, igniteInstanceName);
     }
 }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsPersistenceTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsPersistenceTest.java
index d4e5365..34aef75 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsPersistenceTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsPersistenceTest.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.internal.processors.query.stat;
 
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 
 /**
@@ -29,13 +27,6 @@ public class StatisticsViewsPersistenceTest extends StatisticsViewsTest {
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
-        cfg.setConsistentId(igniteInstanceName);
-
-        DataStorageConfiguration memCfg = new DataStorageConfiguration()
-            .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true));
-
-        cfg.setDataStorageConfiguration(memCfg);
-
-        return cfg;
+        return addPersistenceRegion(cfg, igniteInstanceName);
     }
 }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsTest.java
index c69bb51..c31c269 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsViewsTest.java
@@ -20,10 +20,13 @@ package org.apache.ignite.internal.processors.query.stat;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
-import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnConfiguration;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnOverrides;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -44,21 +47,30 @@ public abstract class StatisticsViewsTest extends StatisticsAbstractTest {
         grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
 
         createSmallTable(null);
-        collectStatistics(SMALL_TARGET);
+        collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
     }
 
+
     /**
      * Check small table configuration in statistics column configuration view.
      */
     @Test
     public void testConfigurationView() throws Exception {
+        collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
+
+        ObjectStatisticsImpl smallStat = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY);
+
+        long aVer = smallStat.columnStatistics("A").version();
+        long bVer = smallStat.columnStatistics("B").version();
+        long cVer = smallStat.columnStatistics("C").version();
+
         List<List<Object>> config = Arrays.asList(
-            Arrays.asList(SCHEMA, "TABLE", "SMALL", "A", (byte)15, null, null, null, null, 1L),
-            Arrays.asList(SCHEMA, "TABLE", "SMALL", "B", (byte)15, null, null, null, null, 1L),
-            Arrays.asList(SCHEMA, "TABLE", "SMALL", "C", (byte)15, null, null, null, null, 1L)
+            Arrays.asList(SCHEMA, "TABLE", "SMALL", "A", (byte)15, null, null, null, null, aVer),
+            Arrays.asList(SCHEMA, "TABLE", "SMALL", "B", (byte)15, null, null, null, null, bVer),
+            Arrays.asList(SCHEMA, "TABLE", "SMALL", "C", (byte)15, null, null, null, null, cVer)
         );
 
-        checkSqlResult("select * from SYS.STATISTICS_CONFIGURATION", null, config::equals);
+        checkSqlResult("select * from SYS.STATISTICS_CONFIGURATION where name = 'SMALL'", null, config::equals);
     }
 
     /**
@@ -80,7 +92,9 @@ public abstract class StatisticsViewsTest extends StatisticsAbstractTest {
         name = name.toUpperCase();
 
         // 2) Create statistics for new table.
-        grid(0).cache(DEFAULT_CACHE_NAME).query(new SqlFieldsQuery("ANALYZE " + name)).getAll();
+        // TODO: revert after IGNITE-15455
+        //grid(0).cache(DEFAULT_CACHE_NAME).query(new SqlFieldsQuery("ANALYZE " + name)).getAll();
+        collectStatistics(StatisticsType.GLOBAL, name);
 
         // 3) Check statistics configuration presence.
         List<List<Object>> config = new ArrayList<>();
@@ -91,7 +105,9 @@ public abstract class StatisticsViewsTest extends StatisticsAbstractTest {
         checkSqlResult("select * from SYS.STATISTICS_CONFIGURATION where NAME = '" + name + "'", null, config::equals);
 
         // 4) Drop statistics for some column of new table.
-        grid(0).cache(DEFAULT_CACHE_NAME).query(new SqlFieldsQuery("DROP STATISTICS " + name + "(A);")).getAll();
+        //grid(0).cache(DEFAULT_CACHE_NAME).query(new SqlFieldsQuery("DROP STATISTICS " + name + "(A);")).getAll();
+        statisticsMgr(0).statisticConfiguration().dropStatistics(
+            Collections.singletonList(new StatisticsTarget(SCHEMA, name, "A")), true);
 
         // 5) Check statistics configuration without dropped column.
         List<Object> removed = config.remove(0);
@@ -99,7 +115,10 @@ public abstract class StatisticsViewsTest extends StatisticsAbstractTest {
             act -> testContains(config, act) == null && testContains(Arrays.asList(removed), act) != null);
 
         // 6) Drop statistics for new table.
-        grid(0).cache(DEFAULT_CACHE_NAME).query(new SqlFieldsQuery("DROP STATISTICS " + name)).getAll();
+        // TODO: revert after IGNITE-15455
+        //grid(0).cache(DEFAULT_CACHE_NAME).query(new SqlFieldsQuery("DROP STATISTICS " + name)).getAll();
+        statisticsMgr(0).statisticConfiguration().dropStatistics(
+            Collections.singletonList(new StatisticsTarget(SCHEMA, name)), true);
 
         // 7) Check statistics configuration without it.
         checkSqlResult("select * from SYS.STATISTICS_CONFIGURATION where NAME = '" + name + "'", null, List::isEmpty);
@@ -121,67 +140,6 @@ public abstract class StatisticsViewsTest extends StatisticsAbstractTest {
     }
 
     /**
-     * Check that all expected lines exist in actual. If not - fail.
-     *
-     * @param expected Expected lines, nulls mean any value.
-     * @param actual Actual lines.
-     */
-    private void checkContains(List<List<Object>> expected, List<List<?>> actual) {
-        List<Object> notExisting = testContains(expected, actual);
-        if (notExisting != null)
-            fail("Unable to found " + notExisting + " in specified dataset");
-    }
-
-    /**
-     * Test that all expected lines exist in actual.
-     *
-     * @param expected Expected lines, nulls mean any value.
-     * @param actual Actual lines.
-     * @return First not existing line or {@code null} if all lines presented.
-     */
-    private List<Object> testContains(List<List<Object>> expected, List<List<?>> actual) {
-        assertTrue(expected.size() <= actual.size());
-
-        assertTrue("Test may take too long with such datasets of actual = " + actual.size(), actual.size() <= 1024);
-
-        for (List<Object> exp : expected) {
-            boolean found = false;
-
-            for (List<?> act : actual) {
-                found = checkEqualWithNull(exp, act);
-
-                if (found)
-                    break;
-            }
-
-            if (!found)
-                return exp;
-        }
-
-        return null;
-    }
-
-    /**
-     * Compare expected line with actual one.
-     *
-     * @param expected Expected line, {@code null} value mean any value.
-     * @param actual Actual line.
-     * @return {@code true} if line are equal, {@code false} - otherwise.
-     */
-    private boolean checkEqualWithNull(List<Object> expected, List<?> actual) {
-        assertEquals(expected.size(), actual.size());
-
-        for (int i = 0; i < expected.size(); i++) {
-            Object exp = expected.get(i);
-            Object act = actual.get(i);
-            if (exp != null && !exp.equals(act) && act != null)
-                return false;
-        }
-
-        return true;
-    }
-
-    /**
      * Check small table local data in statistics local data view.
      */
     @Test
@@ -219,14 +177,35 @@ public abstract class StatisticsViewsTest extends StatisticsAbstractTest {
         assertNotNull(smallStat);
         assertEquals(size, smallStat.rowCount());
 
-        sql("DROP STATISTICS SMALL");
+        // TODO: revert after IGNITE-15455
+        // sql("DROP STATISTICS SMALL");
 
-        checkSqlResult("select * from SYS.STATISTICS_LOCAL_DATA where NAME = 'SMALL'", null,
-            list -> list.isEmpty());
+        // sql("ANALYZE SMALL (A) WITH \"DISTINCT=5,NULLS=6,TOTAL=7,SIZE=8\"");
+        // sql("ANALYZE SMALL (B) WITH \"DISTINCT=6,NULLS=7,TOTAL=8\"");
+        // sql("ANALYZE SMALL (C)");
+        IgniteStatisticsConfigurationManager cfgMgr = statisticsMgr(0).statisticConfiguration();
+
+        cfgMgr.dropStatistics(Collections.singletonList(SMALL_TARGET), true);
+
+        StatisticsColumnConfiguration aCfg = new StatisticsColumnConfiguration("A",
+            new StatisticsColumnOverrides(6L, 5L, 7L, 8));
+        StatisticsObjectConfiguration smallACfg = new StatisticsObjectConfiguration(SMALL_KEY,
+            Collections.singleton(aCfg), StatisticsObjectConfiguration.DEFAULT_OBSOLESCENCE_MAX_PERCENT);
+
+        cfgMgr.updateStatistics(smallACfg);
 
-        sql("ANALYZE SMALL (A) WITH \"DISTINCT=5,NULLS=6,TOTAL=7,SIZE=8\"");
-        sql("ANALYZE SMALL (B) WITH \"DISTINCT=6,NULLS=7,TOTAL=8\"");
-        sql("ANALYZE SMALL (C)");
+        StatisticsColumnConfiguration bCfg = new StatisticsColumnConfiguration("B",
+            new StatisticsColumnOverrides(7L, 6L, 8L, null));
+        StatisticsObjectConfiguration smallBCfg = new StatisticsObjectConfiguration(SMALL_KEY,
+            Collections.singleton(bCfg), StatisticsObjectConfiguration.DEFAULT_OBSOLESCENCE_MAX_PERCENT);
+
+        cfgMgr.updateStatistics(smallBCfg);
+
+        StatisticsColumnConfiguration cCfg = new StatisticsColumnConfiguration("C", null);
+        StatisticsObjectConfiguration smallCCfg = new StatisticsObjectConfiguration(SMALL_KEY,
+            Collections.singleton(cCfg), StatisticsObjectConfiguration.DEFAULT_OBSOLESCENCE_MAX_PERCENT);
+
+        cfgMgr.updateStatistics(smallCCfg);
 
         checkSqlResult("select * from SYS.STATISTICS_LOCAL_DATA where NAME = 'SMALL' and COLUMN = 'C'", null,
             list -> !list.isEmpty());
@@ -243,10 +222,14 @@ public abstract class StatisticsViewsTest extends StatisticsAbstractTest {
         Timestamp tsB = new Timestamp(smallStat.columnStatistics("B").createdAt());
         Timestamp tsC = new Timestamp(smallStat.columnStatistics("C").createdAt());
 
+        long aVer = smallStat.columnStatistics("A").version();
+        long bVer = smallStat.columnStatistics("B").version();
+        long cVer = smallStat.columnStatistics("C").version();
+
         List<List<Object>> localData = Arrays.asList(
-            Arrays.asList(SCHEMA, "TABLE", "SMALL", "A", 8L, 5L, 6L, 7L, 8, 3L, tsA.toString()),
-            Arrays.asList(SCHEMA, "TABLE", "SMALL", "B", 8L, 6L, 7L, 8L, 4, 3L, tsB.toString()),
-            Arrays.asList(SCHEMA, "TABLE", "SMALL", "C", 8L, 10L, 0L, size, 4, 3L, tsC.toString())
+            Arrays.asList(SCHEMA, "TABLE", "SMALL", "A", 8L, 5L, 6L, 7L, 8, aVer, tsA.toString()),
+            Arrays.asList(SCHEMA, "TABLE", "SMALL", "B", 8L, 6L, 7L, 8L, 4, bVer, tsB.toString()),
+            Arrays.asList(SCHEMA, "TABLE", "SMALL", "C", 8L, 10L, 0L, size, 4, cVer, tsC.toString())
         );
 
         checkSqlResult("select * from SYS.STATISTICS_LOCAL_DATA where NAME = 'SMALL'", null,
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteStatisticsTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteStatisticsTestSuite.java
index ef338cd0..312c014 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteStatisticsTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteStatisticsTestSuite.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.query.stat.BusyExecutorTest;
 import org.apache.ignite.internal.processors.query.stat.ColumnStatisticsCollectorAggregationTest;
 import org.apache.ignite.internal.processors.query.stat.ColumnStatisticsCollectorTest;
 import org.apache.ignite.internal.processors.query.stat.HasherSelfTest;
+import org.apache.ignite.internal.processors.query.stat.IgniteStatisticsRepositoryStaticTest;
 import org.apache.ignite.internal.processors.query.stat.IgniteStatisticsRepositoryTest;
 import org.apache.ignite.internal.processors.query.stat.ManagerStatisticsTypesTest;
 import org.apache.ignite.internal.processors.query.stat.PSUBasicValueDistributionTableStatisticsUsageTest;
@@ -35,11 +36,14 @@ import org.apache.ignite.internal.processors.query.stat.SqlStatisticsCommandTest
 import org.apache.ignite.internal.processors.query.stat.StatisticsClearTest;
 import org.apache.ignite.internal.processors.query.stat.StatisticsConfigurationTest;
 import org.apache.ignite.internal.processors.query.stat.StatisticsGatheringTest;
+import org.apache.ignite.internal.processors.query.stat.StatisticsGlobalViewInMemoryTest;
+import org.apache.ignite.internal.processors.query.stat.StatisticsGlobalViewPersistenceTest;
 import org.apache.ignite.internal.processors.query.stat.StatisticsObsolescenceTest;
 import org.apache.ignite.internal.processors.query.stat.StatisticsStorageInMemoryTest;
 import org.apache.ignite.internal.processors.query.stat.StatisticsStoragePersistenceTest;
 import org.apache.ignite.internal.processors.query.stat.StatisticsStorageRestartTest;
 import org.apache.ignite.internal.processors.query.stat.StatisticsStorageUnitTest;
+import org.apache.ignite.internal.processors.query.stat.StatisticsUtilsTest;
 import org.apache.ignite.internal.processors.query.stat.StatisticsViewsInMemoryTest;
 import org.apache.ignite.internal.processors.query.stat.StatisticsViewsPersistenceTest;
 import org.apache.ignite.internal.processors.query.stat.hll.FullHLLTest;
@@ -63,6 +67,7 @@ import org.junit.runners.Suite;
     StatisticsStorageRestartTest.class,
     StatisticsGatheringTest.class,
     StatisticsClearTest.class,
+    IgniteStatisticsRepositoryStaticTest.class,
 
     // Table statistics usage.
     RowCountTableStatisticsUsageTest.class,
@@ -80,6 +85,7 @@ import org.junit.runners.Suite;
     StatisticsStorageInMemoryTest.class,
     StatisticsStoragePersistenceTest.class,
     StatisticsStorageUnitTest.class,
+    StatisticsUtilsTest.class,
 
     // Statistics SQL commands
     SqlParserAnalyzeSelfTest.class,
@@ -93,7 +99,9 @@ import org.junit.runners.Suite;
 
     // Views
     StatisticsViewsPersistenceTest.class,
-    StatisticsViewsInMemoryTest.class
+    StatisticsViewsInMemoryTest.class,
+    StatisticsGlobalViewPersistenceTest.class,
+    StatisticsGlobalViewInMemoryTest.class
 })
 public class IgniteStatisticsTestSuite {
 }