You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/09/14 12:52:21 UTC

[GitHub] [ignite] korlov42 commented on a change in pull request #9392: IGNITE-15289 Global statistics collection

korlov42 commented on a change in pull request #9392:
URL: https://github.com/apache/ignite/pull/9392#discussion_r708126867



##########
File path: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteGlobalStatisticsManager.java
##########
@@ -0,0 +1,985 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.stat;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.managers.systemview.GridSystemViewManager;
+import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnGlobalDataViewWalker;
+import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnLocalDataViewWalker;
+import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnPartitionDataViewWalker;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnConfiguration;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsKeyMessage;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsObjectData;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsRequest;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsResponse;
+import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnConfigurationView;
+import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnGlobalDataView;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+
+/**
+ * TODO: TBD
+ * Crawler to track and handle any requests, related to statistics.
+ * Crawler tracks requests and calls back the statistics manager to process failed requests.
+ */
+public class IgniteGlobalStatisticsManager implements GridMessageListener {
+    /** */
+    private static final String STAT_GLOBAL_VIEW_NAME = "statisticsGlobalData";
+
+    /** */
+    private static final String STAT_GLOBAL_VIEW_DESCRIPTION = "Global statistics.";
+
+    /** Statistics configuration manager. */
+    private final IgniteStatisticsConfigurationManager cfgMgr;
+
+    /** Statistics repository. */
+    private final IgniteStatisticsRepository repo;
+
+    /** Statistics gatherer. */
+    private final StatisticsGatherer gatherer;
+
+    /** Pool to process statistics requests. */
+    private final IgniteThreadPoolExecutor mgmtPool;
+
+    /** Discovery manager to get server node list to statistics master calculation. */
+    private final GridDiscoveryManager discoMgr;
+
+    /** Cluster state processor. */
+    private final GridClusterStateProcessor cluster;
+
+    /** Cache partition exchange manager. */
+    private final GridCachePartitionExchangeManager<?, ?> exchange;
+
+    /** Helper to transform or generate statistics related messages. */
+    private final IgniteStatisticsHelper helper;
+
+    /** Grid io manager to exchange global and local statistics. */
+    private final GridIoManager ioMgr;
+
+    /** Cache for global statistics. */
+    private final ConcurrentMap<StatisticsKey, CacheEntry<ObjectStatisticsImpl>> globalStatistics =
+        new ConcurrentHashMap<>();
+
+    /** Incoming requests which should be served after local statistics collection finish. */
+    private final ConcurrentMap<StatisticsKey, Collection<StatisticsAddressedRequest>> inLocalRequests =
+        new ConcurrentHashMap<>();
+
+    /** Incoming requests which should be served after global statistics collection finish. */
+    private final ConcurrentMap<StatisticsKey, Collection<StatisticsAddressedRequest>> inGloblaRequests =
+        new ConcurrentHashMap<>();
+
+    /** Outgoing global collection requests. */
+    private final ConcurrentMap<StatisticsKey, StatisticsGatheringContext> curCollections = new ConcurrentHashMap<>();
+
+    /** Outgoing global statistics requests: statistics key to request id. */
+    private final ConcurrentMap<StatisticsKey, UUID> outGlobalStatisticsRequests = new ConcurrentHashMap<>();
+
+    /** Actual topology version for all pending requests. */
+    private volatile AffinityTopologyVersion topVer;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /**
+     * Exchange listener. When it decides to react to a finished exchange it drops all pending request
+     * bookkeeping ({@code inLocalRequests}, {@code inGloblaRequests}, {@code curCollections},
+     * {@code outGlobalStatisticsRequests}) and stores the exchange topology version in {@code topVer}.
+     * The cached {@code globalStatistics} are not touched here.
+     * NOTE(review): the reset is located inside the custom-event branch, so it only runs when the first
+     * exchange event is a discovery custom event other than {@link DynamicCacheChangeBatch}; for any other
+     * first event type nothing is cleared and {@code topVer} is not updated - confirm this is intended.
+     */
+    private final PartitionsExchangeAware exchAwareLsnr = new PartitionsExchangeAware() {
+        @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+
+            // Ignore exchanges whose type is not ALL (join/left of client nodes) and any exchange seen while the cluster is not ACTIVE.
+            if (fut.exchangeType() != GridDhtPartitionsExchangeFuture.ExchangeType.ALL ||
+                cluster.clusterState().lastState() != ClusterState.ACTIVE)
+                return;
+
+            DiscoveryEvent evt = fut.firstEvent();
+
+            // Cache create/destroy (a DynamicCacheChangeBatch custom message) must not reset anything.
+            if (evt.type() == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
+                DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)evt).customMessage();
+
+                if (msg instanceof DynamicCacheChangeBatch)
+                    return;
+
+                // Drop every pending incoming/outgoing activity and remember the new topology version.
+                if (log.isDebugEnabled())
+                    log.debug("Resetting all global statistics activities due to new topology " +
+                        fut.topologyVersion());
+
+                inLocalRequests.clear();
+                inGloblaRequests.clear();
+                curCollections.clear();
+                outGlobalStatisticsRequests.clear();
+
+                topVer = fut.topologyVersion();
+            }
+        }
+    };
+
+    /**
+     * Constructor. Besides storing its collaborators it has two side effects: it subscribes this manager
+     * as a message listener on {@code GridTopic.TOPIC_STATISTICS} and registers the filtrable
+     * {@code statisticsGlobalData} system view backed by {@code columnGlobalStatisticsViewSupplier}.
+     * The exchange listener is not registered here.
+     *
+     * @param cfgMgr Statistics configuration manager.
+     * @param sysViewMgr System view manager used to register the global statistics view (not stored).
+     * @param repo Statistics repository.
+     * @param gatherer Statistics gatherer.
+     * @param mgmtPool Pool to process statistics requests.
+     * @param discoMgr Discovery manager.
+     * @param cluster Cluster state processor.
+     * @param exchange Cache partition exchange manager.
+     * @param helper Helper to transform or generate statistics related messages.
+     * @param ioMgr Grid io manager; this manager is added to it as a listener.
+     * @param logSupplier Logger supplier, applied to this class to obtain the logger.
+     */
+    public IgniteGlobalStatisticsManager(
+        IgniteStatisticsConfigurationManager cfgMgr,
+        GridSystemViewManager sysViewMgr,
+        IgniteStatisticsRepository repo,
+        StatisticsGatherer gatherer,
+        IgniteThreadPoolExecutor mgmtPool,
+        GridDiscoveryManager discoMgr,
+        GridClusterStateProcessor cluster,
+        GridCachePartitionExchangeManager<?, ?> exchange,
+        IgniteStatisticsHelper helper,
+        GridIoManager ioMgr,
+        Function<Class<?>, IgniteLogger> logSupplier
+    ) {
+        this.cfgMgr = cfgMgr;
+        this.repo = repo;
+        this.gatherer = gatherer;
+        this.mgmtPool = mgmtPool;
+        this.discoMgr = discoMgr;
+        this.cluster = cluster;
+        this.exchange = exchange;
+        this.helper = helper;
+        this.ioMgr = ioMgr;
+        log = logSupplier.apply(IgniteGlobalStatisticsManager.class);
+
+        ioMgr.addMessageListener(GridTopic.TOPIC_STATISTICS, this);
+
+        sysViewMgr.registerFiltrableView(STAT_GLOBAL_VIEW_NAME, STAT_GLOBAL_VIEW_DESCRIPTION,
+            new StatisticsColumnGlobalDataViewWalker(), this::columnGlobalStatisticsViewSupplier, Function.identity());
+    }
+
+    /**
+     * Statistics column global data view filterable supplier. Builds the rows of the global statistics
+     * system view from the {@code globalStatistics} cache:
+     * a type filter other than {@code TABLE_TYPE} (compared ignoring case) yields an empty result;
+     * when both schema and name filters are non-empty a single cache entry is looked up, otherwise all
+     * cached entries holding a non-null statistics object are taken, restricted to the schema if one is given;
+     * for every selected object one view is produced per column, or only for the filtered column
+     * when a column filter is set and that column has statistics.
+     * NOTE(review): when a name filter is given without a schema the name is not applied here - confirm
+     * this is intended.
+     *
+     * @param filter Filter: map of optional type, schema, name and column filter values.
+     * @return Iterable with statistics column global data views, empty if nothing matches.
+     */
+    private Iterable<StatisticsColumnGlobalDataView> columnGlobalStatisticsViewSupplier(Map<String, Object> filter) {
+        String type = (String)filter.get(StatisticsColumnPartitionDataViewWalker.TYPE_FILTER);
+        if (type != null && !StatisticsColumnConfigurationView.TABLE_TYPE.equalsIgnoreCase(type))
+            return Collections.emptyList();
+
+        String schema = (String)filter.get(StatisticsColumnLocalDataViewWalker.SCHEMA_FILTER);
+        String name = (String)filter.get(StatisticsColumnLocalDataViewWalker.NAME_FILTER);
+        String column = (String)filter.get(StatisticsColumnPartitionDataViewWalker.COLUMN_FILTER);
+
+        Map<StatisticsKey, ObjectStatisticsImpl> globalStatsMap;
+        if (!F.isEmpty(schema) && !F.isEmpty(name)) {
+            StatisticsKey key = new StatisticsKey(schema, name);
+
+            CacheEntry<ObjectStatisticsImpl> objLocStat = globalStatistics.get(key);
+
+            if (objLocStat == null || objLocStat.obj == null)
+                return Collections.emptyList();
+
+            globalStatsMap = Collections.singletonMap(key, objLocStat.object());
+        }
+        else
+            globalStatsMap = globalStatistics.entrySet().stream()
+                .filter(e -> e.getValue().object() != null && (F.isEmpty(schema) || schema.equals(e.getKey().schema())))
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().object()));
+
+        List<StatisticsColumnGlobalDataView> res = new ArrayList<>();
+
+        for (Map.Entry<StatisticsKey, ObjectStatisticsImpl> localStatsEntry : globalStatsMap.entrySet()) {
+            StatisticsKey key = localStatsEntry.getKey();
+            ObjectStatisticsImpl stat = localStatsEntry.getValue();
+
+            if (column == null) {
+                for (Map.Entry<String, ColumnStatistics> colStat : localStatsEntry.getValue().columnsStatistics()
+                    .entrySet()) {
+                    StatisticsColumnGlobalDataView colStatView = new StatisticsColumnGlobalDataView(key,
+                        colStat.getKey(), stat);
+
+                    res.add(colStatView);
+                }
+            }
+            else {
+                ColumnStatistics colStat = localStatsEntry.getValue().columnStatistics(column);
+
+                if (colStat != null) {
+                    StatisticsColumnGlobalDataView colStatView = new StatisticsColumnGlobalDataView(key, column, stat);
+
+                    res.add(colStatView);
+                }
+            }
+        }
+
+        return res;
+    }
+
+    /** Start. */
+    public void start() {

Review comment:
       Currently it's unclear why the `start` and `stop` operations are to be made mutually exclusive. It would be great to describe this in the javadoc.
   
   This component doesn't have internal state, thus sequential calls to `start` will cause the registration of two similar listeners.

##########
File path: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteGlobalStatisticsManager.java
##########
@@ -0,0 +1,985 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.stat;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.managers.systemview.GridSystemViewManager;
+import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnGlobalDataViewWalker;
+import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnLocalDataViewWalker;
+import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnPartitionDataViewWalker;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnConfiguration;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsKeyMessage;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsObjectData;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsRequest;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsResponse;
+import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnConfigurationView;
+import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnGlobalDataView;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+
+/**
+ * TODO: TBD
+ * Crawler to track and handle any requests, related to statistics.
+ * Crawler tracks requests and calls back the statistics manager to process failed requests.
+ */
+public class IgniteGlobalStatisticsManager implements GridMessageListener {
+    /** */
+    private static final String STAT_GLOBAL_VIEW_NAME = "statisticsGlobalData";
+
+    /** */
+    private static final String STAT_GLOBAL_VIEW_DESCRIPTION = "Global statistics.";
+
+    /** Statistics configuration manager. */
+    private final IgniteStatisticsConfigurationManager cfgMgr;
+
+    /** Statistics repository. */
+    private final IgniteStatisticsRepository repo;
+
+    /** Statistics gatherer. */
+    private final StatisticsGatherer gatherer;
+
+    /** Pool to process statistics requests. */
+    private final IgniteThreadPoolExecutor mgmtPool;
+
+    /** Discovery manager to get server node list to statistics master calculation. */
+    private final GridDiscoveryManager discoMgr;
+
+    /** Cluster state processor. */
+    private final GridClusterStateProcessor cluster;
+
+    /** Cache partition exchange manager. */
+    private final GridCachePartitionExchangeManager<?, ?> exchange;
+
+    /** Helper to transform or generate statistics related messages. */
+    private final IgniteStatisticsHelper helper;
+
+    /** Grid io manager to exchange global and local statistics. */
+    private final GridIoManager ioMgr;
+
+    /** Cache for global statistics. */
+    private final ConcurrentMap<StatisticsKey, CacheEntry<ObjectStatisticsImpl>> globalStatistics =
+        new ConcurrentHashMap<>();
+
+    /** Incoming requests which should be served after local statistics collection finish. */
+    private final ConcurrentMap<StatisticsKey, Collection<StatisticsAddressedRequest>> inLocalRequests =
+        new ConcurrentHashMap<>();
+
+    /** Incoming requests which should be served after global statistics collection finish. */
+    private final ConcurrentMap<StatisticsKey, Collection<StatisticsAddressedRequest>> inGloblaRequests =
+        new ConcurrentHashMap<>();
+
+    /** Outgoing global collection requests. */
+    private final ConcurrentMap<StatisticsKey, StatisticsGatheringContext> curCollections = new ConcurrentHashMap<>();
+
+    /** Outgoing global statistics requests: statistics key to request id. */
+    private final ConcurrentMap<StatisticsKey, UUID> outGlobalStatisticsRequests = new ConcurrentHashMap<>();
+
+    /** Actual topology version for all pending requests. */
+    private volatile AffinityTopologyVersion topVer;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /**
+     * Exchange listener. When it decides to react to a finished exchange it drops all pending request
+     * bookkeeping ({@code inLocalRequests}, {@code inGloblaRequests}, {@code curCollections},
+     * {@code outGlobalStatisticsRequests}) and stores the exchange topology version in {@code topVer}.
+     * The cached {@code globalStatistics} are not touched here.
+     * NOTE(review): the reset is located inside the custom-event branch, so it only runs when the first
+     * exchange event is a discovery custom event other than {@link DynamicCacheChangeBatch}; for any other
+     * first event type nothing is cleared and {@code topVer} is not updated - confirm this is intended.
+     */
+    private final PartitionsExchangeAware exchAwareLsnr = new PartitionsExchangeAware() {
+        @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+
+            // Ignore exchanges whose type is not ALL (join/left of client nodes) and any exchange seen while the cluster is not ACTIVE.
+            if (fut.exchangeType() != GridDhtPartitionsExchangeFuture.ExchangeType.ALL ||
+                cluster.clusterState().lastState() != ClusterState.ACTIVE)
+                return;
+
+            DiscoveryEvent evt = fut.firstEvent();
+
+            // Cache create/destroy (a DynamicCacheChangeBatch custom message) must not reset anything.
+            if (evt.type() == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
+                DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)evt).customMessage();
+
+                if (msg instanceof DynamicCacheChangeBatch)
+                    return;
+
+                // Drop every pending incoming/outgoing activity and remember the new topology version.
+                if (log.isDebugEnabled())
+                    log.debug("Resetting all global statistics activities due to new topology " +
+                        fut.topologyVersion());
+
+                inLocalRequests.clear();
+                inGloblaRequests.clear();
+                curCollections.clear();
+                outGlobalStatisticsRequests.clear();
+
+                topVer = fut.topologyVersion();
+            }
+        }
+    };
+
+    /**
+     * Constructor. Besides storing its collaborators it has two side effects: it subscribes this manager
+     * as a message listener on {@code GridTopic.TOPIC_STATISTICS} and registers the filtrable
+     * {@code statisticsGlobalData} system view backed by {@code columnGlobalStatisticsViewSupplier}.
+     * The exchange listener is not registered here; that happens in {@code start()}.
+     *
+     * @param cfgMgr Statistics configuration manager.
+     * @param sysViewMgr System view manager used to register the global statistics view (not stored).
+     * @param repo Statistics repository.
+     * @param gatherer Statistics gatherer.
+     * @param mgmtPool Pool to process statistics requests.
+     * @param discoMgr Discovery manager.
+     * @param cluster Cluster state processor.
+     * @param exchange Cache partition exchange manager.
+     * @param helper Helper to transform or generate statistics related messages.
+     * @param ioMgr Grid io manager; this manager is added to it as a listener.
+     * @param logSupplier Logger supplier, applied to this class to obtain the logger.
+     */
+    public IgniteGlobalStatisticsManager(
+        IgniteStatisticsConfigurationManager cfgMgr,
+        GridSystemViewManager sysViewMgr,
+        IgniteStatisticsRepository repo,
+        StatisticsGatherer gatherer,
+        IgniteThreadPoolExecutor mgmtPool,
+        GridDiscoveryManager discoMgr,
+        GridClusterStateProcessor cluster,
+        GridCachePartitionExchangeManager<?, ?> exchange,
+        IgniteStatisticsHelper helper,
+        GridIoManager ioMgr,
+        Function<Class<?>, IgniteLogger> logSupplier
+    ) {
+        this.cfgMgr = cfgMgr;
+        this.repo = repo;
+        this.gatherer = gatherer;
+        this.mgmtPool = mgmtPool;
+        this.discoMgr = discoMgr;
+        this.cluster = cluster;
+        this.exchange = exchange;
+        this.helper = helper;
+        this.ioMgr = ioMgr;
+        log = logSupplier.apply(IgniteGlobalStatisticsManager.class);
+
+        ioMgr.addMessageListener(GridTopic.TOPIC_STATISTICS, this);
+
+        sysViewMgr.registerFiltrableView(STAT_GLOBAL_VIEW_NAME, STAT_GLOBAL_VIEW_DESCRIPTION,
+            new StatisticsColumnGlobalDataViewWalker(), this::columnGlobalStatisticsViewSupplier, Function.identity());
+    }
+
+    /**
+     * Statistics column global data view filterable supplier. Builds the rows of the global statistics
+     * system view from the {@code globalStatistics} cache:
+     * a type filter other than {@code TABLE_TYPE} (compared ignoring case) yields an empty result;
+     * when both schema and name filters are non-empty a single cache entry is looked up, otherwise all
+     * cached entries holding a non-null statistics object are taken, restricted to the schema if one is given;
+     * for every selected object one view is produced per column, or only for the filtered column
+     * when a column filter is set and that column has statistics.
+     * NOTE(review): when a name filter is given without a schema the name is not applied here - confirm
+     * this is intended.
+     *
+     * @param filter Filter: map of optional type, schema, name and column filter values.
+     * @return Iterable with statistics column global data views, empty if nothing matches.
+     */
+    private Iterable<StatisticsColumnGlobalDataView> columnGlobalStatisticsViewSupplier(Map<String, Object> filter) {
+        String type = (String)filter.get(StatisticsColumnPartitionDataViewWalker.TYPE_FILTER);
+        if (type != null && !StatisticsColumnConfigurationView.TABLE_TYPE.equalsIgnoreCase(type))
+            return Collections.emptyList();
+
+        String schema = (String)filter.get(StatisticsColumnLocalDataViewWalker.SCHEMA_FILTER);
+        String name = (String)filter.get(StatisticsColumnLocalDataViewWalker.NAME_FILTER);
+        String column = (String)filter.get(StatisticsColumnPartitionDataViewWalker.COLUMN_FILTER);
+
+        Map<StatisticsKey, ObjectStatisticsImpl> globalStatsMap;
+        if (!F.isEmpty(schema) && !F.isEmpty(name)) {
+            StatisticsKey key = new StatisticsKey(schema, name);
+
+            CacheEntry<ObjectStatisticsImpl> objLocStat = globalStatistics.get(key);
+
+            if (objLocStat == null || objLocStat.obj == null)
+                return Collections.emptyList();
+
+            globalStatsMap = Collections.singletonMap(key, objLocStat.object());
+        }
+        else
+            globalStatsMap = globalStatistics.entrySet().stream()
+                .filter(e -> e.getValue().object() != null && (F.isEmpty(schema) || schema.equals(e.getKey().schema())))
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().object()));
+
+        List<StatisticsColumnGlobalDataView> res = new ArrayList<>();
+
+        for (Map.Entry<StatisticsKey, ObjectStatisticsImpl> localStatsEntry : globalStatsMap.entrySet()) {
+            StatisticsKey key = localStatsEntry.getKey();
+            ObjectStatisticsImpl stat = localStatsEntry.getValue();
+
+            if (column == null) {
+                for (Map.Entry<String, ColumnStatistics> colStat : localStatsEntry.getValue().columnsStatistics()
+                    .entrySet()) {
+                    StatisticsColumnGlobalDataView colStatView = new StatisticsColumnGlobalDataView(key,
+                        colStat.getKey(), stat);
+
+                    res.add(colStatView);
+                }
+            }
+            else {
+                ColumnStatistics colStat = localStatsEntry.getValue().columnStatistics(column);
+
+                if (colStat != null) {
+                    StatisticsColumnGlobalDataView colStatView = new StatisticsColumnGlobalDataView(key, column, stat);
+
+                    res.add(colStatView);
+                }
+            }
+        }
+
+        return res;
+    }
+
+    /**
+     * Start. Clears the cached global statistics and registers the exchange listener with the cache
+     * partition exchange manager.
+     * NOTE(review): there is no started/stopped flag, so every call registers the listener again;
+     * whether a repeated registration results in duplicate callbacks depends on the exchange manager -
+     * to be confirmed.
+     */
+    public void start() {
+        if (log.isDebugEnabled())
+            log.debug("Global statistics manager starting...");
+
+        globalStatistics.clear();
+        exchange.registerExchangeAwareComponent(exchAwareLsnr);
+
+        if (log.isDebugEnabled())
+            log.debug("Global statistics manager started.");
+    }
+
+    /**
+     * Stop. Resets the tracked topology version to {@code null}, clears the cached global statistics
+     * and all pending incoming/outgoing request maps, then unregisters the exchange listener from the
+     * cache partition exchange manager. The message listener added in the constructor is not removed here.
+     */
+    public void stop() {
+        if (log.isDebugEnabled())
+            log.debug("Global statistics manager stopping...");
+
+        topVer = null;
+
+        globalStatistics.clear();
+
+        inGloblaRequests.clear();
+        inLocalRequests.clear();
+        outGlobalStatisticsRequests.clear();
+        curCollections.clear();
+
+        exchange.unregisterExchangeAwareComponent(exchAwareLsnr);
+
+        if (log.isDebugEnabled())
+            log.debug("Global statistics manager stopped.");
+    }
+
+    /**
+     * Get global statistics for the given key. If there is no cached statistics, but

Review comment:
       but what?

##########
File path: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteGlobalStatisticsManager.java
##########
@@ -0,0 +1,985 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.stat;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.managers.systemview.GridSystemViewManager;
+import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnGlobalDataViewWalker;
+import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnLocalDataViewWalker;
+import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnPartitionDataViewWalker;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnConfiguration;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsKeyMessage;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsObjectData;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsRequest;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsResponse;
+import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnConfigurationView;
+import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnGlobalDataView;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+
+/**
+ * TODO: TBD
+ * Crawler to track and handle any requests, related to statistics.
+ * Crawler tracks requests and call back statistics manager to process failed requests.
+ */
+public class IgniteGlobalStatisticsManager implements GridMessageListener {
+    /** */
+    private static final String STAT_GLOBAL_VIEW_NAME = "statisticsGlobalData";
+
+    /** */
+    private static final String STAT_GLOBAL_VIEW_DESCRIPTION = "Global statistics.";
+
+    /** Statistics configuration manager. */
+    private final IgniteStatisticsConfigurationManager cfgMgr;
+
+    /** Statistics repository. */
+    private final IgniteStatisticsRepository repo;
+
+    /** Statistics gatherer. */
+    private final StatisticsGatherer gatherer;
+
+    /** Pool to process statistics requests. */
+    private final IgniteThreadPoolExecutor mgmtPool;
+
+    /** Discovery manager to get server node list to statistics master calculation. */
+    private final GridDiscoveryManager discoMgr;
+
+    /** Cluster state processor. */
+    private final GridClusterStateProcessor cluster;
+
+    /** Cache partition exchange manager. */
+    private final GridCachePartitionExchangeManager<?, ?> exchange;
+
+    /** Helper to transform or generate statistics related messages. */
+    private final IgniteStatisticsHelper helper;
+
+    /** Grid io manager to exchange global and local statistics. */
+    private final GridIoManager ioMgr;
+
+    /** Cache for global statistics. */
+    private final ConcurrentMap<StatisticsKey, CacheEntry<ObjectStatisticsImpl>> globalStatistics =
+        new ConcurrentHashMap<>();
+
+    /** Incoming requests which should be served after local statistics collection finish. */
+    private final ConcurrentMap<StatisticsKey, Collection<StatisticsAddressedRequest>> inLocalRequests =
+        new ConcurrentHashMap<>();
+
+    /** Incoming requests which should be served after global statistics collection finish. */
+    private final ConcurrentMap<StatisticsKey, Collection<StatisticsAddressedRequest>> inGloblaRequests =
+        new ConcurrentHashMap<>();
+
+    /** Outcoming global collection requests. */
+    private final ConcurrentMap<StatisticsKey, StatisticsGatheringContext> curCollections = new ConcurrentHashMap<>();
+
+    /** Outcoming global statistics requests to request id. */
+    private final ConcurrentMap<StatisticsKey, UUID> outGlobalStatisticsRequests = new ConcurrentHashMap<>();
+
+    /** Actual topology version for all pending requests. */
+    private volatile AffinityTopologyVersion topVer;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Exchange listener. */
+    private final PartitionsExchangeAware exchAwareLsnr = new PartitionsExchangeAware() {
+        @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+
+            // Skip join/left client nodes.
+            if (fut.exchangeType() != GridDhtPartitionsExchangeFuture.ExchangeType.ALL ||
+                cluster.clusterState().lastState() != ClusterState.ACTIVE)
+                return;
+
+            DiscoveryEvent evt = fut.firstEvent();
+
+            // Skip create/destroy caches.
+            if (evt.type() == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
+                DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)evt).customMessage();
+
+                if (msg instanceof DynamicCacheChangeBatch)
+                    return;
+
+                // Just clear all activities and update topology version.
+                if (log.isDebugEnabled())
+                    log.debug("Resetting all global statistics activities due to new topology " +
+                        fut.topologyVersion());
+
+                inLocalRequests.clear();
+                inGloblaRequests.clear();
+                curCollections.clear();
+                outGlobalStatisticsRequests.clear();
+
+                topVer = fut.topologyVersion();
+            }
+        }
+    };
+
+    /**
+     * Constructor.
+     *
+     * @param cfgMgr Statistics configuration manager.
+     */
+    public IgniteGlobalStatisticsManager(
+        IgniteStatisticsConfigurationManager cfgMgr,
+        GridSystemViewManager sysViewMgr,
+        IgniteStatisticsRepository repo,
+        StatisticsGatherer gatherer,
+        IgniteThreadPoolExecutor mgmtPool,
+        GridDiscoveryManager discoMgr,
+        GridClusterStateProcessor cluster,
+        GridCachePartitionExchangeManager<?, ?> exchange,
+        IgniteStatisticsHelper helper,
+        GridIoManager ioMgr,
+        Function<Class<?>, IgniteLogger> logSupplier
+    ) {
+        this.cfgMgr = cfgMgr;
+        this.repo = repo;
+        this.gatherer = gatherer;
+        this.mgmtPool = mgmtPool;
+        this.discoMgr = discoMgr;
+        this.cluster = cluster;
+        this.exchange = exchange;
+        this.helper = helper;
+        this.ioMgr = ioMgr;
+        log = logSupplier.apply(IgniteGlobalStatisticsManager.class);
+
+        ioMgr.addMessageListener(GridTopic.TOPIC_STATISTICS, this);
+
+        sysViewMgr.registerFiltrableView(STAT_GLOBAL_VIEW_NAME, STAT_GLOBAL_VIEW_DESCRIPTION,
+            new StatisticsColumnGlobalDataViewWalker(), this::columnGlobalStatisticsViewSupplier, Function.identity());
+    }
+
+    /**
+     * Statistics column global data view filterable supplier.
+     *
+     * @param filter Filter.
+     * @return Iterable with statistics column global data views.
+     */
+    private Iterable<StatisticsColumnGlobalDataView> columnGlobalStatisticsViewSupplier(Map<String, Object> filter) {
+        String type = (String)filter.get(StatisticsColumnPartitionDataViewWalker.TYPE_FILTER);
+        if (type != null && !StatisticsColumnConfigurationView.TABLE_TYPE.equalsIgnoreCase(type))
+            return Collections.emptyList();
+
+        String schema = (String)filter.get(StatisticsColumnLocalDataViewWalker.SCHEMA_FILTER);
+        String name = (String)filter.get(StatisticsColumnLocalDataViewWalker.NAME_FILTER);
+        String column = (String)filter.get(StatisticsColumnPartitionDataViewWalker.COLUMN_FILTER);
+
+        Map<StatisticsKey, ObjectStatisticsImpl> globalStatsMap;
+        if (!F.isEmpty(schema) && !F.isEmpty(name)) {
+            StatisticsKey key = new StatisticsKey(schema, name);
+
+            CacheEntry<ObjectStatisticsImpl> objLocStat = globalStatistics.get(key);
+
+            if (objLocStat == null || objLocStat.obj == null)
+                return Collections.emptyList();
+
+            globalStatsMap = Collections.singletonMap(key, objLocStat.object());
+        }
+        else
+            globalStatsMap = globalStatistics.entrySet().stream()
+                .filter(e -> e.getValue().object() != null && (F.isEmpty(schema) || schema.equals(e.getKey().schema())))
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().object()));
+
+        List<StatisticsColumnGlobalDataView> res = new ArrayList<>();
+
+        for (Map.Entry<StatisticsKey, ObjectStatisticsImpl> localStatsEntry : globalStatsMap.entrySet()) {
+            StatisticsKey key = localStatsEntry.getKey();
+            ObjectStatisticsImpl stat = localStatsEntry.getValue();
+
+            if (column == null) {
+                for (Map.Entry<String, ColumnStatistics> colStat : localStatsEntry.getValue().columnsStatistics()
+                    .entrySet()) {
+                    StatisticsColumnGlobalDataView colStatView = new StatisticsColumnGlobalDataView(key,
+                        colStat.getKey(), stat);
+
+                    res.add(colStatView);
+                }
+            }
+            else {
+                ColumnStatistics colStat = localStatsEntry.getValue().columnStatistics(column);
+
+                if (colStat != null) {
+                    StatisticsColumnGlobalDataView colStatView = new StatisticsColumnGlobalDataView(key, column, stat);
+
+                    res.add(colStatView);
+                }
+            }
+        }
+
+        return res;
+    }
+
+    /** Start. */
+    public void start() {
+        if (log.isDebugEnabled())
+            log.debug("Global statistics manager starting...");
+
+        globalStatistics.clear();
+        exchange.registerExchangeAwareComponent(exchAwareLsnr);
+
+        if (log.isDebugEnabled())
+            log.debug("Global statistics manager started.");
+    }
+
+    /** Stop. */
+    public void stop() {
+        if (log.isDebugEnabled())
+            log.debug("Global statistics manager stopping...");
+
+        topVer = null;
+
+        globalStatistics.clear();
+
+        inGloblaRequests.clear();
+        inLocalRequests.clear();
+        outGlobalStatisticsRequests.clear();
+        curCollections.clear();
+
+        exchange.unregisterExchangeAwareComponent(exchAwareLsnr);
+
+        if (log.isDebugEnabled())
+            log.debug("Global statistics manager stopped.");
+    }
+
+    /**
+     * Get global statistics for the given key. If there is no cached statistics, but
+     *
+     * @param key Statistics key.
+     * @return Global object statistics or {@code null} if there is no global statistics available.
+     */
+    public ObjectStatisticsImpl getGlobalStatistics(StatisticsKey key) {
+        CacheEntry<ObjectStatisticsImpl> res = globalStatistics.computeIfAbsent(key, k -> {
+            if (log.isDebugEnabled())
+                log.debug("Scheduling global statistics collection by key " + key);
+
+            mgmtPool.submit(() -> collectGlobalStatistics(key));
+
+            return new CacheEntry<>(null);
+        });
+
+        return res.object();
+    }
+
+    /**
+     * Either send local or global statistics request to get global statistics.
+     *
+     * @param key Statistics key to get global statistics by.
+     */
+    private void collectGlobalStatistics(StatisticsKey key) {
+        try {
+            StatisticsObjectConfiguration statCfg = cfgMgr.config(key);
+
+            if (statCfg != null && !statCfg.columns().isEmpty()) {
+                UUID statMaster = getStatisticsMasterNode(key);
+
+                if (discoMgr.localNode().id().equals(statMaster))
+                    gatherGlobalStatistics(statCfg);
+                else {
+                    StatisticsKeyMessage keyMsg = new StatisticsKeyMessage(key.schema(), key.obj(),
+                        Collections.emptyList());
+
+                    Map<String, Long> versions = statCfg.columns().entrySet().stream()
+                        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().version()));
+
+                    StatisticsRequest globalReq = new StatisticsRequest(UUID.randomUUID(), keyMsg,
+                        StatisticsType.GLOBAL, null, versions);
+
+                    outGlobalStatisticsRequests.put(key, globalReq.reqId());
+
+                    if (log.isDebugEnabled())
+                        log.debug("Send global statistics request by configuration " + statCfg);
+
+                    send(statMaster, globalReq);
+
+                }
+            }
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("Unable to start global statistics collection due to lack of configuration by key "
+                        + key);
+            }
+
+        }
+        catch (IgniteCheckedException e) {
+            if (log.isInfoEnabled())
+                log.info("Unable to get statistics configuration due to " + e.getMessage());
+        }
+    }
+
+    /**
+     * Collect global statistics on master node.
+     *
+     * @param statCfg Statistics config to gather global statistics by.
+     */
+    private void gatherGlobalStatistics(StatisticsObjectConfiguration statCfg) throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Start global statistics collection by configuration " + statCfg);
+
+        StatisticsTarget target = new StatisticsTarget(statCfg.key());
+
+        List<StatisticsAddressedRequest> locRequests = helper.generateGatheringRequests(target, statCfg);
+        UUID reqId = locRequests.get(0).req().reqId();
+
+        StatisticsGatheringContext gatCtx = new StatisticsGatheringContext(locRequests.size(), reqId);
+
+        curCollections.put(statCfg.key(), gatCtx);
+
+        for (StatisticsAddressedRequest addReq : locRequests) {
+            if (log.isDebugEnabled())
+                log.debug("Sending local request " + addReq.req().reqId() + " to node " + addReq.nodeId());
+
+            send(addReq.nodeId(), addReq.req());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+        mgmtPool.submit(() -> {
+            try {
+                if (msg instanceof StatisticsRequest) {
+                    StatisticsRequest req = (StatisticsRequest)msg;
+                    switch (req.type()) {
+                        case LOCAL:
+                            processLocalRequest(nodeId, req);
+
+                            break;
+
+                        case GLOBAL:
+                            processGlobalRequest(nodeId, req);
+
+                            break;
+
+                        default:
+                            log.warning("Unexpected type " + req.type() + " in statistics request message " + req);
+                    }
+                }
+                else if (msg instanceof StatisticsResponse) {
+                    StatisticsResponse resp = (StatisticsResponse)msg;
+
+                    switch (resp.data().type()) {
+                        case LOCAL:
+                            processLocalResponse(nodeId, resp);
+
+                            break;
+
+                        case GLOBAL:
+                            processGlobalResponse(nodeId, resp);
+
+                            break;
+
+                        default:
+                            log.warning("Unexpected type " + resp.data().type() +
+                                " in statistics reposonse message " + resp);
+                    }
+
+                }
+                else
+                    log.warning("Unknown msg " + msg + " in statistics topic " + GridTopic.TOPIC_STATISTICS +
+                        " from node " + nodeId);
+            }
+            catch (Throwable e) {
+                if (log.isInfoEnabled())

Review comment:
       Why is the INFO level used here?

##########
File path: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteGlobalStatisticsManager.java
##########
@@ -0,0 +1,985 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.stat;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.managers.systemview.GridSystemViewManager;
+import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnGlobalDataViewWalker;
+import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnLocalDataViewWalker;
+import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnPartitionDataViewWalker;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnConfiguration;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsKeyMessage;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsObjectData;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsRequest;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsResponse;
+import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnConfigurationView;
+import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnGlobalDataView;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+
+/**
+ * TODO: TBD
+ * Crawler to track and handle any requests, related to statistics.
+ * Crawler tracks requests and call back statistics manager to process failed requests.
+ */
+public class IgniteGlobalStatisticsManager implements GridMessageListener {
+    /** */
+    private static final String STAT_GLOBAL_VIEW_NAME = "statisticsGlobalData";
+
+    /** */
+    private static final String STAT_GLOBAL_VIEW_DESCRIPTION = "Global statistics.";
+
+    /** Statistics configuration manager. */
+    private final IgniteStatisticsConfigurationManager cfgMgr;
+
+    /** Statistics repository. */
+    private final IgniteStatisticsRepository repo;
+
+    /** Statistics gatherer. */
+    private final StatisticsGatherer gatherer;
+
+    /** Pool to process statistics requests. */
+    private final IgniteThreadPoolExecutor mgmtPool;
+
+    /** Discovery manager to get server node list to statistics master calculation. */
+    private final GridDiscoveryManager discoMgr;
+
+    /** Cluster state processor. */
+    private final GridClusterStateProcessor cluster;
+
+    /** Cache partition exchange manager. */
+    private final GridCachePartitionExchangeManager<?, ?> exchange;
+
+    /** Helper to transform or generate statistics related messages. */
+    private final IgniteStatisticsHelper helper;
+
+    /** Grid io manager to exchange global and local statistics. */
+    private final GridIoManager ioMgr;
+
+    /** Cache for global statistics. */
+    private final ConcurrentMap<StatisticsKey, CacheEntry<ObjectStatisticsImpl>> globalStatistics =
+        new ConcurrentHashMap<>();
+
+    /** Incoming requests which should be served after local statistics collection finish. */
+    private final ConcurrentMap<StatisticsKey, Collection<StatisticsAddressedRequest>> inLocalRequests =
+        new ConcurrentHashMap<>();
+
+    /** Incoming requests which should be served after global statistics collection finish. */
+    private final ConcurrentMap<StatisticsKey, Collection<StatisticsAddressedRequest>> inGloblaRequests =
+        new ConcurrentHashMap<>();
+
+    /** Outcoming global collection requests. */
+    private final ConcurrentMap<StatisticsKey, StatisticsGatheringContext> curCollections = new ConcurrentHashMap<>();
+
+    /** Outcoming global statistics requests to request id. */
+    private final ConcurrentMap<StatisticsKey, UUID> outGlobalStatisticsRequests = new ConcurrentHashMap<>();
+
+    /** Actual topology version for all pending requests. */
+    private volatile AffinityTopologyVersion topVer;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Exchange listener. */
+    private final PartitionsExchangeAware exchAwareLsnr = new PartitionsExchangeAware() {
+        @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+
+            // Skip join/left client nodes.
+            if (fut.exchangeType() != GridDhtPartitionsExchangeFuture.ExchangeType.ALL ||
+                cluster.clusterState().lastState() != ClusterState.ACTIVE)
+                return;
+
+            DiscoveryEvent evt = fut.firstEvent();
+
+            // Skip create/destroy caches.
+            if (evt.type() == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
+                DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)evt).customMessage();
+
+                if (msg instanceof DynamicCacheChangeBatch)
+                    return;
+
+                // Just clear all activities and update topology version.
+                if (log.isDebugEnabled())
+                    log.debug("Resetting all global statistics activities due to new topology " +
+                        fut.topologyVersion());
+
+                inLocalRequests.clear();
+                inGloblaRequests.clear();
+                curCollections.clear();
+                outGlobalStatisticsRequests.clear();
+
+                topVer = fut.topologyVersion();
+            }
+        }
+    };
+
+    /**
+     * Constructor.
+     *
+     * @param cfgMgr Statistics configuration manager.
+     */
+    public IgniteGlobalStatisticsManager(
+        IgniteStatisticsConfigurationManager cfgMgr,
+        GridSystemViewManager sysViewMgr,
+        IgniteStatisticsRepository repo,
+        StatisticsGatherer gatherer,
+        IgniteThreadPoolExecutor mgmtPool,
+        GridDiscoveryManager discoMgr,
+        GridClusterStateProcessor cluster,
+        GridCachePartitionExchangeManager<?, ?> exchange,
+        IgniteStatisticsHelper helper,
+        GridIoManager ioMgr,
+        Function<Class<?>, IgniteLogger> logSupplier
+    ) {
+        this.cfgMgr = cfgMgr;
+        this.repo = repo;
+        this.gatherer = gatherer;
+        this.mgmtPool = mgmtPool;
+        this.discoMgr = discoMgr;
+        this.cluster = cluster;
+        this.exchange = exchange;
+        this.helper = helper;
+        this.ioMgr = ioMgr;
+        log = logSupplier.apply(IgniteGlobalStatisticsManager.class);
+
+        ioMgr.addMessageListener(GridTopic.TOPIC_STATISTICS, this);
+
+        sysViewMgr.registerFiltrableView(STAT_GLOBAL_VIEW_NAME, STAT_GLOBAL_VIEW_DESCRIPTION,
+            new StatisticsColumnGlobalDataViewWalker(), this::columnGlobalStatisticsViewSupplier, Function.identity());
+    }
+
+    /**
+     * Statistics column global data view filterable supplier.
+     *
+     * @param filter Filter.
+     * @return Iterable with statistics column global data views.
+     */
+    private Iterable<StatisticsColumnGlobalDataView> columnGlobalStatisticsViewSupplier(Map<String, Object> filter) {
+        String type = (String)filter.get(StatisticsColumnPartitionDataViewWalker.TYPE_FILTER);
+        if (type != null && !StatisticsColumnConfigurationView.TABLE_TYPE.equalsIgnoreCase(type))
+            return Collections.emptyList();
+
+        String schema = (String)filter.get(StatisticsColumnLocalDataViewWalker.SCHEMA_FILTER);
+        String name = (String)filter.get(StatisticsColumnLocalDataViewWalker.NAME_FILTER);
+        String column = (String)filter.get(StatisticsColumnPartitionDataViewWalker.COLUMN_FILTER);
+
+        Map<StatisticsKey, ObjectStatisticsImpl> globalStatsMap;
+        if (!F.isEmpty(schema) && !F.isEmpty(name)) {
+            StatisticsKey key = new StatisticsKey(schema, name);
+
+            CacheEntry<ObjectStatisticsImpl> objLocStat = globalStatistics.get(key);
+
+            if (objLocStat == null || objLocStat.obj == null)
+                return Collections.emptyList();
+
+            globalStatsMap = Collections.singletonMap(key, objLocStat.object());
+        }
+        else
+            globalStatsMap = globalStatistics.entrySet().stream()
+                .filter(e -> e.getValue().object() != null && (F.isEmpty(schema) || schema.equals(e.getKey().schema())))
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().object()));
+
+        List<StatisticsColumnGlobalDataView> res = new ArrayList<>();
+
+        for (Map.Entry<StatisticsKey, ObjectStatisticsImpl> localStatsEntry : globalStatsMap.entrySet()) {
+            StatisticsKey key = localStatsEntry.getKey();
+            ObjectStatisticsImpl stat = localStatsEntry.getValue();
+
+            if (column == null) {
+                for (Map.Entry<String, ColumnStatistics> colStat : localStatsEntry.getValue().columnsStatistics()
+                    .entrySet()) {
+                    StatisticsColumnGlobalDataView colStatView = new StatisticsColumnGlobalDataView(key,
+                        colStat.getKey(), stat);
+
+                    res.add(colStatView);
+                }
+            }
+            else {
+                ColumnStatistics colStat = localStatsEntry.getValue().columnStatistics(column);
+
+                if (colStat != null) {
+                    StatisticsColumnGlobalDataView colStatView = new StatisticsColumnGlobalDataView(key, column, stat);
+
+                    res.add(colStatView);
+                }
+            }
+        }
+
+        return res;
+    }
+
+    /** Start. */
+    public void start() {
+        if (log.isDebugEnabled())
+            log.debug("Global statistics manager starting...");
+
+        globalStatistics.clear();
+        exchange.registerExchangeAwareComponent(exchAwareLsnr);
+
+        if (log.isDebugEnabled())
+            log.debug("Global statistics manager started.");
+    }
+
+    /** Stop. */
+    public void stop() {
+        if (log.isDebugEnabled())
+            log.debug("Global statistics manager stopping...");
+
+        topVer = null;
+
+        globalStatistics.clear();
+
+        inGloblaRequests.clear();
+        inLocalRequests.clear();
+        outGlobalStatisticsRequests.clear();
+        curCollections.clear();
+
+        exchange.unregisterExchangeAwareComponent(exchAwareLsnr);
+
+        if (log.isDebugEnabled())
+            log.debug("Global statistics manager stopped.");
+    }
+
+    /**
+     * Get global statistics for the given key. If there is no cached statistics, but
+     *
+     * @param key Statistics key.
+     * @return Global object statistics or {@code null} if there is no global statistics available.
+     */
+    public ObjectStatisticsImpl getGlobalStatistics(StatisticsKey key) {
+        CacheEntry<ObjectStatisticsImpl> res = globalStatistics.computeIfAbsent(key, k -> {
+            if (log.isDebugEnabled())
+                log.debug("Scheduling global statistics collection by key " + key);
+
+            mgmtPool.submit(() -> collectGlobalStatistics(key));
+
+            return new CacheEntry<>(null);
+        });
+
+        return res.object();
+    }
+
+    /**
+     * Either send local or global statistics request to get global statistics.
+     *
+     * @param key Statistics key to get global statistics by.
+     */
+    private void collectGlobalStatistics(StatisticsKey key) {
+        try {
+            StatisticsObjectConfiguration statCfg = cfgMgr.config(key);
+
+            if (statCfg != null && !statCfg.columns().isEmpty()) {
+                UUID statMaster = getStatisticsMasterNode(key);
+
+                if (discoMgr.localNode().id().equals(statMaster))
+                    gatherGlobalStatistics(statCfg);
+                else {
+                    StatisticsKeyMessage keyMsg = new StatisticsKeyMessage(key.schema(), key.obj(),
+                        Collections.emptyList());
+
+                    Map<String, Long> versions = statCfg.columns().entrySet().stream()
+                        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().version()));
+
+                    StatisticsRequest globalReq = new StatisticsRequest(UUID.randomUUID(), keyMsg,
+                        StatisticsType.GLOBAL, null, versions);
+
+                    outGlobalStatisticsRequests.put(key, globalReq.reqId());
+
+                    if (log.isDebugEnabled())
+                        log.debug("Send global statistics request by configuration " + statCfg);
+
+                    send(statMaster, globalReq);
+
+                }
+            }
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("Unable to start global statistics collection due to lack of configuration by key "
+                        + key);
+            }
+
+        }
+        catch (IgniteCheckedException e) {
+            if (log.isInfoEnabled())
+                log.info("Unable to get statistics configuration due to " + e.getMessage());
+        }
+    }
+
+    /**
+     * Collect global statistics on master node.
+     *
+     * @param statCfg Statistics config to gather global statistics by.
+     */
+    private void gatherGlobalStatistics(StatisticsObjectConfiguration statCfg) throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Start global statistics collection by configuration " + statCfg);
+
+        StatisticsTarget target = new StatisticsTarget(statCfg.key());
+
+        List<StatisticsAddressedRequest> locRequests = helper.generateGatheringRequests(target, statCfg);
+        UUID reqId = locRequests.get(0).req().reqId();
+
+        StatisticsGatheringContext gatCtx = new StatisticsGatheringContext(locRequests.size(), reqId);
+
+        curCollections.put(statCfg.key(), gatCtx);
+
+        for (StatisticsAddressedRequest addReq : locRequests) {
+            if (log.isDebugEnabled())
+                log.debug("Sending local request " + addReq.req().reqId() + " to node " + addReq.nodeId());
+
+            send(addReq.nodeId(), addReq.req());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+        mgmtPool.submit(() -> {
+            try {
+                if (msg instanceof StatisticsRequest) {
+                    StatisticsRequest req = (StatisticsRequest)msg;
+                    switch (req.type()) {
+                        case LOCAL:
+                            processLocalRequest(nodeId, req);
+
+                            break;
+
+                        case GLOBAL:
+                            processGlobalRequest(nodeId, req);
+
+                            break;
+
+                        default:
+                            log.warning("Unexpected type " + req.type() + " in statistics request message " + req);
+                    }
+                }
+                else if (msg instanceof StatisticsResponse) {
+                    StatisticsResponse resp = (StatisticsResponse)msg;
+
+                    switch (resp.data().type()) {
+                        case LOCAL:
+                            processLocalResponse(nodeId, resp);
+
+                            break;
+
+                        case GLOBAL:
+                            processGlobalResponse(nodeId, resp);
+
+                            break;
+
+                        default:
+                            log.warning("Unexpected type " + resp.data().type() +
+                                " in statistics reposonse message " + resp);
+                    }
+
+                }
+                else
+                    log.warning("Unknown msg " + msg + " in statistics topic " + GridTopic.TOPIC_STATISTICS +
+                        " from node " + nodeId);
+            }
+            catch (Throwable e) {
+                if (log.isInfoEnabled())
+                    log.info("Unable to process statistics message: " + e);
+            }
+        });
+    }
+
+    private void getTopVer(StatisticsKey key) {

Review comment:
       `getTopVer` is an empty method; please implement it or remove it.

##########
File path: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteGlobalStatisticsManager.java
##########
@@ -0,0 +1,985 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.stat;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.managers.systemview.GridSystemViewManager;
+import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnGlobalDataViewWalker;
+import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnLocalDataViewWalker;
+import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnPartitionDataViewWalker;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnConfiguration;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsKeyMessage;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsObjectData;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsRequest;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsResponse;
+import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnConfigurationView;
+import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnGlobalDataView;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+
+/**
+ * TODO: TBD
+ * Crawler to track and handle any requests, related to statistics.
+ * Crawler tracks requests and call back statistics manager to process failed requests.
+ */
+public class IgniteGlobalStatisticsManager implements GridMessageListener {
+    /** */
+    private static final String STAT_GLOBAL_VIEW_NAME = "statisticsGlobalData";
+
+    /** */
+    private static final String STAT_GLOBAL_VIEW_DESCRIPTION = "Global statistics.";
+
+    /** Statistics configuration manager. */
+    private final IgniteStatisticsConfigurationManager cfgMgr;
+
+    /** Statistics repository. */
+    private final IgniteStatisticsRepository repo;
+
+    /** Statistics gatherer. */
+    private final StatisticsGatherer gatherer;
+
+    /** Pool to process statistics requests. */
+    private final IgniteThreadPoolExecutor mgmtPool;
+
+    /** Discovery manager to get server node list to statistics master calculation. */
+    private final GridDiscoveryManager discoMgr;
+
+    /** Cluster state processor. */
+    private final GridClusterStateProcessor cluster;
+
+    /** Cache partition exchange manager. */
+    private final GridCachePartitionExchangeManager<?, ?> exchange;
+
+    /** Helper to transform or generate statistics related messages. */
+    private final IgniteStatisticsHelper helper;
+
+    /** Grid io manager to exchange global and local statistics. */
+    private final GridIoManager ioMgr;
+
+    /** Cache for global statistics. */
+    private final ConcurrentMap<StatisticsKey, CacheEntry<ObjectStatisticsImpl>> globalStatistics =
+        new ConcurrentHashMap<>();
+
+    /** Incoming requests which should be served after local statistics collection finish. */
+    private final ConcurrentMap<StatisticsKey, Collection<StatisticsAddressedRequest>> inLocalRequests =
+        new ConcurrentHashMap<>();
+
+    /** Incoming requests which should be served after global statistics collection finish. */
+    private final ConcurrentMap<StatisticsKey, Collection<StatisticsAddressedRequest>> inGloblaRequests =
+        new ConcurrentHashMap<>();
+
+    /** Outgoing global collection requests. */
+    private final ConcurrentMap<StatisticsKey, StatisticsGatheringContext> curCollections = new ConcurrentHashMap<>();
+
+    /** Outgoing global statistics requests: statistics key to request id. */
+    private final ConcurrentMap<StatisticsKey, UUID> outGlobalStatisticsRequests = new ConcurrentHashMap<>();
+
+    /** Actual topology version for all pending requests. */
+    private volatile AffinityTopologyVersion topVer;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Exchange listener. */
+    private final PartitionsExchangeAware exchAwareLsnr = new PartitionsExchangeAware() {
+        @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+
+            // Skip join/left client nodes.
+            if (fut.exchangeType() != GridDhtPartitionsExchangeFuture.ExchangeType.ALL ||
+                cluster.clusterState().lastState() != ClusterState.ACTIVE)
+                return;
+
+            DiscoveryEvent evt = fut.firstEvent();
+
+            // Skip create/destroy caches.
+            if (evt.type() == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
+                DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)evt).customMessage();
+
+                if (msg instanceof DynamicCacheChangeBatch)
+                    return;
+
+                // Just clear all activities and update topology version.
+                if (log.isDebugEnabled())
+                    log.debug("Resetting all global statistics activities due to new topology " +
+                        fut.topologyVersion());
+
+                inLocalRequests.clear();
+                inGloblaRequests.clear();
+                curCollections.clear();
+                outGlobalStatisticsRequests.clear();
+
+                topVer = fut.topologyVersion();
+            }
+        }
+    };
+
+    /**
+     * Constructor.
+     *
+     * @param cfgMgr Statistics configuration manager.
+     */
+    public IgniteGlobalStatisticsManager(
+        IgniteStatisticsConfigurationManager cfgMgr,
+        GridSystemViewManager sysViewMgr,
+        IgniteStatisticsRepository repo,
+        StatisticsGatherer gatherer,
+        IgniteThreadPoolExecutor mgmtPool,
+        GridDiscoveryManager discoMgr,
+        GridClusterStateProcessor cluster,
+        GridCachePartitionExchangeManager<?, ?> exchange,
+        IgniteStatisticsHelper helper,
+        GridIoManager ioMgr,
+        Function<Class<?>, IgniteLogger> logSupplier
+    ) {
+        this.cfgMgr = cfgMgr;
+        this.repo = repo;
+        this.gatherer = gatherer;
+        this.mgmtPool = mgmtPool;
+        this.discoMgr = discoMgr;
+        this.cluster = cluster;
+        this.exchange = exchange;
+        this.helper = helper;
+        this.ioMgr = ioMgr;
+        log = logSupplier.apply(IgniteGlobalStatisticsManager.class);
+
+        ioMgr.addMessageListener(GridTopic.TOPIC_STATISTICS, this);
+
+        sysViewMgr.registerFiltrableView(STAT_GLOBAL_VIEW_NAME, STAT_GLOBAL_VIEW_DESCRIPTION,
+            new StatisticsColumnGlobalDataViewWalker(), this::columnGlobalStatisticsViewSupplier, Function.identity());
+    }
+
+    /**
+     * Statistics column global data view filterable supplier.
+     *
+     * @param filter Filter.
+     * @return Iterable with statistics column global data views.
+     */
+    private Iterable<StatisticsColumnGlobalDataView> columnGlobalStatisticsViewSupplier(Map<String, Object> filter) {
+        String type = (String)filter.get(StatisticsColumnPartitionDataViewWalker.TYPE_FILTER);
+        if (type != null && !StatisticsColumnConfigurationView.TABLE_TYPE.equalsIgnoreCase(type))
+            return Collections.emptyList();
+
+        String schema = (String)filter.get(StatisticsColumnLocalDataViewWalker.SCHEMA_FILTER);
+        String name = (String)filter.get(StatisticsColumnLocalDataViewWalker.NAME_FILTER);
+        String column = (String)filter.get(StatisticsColumnPartitionDataViewWalker.COLUMN_FILTER);
+
+        Map<StatisticsKey, ObjectStatisticsImpl> globalStatsMap;
+        if (!F.isEmpty(schema) && !F.isEmpty(name)) {
+            StatisticsKey key = new StatisticsKey(schema, name);
+
+            CacheEntry<ObjectStatisticsImpl> objLocStat = globalStatistics.get(key);
+
+            if (objLocStat == null || objLocStat.obj == null)
+                return Collections.emptyList();
+
+            globalStatsMap = Collections.singletonMap(key, objLocStat.object());
+        }
+        else
+            globalStatsMap = globalStatistics.entrySet().stream()
+                .filter(e -> e.getValue().object() != null && (F.isEmpty(schema) || schema.equals(e.getKey().schema())))
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().object()));
+
+        List<StatisticsColumnGlobalDataView> res = new ArrayList<>();
+
+        for (Map.Entry<StatisticsKey, ObjectStatisticsImpl> localStatsEntry : globalStatsMap.entrySet()) {
+            StatisticsKey key = localStatsEntry.getKey();
+            ObjectStatisticsImpl stat = localStatsEntry.getValue();
+
+            if (column == null) {
+                for (Map.Entry<String, ColumnStatistics> colStat : localStatsEntry.getValue().columnsStatistics()
+                    .entrySet()) {
+                    StatisticsColumnGlobalDataView colStatView = new StatisticsColumnGlobalDataView(key,
+                        colStat.getKey(), stat);
+
+                    res.add(colStatView);
+                }
+            }
+            else {
+                ColumnStatistics colStat = localStatsEntry.getValue().columnStatistics(column);
+
+                if (colStat != null) {
+                    StatisticsColumnGlobalDataView colStatView = new StatisticsColumnGlobalDataView(key, column, stat);
+
+                    res.add(colStatView);
+                }
+            }
+        }
+
+        return res;
+    }
+
+    /** Start. */
+    public void start() {
+        if (log.isDebugEnabled())
+            log.debug("Global statistics manager starting...");
+
+        globalStatistics.clear();
+        exchange.registerExchangeAwareComponent(exchAwareLsnr);
+
+        if (log.isDebugEnabled())
+            log.debug("Global statistics manager started.");
+    }
+
+    /** Stop. */
+    public void stop() {
+        if (log.isDebugEnabled())
+            log.debug("Global statistics manager stopping...");
+
+        topVer = null;
+
+        globalStatistics.clear();
+
+        inGloblaRequests.clear();
+        inLocalRequests.clear();
+        outGlobalStatisticsRequests.clear();
+        curCollections.clear();
+
+        exchange.unregisterExchangeAwareComponent(exchAwareLsnr);
+
+        if (log.isDebugEnabled())
+            log.debug("Global statistics manager stopped.");
+    }
+
+    /**
+     * Get global statistics for the given key. If there are no cached statistics, their collection is scheduled
+     * in the background and {@code null} is returned.
+     *
+     * @param key Statistics key.
+     * @return Global object statistics or {@code null} if there is no global statistics available.
+     */
+    public ObjectStatisticsImpl getGlobalStatistics(StatisticsKey key) {
+        CacheEntry<ObjectStatisticsImpl> res = globalStatistics.computeIfAbsent(key, k -> {
+            if (log.isDebugEnabled())
+                log.debug("Scheduling global statistics collection by key " + key);
+
+            mgmtPool.submit(() -> collectGlobalStatistics(key));
+
+            return new CacheEntry<>(null);
+        });
+
+        return res.object();
+    }
+
+    /**
+     * Either send local or global statistics request to get global statistics.
+     *
+     * @param key Statistics key to get global statistics by.
+     */
+    private void collectGlobalStatistics(StatisticsKey key) {
+        try {
+            StatisticsObjectConfiguration statCfg = cfgMgr.config(key);
+
+            if (statCfg != null && !statCfg.columns().isEmpty()) {
+                UUID statMaster = getStatisticsMasterNode(key);
+
+                if (discoMgr.localNode().id().equals(statMaster))
+                    gatherGlobalStatistics(statCfg);
+                else {
+                    StatisticsKeyMessage keyMsg = new StatisticsKeyMessage(key.schema(), key.obj(),
+                        Collections.emptyList());
+
+                    Map<String, Long> versions = statCfg.columns().entrySet().stream()
+                        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().version()));
+
+                    StatisticsRequest globalReq = new StatisticsRequest(UUID.randomUUID(), keyMsg,
+                        StatisticsType.GLOBAL, null, versions);
+
+                    outGlobalStatisticsRequests.put(key, globalReq.reqId());
+
+                    if (log.isDebugEnabled())
+                        log.debug("Send global statistics request by configuration " + statCfg);
+
+                    send(statMaster, globalReq);
+
+                }
+            }
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("Unable to start global statistics collection due to lack of configuration by key "
+                        + key);
+            }
+
+        }
+        catch (IgniteCheckedException e) {
+            if (log.isInfoEnabled())
+                log.info("Unable to get statistics configuration due to " + e.getMessage());
+        }
+    }
+
+    /**
+     * Collect global statistics on master node.
+     *
+     * @param statCfg Statistics config to gather global statistics by.
+     */
+    private void gatherGlobalStatistics(StatisticsObjectConfiguration statCfg) throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Start global statistics collection by configuration " + statCfg);
+
+        StatisticsTarget target = new StatisticsTarget(statCfg.key());
+
+        List<StatisticsAddressedRequest> locRequests = helper.generateGatheringRequests(target, statCfg);
+        UUID reqId = locRequests.get(0).req().reqId();
+
+        StatisticsGatheringContext gatCtx = new StatisticsGatheringContext(locRequests.size(), reqId);
+
+        curCollections.put(statCfg.key(), gatCtx);
+
+        for (StatisticsAddressedRequest addReq : locRequests) {
+            if (log.isDebugEnabled())
+                log.debug("Sending local request " + addReq.req().reqId() + " to node " + addReq.nodeId());
+
+            send(addReq.nodeId(), addReq.req());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+        mgmtPool.submit(() -> {
+            try {
+                if (msg instanceof StatisticsRequest) {
+                    StatisticsRequest req = (StatisticsRequest)msg;
+                    switch (req.type()) {
+                        case LOCAL:
+                            processLocalRequest(nodeId, req);
+
+                            break;
+
+                        case GLOBAL:
+                            processGlobalRequest(nodeId, req);
+
+                            break;
+
+                        default:
+                            log.warning("Unexpected type " + req.type() + " in statistics request message " + req);
+                    }
+                }
+                else if (msg instanceof StatisticsResponse) {
+                    StatisticsResponse resp = (StatisticsResponse)msg;
+
+                    switch (resp.data().type()) {
+                        case LOCAL:
+                            processLocalResponse(nodeId, resp);
+
+                            break;
+
+                        case GLOBAL:
+                            processGlobalResponse(nodeId, resp);
+
+                            break;
+
+                        default:
+                            log.warning("Unexpected type " + resp.data().type() +
+                                " in statistics reposonse message " + resp);
+                    }
+
+                }
+                else
+                    log.warning("Unknown msg " + msg + " in statistics topic " + GridTopic.TOPIC_STATISTICS +
+                        " from node " + nodeId);
+            }
+            catch (Throwable e) {
+                if (log.isInfoEnabled())
+                    log.info("Unable to process statistics message: " + e);
+            }
+        });
+    }
+
+    private void getTopVer(StatisticsKey key) {
+
+    }
+
+    /**
+     * Process request for local statistics.
+     * 1) If there are local statistics for the given key - send response.
+     * 2) If there is no such statistics - add request to incoming queue.
+     * @param nodeId Sender node id.
+     * @param req Request to process.
+     * @throws IgniteCheckedException
+     */
+    private void processLocalRequest(UUID nodeId, StatisticsRequest req) throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Got local statistics request from node " + nodeId + " : " + req);
+
+        StatisticsKey key = new StatisticsKey(req.key().schema(), req.key().obj());
+        ObjectStatisticsImpl objectStatistics = repo.getLocalStatistics(key, req.topVer());
+
+        if (checkStatisticsVersions(objectStatistics, req.versions()))
+            sendResponse(nodeId, req.reqId(), key, StatisticsType.LOCAL, objectStatistics);
+        else {
+            StatisticsObjectConfiguration cfg = cfgMgr.config(key);
+            CacheGroupContext grpCtx = helper.getGroupContext(key);
+            AffinityTopologyVersion topVer = grpCtx.affinity().lastVersion();
+
+            addToRequests(inLocalRequests, key, new StatisticsAddressedRequest(req, nodeId));
+
+            if (checkStatisticsCfg(cfg, req.versions()) && topVer.compareTo(req.topVer()) >= 0) {
+                cfgMgr.checkLocalStatistics(cfg, topVer);
+                LocalStatisticsGatheringContext ctx = gatherer.gatheringInProgress(cfg.key());
+
+                if (ctx != null)
+                    // If there is no context, aggregation has already finished and the data will be sent
+                    // by the double check below.
+                    ctx.futureAggregate().thenAccept(stat -> onLocalStatisticsAggregated(key, stat, topVer));
+            }
+
+            // Double check that we have no race with collection finishing.
+            objectStatistics = repo.getLocalStatistics(key, req.topVer());
+
+            if (checkStatisticsVersions(objectStatistics, req.versions())) {
+                StatisticsAddressedRequest removedReq = removeFromRequests(inLocalRequests, key, req.reqId());
+
+                if (removedReq != null)
+                    sendResponse(nodeId, removedReq.req().reqId(), key, StatisticsType.LOCAL, objectStatistics);
+                // else was already processed by on collect handler.
+            }
+        }
+    }
+
+    /**
+     * Test if statistics configuration is fit to all required versions.
+     * @param cfg Statistics configuration to check.
+     * @param versions Map of column name to required version.
+     * @return {@code true} if it is, {@code false} otherwise.
+     */
+    private boolean checkStatisticsCfg(StatisticsObjectConfiguration cfg, Map<String, Long> versions) {
+        if (cfg == null)
+            return false;
+
+        for (Map.Entry<String, Long> version : versions.entrySet()) {
+            StatisticsColumnConfiguration colCfg = cfg.columns().get(version.getKey());
+
+            if (colCfg == null || colCfg.version() < version.getValue())
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Test if specified statistics is fit to all required versions.
+     *
+     * @param stat Statistics to check.
+     * @param versions Map of column name to required version.
+     * @return {@code true} if it is, {@code false} otherwise.
+     */
+    private boolean checkStatisticsVersions(
+        ObjectStatisticsImpl stat,
+        Map<String, Long> versions
+    ) {
+        if (stat == null)
+            return false;
+
+        for (Map.Entry<String, Long> version : versions.entrySet()) {
+            ColumnStatistics colStat = stat.columnsStatistics().get(version.getKey());
+
+            if (colStat == null || colStat.version() < version.getValue())
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Process an incoming request for global statistics. Either respond (if the statistics exist), or collect
+     * and respond (if the current node is the master node for the given key), or ignore (if the current node
+     * is no longer the master node for the given key).
+     *
+     * @param nodeId Sender node id.
+     * @param req Request.
+     */
+    private void processGlobalRequest(UUID nodeId, StatisticsRequest req) throws IgniteCheckedException {

Review comment:
       Multiple invocations of this method will trigger multiple collections of global statistics.

##########
File path: modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/PSUBasicValueDistributionTableStatisticsUsageTest.java
##########
@@ -93,7 +93,7 @@
 
         sql("CREATE INDEX empty_distribution_no_stat_col_a ON empty_distribution_no_stat(col_a)");
 
-        collectStatistics("digital_distribution", "empty_distribution");
+        collectStatistics(StatisticsType.GLOBAL, "digital_distribution", "empty_distribution");

Review comment:
       Please keep the method with the old signature to avoid dozens of similar changes.

##########
File path: modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsConfigurationTest.java
##########
@@ -270,16 +270,18 @@ public void dropUpdate() throws Exception {
 
         createSmallTable(null);
 
-        collectStatistics(SMALL_TARGET);
+        collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
 
         waitForStats(SCHEMA, "SMALL", TIMEOUT, checkTotalRows, checkColumStats);
 
+        System.out.println("Drop SMALL.A statistics");

Review comment:
       Leftover `System.out.println` debug output (sout); please remove it.

##########
File path: modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
##########
@@ -376,12 +376,12 @@ protected void checkSplitAndSerialization(IgniteRel rel, IgniteSchema publicSche
             clearTraits(deserialized);
 
             if (!expected.deepEquals(deserialized))
-            assertTrue(
-                "Invalid serialization / deserialization.\n" +
-                    "Expected:\n" + RelOptUtil.toString(expected) +
-                    "Deserialized:\n" + RelOptUtil.toString(deserialized),
-                expected.deepEquals(deserialized)
-            );
+                assertTrue(

Review comment:
       Please wrap it in curly braces.

##########
File path: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteGlobalStatisticsManager.java
##########
@@ -0,0 +1,985 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.stat;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.managers.systemview.GridSystemViewManager;
+import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnGlobalDataViewWalker;
+import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnLocalDataViewWalker;
+import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnPartitionDataViewWalker;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnConfiguration;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsKeyMessage;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsObjectData;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsRequest;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsResponse;
+import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnConfigurationView;
+import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnGlobalDataView;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+
+/**
+ * TODO: TBD
+ * Crawler to track and handle any requests, related to statistics.
+ * Crawler tracks requests and call back statistics manager to process failed requests.
+ */
+public class IgniteGlobalStatisticsManager implements GridMessageListener {
+    /** */
+    private static final String STAT_GLOBAL_VIEW_NAME = "statisticsGlobalData";
+
+    /** */
+    private static final String STAT_GLOBAL_VIEW_DESCRIPTION = "Global statistics.";
+
+    /** Statistics configuration manager. */
+    private final IgniteStatisticsConfigurationManager cfgMgr;
+
+    /** Statistics repository. */
+    private final IgniteStatisticsRepository repo;
+
+    /** Statistics gatherer. */
+    private final StatisticsGatherer gatherer;
+
+    /** Pool to process statistics requests. */
+    private final IgniteThreadPoolExecutor mgmtPool;
+
+    /** Discovery manager to get server node list to statistics master calculation. */
+    private final GridDiscoveryManager discoMgr;
+
+    /** Cluster state processor. */
+    private final GridClusterStateProcessor cluster;
+
+    /** Cache partition exchange manager. */
+    private final GridCachePartitionExchangeManager<?, ?> exchange;
+
+    /** Helper to transform or generate statistics related messages. */
+    private final IgniteStatisticsHelper helper;
+
+    /** Grid io manager to exchange global and local statistics. */
+    private final GridIoManager ioMgr;
+
+    /** Cache for global statistics. */
+    private final ConcurrentMap<StatisticsKey, CacheEntry<ObjectStatisticsImpl>> globalStatistics =
+        new ConcurrentHashMap<>();
+
+    /** Incoming requests which should be served after local statistics collection finish. */
+    private final ConcurrentMap<StatisticsKey, Collection<StatisticsAddressedRequest>> inLocalRequests =
+        new ConcurrentHashMap<>();
+
+    /** Incoming requests which should be served after global statistics collection finish. */
+    private final ConcurrentMap<StatisticsKey, Collection<StatisticsAddressedRequest>> inGloblaRequests =
+        new ConcurrentHashMap<>();
+
+    /** Outcoming global collection requests. */
+    private final ConcurrentMap<StatisticsKey, StatisticsGatheringContext> curCollections = new ConcurrentHashMap<>();
+
+    /** Outcoming global statistics requests to request id. */
+    private final ConcurrentMap<StatisticsKey, UUID> outGlobalStatisticsRequests = new ConcurrentHashMap<>();
+
+    /** Actual topology version for all pending requests. */
+    private volatile AffinityTopologyVersion topVer;

Review comment:
       Looks like topVer is never read

##########
File path: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteGlobalStatisticsManager.java
##########
@@ -0,0 +1,985 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.stat;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.managers.systemview.GridSystemViewManager;
+import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnGlobalDataViewWalker;
+import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnLocalDataViewWalker;
+import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnPartitionDataViewWalker;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnConfiguration;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsKeyMessage;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsObjectData;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsRequest;
+import org.apache.ignite.internal.processors.query.stat.messages.StatisticsResponse;
+import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnConfigurationView;
+import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnGlobalDataView;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+
+/**
+ * TODO: TBD
+ * Crawler to track and handle any requests, related to statistics.
+ * Crawler tracks requests and call back statistics manager to process failed requests.
+ */
+public class IgniteGlobalStatisticsManager implements GridMessageListener {
+    /** */
+    private static final String STAT_GLOBAL_VIEW_NAME = "statisticsGlobalData";
+
+    /** */
+    private static final String STAT_GLOBAL_VIEW_DESCRIPTION = "Global statistics.";
+
+    /** Statistics configuration manager. */
+    private final IgniteStatisticsConfigurationManager cfgMgr;
+
+    /** Statistics repository. */
+    private final IgniteStatisticsRepository repo;
+
+    /** Statistics gatherer. */
+    private final StatisticsGatherer gatherer;
+
+    /** Pool to process statistics requests. */
+    private final IgniteThreadPoolExecutor mgmtPool;
+
+    /** Discovery manager to get server node list to statistics master calculation. */
+    private final GridDiscoveryManager discoMgr;
+
+    /** Cluster state processor. */
+    private final GridClusterStateProcessor cluster;
+
+    /** Cache partition exchange manager. */
+    private final GridCachePartitionExchangeManager<?, ?> exchange;
+
+    /** Helper to transform or generate statistics related messages. */
+    private final IgniteStatisticsHelper helper;
+
+    /** Grid io manager to exchange global and local statistics. */
+    private final GridIoManager ioMgr;
+
+    /** Cache for global statistics. */
+    private final ConcurrentMap<StatisticsKey, CacheEntry<ObjectStatisticsImpl>> globalStatistics =
+        new ConcurrentHashMap<>();
+
+    /** Incoming requests which should be served after local statistics collection finish. */
+    private final ConcurrentMap<StatisticsKey, Collection<StatisticsAddressedRequest>> inLocalRequests =
+        new ConcurrentHashMap<>();
+
+    /** Incoming requests which should be served after global statistics collection finish. */
+    private final ConcurrentMap<StatisticsKey, Collection<StatisticsAddressedRequest>> inGloblaRequests =
+        new ConcurrentHashMap<>();
+
+    /** Outcoming global collection requests. */
+    private final ConcurrentMap<StatisticsKey, StatisticsGatheringContext> curCollections = new ConcurrentHashMap<>();
+
+    /** Outcoming global statistics requests to request id. */
+    private final ConcurrentMap<StatisticsKey, UUID> outGlobalStatisticsRequests = new ConcurrentHashMap<>();
+
+    /** Actual topology version for all pending requests. */
+    private volatile AffinityTopologyVersion topVer;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Exchange listener. */
+    private final PartitionsExchangeAware exchAwareLsnr = new PartitionsExchangeAware() {
+        @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+
+            // Skip join/left client nodes.
+            if (fut.exchangeType() != GridDhtPartitionsExchangeFuture.ExchangeType.ALL ||
+                cluster.clusterState().lastState() != ClusterState.ACTIVE)
+                return;
+
+            DiscoveryEvent evt = fut.firstEvent();
+
+            // Skip create/destroy caches.
+            if (evt.type() == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
+                DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)evt).customMessage();
+
+                if (msg instanceof DynamicCacheChangeBatch)
+                    return;
+
+                // Just clear all activities and update topology version.
+                if (log.isDebugEnabled())
+                    log.debug("Resetting all global statistics activities due to new topology " +
+                        fut.topologyVersion());
+
+                inLocalRequests.clear();
+                inGloblaRequests.clear();
+                curCollections.clear();
+                outGlobalStatisticsRequests.clear();
+
+                topVer = fut.topologyVersion();
+            }
+        }
+    };
+
+    /**
+     * Constructor.
+     *
+     * @param cfgMgr Statistics configuration manager.
+     */
+    public IgniteGlobalStatisticsManager(
+        IgniteStatisticsConfigurationManager cfgMgr,
+        GridSystemViewManager sysViewMgr,
+        IgniteStatisticsRepository repo,
+        StatisticsGatherer gatherer,
+        IgniteThreadPoolExecutor mgmtPool,
+        GridDiscoveryManager discoMgr,
+        GridClusterStateProcessor cluster,
+        GridCachePartitionExchangeManager<?, ?> exchange,
+        IgniteStatisticsHelper helper,
+        GridIoManager ioMgr,
+        Function<Class<?>, IgniteLogger> logSupplier
+    ) {
+        this.cfgMgr = cfgMgr;
+        this.repo = repo;
+        this.gatherer = gatherer;
+        this.mgmtPool = mgmtPool;
+        this.discoMgr = discoMgr;
+        this.cluster = cluster;
+        this.exchange = exchange;
+        this.helper = helper;
+        this.ioMgr = ioMgr;
+        log = logSupplier.apply(IgniteGlobalStatisticsManager.class);
+
+        ioMgr.addMessageListener(GridTopic.TOPIC_STATISTICS, this);
+
+        sysViewMgr.registerFiltrableView(STAT_GLOBAL_VIEW_NAME, STAT_GLOBAL_VIEW_DESCRIPTION,
+            new StatisticsColumnGlobalDataViewWalker(), this::columnGlobalStatisticsViewSupplier, Function.identity());
+    }
+
+    /**
+     * Statistics column global data view filterable supplier.
+     *
+     * @param filter Filter.
+     * @return Iterable with statistics column global data views.
+     */
+    private Iterable<StatisticsColumnGlobalDataView> columnGlobalStatisticsViewSupplier(Map<String, Object> filter) {
+        String type = (String)filter.get(StatisticsColumnPartitionDataViewWalker.TYPE_FILTER);
+        if (type != null && !StatisticsColumnConfigurationView.TABLE_TYPE.equalsIgnoreCase(type))
+            return Collections.emptyList();
+
+        String schema = (String)filter.get(StatisticsColumnLocalDataViewWalker.SCHEMA_FILTER);
+        String name = (String)filter.get(StatisticsColumnLocalDataViewWalker.NAME_FILTER);
+        String column = (String)filter.get(StatisticsColumnPartitionDataViewWalker.COLUMN_FILTER);
+
+        Map<StatisticsKey, ObjectStatisticsImpl> globalStatsMap;
+        if (!F.isEmpty(schema) && !F.isEmpty(name)) {
+            StatisticsKey key = new StatisticsKey(schema, name);
+
+            CacheEntry<ObjectStatisticsImpl> objLocStat = globalStatistics.get(key);
+
+            if (objLocStat == null || objLocStat.obj == null)
+                return Collections.emptyList();
+
+            globalStatsMap = Collections.singletonMap(key, objLocStat.object());
+        }
+        else
+            globalStatsMap = globalStatistics.entrySet().stream()
+                .filter(e -> e.getValue().object() != null && (F.isEmpty(schema) || schema.equals(e.getKey().schema())))
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().object()));
+
+        List<StatisticsColumnGlobalDataView> res = new ArrayList<>();
+
+        for (Map.Entry<StatisticsKey, ObjectStatisticsImpl> localStatsEntry : globalStatsMap.entrySet()) {
+            StatisticsKey key = localStatsEntry.getKey();
+            ObjectStatisticsImpl stat = localStatsEntry.getValue();
+
+            if (column == null) {
+                for (Map.Entry<String, ColumnStatistics> colStat : localStatsEntry.getValue().columnsStatistics()
+                    .entrySet()) {
+                    StatisticsColumnGlobalDataView colStatView = new StatisticsColumnGlobalDataView(key,
+                        colStat.getKey(), stat);
+
+                    res.add(colStatView);
+                }
+            }
+            else {
+                ColumnStatistics colStat = localStatsEntry.getValue().columnStatistics(column);
+
+                if (colStat != null) {
+                    StatisticsColumnGlobalDataView colStatView = new StatisticsColumnGlobalDataView(key, column, stat);
+
+                    res.add(colStatView);
+                }
+            }
+        }
+
+        return res;
+    }
+
+    /** Start. */
+    public void start() {
+        if (log.isDebugEnabled())
+            log.debug("Global statistics manager starting...");
+
+        globalStatistics.clear();
+        exchange.registerExchangeAwareComponent(exchAwareLsnr);
+
+        if (log.isDebugEnabled())
+            log.debug("Global statistics manager started.");
+    }
+
+    /** Stop. */
+    public void stop() {
+        if (log.isDebugEnabled())
+            log.debug("Global statistics manager stopping...");
+
+        topVer = null;
+
+        globalStatistics.clear();
+
+        inGloblaRequests.clear();
+        inLocalRequests.clear();
+        outGlobalStatisticsRequests.clear();
+        curCollections.clear();
+
+        exchange.unregisterExchangeAwareComponent(exchAwareLsnr);
+
+        if (log.isDebugEnabled())
+            log.debug("Global statistics manager stopped.");
+    }
+
+    /**
+     * Get global statistics for the given key. If there is no cached statistics, but
+     *
+     * @param key Statistics key.
+     * @return Global object statistics or {@code null} if there is no global statistics available.
+     */
+    public ObjectStatisticsImpl getGlobalStatistics(StatisticsKey key) {
+        CacheEntry<ObjectStatisticsImpl> res = globalStatistics.computeIfAbsent(key, k -> {
+            if (log.isDebugEnabled())
+                log.debug("Scheduling global statistics collection by key " + key);
+
+            mgmtPool.submit(() -> collectGlobalStatistics(key));
+
+            return new CacheEntry<>(null);
+        });
+
+        return res.object();
+    }
+
+    /**
+     * Either send local or global statistics request to get global statistics.
+     *
+     * @param key Statistics key to get global statistics by.
+     */
+    private void collectGlobalStatistics(StatisticsKey key) {
+        try {
+            StatisticsObjectConfiguration statCfg = cfgMgr.config(key);
+
+            if (statCfg != null && !statCfg.columns().isEmpty()) {
+                UUID statMaster = getStatisticsMasterNode(key);
+
+                if (discoMgr.localNode().id().equals(statMaster))
+                    gatherGlobalStatistics(statCfg);
+                else {
+                    StatisticsKeyMessage keyMsg = new StatisticsKeyMessage(key.schema(), key.obj(),
+                        Collections.emptyList());
+
+                    Map<String, Long> versions = statCfg.columns().entrySet().stream()
+                        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().version()));
+
+                    StatisticsRequest globalReq = new StatisticsRequest(UUID.randomUUID(), keyMsg,
+                        StatisticsType.GLOBAL, null, versions);
+
+                    outGlobalStatisticsRequests.put(key, globalReq.reqId());
+
+                    if (log.isDebugEnabled())
+                        log.debug("Send global statistics request by configuration " + statCfg);
+
+                    send(statMaster, globalReq);
+
+                }
+            }
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("Unable to start global statistics collection due to lack of configuration by key "
+                        + key);
+            }
+
+        }
+        catch (IgniteCheckedException e) {
+            if (log.isInfoEnabled())
+                log.info("Unable to get statistics configuration due to " + e.getMessage());
+        }
+    }
+
+    /**
+     * Collect global statistics on master node.
+     *
+     * @param statCfg Statistics config to gather global statistics by.
+     */
+    private void gatherGlobalStatistics(StatisticsObjectConfiguration statCfg) throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Start global statistics collection by configuration " + statCfg);
+
+        StatisticsTarget target = new StatisticsTarget(statCfg.key());
+
+        List<StatisticsAddressedRequest> locRequests = helper.generateGatheringRequests(target, statCfg);
+        UUID reqId = locRequests.get(0).req().reqId();
+
+        StatisticsGatheringContext gatCtx = new StatisticsGatheringContext(locRequests.size(), reqId);
+
+        curCollections.put(statCfg.key(), gatCtx);
+
+        for (StatisticsAddressedRequest addReq : locRequests) {
+            if (log.isDebugEnabled())
+                log.debug("Sending local request " + addReq.req().reqId() + " to node " + addReq.nodeId());
+
+            send(addReq.nodeId(), addReq.req());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+        mgmtPool.submit(() -> {

Review comment:
       do we really need to spawn a task for unsupported messages?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org