You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/04/06 12:37:15 UTC

[1/5] ignite git commit: IGNITE-7933 Checkpoint file markers should be written atomically - Fixes #3633.

Repository: ignite
Updated Branches:
  refs/heads/ignite-6083 cbf65cb44 -> 6e92fffca


IGNITE-7933 Checkpoint file markers should be written atomically - Fixes #3633.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4a0695ce
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4a0695ce
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4a0695ce

Branch: refs/heads/ignite-6083
Commit: 4a0695ceae2f99c4841e8382e723daff4580ea3d
Parents: a064702
Author: Pavel Kovalenko <jo...@gmail.com>
Authored: Fri Apr 6 10:35:17 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Apr 6 10:35:17 2018 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/IgnitionEx.java  |   2 +-
 .../GridCacheDatabaseSharedManager.java         | 145 ++++++++----
 .../cache/persistence/file/AsyncFileIO.java     |   9 +-
 .../cache/persistence/file/FileIO.java          |  20 +-
 .../cache/persistence/file/FileIODecorator.java |   9 +-
 .../persistence/file/RandomAccessFileIO.java    |  13 +-
 .../cache/persistence/file/UnzipFileIO.java     |   7 +-
 .../file/IgnitePdsDiskErrorsRecoveringTest.java | 231 +++++++++++++++----
 .../db/wal/IgniteWalFlushFailoverTest.java      |   4 +-
 ...lFlushMultiNodeFailoverAbstractSelfTest.java |   4 +-
 .../pagemem/PagesWriteThrottleSmokeTest.java    |   4 +-
 .../file/AlignedBuffersDirectFileIO.java        |   7 +-
 12 files changed, 353 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 8073faa..4708dd3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -410,7 +410,7 @@ public class IgnitionEx {
                             " milliseconds. Killing node...");
 
                     // We are not able to kill only one grid so whole JVM will be stopped.
-                    System.exit(Ignition.KILL_EXIT_CODE);
+                    Runtime.getRuntime().halt(Ignition.KILL_EXIT_CODE);
                 }
             }
         }, timeoutMs, TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 71f3baa..70fc688 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -27,6 +27,7 @@ import java.nio.ByteOrder;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
 import java.nio.channels.OverlappingFileLockException;
+import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -116,8 +117,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
 import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.PersistentStorageIOException;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
 import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker;
@@ -211,11 +214,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /** Checkpoint file name pattern. */
     private static final Pattern CP_FILE_NAME_PATTERN = Pattern.compile("(\\d+)-(.*)-(START|END)\\.bin");
 
+    /** Checkpoint file temporary suffix. Checkpoint markers are written safely by writing to a temporary file first and then renaming it. */
+    public static final String FILE_TMP_SUFFIX = ".tmp";
+
     /** Node started file patter. */
     private static final Pattern NODE_STARTED_FILE_NAME_PATTERN = Pattern.compile("(\\d+)-node-started\\.bin");
 
     /** Node started file suffix. */
-    private static final String NODE_STARTED_FILE_NAME_SUFFIX = "-node-started.bin";
+    public static final String NODE_STARTED_FILE_NAME_SUFFIX = "-node-started.bin";
 
     /** */
     private static final FileFilter CP_FILE_FILTER = new FileFilter() {
@@ -378,6 +384,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /** Initially disabled cache groups. */
     private Collection<Integer> initiallyWalDisabledGrps;
 
+    /** File I/O factory for writing checkpoint markers. */
+    private final FileIOFactory ioFactory;
+
     /**
      * @param ctx Kernal context.
      */
@@ -402,6 +411,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
         maxCpHistMemSize = Math.min(persistenceCfg.getWalHistorySize(),
             IgniteSystemProperties.getInteger(IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE, 100));
+
+        ioFactory = persistenceCfg.getFileIOFactory();
     }
 
     /** */
@@ -494,6 +505,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             if (!U.mkdirs(cpDir))
                 throw new IgniteCheckedException("Could not create directory for checkpoint metadata: " + cpDir);
 
+            cleanupCheckpointDirectory();
+
             final FileLockHolder preLocked = kernalCtx.pdsFolderResolver()
                 .resolveFolders()
                 .getLockedFileLockHolder();
@@ -508,6 +521,26 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     }
 
     /**
+     * Cleanup checkpoint directory from all temporary files {@link #FILE_TMP_SUFFIX}.
+     */
+    private void cleanupCheckpointDirectory() throws IgniteCheckedException {
+        try {
+            try (DirectoryStream<Path> files = Files.newDirectoryStream(cpDir.toPath(), new DirectoryStream.Filter<Path>() {
+                @Override public boolean accept(Path path) throws IOException {
+                    // Path.endsWith(String) compares whole name elements, so match the suffix on the path text instead.
+                    return path.toString().endsWith(FILE_TMP_SUFFIX);
+                }
+            })) {
+                for (Path path : files)
+                    Files.delete(path);
+            }
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Failed to cleanup checkpoint directory: " + cpDir, e);
+        }
+    }
+
+    /**
      *
      */
     private void initDataBase() {
@@ -749,7 +782,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
             notifyMetastorageReadyForReadWrite();
         }
-        catch (StorageException e) {
+        catch (StorageException | PersistentStorageIOException e) {
             cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
 
             throw new IgniteCheckedException(e);
@@ -760,41 +793,52 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     }
 
     /**
+     * Creates file with current timestamp and specific "node-started.bin" suffix
+     * and writes the memory recovery pointer into it.
+     *
      * @param ptr Memory recovery wal pointer.
      */
     private void nodeStart(WALPointer ptr) throws IgniteCheckedException {
         FileWALPointer p = (FileWALPointer)ptr;
 
-        String fileName = U.currentTimeMillis() + "-node-started.bin";
+        String fileName = U.currentTimeMillis() + NODE_STARTED_FILE_NAME_SUFFIX;
+        String tmpFileName = fileName + FILE_TMP_SUFFIX;
 
         ByteBuffer buf = ByteBuffer.allocate(20);
         buf.order(ByteOrder.nativeOrder());
 
-        try (FileChannel ch = FileChannel.open(
-            Paths.get(cpDir.getAbsolutePath(), fileName),
-            StandardOpenOption.CREATE_NEW, StandardOpenOption.APPEND)
-        ) {
-            buf.putLong(p.index());
+        try {
+            try (FileIO io = ioFactory.create(Paths.get(cpDir.getAbsolutePath(), tmpFileName).toFile(),
+                    StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
+                buf.putLong(p.index());
 
-            buf.putInt(p.fileOffset());
+                buf.putInt(p.fileOffset());
 
-            buf.putInt(p.length());
+                buf.putInt(p.length());
 
-            buf.flip();
+                buf.flip();
 
-            ch.write(buf);
+                io.write(buf);
 
-            buf.clear();
+                buf.clear();
+
+                io.force(true);
+            }
 
-            ch.force(true);
+            Files.move(Paths.get(cpDir.getAbsolutePath(), tmpFileName), Paths.get(cpDir.getAbsolutePath(), fileName));
         }
         catch (IOException e) {
-            throw new IgniteCheckedException(e);
+            throw new PersistentStorageIOException("Failed to write node start marker: " + ptr, e);
         }
     }
 
     /**
+     * Collects memory recovery pointers from node started files. See {@link #nodeStart(WALPointer)}.
+     * Each pointer is associated with the timestamp extracted from the file name.
+     * Tuples are sorted by timestamp.
      *
+     * @return Sorted list of tuples (node started timestamp, memory recovery pointer).
+     * @throws IgniteCheckedException
      */
     public List<T2<Long, WALPointer>> nodeStartedPointers() throws IgniteCheckedException {
         List<T2<Long, WALPointer>> res = new ArrayList<>();
@@ -806,15 +850,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 String n1 = o1.getName();
                 String n2 = o2.getName();
 
-                Long ts1 = Long.valueOf(n1.substring(0, n1.length() - NODE_STARTED_FILE_NAME_SUFFIX.length()));
-                Long ts2 = Long.valueOf(n2.substring(0, n2.length() - NODE_STARTED_FILE_NAME_SUFFIX.length()));
+                long ts1 = Long.valueOf(n1.substring(0, n1.length() - NODE_STARTED_FILE_NAME_SUFFIX.length()));
+                long ts2 = Long.valueOf(n2.substring(0, n2.length() - NODE_STARTED_FILE_NAME_SUFFIX.length()));
 
-                if (ts1 == ts2)
-                    return 0;
-                else if (ts1 < ts2)
-                    return -1;
-                else
-                    return 1;
+                return Long.compare(ts1, ts2);
             }
         });
 
@@ -826,8 +865,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
             Long ts = Long.valueOf(name.substring(0, name.length() - NODE_STARTED_FILE_NAME_SUFFIX.length()));
 
-            try (FileChannel ch = FileChannel.open(f.toPath(), READ)) {
-                ch.read(buf);
+            try (FileIO io = ioFactory.create(f, READ)) {
+                io.read(buf);
 
                 buf.flip();
 
@@ -1869,8 +1908,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     private WALPointer readPointer(File cpMarkerFile, ByteBuffer buf) throws IgniteCheckedException {
         buf.position(0);
 
-        try (FileChannel ch = FileChannel.open(cpMarkerFile.toPath(), READ)) {
-            ch.read(buf);
+        try (FileIO io = ioFactory.create(cpMarkerFile, READ)) {
+            io.read(buf);
 
             buf.flip();
 
@@ -2584,6 +2623,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     }
 
     /**
+     * Writes into specified file checkpoint entry containing WAL pointer to checkpoint record.
+     *
      * @param cpId Checkpoint ID.
      * @param ptr Wal pointer of current checkpoint.
      */
@@ -2600,31 +2641,40 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         FileWALPointer filePtr = (FileWALPointer)ptr;
 
         String fileName = checkpointFileName(cpTs, cpId, type);
+        String tmpFileName = fileName + FILE_TMP_SUFFIX;
 
-        try (FileChannel ch = FileChannel.open(Paths.get(cpDir.getAbsolutePath(), fileName),
-            StandardOpenOption.CREATE_NEW, StandardOpenOption.APPEND)) {
+        try {
+            try (FileIO io = ioFactory.create(Paths.get(cpDir.getAbsolutePath(), skipSync ? fileName : tmpFileName).toFile(),
+                    StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
 
-            tmpWriteBuf.rewind();
+                tmpWriteBuf.rewind();
 
-            tmpWriteBuf.putLong(filePtr.index());
+                tmpWriteBuf.putLong(filePtr.index());
 
-            tmpWriteBuf.putInt(filePtr.fileOffset());
+                tmpWriteBuf.putInt(filePtr.fileOffset());
 
-            tmpWriteBuf.putInt(filePtr.length());
+                tmpWriteBuf.putInt(filePtr.length());
 
-            tmpWriteBuf.flip();
+                tmpWriteBuf.flip();
 
-            ch.write(tmpWriteBuf);
+                io.write(tmpWriteBuf);
 
-            tmpWriteBuf.clear();
+                tmpWriteBuf.clear();
+
+                if (!skipSync)
+                    io.force(true);
+            }
 
             if (!skipSync)
-                ch.force(true);
+                Files.move(Paths.get(cpDir.getAbsolutePath(), tmpFileName), Paths.get(cpDir.getAbsolutePath(), fileName));
 
             return createCheckPointEntry(cpTs, ptr, cpId, rec, type);
         }
         catch (IOException e) {
-            throw new IgniteCheckedException(e);
+            throw new PersistentStorageIOException("Failed to write checkpoint entry [ptr=" + filePtr
+                    + ", cpTs=" + cpTs
+                    + ", cpId=" + cpId
+                    + ", type=" + type + "]", e);
         }
     }
 
@@ -2691,8 +2741,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         if (type != CheckpointEntryType.START)
             return null;
 
-        CheckpointEntry entry;
-
         Map<Integer, CacheState> cacheGrpStates = null;
 
         // Create lazy checkpoint entry.
@@ -2827,7 +2875,20 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             try {
                 CheckpointMetricsTracker tracker = new CheckpointMetricsTracker();
 
-                Checkpoint chp = markCheckpointBegin(tracker);
+                Checkpoint chp;
+
+                try {
+                    chp = markCheckpointBegin(tracker);
+                }
+                catch (IgniteCheckedException e) {
+                    if (curCpProgress != null)
+                        curCpProgress.cpFinishFut.onDone(e);
+
+                    // In case of checkpoint initialization error node should be invalidated and stopped.
+                    cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+
+                    return;
+                }
 
                 currCheckpointPagesCnt = chp.pagesSize;
 
@@ -2885,7 +2946,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                         } catch (IgniteCheckedException e) {
                             chp.progress.cpFinishFut.onDone(e);
 
-                            // In case of writing error node should be invalidated and stopped.
+                            // In case of checkpoint writing error node should be invalidated and stopped.
                             cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
 
                             return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java
index b1db79d..799a78c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java
@@ -169,13 +169,18 @@ public class AsyncFileIO implements FileIO {
     }
 
     /** {@inheritDoc} */
-    @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
+    @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
         throw new UnsupportedOperationException("AsynchronousFileChannel doesn't support mmap.");
     }
 
     /** {@inheritDoc} */
     @Override public void force() throws IOException {
-        ch.force(false);
+        force(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void force(boolean withMetadata) throws IOException {
+        ch.force(withMetadata);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
index 73e44b0..822bd66 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
@@ -124,7 +124,16 @@ public interface FileIO extends AutoCloseable {
      */
     public void write(byte[] buf, int off, int len) throws IOException;
 
-    public MappedByteBuffer map(int maxWalSegmentSize) throws IOException;
+    /**
+     * Allocates memory mapped buffer for this file with given size.
+     *
+     * @param sizeBytes Size of buffer.
+     *
+     * @return Instance of mapped byte buffer.
+     *
+     * @throws IOException If some I/O error occurs.
+     */
+    public MappedByteBuffer map(int sizeBytes) throws IOException;
 
     /**
      * Forces any updates of this file to be written to the storage
@@ -135,6 +144,15 @@ public interface FileIO extends AutoCloseable {
     public void force() throws IOException;
 
     /**
+     * Forces any updates of this file to be written to the storage
+     * device that contains it.
+     *
+     * @param withMetadata If {@code true} force also file metadata.
+     * @throws IOException If some I/O error occurs.
+     */
+    public void force(boolean withMetadata) throws IOException;
+
+    /**
      * Returns current file size in bytes.
      *
      * @return File size.

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
index dd563f2..683845b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
@@ -77,8 +77,8 @@ public class FileIODecorator implements FileIO {
     }
 
     /** {@inheritDoc} */
-    @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
-        return delegate.map(maxWalSegmentSize);
+    @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
+        return delegate.map(sizeBytes);
     }
 
     /** {@inheritDoc} */
@@ -87,6 +87,11 @@ public class FileIODecorator implements FileIO {
     }
 
     /** {@inheritDoc} */
+    @Override public void force(boolean withMetadata) throws IOException {
+        delegate.force(withMetadata);
+    }
+
+    /** {@inheritDoc} */
     @Override public long size() throws IOException {
         return delegate.size();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
index 23d6ebf..8f7454d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
@@ -84,8 +84,8 @@ public class RandomAccessFileIO implements FileIO {
     }
 
     /** {@inheritDoc} */
-    @Override public void force() throws IOException {
-        ch.force(false);
+    @Override public void force(boolean withMetadata) throws IOException {
+        ch.force(withMetadata);
     }
 
     /** {@inheritDoc} */
@@ -104,7 +104,12 @@ public class RandomAccessFileIO implements FileIO {
     }
 
     /** {@inheritDoc} */
-    @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
-        return ch.map(FileChannel.MapMode.READ_WRITE, 0, maxWalSegmentSize);
+    @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
+        return ch.map(FileChannel.MapMode.READ_WRITE, 0, sizeBytes);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void force() throws IOException {
+        force(false);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java
index 83ff91b..469cf3e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java
@@ -116,6 +116,11 @@ public class UnzipFileIO implements FileIO {
 
     /** {@inheritDoc} */
     @Override public void force() throws IOException {
+        force(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void force(boolean withMetadata) throws IOException {
         throw new UnsupportedOperationException();
     }
 
@@ -130,7 +135,7 @@ public class UnzipFileIO implements FileIO {
     }
 
     /** {@inheritDoc} */
-    @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
+    @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
         throw new UnsupportedOperationException();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java
index 3e85c77..c902879 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java
@@ -25,6 +25,7 @@ import java.nio.file.OpenOption;
 import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
@@ -39,15 +40,20 @@ import org.apache.ignite.failure.StopNodeFailureHandler;
 import org.apache.ignite.internal.GridKernalState;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO;
 import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Assert;
 
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_MMAP;
 
 /**
@@ -61,19 +67,13 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest {
     private static final int WAL_SEGMENT_SIZE = 1024 * PAGE_SIZE;
 
     /** */
-    private static final long DFLT_DISK_SPACE_BYTES = Long.MAX_VALUE;
-
-    /** */
     private static final long STOP_TIMEOUT_MS = 30 * 1000;
 
     /** */
     private static final String CACHE_NAME = "cache";
 
-    /** */
-    private boolean failPageStoreDiskOperations = false;
-
-    /** */
-    private long diskSpaceBytes = DFLT_DISK_SPACE_BYTES;
+    /** Specified i/o factory for particular test. */
+    private FileIOFactory ioFactory;
 
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
@@ -88,8 +88,7 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest {
 
         cleanPersistenceDir();
 
-        failPageStoreDiskOperations = false;
-        diskSpaceBytes = DFLT_DISK_SPACE_BYTES;
+        ioFactory = null;
         System.clearProperty(IGNITE_WAL_MMAP);
     }
 
@@ -103,10 +102,11 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest {
                 .setWalMode(WALMode.LOG_ONLY)
                 .setWalCompactionEnabled(false)
                 .setWalSegmentSize(WAL_SEGMENT_SIZE)
+                .setCheckpointFrequency(240 * 60 * 1000)
                 .setConcurrencyLevel(Runtime.getRuntime().availableProcessors() * 4);
 
-        if (failPageStoreDiskOperations)
-            dsCfg.setFileIOFactory(new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), diskSpaceBytes));
+        if (ioFactory != null)
+            dsCfg.setFileIOFactory(ioFactory);
 
         cfg.setDataStorageConfiguration(dsCfg);
 
@@ -122,19 +122,17 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest {
     }
 
     /**
-     *
+     * Test node stopping & recovering on cache initialization fail.
      */
-    public void testRecoveringOnCacheInitError() throws Exception {
-        failPageStoreDiskOperations = true;
-
-        // Two pages is enough to initialize MetaStorage.
-        diskSpaceBytes = 2 * PAGE_SIZE;
+    public void testRecoveringOnCacheInitFail() throws Exception {
+        // Fail to initialize page store. 2 extra pages is needed for MetaStorage.
+        ioFactory = new FilteringFileIOFactory(".bin", new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), 2 * PAGE_SIZE));
 
         final IgniteEx grid = startGrid(0);
 
         boolean failed = false;
         try {
-            grid.active(true);
+            grid.cluster().active(true);
         } catch (Exception expected) {
             log.warning("Expected cache error", expected);
 
@@ -147,21 +145,128 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest {
         awaitStop(grid);
 
         // Grid should be successfully recovered after stopping.
-        failPageStoreDiskOperations = false;
+        ioFactory = null;
 
         IgniteEx recoveredGrid = startGrid(0);
         recoveredGrid.active(true);
     }
 
     /**
+     * Test node stopping & recovering on start marker writing fail during activation.
      *
+     * @throws Exception If test failed.
      */
-    public void testRecoveringOnCheckpointWritingError() throws Exception {
-        failPageStoreDiskOperations = true;
-        diskSpaceBytes = 1024 * PAGE_SIZE;
+    public void testRecoveringOnNodeStartMarkerWriteFail() throws Exception {
+        // Fail to write node start marker tmp file at the second checkpoint. Pass only initial checkpoint.
+        ioFactory = new FilteringFileIOFactory("started.bin" + GridCacheDatabaseSharedManager.FILE_TMP_SUFFIX, new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), 20));
+
+        IgniteEx grid = startGrid(0);
+        grid.cluster().active(true);
+
+        for (int i = 0; i < 1000; i++) {
+            byte payload = (byte) i;
+            byte[] data = new byte[2048];
+            Arrays.fill(data, payload);
+
+            grid.cache(CACHE_NAME).put(i, data);
+        }
+
+        stopAllGrids();
+
+        boolean activationFailed = false;
+        try {
+            grid = startGrid(0);
+            grid.cluster().active(true);
+        }
+        catch (IgniteException e) {
+            log.warning("Activation test exception", e);
+
+            activationFailed = true;
+        }
+
+        Assert.assertTrue("Activation must be failed", activationFailed);
+
+        // Grid should be automatically stopped after checkpoint fail.
+        awaitStop(grid);
+
+        // Grid should be successfully recovered after stopping.
+        ioFactory = null;
+
+        IgniteEx recoveredGrid = startGrid(0);
+        recoveredGrid.cluster().active(true);
+
+        for (int i = 0; i < 1000; i++) {
+            byte payload = (byte) i;
+            byte[] data = new byte[2048];
+            Arrays.fill(data, payload);
+
+            byte[] actualData = (byte[]) recoveredGrid.cache(CACHE_NAME).get(i);
+            Assert.assertArrayEquals(data, actualData);
+        }
+    }
+
+
+    /**
+     * Test node stopping & recovering on checkpoint begin fail.
+     *
+     * @throws Exception If test failed.
+     */
+    public void testRecoveringOnCheckpointBeginFail() throws Exception {
+        // Fail to write checkpoint start marker tmp file at the second checkpoint. Pass only initial checkpoint.
+        ioFactory = new FilteringFileIOFactory("START.bin" + GridCacheDatabaseSharedManager.FILE_TMP_SUFFIX, new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), 20));
 
         final IgniteEx grid = startGrid(0);
-        grid.active(true);
+        grid.cluster().active(true);
+
+        for (int i = 0; i < 1000; i++) {
+            byte payload = (byte) i;
+            byte[] data = new byte[2048];
+            Arrays.fill(data, payload);
+
+            grid.cache(CACHE_NAME).put(i, data);
+        }
+
+        String errMsg = "Failed to write checkpoint entry";
+
+        boolean checkpointFailed = false;
+        try {
+            forceCheckpoint();
+        }
+        catch (IgniteCheckedException e) {
+            if (e.getMessage().contains(errMsg))
+                checkpointFailed = true;
+        }
+
+        Assert.assertTrue("Checkpoint must be failed by IgniteCheckedException: " + errMsg, checkpointFailed);
+
+        // Grid should be automatically stopped after checkpoint fail.
+        awaitStop(grid);
+
+        // Grid should be successfully recovered after stopping.
+        ioFactory = null;
+
+        IgniteEx recoveredGrid = startGrid(0);
+        recoveredGrid.cluster().active(true);
+
+        for (int i = 0; i < 1000; i++) {
+            byte payload = (byte) i;
+            byte[] data = new byte[2048];
+            Arrays.fill(data, payload);
+
+            byte[] actualData = (byte[]) recoveredGrid.cache(CACHE_NAME).get(i);
+            Assert.assertArrayEquals(data, actualData);
+        }
+    }
+
+    /**
+     * Test node stopping & recovering on checkpoint pages write fail.
+     */
+    public void testRecoveringOnCheckpointWriteFail() throws Exception {
+        // Fail to write partition and index files at the second checkpoint. Pass only initial checkpoint.
+        ioFactory = new FilteringFileIOFactory(".bin", new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), 128 * PAGE_SIZE));
+
+        final IgniteEx grid = startGrid(0);
+        grid.cluster().active(true);
 
         for (int i = 0; i < 1000; i++) {
             byte payload = (byte) i;
@@ -187,10 +292,10 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest {
         awaitStop(grid);
 
         // Grid should be successfully recovered after stopping.
-        failPageStoreDiskOperations = false;
+        ioFactory = null;
 
         IgniteEx recoveredGrid = startGrid(0);
-        recoveredGrid.active(true);
+        recoveredGrid.cluster().active(true);
 
         for (int i = 0; i < 1000; i++) {
             byte payload = (byte) i;
@@ -203,33 +308,35 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest {
     }
 
     /**
-     *
+     * Test node stopping & recovering on WAL writing fail with enabled MMAP (Batch allocation for WAL segments).
      */
-    public void testRecoveringOnWALErrorWithMmap() throws Exception {
-        diskSpaceBytes = WAL_SEGMENT_SIZE;
+    public void testRecoveringOnWALWritingFail1() throws Exception {
+        // Allow allocation of only 1 WAL segment; fail on write to the second.
+        ioFactory = new FilteringFileIOFactory(".wal", new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), WAL_SEGMENT_SIZE));
         System.setProperty(IGNITE_WAL_MMAP, "true");
-        emulateRecoveringOnWALWritingError();
+        doTestRecoveringOnWALWritingFail();
     }
 
     /**
-     *
+     * Test node stopping & recovering on WAL writing fail with disabled MMAP.
      */
-    public void testRecoveringOnWALErrorWithoutMmap() throws Exception {
-        diskSpaceBytes = 2 * WAL_SEGMENT_SIZE;
+    public void testRecoveringOnWALWritingFail2() throws Exception {
+        // Fail somewhere on the second wal segment.
+        ioFactory = new FilteringFileIOFactory(".wal", new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), (long) (1.5 * WAL_SEGMENT_SIZE)));
         System.setProperty(IGNITE_WAL_MMAP, "false");
-        emulateRecoveringOnWALWritingError();
+        doTestRecoveringOnWALWritingFail();
     }
 
     /**
-     *
+     * Test node stopping & recovery on WAL writing fail.
      */
-    private void emulateRecoveringOnWALWritingError() throws Exception {
+    private void doTestRecoveringOnWALWritingFail() throws Exception {
         final IgniteEx grid = startGrid(0);
 
         FileWriteAheadLogManager wal = (FileWriteAheadLogManager)grid.context().cache().context().wal();
-        wal.setFileIOFactory(new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), diskSpaceBytes));
+        wal.setFileIOFactory(ioFactory);
 
-        grid.active(true);
+        grid.cluster().active(true);
 
         int failedPosition = -1;
 
@@ -254,9 +361,11 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest {
         // Grid should be automatically stopped after WAL fail.
         awaitStop(grid);
 
+        ioFactory = null;
+
         // Grid should be successfully recovered after stopping.
         IgniteEx recoveredGrid = startGrid(0);
-        recoveredGrid.active(true);
+        recoveredGrid.cluster().active(true);
 
         for (int i = 0; i < failedPosition; i++) {
             byte payload = (byte) i;
@@ -328,11 +437,49 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
-            availableSpaceBytes.addAndGet(-maxWalSegmentSize);
+        @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
+            availableSpaceBytes.addAndGet(-sizeBytes);
             if (availableSpaceBytes.get() < 0)
                 throw new IOException("Not enough space!");
-            return super.map(maxWalSegmentSize);
+            return super.map(sizeBytes);
+        }
+    }
+
+    /**
+     * Factory to provide custom File I/O interfaces only for files with specified suffix.
+     * For other files {@link RandomAccessFileIO} will be used.
+     */
+    private static class FilteringFileIOFactory implements FileIOFactory {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Delegate. */
+        private final FileIOFactory delegate;
+
+        /** File suffix pattern. */
+        private final String pattern;
+
+        /**
+         * Constructor.
+         *
+         * @param pattern File suffix pattern.
+         * @param delegate I/O Factory delegate.
+         */
+        FilteringFileIOFactory(String pattern, FileIOFactory delegate) {
+            this.delegate = delegate;
+            this.pattern = pattern;
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file) throws IOException {
+            return create(file, CREATE, WRITE, READ);
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+            if (file.getName().endsWith(pattern))
+                return delegate.create(file, modes);
+            return new RandomAccessFileIO(file, modes);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
index 946b4e8..042a447 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
@@ -206,8 +206,8 @@ public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest {
                 }
 
                 /** {@inheritDoc} */
-                @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
-                    return delegate.map(maxWalSegmentSize);
+                @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
+                    return delegate.map(sizeBytes);
                 }
             };
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
index 1259c3c..fe16328 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
@@ -250,8 +250,8 @@ public abstract class IgniteWalFlushMultiNodeFailoverAbstractSelfTest extends Gr
                 }
 
                 /** {@inheritDoc} */
-                @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
-                    return delegate.map(maxWalSegmentSize);
+                @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
+                    return delegate.map(sizeBytes);
                 }
             };
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
index 9f1342f..249718b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
@@ -319,8 +319,8 @@ public class PagesWriteThrottleSmokeTest extends GridCommonAbstractTest {
                 }
 
                 /** {@inheritDoc} */
-                @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
-                    return delegate.map(maxWalSegmentSize);
+                @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
+                    return delegate.map(sizeBytes);
                 }
             };
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java
----------------------------------------------------------------------
diff --git a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java
index 3cb4886..681426c 100644
--- a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java
+++ b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java
@@ -460,12 +460,17 @@ public class AlignedBuffersDirectFileIO implements FileIO {
     }
 
     /** {@inheritDoc} */
-    @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
+    @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
         throw new UnsupportedOperationException("AsynchronousFileChannel doesn't support mmap.");
     }
 
     /** {@inheritDoc} */
     @Override public void force() throws IOException {
+        force(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void force(boolean withMetadata) throws IOException {
         if (IgniteNativeIoLib.fsync(fdCheckOpened()) < 0)
             throw new IOException(String.format("Error fsync()'ing %s, got %s", file, getLastError()));
     }


[4/5] ignite git commit: IGNITE-8163 PDS Indexing suite is hanging on TC in different branches including master - Fixes #3766.

Posted by ag...@apache.org.
IGNITE-8163 PDS Indexing suite is hanging on TC in different branches including master - Fixes #3766.

Signed-off-by: dpavlov <dp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8053fc1c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8053fc1c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8053fc1c

Branch: refs/heads/ignite-6083
Commit: 8053fc1c9c774037018601e1661992644c8319df
Parents: 57aecd7
Author: Sergey Chugunov <se...@gmail.com>
Authored: Fri Apr 6 15:18:12 2018 +0300
Committer: dpavlov <dp...@apache.org>
Committed: Fri Apr 6 15:18:12 2018 +0300

----------------------------------------------------------------------
 .../cache/persistence/db/wal/WalCompactionTest.java       | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8053fc1c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
index 6b79d90..c1e8967 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
@@ -105,7 +105,7 @@ public class WalCompactionTest extends GridCommonAbstractTest {
      */
     public void testApplyingUpdatesFromCompactedWal() throws Exception {
         IgniteEx ig = (IgniteEx)startGrids(3);
-        ig.active(true);
+        ig.cluster().active(true);
 
         IgniteCache<Integer, byte[]> cache = ig.cache("cache");
 
@@ -162,7 +162,8 @@ public class WalCompactionTest extends GridCommonAbstractTest {
             f.delete();
 
         ig = (IgniteEx)startGrids(3);
-        ig.active(true);
+
+        awaitPartitionMapExchange();
 
         cache = ig.cache(CACHE_NAME);
 
@@ -192,7 +193,7 @@ public class WalCompactionTest extends GridCommonAbstractTest {
      */
     public void testSeekingStartInCompactedSegment() throws Exception {
         IgniteEx ig = (IgniteEx)startGrids(3);
-        ig.active(true);
+        ig.cluster().active(true);
 
         IgniteCache<Integer, byte[]> cache = ig.cache("cache");
 
@@ -281,7 +282,8 @@ public class WalCompactionTest extends GridCommonAbstractTest {
             f.delete();
 
         ig = (IgniteEx)startGrids(3);
-        ig.active(true);
+
+        awaitPartitionMapExchange();
 
         cache = ig.cache(CACHE_NAME);
 


[2/5] ignite git commit: IGNITE-8160 Fixed flaky data structures test

Posted by ag...@apache.org.
IGNITE-8160 Fixed flaky data structures test


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/84a40e53
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/84a40e53
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/84a40e53

Branch: refs/heads/ignite-6083
Commit: 84a40e53a971b7a96d9dd80b2a2c6873bd6e09e2
Parents: 4a0695c
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Fri Apr 6 12:45:51 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Apr 6 12:45:51 2018 +0300

----------------------------------------------------------------------
 ...eAbstractDataStructuresFailoverSelfTest.java | 56 +++++++++++++++++---
 1 file changed, 49 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/84a40e53/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index 9f9e577..69a466d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -19,7 +19,9 @@ package org.apache.ignite.internal.processors.cache.datastructures;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.UUID;
@@ -37,6 +39,7 @@ import org.apache.ignite.IgniteAtomicLong;
 import org.apache.ignite.IgniteAtomicReference;
 import org.apache.ignite.IgniteAtomicSequence;
 import org.apache.ignite.IgniteAtomicStamped;
+import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteCountDownLatch;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteInterruptedException;
@@ -50,11 +53,13 @@ import org.apache.ignite.configuration.CollectionConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.util.GridLeanSet;
 import org.apache.ignite.internal.util.typedef.CA;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteCallable;
@@ -1109,8 +1114,36 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
      * @throws Exception If failed.
      */
     public void testAtomicSequenceInitialization() throws Exception {
+        checkAtomicSequenceInitialization(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicSequenceInitializationOnStableNodes() throws Exception {
+        checkAtomicSequenceInitialization(true);
+    }
+
+    /**
+     * @param limitProjection {@code True} if test should call init only on stable nodes.
+     * @throws Exception If failed.
+     */
+    private void checkAtomicSequenceInitialization(boolean limitProjection) throws Exception {
         int threadCnt = 3;
 
+        IgniteCompute compute;
+
+        if (limitProjection) {
+            List<UUID> nodeIds = new ArrayList<>(gridCount());
+
+            for (int i = 0; i < gridCount(); i++)
+                nodeIds.add(grid(i).cluster().localNode().id());
+
+            compute = grid(0).compute(grid(0).cluster().forNodeIds(nodeIds));
+        }
+        else
+            compute = grid(0).compute();
+
         final AtomicInteger idx = new AtomicInteger(gridCount());
 
         IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
@@ -1137,20 +1170,29 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
         }, threadCnt, "test-thread");
 
         while (!fut.isDone()) {
-            grid(0).compute().call(new IgniteCallable<Object>() {
+            compute.call(new IgniteCallable<Object>() {
                 /** */
                 @IgniteInstanceResource
                 private Ignite g;
 
-                @Override public Object call() throws Exception {
-                    IgniteAtomicSequence seq = g.atomicSequence(STRUCTURE_NAME, 1, true);
+                @Override public Object call() {
+                    try {
+                        IgniteAtomicSequence seq = g.atomicSequence(STRUCTURE_NAME, 1, true);
+
+                        assert seq != null;
 
-                    assert seq != null;
+                        for (int i = 0; i < 1000; i++)
+                            seq.getAndIncrement();
 
-                    for (int i = 0; i < 1000; i++)
-                        seq.getAndIncrement();
+                        return null;
+                    }
+                    catch (IgniteException e) {
+                        // Fail if we are on stable nodes or exception is not node stop.
+                        if (limitProjection || !X.hasCause(e, NodeStoppingException.class))
+                            throw e;
 
-                    return null;
+                        return null;
+                    }
                 }
             });
         }


[3/5] ignite git commit: IGNITE-8018 Optimized GridCacheMapEntry initialValue() - Fixes #3686.

Posted by ag...@apache.org.
IGNITE-8018 Optimized GridCacheMapEntry initialValue() - Fixes #3686.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/57aecd7a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/57aecd7a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/57aecd7a

Branch: refs/heads/ignite-6083
Commit: 57aecd7ab2bf5f0836aabf40e4b4051fd07228d2
Parents: 84a40e5
Author: Ilya Lantukh <il...@gridgain.com>
Authored: Fri Apr 6 13:49:10 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Apr 6 13:49:10 2018 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     | 158 +++++++++++++++----
 .../colocated/GridDhtDetachedCacheEntry.java    |   3 +-
 .../distributed/near/GridNearCacheEntry.java    |   3 +-
 3 files changed, 131 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/57aecd7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 74dabe9..a6ef0d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -77,6 +77,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_EXPIRED;
@@ -2699,40 +2700,80 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     ) throws IgniteCheckedException, GridCacheEntryRemovedException {
         ensureFreeSpace();
 
+        boolean deferred = false;
+        boolean obsolete = false;
+
+        GridCacheVersion oldVer = null;
+
         lockEntry();
 
         try {
             checkObsolete();
 
+            boolean walEnabled = !cctx.isNear() && cctx.group().persistenceEnabled() && cctx.group().walEnabled();
+
+            long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime;
+
+            val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
+
+            final boolean unswapped = ((flags & IS_UNSWAPPED_MASK) != 0);
+
             boolean update;
 
-            boolean walEnabled = !cctx.isNear() && cctx.group().persistenceEnabled() && cctx.group().walEnabled();
+            IgnitePredicate<CacheDataRow> p = new IgnitePredicate<CacheDataRow>() {
+                @Override public boolean apply(@Nullable CacheDataRow row) {
+                    boolean update0;
+
+                    GridCacheVersion currentVer = row != null ? row.version() : GridCacheMapEntry.this.ver;
 
-            if (cctx.group().persistenceEnabled()) {
-                unswap(false);
+                    boolean isStartVer = currentVer.nodeOrder() == cctx.localNode().order()
+                        && currentVer.order() == startVer;
 
-                if (!isNew()) {
-                    if (cctx.atomic())
-                        update = ATOMIC_VER_COMPARATOR.compare(this.ver, ver) < 0;
+                    if (cctx.group().persistenceEnabled()) {
+                        if (!isStartVer) {
+                            if (cctx.atomic())
+                                update0 = ATOMIC_VER_COMPARATOR.compare(currentVer, ver) < 0;
+                            else
+                                update0 = currentVer.compareTo(ver) < 0;
+                        }
+                        else
+                            update0 = true;
+                    }
                     else
-                        update = this.ver.compareTo(ver) < 0;
+                        update0 = isStartVer;
+
+                    update0 |= (!preload && deletedUnlocked());
+
+                    return update0;
                 }
-                else
-                    update = true;
-            }
-            else
-                update = isNew() && !cctx.offheap().containsKey(this);
+            };
 
-            update |= !preload && deletedUnlocked();
+            if (unswapped) {
+                update = p.apply(null);
 
-            if (update) {
-                long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime;
+                if (update) {
+                    // If entry is already unswapped and we are modifying it, we must run deletion callbacks for old value.
+                    long oldExpTime = expireTimeUnlocked();
+                    long delta = (oldExpTime == 0 ? 0 : oldExpTime - U.currentTimeMillis());
 
-                val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
+                    if (delta < 0) {
+                        if (onExpired(this.val, null)) {
+                            if (cctx.deferredDelete()) {
+                                deferred = true;
+                                oldVer = this.ver;
+                            }
+                            else if (val == null)
+                                obsolete = true;
+                        }
+                    }
 
-                if (val != null)
                     storeValue(val, expTime, ver, null);
+                }
+            }
+            else // Optimization to access storage only once.
+                update = storeValue(val, expTime, ver, null, p);
 
+            if (update) {
                 update(val, expTime, ttl, ver, true);
 
                 boolean skipQryNtf = false;
@@ -2797,6 +2838,20 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         }
         finally {
             unlockEntry();
+
+            // It is necessary to execute these callbacks outside of lock to avoid deadlocks.
+
+            if (obsolete) {
+                onMarkedObsolete();
+
+                cctx.cache().removeEntry(this);
+            }
+
+            if (deferred) {
+                assert oldVer != null;
+
+                cctx.onDeferredDelete(this, oldVer);
+            }
         }
     }
 
@@ -3516,14 +3571,39 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
      * @param oldRow Old row if available.
      * @throws IgniteCheckedException If update failed.
      */
-    protected void storeValue(CacheObject val,
+    protected boolean storeValue(CacheObject val,
         long expireTime,
         GridCacheVersion ver,
         @Nullable CacheDataRow oldRow) throws IgniteCheckedException {
-        assert lock.isHeldByCurrentThread();
         assert val != null : "null values in update for key: " + key;
 
-        cctx.offheap().invoke(cctx, key,  localPartition(), new UpdateClosure(this, val, ver, expireTime));
+        return storeValue(val, expireTime, ver, oldRow, null);
+    }
+
+    /**
+     * Stores value in offheap.
+     *
+     * @param val Value.
+     * @param expireTime Expire time.
+     * @param ver New entry version.
+     * @param oldRow Old row if available.
+     * @param predicate Optional predicate.
+     * @throws IgniteCheckedException If update failed.
+     * @return {@code True} if storage was modified.
+     */
+    protected boolean storeValue(
+        @Nullable CacheObject val,
+        long expireTime,
+        GridCacheVersion ver,
+        @Nullable CacheDataRow oldRow,
+        @Nullable IgnitePredicate<CacheDataRow> predicate) throws IgniteCheckedException {
+        assert lock.isHeldByCurrentThread();
+
+        UpdateClosure closure = new UpdateClosure(this, val, ver, expireTime, predicate);
+
+        cctx.offheap().invoke(cctx, key,  localPartition(), closure);
+
+        return closure.treeOp != IgniteTree.OperationType.NOOP;
     }
 
     /**
@@ -4295,7 +4375,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         private final GridCacheMapEntry entry;
 
         /** */
-        private final CacheObject val;
+        @Nullable private final CacheObject val;
 
         /** */
         private final GridCacheVersion ver;
@@ -4304,6 +4384,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         private final long expireTime;
 
         /** */
+        @Nullable private final IgnitePredicate<CacheDataRow> predicate;
+
+        /** */
         private CacheDataRow newRow;
 
         /** */
@@ -4317,31 +4400,44 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
          * @param val New value.
          * @param ver New version.
          * @param expireTime New expire time.
+         * @param predicate Optional predicate.
          */
-        UpdateClosure(GridCacheMapEntry entry, CacheObject val, GridCacheVersion ver, long expireTime) {
+        UpdateClosure(GridCacheMapEntry entry, @Nullable CacheObject val, GridCacheVersion ver, long expireTime,
+            @Nullable IgnitePredicate<CacheDataRow> predicate) {
             this.entry = entry;
             this.val = val;
             this.ver = ver;
             this.expireTime = expireTime;
+            this.predicate = predicate;
         }
 
         /** {@inheritDoc} */
         @Override public void call(@Nullable CacheDataRow oldRow) throws IgniteCheckedException {
             this.oldRow = oldRow;
 
+            if (predicate != null && !predicate.apply(oldRow)) {
+                treeOp = IgniteTree.OperationType.NOOP;
+
+                return;
+            }
+
             if (oldRow != null)
                 oldRow.key(entry.key);
 
-            newRow = entry.cctx.offheap().dataStore(entry.localPartition()).createRow(
-                entry.cctx,
-                entry.key,
-                val,
-                ver,
-                expireTime,
-                oldRow);
+            if (val != null) {
+                newRow = entry.cctx.offheap().dataStore(entry.localPartition()).createRow(
+                    entry.cctx,
+                    entry.key,
+                    val,
+                    ver,
+                    expireTime,
+                    oldRow);
 
-            treeOp = oldRow != null && oldRow.link() == newRow.link() ?
-                IgniteTree.OperationType.NOOP : IgniteTree.OperationType.PUT;
+                treeOp = oldRow != null && oldRow.link() == newRow.link() ?
+                    IgniteTree.OperationType.NOOP : IgniteTree.OperationType.PUT;
+            }
+            else
+                treeOp = oldRow != null ? IgniteTree.OperationType.REMOVE : IgniteTree.OperationType.NOOP;
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/57aecd7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
index 3536908..d02015b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
@@ -65,10 +65,11 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry {
     }
 
     /** {@inheritDoc} */
-    @Override protected void storeValue(CacheObject val,
+    @Override protected boolean storeValue(CacheObject val,
         long expireTime,
         GridCacheVersion ver,
         CacheDataRow oldRow) throws IgniteCheckedException {
+        return false;
         // No-op for detached entries, index is updated on primary nodes.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/57aecd7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 322e63c..fb41f5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -458,7 +458,8 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
     }
 
     /** {@inheritDoc} */
-    @Override protected void storeValue(CacheObject val, long expireTime, GridCacheVersion ver, CacheDataRow oldRow) {
+    @Override protected boolean storeValue(CacheObject val, long expireTime, GridCacheVersion ver, CacheDataRow oldRow) {
+        return false;
         // No-op: queries are disabled for near cache.
     }
 


[5/5] ignite git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/ignite into ignite-6083

Posted by ag...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/ignite into ignite-6083


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6e92fffc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6e92fffc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6e92fffc

Branch: refs/heads/ignite-6083
Commit: 6e92fffcada7645d364233ae6948f1bddc378cce
Parents: cbf65cb 8053fc1
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Fri Apr 6 15:37:01 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Apr 6 15:37:01 2018 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/IgnitionEx.java  |   2 +-
 .../processors/cache/GridCacheMapEntry.java     | 158 ++++++++++---
 .../colocated/GridDhtDetachedCacheEntry.java    |   3 +-
 .../distributed/near/GridNearCacheEntry.java    |   3 +-
 .../GridCacheDatabaseSharedManager.java         | 145 ++++++++----
 .../cache/persistence/file/AsyncFileIO.java     |   9 +-
 .../cache/persistence/file/FileIO.java          |  20 +-
 .../cache/persistence/file/FileIODecorator.java |   9 +-
 .../persistence/file/RandomAccessFileIO.java    |  13 +-
 .../cache/persistence/file/UnzipFileIO.java     |   7 +-
 ...eAbstractDataStructuresFailoverSelfTest.java |  56 ++++-
 .../file/IgnitePdsDiskErrorsRecoveringTest.java | 231 +++++++++++++++----
 .../db/wal/IgniteWalFlushFailoverTest.java      |   4 +-
 ...lFlushMultiNodeFailoverAbstractSelfTest.java |   4 +-
 .../persistence/db/wal/WalCompactionTest.java   |  10 +-
 .../pagemem/PagesWriteThrottleSmokeTest.java    |   4 +-
 .../file/AlignedBuffersDirectFileIO.java        |   7 +-
 17 files changed, 539 insertions(+), 146 deletions(-)
----------------------------------------------------------------------