You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2021/03/18 12:10:41 UTC
[ignite] branch master updated: IGNITE-14170 New metrics for number
of bytes written to WAL log and compressed in archive - Fixes #8794.
This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 90aeb56 IGNITE-14170 New metrics for number of bytes written to WAL log and compressed in archive - Fixes #8794.
90aeb56 is described below
commit 90aeb56b3f8aae2e452243e8c0ce2c08e8ebb6fe
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Thu Mar 18 14:41:24 2021 +0300
IGNITE-14170 New metrics for number of bytes written to WAL log and compressed in archive - Fixes #8794.
Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
.../java/org/apache/ignite/DataStorageMetrics.java | 14 ++
.../cache/persistence/DataStorageMetricsImpl.java | 57 ++++++++-
.../persistence/DataStorageMetricsSnapshot.java | 18 +++
.../persistence/wal/FileWriteAheadLogManager.java | 13 +-
.../ignite/mxbean/DataStorageMetricsMXBean.java | 8 ++
.../IgniteDataStorageMetricsSelfTest.java | 142 +++++++++++++++++++++
6 files changed, 245 insertions(+), 7 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/DataStorageMetrics.java b/modules/core/src/main/java/org/apache/ignite/DataStorageMetrics.java
index b54d2b3..1a549c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/DataStorageMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/DataStorageMetrics.java
@@ -249,4 +249,18 @@ public interface DataStorageMetrics {
* or negative value is not supported.
*/
public long getSparseStorageSize();
+
+ /**
+ * Getting the total number of logged bytes into the WAL.
+ *
+ * @return Number of bytes.
+ */
+ long getWalWrittenBytes();
+
+ /**
+ * Getting the total size of the compressed segments in bytes.
+ *
+ * @return Number of bytes.
+ */
+ long getWalCompressedBytes();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java
index 1083a50..73c8dc98 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java
@@ -25,10 +25,12 @@ import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl;
import org.apache.ignite.internal.processors.metric.impl.HitRateMetric;
+import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.mxbean.DataStorageMetricsMXBean;
+import org.jetbrains.annotations.Nullable;
/**
*
@@ -106,8 +108,8 @@ public class DataStorageMetricsImpl implements DataStorageMetricsMXBean {
/** */
private volatile boolean metricsEnabled;
- /** */
- private volatile IgniteWriteAheadLogManager wal;
+ /** WAL manager. */
+ @Nullable private volatile IgniteWriteAheadLogManager wal;
/** */
private volatile IgniteOutClosure<Long> walSizeProvider;
@@ -160,6 +162,12 @@ public class DataStorageMetricsImpl implements DataStorageMetricsMXBean {
/** */
private final HistogramMetricImpl cpHistogram;
+ /** Total number of logged bytes into the WAL. */
+ private final LongAdderMetric walWrittenBytes;
+
+ /** Total size of the compressed segments in bytes. */
+ private final LongAdderMetric walCompressedBytes;
+
/**
* @param mmgr Metrics manager.
* @param metricsEnabled Metrics enabled flag.
@@ -308,6 +316,16 @@ public class DataStorageMetricsImpl implements DataStorageMetricsMXBean {
cpHistogram = mreg.histogram("CheckpointHistogram", cpBounds,
"Histogram of checkpoint duration in milliseconds.");
+
+ walWrittenBytes = mreg.longAdderMetric(
+ "WalWrittenBytes",
+ "Total number of logged bytes into the WAL."
+ );
+
+ walCompressedBytes = mreg.longAdderMetric(
+ "WalCompressedBytes",
+ "Total size of the compressed segments in bytes."
+ );
}
/** {@inheritDoc} */
@@ -331,7 +349,9 @@ public class DataStorageMetricsImpl implements DataStorageMetricsMXBean {
if (!metricsEnabled)
return 0;
- return wal.walArchiveSegments();
+ IgniteWriteAheadLogManager walMgr = this.wal;
+
+ return walMgr == null ? 0 : walMgr.walArchiveSegments();
}
/** {@inheritDoc} */
@@ -785,10 +805,14 @@ public class DataStorageMetricsImpl implements DataStorageMetricsMXBean {
}
/**
+ * Callback on logging a record to a WAL.
*
+ * @param size Record size in bytes.
*/
- public void onWalRecordLogged() {
+ public void onWalRecordLogged(long size) {
walLoggingRate.increment();
+
+ walWrittenBytes.add(size);
}
/**
@@ -826,4 +850,29 @@ public class DataStorageMetricsImpl implements DataStorageMetricsMXBean {
walFsyncTimeDuration.reset(rateTimeInterval, subInts);
walFsyncTimeNum.reset(rateTimeInterval, subInts);
}
+
+ /** {@inheritDoc} */
+ @Override public long getWalWrittenBytes() {
+ if (!metricsEnabled)
+ return 0;
+
+ return walWrittenBytes.value();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getWalCompressedBytes() {
+ if (!metricsEnabled)
+ return 0;
+
+ return walCompressedBytes.value();
+ }
+
+ /**
+ * Callback on compression of a WAL segment.
+ *
+ * @param size Size of the compressed segment in bytes.
+ */
+ public void onWalSegmentCompressed(long size) {
+ walCompressedBytes.add(size);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsSnapshot.java
index a698508..7f65952 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsSnapshot.java
@@ -120,6 +120,12 @@ public class DataStorageMetricsSnapshot implements DataStorageMetrics {
/** */
private long sparseStorageSize;
+ /** Total number of logged bytes into the WAL. */
+ private long walWrittenBytes;
+
+ /** Total size of the compressed segments in bytes. */
+ private long walCompressedBytes;
+
/**
* @param metrics Metrics.
*/
@@ -153,6 +159,8 @@ public class DataStorageMetricsSnapshot implements DataStorageMetrics {
totalAllocatedSize = metrics.getTotalAllocatedSize();
storageSize = metrics.getStorageSize();
sparseStorageSize = metrics.getSparseStorageSize();
+ walWrittenBytes = metrics.getWalWrittenBytes();
+ walCompressedBytes = metrics.getWalCompressedBytes();
}
/** {@inheritDoc} */
@@ -301,6 +309,16 @@ public class DataStorageMetricsSnapshot implements DataStorageMetrics {
}
/** {@inheritDoc} */
+ @Override public long getWalWrittenBytes() {
+ return walWrittenBytes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getWalCompressedBytes() {
+ return walCompressedBytes;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(DataStorageMetricsSnapshot.class, this);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index ebd859d..607561d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -898,7 +898,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
if (ptr != null) {
- metrics.onWalRecordLogged();
+ metrics.onWalRecordLogged(rec.size());
if (walAutoArchiveAfterInactivity > 0)
lastRecordLoggedMs.set(U.currentTimeMillis());
@@ -2115,6 +2115,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (alreadyCompressed.length > 0)
segmentAware.onSegmentCompressed(alreadyCompressed[alreadyCompressed.length - 1].idx());
+
+ for (FileDescriptor fd : alreadyCompressed)
+ metrics.onWalSegmentCompressed(fd.file().length());
}
/**
@@ -2238,8 +2241,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
f0.force();
}
- segmentSize.put(segIdx, zip.length());
- segmentAware.addCurrentWalArchiveSize(zip.length());
+ long zipLen = zip.length();
+
+ segmentSize.put(segIdx, zipLen);
+ segmentAware.addCurrentWalArchiveSize(zipLen);
+
+ metrics.onWalSegmentCompressed(zipLen);
segmentAware.onSegmentCompressed(segIdx);
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java
index 3b44828..c038a67 100644
--- a/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java
@@ -195,4 +195,12 @@ public interface DataStorageMetricsMXBean extends DataStorageMetrics {
/** {@inheritDoc} */
@MXBeanDescription("Storage space allocated adjusted for possible sparsity, in bytes.")
@Override long getSparseStorageSize();
+
+ /** {@inheritDoc} */
+ @MXBeanDescription("Getting the total number of logged bytes into the WAL.")
+ @Override long getWalWrittenBytes();
+
+ /** {@inheritDoc} */
+ @MXBeanDescription("Getting the total size of the compressed segments in bytes.")
+ @Override long getWalCompressedBytes();
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java
index a06cad1..0be8220 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java
@@ -20,8 +20,10 @@ package org.apache.ignite.internal.processors.cache.persistence;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.UnaryOperator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.ignite.DataRegionMetrics;
@@ -38,21 +40,31 @@ import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.processors.cache.WalStateManager.WALDisableContext;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
+import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.PAX;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.mxbean.DataStorageMetricsMXBean;
import org.apache.ignite.spi.metric.HistogramMetric;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
+import static java.util.Collections.emptyList;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl.DATASTORAGE_METRIC_PREFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
@@ -305,6 +317,82 @@ public class IgniteDataStorageMetricsSelfTest extends GridCommonAbstractTest {
}
/**
+ * Checking that the metrics of the total logged bytes are working correctly.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testWalWrittenBytes() throws Exception {
+ IgniteEx n = startGrid(0, (UnaryOperator<IgniteConfiguration>)cfg -> {
+ cfg.getDataStorageConfiguration().setWalSegmentSize((int)(2 * U.MB));
+
+ return cfg;
+ });
+
+ n.cluster().state(ACTIVE);
+ awaitPartitionMapExchange();
+
+ for (int i = 0; i < 10; i++)
+ n.cache("cache").put(ThreadLocalRandom.current().nextLong(), new byte[(int)(32 * U.KB)]);
+
+ WALDisableContext walDisableCtx = n.context().cache().context().walState().walDisableContext();
+ assertNotNull(walDisableCtx);
+
+ setFieldValue(walDisableCtx, "disableWal", true);
+
+ assertTrue(walDisableCtx.check());
+ assertNull(walMgr(n).log(new DataRecord(emptyList())));
+
+ assertEquals(-1, walMgr(n).lastArchivedSegment());
+
+ long exp = walMgr(n).lastWritePointer().fileOffset() - HEADER_RECORD_SIZE;
+
+ assertEquals(exp, dbMgr(n).persistentStoreMetrics().getWalWrittenBytes());
+ assertEquals(exp, dsMetricsMXBean(n).getWalWrittenBytes());
+ assertEquals(exp, ((LongAdderMetric)dsMetricRegistry(n).findMetric("WalWrittenBytes")).value());
+ }
+
+ /**
+ * Checking that the metrics of the total size of compressed segments are working correctly.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testWalCompressedBytes() throws Exception {
+ IgniteEx n0 = startGrid(0, (UnaryOperator<IgniteConfiguration>)cfg -> {
+ cfg.getDataStorageConfiguration().setWalCompactionEnabled(true).setWalSegmentSize((int)(2 * U.MB));
+
+ return cfg;
+ });
+
+ n0.cluster().state(ACTIVE);
+ awaitPartitionMapExchange();
+
+ while (walMgr(n0).lastArchivedSegment() < 3)
+ n0.cache("cache").put(ThreadLocalRandom.current().nextLong(), new byte[(int)(32 * U.KB)]);
+
+ waitForCondition(
+ () -> walMgr(n0).lastArchivedSegment() == walMgr(n0).lastCompactedSegment(),
+ getTestTimeout()
+ );
+
+ assertCorrectWalCompressedBytesMetrics(n0);
+
+ stopAllGrids();
+
+ IgniteEx n1 = startGrid(0, (UnaryOperator<IgniteConfiguration>)cfg -> {
+ cfg.getDataStorageConfiguration().setWalCompactionEnabled(true);
+
+ return cfg;
+ });
+
+ n1.cluster().state(ACTIVE);
+ awaitPartitionMapExchange();
+
+ assertCorrectWalCompressedBytesMetrics(n1);
+ }
+
+ /**
*
*/
static class Person implements Serializable {
@@ -351,4 +439,58 @@ public class IgniteDataStorageMetricsSelfTest extends GridCommonAbstractTest {
return Objects.hash(fName, lName);
}
}
+
+ /**
+ * Getting WAL manager.
+ *
+ * @param n Node.
+ * @return WAL manager.
+ */
+ private FileWriteAheadLogManager walMgr(IgniteEx n) {
+ return (FileWriteAheadLogManager)n.context().cache().context().wal();
+ }
+
+ /**
+ * Getting db manager of node.
+ *
+ * @param n Node.
+ * @return Db manager.
+ */
+ private GridCacheDatabaseSharedManager dbMgr(IgniteEx n) {
+ return (GridCacheDatabaseSharedManager)n.context().cache().context().database();
+ }
+
+ /**
+ * Getting DATASTORAGE_METRIC_PREFIX metric registry.
+ *
+ * @param n Node.
+ * @return Group of metrics.
+ */
+ private MetricRegistry dsMetricRegistry(IgniteEx n) {
+ return n.context().metric().registry(DATASTORAGE_METRIC_PREFIX);
+ }
+
+ /**
+ * Getting data storage MXBean.
+ *
+ * @param n Node.
+ * @return MXBean.
+ */
+ private DataStorageMetricsMXBean dsMetricsMXBean(IgniteEx n) {
+ return getMxBean(n.name(), "Persistent Store", "DataStorageMetrics", DataStorageMetricsMXBean.class);
+ }
+
+ /**
+ * Check that the metric of the total size of compressed segments is working correctly.
+ *
+ * @param n Node.
+ */
+ private void assertCorrectWalCompressedBytesMetrics(IgniteEx n) {
+ long exp = Arrays.stream(walMgr(n).walArchiveFiles()).filter(FileDescriptor::isCompressed)
+ .mapToLong(fd -> fd.file().length()).sum();
+
+ assertEquals(exp, dbMgr(n).persistentStoreMetrics().getWalCompressedBytes());
+ assertEquals(exp, dsMetricsMXBean(n).getWalCompressedBytes());
+ assertEquals(exp, ((LongAdderMetric)dsMetricRegistry(n).findMetric("WalCompressedBytes")).value());
+ }
}