You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/11/27 13:28:03 UTC
[06/13] ignite git commit: ignite-5938 WAL logs compaction and
compression after checkpoint
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
index a4d9e95..b36c2db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExp
import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
import org.apache.ignite.internal.util.typedef.F;
@@ -108,7 +109,7 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
) throws IgniteCheckedException {
super(log,
sharedCtx,
- FileWriteAheadLogManager.forVersion(sharedCtx, FileWriteAheadLogManager.LATEST_SERIALIZER_VERSION),
+ new RecordSerializerFactoryImpl(sharedCtx),
ioFactory,
BUF_SIZE);
this.keepBinary = keepBinary;
@@ -136,7 +137,7 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
@NotNull File... walFiles) throws IgniteCheckedException {
super(log,
sharedCtx,
- FileWriteAheadLogManager.forVersion(sharedCtx, FileWriteAheadLogManager.LATEST_SERIALIZER_VERSION),
+ new RecordSerializerFactoryImpl(sharedCtx),
ioFactory,
BUF_SIZE);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/HeaderRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/HeaderRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/HeaderRecord.java
index 4fa6232..35c94a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/HeaderRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/HeaderRecord.java
@@ -24,8 +24,11 @@ import org.apache.ignite.internal.util.typedef.internal.S;
* Header record.
*/
public class HeaderRecord extends WALRecord {
- /** */
- public static final long MAGIC = 0xB0D045A_CE7ED045AL;
+ /** Magic of regular WAL segment. */
+ public static final long REGULAR_MAGIC = 0xB0D045A_CE7ED045AL;
+
+ /** Magic of WAL segment with skipped physical records. */
+ public static final long COMPACTED_MAGIC = 0x4E07AE0_E573A694EL;
/** Serializer version */
private final int ver;
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/RecordTypes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/RecordTypes.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/RecordTypes.java
new file mode 100644
index 0000000..9654748
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/RecordTypes.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.record;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+
+/**
+ * Utility class for handling WAL record types.
+ */
+public final class RecordTypes {
+ /** */
+ public static final Set<WALRecord.RecordType> DELTA_TYPE_SET = new HashSet<>();
+
+ static {
+ DELTA_TYPE_SET.add(WALRecord.RecordType.PAGE_RECORD);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.INIT_NEW_PAGE_RECORD);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.DATA_PAGE_INSERT_RECORD);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.DATA_PAGE_INSERT_FRAGMENT_RECORD);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.DATA_PAGE_REMOVE_RECORD);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.DATA_PAGE_SET_FREE_LIST_PAGE);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_META_PAGE_INIT_ROOT);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_META_PAGE_ADD_ROOT);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_META_PAGE_CUT_ROOT);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_INIT_NEW_ROOT);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_PAGE_RECYCLE);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_PAGE_INSERT);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_FIX_LEFTMOST_CHILD);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_FIX_COUNT);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_PAGE_REPLACE);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_PAGE_REMOVE);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_PAGE_INNER_REPLACE);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_FIX_REMOVE_ID);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_FORWARD_PAGE_SPLIT);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_EXISTING_PAGE_SPLIT);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_PAGE_MERGE);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.PAGES_LIST_SET_NEXT);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.PAGES_LIST_SET_PREVIOUS);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.PAGES_LIST_INIT_NEW_PAGE);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.PAGES_LIST_ADD_PAGE);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.PAGES_LIST_REMOVE_PAGE);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.META_PAGE_INIT);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.PARTITION_META_PAGE_UPDATE_COUNTERS);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.TRACKING_PAGE_DELTA);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.META_PAGE_UPDATE_NEXT_SNAPSHOT_ID);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.META_PAGE_UPDATE_LAST_ALLOCATED_INDEX);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.PAGE_LIST_META_RESET_COUNT_RECORD);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.DATA_PAGE_UPDATE_RECORD);
+ DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_META_PAGE_INIT_ROOT2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataSerializer.java
new file mode 100644
index 0000000..88c5f42
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataSerializer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.serializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
+
+/**
+ * Interface to provide size, read and write operations with WAL records
+ * <b>without any headers and meta information</b>.
+ */
+public interface RecordDataSerializer {
+ /**
+ * Calculates size of record data.
+ *
+ * @param record WAL record.
+ * @return Size of record in bytes.
+ * @throws IgniteCheckedException If it's unable to calculate record data size.
+ */
+ int size(WALRecord record) throws IgniteCheckedException;
+
+ /**
+ * Reads record data of {@code type} from buffer {@code in}.
+ *
+ * @param type Record type.
+ * @param in Buffer to read.
+ * @return WAL record.
+ * @throws IOException In case of I/O problems.
+ * @throws IgniteCheckedException If it's unable to read record.
+ */
+ WALRecord readRecord(WALRecord.RecordType type, ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException;
+
+ /**
+ * Writes record data to buffer {@code buf}.
+ *
+ * @param record WAL record.
+ * @param buf Buffer to write.
+ * @throws IgniteCheckedException If it's unable to write record.
+ */
+ void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
index e583df3..d478917 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
@@ -87,7 +87,6 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInne
import org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
-import org.apache.ignite.internal.processors.cache.persistence.wal.RecordDataSerializer;
import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
@@ -283,8 +282,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
return /*cacheId*/ 4 + /*pageId*/ 8;
case SWITCH_SEGMENT_RECORD:
- // CRC is not loaded for switch segment.
- return -CRC_SIZE;
+ return 0;
case TX_RECORD:
return txRecordSerializer.sizeOfTxRecord((TxRecord)record);
@@ -391,9 +389,8 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
case HEADER_RECORD:
long magic = in.readLong();
- if (magic != HeaderRecord.MAGIC)
- throw new EOFException("Magic is corrupted [exp=" + U.hexLong(HeaderRecord.MAGIC) +
- ", actual=" + U.hexLong(magic) + ']');
+ if (magic != HeaderRecord.REGULAR_MAGIC && magic != HeaderRecord.COMPACTED_MAGIC)
+ throw new EOFException("Magic is corrupted [actual=" + U.hexLong(magic) + ']');
int ver = in.readInt();
@@ -912,7 +909,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case HEADER_RECORD:
- buf.putLong(HeaderRecord.MAGIC);
+ buf.putLong(HeaderRecord.REGULAR_MAGIC);
buf.putInt(((HeaderRecord)record).version());
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
index c02f36e..16a81a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
@@ -24,9 +24,9 @@ import java.util.List;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.SnapshotRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
-import org.apache.ignite.internal.processors.cache.persistence.wal.RecordDataSerializer;
import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
/**
@@ -54,6 +54,9 @@ public class RecordDataV2Serializer implements RecordDataSerializer {
case DATA_RECORD:
return delegateSerializer.size(record) + 8/*timestamp*/;
+ case SNAPSHOT:
+ return 8 + 1;
+
default:
return delegateSerializer.size(record);
}
@@ -76,6 +79,12 @@ public class RecordDataV2Serializer implements RecordDataSerializer {
return new DataRecord(entries, timeStamp);
+ case SNAPSHOT:
+ long snpId = in.readLong();
+ byte full = in.readByte();
+
+ return new SnapshotRecord(snpId, full == 1);
+
default:
return delegateSerializer.readRecord(type, in);
}
@@ -98,6 +107,14 @@ public class RecordDataV2Serializer implements RecordDataSerializer {
break;
+ case SNAPSHOT:
+ SnapshotRecord snpRec = (SnapshotRecord)record;
+
+ buf.putLong(snpRec.getSnapshotId());
+ buf.put(snpRec.isFull() ? (byte)1 : 0);
+
+ break;
+
default:
delegateSerializer.writeRecord(record, buf);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java
new file mode 100644
index 0000000..c5760ab
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.serializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
+
+/**
+ * Record serializer.
+ */
+public interface RecordSerializer {
+ /**
+ * @return serializer version
+ */
+ public int version();
+
+ /**
+ * Calculates record size in byte including expected wal pointer, CRC and type field
+ *
+ * @param record Record.
+ * @return Size in bytes.
+ */
+ public int size(WALRecord record) throws IgniteCheckedException;
+
+ /**
+ * @param record Entry to write.
+ * @param buf Buffer.
+ */
+ public void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException;
+
+ /**
+ * Loads record from input
+ *
+ * @param in Data input to read data from.
+ * @param expPtr expected WAL pointer for record. Used to validate actual position against expected from the file
+ * @return Read entry.
+ */
+ public WALRecord readRecord(FileInput in, WALPointer expPtr) throws IOException, IgniteCheckedException;
+
+ /**
+ * Flag to write (or not) wal pointer to record
+ */
+ public boolean writePointer();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactory.java
new file mode 100644
index 0000000..f46c315
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.serializer;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.FilteredRecord;
+import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.lang.IgniteBiPredicate;
+
+/**
+ * Factory for creating {@link RecordSerializer}.
+ */
+public interface RecordSerializerFactory {
+ /**
+ * Factory method for creation {@link RecordSerializer}.
+ *
+ * @param ver Serializer version.
+ * @return record serializer.
+ */
+ public RecordSerializer createSerializer(int ver) throws IgniteCheckedException;
+
+ /**
+ * TODO: This flag was added under IGNITE-6029, but still unused. Should be either handled or removed.
+ *
+ * @param writePointer Write pointer flag.
+ */
+ public RecordSerializerFactory writePointer(boolean writePointer);
+
+ /**
+ * Specifies deserialization filter. Created serializer will read bulk {@link FilteredRecord} instead of actual
+ * record if record type/pointer doesn't satisfy filter.
+ *
+ * @param readTypeFilter Read type filter.
+ */
+ public RecordSerializerFactory recordDeserializeFilter(IgniteBiPredicate<WALRecord.RecordType, WALPointer> readTypeFilter);
+
+ /**
+ * If marshalledMode is on, created serializer will read {@link MarshalledRecord} with raw binary data instead of
+ * actual record.
+ * Useful for copying binary data from WAL.
+ *
+ * @param marshalledMode Marshalled mode.
+ */
+ public RecordSerializerFactory marshalledMode(boolean marshalledMode);
+
+ /**
+ * If skipPositionCheck is true, created serializer won't check that actual position of record in file is equal to
+ * position in saved record's WALPointer.
+ * Must be true if we are reading from compacted WAL segment.
+ *
+ * @param skipPositionCheck Skip position check.
+ */
+ public RecordSerializerFactory skipPositionCheck(boolean skipPositionCheck);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java
new file mode 100644
index 0000000..468392a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java
@@ -0,0 +1,133 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.wal.serializer;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.lang.IgniteBiPredicate;
+
+/**
+ *
+ */
+public class RecordSerializerFactoryImpl implements RecordSerializerFactory {
+ /** Context. */
+ private GridCacheSharedContext cctx;
+
+ /** Write pointer. */
+ private boolean writePointer;
+
+ /** Read record filter. */
+ private IgniteBiPredicate<WALRecord.RecordType, WALPointer> recordDeserializeFilter;
+
+ /**
+ * Marshalled mode flag.
+ * Records are not deserialized in this mode, {@link MarshalledRecord} with binary representation are read instead.
+ */
+ private boolean marshalledMode;
+
+ /** Skip position check flag. Should be set for reading compacted wal file with skipped physical records. */
+ private boolean skipPositionCheck;
+
+ /**
+ * @param cctx Cctx.
+ */
+ public RecordSerializerFactoryImpl(GridCacheSharedContext cctx) {
+ this.cctx = cctx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public RecordSerializer createSerializer(int ver) throws IgniteCheckedException {
+ if (ver <= 0)
+ throw new IgniteCheckedException("Failed to create a serializer (corrupted WAL file).");
+
+ switch (ver) {
+ case 1:
+ return new RecordV1Serializer(new RecordDataV1Serializer(cctx),
+ writePointer, marshalledMode, skipPositionCheck, recordDeserializeFilter);
+
+ case 2:
+ RecordDataV2Serializer dataV2Serializer = new RecordDataV2Serializer(new RecordDataV1Serializer(cctx));
+
+ return new RecordV2Serializer(dataV2Serializer,
+ writePointer, marshalledMode, skipPositionCheck, recordDeserializeFilter);
+
+ default:
+ throw new IgniteCheckedException("Failed to create a serializer with the given version " +
+ "(forward compatibility is not supported): " + ver);
+ }
+ }
+
+ /**
+ * @return Write pointer.
+ */
+ public boolean writePointer() {
+ return writePointer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public RecordSerializerFactoryImpl writePointer(boolean writePointer) {
+ this.writePointer = writePointer;
+
+ return this;
+ }
+
+ /**
+ * @return Read type filter.
+ */
+ public IgniteBiPredicate<WALRecord.RecordType, WALPointer> recordDeserializeFilter() {
+ return recordDeserializeFilter;
+ }
+
+ /** {@inheritDoc} */
+ @Override public RecordSerializerFactoryImpl recordDeserializeFilter(
+ IgniteBiPredicate<WALRecord.RecordType, WALPointer> readTypeFilter) {
+ this.recordDeserializeFilter = readTypeFilter;
+
+ return this;
+ }
+
+ /**
+ * @return Marshalled mode. Records are not deserialized in this mode, with binary representation are read instead.
+ */
+ public boolean marshalledMode() {
+ return marshalledMode;
+ }
+
+ /** {@inheritDoc} */
+ @Override public RecordSerializerFactoryImpl marshalledMode(boolean marshalledMode) {
+ this.marshalledMode = marshalledMode;
+
+ return this;
+ }
+
+ /**
+ * @return Skip position check flag. Should be set for reading compacted wal file with skipped physical records.
+ */
+ public boolean skipPositionCheck() {
+ return skipPositionCheck;
+ }
+
+ /** {@inheritDoc} */
+ @Override public RecordSerializerFactoryImpl skipPositionCheck(boolean skipPositionCheck) {
+ this.skipPositionCheck = skipPositionCheck;
+
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
index c4e1bf2..d460705 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
@@ -21,22 +21,26 @@ import java.io.DataInput;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.FilteredRecord;
+import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
-import org.apache.ignite.internal.processors.cache.persistence.wal.RecordSerializer;
import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException;
import org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.io.RecordIO;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteBiPredicate;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
@@ -72,6 +76,32 @@ public class RecordV1Serializer implements RecordSerializer {
/** Write pointer. */
private final boolean writePointer;
+ /**
+ * Record type filter.
+ * {@link FilteredRecord} is deserialized instead of original record if type doesn't match filter.
+ */
+ private final IgniteBiPredicate<RecordType, WALPointer> recordFilter;
+
+ /** Skip position check flag. Should be set for reading compacted wal file with skipped physical records. */
+ private final boolean skipPositionCheck;
+
+ /**
+ * Marshalled mode.
+ * Records are not deserialized in this mode, {@link MarshalledRecord} with binary representation are read instead.
+ */
+ private final boolean marshalledMode;
+
+ /** Thread-local heap byte buffer. */
+ private final ThreadLocal<ByteBuffer> heapTlb = new ThreadLocal<ByteBuffer>() {
+ @Override protected ByteBuffer initialValue() {
+ ByteBuffer buf = ByteBuffer.allocate(4096);
+
+ buf.order(GridUnsafe.NATIVE_BYTE_ORDER);
+
+ return buf;
+ }
+ };
+
/** Record read/write functional interface. */
private final RecordIO recordIO = new RecordIO() {
@@ -89,11 +119,36 @@ public class RecordV1Serializer implements RecordSerializer {
FileWALPointer ptr = readPosition(in);
- if (!F.eq(ptr, expPtr))
+ if (!skipPositionCheck && !F.eq(ptr, expPtr))
throw new SegmentEofException("WAL segment rollover detected (will end iteration) [expPtr=" + expPtr +
", readPtr=" + ptr + ']', null);
- return dataSerializer.readRecord(recType, in);
+ final WALRecord rec = dataSerializer.readRecord(recType, in);
+
+ if (recordFilter != null && !recordFilter.apply(rec.type(), ptr))
+ return new FilteredRecord();
+ else if (marshalledMode) {
+ ByteBuffer buf = heapTlb.get();
+
+ int recordSize = size(rec);
+
+ if (buf.capacity() < recordSize)
+ heapTlb.set(buf = ByteBuffer.allocate(recordSize * 3 / 2).order(ByteOrder.nativeOrder()));
+ else
+ buf.clear();
+
+ rec.position(ptr);
+
+ writeRecord(rec, buf);
+
+ buf.flip();
+
+ assert buf.remaining() == recordSize;
+
+ return new MarshalledRecord(rec.type(), rec.position(), buf);
+ }
+ else
+ return rec;
}
/** {@inheritDoc} */
@@ -111,13 +166,19 @@ public class RecordV1Serializer implements RecordSerializer {
/**
* Create an instance of V1 serializer.
- *
* @param dataSerializer V1 data serializer.
* @param writePointer Write pointer.
+ * @param marshalledMode Marshalled mode.
+ * @param skipPositionCheck Skip position check mode.
+ * @param recordFilter Record type filter. {@link FilteredRecord} is deserialized instead of original record
*/
- public RecordV1Serializer(RecordDataV1Serializer dataSerializer, boolean writePointer) {
+ public RecordV1Serializer(RecordDataV1Serializer dataSerializer, boolean writePointer,
+ boolean marshalledMode, boolean skipPositionCheck, IgniteBiPredicate<RecordType, WALPointer> recordFilter) {
this.dataSerializer = dataSerializer;
this.writePointer = writePointer;
+ this.recordFilter = recordFilter;
+ this.skipPositionCheck = skipPositionCheck;
+ this.marshalledMode = marshalledMode;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
index 5eb45ac..05f2a24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
@@ -18,21 +18,26 @@
package org.apache.ignite.internal.processors.cache.persistence.wal.serializer;
import java.io.DataInput;
+import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.FilteredRecord;
+import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
-import org.apache.ignite.internal.processors.cache.persistence.wal.RecordSerializer;
import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException;
import org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.io.RecordIO;
+import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteBiPredicate;
-import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.*;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType;
import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.CRC_SIZE;
import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.REC_TYPE_SIZE;
@@ -57,6 +62,32 @@ public class RecordV2Serializer implements RecordSerializer {
/** Write pointer. */
private final boolean writePointer;
+ /**
+ * Marshalled mode.
+ * Records are not deserialized in this mode, {@link MarshalledRecord} with binary representation are read instead.
+ */
+ private final boolean marshalledMode;
+
+ /** Skip position check flag. Should be set for reading compacted wal file with skipped physical records. */
+ private final boolean skipPositionCheck;
+
+ /** Thread-local heap byte buffer. */
+ private final ThreadLocal<ByteBuffer> heapTlb = new ThreadLocal<ByteBuffer>() {
+ @Override protected ByteBuffer initialValue() {
+ ByteBuffer buf = ByteBuffer.allocate(4096);
+
+ buf.order(GridUnsafe.NATIVE_BYTE_ORDER);
+
+ return buf;
+ }
+ };
+
+ /**
+ * Record type filter.
+ * {@link FilteredRecord} is deserialized instead of original record if type doesn't match filter.
+ */
+ private final IgniteBiPredicate<RecordType, WALPointer> recordFilter;
+
/** Record read/write functional interface. */
private final RecordIO recordIO = new RecordIO() {
@@ -75,9 +106,47 @@ public class RecordV2Serializer implements RecordSerializer {
if (recType == WALRecord.RecordType.SWITCH_SEGMENT_RECORD)
throw new SegmentEofException("Reached end of segment", null);
- FileWALPointer ptr = readPositionAndCheckPoint(in, expPtr);
+ FileWALPointer ptr = readPositionAndCheckPoint(in, expPtr, skipPositionCheck);
+
+ if (recordFilter != null && !recordFilter.apply(recType, ptr)) {
+ int toSkip = ptr.length() - REC_TYPE_SIZE - FILE_WAL_POINTER_SIZE - CRC_SIZE;
+
+ assert toSkip >= 0 : "Too small saved record length: " + ptr;
+
+ if (in.skipBytes(toSkip) < toSkip)
+ throw new EOFException("Reached end of file while reading record: " + ptr);
+
+ return new FilteredRecord();
+ }
+ else if (marshalledMode) {
+ ByteBuffer buf = heapTlb.get();
+
+ if (buf.capacity() < ptr.length())
+ heapTlb.set(buf = ByteBuffer.allocate(ptr.length() * 3 / 2).order(ByteOrder.nativeOrder()));
+ else
+ buf.clear();
+
+ buf.put((byte)(recType.ordinal() + 1));
+
+ buf.putLong(ptr.index());
+ buf.putInt(ptr.fileOffset());
+ buf.putInt(ptr.length());
+
+ in.readFully(buf.array(), buf.position(), ptr.length() - buf.position());
+ buf.position(ptr.length());
+
+ // Unwind reading CRC.
+ in.buffer().position(in.buffer().position() - CRC_SIZE);
+
+ buf.flip();
+
+ assert buf.remaining() == ptr.length();
+
+ return new MarshalledRecord(recType, ptr, buf);
+ }
+ else
+ return dataSerializer.readRecord(recType, in);
- return dataSerializer.readRecord(recType, in);
}
/** {@inheritDoc} */
@@ -98,12 +167,18 @@ public class RecordV2Serializer implements RecordSerializer {
/**
* Create an instance of Record V2 serializer.
- *
* @param dataSerializer V2 data serializer.
+ * @param marshalledMode Marshalled mode.
+ * @param skipPositionCheck Skip position check mode.
+ * @param recordFilter Record type filter. {@link FilteredRecord} is deserialized instead of original record
*/
- public RecordV2Serializer(RecordDataV2Serializer dataSerializer, boolean writePointer) {
+ public RecordV2Serializer(RecordDataV2Serializer dataSerializer, boolean writePointer,
+ boolean marshalledMode, boolean skipPositionCheck, IgniteBiPredicate<RecordType, WALPointer> recordFilter) {
this.dataSerializer = dataSerializer;
this.writePointer = writePointer;
+ this.marshalledMode = marshalledMode;
+ this.skipPositionCheck = skipPositionCheck;
+ this.recordFilter = recordFilter;
}
/** {@inheritDoc} */
@@ -133,12 +208,14 @@ public class RecordV2Serializer implements RecordSerializer {
/**
* @param in Data input to read pointer from.
+ * @param skipPositionCheck Flag for skipping position check.
* @return Read file WAL pointer.
* @throws IOException If failed to write.
*/
public static FileWALPointer readPositionAndCheckPoint(
DataInput in,
- WALPointer expPtr
+ WALPointer expPtr,
+ boolean skipPositionCheck
) throws IgniteCheckedException, IOException {
long idx = in.readLong();
int fileOffset = in.readInt();
@@ -146,7 +223,7 @@ public class RecordV2Serializer implements RecordSerializer {
FileWALPointer p = (FileWALPointer)expPtr;
- if (!F.eq(idx, p.index()) || !F.eq(fileOffset, p.fileOffset()))
+ if (!F.eq(idx, p.index()) || (!skipPositionCheck && !F.eq(fileOffset, p.fileOffset())))
throw new WalSegmentTailReachedException(
"WAL segment tail is reached. [ " +
"Expected next state: {Index=" + p.index() + ",Offset=" + p.fileOffset() + "}, " +
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
index a8df0c6..0a278d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
@@ -1680,6 +1680,7 @@ public class PlatformConfigurationUtils {
.setMetricsRateTimeInterval(in.readLong())
.setCheckpointWriteOrder(CheckpointWriteOrder.fromOrdinal(in.readInt()))
.setWriteThrottlingEnabled(in.readBoolean())
+ .setWalCompactionEnabled(in.readBoolean())
.setSystemRegionInitialSize(in.readLong())
.setSystemRegionMaxSize(in.readLong())
.setPageSize(in.readInt())
@@ -1774,6 +1775,7 @@ public class PlatformConfigurationUtils {
w.writeLong(cfg.getMetricsRateTimeInterval());
w.writeInt(cfg.getCheckpointWriteOrder().ordinal());
w.writeBoolean(cfg.isWriteThrottlingEnabled());
+ w.writeBoolean(cfg.isWalCompactionEnabled());
w.writeLong(cfg.getSystemRegionInitialSize());
w.writeLong(cfg.getSystemRegionMaxSize());
w.writeInt(cfg.getPageSize());
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
index af8f679..107b467 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.persistence.db.wal;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.file.OpenOption;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheAtomicityMode;
@@ -41,7 +42,6 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
-import java.nio.file.OpenOption;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.READ;
@@ -120,16 +120,17 @@ public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest {
*/
private void flushingErrorTest() throws Exception {
final IgniteEx grid = startGrid(0);
- grid.active(true);
- IgniteCache<Object, Object> cache = grid.cache(TEST_CACHE);
+ try {
+ grid.active(true);
- final int iterations = 100;
+ IgniteCache<Object, Object> cache = grid.cache(TEST_CACHE);
+
+ final int iterations = 100;
- try {
for (int i = 0; i < iterations; i++) {
Transaction tx = grid.transactions().txStart(
- TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED);
+ TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED);
cache.put(i, "testValue" + i);
@@ -144,8 +145,7 @@ public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest {
// We should await successful stop of node.
GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override
- public boolean apply() {
+ @Override public boolean apply() {
return grid.context().gateway().getState() == GridKernalState.STOPPED;
}
}, getTestTimeout());
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
index 35d85d1..c6d58e5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
@@ -27,9 +27,9 @@ import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
index b357877..00627b8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
@@ -113,7 +113,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
/** */
private int walSegmentSize;
- /** Logger only. */
+ /** Log only. */
private boolean logOnly;
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryWithCompactionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryWithCompactionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryWithCompactionTest.java
new file mode 100644
index 0000000..d927676
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryWithCompactionTest.java
@@ -0,0 +1,33 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
+ *
+ */
+public class IgniteWalRecoveryWithCompactionTest extends IgniteWalRecoveryTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.getDataStorageConfiguration().setWalCompactionEnabled(true);
+
+ return cfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalSerializerVersionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalSerializerVersionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalSerializerVersionTest.java
index 7500fdc..60fb1d0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalSerializerVersionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalSerializerVersionTest.java
@@ -34,7 +34,7 @@ import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.TimeStampRecord;
import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
-import org.apache.ignite.internal.processors.cache.persistence.wal.RecordSerializer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV2Serializer;
import org.apache.ignite.internal.util.typedef.internal.GPC;
@@ -76,6 +76,8 @@ public class IgniteWalSerializerVersionTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testCheckDifferentSerializerVersions() throws Exception {
+ System.setProperty(IGNITE_WAL_SERIALIZER_VERSION, "1");
+
IgniteEx ig0 = (IgniteEx)startGrid();
IgniteWriteAheadLogManager wal0 = ig0.context().cache().context().wal();
@@ -302,6 +304,11 @@ public class IgniteWalSerializerVersionTest extends GridCommonAbstractTest {
deleteWorkFiles();
}
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ System.clearProperty(IGNITE_WAL_SERIALIZER_VERSION);
+ }
+
/**
* @throws IgniteCheckedException If failed.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
new file mode 100644
index 0000000..6b9f06a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
@@ -0,0 +1,312 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.Arrays;
+import java.util.Comparator;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_PAGE_SIZE;
+
+/**
+ *
+ */
+public class WalCompactionTest extends GridCommonAbstractTest {
+ /** Ip finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Wal segment size. */
+ private static final int WAL_SEGMENT_SIZE = 4 * 1024 * 1024;
+
+ /** Cache name. */
+ public static final String CACHE_NAME = "cache";
+
+ /** Entries count. */
+ public static final int ENTRIES = 1000;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(name);
+
+ cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+
+ cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+ .setPersistenceEnabled(true)
+ .setMaxSize(200 * 1024 * 1024))
+ .setWalMode(WALMode.LOG_ONLY)
+ .setWalSegmentSize(WAL_SEGMENT_SIZE)
+ .setWalHistorySize(500)
+ .setWalCompactionEnabled(true));
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setName("cache");
+ ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+ ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ ccfg.setAffinity(new RendezvousAffinityFunction(false, 16));
+ ccfg.setBackups(0);
+
+ cfg.setCacheConfiguration(ccfg);
+ cfg.setConsistentId(name);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ stopAllGrids();
+
+ deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testApplyingUpdatesFromCompactedWal() throws Exception {
+ IgniteEx ig = (IgniteEx)startGrids(3);
+ ig.active(true);
+
+ IgniteCache<Integer, byte[]> cache = ig.cache("cache");
+
+ for (int i = 0; i < ENTRIES; i++) { // At least 20MB of raw data in total.
+ final byte[] val = new byte[20000];
+
+ val[i] = 1;
+
+ cache.put(i, val);
+ }
+
+ // Spam WAL to move all data records to compressible WAL zone.
+ for (int i = 0; i < WAL_SEGMENT_SIZE / DFLT_PAGE_SIZE * 2; i++)
+ ig.context().cache().context().wal().log(new PageSnapshot(new FullPageId(-1, -1), new byte[DFLT_PAGE_SIZE]));
+
+ // WAL archive segment is allowed to be compressed when it's at least one checkpoint away from current WAL head.
+ ig.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get();
+ ig.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get();
+
+ Thread.sleep(15_000); // Allow compressor to archive WAL segments.
+
+ String nodeFolderName = ig.context().pdsFolderResolver().resolveFolders().folderName();
+
+ File dbDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false);
+ File walDir = new File(dbDir, "wal");
+ File archiveDir = new File(walDir, "archive");
+ File nodeArchiveDir = new File(archiveDir, nodeFolderName);
+ File walSegment = new File(nodeArchiveDir, FileWriteAheadLogManager.FileDescriptor.fileName(0) + ".zip");
+
+ assertTrue(walSegment.exists());
+ assertTrue(walSegment.length() < WAL_SEGMENT_SIZE / 2); // Should be compressed at least in half.
+
+ stopAllGrids();
+
+ File nodeLfsDir = new File(dbDir, nodeFolderName);
+ File cpMarkersDir = new File(nodeLfsDir, "cp");
+
+ File[] cpMarkers = cpMarkersDir.listFiles();
+
+ assertNotNull(cpMarkers);
+ assertTrue(cpMarkers.length > 0);
+
+ File cacheDir = new File(nodeLfsDir, "cache-" + CACHE_NAME);
+ File[] lfsFiles = cacheDir.listFiles();
+
+ assertNotNull(lfsFiles);
+ assertTrue(lfsFiles.length > 0);
+
+ // Enforce reading WAL from the very beginning at the next start.
+ for (File f : cpMarkers)
+ f.delete();
+
+ for (File f : lfsFiles)
+ f.delete();
+
+ ig = (IgniteEx)startGrids(3);
+ ig.active(true);
+
+ cache = ig.cache(CACHE_NAME);
+
+ boolean fail = false;
+
+ // Check that all data is recovered from compacted WAL.
+ for (int i = 0; i < ENTRIES; i++) {
+ byte[] arr = cache.get(i);
+
+ if (arr == null) {
+ System.out.println(">>> Missing: " + i);
+
+ fail = true;
+ }
+ else if (arr[i] != 1) {
+ System.out.println(">>> Corrupted: " + i);
+
+ fail = true;
+ }
+ }
+
+ assertFalse(fail);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSeekingStartInCompactedSegment() throws Exception {
+ IgniteEx ig = (IgniteEx)startGrids(3);
+ ig.active(true);
+
+ IgniteCache<Integer, byte[]> cache = ig.cache("cache");
+
+ for (int i = 0; i < 100; i++) {
+ final byte[] val = new byte[20000];
+
+ val[i] = 1;
+
+ cache.put(i, val);
+ }
+
+ ig.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get();
+ ig.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get();
+
+ String nodeFolderName = ig.context().pdsFolderResolver().resolveFolders().folderName();
+
+ File dbDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false);
+ File nodeLfsDir = new File(dbDir, nodeFolderName);
+ File cpMarkersDir = new File(nodeLfsDir, "cp");
+
+ final File[] cpMarkersToSave = cpMarkersDir.listFiles();
+
+ assert cpMarkersToSave != null;
+ assertTrue(cpMarkersToSave.length >= 2);
+
+ Arrays.sort(cpMarkersToSave, new Comparator<File>() {
+ @Override public int compare(File o1, File o2) {
+ return o1.getName().compareTo(o2.getName());
+ }
+ });
+
+ for (int i = 100; i < ENTRIES; i++) { // At least 20MB of raw data in total.
+ final byte[] val = new byte[20000];
+
+ val[i] = 1;
+
+ cache.put(i, val);
+ }
+
+ // Spam WAL to move all data records to compressible WAL zone.
+ for (int i = 0; i < WAL_SEGMENT_SIZE / DFLT_PAGE_SIZE * 2; i++)
+ ig.context().cache().context().wal().log(new PageSnapshot(new FullPageId(-1, -1), new byte[DFLT_PAGE_SIZE]));
+
+ // WAL archive segment is allowed to be compressed when it's at least one checkpoint away from current WAL head.
+ ig.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get();
+ ig.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get();
+
+ Thread.sleep(15_000); // Allow compressor to archive WAL segments.
+
+ File walDir = new File(dbDir, "wal");
+ File archiveDir = new File(walDir, "archive");
+ File nodeArchiveDir = new File(archiveDir, nodeFolderName);
+ File walSegment = new File(nodeArchiveDir, FileWriteAheadLogManager.FileDescriptor.fileName(0) + ".zip");
+
+ assertTrue(walSegment.exists());
+ assertTrue(walSegment.length() < WAL_SEGMENT_SIZE / 2); // Should be compressed at least in half.
+
+ stopAllGrids();
+
+ File[] cpMarkers = cpMarkersDir.listFiles(new FilenameFilter() {
+ @Override public boolean accept(File dir, String name) {
+ return !(name.equals(cpMarkersToSave[0].getName()) || name.equals(cpMarkersToSave[1].getName()));
+ }
+ });
+
+ assertNotNull(cpMarkers);
+ assertTrue(cpMarkers.length > 0);
+
+ File cacheDir = new File(nodeLfsDir, "cache-" + CACHE_NAME);
+ File[] lfsFiles = cacheDir.listFiles();
+
+ assertNotNull(lfsFiles);
+ assertTrue(lfsFiles.length > 0);
+
+ // Enforce reading WAL from the very beginning at the next start.
+ for (File f : cpMarkers)
+ f.delete();
+
+ for (File f : lfsFiles)
+ f.delete();
+
+ ig = (IgniteEx)startGrids(3);
+ ig.active(true);
+
+ cache = ig.cache(CACHE_NAME);
+
+ int missing = 0;
+
+ for (int i = 0; i < 100; i++) {
+ if (!cache.containsKey(i))
+ missing++;
+ }
+
+ System.out.println(">>> Missing " + missing + " entries logged before WAL iteration start");
+ assertTrue(missing > 0);
+
+ boolean fail = false;
+
+ // Check that all data is recovered from compacted WAL.
+ for (int i = 100; i < ENTRIES; i++) {
+ byte[] arr = cache.get(i);
+
+ if (arr == null) {
+ System.out.println(">>> Missing: " + i);
+
+ fail = true;
+ }
+ else if (arr[i] != 1) {
+ System.out.println(">>> Corrupted: " + i);
+
+ fail = true;
+ }
+ }
+
+ assertFalse(fail);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
index 72450b8..b95208c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
@@ -77,6 +77,11 @@ public class NoOpWALManager implements IgniteWriteAheadLogManager {
}
/** {@inheritDoc} */
+ @Override public void allowCompressionUntil(WALPointer ptr) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public boolean reserved(WALPointer ptr) {
return false;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index adfdb2c..17833ac 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalF
import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFlushLogOnlySelfTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalHistoryReservationsTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalSerializerVersionTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgniteDataIntegrityTests;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.reader.IgniteWalReaderTest;
@@ -93,6 +94,8 @@ public class IgnitePdsTestSuite2 extends TestSuite {
suite.addTestSuite(IgniteWalSerializerVersionTest.class);
+ suite.addTestSuite(WalCompactionTest.class);
+
return suite;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
index 114f630..ea30ce2 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePds
import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsThreadInterruptionTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoveryPPCTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoveryTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoveryWithCompactionTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRecoveryTxLogicalRecordsTest;
/**
@@ -49,6 +50,7 @@ public class IgnitePdsWithIndexingCoreTestSuite extends TestSuite {
suite.addTestSuite(WalRecoveryTxLogicalRecordsTest.class);
suite.addTestSuite(IgniteWalRecoveryTest.class);
+ suite.addTestSuite(IgniteWalRecoveryWithCompactionTest.class);
suite.addTestSuite(IgnitePdsNoActualWalHistoryTest.class);
suite.addTestSuite(IgnitePdsAtomicCacheRebalancingTest.class);
suite.addTestSuite(IgnitePdsTxCacheRebalancingTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs
index 09b3fe4..ae3c0ed 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs
@@ -134,6 +134,11 @@ namespace Apache.Ignite.Core.Configuration
public const bool DefaultWriteThrottlingEnabled = false;
/// <summary>
+ /// Default value for <see cref="WalCompactionEnabled"/>.
+ /// </summary>
+ public const bool DefaultWalCompactionEnabled = false;
+
+ /// <summary>
/// Default size of a memory chunk reserved for system cache initially.
/// </summary>
public const long DefaultSystemRegionInitialSize = 40 * 1024 * 1024;
@@ -174,6 +179,7 @@ namespace Apache.Ignite.Core.Configuration
WalPath = DefaultWalPath;
CheckpointWriteOrder = DefaultCheckpointWriteOrder;
WriteThrottlingEnabled = DefaultWriteThrottlingEnabled;
+ WalCompactionEnabled = DefaultWalCompactionEnabled;
SystemRegionInitialSize = DefaultSystemRegionInitialSize;
SystemRegionMaxSize = DefaultSystemRegionMaxSize;
PageSize = DefaultPageSize;
@@ -207,6 +213,7 @@ namespace Apache.Ignite.Core.Configuration
MetricsRateTimeInterval = reader.ReadLongAsTimespan();
CheckpointWriteOrder = (CheckpointWriteOrder)reader.ReadInt();
WriteThrottlingEnabled = reader.ReadBoolean();
+ WalCompactionEnabled = reader.ReadBoolean();
SystemRegionInitialSize = reader.ReadLong();
SystemRegionMaxSize = reader.ReadLong();
@@ -256,6 +263,7 @@ namespace Apache.Ignite.Core.Configuration
writer.WriteTimeSpanAsLong(MetricsRateTimeInterval);
writer.WriteInt((int)CheckpointWriteOrder);
writer.WriteBoolean(WriteThrottlingEnabled);
+ writer.WriteBoolean(WalCompactionEnabled);
writer.WriteLong(SystemRegionInitialSize);
writer.WriteLong(SystemRegionMaxSize);
@@ -420,6 +428,14 @@ namespace Apache.Ignite.Core.Configuration
public bool WriteThrottlingEnabled { get; set; }
/// <summary>
+ /// Gets or sets flag indicating whether WAL compaction is enabled.
+ /// If true, system filters and compresses WAL archive in background.
+ /// Compressed WAL archive gets automatically decompressed on demand.
+ /// </summary>
+ [DefaultValue(DefaultWalCompactionEnabled)]
+ public bool WalCompactionEnabled { get; set; }
+
+ /// <summary>
/// Gets or sets the size of a memory chunk reserved for system needs.
/// </summary>
[DefaultValue(DefaultSystemRegionInitialSize)]
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
index 98607b6..c6eb3c1 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
@@ -1806,6 +1806,11 @@
<xs:documentation>Threads that generate dirty pages too fast during ongoing checkpoint will be throttled.</xs:documentation>
</xs:annotation>
</xs:attribute>
+ <xs:attribute name="walCompactionEnabled" type="xs:boolean">
+ <xs:annotation>
+ <xs:documentation>If true, system will filter and compress WAL archive in background. Compressed WAL archive gets automatically decompressed on demand.</xs:documentation>
+ </xs:annotation>
+ </xs:attribute>
<xs:attribute name="pageSize" type="xs:int">
<xs:annotation>
<xs:documentation>Size of the memory page.</xs:documentation>