You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/02/01 01:00:31 UTC
[ignite] branch master updated: IGNITE-9903 Archived WAL segment size is reduced for the case when SWITCH_SEGMENT_RECORD is present in the file
This is an automated email from the ASF dual-hosted git repository.
agura pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new cde31aa IGNITE-9903 Archived WAL segment size is reduced for case when SWITCH_SEGMENT_RECORD is present in the file
cde31aa is described below
commit cde31aa46db3950561322a66c45ad91fb869ddb3
Author: Alexey Stelmak <sp...@gmail.com>
AuthorDate: Fri Feb 1 03:48:53 2019 +0300
IGNITE-9903 Archived WAL segment size is reduced for case when SWITCH_SEGMENT_RECORD is present in the file
Signed-off-by: Andrey Gura <ag...@apache.org>
---
.../persistence/wal/FileWriteAheadLogManager.java | 37 ++++++++-
.../wal/filehandle/FileWriteHandle.java | 5 ++
.../wal/filehandle/FileWriteHandleImpl.java | 17 +++-
.../wal/filehandle/FsyncFileWriteHandle.java | 9 ++
.../ignite/internal/util/io/GridFileUtils.java | 87 ++++++++++++++++++++
.../persistence/db/wal/WalCompactionTest.java | 95 +++++++++++++++++++++-
6 files changed, 243 insertions(+), 7 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index dbd2f14..db245fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -44,6 +44,7 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.regex.Pattern;
import java.util.stream.Stream;
@@ -89,8 +90,8 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.aware.Segment
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
import org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.AbstractFileHandle;
-import org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileHandleManagerFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileHandleManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileHandleManagerFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileWriteHandle;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.LockedSegmentFileInputFactory;
@@ -107,6 +108,7 @@ import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.io.GridFileUtils;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.CO;
@@ -350,6 +352,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** FileHandleManagerFactory. */
private final FileHandleManagerFactory fileHandleManagerFactory;
+ /** Switch segment record offset. */
+ private final AtomicLongArray switchSegmentRecordOffset;
+
/**
* @param ctx Kernal context.
*/
@@ -380,6 +385,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
maxSegCountWithoutCheckpoint =
(long)((U.adjustedWalHistorySize(dsCfg, log) * CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE)
/ dsCfg.getWalSegmentSize());
+
+ switchSegmentRecordOffset = isArchiverEnabled() ? new AtomicLongArray(dsCfg.getWalSegments()) : null;
}
/**
@@ -1175,6 +1182,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (metrics.metricsEnabled())
metrics.onWallRollOver();
+ if (switchSegmentRecordOffset != null) {
+ int idx = (int)((cur.getSegmentId() + 1) % dsCfg.getWalSegments());
+
+ switchSegmentRecordOffset.set(idx, hnd.getSwitchSegmentRecordOffset());
+ }
+
FileWriteHandle next = initNextWriteHandle(cur);
next.writeHeader();
@@ -1308,6 +1321,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
boolean interrupted = false;
+ if (switchSegmentRecordOffset != null)
+ switchSegmentRecordOffset.set((int)((cur.getSegmentId() + 1) % dsCfg.getWalSegments()), 0);
+
while (true) {
try {
fileIO = new SegmentIO(cur.getSegmentId() + 1, ioFactory.create(nextFile));
@@ -1788,7 +1804,24 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
try {
Files.deleteIfExists(dstTmpFile.toPath());
- Files.copy(origFile.toPath(), dstTmpFile.toPath());
+ boolean copied = false;
+
+ if (switchSegmentRecordOffset != null) {
+ long offs = switchSegmentRecordOffset.get((int)segIdx);
+
+ if (offs > 0) {
+ switchSegmentRecordOffset.set((int)segIdx, 0);
+
+ if (offs < origFile.length()) {
+ GridFileUtils.copy(ioFactory, origFile, ioFactory, dstTmpFile, offs);
+
+ copied = true;
+ }
+ }
+ }
+
+ if (!copied)
+ Files.copy(origFile.toPath(), dstTmpFile.toPath());
Files.move(dstTmpFile.toPath(), dstFile.toPath());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandle.java
index 410cd56..11e94e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandle.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandle.java
@@ -110,4 +110,9 @@ public interface FileWriteHandle {
* @return Absolute WAL segment file index (incremental counter).
*/
long getSegmentId();
+
+ /**
+ * Returns the offset just past the switch segment record (SWITCH_SEGMENT_RECORD) in this segment file.
+ * When the value is positive, it is used as the maximum number of leading bytes to copy when the
+ * segment is archived, instead of copying the whole file.
+ *
+ * @return End offset of the switch segment record, or 0 if it is undefined (the record has not been
+ * written through this handle).
+ */
+ int getSwitchSegmentRecordOffset();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
index e40ada2..732fa9a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
@@ -148,6 +148,9 @@ class FileWriteHandleImpl extends AbstractFileHandle implements FileWriteHandle
/** WAL writer worker. */
private final FileHandleManagerImpl.WALWriter walWriter;
+ /** Switch segment record offset. */
+ private int switchSegmentRecordOffset;
+
/**
* @param cctx Context.
* @param fileIO I/O file interface to use
@@ -489,8 +492,13 @@ class FileWriteHandleImpl extends AbstractFileHandle implements FileWriteHandle
WALPointer segRecPtr = addRecord(segmentRecord);
- if (segRecPtr != null)
- fsync((FileWALPointer)segRecPtr);
+ if (segRecPtr != null) {
+ FileWALPointer filePtr = (FileWALPointer)segRecPtr;
+
+ fsync(filePtr);
+
+ switchSegmentRecordOffset = filePtr.fileOffset() + switchSegmentRecSize;
+ }
}
if (mmap) {
@@ -597,4 +605,9 @@ class FileWriteHandleImpl extends AbstractFileHandle implements FileWriteHandle
return "{Failed to read channel position: " + e.getMessage() + '}';
}
}
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Stays zero until the switch segment record has been appended and fsync'ed by this handle.
+     */
+    @Override public int getSwitchSegmentRecordOffset() {
+        return this.switchSegmentRecordOffset;
+    }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java
index 26ce9d8..dc4f473 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java
@@ -104,6 +104,8 @@ class FsyncFileWriteHandle extends AbstractFileHandle implements FileWriteHandle
protected final IgniteLogger log;
/** Fsync delay. */
private final long fsyncDelay;
+ /** Switch segment record offset. */
+ private int switchSegmentRecordOffset;
/**
* Thread local byte buffer for saving serialized WAL records chain, see {@link FsyncFileWriteHandle#head}.
@@ -630,6 +632,8 @@ class FsyncFileWriteHandle extends AbstractFileHandle implements FileWriteHandle
buf.rewind();
written += fileIO.writeFully(buf, written);
+
+ switchSegmentRecordOffset = (int)written;
}
}
catch (IgniteCheckedException e) {
@@ -809,6 +813,11 @@ class FsyncFileWriteHandle extends AbstractFileHandle implements FileWriteHandle
}
}
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Holds the total number of bytes written to the file right after the final buffer was flushed
+     * (presumably the one carrying the switch segment record — verify); zero if that never happened.
+     */
+    @Override public int getSwitchSegmentRecordOffset() {
+        return this.switchSegmentRecordOffset;
+    }
+
/**
* Fake record is zero-sized record, which is not stored into file. Fake record is used for storing position in file
* {@link WALRecord#position()}. Fake record is allowed to have no previous record.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridFileUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridFileUtils.java
new file mode 100644
index 0000000..a3caaa5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridFileUtils.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.io;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+
+/**
+ * General files manipulation utilities.
+ */
+public class GridFileUtils {
+ /** Copy buffer size. */
+ private static final int COPY_BUFFER_SIZE = 1024 * 1024;
+
+ /**
+ * Copy file
+ *
+ * @param src Source.
+ * @param dst Dst.
+ * @param maxBytes Max bytes.
+ */
+ public static void copy(FileIO src, FileIO dst, long maxBytes) throws IOException {
+ assert maxBytes >= 0;
+
+ long bytes = Math.min(src.size(), maxBytes);
+
+ byte[] buf = new byte[COPY_BUFFER_SIZE];
+
+ while (bytes > 0)
+ bytes -= dst.writeFully(buf, 0, src.readFully(buf, 0, (int)Math.min(COPY_BUFFER_SIZE, bytes)));
+
+ dst.force();
+ }
+
+ /**
+ * Copy file
+ *
+ * @param srcFactory Source factory.
+ * @param src Source.
+ * @param dstFactory Dst factory.
+ * @param dst Dst.
+ * @param maxBytes Max bytes.
+ */
+ public static void copy(
+ FileIOFactory srcFactory,
+ File src,
+ FileIOFactory dstFactory,
+ File dst,
+ long maxBytes
+ ) throws IOException {
+ boolean err = true;
+
+ try (FileIO dstIO = dstFactory.create(dst, CREATE, TRUNCATE_EXISTING, WRITE)) {
+ try (FileIO srcIO = srcFactory.create(src, READ)) {
+ copy(srcIO, dstIO, maxBytes);
+
+ err = false;
+ }
+ }
+ finally {
+ if (err)
+ dst.delete();
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
index 519f4d2..b156669 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.Comparator;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
@@ -33,7 +34,11 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
+import org.apache.ignite.internal.pagemem.wal.record.RolloverType;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -59,6 +64,14 @@ public class WalCompactionTest extends GridCommonAbstractTest {
/** Wal mode. */
private WALMode walMode;
+    /**
+     * Bare {@link CheckpointRecord} that the test logs with {@link RolloverType#NEXT_SEGMENT} to make
+     * the WAL switch to the next segment.
+     */
+    private static class RolloverRecord extends CheckpointRecord {
+        /** Creates the record, passing {@code null} straight to the {@link CheckpointRecord} constructor. */
+        private RolloverRecord() {
+            super(null);
+        }
+    }
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(name);
@@ -250,6 +263,71 @@ public class WalCompactionTest extends GridCommonAbstractTest {
}
/**
+     * Checks that a WAL segment closed by an explicit rollover (so that it ends with a switch segment record)
+     * is archived truncated after that record instead of being copied at its full configured size.
+     */
+    @Test
+    public void testOptimizedWalSegments() throws Exception {
+        IgniteConfiguration icfg = getConfiguration(getTestIgniteInstanceName(0));
+
+        // One large work segment: about 50MB of data is written below, while a full copy would be about 300MB.
+        icfg.getDataStorageConfiguration().setWalSegmentSize(300_000_000);
+        icfg.getDataStorageConfiguration().setWalSegments(1);
+
+        IgniteEx ig = (IgniteEx)startGrid(getTestIgniteInstanceName(0), optimize(icfg), null);
+
+        ig.cluster().active(true);
+
+        IgniteCache<Integer, byte[]> cache = ig.cache(CACHE_NAME);
+
+        for (int i = 0; i < 2500; i++) { // At least 50MB of raw data in total.
+            final byte[] val = new byte[20000];
+
+            val[i] = 1;
+
+            cache.put(i, val);
+        }
+
+        IgniteWriteAheadLogManager walMgr = ig.context().cache().context().wal();
+
+        IgniteCacheDatabaseSharedManager dbMgr = ig.context().cache().context().database();
+
+        // Force a switch to the next segment. Any failure here propagates and fails the test immediately.
+        dbMgr.checkpointReadLock();
+
+        try {
+            walMgr.log(new RolloverRecord(), RolloverType.NEXT_SEGMENT);
+        }
+        finally {
+            dbMgr.checkpointReadUnlock();
+        }
+
+        long deadline = System.currentTimeMillis() + 15_000;
+
+        while (walMgr.lastArchivedSegment() < 0 && System.currentTimeMillis() < deadline)
+            Thread.sleep(10);
+
+        // Assert on the condition itself. Re-reading the clock could fail spuriously if the segment
+        // was archived just before the deadline.
+        assertTrue("WAL segment was not archived in time", walMgr.lastArchivedSegment() >= 0);
+
+        String nodeFolderName = ig.context().pdsFolderResolver().resolveFolders().folderName();
+
+        stopAllGrids();
+
+        File dbDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false);
+        File walDir = new File(dbDir, "wal");
+        File archiveDir = new File(walDir, "archive");
+        File nodeArchiveDir = new File(archiveDir, nodeFolderName);
+        File walSegment = new File(nodeArchiveDir, FileDescriptor.fileName(0));
+
+        assertTrue("Archived WAL segment is too large [len=" + walSegment.length() + ']',
+            walSegment.length() < 200_000_000);
+    }
+
+ /**
* Tests that WAL compaction won't be stopped by single broken WAL segment.
*/
private void testCompressorToleratesEmptyWalSegments(WALMode walMode) throws Exception {
@@ -285,9 +363,20 @@ public class WalCompactionTest extends GridCommonAbstractTest {
File nodeArchiveDir = new File(archiveDir, nodeFolderName);
File walSegment = new File(nodeArchiveDir, FileDescriptor.fileName(emptyIdx));
- try (RandomAccessFile raf = new RandomAccessFile(walSegment, "rw")) {
- raf.setLength(0); // Clear wal segment, but don't delete.
- }
+ long start = U.currentTimeMillis();
+ do {
+ try (RandomAccessFile raf = new RandomAccessFile(walSegment, "rw")) {
+ raf.setLength(0); // Clear wal segment, but don't delete.
+ }
+
+ if (walSegment.length() == 0)
+ break;
+
+ if (U.currentTimeMillis() - start >= 10000)
+ throw new IgniteCheckedException("Can't trucate: " + walSegment.getAbsolutePath());
+
+ Thread.yield();
+ } while (true);
compactionEnabled = true;