You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2019/04/16 15:01:53 UTC

[ignite] branch master updated: IGNITE-11641 Fixed server node copies a lot of WAL files in WAL archive after restart

This is an automated email from the ASF dual-hosted git repository.

dgovorukhin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 6670daf  IGNITE-11641 Fixed server node copies a lot of WAL files in WAL archive after restart
6670daf is described below

commit 6670daf79ae41be2adcf75765dd90fcbfa675e30
Author: Dmitriy Govorukhin <dm...@gmail.com>
AuthorDate: Tue Apr 16 18:01:42 2019 +0300

    IGNITE-11641 Fixed server node copies a lot of WAL files in WAL archive after restart
---
 .../persistence/wal/FileWriteAheadLogManager.java  | 167 +++++++++++++------
 .../db/IgnitePdsStartWIthEmptyArchive.java         | 178 +++++++++++++++++++++
 .../ignite/testsuites/IgnitePdsTestSuite4.java     |   2 +
 3 files changed, 301 insertions(+), 46 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index c6cdf7c..cf83e3a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.persistence.wal;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.DataInput;
 import java.io.EOFException;
 import java.io.File;
 import java.io.FileFilter;
@@ -29,6 +30,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.FileChannel;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.sql.Time;
@@ -41,7 +43,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.TreeSet;
+import java.util.TreeMap;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongArray;
@@ -126,6 +128,7 @@ import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
 import static java.nio.file.StandardOpenOption.READ;
 import static java.nio.file.StandardOpenOption.WRITE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE;
@@ -137,8 +140,10 @@ import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED;
 import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED;
 import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
 import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_SUFFIX;
 import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory.LATEST_SERIALIZER_VERSION;
 import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readPosition;
 import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readSegmentHeader;
 
 /**
@@ -445,22 +450,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                     }
                 });
 
-            IgniteBiTuple<Long, Long> tup = scanMinMaxArchiveIndices();
-
             segmentAware = new SegmentAware(dsCfg.getWalSegments(), dsCfg.isWalCompactionEnabled());
 
-            segmentAware.lastTruncatedArchiveIdx(tup == null ? -1 : tup.get1() - 1);
-
-            long lastAbsArchivedIdx = tup == null ? -1 : tup.get2();
-
             if (isArchiverEnabled())
-                archiver = new FileArchiver(lastAbsArchivedIdx, log);
+                archiver = new FileArchiver(segmentAware, log);
             else
                 archiver = null;
 
-            if (lastAbsArchivedIdx > 0)
-                segmentAware.setLastArchivedAbsoluteIndex(lastAbsArchivedIdx);
-
             if (dsCfg.isWalCompactionEnabled()) {
                 compressor = new FileCompressor(log);
 
@@ -1098,41 +1094,36 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     }
 
     /**
-     * Lists files in archive directory and returns the indices of least and last archived files.
-     * In case of holes, first segment after last "hole" is considered as minimum.
-     * Example: minimum(0, 1, 10, 11, 20, 21, 22) should be 20
-     *
-     * @return The absolute indices of min and max archived files.
+     * @param file Segment file whose header record is read; {@code null} is returned if it cannot be read.
+     * @param ioFactory IO factory handed to the file descriptor to create the segment IO.
      */
-    private IgniteBiTuple<Long, Long> scanMinMaxArchiveIndices() {
-        TreeSet<Long> archiveIndices = new TreeSet<>();
-
-        for (File file : walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) {
-            try {
-                long idx = Long.parseLong(file.getName().substring(0, 16));
+    private FileDescriptor readFileDescriptor(File file, FileIOFactory ioFactory) {
+        FileDescriptor ds = new FileDescriptor(file);
 
-                archiveIndices.add(idx);
-            }
-            catch (NumberFormatException | IndexOutOfBoundsException ignore) {
-                // No-op.
-            }
-        }
+        try (
+            SegmentIO fileIO = ds.toIO(ioFactory);
+            ByteBufferExpander buf = new ByteBufferExpander(HEADER_RECORD_SIZE, ByteOrder.nativeOrder())
+        ) {
+            final DataInput in = segmentFileInputFactory.createFileInput(fileIO, buf);
 
-        if (archiveIndices.isEmpty())
-            return null;
-        else {
-            Long min = archiveIndices.first();
-            Long max = archiveIndices.last();
+            // Read the record type byte first: header record must be agnostic to the serializer version.
+            final int type = in.readUnsignedByte();
 
-            if (max - min == archiveIndices.size() - 1)
-                return F.t(min, max); // Short path.
+            if (type == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE) {
+                if (log.isInfoEnabled())
+                    log.info("Reached logical end of the segment for file " + file);
 
-            for (Long idx : archiveIndices.descendingSet()) {
-                if (!archiveIndices.contains(idx - 1))
-                    return F.t(idx, max);
+                return null;
             }
 
-            throw new IllegalStateException("Should never happen if TreeSet is valid.");
+            FileWALPointer ptr = readPosition(in);
+
+            return new FileDescriptor(file, ptr.index());
+        }
+        catch (IOException e) {
+            U.warn(log, "Failed to read file header [" + file + "]. Skipping this file", e);
+
+            return null;
         }
     }
 
@@ -1476,7 +1467,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         if (log.isDebugEnabled())
             log.debug("Creating new file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']');
 
-        File tmp = new File(file.getParent(), file.getName() + FilePageStoreManager.TMP_SUFFIX);
+        File tmp = new File(file.getParent(), file.getName() + TMP_SUFFIX);
 
         formatFile(tmp);
 
@@ -1618,11 +1609,95 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         /**
          *
          */
-        private FileArchiver(long lastAbsArchivedIdx, IgniteLogger log) {
+        private FileArchiver(SegmentAware segmentAware, IgniteLogger log) throws IgniteCheckedException {
             super(cctx.igniteInstanceName(), "wal-file-archiver%" + cctx.igniteInstanceName(), log,
                 cctx.kernalContext().workersRegistry());
 
-            segmentAware.setLastArchivedAbsoluteIndex(lastAbsArchivedIdx);
+            init(segmentAware);
+        }
+
+        /**
+         * @param segmentAware Segment aware that receives the last truncated and last archived indices found by the scan.
+         * @throws IgniteCheckedException If scanning for archived segments failed.
+         */
+        private void init(SegmentAware segmentAware) throws IgniteCheckedException {
+            IgniteBiTuple<Long, Long> tup = scanMinMaxArchiveIndices();
+
+            segmentAware.lastTruncatedArchiveIdx(tup == null ? -1 : tup.get1() - 1);
+
+            long lastAbsArchivedIdx = tup == null ? -1 : tup.get2();
+
+            if (lastAbsArchivedIdx >= 0)
+                segmentAware.setLastArchivedAbsoluteIndex(lastAbsArchivedIdx);
+        }
+
+        /**
+         * Lists files in archive directory and returns the indices of least and last archived files.
+         * In case of holes, first segment after last "hole" is considered as minimum.
+         * Example: minimum(0, 1, 10, 11, 20, 21, 22) should be 20
+         * If the archive is empty and the work directory holds several indices, the lowest one is archived and returned.
+         * @return The absolute indices of min and max archived files, or {@code null} if none could be determined.
+         */
+        private IgniteBiTuple<Long, Long> scanMinMaxArchiveIndices() throws IgniteCheckedException {
+            TreeMap<Long, FileDescriptor> archiveIndices = new TreeMap<>();
+
+            for (File file : walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) {
+                try {
+                    long idx = Long.parseLong(file.getName().substring(0, 16));
+
+                    FileDescriptor desc = readFileDescriptor(file, ioFactory);
+
+                    if (desc != null) {
+                        if (desc.idx() == idx)
+                            archiveIndices.put(desc.idx(), desc);
+                    }
+                    else
+                        log.warning("Skip file, failed read file header " + file);
+                }
+                catch (NumberFormatException | IndexOutOfBoundsException ignore) {
+                    log.warning("Skip file " + file);
+                }
+            }
+
+            if (!archiveIndices.isEmpty()) {
+                Long min = archiveIndices.navigableKeySet().first();
+                Long max = archiveIndices.navigableKeySet().last();
+
+                if (max - min == archiveIndices.size() - 1)
+                    return F.t(min, max); // Short path.
+
+                // Archive has gaps: descending from max, the first index without a predecessor becomes min.
+                for (Long idx : archiveIndices.descendingKeySet()) {
+                    if (!archiveIndices.keySet().contains(idx - 1))
+                        return F.t(idx, max);
+                }
+
+                throw new IllegalStateException("Should never happen if archiveIndices TreeMap is valid.");
+            }
+
+            // If WAL archive is empty, find the lowest-index segment in work directory and copy it to WAL archive.
+            TreeMap<Long, FileDescriptor> workIndices = new TreeMap<>();
+
+            for (File file : walWorkDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) {
+                FileDescriptor desc = readFileDescriptor(file, ioFactory);
+
+                if (desc != null)
+                    workIndices.put(desc.idx(), desc);
+            }
+
+            if (!workIndices.isEmpty()) {
+                FileDescriptor first = workIndices.firstEntry().getValue();
+                FileDescriptor last = workIndices.lastEntry().getValue();
+
+                if (first.idx() != last.idx()) {
+                    archiveSegment(first.idx());
+
+                    // Use copied segment as min archived segment.
+                    return F.t(first.idx(), first.idx());
+                }
+            }
+
+            return null;
         }
 
         /**
@@ -1799,14 +1874,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          *
          * @param absIdx Absolute index to archive.
          */
-        private SegmentArchiveResult archiveSegment(long absIdx) throws StorageException {
+        public SegmentArchiveResult archiveSegment(long absIdx) throws StorageException {
             long segIdx = absIdx % dsCfg.getWalSegments();
 
             File origFile = new File(walWorkDir, FileDescriptor.fileName(segIdx));
 
             String name = FileDescriptor.fileName(absIdx);
 
-            File dstTmpFile = new File(walArchiveDir, name + FilePageStoreManager.TMP_SUFFIX);
+            File dstTmpFile = new File(walArchiveDir, name + TMP_SUFFIX);
 
             File dstFile = new File(walArchiveDir, name);
 
@@ -2037,7 +2112,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                     deleteObsoleteRawSegments();
 
                     File tmpZip = new File(walArchiveDir, FileDescriptor.fileName(segIdx)
-                            + FilePageStoreManager.ZIP_SUFFIX + FilePageStoreManager.TMP_SUFFIX);
+                            + FilePageStoreManager.ZIP_SUFFIX + TMP_SUFFIX);
 
                     File zip = new File(walArchiveDir, FileDescriptor.fileName(segIdx) + FilePageStoreManager.ZIP_SUFFIX);
 
@@ -2225,7 +2300,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                         File zip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress)
                             + FilePageStoreManager.ZIP_SUFFIX);
                         File unzipTmp = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress)
-                            + FilePageStoreManager.TMP_SUFFIX);
+                            + TMP_SUFFIX);
                         File unzip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress));
 
                         try (ZipInputStream zis = new ZipInputStream(new BufferedInputStream(new FileInputStream(zip)));
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsStartWIthEmptyArchive.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsStartWIthEmptyArchive.java
new file mode 100644
index 0000000..5104e65
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsStartWIthEmptyArchive.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import java.io.File;
+import java.util.Arrays;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.WalSegmentArchivedEvent;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware;
+import org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileWriteHandle;
+import org.apache.ignite.internal.util.future.CountDownFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER;
+
+/**
+ * Checks WAL archiver state and further archiving after a node is restarted with an emptied WAL archive directory.
+ */
+public class IgnitePdsStartWIthEmptyArchive extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setConsistentId(igniteInstanceName);
+
+        cfg.setCacheConfiguration(
+            new CacheConfiguration(DEFAULT_CACHE_NAME)
+        );
+
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration()
+                // Checkpoint should not remove any WAL archive files.
+                .setMaxWalArchiveSize(Long.MAX_VALUE)
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setPersistenceEnabled(true)
+                )
+        );
+
+        return cfg;
+    }
+
+    @Before
+    public void before() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    @Test
+    public void test() throws Exception {
+        IgniteEx ig = startGrid(0);
+
+        ig.cluster().active(true);
+
+        FileWriteAheadLogManager walMgr = (FileWriteAheadLogManager)ig.context().cache().context().wal();
+
+        // Populate data to generate WAL archive segments.
+        try (IgniteDataStreamer<Integer, byte[]> st = ig.dataStreamer(DEFAULT_CACHE_NAME)) {
+            int entries = 1000;
+
+            for (int i = 0; i < entries; i++) {
+                st.addData(i, new byte[1024 * 1024]);
+            }
+        }
+
+        File archiveDir = U.field(walMgr, "walArchiveDir");
+
+        stopGrid(0, false);
+
+        SegmentAware beforeSaw = U.field(walMgr, "segmentAware");
+
+        long beforeLastArchivedAbsoluteIndex = beforeSaw.lastArchivedAbsoluteIndex();
+
+        FileWriteHandle fhBefore = U.field(walMgr, "currHnd");
+
+        long idxBefore = fhBefore.getSegmentId();
+
+        File[] files = archiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER);
+
+        Arrays.sort(files);
+
+        // Cleanup archive directory.
+        for (File f : files) {
+            if (f.delete())
+                log.info("File " + f.getAbsolutePath() + " deleted");
+        }
+
+        Assert.assertEquals(0, archiveDir.listFiles().length);
+
+        // Restart grid again after archive was removed.
+        ig = startGrid(0);
+
+        walMgr = (FileWriteAheadLogManager)ig.context().cache().context().wal();
+
+        SegmentAware afterSaw = U.field(walMgr, "segmentAware");
+
+        long afterLastArchivedAbsoluteIndex = afterSaw.lastArchivedAbsoluteIndex();
+
+        int segments = ig.configuration().getDataStorageConfiguration().getWalSegments();
+
+        Assert.assertTrue(
+            "lastArchivedBeforeIdx=" + beforeLastArchivedAbsoluteIndex +
+                ", lastArchivedAfterIdx=" + afterLastArchivedAbsoluteIndex + ",  segments=" + segments,
+            afterLastArchivedAbsoluteIndex >=
+            (beforeLastArchivedAbsoluteIndex - segments));
+
+        ig.cluster().active(true);
+
+        FileWriteHandle fhAfter = U.field(walMgr, "currHnd");
+
+        Assert.assertNotNull(fhAfter);
+
+        long idxAfter = fhAfter.getSegmentId();
+
+        Assert.assertEquals(idxBefore, idxAfter);
+        Assert.assertTrue(idxAfter >= beforeLastArchivedAbsoluteIndex);
+
+        // Future to await until all currently available segments are archived.
+        CountDownFuture awaitAchviedSegmentsLatch = new CountDownFuture(
+            // One is the last archived segment, the second is the current write segment.
+            (int)(idxAfter - afterLastArchivedAbsoluteIndex - 2)
+        );
+
+        log.info("currentIdx=" + idxAfter + ", lastArchivedBeforeIdx=" + beforeLastArchivedAbsoluteIndex +
+            ", lastArchivedAfteridx=" + afterLastArchivedAbsoluteIndex + ",  segments=" + segments);
+
+        ig.events().localListen(e -> {
+            WalSegmentArchivedEvent archComplEvt = (WalSegmentArchivedEvent)e;
+
+            log.info("EVT_WAL_SEGMENT_ARCHIVED:" + archComplEvt.getAbsWalSegmentIdx());
+
+            if (archComplEvt.getAbsWalSegmentIdx() > afterLastArchivedAbsoluteIndex){
+                awaitAchviedSegmentsLatch.onDone();
+
+                return true;
+            }
+
+            if (archComplEvt.getAbsWalSegmentIdx() < afterLastArchivedAbsoluteIndex){
+                awaitAchviedSegmentsLatch.onDone(new IgniteException("Unexected segment for archivation. idx="
+                    + archComplEvt.getAbsWalSegmentIdx()));
+
+                return false;
+            }
+
+            return true;
+        }, EVT_WAL_SEGMENT_ARCHIVED);
+
+        awaitAchviedSegmentsLatch.get();
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
index 5e48ee4..72967c0 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTaskCanc
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsCacheWalDisabledOnRebalancingTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPageEvictionDuringPartitionClearTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPartitionPreloadTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsStartWIthEmptyArchive;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsTransactionsHangTest;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileDownloaderTest;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -63,6 +64,7 @@ public class IgnitePdsTestSuite4 {
         GridTestUtils.addTestIfNeeded(suite, IgniteRebalanceOnCachesStoppingOrDestroyingTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, CachePageWriteLockUnlockTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgnitePdsCacheWalDisabledOnRebalancingTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, IgnitePdsStartWIthEmptyArchive.class, ignoredTests);
 
         return suite;
     }