You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/02/01 01:00:31 UTC

[ignite] branch master updated: IGNITE-9903 Archived WAL segment size is reduced for case when SWITCH_SEGMENT_RECORD is present in the file

This is an automated email from the ASF dual-hosted git repository.

agura pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new cde31aa  IGNITE-9903 Archived WAL segment size is reduced for case when SWITCH_SEGMENT_RECORD is present in the file
cde31aa is described below

commit cde31aa46db3950561322a66c45ad91fb869ddb3
Author: Alexey Stelmak <sp...@gmail.com>
AuthorDate: Fri Feb 1 03:48:53 2019 +0300

    IGNITE-9903 Archived WAL segment size is reduced for case when SWITCH_SEGMENT_RECORD is present in the file
    
    Signed-off-by: Andrey Gura <ag...@apache.org>
---
 .../persistence/wal/FileWriteAheadLogManager.java  | 37 ++++++++-
 .../wal/filehandle/FileWriteHandle.java            |  5 ++
 .../wal/filehandle/FileWriteHandleImpl.java        | 17 +++-
 .../wal/filehandle/FsyncFileWriteHandle.java       |  9 ++
 .../ignite/internal/util/io/GridFileUtils.java     | 87 ++++++++++++++++++++
 .../persistence/db/wal/WalCompactionTest.java      | 95 +++++++++++++++++++++-
 6 files changed, 243 insertions(+), 7 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index dbd2f14..db245fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -44,6 +44,7 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongArray;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.regex.Pattern;
 import java.util.stream.Stream;
@@ -89,8 +90,8 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.aware.Segment
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
 import org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.AbstractFileHandle;
-import org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileHandleManagerFactory;
 import org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileHandleManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileHandleManagerFactory;
 import org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileWriteHandle;
 import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput;
 import org.apache.ignite.internal.processors.cache.persistence.wal.io.LockedSegmentFileInputFactory;
@@ -107,6 +108,7 @@ import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.io.GridFileUtils;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CIX1;
 import org.apache.ignite.internal.util.typedef.CO;
@@ -350,6 +352,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     /** FileHandleManagerFactory. */
     private final FileHandleManagerFactory fileHandleManagerFactory;
 
+    /** Switch segment record offsets, one slot per configured WAL segment; {@code null} if archiver is disabled. */
+    private final AtomicLongArray switchSegmentRecordOffset;
+
     /**
      * @param ctx Kernal context.
      */
@@ -380,6 +385,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         maxSegCountWithoutCheckpoint =
                 (long)((U.adjustedWalHistorySize(dsCfg, log) * CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE)
                         / dsCfg.getWalSegmentSize());
+
+        switchSegmentRecordOffset = isArchiverEnabled() ? new AtomicLongArray(dsCfg.getWalSegments()) : null;
     }
 
     /**
@@ -1175,6 +1182,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             if (metrics.metricsEnabled())
                 metrics.onWallRollOver();
 
+            if (switchSegmentRecordOffset != null) {
+                int idx = (int)((cur.getSegmentId() + 1) % dsCfg.getWalSegments());
+
+                switchSegmentRecordOffset.set(idx, hnd.getSwitchSegmentRecordOffset());
+            }
+
             FileWriteHandle next = initNextWriteHandle(cur);
 
             next.writeHeader();
@@ -1308,6 +1321,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
             boolean interrupted = false;
 
+            if (switchSegmentRecordOffset != null)
+                switchSegmentRecordOffset.set((int)((cur.getSegmentId() + 1) % dsCfg.getWalSegments()), 0);
+
             while (true) {
                 try {
                     fileIO = new SegmentIO(cur.getSegmentId() + 1, ioFactory.create(nextFile));
@@ -1788,7 +1804,24 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             try {
                 Files.deleteIfExists(dstTmpFile.toPath());
 
-                Files.copy(origFile.toPath(), dstTmpFile.toPath());
+                boolean copied = false;
+
+                if (switchSegmentRecordOffset != null) {
+                    long offs = switchSegmentRecordOffset.get((int)segIdx);
+
+                    if (offs > 0) {
+                        switchSegmentRecordOffset.set((int)segIdx, 0);
+
+                        if (offs < origFile.length()) {
+                            GridFileUtils.copy(ioFactory, origFile, ioFactory, dstTmpFile, offs);
+
+                            copied = true;
+                        }
+                    }
+                }
+
+                if (!copied)
+                    Files.copy(origFile.toPath(), dstTmpFile.toPath());
 
                 Files.move(dstTmpFile.toPath(), dstFile.toPath());
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandle.java
index 410cd56..11e94e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandle.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandle.java
@@ -110,4 +110,9 @@ public interface FileWriteHandle {
      * @return Absolute WAL segment file index (incremental counter).
      */
     long getSegmentId();
+
+    /**
+     * @return Switch segment record offset recorded by this handle, or {@code 0} if it is undefined.
+     */
+    int getSwitchSegmentRecordOffset();
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
index e40ada2..732fa9a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
@@ -148,6 +148,9 @@ class FileWriteHandleImpl extends AbstractFileHandle implements FileWriteHandle
     /** WAL writer worker. */
     private final FileHandleManagerImpl.WALWriter walWriter;
 
+    /** Switch segment record offset. */
+    private int switchSegmentRecordOffset;
+
     /**
      * @param cctx Context.
      * @param fileIO I/O file interface to use
@@ -489,8 +492,13 @@ class FileWriteHandleImpl extends AbstractFileHandle implements FileWriteHandle
 
                         WALPointer segRecPtr = addRecord(segmentRecord);
 
-                        if (segRecPtr != null)
-                            fsync((FileWALPointer)segRecPtr);
+                        if (segRecPtr != null) {
+                            FileWALPointer filePtr = (FileWALPointer)segRecPtr;
+
+                            fsync(filePtr);
+
+                            switchSegmentRecordOffset = filePtr.fileOffset() + switchSegmentRecSize;
+                        }
                     }
 
                     if (mmap) {
@@ -597,4 +605,9 @@ class FileWriteHandleImpl extends AbstractFileHandle implements FileWriteHandle
             return "{Failed to read channel position: " + e.getMessage() + '}';
         }
     }
+
+    /** {@inheritDoc} */
+    @Override public int getSwitchSegmentRecordOffset() {
+        return switchSegmentRecordOffset;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java
index 26ce9d8..dc4f473 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java
@@ -104,6 +104,8 @@ class FsyncFileWriteHandle extends AbstractFileHandle implements FileWriteHandle
     protected final IgniteLogger log;
     /** Fsync delay. */
     private final long fsyncDelay;
+    /** Switch segment record offset. */
+    private int switchSegmentRecordOffset;
 
     /**
      * Thread local byte buffer for saving serialized WAL records chain, see {@link FsyncFileWriteHandle#head}.
@@ -630,6 +632,8 @@ class FsyncFileWriteHandle extends AbstractFileHandle implements FileWriteHandle
                             buf.rewind();
 
                             written += fileIO.writeFully(buf, written);
+
+                            switchSegmentRecordOffset = (int)written;
                         }
                     }
                     catch (IgniteCheckedException e) {
@@ -809,6 +813,11 @@ class FsyncFileWriteHandle extends AbstractFileHandle implements FileWriteHandle
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public int getSwitchSegmentRecordOffset() {
+        return switchSegmentRecordOffset;
+    }
+
     /**
      * Fake record is zero-sized record, which is not stored into file. Fake record is used for storing position in file
      * {@link WALRecord#position()}. Fake record is allowed to have no previous record.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridFileUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridFileUtils.java
new file mode 100644
index 0000000..a3caaa5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridFileUtils.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.io;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+
+/**
+ * General file manipulation utilities; currently provides bounded file copy on top of {@code FileIO}.
+ */
+public class GridFileUtils {
+    /** Size in bytes of the intermediate buffer used while copying (1024 * 1024). */
+    private static final int COPY_BUFFER_SIZE = 1024 * 1024;
+
+    /**
+     * Copies at most {@code maxBytes} bytes from {@code src} to {@code dst}, then calls {@code dst.force()}.
+     *
+     * @param src Source file I/O; the amount copied is additionally capped by {@code src.size()}.
+     * @param dst Destination file I/O that receives the bytes.
+     * @param maxBytes Upper bound on the number of bytes to copy; asserted to be non-negative.
+     */
+    public static void copy(FileIO src, FileIO dst, long maxBytes) throws IOException {
+        assert maxBytes >= 0;
+
+        long bytes = Math.min(src.size(), maxBytes);
+
+        byte[] buf = new byte[COPY_BUFFER_SIZE];
+
+        while (bytes > 0)
+            bytes -= dst.writeFully(buf, 0, src.readFully(buf, 0, (int)Math.min(COPY_BUFFER_SIZE, bytes)));
+
+        dst.force();
+    }
+
+    /**
+     * Copies at most {@code maxBytes} bytes of file {@code src} into file {@code dst}; deletes {@code dst} on failure.
+     *
+     * @param srcFactory Factory used to open {@code src} with {@code READ}.
+     * @param src Source file.
+     * @param dstFactory Factory used to open {@code dst} with {@code CREATE}, {@code TRUNCATE_EXISTING}, {@code WRITE}.
+     * @param dst Destination file.
+     * @param maxBytes Upper bound on the number of bytes to copy.
+     */
+    public static void copy(
+            FileIOFactory srcFactory,
+            File src,
+            FileIOFactory dstFactory,
+            File dst,
+            long maxBytes
+    ) throws IOException {
+        boolean err = true;
+
+        try (FileIO dstIO = dstFactory.create(dst, CREATE, TRUNCATE_EXISTING, WRITE)) {
+            try (FileIO srcIO = srcFactory.create(src, READ)) {
+                copy(srcIO, dstIO, maxBytes);
+
+                err = false;
+            }
+        }
+        finally {
+            if (err)
+                dst.delete();
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
index 519f4d2..b156669 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.Comparator;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
@@ -33,7 +34,11 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
 import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
+import org.apache.ignite.internal.pagemem.wal.record.RolloverType;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -59,6 +64,14 @@ public class WalCompactionTest extends GridCommonAbstractTest {
     /** Wal mode. */
     private WALMode walMode;
 
+    /** Checkpoint record subclass that the test logs with {@code RolloverType.NEXT_SEGMENT}. */
+    private static class RolloverRecord extends CheckpointRecord {
+        /** Creates the record, passing {@code null} to the {@link CheckpointRecord} constructor. */
+        private RolloverRecord() {
+            super(null);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(name);
@@ -250,6 +263,71 @@ public class WalCompactionTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Checks that archived segment 0 is under 200 MB (segment size: 300 MB) after a {@code NEXT_SEGMENT} rollover record.
+     */
+    @Test
+    public void testOptimizedWalSegments() throws Exception {
+        IgniteConfiguration icfg = getConfiguration(getTestIgniteInstanceName(0));
+
+        icfg.getDataStorageConfiguration().setWalSegmentSize(300_000_000);
+        icfg.getDataStorageConfiguration().setWalSegments(1);
+
+        IgniteEx ig = (IgniteEx)startGrid(getTestIgniteInstanceName(0), optimize(icfg), null);
+
+        ig.cluster().active(true);
+
+        IgniteCache<Integer, byte[]> cache = ig.cache(CACHE_NAME);
+
+        for (int i = 0; i < 2500; i++) { // At least 50MB of raw data in total.
+            final byte[] val = new byte[20000];
+
+            val[i] = 1;
+
+            cache.put(i, val);
+        }
+
+        IgniteWriteAheadLogManager walMgr = ig.context().cache().context().wal();
+
+        IgniteCacheDatabaseSharedManager dbMgr = ig.context().cache().context().database();
+
+        RolloverRecord rec = new RolloverRecord();
+
+        try {
+            dbMgr.checkpointReadLock();
+
+            try {
+                walMgr.log(rec, RolloverType.NEXT_SEGMENT);
+            }
+            finally {
+                dbMgr.checkpointReadUnlock();
+            }
+        }
+        catch (IgniteCheckedException e) {
+            log.error(e.getMessage(), e);
+        }
+
+        long start = System.currentTimeMillis();
+
+        do {
+            Thread.yield();
+        } while(walMgr.lastArchivedSegment() < 0 && (System.currentTimeMillis() - start < 15_000));
+
+        assertTrue(System.currentTimeMillis() - start < 15_000);
+
+        String nodeFolderName = ig.context().pdsFolderResolver().resolveFolders().folderName();
+
+        stopAllGrids();
+
+        File dbDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false);
+        File walDir = new File(dbDir, "wal");
+        File archiveDir = new File(walDir, "archive");
+        File nodeArchiveDir = new File(archiveDir, nodeFolderName);
+        File walSegment = new File(nodeArchiveDir, FileDescriptor.fileName(0));
+
+        assertTrue("" + walSegment.length(), walSegment.length() < 200_000_000);
+    }
+
+    /**
      * Tests that WAL compaction won't be stopped by single broken WAL segment.
      */
     private void testCompressorToleratesEmptyWalSegments(WALMode walMode) throws Exception {
@@ -285,9 +363,20 @@ public class WalCompactionTest extends GridCommonAbstractTest {
         File nodeArchiveDir = new File(archiveDir, nodeFolderName);
         File walSegment = new File(nodeArchiveDir, FileDescriptor.fileName(emptyIdx));
 
-        try (RandomAccessFile raf = new RandomAccessFile(walSegment, "rw")) {
-            raf.setLength(0); // Clear wal segment, but don't delete.
-        }
+        long start = U.currentTimeMillis();
+        do {
+            try (RandomAccessFile raf = new RandomAccessFile(walSegment, "rw")) {
+                raf.setLength(0); // Clear wal segment, but don't delete.
+            }
+
+            if (walSegment.length() == 0)
+                break;
+
+            if (U.currentTimeMillis() - start >= 10000)
+                throw new IgniteCheckedException("Can't trucate: " + walSegment.getAbsolutePath());
+
+            Thread.yield();
+        } while (true);
 
         compactionEnabled = true;