You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/04/06 12:37:15 UTC
[1/5] ignite git commit: IGNITE-7933 Checkpoing file markers should
be written atomically - Fixes #3633.
Repository: ignite
Updated Branches:
refs/heads/ignite-6083 cbf65cb44 -> 6e92fffca
IGNITE-7933 Checkpoing file markers should be written atomically - Fixes #3633.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4a0695ce
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4a0695ce
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4a0695ce
Branch: refs/heads/ignite-6083
Commit: 4a0695ceae2f99c4841e8382e723daff4580ea3d
Parents: a064702
Author: Pavel Kovalenko <jo...@gmail.com>
Authored: Fri Apr 6 10:35:17 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Apr 6 10:35:17 2018 +0300
----------------------------------------------------------------------
.../org/apache/ignite/internal/IgnitionEx.java | 2 +-
.../GridCacheDatabaseSharedManager.java | 145 ++++++++----
.../cache/persistence/file/AsyncFileIO.java | 9 +-
.../cache/persistence/file/FileIO.java | 20 +-
.../cache/persistence/file/FileIODecorator.java | 9 +-
.../persistence/file/RandomAccessFileIO.java | 13 +-
.../cache/persistence/file/UnzipFileIO.java | 7 +-
.../file/IgnitePdsDiskErrorsRecoveringTest.java | 231 +++++++++++++++----
.../db/wal/IgniteWalFlushFailoverTest.java | 4 +-
...lFlushMultiNodeFailoverAbstractSelfTest.java | 4 +-
.../pagemem/PagesWriteThrottleSmokeTest.java | 4 +-
.../file/AlignedBuffersDirectFileIO.java | 7 +-
12 files changed, 353 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 8073faa..4708dd3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -410,7 +410,7 @@ public class IgnitionEx {
" milliseconds. Killing node...");
// We are not able to kill only one grid so whole JVM will be stopped.
- System.exit(Ignition.KILL_EXIT_CODE);
+ Runtime.getRuntime().halt(Ignition.KILL_EXIT_CODE);
}
}
}, timeoutMs, TimeUnit.MILLISECONDS);
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 71f3baa..70fc688 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -27,6 +27,7 @@ import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
+import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -116,8 +117,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.PersistentStorageIOException;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker;
@@ -211,11 +214,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** Checkpoint file name pattern. */
private static final Pattern CP_FILE_NAME_PATTERN = Pattern.compile("(\\d+)-(.*)-(START|END)\\.bin");
+ /** Suffix of temporary marker files. A marker is first written to a file with this suffix and then renamed to its final name. */
+ public static final String FILE_TMP_SUFFIX = ".tmp";
+
/** Node started file patter. */
private static final Pattern NODE_STARTED_FILE_NAME_PATTERN = Pattern.compile("(\\d+)-node-started\\.bin");
/** Node started file suffix. */
- private static final String NODE_STARTED_FILE_NAME_SUFFIX = "-node-started.bin";
+ public static final String NODE_STARTED_FILE_NAME_SUFFIX = "-node-started.bin";
/** */
private static final FileFilter CP_FILE_FILTER = new FileFilter() {
@@ -378,6 +384,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** Initially disabled cache groups. */
private Collection<Integer> initiallyWalDisabledGrps;
+ /** File I/O factory for writing checkpoint markers. */
+ private final FileIOFactory ioFactory;
+
/**
* @param ctx Kernal context.
*/
@@ -402,6 +411,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
maxCpHistMemSize = Math.min(persistenceCfg.getWalHistorySize(),
IgniteSystemProperties.getInteger(IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE, 100));
+
+ ioFactory = persistenceCfg.getFileIOFactory();
}
/** */
@@ -494,6 +505,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
if (!U.mkdirs(cpDir))
throw new IgniteCheckedException("Could not create directory for checkpoint metadata: " + cpDir);
+ cleanupCheckpointDirectory();
+
final FileLockHolder preLocked = kernalCtx.pdsFolderResolver()
.resolveFolders()
.getLockedFileLockHolder();
@@ -508,6 +521,26 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/**
+ * Cleanup checkpoint directory from all temporary files {@link #FILE_TMP_SUFFIX}.
+ */
+ private void cleanupCheckpointDirectory() throws IgniteCheckedException {
+     try {
+         try (DirectoryStream<Path> files = Files.newDirectoryStream(cpDir.toPath(), new DirectoryStream.Filter<Path>() {
+             @Override
+             public boolean accept(Path path) throws IOException {
+                 // Path.endsWith(String) compares whole path name elements, so it would only match a file
+                 // named exactly ".tmp". Compare the textual form to match names that end with the suffix.
+                 return path.toString().endsWith(FILE_TMP_SUFFIX);
+             }
+         })) {
+             // Delete every leftover temporary marker; a failed delete aborts the cleanup.
+             for (Path path : files)
+                 Files.delete(path);
+         }
+     }
+     catch (IOException e) {
+         throw new IgniteCheckedException("Failed to cleanup checkpoint directory: " + cpDir, e);
+     }
+ }
+
+ /**
*
*/
private void initDataBase() {
@@ -749,7 +782,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
notifyMetastorageReadyForReadWrite();
}
- catch (StorageException e) {
+ catch (StorageException | PersistentStorageIOException e) {
cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
throw new IgniteCheckedException(e);
@@ -760,41 +793,52 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/**
+ * Creates file with current timestamp and specific "node-started.bin" suffix
+ * and writes into memory recovery pointer.
+ *
* @param ptr Memory recovery wal pointer.
*/
private void nodeStart(WALPointer ptr) throws IgniteCheckedException {
FileWALPointer p = (FileWALPointer)ptr;
- String fileName = U.currentTimeMillis() + "-node-started.bin";
+ String fileName = U.currentTimeMillis() + NODE_STARTED_FILE_NAME_SUFFIX;
+ String tmpFileName = fileName + FILE_TMP_SUFFIX;
ByteBuffer buf = ByteBuffer.allocate(20);
buf.order(ByteOrder.nativeOrder());
- try (FileChannel ch = FileChannel.open(
- Paths.get(cpDir.getAbsolutePath(), fileName),
- StandardOpenOption.CREATE_NEW, StandardOpenOption.APPEND)
- ) {
- buf.putLong(p.index());
+ try {
+ try (FileIO io = ioFactory.create(Paths.get(cpDir.getAbsolutePath(), tmpFileName).toFile(),
+ StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
+ buf.putLong(p.index());
- buf.putInt(p.fileOffset());
+ buf.putInt(p.fileOffset());
- buf.putInt(p.length());
+ buf.putInt(p.length());
- buf.flip();
+ buf.flip();
- ch.write(buf);
+ io.write(buf);
- buf.clear();
+ buf.clear();
+
+ io.force(true);
+ }
- ch.force(true);
+ Files.move(Paths.get(cpDir.getAbsolutePath(), tmpFileName), Paths.get(cpDir.getAbsolutePath(), fileName));
}
catch (IOException e) {
- throw new IgniteCheckedException(e);
+ throw new PersistentStorageIOException("Failed to write node start marker: " + ptr, e);
}
}
/**
+ * Collects memory recovery pointers from node started files. See {@link #nodeStart(WALPointer)}.
+ * Each pointer associated with timestamp extracted from file.
+ * Tuples are sorted by timestamp.
*
+ * @return Sorted list of tuples (node started timestamp, memory recovery pointer).
+ * @throws IgniteCheckedException
*/
public List<T2<Long, WALPointer>> nodeStartedPointers() throws IgniteCheckedException {
List<T2<Long, WALPointer>> res = new ArrayList<>();
@@ -806,15 +850,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
String n1 = o1.getName();
String n2 = o2.getName();
- Long ts1 = Long.valueOf(n1.substring(0, n1.length() - NODE_STARTED_FILE_NAME_SUFFIX.length()));
- Long ts2 = Long.valueOf(n2.substring(0, n2.length() - NODE_STARTED_FILE_NAME_SUFFIX.length()));
+ long ts1 = Long.valueOf(n1.substring(0, n1.length() - NODE_STARTED_FILE_NAME_SUFFIX.length()));
+ long ts2 = Long.valueOf(n2.substring(0, n2.length() - NODE_STARTED_FILE_NAME_SUFFIX.length()));
- if (ts1 == ts2)
- return 0;
- else if (ts1 < ts2)
- return -1;
- else
- return 1;
+ return Long.compare(ts1, ts2);
}
});
@@ -826,8 +865,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
Long ts = Long.valueOf(name.substring(0, name.length() - NODE_STARTED_FILE_NAME_SUFFIX.length()));
- try (FileChannel ch = FileChannel.open(f.toPath(), READ)) {
- ch.read(buf);
+ try (FileIO io = ioFactory.create(f, READ)) {
+ io.read(buf);
buf.flip();
@@ -1869,8 +1908,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
private WALPointer readPointer(File cpMarkerFile, ByteBuffer buf) throws IgniteCheckedException {
buf.position(0);
- try (FileChannel ch = FileChannel.open(cpMarkerFile.toPath(), READ)) {
- ch.read(buf);
+ try (FileIO io = ioFactory.create(cpMarkerFile, READ)) {
+ io.read(buf);
buf.flip();
@@ -2584,6 +2623,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/**
+ * Writes into specified file checkpoint entry containing WAL pointer to checkpoint record.
+ *
* @param cpId Checkpoint ID.
* @param ptr Wal pointer of current checkpoint.
*/
@@ -2600,31 +2641,40 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
FileWALPointer filePtr = (FileWALPointer)ptr;
String fileName = checkpointFileName(cpTs, cpId, type);
+ String tmpFileName = fileName + FILE_TMP_SUFFIX;
- try (FileChannel ch = FileChannel.open(Paths.get(cpDir.getAbsolutePath(), fileName),
- StandardOpenOption.CREATE_NEW, StandardOpenOption.APPEND)) {
+ try {
+ try (FileIO io = ioFactory.create(Paths.get(cpDir.getAbsolutePath(), skipSync ? fileName : tmpFileName).toFile(),
+ StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
- tmpWriteBuf.rewind();
+ tmpWriteBuf.rewind();
- tmpWriteBuf.putLong(filePtr.index());
+ tmpWriteBuf.putLong(filePtr.index());
- tmpWriteBuf.putInt(filePtr.fileOffset());
+ tmpWriteBuf.putInt(filePtr.fileOffset());
- tmpWriteBuf.putInt(filePtr.length());
+ tmpWriteBuf.putInt(filePtr.length());
- tmpWriteBuf.flip();
+ tmpWriteBuf.flip();
- ch.write(tmpWriteBuf);
+ io.write(tmpWriteBuf);
- tmpWriteBuf.clear();
+ tmpWriteBuf.clear();
+
+ if (!skipSync)
+ io.force(true);
+ }
if (!skipSync)
- ch.force(true);
+ Files.move(Paths.get(cpDir.getAbsolutePath(), tmpFileName), Paths.get(cpDir.getAbsolutePath(), fileName));
return createCheckPointEntry(cpTs, ptr, cpId, rec, type);
}
catch (IOException e) {
- throw new IgniteCheckedException(e);
+ throw new PersistentStorageIOException("Failed to write checkpoint entry [ptr=" + filePtr
+ + ", cpTs=" + cpTs
+ + ", cpId=" + cpId
+ + ", type=" + type + "]", e);
}
}
@@ -2691,8 +2741,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
if (type != CheckpointEntryType.START)
return null;
- CheckpointEntry entry;
-
Map<Integer, CacheState> cacheGrpStates = null;
// Create lazy checkpoint entry.
@@ -2827,7 +2875,20 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
try {
CheckpointMetricsTracker tracker = new CheckpointMetricsTracker();
- Checkpoint chp = markCheckpointBegin(tracker);
+ Checkpoint chp;
+
+ try {
+ chp = markCheckpointBegin(tracker);
+ }
+ catch (IgniteCheckedException e) {
+ if (curCpProgress != null)
+ curCpProgress.cpFinishFut.onDone(e);
+
+ // In case of checkpoint initialization error node should be invalidated and stopped.
+ cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+
+ return;
+ }
currCheckpointPagesCnt = chp.pagesSize;
@@ -2885,7 +2946,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
} catch (IgniteCheckedException e) {
chp.progress.cpFinishFut.onDone(e);
- // In case of writing error node should be invalidated and stopped.
+ // In case of checkpoint writing error node should be invalidated and stopped.
cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java
index b1db79d..799a78c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java
@@ -169,13 +169,18 @@ public class AsyncFileIO implements FileIO {
}
/** {@inheritDoc} */
- @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
+ @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
throw new UnsupportedOperationException("AsynchronousFileChannel doesn't support mmap.");
}
/** {@inheritDoc} */
@Override public void force() throws IOException {
- ch.force(false);
+ force(false);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void force(boolean withMetadata) throws IOException {
+ ch.force(withMetadata);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
index 73e44b0..822bd66 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
@@ -124,7 +124,16 @@ public interface FileIO extends AutoCloseable {
*/
public void write(byte[] buf, int off, int len) throws IOException;
- public MappedByteBuffer map(int maxWalSegmentSize) throws IOException;
+ /**
+ * Allocates memory mapped buffer for this file with given size.
+ *
+ * @param sizeBytes Size of buffer.
+ *
+ * @return Instance of mapped byte buffer.
+ *
+ * @throws IOException If some I/O error occurs.
+ */
+ public MappedByteBuffer map(int sizeBytes) throws IOException;
/**
* Forces any updates of this file to be written to the storage
@@ -135,6 +144,15 @@ public interface FileIO extends AutoCloseable {
public void force() throws IOException;
/**
+ * Forces any updates of this file to be written to the storage
+ * device that contains it.
+ *
+ * @param withMetadata If {@code true} force also file metadata.
+ * @throws IOException If some I/O error occurs.
+ */
+ public void force(boolean withMetadata) throws IOException;
+
+ /**
* Returns current file size in bytes.
*
* @return File size.
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
index dd563f2..683845b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
@@ -77,8 +77,8 @@ public class FileIODecorator implements FileIO {
}
/** {@inheritDoc} */
- @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
- return delegate.map(maxWalSegmentSize);
+ @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
+ return delegate.map(sizeBytes);
}
/** {@inheritDoc} */
@@ -87,6 +87,11 @@ public class FileIODecorator implements FileIO {
}
/** {@inheritDoc} */
+ @Override public void force(boolean withMetadata) throws IOException {
+ delegate.force(withMetadata);
+ }
+
+ /** {@inheritDoc} */
@Override public long size() throws IOException {
return delegate.size();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
index 23d6ebf..8f7454d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
@@ -84,8 +84,8 @@ public class RandomAccessFileIO implements FileIO {
}
/** {@inheritDoc} */
- @Override public void force() throws IOException {
- ch.force(false);
+ @Override public void force(boolean withMetadata) throws IOException {
+ ch.force(withMetadata);
}
/** {@inheritDoc} */
@@ -104,7 +104,12 @@ public class RandomAccessFileIO implements FileIO {
}
/** {@inheritDoc} */
- @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
- return ch.map(FileChannel.MapMode.READ_WRITE, 0, maxWalSegmentSize);
+ @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
+ return ch.map(FileChannel.MapMode.READ_WRITE, 0, sizeBytes);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void force() throws IOException {
+ force(false);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java
index 83ff91b..469cf3e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java
@@ -116,6 +116,11 @@ public class UnzipFileIO implements FileIO {
/** {@inheritDoc} */
@Override public void force() throws IOException {
+ force(false);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void force(boolean withMetadata) throws IOException {
throw new UnsupportedOperationException();
}
@@ -130,7 +135,7 @@ public class UnzipFileIO implements FileIO {
}
/** {@inheritDoc} */
- @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
+ @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java
index 3e85c77..c902879 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java
@@ -25,6 +25,7 @@ import java.nio.file.OpenOption;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
@@ -39,15 +40,20 @@ import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.GridKernalState;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Assert;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_MMAP;
/**
@@ -61,19 +67,13 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest {
private static final int WAL_SEGMENT_SIZE = 1024 * PAGE_SIZE;
/** */
- private static final long DFLT_DISK_SPACE_BYTES = Long.MAX_VALUE;
-
- /** */
private static final long STOP_TIMEOUT_MS = 30 * 1000;
/** */
private static final String CACHE_NAME = "cache";
- /** */
- private boolean failPageStoreDiskOperations = false;
-
- /** */
- private long diskSpaceBytes = DFLT_DISK_SPACE_BYTES;
+ /** Specified i/o factory for particular test. */
+ private FileIOFactory ioFactory;
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
@@ -88,8 +88,7 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest {
cleanPersistenceDir();
- failPageStoreDiskOperations = false;
- diskSpaceBytes = DFLT_DISK_SPACE_BYTES;
+ ioFactory = null;
System.clearProperty(IGNITE_WAL_MMAP);
}
@@ -103,10 +102,11 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest {
.setWalMode(WALMode.LOG_ONLY)
.setWalCompactionEnabled(false)
.setWalSegmentSize(WAL_SEGMENT_SIZE)
+ .setCheckpointFrequency(240 * 60 * 1000)
.setConcurrencyLevel(Runtime.getRuntime().availableProcessors() * 4);
- if (failPageStoreDiskOperations)
- dsCfg.setFileIOFactory(new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), diskSpaceBytes));
+ if (ioFactory != null)
+ dsCfg.setFileIOFactory(ioFactory);
cfg.setDataStorageConfiguration(dsCfg);
@@ -122,19 +122,17 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest {
}
/**
- *
+ * Test node stopping & recovering on cache initialization fail.
*/
- public void testRecoveringOnCacheInitError() throws Exception {
- failPageStoreDiskOperations = true;
-
- // Two pages is enough to initialize MetaStorage.
- diskSpaceBytes = 2 * PAGE_SIZE;
+ public void testRecoveringOnCacheInitFail() throws Exception {
+ // Fail to initialize page store. 2 extra pages is needed for MetaStorage.
+ ioFactory = new FilteringFileIOFactory(".bin", new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), 2 * PAGE_SIZE));
final IgniteEx grid = startGrid(0);
boolean failed = false;
try {
- grid.active(true);
+ grid.cluster().active(true);
} catch (Exception expected) {
log.warning("Expected cache error", expected);
@@ -147,21 +145,128 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest {
awaitStop(grid);
// Grid should be successfully recovered after stopping.
- failPageStoreDiskOperations = false;
+ ioFactory = null;
IgniteEx recoveredGrid = startGrid(0);
recoveredGrid.active(true);
}
/**
+ * Test node stopping & recovering on start marker writing fail during activation.
*
+ * @throws Exception If test failed.
*/
- public void testRecoveringOnCheckpointWritingError() throws Exception {
- failPageStoreDiskOperations = true;
- diskSpaceBytes = 1024 * PAGE_SIZE;
+ public void testRecoveringOnNodeStartMarkerWriteFail() throws Exception {
+ // Fail to write node start marker tmp file at the second checkpoint. Pass only initial checkpoint.
+ ioFactory = new FilteringFileIOFactory("started.bin" + GridCacheDatabaseSharedManager.FILE_TMP_SUFFIX, new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), 20));
+
+ IgniteEx grid = startGrid(0);
+ grid.cluster().active(true);
+
+ for (int i = 0; i < 1000; i++) {
+ byte payload = (byte) i;
+ byte[] data = new byte[2048];
+ Arrays.fill(data, payload);
+
+ grid.cache(CACHE_NAME).put(i, data);
+ }
+
+ stopAllGrids();
+
+ boolean activationFailed = false;
+ try {
+ grid = startGrid(0);
+ grid.cluster().active(true);
+ }
+ catch (IgniteException e) {
+ log.warning("Activation test exception", e);
+
+ activationFailed = true;
+ }
+
+ Assert.assertTrue("Activation must be failed", activationFailed);
+
+ // Grid should be automatically stopped after checkpoint fail.
+ awaitStop(grid);
+
+ // Grid should be successfully recovered after stopping.
+ ioFactory = null;
+
+ IgniteEx recoveredGrid = startGrid(0);
+ recoveredGrid.cluster().active(true);
+
+ for (int i = 0; i < 1000; i++) {
+ byte payload = (byte) i;
+ byte[] data = new byte[2048];
+ Arrays.fill(data, payload);
+
+ byte[] actualData = (byte[]) recoveredGrid.cache(CACHE_NAME).get(i);
+ Assert.assertArrayEquals(data, actualData);
+ }
+ }
+
+
+ /**
+ * Test node stopping & recovering on checkpoint begin fail.
+ *
+ * @throws Exception If test failed.
+ */
+ public void testRecoveringOnCheckpointBeginFail() throws Exception {
+ // Fail to write checkpoint start marker tmp file at the second checkpoint. Pass only initial checkpoint.
+ ioFactory = new FilteringFileIOFactory("START.bin" + GridCacheDatabaseSharedManager.FILE_TMP_SUFFIX, new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), 20));
final IgniteEx grid = startGrid(0);
- grid.active(true);
+ grid.cluster().active(true);
+
+ for (int i = 0; i < 1000; i++) {
+ byte payload = (byte) i;
+ byte[] data = new byte[2048];
+ Arrays.fill(data, payload);
+
+ grid.cache(CACHE_NAME).put(i, data);
+ }
+
+ String errMsg = "Failed to write checkpoint entry";
+
+ boolean checkpointFailed = false;
+ try {
+ forceCheckpoint();
+ }
+ catch (IgniteCheckedException e) {
+ if (e.getMessage().contains(errMsg))
+ checkpointFailed = true;
+ }
+
+ Assert.assertTrue("Checkpoint must be failed by IgniteCheckedException: " + errMsg, checkpointFailed);
+
+ // Grid should be automatically stopped after checkpoint fail.
+ awaitStop(grid);
+
+ // Grid should be successfully recovered after stopping.
+ ioFactory = null;
+
+ IgniteEx recoveredGrid = startGrid(0);
+ recoveredGrid.cluster().active(true);
+
+ for (int i = 0; i < 1000; i++) {
+ byte payload = (byte) i;
+ byte[] data = new byte[2048];
+ Arrays.fill(data, payload);
+
+ byte[] actualData = (byte[]) recoveredGrid.cache(CACHE_NAME).get(i);
+ Assert.assertArrayEquals(data, actualData);
+ }
+ }
+
+ /**
+ * Test node stopping & recovering on checkpoint pages write fail.
+ */
+ public void testRecoveringOnCheckpointWriteFail() throws Exception {
+ // Fail write partition and index files at the second checkpoint. Pass only initial checkpoint.
+ ioFactory = new FilteringFileIOFactory(".bin", new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), 128 * PAGE_SIZE));
+
+ final IgniteEx grid = startGrid(0);
+ grid.cluster().active(true);
for (int i = 0; i < 1000; i++) {
byte payload = (byte) i;
@@ -187,10 +292,10 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest {
awaitStop(grid);
// Grid should be successfully recovered after stopping.
- failPageStoreDiskOperations = false;
+ ioFactory = null;
IgniteEx recoveredGrid = startGrid(0);
- recoveredGrid.active(true);
+ recoveredGrid.cluster().active(true);
for (int i = 0; i < 1000; i++) {
byte payload = (byte) i;
@@ -203,33 +308,35 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest {
}
/**
- *
+ * Test node stopping & recovering on WAL writing fail with enabled MMAP (Batch allocation for WAL segments).
*/
- public void testRecoveringOnWALErrorWithMmap() throws Exception {
- diskSpaceBytes = WAL_SEGMENT_SIZE;
+ public void testRecoveringOnWALWritingFail1() throws Exception {
+ // Allow to allocate only 1 wal segment, fail on write to second.
+ ioFactory = new FilteringFileIOFactory(".wal", new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), WAL_SEGMENT_SIZE));
System.setProperty(IGNITE_WAL_MMAP, "true");
- emulateRecoveringOnWALWritingError();
+ doTestRecoveringOnWALWritingFail();
}
/**
- *
+ * Test node stopping & recovering on WAL writing fail with disabled MMAP.
*/
- public void testRecoveringOnWALErrorWithoutMmap() throws Exception {
- diskSpaceBytes = 2 * WAL_SEGMENT_SIZE;
+ public void testRecoveringOnWALWritingFail2() throws Exception {
+ // Fail somewhere on the second wal segment.
+ ioFactory = new FilteringFileIOFactory(".wal", new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), (long) (1.5 * WAL_SEGMENT_SIZE)));
System.setProperty(IGNITE_WAL_MMAP, "false");
- emulateRecoveringOnWALWritingError();
+ doTestRecoveringOnWALWritingFail();
}
/**
- *
+ * Test node stopping & recovery on WAL writing fail.
*/
- private void emulateRecoveringOnWALWritingError() throws Exception {
+ private void doTestRecoveringOnWALWritingFail() throws Exception {
final IgniteEx grid = startGrid(0);
FileWriteAheadLogManager wal = (FileWriteAheadLogManager)grid.context().cache().context().wal();
- wal.setFileIOFactory(new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), diskSpaceBytes));
+ wal.setFileIOFactory(ioFactory);
- grid.active(true);
+ grid.cluster().active(true);
int failedPosition = -1;
@@ -254,9 +361,11 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest {
// Grid should be automatically stopped after WAL fail.
awaitStop(grid);
+ ioFactory = null;
+
// Grid should be successfully recovered after stopping.
IgniteEx recoveredGrid = startGrid(0);
- recoveredGrid.active(true);
+ recoveredGrid.cluster().active(true);
for (int i = 0; i < failedPosition; i++) {
byte payload = (byte) i;
@@ -328,11 +437,49 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
- availableSpaceBytes.addAndGet(-maxWalSegmentSize);
+ @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
+ availableSpaceBytes.addAndGet(-sizeBytes);
if (availableSpaceBytes.get() < 0)
throw new IOException("Not enough space!");
- return super.map(maxWalSegmentSize);
+ return super.map(sizeBytes);
+ }
+ }
+
+ /**
+ * Factory to provide custom File I/O interfaces only for files with specified suffix.
+ * For other files {@link RandomAccessFileIO} will be used.
+ */
+ private static class FilteringFileIOFactory implements FileIOFactory {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Delegate. */
+ private final FileIOFactory delegate;
+
+ /** File suffix pattern. */
+ private final String pattern;
+
+ /**
+ * Constructor.
+ *
+ * @param pattern File suffix pattern.
+ * @param delegate I/O Factory delegate.
+ */
+ FilteringFileIOFactory(String pattern, FileIOFactory delegate) {
+ this.delegate = delegate;
+ this.pattern = pattern;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileIO create(File file) throws IOException {
+ return create(file, CREATE, WRITE, READ);
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+ if (file.getName().endsWith(pattern))
+ return delegate.create(file, modes);
+ return new RandomAccessFileIO(file, modes);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
index 946b4e8..042a447 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
@@ -206,8 +206,8 @@ public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
- return delegate.map(maxWalSegmentSize);
+ @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
+ return delegate.map(sizeBytes);
}
};
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
index 1259c3c..fe16328 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
@@ -250,8 +250,8 @@ public abstract class IgniteWalFlushMultiNodeFailoverAbstractSelfTest extends Gr
}
/** {@inheritDoc} */
- @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
- return delegate.map(maxWalSegmentSize);
+ @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
+ return delegate.map(sizeBytes);
}
};
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
index 9f1342f..249718b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
@@ -319,8 +319,8 @@ public class PagesWriteThrottleSmokeTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
- return delegate.map(maxWalSegmentSize);
+ @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
+ return delegate.map(sizeBytes);
}
};
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java
----------------------------------------------------------------------
diff --git a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java
index 3cb4886..681426c 100644
--- a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java
+++ b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java
@@ -460,12 +460,17 @@ public class AlignedBuffersDirectFileIO implements FileIO {
}
/** {@inheritDoc} */
- @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
+ @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
throw new UnsupportedOperationException("AsynchronousFileChannel doesn't support mmap.");
}
/** {@inheritDoc} */
@Override public void force() throws IOException {
+ force(false);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void force(boolean withMetadata) throws IOException {
if (IgniteNativeIoLib.fsync(fdCheckOpened()) < 0)
throw new IOException(String.format("Error fsync()'ing %s, got %s", file, getLastError()));
}
[4/5] ignite git commit: IGNITE-8163 PDS Indexing suite is hanging on
TC in different branches including master - Fixes #3766.
Posted by ag...@apache.org.
IGNITE-8163 PDS Indexing suite is hanging on TC in different branches including master - Fixes #3766.
Signed-off-by: dpavlov <dp...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8053fc1c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8053fc1c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8053fc1c
Branch: refs/heads/ignite-6083
Commit: 8053fc1c9c774037018601e1661992644c8319df
Parents: 57aecd7
Author: Sergey Chugunov <se...@gmail.com>
Authored: Fri Apr 6 15:18:12 2018 +0300
Committer: dpavlov <dp...@apache.org>
Committed: Fri Apr 6 15:18:12 2018 +0300
----------------------------------------------------------------------
.../cache/persistence/db/wal/WalCompactionTest.java | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8053fc1c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
index 6b79d90..c1e8967 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
@@ -105,7 +105,7 @@ public class WalCompactionTest extends GridCommonAbstractTest {
*/
public void testApplyingUpdatesFromCompactedWal() throws Exception {
IgniteEx ig = (IgniteEx)startGrids(3);
- ig.active(true);
+ ig.cluster().active(true);
IgniteCache<Integer, byte[]> cache = ig.cache("cache");
@@ -162,7 +162,8 @@ public class WalCompactionTest extends GridCommonAbstractTest {
f.delete();
ig = (IgniteEx)startGrids(3);
- ig.active(true);
+
+ awaitPartitionMapExchange();
cache = ig.cache(CACHE_NAME);
@@ -192,7 +193,7 @@ public class WalCompactionTest extends GridCommonAbstractTest {
*/
public void testSeekingStartInCompactedSegment() throws Exception {
IgniteEx ig = (IgniteEx)startGrids(3);
- ig.active(true);
+ ig.cluster().active(true);
IgniteCache<Integer, byte[]> cache = ig.cache("cache");
@@ -281,7 +282,8 @@ public class WalCompactionTest extends GridCommonAbstractTest {
f.delete();
ig = (IgniteEx)startGrids(3);
- ig.active(true);
+
+ awaitPartitionMapExchange();
cache = ig.cache(CACHE_NAME);
[2/5] ignite git commit: IGNITE-8160 Fixed flaky data structures test
Posted by ag...@apache.org.
IGNITE-8160 Fixed flaky data structures test
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/84a40e53
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/84a40e53
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/84a40e53
Branch: refs/heads/ignite-6083
Commit: 84a40e53a971b7a96d9dd80b2a2c6873bd6e09e2
Parents: 4a0695c
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Fri Apr 6 12:45:51 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Apr 6 12:45:51 2018 +0300
----------------------------------------------------------------------
...eAbstractDataStructuresFailoverSelfTest.java | 56 +++++++++++++++++---
1 file changed, 49 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/84a40e53/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index 9f9e577..69a466d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -19,7 +19,9 @@ package org.apache.ignite.internal.processors.cache.datastructures;
import java.io.Closeable;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
@@ -37,6 +39,7 @@ import org.apache.ignite.IgniteAtomicLong;
import org.apache.ignite.IgniteAtomicReference;
import org.apache.ignite.IgniteAtomicSequence;
import org.apache.ignite.IgniteAtomicStamped;
+import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteCountDownLatch;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
@@ -50,11 +53,13 @@ import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.typedef.CA;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteCallable;
@@ -1109,8 +1114,36 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testAtomicSequenceInitialization() throws Exception {
+ checkAtomicSequenceInitialization(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicSequenceInitializationOnStableNodes() throws Exception {
+ checkAtomicSequenceInitialization(true);
+ }
+
+ /**
+ * @param limitProjection {@code True} if test should call init only on stable nodes.
+ * @throws Exception If failed.
+ */
+ private void checkAtomicSequenceInitialization(boolean limitProjection) throws Exception {
int threadCnt = 3;
+ IgniteCompute compute;
+
+ if (limitProjection) {
+ List<UUID> nodeIds = new ArrayList<>(gridCount());
+
+ for (int i = 0; i < gridCount(); i++)
+ nodeIds.add(grid(i).cluster().localNode().id());
+
+ compute = grid(0).compute(grid(0).cluster().forNodeIds(nodeIds));
+ }
+ else
+ compute = grid(0).compute();
+
final AtomicInteger idx = new AtomicInteger(gridCount());
IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
@@ -1137,20 +1170,29 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
}, threadCnt, "test-thread");
while (!fut.isDone()) {
- grid(0).compute().call(new IgniteCallable<Object>() {
+ compute.call(new IgniteCallable<Object>() {
/** */
@IgniteInstanceResource
private Ignite g;
- @Override public Object call() throws Exception {
- IgniteAtomicSequence seq = g.atomicSequence(STRUCTURE_NAME, 1, true);
+ @Override public Object call() {
+ try {
+ IgniteAtomicSequence seq = g.atomicSequence(STRUCTURE_NAME, 1, true);
+
+ assert seq != null;
- assert seq != null;
+ for (int i = 0; i < 1000; i++)
+ seq.getAndIncrement();
- for (int i = 0; i < 1000; i++)
- seq.getAndIncrement();
+ return null;
+ }
+ catch (IgniteException e) {
+ // Fail if we are on stable nodes or the exception was not caused by a node stop.
+ if (limitProjection || !X.hasCause(e, NodeStoppingException.class))
+ throw e;
- return null;
+ return null;
+ }
}
});
}
[3/5] ignite git commit: IGNITE-8018 Optimized GridCacheMapEntry
initialValue() - Fixes #3686.
Posted by ag...@apache.org.
IGNITE-8018 Optimized GridCacheMapEntry initialValue() - Fixes #3686.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/57aecd7a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/57aecd7a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/57aecd7a
Branch: refs/heads/ignite-6083
Commit: 57aecd7ab2bf5f0836aabf40e4b4051fd07228d2
Parents: 84a40e5
Author: Ilya Lantukh <il...@gridgain.com>
Authored: Fri Apr 6 13:49:10 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Apr 6 13:49:10 2018 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 158 +++++++++++++++----
.../colocated/GridDhtDetachedCacheEntry.java | 3 +-
.../distributed/near/GridNearCacheEntry.java | 3 +-
3 files changed, 131 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/57aecd7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 74dabe9..a6ef0d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -77,6 +77,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_EXPIRED;
@@ -2699,40 +2700,80 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
) throws IgniteCheckedException, GridCacheEntryRemovedException {
ensureFreeSpace();
+ boolean deferred = false;
+ boolean obsolete = false;
+
+ GridCacheVersion oldVer = null;
+
lockEntry();
try {
checkObsolete();
+ boolean walEnabled = !cctx.isNear() && cctx.group().persistenceEnabled() && cctx.group().walEnabled();
+
+ long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime;
+
+ val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
+
+ final boolean unswapped = ((flags & IS_UNSWAPPED_MASK) != 0);
+
boolean update;
- boolean walEnabled = !cctx.isNear() && cctx.group().persistenceEnabled() && cctx.group().walEnabled();
+ IgnitePredicate<CacheDataRow> p = new IgnitePredicate<CacheDataRow>() {
+ @Override public boolean apply(@Nullable CacheDataRow row) {
+ boolean update0;
+
+ GridCacheVersion currentVer = row != null ? row.version() : GridCacheMapEntry.this.ver;
- if (cctx.group().persistenceEnabled()) {
- unswap(false);
+ boolean isStartVer = currentVer.nodeOrder() == cctx.localNode().order()
+ && currentVer.order() == startVer;
- if (!isNew()) {
- if (cctx.atomic())
- update = ATOMIC_VER_COMPARATOR.compare(this.ver, ver) < 0;
+ if (cctx.group().persistenceEnabled()) {
+ if (!isStartVer) {
+ if (cctx.atomic())
+ update0 = ATOMIC_VER_COMPARATOR.compare(currentVer, ver) < 0;
+ else
+ update0 = currentVer.compareTo(ver) < 0;
+ }
+ else
+ update0 = true;
+ }
else
- update = this.ver.compareTo(ver) < 0;
+ update0 = isStartVer;
+
+ update0 |= (!preload && deletedUnlocked());
+
+ return update0;
}
- else
- update = true;
- }
- else
- update = isNew() && !cctx.offheap().containsKey(this);
+ };
- update |= !preload && deletedUnlocked();
+ if (unswapped) {
+ update = p.apply(null);
- if (update) {
- long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime;
+ if (update) {
+ // If entry is already unswapped and we are modifying it, we must run deletion callbacks for old value.
+ long oldExpTime = expireTimeUnlocked();
+ long delta = (oldExpTime == 0 ? 0 : oldExpTime - U.currentTimeMillis());
- val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
+ if (delta < 0) {
+ if (onExpired(this.val, null)) {
+ if (cctx.deferredDelete()) {
+ deferred = true;
+ oldVer = this.ver;
+ }
+ else if (val == null)
+ obsolete = true;
+ }
+ }
- if (val != null)
storeValue(val, expTime, ver, null);
+ }
+ }
+ else // Optimization to access storage only once.
+ update = storeValue(val, expTime, ver, null, p);
+ if (update) {
update(val, expTime, ttl, ver, true);
boolean skipQryNtf = false;
@@ -2797,6 +2838,20 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
finally {
unlockEntry();
+
+ // It is necessary to execute these callbacks outside of lock to avoid deadlocks.
+
+ if (obsolete) {
+ onMarkedObsolete();
+
+ cctx.cache().removeEntry(this);
+ }
+
+ if (deferred) {
+ assert oldVer != null;
+
+ cctx.onDeferredDelete(this, oldVer);
+ }
}
}
@@ -3516,14 +3571,39 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
* @param oldRow Old row if available.
* @throws IgniteCheckedException If update failed.
*/
- protected void storeValue(CacheObject val,
+ protected boolean storeValue(CacheObject val,
long expireTime,
GridCacheVersion ver,
@Nullable CacheDataRow oldRow) throws IgniteCheckedException {
- assert lock.isHeldByCurrentThread();
assert val != null : "null values in update for key: " + key;
- cctx.offheap().invoke(cctx, key, localPartition(), new UpdateClosure(this, val, ver, expireTime));
+ return storeValue(val, expireTime, ver, oldRow, null);
+ }
+
+ /**
+ * Stores value in offheap.
+ *
+ * @param val Value.
+ * @param expireTime Expire time.
+ * @param ver New entry version.
+ * @param oldRow Old row if available.
+ * @param predicate Optional predicate.
+ * @throws IgniteCheckedException If update failed.
+ * @return {@code True} if storage was modified.
+ */
+ protected boolean storeValue(
+ @Nullable CacheObject val,
+ long expireTime,
+ GridCacheVersion ver,
+ @Nullable CacheDataRow oldRow,
+ @Nullable IgnitePredicate<CacheDataRow> predicate) throws IgniteCheckedException {
+ assert lock.isHeldByCurrentThread();
+
+ UpdateClosure closure = new UpdateClosure(this, val, ver, expireTime, predicate);
+
+ cctx.offheap().invoke(cctx, key, localPartition(), closure);
+
+ return closure.treeOp != IgniteTree.OperationType.NOOP;
}
/**
@@ -4295,7 +4375,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
private final GridCacheMapEntry entry;
/** */
- private final CacheObject val;
+ @Nullable private final CacheObject val;
/** */
private final GridCacheVersion ver;
@@ -4304,6 +4384,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
private final long expireTime;
/** */
+ @Nullable private final IgnitePredicate<CacheDataRow> predicate;
+
+ /** */
private CacheDataRow newRow;
/** */
@@ -4317,31 +4400,44 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
* @param val New value.
* @param ver New version.
* @param expireTime New expire time.
+ * @param predicate Optional predicate.
*/
- UpdateClosure(GridCacheMapEntry entry, CacheObject val, GridCacheVersion ver, long expireTime) {
+ UpdateClosure(GridCacheMapEntry entry, @Nullable CacheObject val, GridCacheVersion ver, long expireTime,
+ @Nullable IgnitePredicate<CacheDataRow> predicate) {
this.entry = entry;
this.val = val;
this.ver = ver;
this.expireTime = expireTime;
+ this.predicate = predicate;
}
/** {@inheritDoc} */
@Override public void call(@Nullable CacheDataRow oldRow) throws IgniteCheckedException {
this.oldRow = oldRow;
+ if (predicate != null && !predicate.apply(oldRow)) {
+ treeOp = IgniteTree.OperationType.NOOP;
+
+ return;
+ }
+
if (oldRow != null)
oldRow.key(entry.key);
- newRow = entry.cctx.offheap().dataStore(entry.localPartition()).createRow(
- entry.cctx,
- entry.key,
- val,
- ver,
- expireTime,
- oldRow);
+ if (val != null) {
+ newRow = entry.cctx.offheap().dataStore(entry.localPartition()).createRow(
+ entry.cctx,
+ entry.key,
+ val,
+ ver,
+ expireTime,
+ oldRow);
- treeOp = oldRow != null && oldRow.link() == newRow.link() ?
- IgniteTree.OperationType.NOOP : IgniteTree.OperationType.PUT;
+ treeOp = oldRow != null && oldRow.link() == newRow.link() ?
+ IgniteTree.OperationType.NOOP : IgniteTree.OperationType.PUT;
+ }
+ else
+ treeOp = oldRow != null ? IgniteTree.OperationType.REMOVE : IgniteTree.OperationType.NOOP;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/57aecd7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
index 3536908..d02015b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
@@ -65,10 +65,11 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry {
}
/** {@inheritDoc} */
- @Override protected void storeValue(CacheObject val,
+ @Override protected boolean storeValue(CacheObject val,
long expireTime,
GridCacheVersion ver,
CacheDataRow oldRow) throws IgniteCheckedException {
+ return false;
// No-op for detached entries, index is updated on primary nodes.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57aecd7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 322e63c..fb41f5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -458,7 +458,8 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
}
/** {@inheritDoc} */
- @Override protected void storeValue(CacheObject val, long expireTime, GridCacheVersion ver, CacheDataRow oldRow) {
+ @Override protected boolean storeValue(CacheObject val, long expireTime, GridCacheVersion ver, CacheDataRow oldRow) {
+ return false;
// No-op: queries are disabled for near cache.
}
[5/5] ignite git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/ignite into ignite-6083
Posted by ag...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/ignite into ignite-6083
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6e92fffc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6e92fffc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6e92fffc
Branch: refs/heads/ignite-6083
Commit: 6e92fffcada7645d364233ae6948f1bddc378cce
Parents: cbf65cb 8053fc1
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Fri Apr 6 15:37:01 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Apr 6 15:37:01 2018 +0300
----------------------------------------------------------------------
.../org/apache/ignite/internal/IgnitionEx.java | 2 +-
.../processors/cache/GridCacheMapEntry.java | 158 ++++++++++---
.../colocated/GridDhtDetachedCacheEntry.java | 3 +-
.../distributed/near/GridNearCacheEntry.java | 3 +-
.../GridCacheDatabaseSharedManager.java | 145 ++++++++----
.../cache/persistence/file/AsyncFileIO.java | 9 +-
.../cache/persistence/file/FileIO.java | 20 +-
.../cache/persistence/file/FileIODecorator.java | 9 +-
.../persistence/file/RandomAccessFileIO.java | 13 +-
.../cache/persistence/file/UnzipFileIO.java | 7 +-
...eAbstractDataStructuresFailoverSelfTest.java | 56 ++++-
.../file/IgnitePdsDiskErrorsRecoveringTest.java | 231 +++++++++++++++----
.../db/wal/IgniteWalFlushFailoverTest.java | 4 +-
...lFlushMultiNodeFailoverAbstractSelfTest.java | 4 +-
.../persistence/db/wal/WalCompactionTest.java | 10 +-
.../pagemem/PagesWriteThrottleSmokeTest.java | 4 +-
.../file/AlignedBuffersDirectFileIO.java | 7 +-
17 files changed, 539 insertions(+), 146 deletions(-)
----------------------------------------------------------------------