You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2021/03/18 12:10:41 UTC

[ignite] branch master updated: IGNITE-14170 New metrics for number of bytes written to WAL log and compressed in archive - Fixes #8794.

This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 90aeb56  IGNITE-14170 New metrics for number of bytes written to WAL log and compressed in archive - Fixes #8794.
90aeb56 is described below

commit 90aeb56b3f8aae2e452243e8c0ce2c08e8ebb6fe
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Thu Mar 18 14:41:24 2021 +0300

    IGNITE-14170 New metrics for number of bytes written to WAL log and compressed in archive - Fixes #8794.
    
    Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
 .../java/org/apache/ignite/DataStorageMetrics.java |  14 ++
 .../cache/persistence/DataStorageMetricsImpl.java  |  57 ++++++++-
 .../persistence/DataStorageMetricsSnapshot.java    |  18 +++
 .../persistence/wal/FileWriteAheadLogManager.java  |  13 +-
 .../ignite/mxbean/DataStorageMetricsMXBean.java    |   8 ++
 .../IgniteDataStorageMetricsSelfTest.java          | 142 +++++++++++++++++++++
 6 files changed, 245 insertions(+), 7 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/DataStorageMetrics.java b/modules/core/src/main/java/org/apache/ignite/DataStorageMetrics.java
index b54d2b3..1a549c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/DataStorageMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/DataStorageMetrics.java
@@ -249,4 +249,18 @@ public interface DataStorageMetrics {
      *         or negative value is not supported.
      */
     public long getSparseStorageSize();
+
+    /**
+     * Getting the total number of logged bytes into the WAL.
+     *
+     * @return Number of bytes.
+     */
+    long getWalWrittenBytes();
+
+    /**
+     * Getting the total size of the compressed segments in bytes.
+     *
+     * @return Number of bytes.
+     */
+    long getWalCompressedBytes();
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java
index 1083a50..73c8dc98 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java
@@ -25,10 +25,12 @@ import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
 import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl;
 import org.apache.ignite.internal.processors.metric.impl.HitRateMetric;
+import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.mxbean.DataStorageMetricsMXBean;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
@@ -106,8 +108,8 @@ public class DataStorageMetricsImpl implements DataStorageMetricsMXBean {
     /** */
     private volatile boolean metricsEnabled;
 
-    /** */
-    private volatile IgniteWriteAheadLogManager wal;
+    /** WAL manager. */
+    @Nullable private volatile IgniteWriteAheadLogManager wal;
 
     /** */
     private volatile IgniteOutClosure<Long> walSizeProvider;
@@ -160,6 +162,12 @@ public class DataStorageMetricsImpl implements DataStorageMetricsMXBean {
     /** */
     private final HistogramMetricImpl cpHistogram;
 
+    /** Total number of logged bytes into the WAL. */
+    private final LongAdderMetric walWrittenBytes;
+
+    /** Total size of the compressed segments in bytes. */
+    private final LongAdderMetric walCompressedBytes;
+
     /**
      * @param mmgr Metrics manager.
      * @param metricsEnabled Metrics enabled flag.
@@ -308,6 +316,16 @@ public class DataStorageMetricsImpl implements DataStorageMetricsMXBean {
 
         cpHistogram = mreg.histogram("CheckpointHistogram", cpBounds,
                 "Histogram of checkpoint duration in milliseconds.");
+
+        walWrittenBytes = mreg.longAdderMetric(
+            "WalWrittenBytes",
+            "Total number of logged bytes into the WAL."
+        );
+
+        walCompressedBytes = mreg.longAdderMetric(
+            "WalCompressedBytes",
+            "Total size of the compressed segments in bytes."
+        );
     }
 
     /** {@inheritDoc} */
@@ -331,7 +349,9 @@ public class DataStorageMetricsImpl implements DataStorageMetricsMXBean {
         if (!metricsEnabled)
             return 0;
 
-        return wal.walArchiveSegments();
+        IgniteWriteAheadLogManager walMgr = this.wal;
+
+        return walMgr == null ? 0 : walMgr.walArchiveSegments();
     }
 
     /** {@inheritDoc} */
@@ -785,10 +805,14 @@ public class DataStorageMetricsImpl implements DataStorageMetricsMXBean {
     }
 
     /**
+     * Callback on logging a record to a WAL.
      *
+     * @param size Record size in bytes.
      */
-    public void onWalRecordLogged() {
+    public void onWalRecordLogged(long size) {
         walLoggingRate.increment();
+
+        walWrittenBytes.add(size);
     }
 
     /**
@@ -826,4 +850,29 @@ public class DataStorageMetricsImpl implements DataStorageMetricsMXBean {
         walFsyncTimeDuration.reset(rateTimeInterval, subInts);
         walFsyncTimeNum.reset(rateTimeInterval, subInts);
     }
+
+    /** {@inheritDoc} */
+    @Override public long getWalWrittenBytes() {
+        if (!metricsEnabled)
+            return 0;
+
+        return walWrittenBytes.value();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getWalCompressedBytes() {
+        if (!metricsEnabled)
+            return 0;
+
+        return walCompressedBytes.value();
+    }
+
+    /**
+     * Callback on compression of a WAL segment.
+     *
+     * @param size Size of the compressed segment in bytes.
+     */
+    public void onWalSegmentCompressed(long size) {
+        walCompressedBytes.add(size);
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsSnapshot.java
index a698508..7f65952 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsSnapshot.java
@@ -120,6 +120,12 @@ public class DataStorageMetricsSnapshot implements DataStorageMetrics {
     /** */
     private long sparseStorageSize;
 
+    /** Total number of logged bytes into the WAL. */
+    private long walWrittenBytes;
+
+    /** Total size of the compressed segments in bytes. */
+    private long walCompressedBytes;
+
     /**
      * @param metrics Metrics.
      */
@@ -153,6 +159,8 @@ public class DataStorageMetricsSnapshot implements DataStorageMetrics {
         totalAllocatedSize = metrics.getTotalAllocatedSize();
         storageSize = metrics.getStorageSize();
         sparseStorageSize = metrics.getSparseStorageSize();
+        walWrittenBytes = metrics.getWalWrittenBytes();
+        walCompressedBytes = metrics.getWalCompressedBytes();
     }
 
     /** {@inheritDoc} */
@@ -301,6 +309,16 @@ public class DataStorageMetricsSnapshot implements DataStorageMetrics {
     }
 
     /** {@inheritDoc} */
+    @Override public long getWalWrittenBytes() {
+        return walWrittenBytes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getWalCompressedBytes() {
+        return walCompressedBytes;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(DataStorageMetricsSnapshot.class, this);
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index ebd859d..607561d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -898,7 +898,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             }
 
             if (ptr != null) {
-                metrics.onWalRecordLogged();
+                metrics.onWalRecordLogged(rec.size());
 
                 if (walAutoArchiveAfterInactivity > 0)
                     lastRecordLoggedMs.set(U.currentTimeMillis());
@@ -2115,6 +2115,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
             if (alreadyCompressed.length > 0)
                 segmentAware.onSegmentCompressed(alreadyCompressed[alreadyCompressed.length - 1].idx());
+
+            for (FileDescriptor fd : alreadyCompressed)
+                metrics.onWalSegmentCompressed(fd.file().length());
         }
 
         /**
@@ -2238,8 +2241,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                             f0.force();
                         }
 
-                        segmentSize.put(segIdx, zip.length());
-                        segmentAware.addCurrentWalArchiveSize(zip.length());
+                        long zipLen = zip.length();
+
+                        segmentSize.put(segIdx, zipLen);
+                        segmentAware.addCurrentWalArchiveSize(zipLen);
+
+                        metrics.onWalSegmentCompressed(zipLen);
 
                         segmentAware.onSegmentCompressed(segIdx);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java
index 3b44828..c038a67 100644
--- a/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java
@@ -195,4 +195,12 @@ public interface DataStorageMetricsMXBean extends DataStorageMetrics {
     /** {@inheritDoc} */
     @MXBeanDescription("Storage space allocated adjusted for possible sparsity, in bytes.")
     @Override long getSparseStorageSize();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Getting the total number of logged bytes into the WAL.")
+    @Override long getWalWrittenBytes();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Getting the total size of the compressed segments in bytes.")
+    @Override long getWalCompressedBytes();
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java
index a06cad1..0be8220 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java
@@ -20,8 +20,10 @@ package org.apache.ignite.internal.processors.cache.persistence;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Objects;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.UnaryOperator;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.apache.ignite.DataRegionMetrics;
@@ -38,21 +40,31 @@ import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.processors.cache.WalStateManager.WALDisableContext;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
+import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.PAX;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.mxbean.DataStorageMetricsMXBean;
 import org.apache.ignite.spi.metric.HistogramMetric;
 import org.apache.ignite.testframework.ListeningTestLogger;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
+import static java.util.Collections.emptyList;
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.cluster.ClusterState.ACTIVE;
 import static org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl.DATASTORAGE_METRIC_PREFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
 /**
@@ -305,6 +317,82 @@ public class IgniteDataStorageMetricsSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Checking that the metrics of the total logged bytes are working correctly.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testWalWrittenBytes() throws Exception {
+        IgniteEx n = startGrid(0, (UnaryOperator<IgniteConfiguration>)cfg -> {
+            cfg.getDataStorageConfiguration().setWalSegmentSize((int)(2 * U.MB));
+
+            return cfg;
+        });
+
+        n.cluster().state(ACTIVE);
+        awaitPartitionMapExchange();
+
+        for (int i = 0; i < 10; i++)
+            n.cache("cache").put(ThreadLocalRandom.current().nextLong(), new byte[(int)(32 * U.KB)]);
+
+        WALDisableContext walDisableCtx = n.context().cache().context().walState().walDisableContext();
+        assertNotNull(walDisableCtx);
+
+        setFieldValue(walDisableCtx, "disableWal", true);
+
+        assertTrue(walDisableCtx.check());
+        assertNull(walMgr(n).log(new DataRecord(emptyList())));
+
+        assertEquals(-1, walMgr(n).lastArchivedSegment());
+
+        long exp = walMgr(n).lastWritePointer().fileOffset() - HEADER_RECORD_SIZE;
+
+        assertEquals(exp, dbMgr(n).persistentStoreMetrics().getWalWrittenBytes());
+        assertEquals(exp, dsMetricsMXBean(n).getWalWrittenBytes());
+        assertEquals(exp, ((LongAdderMetric)dsMetricRegistry(n).findMetric("WalWrittenBytes")).value());
+    }
+
+    /**
+     * Checking that the metrics of the total size of compressed segments are working correctly.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testWalCompressedBytes() throws Exception {
+        IgniteEx n0 = startGrid(0, (UnaryOperator<IgniteConfiguration>)cfg -> {
+            cfg.getDataStorageConfiguration().setWalCompactionEnabled(true).setWalSegmentSize((int)(2 * U.MB));
+
+            return cfg;
+        });
+
+        n0.cluster().state(ACTIVE);
+        awaitPartitionMapExchange();
+
+        while (walMgr(n0).lastArchivedSegment() < 3)
+            n0.cache("cache").put(ThreadLocalRandom.current().nextLong(), new byte[(int)(32 * U.KB)]);
+
+        waitForCondition(
+            () -> walMgr(n0).lastArchivedSegment() == walMgr(n0).lastCompactedSegment(),
+            getTestTimeout()
+        );
+
+        assertCorrectWalCompressedBytesMetrics(n0);
+
+        stopAllGrids();
+
+        IgniteEx n1 = startGrid(0, (UnaryOperator<IgniteConfiguration>)cfg -> {
+            cfg.getDataStorageConfiguration().setWalCompactionEnabled(true);
+
+            return cfg;
+        });
+
+        n1.cluster().state(ACTIVE);
+        awaitPartitionMapExchange();
+
+        assertCorrectWalCompressedBytesMetrics(n1);
+    }
+
+    /**
      *
      */
     static class Person implements Serializable {
@@ -351,4 +439,58 @@ public class IgniteDataStorageMetricsSelfTest extends GridCommonAbstractTest {
             return Objects.hash(fName, lName);
         }
     }
+
+    /**
+     * Getting WAL manager.
+     *
+     * @param n Node.
+     * @return WAL manager.
+     */
+    private FileWriteAheadLogManager walMgr(IgniteEx n) {
+        return (FileWriteAheadLogManager)n.context().cache().context().wal();
+    }
+
+    /**
+     * Getting db manager of node.
+     *
+     * @param n Node.
+     * @return Db manager.
+     */
+    private GridCacheDatabaseSharedManager dbMgr(IgniteEx n) {
+        return (GridCacheDatabaseSharedManager)n.context().cache().context().database();
+    }
+
+    /**
+     * Getting DATASTORAGE_METRIC_PREFIX metric registry.
+     *
+     * @param n Node.
+     * @return Group of metrics.
+     */
+    private MetricRegistry dsMetricRegistry(IgniteEx n) {
+        return n.context().metric().registry(DATASTORAGE_METRIC_PREFIX);
+    }
+
+    /**
+     * Getting data storage MXBean.
+     *
+     * @param n Node.
+     * @return MXBean.
+     */
+    private DataStorageMetricsMXBean dsMetricsMXBean(IgniteEx n) {
+        return getMxBean(n.name(), "Persistent Store", "DataStorageMetrics", DataStorageMetricsMXBean.class);
+    }
+
+    /**
+     * Check that the metric of the total size of compressed segments is working correctly.
+     *
+     * @param n Node.
+     */
+    private void assertCorrectWalCompressedBytesMetrics(IgniteEx n) {
+        long exp = Arrays.stream(walMgr(n).walArchiveFiles()).filter(FileDescriptor::isCompressed)
+            .mapToLong(fd -> fd.file().length()).sum();
+
+        assertEquals(exp, dbMgr(n).persistentStoreMetrics().getWalCompressedBytes());
+        assertEquals(exp, dsMetricsMXBean(n).getWalCompressedBytes());
+        assertEquals(exp, ((LongAdderMetric)dsMetricRegistry(n).findMetric("WalCompressedBytes")).value());
+    }
 }