Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/09/24 11:55:56 UTC

[GitHub] [ignite] Berkof commented on a change in pull request #9423: IGNITE-15281 New implementation of local statistics collection.

Berkof commented on a change in pull request #9423:
URL: https://github.com/apache/ignite/pull/9423#discussion_r715487617



##########
File path: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java
##########
@@ -163,61 +179,68 @@ else if (db == null)
                 if (log.isInfoEnabled())
                     log.info(String.format("Statistics usage state was changed from %s to %s", oldVal, newVal));
 
+                lastUsageState = newVal;
+
                 if (oldVal == newVal)
                     return;
 
-                switch (newVal) {
-                    case OFF:
-                        disableOperations();
-
-                        break;
-                    case ON:
-                    case NO_UPDATE:
-                        enableOperations();
-
-                        break;
-                }
+                stateChanged();

Review comment:
       Done, but named tryStart/tryStop rather than start/stop, because many conditions must be met before it can start.
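
A minimal sketch of the naming distinction, not the code from this PR. `StatsLifecycle`, `clusterActive` and `usageEnabled` are hypothetical names chosen for illustration: a try-style method checks its preconditions and reports whether it actually changed anything, instead of starting unconditionally.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical lifecycle holder; all names are illustrative, not from the PR. */
class StatsLifecycle {
    private final AtomicBoolean started = new AtomicBoolean();

    /** Assumed precondition: the cluster is active. */
    private volatile boolean clusterActive;

    /** Assumed precondition: statistics usage is not switched off. */
    private volatile boolean usageEnabled;

    void clusterActive(boolean v) { clusterActive = v; }

    void usageEnabled(boolean v) { usageEnabled = v; }

    /** Starts only if every precondition holds; returns true if this call started it. */
    boolean tryStart() {
        if (!clusterActive || !usageEnabled)
            return false;

        return started.compareAndSet(false, true);
    }

    /** Stops only if currently started; returns true if this call stopped it. */
    boolean tryStop() {
        return started.compareAndSet(true, false);
    }

    boolean started() { return started.get(); }
}
```

With this shape a caller can invoke tryStart() on every state change and simply ignore a false result.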

##########
File path: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsProcessor.java
##########
@@ -0,0 +1,615 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.stat;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnConfiguration;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
+import org.apache.ignite.internal.processors.query.stat.task.GatherPartitionStatistics;
+import org.apache.ignite.internal.util.GridBusyLock;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+
+/**
+ * Processes all tasks related to the statistics repository: mostly statistics collection,
+ * invalidation (due to configuration, topology or obsolescence issues) and loading.
+ * Input tasks should be scheduled through the management pool, while the gathering pool is used to process
+ * heavy operations in parallel.
+ *
+ * Manages the gathering pool and its jobs. To guarantee graceful shutdown:
+ * 1) A job can be added to gatheringInProgress only in the active state (checked after adding).
+ * 2) The state can be deactivated only after cancelling all jobs and acquiring the busyLock block.
+ * 3) Each job should do its work under busyLock, periodically checking its cancellation status.
+ */
+public class StatisticsProcessor {
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Ignite statistics repository. */
+    private final IgniteStatisticsRepository statRepo;
+
+    /** Ignite Thread pool executor to do statistics collection tasks. */
+    private final IgniteThreadPoolExecutor gatherPool;
+
+    /** (statistics key -> gathering context) */
+    private final ConcurrentMap<StatisticsKey, LocalStatisticsGatheringContext> gatheringInProgress =
+        new ConcurrentHashMap<>();
+
+    /** Active flag (used to skip commands in inactive cluster.) */
+    private volatile boolean active;
+
+    /** Lock protecting started gathering during deactivation. */
+    private static final GridBusyLock busyLock = new GridBusyLock();
+
+    /**
+     * Constructor.
+     *
+     * @param repo IgniteStatisticsRepository.
+     * @param gatherPool Thread pool to gather statistics in.
+     * @param logSupplier Log supplier function.
+     */
+    public StatisticsProcessor(
+        IgniteStatisticsRepository repo,
+        IgniteThreadPoolExecutor gatherPool,
+        Function<Class<?>, IgniteLogger> logSupplier
+    ) {
+        this.statRepo = repo;
+        this.gatherPool = gatherPool;
+        this.log = logSupplier.apply(StatisticsProcessor.class);
+    }
+
+    /**
+     * Update statistics for the given key to the actual state.
+     * If byObsolescence and tbl is not {@code null} - does not clear any other partitions.
+     * Should run through the management pool only.
+     * 1) Replace the previous gathering context if it exists and replacement is needed (replace a byObsolescence
+     * gathering with a new one, or replace a gathering with an older configuration or topology version).
+     * 2) If byObsolescence and no table is available - clean obsolescence and partition statistics for the given key.
+     * 3) Submit tasks for each specified partition.
+     * 4) After the last task finishes gathering - it starts aggregation.
+     * 5) Read all partitions and obsolescence from the repo and
+     * if byObsolescence = {@code true} - remove unnecessary ones and aggregate by the specified list
+     * if byObsolescence = {@code false} - aggregate all present in the store (because it should contain only actual ones)
+     * 6) Save aggregated local statistics.
+     *
+     * @param byObsolescence Update only obsolescence partitions.
+     * @param tbl Table to update. If {@code null} - just clear all partitions and obsolescence from the repo
+     * @param cfg Statistics configuration to use.
+     * @param partsToProcess Partitions to update, if !byObsolescence - all primary partitions for the given topology.
+     * @param topVer Topology version, can be {@code null} if tbl is null.
+     */
+    public void updateKeyAsync(
+        boolean byObsolescence,
+        GridH2Table tbl,
+        StatisticsObjectConfiguration cfg,
+        Set<Integer> partsToProcess,
+        AffinityTopologyVersion topVer
+    ) {
+        if (!startJob("Updating key " + cfg.key()))
+            return;
+
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format(
+                    "Start statistics processing: byObsolescence=%b, cfg=%s, partToProcess = %s, topVer=%s",
+                    byObsolescence, cfg, partsToProcess, topVer));
+            }
+
+            LocalStatisticsGatheringContext newCtx = new LocalStatisticsGatheringContext(byObsolescence, tbl, cfg,
+                partsToProcess, topVer);
+            LocalStatisticsGatheringContext registeredCtx = registerNewTask(newCtx);
+
+            if (registeredCtx != null) {
+

Review comment:
       Between the two `if` blocks. Or what do you mean?
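
For context, the three shutdown rules in the StatisticsProcessor class Javadoc in this hunk can be sketched roughly as follows. This is only an illustration under assumptions: `BusyGate` is a hypothetical class, it uses a JDK `ReentrantReadWriteLock` as a stand-in rather than Ignite's `GridBusyLock`, and job cancellation is left out.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical stand-in built on the JDK; this is not Ignite's GridBusyLock. */
class BusyGate {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    private volatile boolean active;

    void activate() { active = true; }

    /** Rule 1: a job may enter only while active; the flag is checked after entering. */
    boolean startJob() {
        // Fails immediately if deactivation currently holds the write lock.
        if (!lock.readLock().tryLock())
            return false;

        if (!active) {
            lock.readLock().unlock();

            return false;
        }

        return true;
    }

    /** Rule 3: every job that entered must leave, also on failure (use try/finally). */
    void finishJob() { lock.readLock().unlock(); }

    /** Rule 2: the flag is flipped only once all running jobs have left. */
    void deactivate() {
        // A real implementation would cancel running jobs first (omitted here).
        lock.writeLock().lock();

        try {
            active = false;
        }
        finally {
            lock.writeLock().unlock();
        }
    }
}
```

A job would call startJob(), do its work in a try block while polling its own cancellation flag, and call finishJob() in finally.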

##########
File path: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsInMemoryStoreImpl.java
##########
@@ -203,11 +239,43 @@ public IgniteStatisticsInMemoryStoreImpl(Function<Class<?>, IgniteLogger> logSup
         Collection<ObjectPartitionStatisticsImpl> statistics
     ) {
         IntMap<ObjectPartitionStatisticsImpl> statisticsMap = new IntHashMap<ObjectPartitionStatisticsImpl>();
+
         for (ObjectPartitionStatisticsImpl s : statistics) {
             if (statisticsMap.put(s.partId(), s) != null)
                 log.warning(String.format("Trying to save more than one %s.%s partition statistics for partition %d",
                     key.schema(), key.obj(), s.partId()));
         }
+
         return statisticsMap;
     }
+
+    /** {@inheritDoc} */
+    @Override public Collection<Integer> loadObsolescenceMap(StatisticsKey key) {
+        Collection<Integer> res[] = new Collection[1];
+        res[0] = new ArrayList<>();

Review comment:
       removed.

##########
File path: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepository.java
##########
@@ -375,6 +292,15 @@ public ObjectStatisticsImpl getLocalStatistics(StatisticsKey key) {
         return locStats.get(key);
     }
 
+    /**
+     * Get all local statistics. Return internal map without copying.
+     *
+     * @return Local (for current node) object statistics.
+     */
+    public Map<StatisticsKey, ObjectStatisticsImpl> getAllLocalStatisticsInt() {

Review comment:
       done

##########
File path: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsInMemoryStoreImpl.java
##########
@@ -203,11 +239,43 @@ public IgniteStatisticsInMemoryStoreImpl(Function<Class<?>, IgniteLogger> logSup
         Collection<ObjectPartitionStatisticsImpl> statistics
     ) {
         IntMap<ObjectPartitionStatisticsImpl> statisticsMap = new IntHashMap<ObjectPartitionStatisticsImpl>();
+
         for (ObjectPartitionStatisticsImpl s : statistics) {
             if (statisticsMap.put(s.partId(), s) != null)
                 log.warning(String.format("Trying to save more than one %s.%s partition statistics for partition %d",
                     key.schema(), key.obj(), s.partId()));
         }
+
         return statisticsMap;
     }
+
+    /** {@inheritDoc} */
+    @Override public Collection<Integer> loadObsolescenceMap(StatisticsKey key) {
+        Collection<Integer> res[] = new Collection[1];
+        res[0] = new ArrayList<>();
+
+        obsStats.computeIfPresent(key, (k, v) -> {
+            for (Integer partId : v.keys())
+                res[0].add(partId);
+
+            return v;
+        });
+
+        return res[0];
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<Integer> loadLocalPartitionMap(StatisticsKey key) {
+        Collection<Integer> res[] = new Collection[1];
+        res[0] = new ArrayList<>();

Review comment:
       removed
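
A sketch of the same method shape without the one-element array holder. The types are simplified assumptions (`String` keys and `Map<Integer, Long>` values in place of `StatisticsKey` and `IntMap`), and `ObsolescenceStore` is a hypothetical name. A local collection reference that is never reassigned is effectively final, so the lambda can add to it directly.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Stand-in store; key and value types are simplified assumptions. */
class ObsolescenceStore {
    private final ConcurrentHashMap<String, Map<Integer, Long>> obsStats = new ConcurrentHashMap<>();

    void put(String key, Map<Integer, Long> parts) { obsStats.put(key, parts); }

    /** Returns the partition ids stored for the key, or an empty collection. */
    Collection<Integer> loadObsolescenceMap(String key) {
        // Never reassigned, so the lambda below may capture it without a wrapper array.
        Collection<Integer> res = new ArrayList<>();

        obsStats.computeIfPresent(key, (k, v) -> {
            res.addAll(v.keySet());

            // Returning the same value leaves the mapping unchanged.
            return v;
        });

        return res;
    }
}
```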

##########
File path: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java
##########
@@ -144,10 +137,10 @@
         }
     };
 
-    /** Exchange listener. */
+    /** Exchange listener to update all local statistics. */
     private final PartitionsExchangeAware exchAwareLsnr = new PartitionsExchangeAware() {
         @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {

Review comment:
       To avoid holding the topology lock. In any case, we schedule the processing asynchronously.
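
A minimal illustration of that hand-off using JDK types only. `ExchangeHandoff`, `onExchangeDone` and the `Executor` parameter are hypothetical stand-ins for the listener and the management pool: the callback only enqueues the work, so the thread that invoked it is not kept waiting on statistics processing.

```java
import java.util.concurrent.Executor;

/** Hypothetical listener wrapper; names are illustrative, not from the PR. */
class ExchangeHandoff {
    /** Assumed to be backed by a separate pool in real use. */
    private final Executor mgmtPool;

    ExchangeHandoff(Executor mgmtPool) { this.mgmtPool = mgmtPool; }

    /** Hands the update to the pool; with a pooled executor this returns before the work runs. */
    void onExchangeDone(Runnable updateAllLocalStatistics) {
        mgmtPool.execute(updateAllLocalStatistics);
    }
}
```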

##########
File path: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepository.java
##########
@@ -353,6 +252,7 @@ public ObjectPartitionStatisticsImpl getLocalPartitionStatistics(StatisticsKey k
      */
     public void clearLocalPartitionStatistics(StatisticsKey key, int partId) {

Review comment:
       All other methods use LocalPartition in their names to emphasize that they operate on partition-level statistics.



