You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/11/27 13:28:04 UTC
[07/13] ignite git commit: ignite-5938 WAL logs compaction and
compression after checkpoint
ignite-5938 WAL logs compaction and compression after checkpoint
Signed-off-by: agura <ag...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f50b2354
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f50b2354
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f50b2354
Branch: refs/heads/ignite-zk
Commit: f50b2354d6b87084f0c29376915821296f3fd480
Parents: 010d02b
Author: Ivan Rakov <iv...@gmail.com>
Authored: Fri Nov 24 18:59:16 2017 +0300
Committer: agura <ag...@apache.org>
Committed: Fri Nov 24 18:59:52 2017 +0300
----------------------------------------------------------------------
...tingToWalV2SerializerWithCompactionTest.java | 250 +++++++
.../IgniteCompatibilityBasicTestSuite.java | 3 +
.../apache/ignite/IgniteSystemProperties.java | 5 +
.../configuration/DataStorageConfiguration.java | 27 +
.../pagemem/wal/IgniteWriteAheadLogManager.java | 8 +
.../pagemem/wal/record/FilteredRecord.java | 31 +
.../pagemem/wal/record/MarshalledRecord.java | 61 ++
.../GridCacheDatabaseSharedManager.java | 5 +
.../wal/AbstractWalRecordsIterator.java | 73 ++-
.../cache/persistence/wal/FileInput.java | 8 +-
.../wal/FileWriteAheadLogManager.java | 650 +++++++++++++++----
.../persistence/wal/RecordDataSerializer.java | 58 --
.../cache/persistence/wal/RecordSerializer.java | 62 --
.../SingleSegmentLogicalRecordsIterator.java | 141 ++++
.../reader/StandaloneWalRecordsIterator.java | 5 +-
.../persistence/wal/record/HeaderRecord.java | 7 +-
.../persistence/wal/record/RecordTypes.java | 69 ++
.../wal/serializer/RecordDataSerializer.java | 59 ++
.../wal/serializer/RecordDataV1Serializer.java | 11 +-
.../wal/serializer/RecordDataV2Serializer.java | 19 +-
.../wal/serializer/RecordSerializer.java | 63 ++
.../wal/serializer/RecordSerializerFactory.java | 71 ++
.../serializer/RecordSerializerFactoryImpl.java | 133 ++++
.../wal/serializer/RecordV1Serializer.java | 71 +-
.../wal/serializer/RecordV2Serializer.java | 93 ++-
.../utils/PlatformConfigurationUtils.java | 2 +
.../db/wal/IgniteWalFlushFailoverTest.java | 16 +-
.../wal/IgniteWalHistoryReservationsTest.java | 2 +-
.../db/wal/IgniteWalRecoveryTest.java | 2 +-
.../IgniteWalRecoveryWithCompactionTest.java | 33 +
.../db/wal/IgniteWalSerializerVersionTest.java | 9 +-
.../persistence/db/wal/WalCompactionTest.java | 312 +++++++++
.../persistence/pagemem/NoOpWALManager.java | 5 +
.../ignite/testsuites/IgnitePdsTestSuite2.java | 3 +
.../IgnitePdsWithIndexingCoreTestSuite.java | 2 +
.../Configuration/DataStorageConfiguration.cs | 16 +
.../IgniteConfigurationSection.xsd | 5 +
37 files changed, 2102 insertions(+), 288 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/MigratingToWalV2SerializerWithCompactionTest.java
----------------------------------------------------------------------
diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/MigratingToWalV2SerializerWithCompactionTest.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/MigratingToWalV2SerializerWithCompactionTest.java
new file mode 100644
index 0000000..0ca3833
--- /dev/null
+++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/MigratingToWalV2SerializerWithCompactionTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compatibility.persistence;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfTest;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+
+/**
+ * Saves data using previous version of ignite and then load this data using actual version
+ */
+public class MigratingToWalV2SerializerWithCompactionTest extends IgnitePersistenceCompatibilityAbstractTest {
+ /** */
+ private static final String TEST_CACHE_NAME = DummyPersistenceCompatibilityTest.class.getSimpleName();
+
+ /** Entries count. */
+ private static final int ENTRIES = 300;
+
+ /** Wal segment size. */
+ private static final int WAL_SEGMENT_SIZE = 1024 * 1024;
+
+ /** Entry payload size. */
+ private static final int PAYLOAD_SIZE = 20000;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "binary_meta", false));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setPeerClassLoadingEnabled(false);
+
+ DataStorageConfiguration memCfg = new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration().setPersistenceEnabled(true))
+ .setWalSegmentSize(WAL_SEGMENT_SIZE)
+ .setWalCompactionEnabled(true)
+ .setWalMode(WALMode.LOG_ONLY)
+ .setWalHistorySize(200);
+
+ cfg.setDataStorageConfiguration(memCfg);
+
+ return cfg;
+ }
+
+ /**
+ * Tests opportunity to read data from previous Ignite DB version.
+ *
+ * @throws Exception If failed.
+ */
+ public void testCompactingOldWalFiles() throws Exception {
+ doTestStartupWithOldVersion("2.3.0");
+ }
+
+ /**
+ * Tests opportunity to read data from previous Ignite DB version.
+ *
+ * @param ver 3-digits version of ignite
+ * @throws Exception If failed.
+ */
+ private void doTestStartupWithOldVersion(String ver) throws Exception {
+ try {
+ startGrid(1, ver, new ConfigurationClosure(), new PostStartupClosure());
+
+ stopAllGrids();
+
+ IgniteEx ignite = startGrid(0);
+
+ ignite.active(true);
+
+ IgniteCache<Integer, byte[]> cache = ignite.getOrCreateCache(TEST_CACHE_NAME);
+
+ for (int i = ENTRIES; i < ENTRIES * 2; i++) {
+ final byte[] val = new byte[PAYLOAD_SIZE];
+
+ ThreadLocalRandom.current().nextBytes(val);
+
+ val[i] = 1;
+
+ cache.put(i, val);
+ }
+
+ // WAL archive segment is allowed to be compressed when it's at least one checkpoint away from current WAL head.
+ ignite.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get();
+ ignite.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get();
+
+ Thread.sleep(15_000); // Time to compress WAL.
+
+ int expCompressedWalSegments = PAYLOAD_SIZE * ENTRIES * 4 / WAL_SEGMENT_SIZE - 1;
+
+ String nodeFolderName = ignite.context().pdsFolderResolver().resolveFolders().folderName();
+
+ File dbDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false);
+ File walDir = new File(dbDir, "wal");
+ File archiveDir = new File(walDir, "archive");
+ File nodeArchiveDir = new File(archiveDir, nodeFolderName);
+
+ File[] compressedSegments = nodeArchiveDir.listFiles(new FilenameFilter() {
+ @Override public boolean accept(File dir, String name) {
+ return name.endsWith(".wal.zip");
+ }
+ });
+
+ final int actualCompressedWalSegments = compressedSegments == null ? 0 : compressedSegments.length;
+
+ assertTrue("expected=" + expCompressedWalSegments + ", actual=" + actualCompressedWalSegments,
+ actualCompressedWalSegments >= expCompressedWalSegments);
+
+ stopAllGrids();
+
+ File nodeLfsDir = new File(dbDir, nodeFolderName);
+ File cpMarkersDir = new File(nodeLfsDir, "cp");
+
+ File[] cpMarkers = cpMarkersDir.listFiles();
+
+ assertNotNull(cpMarkers);
+ assertTrue(cpMarkers.length > 0);
+
+ File cacheDir = new File(nodeLfsDir, "cache-" + TEST_CACHE_NAME);
+ File[] partFiles = cacheDir.listFiles(new FilenameFilter() {
+ @Override public boolean accept(File dir, String name) {
+ return name.startsWith("part");
+ }
+ });
+
+ assertNotNull(partFiles);
+ assertTrue(partFiles.length > 0);
+
+ // Enforce reading WAL from the very beginning at the next start.
+ for (File f : cpMarkers)
+ f.delete();
+
+ for (File f : partFiles)
+ f.delete();
+
+ ignite = startGrid(0);
+
+ ignite.active(true);
+
+ cache = ignite.cache(TEST_CACHE_NAME);
+
+ boolean fail = false;
+
+ // Check that all data is recovered from compacted WAL.
+ for (int i = 0; i < ENTRIES * 2; i++) {
+ byte[] arr = cache.get(i);
+
+ if (arr == null) {
+ System.out.println(">>> Missing: " + i);
+
+ fail = true;
+ }
+ else if (arr[i] != 1) {
+ System.out.println(">>> Corrupted: " + i);
+
+ fail = true;
+ }
+ }
+
+ assertFalse(fail);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /** */
+ private static class PostStartupClosure implements IgniteInClosure<Ignite> {
+ /** {@inheritDoc} */
+ @Override public void apply(Ignite ignite) {
+ ignite.active(true);
+
+ CacheConfiguration<Object, Object> cacheCfg = new CacheConfiguration<>();
+ cacheCfg.setName(TEST_CACHE_NAME);
+ cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+ cacheCfg.setBackups(0);
+ cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+
+ IgniteCache<Object, Object> cache = ignite.getOrCreateCache(cacheCfg);
+
+ for (int i = 0; i < ENTRIES; i++) { // At least 20MB of raw data in total.
+ final byte[] val = new byte[20000];
+
+ ThreadLocalRandom.current().nextBytes(val);
+
+ val[i] = 1;
+
+ cache.put(i, val);
+ }
+ }
+ }
+
+ /** */
+ private static class ConfigurationClosure implements IgniteInClosure<IgniteConfiguration> {
+ /** {@inheritDoc} */
+ @Override public void apply(IgniteConfiguration cfg) {
+ cfg.setLocalHost("127.0.0.1");
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+ disco.setIpFinder(GridCacheAbstractFullApiSelfTest.LOCAL_IP_FINDER);
+
+ cfg.setDiscoverySpi(disco);
+
+ cfg.setPeerClassLoadingEnabled(false);
+
+ DataStorageConfiguration memCfg = new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration().setPersistenceEnabled(true))
+ .setWalSegmentSize(WAL_SEGMENT_SIZE)
+ .setWalMode(WALMode.LOG_ONLY)
+ .setWalHistorySize(100);
+
+ cfg.setDataStorageConfiguration(memCfg);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java
index 351a0e7..20643d4 100644
--- a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java
+++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java
@@ -20,6 +20,7 @@ package org.apache.ignite.compatibility.testsuites;
import junit.framework.TestSuite;
import org.apache.ignite.compatibility.persistence.DummyPersistenceCompatibilityTest;
import org.apache.ignite.compatibility.persistence.FoldersReuseCompatibilityTest;
+import org.apache.ignite.compatibility.persistence.MigratingToWalV2SerializerWithCompactionTest;
/**
* Compatibility tests basic test suite.
@@ -36,6 +37,8 @@ public class IgniteCompatibilityBasicTestSuite {
suite.addTestSuite(FoldersReuseCompatibilityTest.class);
+ suite.addTestSuite(MigratingToWalV2SerializerWithCompactionTest.class);
+
return suite;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index e0ace11..8e2298f 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -716,6 +716,11 @@ public final class IgniteSystemProperties {
public static final String IGNITE_ENABLE_FORCIBLE_NODE_KILL = "IGNITE_ENABLE_FORCIBLE_NODE_KILL";
/**
+ *
+ */
+ public static final String IGNITE_WAL_ARCHIVE_COMPACT_SKIP_DELTA_RECORD = "IGNITE_WAL_ARCHIVE_COMPACT_SKIP_DELTA_RECORD";
+
+ /**
* Tasks stealing will be started if tasks queue size per data-streamer thread exceeds this threshold.
* <p>
* Default value is {@code 4}.
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
index 8202ef8..2c90398 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
@@ -145,6 +145,9 @@ public class DataStorageConfiguration implements Serializable {
/** Default write throttling enabled. */
public static final boolean DFLT_WRITE_THROTTLING_ENABLED = false;
+ /** Default wal compaction enabled. */
+ public static final boolean DFLT_WAL_COMPACTION_ENABLED = false;
+
/** Size of a memory chunk reserved for system cache initially. */
private long sysRegionInitSize = DFLT_SYS_CACHE_INIT_SIZE;
@@ -243,6 +246,12 @@ public class DataStorageConfiguration implements Serializable {
private boolean writeThrottlingEnabled = DFLT_WRITE_THROTTLING_ENABLED;
/**
+ * Flag to enable WAL compaction. If true, system filters and compresses WAL archive in background.
+ * Compressed WAL archive gets automatically decompressed on demand.
+ */
+ private boolean walCompactionEnabled = DFLT_WAL_COMPACTION_ENABLED;
+
+ /**
* Initial size of a data region reserved for system cache.
*
* @return Size in bytes.
@@ -850,4 +859,22 @@ public class DataStorageConfiguration implements Serializable {
return this;
}
+
+ /**
+ * @return Flag indicating whether WAL compaction is enabled.
+ */
+ public boolean isWalCompactionEnabled() {
+ return walCompactionEnabled;
+ }
+
+ /**
+ * Sets flag indicating whether WAL compaction is enabled.
+ *
+ * @param walCompactionEnabled Wal compaction enabled flag.
+ */
+ public DataStorageConfiguration setWalCompactionEnabled(boolean walCompactionEnabled) {
+ this.walCompactionEnabled = walCompactionEnabled;
+
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
index ce28ff2..42d9611 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
@@ -102,6 +102,14 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni
public int truncate(WALPointer ptr);
/**
+ * Gives a hint to WAL manager to compact WAL until given pointer (exclusively).
+ * Compaction implies filtering out physical records and ZIP compression.
+ *
+ * @param ptr Pointer for which it is safe to compact the log.
+ */
+ public void allowCompressionUntil(WALPointer ptr);
+
+ /**
* @return Total number of segments in the WAL archive.
*/
public int walArchiveSegments();
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java
new file mode 100644
index 0000000..519e825
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java
@@ -0,0 +1,31 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.pagemem.wal.record;
+
+import org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRecordsIterator;
+
+/**
+ * Special type of WAL record. Shouldn't be stored in file.
+ * Returned by deserializer if next record is not matched by filter. Automatically handled by
+ * {@link AbstractWalRecordsIterator}.
+ */
+public class FilteredRecord extends WALRecord {
+ /** {@inheritDoc} */
+ @Override public RecordType type() {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MarshalledRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MarshalledRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MarshalledRecord.java
new file mode 100644
index 0000000..448a32c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MarshalledRecord.java
@@ -0,0 +1,61 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.pagemem.wal.record;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+
+/**
+ * Special type of WAL record. Shouldn't be stored in file.
+ * Contains complete binary representation of record in {@link #buf} and record position in {@link #pos}.
+ */
+public class MarshalledRecord extends WALRecord {
+ /** Type of marshalled record. */
+ private WALRecord.RecordType type;
+
+ /**
+ * Heap buffer with marshalled record bytes.
+ * Due to performance reasons accessible only by thread that performs WAL iteration and until next record is read.
+ */
+ private ByteBuffer buf;
+
+ /**
+ * @param type Type of marshalled record.
+ * @param pos WAL pointer to record.
+ * @param buf Reusable buffer with record data.
+ */
+ public MarshalledRecord(RecordType type, WALPointer pos, ByteBuffer buf) {
+ this.type = type;
+ this.buf = buf;
+
+ position(pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override public RecordType type() {
+ return type;
+ }
+
+ /**
+ * @return Buffer with marshalled record bytes. Due to performance reasons accessible only by thread that performs
+ * WAL iteration and until next record is read.
+ */
+ public ByteBuffer buffer() {
+ return buf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index e07aef7..c0e59bc 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -1447,6 +1447,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
cctx.pageStore().beginRecover();
}
+ else
+ cctx.wal().allowCompressionUntil(status.startPtr);
long start = U.currentTimeMillis();
int applied = 0;
@@ -2973,6 +2975,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
chp.walFilesDeleted = deleted;
+
+ if (!chp.cpPages.isEmpty())
+ cctx.wal().allowCompressionUntil(chp.cpEntry.checkpointMark());
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
index 5be6e55..7415db3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
@@ -30,7 +30,9 @@ import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
+import org.apache.ignite.internal.util.typedef.P2;
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -70,8 +72,8 @@ public abstract class AbstractWalRecordsIterator
*/
@NotNull protected final GridCacheSharedContext sharedCtx;
- /** Serializer of current version to read headers. */
- @NotNull private final RecordSerializer serializer;
+ /** Serializer factory. */
+ @NotNull private final RecordSerializerFactory serializerFactory;
/** Factory to provide I/O interfaces for read/write operations with files */
@NotNull protected final FileIOFactory ioFactory;
@@ -82,20 +84,20 @@ public abstract class AbstractWalRecordsIterator
/**
* @param log Logger.
* @param sharedCtx Shared context.
- * @param serializer Serializer of current version to read headers.
+ * @param serializerFactory Serializer of current version to read headers.
* @param ioFactory ioFactory for file IO access.
* @param bufSize buffer for reading records size.
*/
protected AbstractWalRecordsIterator(
@NotNull final IgniteLogger log,
@NotNull final GridCacheSharedContext sharedCtx,
- @NotNull final RecordSerializer serializer,
+ @NotNull final RecordSerializerFactory serializerFactory,
@NotNull final FileIOFactory ioFactory,
final int bufSize
) {
this.log = log;
this.sharedCtx = sharedCtx;
- this.serializer = serializer;
+ this.serializerFactory = serializerFactory;
this.ioFactory = ioFactory;
buf = new ByteBufferExpander(bufSize, ByteOrder.nativeOrder());
@@ -107,7 +109,7 @@ public abstract class AbstractWalRecordsIterator
* @return found WAL file descriptors
*/
protected static FileWriteAheadLogManager.FileDescriptor[] loadFileDescriptors(@NotNull final File walFilesDir) throws IgniteCheckedException {
- final File[] files = walFilesDir.listFiles(FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER);
+ final File[] files = walFilesDir.listFiles(FileWriteAheadLogManager.WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER);
if (files == null) {
throw new IgniteCheckedException("WAL files directory does not not denote a " +
@@ -156,8 +158,12 @@ public abstract class AbstractWalRecordsIterator
try {
curRec = advanceRecord(currWalSegment);
- if (curRec != null)
+ if (curRec != null) {
+ if (curRec.get2().type() == null)
+ continue; // Record was skipped by filter of current serializer, should read next record.
+
return;
+ }
else {
currWalSegment = advanceSegment(currWalSegment);
@@ -275,20 +281,31 @@ public abstract class AbstractWalRecordsIterator
FileIO fileIO = ioFactory.create(desc.file);
try {
- int serVer = FileWriteAheadLogManager.readSerializerVersion(fileIO);
+ IgniteBiTuple<Integer, Boolean> tup = FileWriteAheadLogManager.readSerializerVersionAndCompactedFlag(fileIO);
- RecordSerializer ser = FileWriteAheadLogManager.forVersion(sharedCtx, serVer);
+ int serVer = tup.get1();
+
+ boolean isCompacted = tup.get2();
FileInput in = new FileInput(fileIO, buf);
if (start != null && desc.idx == start.index()) {
- // Make sure we skip header with serializer version.
- long startOffset = Math.max(start.fileOffset(), fileIO.position());
-
- in.seek(startOffset);
+ if (isCompacted) {
+ serializerFactory.skipPositionCheck(true);
+
+ if (start.fileOffset() != 0)
+ serializerFactory.recordDeserializeFilter(new StartSeekingFilter(start));
+ }
+ else {
+ // Make sure we skip header with serializer version.
+ long startOff = Math.max(start.fileOffset(), fileIO.position());
+
+ in.seek(startOff);
+ }
}
- return new FileWriteAheadLogManager.ReadFileHandle(fileIO, desc.idx, sharedCtx.igniteInstanceName(), ser, in);
+ return new FileWriteAheadLogManager.ReadFileHandle(
+ fileIO, desc.idx, sharedCtx.igniteInstanceName(), serializerFactory.createSerializer(serVer), in);
}
catch (SegmentEofException | EOFException ignore) {
try {
@@ -320,4 +337,32 @@ public abstract class AbstractWalRecordsIterator
}
}
+ /**
+ * Filter that drops all records until given start pointer is reached.
+ */
+ private static class StartSeekingFilter implements P2<WALRecord.RecordType, WALPointer> {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Start pointer. */
+ private final FileWALPointer start;
+
+ /** Start reached flag. */
+ private boolean startReached;
+
+ /**
+ * @param start Start.
+ */
+ StartSeekingFilter(FileWALPointer start) {
+ this.start = start;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(WALRecord.RecordType type, WALPointer pointer) {
+ if (start.fileOffset() == ((FileWALPointer)pointer).fileOffset())
+ startReached = true;
+
+ return startReached;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
index 3b20fce..303a023 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
@@ -348,7 +348,13 @@ public final class FileInput implements ByteBufferBackedDataInput {
/** {@inheritDoc} */
@Override public int skipBytes(int n) throws IOException {
- throw new UnsupportedOperationException();
+ ensure(n);
+
+ int skipped = Math.min(buf.remaining(), n);
+
+ buf.position(buf.position() + skipped);
+
+ return skipped;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 3d8d78f..a450521 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -17,10 +17,14 @@
package org.apache.ignite.internal.processors.cache.persistence.wal;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileFilter;
+import java.io.FileInputStream;
import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -31,6 +35,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -40,6 +46,9 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+import java.util.zip.ZipOutputStream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
@@ -49,6 +58,7 @@ import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.events.EventType;
import org.apache.ignite.events.WalSegmentArchivedEvent;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
@@ -57,24 +67,28 @@ import org.apache.ignite.internal.pagemem.wal.StorageException;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
+import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord;
import org.apache.ignite.internal.pagemem.wal.record.SwitchSegmentRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
-import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
-import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordDataV1Serializer;
-import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordDataV2Serializer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
-import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV2Serializer;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.SB;
@@ -123,8 +137,36 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
};
+ /** */
+ private static final Pattern WAL_SEGMENT_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip");
+
+ /** WAL segment file filter, see {@link #WAL_NAME_PATTERN} */
+ public static final FileFilter WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER = new FileFilter() {
+ @Override public boolean accept(File file) {
+ return !file.isDirectory() && (WAL_NAME_PATTERN.matcher(file.getName()).matches() ||
+ WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches());
+ }
+ };
+
+ /** */
+ private static final Pattern WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip\\.tmp");
+
+ /** */
+ private static final FileFilter WAL_SEGMENT_FILE_COMPACTED_FILTER = new FileFilter() {
+ @Override public boolean accept(File file) {
+ return !file.isDirectory() && WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches();
+ }
+ };
+
+ /** */
+ private static final FileFilter WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER = new FileFilter() {
+ @Override public boolean accept(File file) {
+ return !file.isDirectory() && WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches();
+ }
+ };
+
/** Latest serializer version to use. */
- public static final int LATEST_SERIALIZER_VERSION = 1;
+ private static final int LATEST_SERIALIZER_VERSION = 2;
/** */
private final boolean alwaysWriteFullPages;
@@ -169,8 +211,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
private final int serializerVersion =
IgniteSystemProperties.getInteger(IGNITE_WAL_SERIALIZER_VERSION, LATEST_SERIALIZER_VERSION);
- /** */
- private volatile long oldestArchiveSegmentIdx;
+ /** Latest segment cleared by {@link #truncate(WALPointer)}. */
+ private volatile long lastTruncatedArchiveIdx = -1L;
/** Factory to provide I/O interfaces for read/write operations with files */
private final FileIOFactory ioFactory;
@@ -197,6 +239,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** */
private volatile FileArchiver archiver;
+ /** Compressor. */
+ private volatile FileCompressor compressor;
+
+ /** Decompressor. */
+ private volatile FileDecompressor decompressor;
+
/** */
private final ThreadLocal<WALPointer> lastWALPtr = new ThreadLocal<>();
@@ -276,7 +324,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
"write ahead log archive directory"
);
- serializer = forVersion(cctx, serializerVersion);
+ serializer = new RecordSerializerFactoryImpl(cctx).createSerializer(serializerVersion);
GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cctx.database();
@@ -286,10 +334,16 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
IgniteBiTuple<Long, Long> tup = scanMinMaxArchiveIndices();
- oldestArchiveSegmentIdx = tup == null ? 0 : tup.get1();
+ lastTruncatedArchiveIdx = tup == null ? -1 : tup.get1() - 1;
archiver = new FileArchiver(tup == null ? -1 : tup.get2());
+ if (dsCfg.isWalCompactionEnabled()) {
+ compressor = new FileCompressor();
+
+ decompressor = new FileDecompressor();
+ }
+
if (mode != WALMode.NONE) {
if (log.isInfoEnabled())
log.info("Started write-ahead log manager [mode=" + mode + ']');
@@ -338,6 +392,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (archiver != null)
archiver.shutdown();
+
+ if (compressor != null)
+ compressor.shutdown();
+
+ if (decompressor != null)
+ decompressor.shutdown();
}
catch (Exception e) {
U.error(log, "Failed to gracefully close WAL segment: " + currentHnd.fileIO, e);
@@ -354,8 +414,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (!cctx.kernalContext().clientNode()) {
assert archiver != null;
-
archiver.start();
+
+ if (compressor != null)
+ compressor.start();
+
+ if (decompressor != null)
+ decompressor.start();
}
}
@@ -575,11 +640,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
(FileWALPointer)start,
end,
dsCfg,
- serializer,
+ new RecordSerializerFactoryImpl(cctx),
ioFactory,
archiver,
- log,
- tlbSize
+ decompressor,
+ log
);
}
@@ -626,9 +691,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* @return {@code true} if has this index.
*/
private boolean hasIndex(long absIdx) {
- String name = FileDescriptor.fileName(absIdx);
+ String segmentName = FileDescriptor.fileName(absIdx);
+
+ String zipSegmentName = FileDescriptor.fileName(absIdx) + ".zip";
- boolean inArchive = new File(walArchiveDir, name).exists();
+ boolean inArchive = new File(walArchiveDir, segmentName).exists() ||
+ new File(walArchiveDir, zipSegmentName).exists();
if (inArchive)
return true;
@@ -651,7 +719,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
// File pointer bound: older entries will be deleted from archive
FileWALPointer fPtr = (FileWALPointer)ptr;
- FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER));
+ FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER));
int deleted = 0;
@@ -671,8 +739,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
deleted++;
// Bump up the oldest archive segment index.
- if (oldestArchiveSegmentIdx < desc.idx)
- oldestArchiveSegmentIdx = desc.idx;
+ if (lastTruncatedArchiveIdx < desc.idx)
+ lastTruncatedArchiveIdx = desc.idx;
}
}
@@ -680,15 +748,21 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/** {@inheritDoc} */
+ @Override public void allowCompressionUntil(WALPointer ptr) {
+ if (compressor != null)
+ compressor.allowCompressionUntil(((FileWALPointer)ptr).index());
+ }
+
+ /** {@inheritDoc} */
@Override public int walArchiveSegments() {
- long oldest = oldestArchiveSegmentIdx;
+ long lastTruncated = lastTruncatedArchiveIdx;
long lastArchived = archiver.lastArchivedAbsoluteIndex();
if (lastArchived == -1)
return 0;
- int res = (int)(lastArchived - oldest);
+ int res = (int)(lastArchived - lastTruncated);
return res >= 0 ? res : 0;
}
@@ -710,7 +784,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
private long lastArchivedIndex() {
long lastIdx = -1;
- for (File file : walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER)) {
+ for (File file : walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) {
try {
long idx = Long.parseLong(file.getName().substring(0, 16));
@@ -725,27 +799,42 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
- * Lists files in archive directory and returns the index of last archived file.
+ * Lists files in archive directory and returns the indices of least and last archived files.
+ * In case of holes, first segment after last "hole" is considered as minimum.
+ * Example: minimum(0, 1, 10, 11, 20, 21, 22) should be 20
*
- * @return The absolute index of last archived file.
+ * @return The absolute indices of min and max archived files.
*/
private IgniteBiTuple<Long, Long> scanMinMaxArchiveIndices() {
- long minIdx = Integer.MAX_VALUE;
- long maxIdx = -1;
+ TreeSet<Long> archiveIndices = new TreeSet<>();
- for (File file : walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER)) {
+ for (File file : walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) {
try {
long idx = Long.parseLong(file.getName().substring(0, 16));
- minIdx = Math.min(minIdx, idx);
- maxIdx = Math.max(maxIdx, idx);
+ archiveIndices.add(idx);
}
catch (NumberFormatException | IndexOutOfBoundsException ignore) {
-
+ // No-op.
}
}
- return maxIdx == -1 ? null : F.t(minIdx, maxIdx);
+ if (archiveIndices.isEmpty())
+ return null;
+ else {
+ Long min = archiveIndices.first();
+ Long max = archiveIndices.last();
+
+ if (max - min == archiveIndices.size() - 1)
+ return F.t(min, max); // Short path.
+
+ for (Long idx : archiveIndices.descendingSet()) {
+ if (!archiveIndices.contains(idx - 1))
+ return F.t(idx, max);
+ }
+
+ throw new IllegalStateException("Should never happen if TreeSet is valid.");
+ }
}
/**
@@ -836,14 +925,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
// If we have existing segment, try to read version from it.
if (lastReadPtr != null) {
try {
- serVer = readSerializerVersion(fileIO);
+ serVer = readSerializerVersionAndCompactedFlag(fileIO).get1();
}
catch (SegmentEofException | EOFException ignore) {
serVer = serializerVersion;
}
}
- RecordSerializer ser = forVersion(cctx, serVer);
+ RecordSerializer ser = new RecordSerializerFactoryImpl(cctx).createSerializer(serVer);
if (log.isInfoEnabled())
log.info("Resuming logging to WAL segment [file=" + curFile.getAbsolutePath() +
@@ -1021,37 +1110,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
return new File(walWorkDir, FileDescriptor.fileName(segmentIdx));
}
- /**
- * @param cctx Shared context.
- * @param ver Serializer version.
- * @return Entry serializer.
- */
- public static RecordSerializer forVersion(GridCacheSharedContext cctx, int ver) throws IgniteCheckedException {
- return forVersion(cctx, ver, false);
- }
-
- /**
- * @param ver Serializer version.
- * @return Entry serializer.
- */
- static RecordSerializer forVersion(GridCacheSharedContext cctx, int ver, boolean writePointer) throws IgniteCheckedException {
- if (ver <= 0)
- throw new IgniteCheckedException("Failed to create a serializer (corrupted WAL file).");
-
- switch (ver) {
- case 1:
- return new RecordV1Serializer(new RecordDataV1Serializer(cctx), writePointer);
-
- case 2:
- RecordDataV2Serializer dataV2Serializer = new RecordDataV2Serializer(new RecordDataV1Serializer(cctx));
-
- return new RecordV2Serializer(dataV2Serializer, writePointer);
-
- default:
- throw new IgniteCheckedException("Failed to create a serializer with the given version " +
- "(forward compatibility is not supported): " + ver);
- }
- }
/**
* @return Sorted WAL files descriptors.
@@ -1103,13 +1161,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
private IgniteCheckedException cleanException;
/**
- * Absolute current segment index WAL Manger writes to. Guarded by <code>this</code>.
+ * Absolute current segment index WAL Manager writes to. Guarded by <code>this</code>.
* Incremented during rollover. Also may be directly set if WAL is resuming logging after start.
*/
private long curAbsWalIdx = -1;
/** Last archived file index (absolute, 0-based). Guarded by <code>this</code>. */
- private long lastAbsArchivedIdx = -1;
+ private volatile long lastAbsArchivedIdx = -1;
/** current thread stopping advice */
private volatile boolean stopped;
@@ -1135,7 +1193,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/**
* @return Last archived segment absolute index.
*/
- private synchronized long lastArchivedAbsoluteIndex() {
+ private long lastArchivedAbsoluteIndex() {
return lastAbsArchivedIdx;
}
@@ -1221,7 +1279,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
wait();
if (curAbsWalIdx != 0 && lastAbsArchivedIdx == -1)
- lastAbsArchivedIdx = curAbsWalIdx - 1;
+ changeLastArchivedIndexAndWakeupCompressor(curAbsWalIdx - 1);
}
while (!Thread.currentThread().isInterrupted() && !stopped) {
@@ -1252,7 +1310,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
formatFile(res.getOrigWorkFile());
// Then increase counter to allow rollover on clean working file
- lastAbsArchivedIdx = toArchive;
+ changeLastArchivedIndexAndWakeupCompressor(toArchive);
notifyAll();
}
@@ -1275,6 +1333,16 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
+ * @param idx Index.
+ */
+ private void changeLastArchivedIndexAndWakeupCompressor(long idx) {
+ lastAbsArchivedIdx = idx;
+
+ if (compressor != null)
+ compressor.onNextSegmentArchived();
+ }
+
+ /**
* Gets the absolute index of the next WAL segment available to write.
* Blocks till there are available file to write
*
@@ -1301,7 +1369,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
return curAbsWalIdx;
}
}
- catch (InterruptedException e) {
+ catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IgniteInterruptedCheckedException(e);
@@ -1426,6 +1494,314 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
+ * Responsible for compressing WAL archive segments.
+ * Also responsible for deleting raw copies of already compressed WAL archive segments if they are not reserved.
+ */
+ private class FileCompressor extends Thread {
+ /** Current thread stopping advice. */
+ private volatile boolean stopped;
+
+ /** Last successfully compressed segment. */
+ private volatile long lastCompressedIdx = -1L;
+
+ /** All segments prior to this (inclusive) can be compressed. */
+ private volatile long lastAllowedToCompressIdx = -1L;
+
+ /**
+ *
+ */
+ FileCompressor() {
+ super("wal-file-compressor%" + cctx.igniteInstanceName());
+ }
+
+ /**
+ *
+ */
+ private void init() {
+ File[] toDel = walArchiveDir.listFiles(WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER);
+
+ for (File f : toDel) {
+ if (stopped)
+ return;
+
+ f.delete();
+ }
+
+ FileDescriptor[] alreadyCompressed = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER));
+
+ if (alreadyCompressed.length > 0)
+ lastCompressedIdx = alreadyCompressed[alreadyCompressed.length - 1].getIdx();
+ }
+
+ /**
+ * @param lastCpStartIdx Segment index to allow compression until (exclusively).
+ */
+ synchronized void allowCompressionUntil(long lastCpStartIdx) {
+ lastAllowedToCompressIdx = lastCpStartIdx - 1;
+
+ notify();
+ }
+
+ /**
+ * Callback for waking up compressor when new segment is archived.
+ */
+ synchronized void onNextSegmentArchived() {
+ notify();
+ }
+
+ /**
+ * Pessimistically tries to reserve segment for compression in order to avoid concurrent truncation.
+ * Waits if there's no segment to archive right now.
+ */
+ private long tryReserveNextSegmentOrWait() throws InterruptedException, IgniteCheckedException {
+ long segmentToCompress = lastCompressedIdx + 1;
+
+ synchronized (this) {
+ while (segmentToCompress > Math.min(lastAllowedToCompressIdx, archiver.lastArchivedAbsoluteIndex())) {
+ wait();
+
+ if (stopped)
+ return -1;
+ }
+ }
+
+ segmentToCompress = Math.max(segmentToCompress, lastTruncatedArchiveIdx + 1);
+
+ boolean reserved = reserve(new FileWALPointer(segmentToCompress, 0, 0));
+
+ return reserved ? segmentToCompress : -1;
+ }
+
+ /**
+ *
+ */
+ private void deleteObsoleteRawSegments() {
+ FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER));
+
+ FileArchiver archiver0 = archiver;
+
+ for (FileDescriptor desc : descs) {
+ // Do not delete reserved or locked segment and any segment after it.
+ if (archiver0 != null && archiver0.reserved(desc.idx))
+ return;
+
+ if (desc.idx < lastCompressedIdx) {
+ if (!desc.file.delete())
+ U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " +
+ desc.file.getAbsolutePath() + ", exists: " + desc.file.exists());
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ init();
+
+ while (!Thread.currentThread().isInterrupted() && !stopped) {
+ try {
+ deleteObsoleteRawSegments();
+
+ long nextSegment = tryReserveNextSegmentOrWait();
+ if (nextSegment == -1)
+ continue;
+
+ File tmpZip = new File(walArchiveDir, FileDescriptor.fileName(nextSegment) + ".zip" + ".tmp");
+
+ File zip = new File(walArchiveDir, FileDescriptor.fileName(nextSegment) + ".zip");
+
+ File raw = new File(walArchiveDir, FileDescriptor.fileName(nextSegment));
+ if (!Files.exists(raw.toPath()))
+ throw new IgniteCheckedException("WAL archive segment is missing: " + raw);
+
+ compressSegmentToFile(nextSegment, raw, tmpZip);
+
+ Files.move(tmpZip.toPath(), zip.toPath());
+
+ if (mode == WALMode.DEFAULT) {
+ try (FileIO f0 = ioFactory.create(zip, CREATE, READ, WRITE)) {
+ f0.force();
+ }
+ }
+
+ lastCompressedIdx = nextSegment;
+ }
+ catch (IgniteCheckedException | IOException e) {
+ U.error(log, "Unexpected error during WAL compression", e);
+
+ FileWriteHandle handle = currentHandle();
+
+ if (handle != null)
+ handle.invalidateEnvironment(e);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * @param nextSegment Next segment absolute idx.
+ * @param raw Raw file.
+ * @param zip Zip file.
+ */
+ private void compressSegmentToFile(long nextSegment, File raw, File zip)
+ throws IOException, IgniteCheckedException {
+ int segmentSerializerVer;
+
+ try (FileIO fileIO = ioFactory.create(raw)) {
+ IgniteBiTuple<Integer, Boolean> tup = FileWriteAheadLogManager.readSerializerVersionAndCompactedFlag(fileIO);
+
+ segmentSerializerVer = tup.get1();
+ }
+
+ try (ZipOutputStream zos = new ZipOutputStream(new BufferedOutputStream(new FileOutputStream(zip)))) {
+ zos.putNextEntry(new ZipEntry(""));
+
+ zos.write(prepareSerializerVersionBuffer(nextSegment, segmentSerializerVer, true).array());
+
+ final CIX1<WALRecord> appendToZipC = new CIX1<WALRecord>() {
+ @Override public void applyx(WALRecord record) throws IgniteCheckedException {
+ final MarshalledRecord marshRec = (MarshalledRecord)record;
+
+ try {
+ zos.write(marshRec.buffer().array(), 0, marshRec.buffer().remaining());
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+ };
+
+ try (SingleSegmentLogicalRecordsIterator iter = new SingleSegmentLogicalRecordsIterator(
+ log, cctx, ioFactory, tlbSize, nextSegment, walArchiveDir, appendToZipC)) {
+
+ while (iter.hasNextX())
+ iter.nextX();
+ }
+ }
+ finally {
+ release(new FileWALPointer(nextSegment, 0, 0));
+ }
+ }
+
+ /**
+ * @throws IgniteInterruptedCheckedException If failed to wait for thread shutdown.
+ */
+ private void shutdown() throws IgniteInterruptedCheckedException {
+ synchronized (this) {
+ stopped = true;
+
+ notifyAll();
+ }
+
+ U.join(this);
+ }
+ }
+
+ /**
+ * Responsible for decompressing previously compressed segments of WAL archive if they are needed for replay.
+ */
+ private class FileDecompressor extends Thread {
+ /** Current thread stopping advice. */
+ private volatile boolean stopped;
+
+ /** Decompression futures. */
+ private Map<Long, GridFutureAdapter<Void>> decompressionFutures = new HashMap<>();
+
+ /** Segments queue. */
+ private PriorityBlockingQueue<Long> segmentsQueue = new PriorityBlockingQueue<>();
+
+ /** Byte array for draining data. */
+ private byte[] arr = new byte[tlbSize];
+
+ /**
+ *
+ */
+ FileDecompressor() {
+ super("wal-file-decompressor%" + cctx.igniteInstanceName());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ while (!Thread.currentThread().isInterrupted() && !stopped) {
+ try {
+ long segmentToDecompress = segmentsQueue.take();
+
+ if (stopped)
+ break;
+
+ File zip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress) + ".zip");
+ File unzipTmp = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress) + ".tmp");
+ File unzip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress));
+
+ try (ZipInputStream zis = new ZipInputStream(new BufferedInputStream(new FileInputStream(zip)));
+ FileIO io = ioFactory.create(unzipTmp)) {
+ zis.getNextEntry();
+
+ int bytesRead;
+ while ((bytesRead = zis.read(arr)) > 0)
+ io.write(arr, 0, bytesRead);
+ }
+
+ Files.move(unzipTmp.toPath(), unzip.toPath());
+
+ synchronized (this) {
+ decompressionFutures.remove(segmentToDecompress).onDone();
+ }
+ }
+ catch (InterruptedException e){
+ Thread.currentThread().interrupt();
+ }
+ catch (IOException e) {
+ U.error(log, "Unexpected error during WAL decompression", e);
+
+ FileWriteHandle handle = currentHandle();
+
+ if (handle != null)
+ handle.invalidateEnvironment(e);
+ }
+ }
+ }
+
+ /**
+ * Asynchronously decompresses WAL segment which is present only in .zip file.
+ *
+ * @return Future which is completed once file is decompressed.
+ */
+ synchronized IgniteInternalFuture<Void> decompressFile(long idx) {
+ if (decompressionFutures.containsKey(idx))
+ return decompressionFutures.get(idx);
+
+ File f = new File(walArchiveDir, FileDescriptor.fileName(idx));
+
+ if (f.exists())
+ return new GridFinishedFuture<>();
+
+ segmentsQueue.put(idx);
+
+ GridFutureAdapter<Void> res = new GridFutureAdapter<>();
+
+ decompressionFutures.put(idx, res);
+
+ return res;
+ }
+
+ /**
+ * @throws IgniteInterruptedCheckedException If failed to wait for thread shutdown.
+ */
+ private void shutdown() throws IgniteInterruptedCheckedException {
+ synchronized (this) {
+ stopped = true;
+
+ // Put fake -1 to wake thread from queue.take()
+ segmentsQueue.put(-1L);
+ }
+
+ U.join(this);
+ }
+ }
+
+ /**
* Validate files depending on {@link DataStorageConfiguration#getWalSegments()} and create if need.
* Check end when exit condition return false or all files are passed.
*
@@ -1452,14 +1828,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
- * Reads record serializer version from provided {@code io}.
+ * Reads record serializer version from provided {@code io} along with compacted flag.
* NOTE: Method mutates position of {@code io}.
*
* @param io I/O interface for file.
* @return Serializer version stored in the file.
* @throws IgniteCheckedException If failed to read serializer version.
*/
- public static int readSerializerVersion(FileIO io)
+ public static IgniteBiTuple<Integer, Boolean> readSerializerVersionAndCompactedFlag(FileIO io)
throws IgniteCheckedException, IOException {
try (ByteBufferExpander buf = new ByteBufferExpander(RecordV1Serializer.HEADER_RECORD_SIZE, ByteOrder.nativeOrder())) {
FileInput in = new FileInput(io, buf);
@@ -1481,19 +1857,25 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
assert ptr.fileOffset() == 0 : "Header record should be placed at the beginning of file " + ptr;
- long headerMagicNumber = in.readLong();
+ long hdrMagicNum = in.readLong();
- if (headerMagicNumber != HeaderRecord.MAGIC)
- throw new IOException("Magic is corrupted [exp=" + U.hexLong(HeaderRecord.MAGIC) +
- ", actual=" + U.hexLong(headerMagicNumber) + ']');
+ boolean compacted;
+ if (hdrMagicNum == HeaderRecord.REGULAR_MAGIC)
+ compacted = false;
+ else if (hdrMagicNum == HeaderRecord.COMPACTED_MAGIC)
+ compacted = true;
+ else {
+ throw new IOException("Magic is corrupted [exp=" + U.hexLong(HeaderRecord.REGULAR_MAGIC) +
+ ", actual=" + U.hexLong(hdrMagicNum) + ']');
+ }
// Read serializer version.
- int version = in.readInt();
+ int ver = in.readInt();
// Read and skip CRC.
in.readInt();
- return version;
+ return new IgniteBiTuple<>(ver, compacted);
}
}
@@ -1508,47 +1890,58 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* @throws IOException If failed to write serializer version.
*/
public static long writeSerializerVersion(FileIO io, long idx, int version) throws IOException {
- ByteBuffer buffer = ByteBuffer.allocate(RecordV1Serializer.HEADER_RECORD_SIZE);
- buffer.order(ByteOrder.nativeOrder());
+ ByteBuffer buffer = prepareSerializerVersionBuffer(idx, version, false);
+
+ do {
+ io.write(buffer);
+ }
+ while (buffer.hasRemaining());
+
+ // Flush
+ io.force();
+
+ return io.position();
+ }
+
+ /**
+ * @param idx Index.
+ * @param ver Version.
+ * @param compacted Compacted flag.
+ */
+ @NotNull private static ByteBuffer prepareSerializerVersionBuffer(long idx, int ver, boolean compacted) {
+ ByteBuffer buf = ByteBuffer.allocate(RecordV1Serializer.HEADER_RECORD_SIZE);
+ buf.order(ByteOrder.nativeOrder());
// Write record type.
- buffer.put((byte) (WALRecord.RecordType.HEADER_RECORD.ordinal() + 1));
+ buf.put((byte) (WALRecord.RecordType.HEADER_RECORD.ordinal() + 1));
// Write position.
- RecordV1Serializer.putPosition(buffer, new FileWALPointer(idx, 0, 0));
+ RecordV1Serializer.putPosition(buf, new FileWALPointer(idx, 0, 0));
// Place magic number.
- buffer.putLong(HeaderRecord.MAGIC);
+ buf.putLong(compacted ? HeaderRecord.COMPACTED_MAGIC : HeaderRecord.REGULAR_MAGIC);
// Place serializer version.
- buffer.putInt(version);
+ buf.putInt(ver);
// Place CRC if needed.
if (!RecordV1Serializer.SKIP_CRC) {
- int curPos = buffer.position();
+ int curPos = buf.position();
- buffer.position(0);
+ buf.position(0);
// This call will move buffer position to the end of the record again.
- int crcVal = PureJavaCrc32.calcCrc32(buffer, curPos);
+ int crcVal = PureJavaCrc32.calcCrc32(buf, curPos);
- buffer.putInt(crcVal);
+ buf.putInt(crcVal);
}
else
- buffer.putInt(0);
+ buf.putInt(0);
// Write header record through io.
- buffer.position(0);
-
- do {
- io.write(buffer);
- }
- while (buffer.hasRemaining());
+ buf.position(0);
- // Flush
- io.force();
-
- return io.position();
+ return buf;
}
/**
@@ -1579,11 +1972,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
String fileName = file.getName();
- assert fileName.endsWith(WAL_SEGMENT_FILE_EXT);
-
- int end = fileName.length() - WAL_SEGMENT_FILE_EXT.length();
+ assert fileName.contains(WAL_SEGMENT_FILE_EXT);
- this.idx = idx == null ? Long.parseLong(fileName.substring(0, end)) : idx;
+ this.idx = idx == null ? Long.parseLong(fileName.substring(0, 16)) : idx;
}
/**
@@ -2193,17 +2584,18 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
assert stopped() : "Segment is not closed after close flush: " + head.get();
try {
- int switchSegmentRecSize = RecordV1Serializer.REC_TYPE_SIZE + RecordV1Serializer.FILE_WAL_POINTER_SIZE + RecordV1Serializer.CRC_SIZE;
+ RecordSerializer backwardSerializer = new RecordSerializerFactoryImpl(cctx)
+ .createSerializer(serializerVersion);
- if (rollOver && written < (maxSegmentSize - switchSegmentRecSize)) {
- RecordV1Serializer backwardSerializer =
- new RecordV1Serializer(new RecordDataV1Serializer(cctx), true);
+ SwitchSegmentRecord segmentRecord = new SwitchSegmentRecord();
- final ByteBuffer buf = ByteBuffer.allocate(switchSegmentRecSize);
+ int switchSegmentRecSize = backwardSerializer.size(segmentRecord);
- SwitchSegmentRecord segmentRecord = new SwitchSegmentRecord();
- segmentRecord.position( new FileWALPointer(idx, (int)written, -1));
- backwardSerializer.writeRecord(segmentRecord,buf);
+ if (rollOver && written < (maxSegmentSize - switchSegmentRecSize)) {
+ final ByteBuffer buf = ByteBuffer.allocate(switchSegmentRecSize);
+
+ segmentRecord.position(new FileWALPointer(idx, (int)written, switchSegmentRecSize));
+ backwardSerializer.writeRecord(segmentRecord, buf);
buf.rewind();
@@ -2487,6 +2879,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
private final FileArchiver archiver;
/** */
+ private final FileDecompressor decompressor;
+
+ /** */
private final DataStorageConfiguration psCfg;
/** Optional start pointer. */
@@ -2504,10 +2899,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* @param start Optional start pointer.
* @param end Optional end pointer.
* @param psCfg Database configuration.
- * @param serializer Serializer of current version to read headers.
+ * @param serializerFactory Serializer factory.
* @param archiver Archiver.
- * @param log Logger
- * @throws IgniteCheckedException If failed to initialize WAL segment.
+ * @param decompressor Decompressor.
+ *@param log Logger @throws IgniteCheckedException If failed to initialize WAL segment.
*/
private RecordsIterator(
GridCacheSharedContext cctx,
@@ -2516,15 +2911,15 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
@Nullable FileWALPointer start,
@Nullable FileWALPointer end,
DataStorageConfiguration psCfg,
- @NotNull RecordSerializer serializer,
+ @NotNull RecordSerializerFactory serializerFactory,
FileIOFactory ioFactory,
FileArchiver archiver,
- IgniteLogger log,
- int tlbSize
+ FileDecompressor decompressor,
+ IgniteLogger log
) throws IgniteCheckedException {
super(log,
cctx,
- serializer,
+ serializerFactory,
ioFactory,
psCfg.getWalRecordIteratorBufferSize());
this.walWorkDir = walWorkDir;
@@ -2533,6 +2928,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
this.archiver = archiver;
this.start = start;
this.end = end;
+ this.decompressor = decompressor;
init();
@@ -2540,6 +2936,26 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/** {@inheritDoc} */
+ @Override protected ReadFileHandle initReadHandle(
+ @NotNull FileDescriptor desc,
+ @Nullable FileWALPointer start
+ ) throws IgniteCheckedException, FileNotFoundException {
+ if (decompressor != null && !desc.file.exists()) {
+ FileDescriptor zipFile = new FileDescriptor(
+ new File(walArchiveDir, FileDescriptor.fileName(desc.getIdx()) + ".zip"));
+
+ if (!zipFile.file.exists()) {
+ throw new FileNotFoundException("Both compressed and raw segment files are missing in archive " +
+ "[segmentIdx=" + desc.idx + "]");
+ }
+
+ decompressor.decompressFile(desc.idx).get();
+ }
+
+ return super.initReadHandle(desc, start);
+ }
+
+ /** {@inheritDoc} */
@Override protected void onClose() throws IgniteCheckedException {
super.onClose();
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordDataSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordDataSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordDataSerializer.java
deleted file mode 100644
index 5a14095..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordDataSerializer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence.wal;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
-
-/**
- * Interface to provide size, read and write operations with WAL records
- * <b>without any headers and meta information</b>.
- */
-public interface RecordDataSerializer {
- /**
- * Calculates size of record data.
- *
- * @param record WAL record.
- * @return Size of record in bytes.
- * @throws IgniteCheckedException If it's unable to calculate record data size.
- */
- int size(WALRecord record) throws IgniteCheckedException;
-
- /**
- * Reads record data of {@code type} from buffer {@code in}.
- *
- * @param type Record type.
- * @param in Buffer to read.
- * @return WAL record.
- * @throws IOException In case of I/O problems.
- * @throws IgniteCheckedException If it's unable to read record.
- */
- WALRecord readRecord(WALRecord.RecordType type, ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException;
-
- /**
- * Writes record data to buffer {@code buf}.
- *
- * @param record WAL record.
- * @param buf Buffer to write.
- * @throws IgniteCheckedException If it's unable to write record.
- */
- void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException;
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java
deleted file mode 100644
index 12e16a8..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence.wal;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.pagemem.wal.WALPointer;
-import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
-
-/**
- * Record serializer.
- */
-public interface RecordSerializer {
- /**
- * @return serializer version
- */
- public int version();
-
- /**
- * Calculates record size in byte including expected wal pointer, CRC and type field
- *
- * @param record Record.
- * @return Size in bytes.
- */
- public int size(WALRecord record) throws IgniteCheckedException;
-
- /**
- * @param record Entry to write.
- * @param buf Buffer.
- */
- public void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException;
-
- /**
- * Loads record from input
- *
- * @param in Data input to read data from.
- * @param expPtr expected WAL pointer for record. Used to validate actual position against expected from the file
- * @return Read entry.
- */
- public WALRecord readRecord(FileInput in, WALPointer expPtr) throws IOException, IgniteCheckedException;
-
- /**
- * Flag to write (or not) wal pointer to record
- */
- public boolean writePointer();
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java
new file mode 100644
index 0000000..4a846b4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java
@@ -0,0 +1,141 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.record.RecordTypes;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
+import org.apache.ignite.internal.util.typedef.CIX1;
+import org.apache.ignite.internal.util.typedef.P2;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Iterates over logical records of one WAL segment from archive. Used for WAL archive compression.
+ * Doesn't deserialize actual record data, returns {@link MarshalledRecord} instances instead.
+ */
+public class SingleSegmentLogicalRecordsIterator extends AbstractWalRecordsIterator {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Segment initialized flag. */
+ private boolean segmentInitialized;
+
+ /** Archived segment index. */
+ private long archivedSegIdx;
+
+ /** Archive directory. */
+ private File archiveDir;
+
+ /** Closure which is executed right after advance. */
+ private CIX1<WALRecord> advanceC;
+
+ /**
+ * @param log Logger.
+ * @param sharedCtx Shared context.
+ * @param ioFactory Io factory.
+ * @param bufSize Buffer size.
+ * @param archivedSegIdx Archived seg index.
+ * @param archiveDir Directory with segment.
+ * @param advanceC Closure which is executed right after advance.
+ */
+ SingleSegmentLogicalRecordsIterator(
+ @NotNull IgniteLogger log,
+ @NotNull GridCacheSharedContext sharedCtx,
+ @NotNull FileIOFactory ioFactory,
+ int bufSize,
+ long archivedSegIdx,
+ File archiveDir,
+ CIX1<WALRecord> advanceC
+ ) throws IgniteCheckedException {
+ super(log, sharedCtx, initLogicalRecordsSerializerFactory(sharedCtx), ioFactory, bufSize);
+
+ this.archivedSegIdx = archivedSegIdx;
+ this.archiveDir = archiveDir;
+ this.advanceC = advanceC;
+
+ advance();
+ }
+
+ /**
+ * @param sharedCtx Shared context.
+ */
+ private static RecordSerializerFactory initLogicalRecordsSerializerFactory(GridCacheSharedContext sharedCtx)
+ throws IgniteCheckedException {
+
+ return new RecordSerializerFactoryImpl(sharedCtx)
+ .recordDeserializeFilter(new LogicalRecordsFilter())
+ .marshalledMode(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected FileWriteAheadLogManager.ReadFileHandle advanceSegment(
+ @Nullable FileWriteAheadLogManager.ReadFileHandle curWalSegment) throws IgniteCheckedException {
+ if (segmentInitialized) {
+ closeCurrentWalSegment();
+ // No advance as we iterate over single segment.
+ return null;
+ }
+ else {
+ segmentInitialized = true;
+
+ FileWriteAheadLogManager.FileDescriptor fd = new FileWriteAheadLogManager.FileDescriptor(
+ new File(archiveDir, FileWriteAheadLogManager.FileDescriptor.fileName(archivedSegIdx)));
+
+ try {
+ return initReadHandle(fd, null);
+ }
+ catch (FileNotFoundException e) {
+ throw new IgniteCheckedException("Missing WAL segment in the archive", e);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void advance() throws IgniteCheckedException {
+ super.advance();
+
+ if (curRec != null && advanceC != null)
+ advanceC.apply(curRec.get2());
+ }
+
+ /**
+ *
+ */
+ private static class LogicalRecordsFilter implements P2<WALRecord.RecordType, WALPointer> {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Records type to skip. */
+ private final Set<WALRecord.RecordType> skip = RecordTypes.DELTA_TYPE_SET;
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(WALRecord.RecordType type, WALPointer ptr) {
+ return !skip.contains(type);
+ }
+ }
+}