You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2019/04/16 15:01:53 UTC
[ignite] branch master updated: IGNITE-11641 Fixed server node
copies a lot of WAL files in WAL archive after restart
This is an automated email from the ASF dual-hosted git repository.
dgovorukhin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 6670daf IGNITE-11641 Fixed server node copies a lot of WAL files in WAL archive after restart
6670daf is described below
commit 6670daf79ae41be2adcf75765dd90fcbfa675e30
Author: Dmitriy Govorukhin <dm...@gmail.com>
AuthorDate: Tue Apr 16 18:01:42 2019 +0300
IGNITE-11641 Fixed server node copies a lot of WAL files in WAL archive after restart
---
.../persistence/wal/FileWriteAheadLogManager.java | 167 +++++++++++++------
.../db/IgnitePdsStartWIthEmptyArchive.java | 178 +++++++++++++++++++++
.../ignite/testsuites/IgnitePdsTestSuite4.java | 2 +
3 files changed, 301 insertions(+), 46 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index c6cdf7c..cf83e3a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.persistence.wal;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.DataInput;
import java.io.EOFException;
import java.io.File;
import java.io.FileFilter;
@@ -29,6 +30,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.FileChannel;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.sql.Time;
@@ -41,7 +43,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.TreeSet;
+import java.util.TreeMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
@@ -126,6 +128,7 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
import static java.nio.file.StandardOpenOption.READ;
import static java.nio.file.StandardOpenOption.WRITE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE;
@@ -137,8 +140,10 @@ import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED;
import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED;
import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_SUFFIX;
import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory.LATEST_SERIALIZER_VERSION;
import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readPosition;
import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readSegmentHeader;
/**
@@ -445,22 +450,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
});
- IgniteBiTuple<Long, Long> tup = scanMinMaxArchiveIndices();
-
segmentAware = new SegmentAware(dsCfg.getWalSegments(), dsCfg.isWalCompactionEnabled());
- segmentAware.lastTruncatedArchiveIdx(tup == null ? -1 : tup.get1() - 1);
-
- long lastAbsArchivedIdx = tup == null ? -1 : tup.get2();
-
if (isArchiverEnabled())
- archiver = new FileArchiver(lastAbsArchivedIdx, log);
+ archiver = new FileArchiver(segmentAware, log);
else
archiver = null;
- if (lastAbsArchivedIdx > 0)
- segmentAware.setLastArchivedAbsoluteIndex(lastAbsArchivedIdx);
-
if (dsCfg.isWalCompactionEnabled()) {
compressor = new FileCompressor(log);
@@ -1098,41 +1094,36 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
- * Lists files in archive directory and returns the indices of least and last archived files.
- * In case of holes, first segment after last "hole" is considered as minimum.
- * Example: minimum(0, 1, 10, 11, 20, 21, 22) should be 20
- *
- * @return The absolute indices of min and max archived files.
+ * @param file File to read.
+ * @param ioFactory IO factory.
*/
- private IgniteBiTuple<Long, Long> scanMinMaxArchiveIndices() {
- TreeSet<Long> archiveIndices = new TreeSet<>();
-
- for (File file : walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) {
- try {
- long idx = Long.parseLong(file.getName().substring(0, 16));
+ private FileDescriptor readFileDescriptor(File file, FileIOFactory ioFactory) {
+ FileDescriptor ds = new FileDescriptor(file);
- archiveIndices.add(idx);
- }
- catch (NumberFormatException | IndexOutOfBoundsException ignore) {
- // No-op.
- }
- }
+ try (
+ SegmentIO fileIO = ds.toIO(ioFactory);
+ ByteBufferExpander buf = new ByteBufferExpander(HEADER_RECORD_SIZE, ByteOrder.nativeOrder())
+ ) {
+ final DataInput in = segmentFileInputFactory.createFileInput(fileIO, buf);
- if (archiveIndices.isEmpty())
- return null;
- else {
- Long min = archiveIndices.first();
- Long max = archiveIndices.last();
+ // Header record must be agnostic to the serializer version.
+ final int type = in.readUnsignedByte();
- if (max - min == archiveIndices.size() - 1)
- return F.t(min, max); // Short path.
+ if (type == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE) {
+ if (log.isInfoEnabled())
+ log.info("Reached logical end of the segment for file " + file);
- for (Long idx : archiveIndices.descendingSet()) {
- if (!archiveIndices.contains(idx - 1))
- return F.t(idx, max);
+ return null;
}
- throw new IllegalStateException("Should never happen if TreeSet is valid.");
+ FileWALPointer ptr = readPosition(in);
+
+ return new FileDescriptor(file, ptr.index());
+ }
+ catch (IOException e) {
+ U.warn(log, "Failed to read file header [" + file + "]. Skipping this file", e);
+
+ return null;
}
}
@@ -1476,7 +1467,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (log.isDebugEnabled())
log.debug("Creating new file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']');
- File tmp = new File(file.getParent(), file.getName() + FilePageStoreManager.TMP_SUFFIX);
+ File tmp = new File(file.getParent(), file.getName() + TMP_SUFFIX);
formatFile(tmp);
@@ -1618,11 +1609,95 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/**
*
*/
- private FileArchiver(long lastAbsArchivedIdx, IgniteLogger log) {
+ private FileArchiver(SegmentAware segmentAware, IgniteLogger log) throws IgniteCheckedException {
super(cctx.igniteInstanceName(), "wal-file-archiver%" + cctx.igniteInstanceName(), log,
cctx.kernalContext().workersRegistry());
- segmentAware.setLastArchivedAbsoluteIndex(lastAbsArchivedIdx);
+ init(segmentAware);
+ }
+
+ /**
+ * @param segmentAware Segment aware.
+ * @throws IgniteCheckedException If initialization failed.
+ */
+ private void init(SegmentAware segmentAware) throws IgniteCheckedException {
+ IgniteBiTuple<Long, Long> tup = scanMinMaxArchiveIndices();
+
+ segmentAware.lastTruncatedArchiveIdx(tup == null ? -1 : tup.get1() - 1);
+
+ long lastAbsArchivedIdx = tup == null ? -1 : tup.get2();
+
+ if (lastAbsArchivedIdx >= 0)
+ segmentAware.setLastArchivedAbsoluteIndex(lastAbsArchivedIdx);
+ }
+
+ /**
+ * Lists files in archive directory and returns the indices of least and last archived files.
+ * In case of holes, first segment after last "hole" is considered as minimum.
+ * Example: minimum(0, 1, 10, 11, 20, 21, 22) should be 20
+ *
+ * @return The absolute indices of min and max archived files.
+ */
+ private IgniteBiTuple<Long, Long> scanMinMaxArchiveIndices() throws IgniteCheckedException {
+ TreeMap<Long, FileDescriptor> archiveIndices = new TreeMap<>();
+
+ for (File file : walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) {
+ try {
+ long idx = Long.parseLong(file.getName().substring(0, 16));
+
+ FileDescriptor desc = readFileDescriptor(file, ioFactory);
+
+ if (desc != null) {
+ if (desc.idx() == idx)
+ archiveIndices.put(desc.idx(), desc);
+ }
+ else
+ log.warning("Skip file, failed read file header " + file);
+ }
+ catch (NumberFormatException | IndexOutOfBoundsException ignore) {
+ log.warning("Skip file " + file);
+ }
+ }
+
+ if (!archiveIndices.isEmpty()) {
+ Long min = archiveIndices.navigableKeySet().first();
+ Long max = archiveIndices.navigableKeySet().last();
+
+ if (max - min == archiveIndices.size() - 1)
+ return F.t(min, max); // Short path.
+
+ // Archive indices have gaps: walk down from max and use the first segment right after the last gap as min.
+ for (Long idx : archiveIndices.descendingKeySet()) {
+ if (!archiveIndices.keySet().contains(idx - 1))
+ return F.t(idx, max);
+ }
+
+ throw new IllegalStateException("Should never happen if archiveIndices TreeMap is valid.");
+ }
+
+ // If WAL archive is empty, find the segment with the lowest index in the work directory and archive it.
+ TreeMap<Long, FileDescriptor> workIndices = new TreeMap<>();
+
+ for (File file : walWorkDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) {
+ FileDescriptor desc = readFileDescriptor(file, ioFactory);
+
+ if (desc != null)
+ workIndices.put(desc.idx(), desc);
+ }
+
+ if (!workIndices.isEmpty()) {
+ FileDescriptor first = workIndices.firstEntry().getValue();
+ FileDescriptor last = workIndices.lastEntry().getValue();
+
+ if (first.idx() != last.idx()) {
+ archiveSegment(first.idx());
+
+ // Use copied segment as min archived segment.
+ return F.t(first.idx(), first.idx());
+ }
+ }
+
+ return null;
}
/**
@@ -1799,14 +1874,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
*
* @param absIdx Absolute index to archive.
*/
- private SegmentArchiveResult archiveSegment(long absIdx) throws StorageException {
+ public SegmentArchiveResult archiveSegment(long absIdx) throws StorageException {
long segIdx = absIdx % dsCfg.getWalSegments();
File origFile = new File(walWorkDir, FileDescriptor.fileName(segIdx));
String name = FileDescriptor.fileName(absIdx);
- File dstTmpFile = new File(walArchiveDir, name + FilePageStoreManager.TMP_SUFFIX);
+ File dstTmpFile = new File(walArchiveDir, name + TMP_SUFFIX);
File dstFile = new File(walArchiveDir, name);
@@ -2037,7 +2112,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
deleteObsoleteRawSegments();
File tmpZip = new File(walArchiveDir, FileDescriptor.fileName(segIdx)
- + FilePageStoreManager.ZIP_SUFFIX + FilePageStoreManager.TMP_SUFFIX);
+ + FilePageStoreManager.ZIP_SUFFIX + TMP_SUFFIX);
File zip = new File(walArchiveDir, FileDescriptor.fileName(segIdx) + FilePageStoreManager.ZIP_SUFFIX);
@@ -2225,7 +2300,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
File zip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress)
+ FilePageStoreManager.ZIP_SUFFIX);
File unzipTmp = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress)
- + FilePageStoreManager.TMP_SUFFIX);
+ + TMP_SUFFIX);
File unzip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress));
try (ZipInputStream zis = new ZipInputStream(new BufferedInputStream(new FileInputStream(zip)));
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsStartWIthEmptyArchive.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsStartWIthEmptyArchive.java
new file mode 100644
index 0000000..5104e65
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsStartWIthEmptyArchive.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import java.io.File;
+import java.util.Arrays;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.WalSegmentArchivedEvent;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware;
+import org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileWriteHandle;
+import org.apache.ignite.internal.util.future.CountDownFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER;
+
+/**
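+ * Tests that a node restarts correctly after all segment files were deleted from the WAL archive directory.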
+ */
+public class IgnitePdsStartWIthEmptyArchive extends GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setConsistentId(igniteInstanceName);
+
+ cfg.setCacheConfiguration(
+ new CacheConfiguration(DEFAULT_CACHE_NAME)
+ );
+
+ cfg.setDataStorageConfiguration(
+ new DataStorageConfiguration()
+ // Checkpoint should not remove any WAL archive files.
+ .setMaxWalArchiveSize(Long.MAX_VALUE)
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration()
+ .setPersistenceEnabled(true)
+ )
+ );
+
+ return cfg;
+ }
+
+ @Before
+ public void before() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ @Test
+ public void test() throws Exception {
+ IgniteEx ig = startGrid(0);
+
+ ig.cluster().active(true);
+
+ FileWriteAheadLogManager walMgr = (FileWriteAheadLogManager)ig.context().cache().context().wal();
+
+ // Populate data to generate WAL archive segments.
+ try (IgniteDataStreamer<Integer, byte[]> st = ig.dataStreamer(DEFAULT_CACHE_NAME)) {
+ int entries = 1000;
+
+ for (int i = 0; i < entries; i++) {
+ st.addData(i, new byte[1024 * 1024]);
+ }
+ }
+
+ File archiveDir = U.field(walMgr, "walArchiveDir");
+
+ stopGrid(0, false);
+
+ SegmentAware beforeSaw = U.field(walMgr, "segmentAware");
+
+ long beforeLastArchivedAbsoluteIndex = beforeSaw.lastArchivedAbsoluteIndex();
+
+ FileWriteHandle fhBefore = U.field(walMgr, "currHnd");
+
+ long idxBefore = fhBefore.getSegmentId();
+
+ File[] files = archiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER);
+
+ Arrays.sort(files);
+
+ // Cleanup archive directory.
+ for (File f : files) {
+ if (f.delete())
+ log.info("File " + f.getAbsolutePath() + " deleted");
+ }
+
+ Assert.assertEquals(0, archiveDir.listFiles().length);
+
+ // Restart grid again after archive was removed.
+ ig = startGrid(0);
+
+ walMgr = (FileWriteAheadLogManager)ig.context().cache().context().wal();
+
+ SegmentAware afterSaw = U.field(walMgr, "segmentAware");
+
+ long afterLastArchivedAbsoluteIndex = afterSaw.lastArchivedAbsoluteIndex();
+
+ int segments = ig.configuration().getDataStorageConfiguration().getWalSegments();
+
+ Assert.assertTrue(
+ "lastArchivedBeforeIdx=" + beforeLastArchivedAbsoluteIndex +
+ ", lastArchivedAfterIdx=" + afterLastArchivedAbsoluteIndex + ", segments=" + segments,
+ afterLastArchivedAbsoluteIndex >=
+ (beforeLastArchivedAbsoluteIndex - segments));
+
+ ig.cluster().active(true);
+
+ FileWriteHandle fhAfter = U.field(walMgr, "currHnd");
+
+ Assert.assertNotNull(fhAfter);
+
+ long idxAfter = fhAfter.getSegmentId();
+
+ Assert.assertEquals(idxBefore, idxAfter);
+ Assert.assertTrue(idxAfter >= beforeLastArchivedAbsoluteIndex);
+
+ // Future that completes once all currently available segments have been archived.
+ CountDownFuture awaitAchviedSegmentsLatch = new CountDownFuture(
+ // One is the last archived segment, the second is the current write segment.
+ (int)(idxAfter - afterLastArchivedAbsoluteIndex - 2)
+ );
+
+ log.info("currentIdx=" + idxAfter + ", lastArchivedBeforeIdx=" + beforeLastArchivedAbsoluteIndex +
+ ", lastArchivedAfteridx=" + afterLastArchivedAbsoluteIndex + ", segments=" + segments);
+
+ ig.events().localListen(e -> {
+ WalSegmentArchivedEvent archComplEvt = (WalSegmentArchivedEvent)e;
+
+ log.info("EVT_WAL_SEGMENT_ARCHIVED:" + archComplEvt.getAbsWalSegmentIdx());
+
+ if (archComplEvt.getAbsWalSegmentIdx() > afterLastArchivedAbsoluteIndex){
+ awaitAchviedSegmentsLatch.onDone();
+
+ return true;
+ }
+
+ if (archComplEvt.getAbsWalSegmentIdx() < afterLastArchivedAbsoluteIndex){
+ awaitAchviedSegmentsLatch.onDone(new IgniteException("Unexected segment for archivation. idx="
+ + archComplEvt.getAbsWalSegmentIdx()));
+
+ return false;
+ }
+
+ return true;
+ }, EVT_WAL_SEGMENT_ARCHIVED);
+
+ awaitAchviedSegmentsLatch.get();
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
index 5e48ee4..72967c0 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTaskCanc
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsCacheWalDisabledOnRebalancingTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPageEvictionDuringPartitionClearTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPartitionPreloadTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsStartWIthEmptyArchive;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsTransactionsHangTest;
import org.apache.ignite.internal.processors.cache.persistence.file.FileDownloaderTest;
import org.apache.ignite.testframework.GridTestUtils;
@@ -63,6 +64,7 @@ public class IgnitePdsTestSuite4 {
GridTestUtils.addTestIfNeeded(suite, IgniteRebalanceOnCachesStoppingOrDestroyingTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CachePageWriteLockUnlockTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgnitePdsCacheWalDisabledOnRebalancingTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, IgnitePdsStartWIthEmptyArchive.class, ignoredTests);
return suite;
}