You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2018/11/09 12:07:49 UTC

[2/4] ignite git commit: IGNITE-9909 Merge FsyncWalManager and WalManager - Fixes #5013.

http://git-wip-us.apache.org/repos/asf/ignite/blob/889ce79b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/AbstractFileHandle.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/AbstractFileHandle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/AbstractFileHandle.java
new file mode 100644
index 0000000..127d737
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/AbstractFileHandle.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.filehandle;
+
+import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Common base of WAL segment file handles: keeps the segment I/O and the index of the segment it was opened for.
+ */
+public abstract class AbstractFileHandle {
+    /** I/O interface used for read and write operations on the segment file. */
+    protected SegmentIO fileIO;
+
+    /** Index of the WAL segment that {@link #fileIO} was opened for, captured once in the constructor. */
+    private final long segmentId;
+
+    /**
+     * Creates a handle on top of the given segment I/O.
+     *
+     * @param fileIO I/O interface for read/write operations with the segment file.
+     */
+    public AbstractFileHandle(@NotNull SegmentIO fileIO) {
+        this.fileIO = fileIO;
+        this.segmentId = fileIO.getSegmentId();
+    }
+
+    /**
+     * @return Absolute WAL segment file index (incremental counter).
+     */
+    public long getSegmentId() {
+        return segmentId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/889ce79b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManager.java
new file mode 100644
index 0000000..6597e46
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManager.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.filehandle;
+
+import java.io.IOException;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.StorageException;
+import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
+
+/**
+ * Manager of {@link FileWriteHandle}: creates handles and exposes the lifecycle and flush operations behind them.
+ */
+public interface FileHandleManager {
+    /**
+     * Starts the manager.
+     */
+    void start();
+
+    /**
+     * Activation callback.
+     */
+    void onActivate();
+
+    /**
+     * Deactivation callback.
+     *
+     * @throws IgniteCheckedException If deactivation failed.
+     */
+    void onDeactivate() throws IgniteCheckedException;
+
+    /**
+     * Resumes logging.
+     */
+    void resumeLogging();
+
+    /**
+     * Creates the very first {@link FileWriteHandle}, starting at the given position.
+     *
+     * @param fileIO Segment I/O the handle works with.
+     * @param position Initial position inside the segment.
+     * @param serializer Record serializer used by the handle.
+     * @return Newly created file handle.
+     * @throws IOException If the handle could not be created.
+     */
+    FileWriteHandle initHandle(SegmentIO fileIO, long position, RecordSerializer serializer) throws IOException;
+
+    /**
+     * Creates the file handle for the next segment.
+     *
+     * @param fileIO Segment I/O the handle works with.
+     * @param serializer Record serializer used by the handle.
+     * @return Newly created file handle.
+     * @throws IOException If the handle could not be created.
+     */
+    FileWriteHandle nextHandle(SegmentIO fileIO, RecordSerializer serializer) throws IOException;
+
+    /**
+     * Flushes the WAL up to the given pointer.
+     *
+     * @param ptr Pointer up to which the WAL needs to be flushed.
+     * @param explicitFsync {@code true} if an fsync is required.
+     * @throws IgniteCheckedException If the flush failed.
+     * @throws StorageException If the underlying storage failed.
+     */
+    void flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/889ce79b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java
new file mode 100644
index 0000000..b0c456e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.filehandle;
+
+import java.util.function.Supplier;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
+
+/**
+ * Factory of {@link FileHandleManager}.
+ */
+public class FileHandleManagerFactory {
+    /**   */
+    private final boolean walFsyncWithDedicatedWorker =
+        IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_WAL_FSYNC_WITH_DEDICATED_WORKER, false);
+
+    /** Data storage configuration. */
+    private final DataStorageConfiguration dsConf;
+
+    /**
+     * @param conf Data storage configuration.
+     */
+    public FileHandleManagerFactory(DataStorageConfiguration conf) {
+        dsConf = conf;
+    }
+
+    /**
+     * @param cctx Cache context.
+     * @param metrics Data storage metrics.
+     * @param mmap Using mmap.
+     * @param lastWALPtr Last WAL pointer.
+     * @param serializer Serializer.
+     * @param currHandleSupplier Supplier of current handle.
+     * @return One of implementation of {@link FileHandleManager}.
+     */
+    public FileHandleManager build(
+        GridCacheSharedContext cctx,
+        DataStorageMetricsImpl metrics,
+        boolean mmap,
+        Supplier<WALPointer> lastWALPtr,
+        RecordSerializer serializer,
+        Supplier<FileWriteHandle> currHandleSupplier
+    ) {
+        if (dsConf.getWalMode() == WALMode.FSYNC && !walFsyncWithDedicatedWorker)
+            return new FsyncFileHandleManagerImpl(
+                cctx,
+                metrics,
+                lastWALPtr,
+                serializer,
+                currHandleSupplier,
+                dsConf.getWalMode(),
+                dsConf.getWalSegmentSize(),
+                dsConf.getWalFsyncDelayNanos(),
+                dsConf.getWalThreadLocalBufferSize()
+            );
+        else
+            return new FileHandleManagerImpl(
+                cctx,
+                metrics,
+                mmap,
+                lastWALPtr,
+                serializer,
+                currHandleSupplier,
+                dsConf.getWalMode(),
+                dsConf.getWalBufferSize(),
+                dsConf.getWalSegmentSize(),
+                dsConf.getWalFsyncDelayNanos()
+            );
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/889ce79b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
new file mode 100644
index 0000000..1daac31
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
@@ -0,0 +1,603 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.filehandle;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.LockSupport;
+import java.util.function.Supplier;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.StorageException;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.thread.IgniteThread;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SEGMENT_SYNC_TIMEOUT;
+import static org.apache.ignite.configuration.WALMode.LOG_ONLY;
+import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
+import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.DIRECT;
+import static org.apache.ignite.internal.util.IgniteUtils.sleep;
+
+/**
+ * Manager for {@link FileWriteHandleImpl}.
+ * <p>
+ * Besides creating handles it owns two background workers:
+ * <ul>
+ *     <li>{@link WALWriter} - started only when mmap is not used; writes the content of the current handle's ring
+ *     buffer to the segment file on request of waiting threads;</li>
+ *     <li>{@link WalSegmentSyncer} - created only in modes other than NONE and FSYNC; periodically calls
+ *     {@link #flush(WALPointer, boolean)} with an explicit fsync.</li>
+ * </ul>
+ */
+public class FileHandleManagerImpl implements FileHandleManager {
+    /** Default wal segment sync timeout. */
+    private static final long DFLT_WAL_SEGMENT_SYNC_TIMEOUT = 500L;
+
+    /** WAL writer worker. */
+    private WALWriter walWriter;
+
+    /** Wal segment sync worker; stays {@code null} unless started in a mode other than NONE and FSYNC. */
+    private WalSegmentSyncer walSegmentSyncWorker;
+
+    /** Context. */
+    protected final GridCacheSharedContext cctx;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** WAL mode. */
+    private final WALMode mode;
+
+    /** Persistence metrics tracker. */
+    private final DataStorageMetricsImpl metrics;
+
+    /** Use mapped byte buffer. */
+    private final boolean mmap;
+
+    /** Last WAL pointer. */
+    private final Supplier<WALPointer> lastWALPtr;
+
+    /** Record serializer; {@link #flush(WALPointer, boolean)} does nothing if it is {@code null}. */
+    private final RecordSerializer serializer;
+
+    /** Current handle supplier. */
+    private final Supplier<FileWriteHandle> currentHandleSupplier;
+
+    /** WAL buffer size. */
+    private final int walBufferSize;
+
+    /** WAL segment size in bytes. This is maximum value, actual segments may be shorter. */
+    private final long maxWalSegmentSize;
+
+    /** Fsync delay. */
+    private final long fsyncDelay;
+
+    /**
+     * @param cctx Context.
+     * @param metrics Data storage metrics.
+     * @param mmap Mmap.
+     * @param lastWALPtr Last WAL pointer.
+     * @param serializer Serializer.
+     * @param currentHandleSupplier Current handle supplier.
+     * @param mode WAL mode.
+     * @param walBufferSize WAL buffer size.
+     * @param maxWalSegmentSize Max WAL segment size.
+     * @param fsyncDelay Fsync delay.
+     */
+    public FileHandleManagerImpl(
+        GridCacheSharedContext cctx,
+        DataStorageMetricsImpl metrics,
+        boolean mmap,
+        Supplier<WALPointer> lastWALPtr,
+        RecordSerializer serializer,
+        Supplier<FileWriteHandle> currentHandleSupplier,
+        WALMode mode,
+        int walBufferSize,
+        long maxWalSegmentSize,
+        long fsyncDelay) {
+        this.cctx = cctx;
+        this.log = cctx.logger(FileHandleManagerImpl.class);
+        this.mode = mode;
+        this.metrics = metrics;
+        this.mmap = mmap;
+        this.lastWALPtr = lastWALPtr;
+        this.serializer = serializer;
+        this.currentHandleSupplier = currentHandleSupplier;
+        this.walBufferSize = walBufferSize;
+        this.maxWalSegmentSize = maxWalSegmentSize;
+        this.fsyncDelay = fsyncDelay;
+    }
+
+    /** {@inheritDoc} */
+    @Override public FileWriteHandle initHandle(
+        SegmentIO fileIO,
+        long position,
+        RecordSerializer serializer
+    ) throws IOException {
+        SegmentedRingByteBuffer rbuf;
+
+        // With mmap the ring buffer is backed by the mapped segment file itself,
+        // otherwise a DIRECT-mode buffer of walBufferSize is allocated.
+        if (mmap) {
+            MappedByteBuffer buf = fileIO.map((int)maxWalSegmentSize);
+
+            rbuf = new SegmentedRingByteBuffer(buf, metrics);
+        }
+        else
+            rbuf = new SegmentedRingByteBuffer(walBufferSize, maxWalSegmentSize, DIRECT, metrics);
+
+        rbuf.init(position);
+
+        return new FileWriteHandleImpl(
+            cctx, fileIO, rbuf, serializer, metrics, walWriter, position,
+            mode, mmap, true, fsyncDelay, maxWalSegmentSize
+        );
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return Created file handle, or {@code null} if the channel was closed by an interrupt while the handle
+     *      was being created.
+     */
+    @Override public FileWriteHandle nextHandle(SegmentIO fileIO, RecordSerializer serializer) throws IOException {
+        SegmentedRingByteBuffer rbuf;
+
+        if (mmap) {
+            MappedByteBuffer buf = fileIO.map((int)maxWalSegmentSize);
+
+            rbuf = new SegmentedRingByteBuffer(buf, metrics);
+        }
+        else // Without mmap the ring buffer of the current handle is reset and reused.
+            rbuf = currentHandle().buf.reset();
+
+        try {
+            return new FileWriteHandleImpl(
+                cctx, fileIO, rbuf, serializer, metrics, walWriter, 0,
+                mode, mmap, false, fsyncDelay, maxWalSegmentSize
+            );
+        }
+        catch (ClosedByInterruptException ignore) {
+            // Interrupted while creating the handle: release the buffer and report "no handle" to the caller.
+            if (rbuf != null)
+                rbuf.free();
+        }
+
+        return null;
+    }
+
+    /**
+     * @return Current handle.
+     */
+    private FileWriteHandleImpl currentHandle() {
+        return (FileWriteHandleImpl)currentHandleSupplier.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() {
+        if (mode == WALMode.NONE) {
+            U.quietAndWarn(log, "Started write-ahead log manager in NONE mode, persisted data may be lost in " +
+                "a case of unexpected node failure. Make sure to deactivate the cluster before shutdown.");
+
+            return;
+        }
+
+        // In FSYNC mode every flush() already fsyncs, so the periodic syncer is needed only for the other modes.
+        if (mode != WALMode.FSYNC) {
+            walSegmentSyncWorker = new WalSegmentSyncer(cctx.igniteInstanceName(),
+                cctx.kernalContext().log(WalSegmentSyncer.class));
+        }
+
+        if (log.isInfoEnabled())
+            log.info("Started write-ahead log manager [mode=" + mode + ']');
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onActivate() {
+        if (!cctx.kernalContext().clientNode()) {
+            if (walSegmentSyncWorker != null)
+                new IgniteThread(walSegmentSyncWorker).start();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onDeactivate() throws IgniteCheckedException {
+        FileWriteHandleImpl currHnd = currentHandle();
+
+        // BACKGROUND mode may hold unflushed data in the buffer; push it out before closing the handle.
+        if (mode == WALMode.BACKGROUND) {
+            if (currHnd != null)
+                currHnd.flush(null);
+        }
+
+        if (currHnd != null)
+            currHnd.close(false);
+
+        if (walSegmentSyncWorker != null)
+            walSegmentSyncWorker.shutdown();
+
+        if (walWriter != null)
+            walWriter.shutdown();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void resumeLogging() {
+        walWriter = new WALWriter(log);
+
+        // With mmap, writes go straight to the mapped file, so the writer thread is not started.
+        if (!mmap)
+            new IgniteThread(walWriter).start();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * A {@code null} pointer means "up to the last WAL pointer". In LOG_ONLY mode the buffer is flushed first.
+     * An fsync is then performed only if {@code explicitFsync} is set or the mode is FSYNC, and only if the
+     * current handle still needs it for the pointer.
+     */
+    @Override public void flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException {
+        if (serializer == null || mode == WALMode.NONE)
+            return;
+
+        FileWriteHandleImpl cur = currentHandle();
+
+        // WAL manager was not started (client node).
+        if (cur == null)
+            return;
+
+        FileWALPointer filePtr = (FileWALPointer)(ptr == null ? lastWALPtr.get() : ptr);
+
+        if (mode == LOG_ONLY)
+            cur.flushOrWait(filePtr);
+
+        if (!explicitFsync && mode != WALMode.FSYNC)
+            return; // No need to sync in LOG_ONLY or BACKGROUND unless explicit fsync is required.
+
+        // No need to sync if was rolled over.
+        if (filePtr != null && !cur.needFsync(filePtr))
+            return;
+
+        cur.fsync(filePtr);
+    }
+
+    /**
+     * @throws StorageException If node is no longer valid and we missed a WAL operation.
+     */
+    private void checkNode() throws StorageException {
+        if (cctx.kernalContext().invalid())
+            throw new StorageException("Failed to perform WAL operation (environment was invalidated by a " +
+                "previous error)");
+    }
+
+    /**
+     * WAL writer worker.
+     * <p>
+     * Threads that need data written put themselves into {@link #waiters}. The value is the requested position,
+     * or one of the negative sentinels {@link #FILE_CLOSE}/{@link #FILE_FORCE}. They then park until the writer
+     * replaces the value with {@link Long#MIN_VALUE}. If the writer has recorded an error in {@link #err} by
+     * then, the waiter fails.
+     */
+    public class WALWriter extends GridWorker {
+        /** Unconditional flush. */
+        private static final long UNCONDITIONAL_FLUSH = -1L;
+
+        /** File close. */
+        private static final long FILE_CLOSE = -2L;
+
+        /** File force. */
+        private static final long FILE_FORCE = -3L;
+
+        /**
+         * Error raised in the writer thread. It is written only by the writer thread, and always before waiters are
+         * unparked, so released waiters observe it.
+         */
+        private volatile Throwable err;
+
+        //TODO: replace with GC free data structure.
+        /** Parked threads. */
+        final Map<Thread, Long> waiters = new ConcurrentHashMap<>();
+
+        /**
+         * Default constructor.
+         *
+         * @param log Logger.
+         */
+        WALWriter(IgniteLogger log) {
+            super(cctx.igniteInstanceName(), "wal-write-worker%" + cctx.igniteInstanceName(), log,
+                cctx.kernalContext().workersRegistry());
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() {
+            // NB: errors are stored directly in the volatile 'err' field (not a local variable), otherwise
+            // flushBuffer() would never see them and waiters would be released as if the flush succeeded.
+            try {
+                while (!isCancelled()) {
+                    onIdle();
+
+                    while (waiters.isEmpty()) {
+                        if (!isCancelled()) {
+                            blockingSectionBegin();
+
+                            try {
+                                LockSupport.park();
+                            }
+                            finally {
+                                blockingSectionEnd();
+                            }
+                        }
+                        else {
+                            unparkWaiters(Long.MAX_VALUE);
+
+                            return;
+                        }
+                    }
+
+                    Long pos = null;
+
+                    // Pick a pending request; Long.MIN_VALUE marks waiters that were already released.
+                    for (Long val : waiters.values()) {
+                        if (val > Long.MIN_VALUE)
+                            pos = val;
+                    }
+
+                    updateHeartbeat();
+
+                    if (pos == null)
+                        continue;
+                    else if (pos < UNCONDITIONAL_FLUSH) {
+                        try {
+                            assert pos == FILE_CLOSE || pos == FILE_FORCE : pos;
+
+                            if (pos == FILE_CLOSE)
+                                currentHandle().fileIO.close();
+                            else if (pos == FILE_FORCE)
+                                currentHandle().fileIO.force();
+                        }
+                        catch (IOException e) {
+                            log.error("Exception in WAL writer thread: ", e);
+
+                            // Publish before unparking so released waiters fail instead of silently succeeding.
+                            err = e;
+
+                            unparkWaiters(Long.MAX_VALUE);
+
+                            return;
+                        }
+
+                        unparkWaiters(pos);
+                    }
+
+                    updateHeartbeat();
+
+                    List<SegmentedRingByteBuffer.ReadSegment> segs = currentHandle().buf.poll(pos);
+
+                    if (segs == null) {
+                        unparkWaiters(pos);
+
+                        continue;
+                    }
+
+                    for (int i = 0; i < segs.size(); i++) {
+                        SegmentedRingByteBuffer.ReadSegment seg = segs.get(i);
+
+                        updateHeartbeat();
+
+                        try {
+                            writeBuffer(seg.position(), seg.buffer());
+                        }
+                        catch (Throwable e) {
+                            log.error("Exception in WAL writer thread:", e);
+
+                            err = e;
+                        }
+                        finally {
+                            seg.release();
+
+                            // After an error (or for an unconditional flush) release every waiter.
+                            long p = pos <= UNCONDITIONAL_FLUSH || err != null ? Long.MAX_VALUE : currentHandle().written;
+
+                            unparkWaiters(p);
+                        }
+                    }
+                }
+            }
+            catch (Throwable t) {
+                err = t;
+            }
+            finally {
+                // Record the termination cause before releasing waiters so they do not report success.
+                if (err == null && !isCancelled())
+                    err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly");
+
+                unparkWaiters(Long.MAX_VALUE);
+
+                Throwable termErr = err;
+
+                if (termErr instanceof OutOfMemoryError)
+                    cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, termErr));
+                else if (termErr != null)
+                    cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, termErr));
+            }
+        }
+
+        /**
+         * Shutdowns thread.
+         */
+        private void shutdown() throws IgniteInterruptedCheckedException {
+            U.cancel(this);
+
+            LockSupport.unpark(runner());
+
+            U.join(runner());
+        }
+
+        /**
+         * Unparks waiting threads whose requested position is not greater than {@code pos}, marking them as
+         * released with {@link Long#MIN_VALUE}.
+         *
+         * @param pos Pos.
+         */
+        private void unparkWaiters(long pos) {
+            assert pos > Long.MIN_VALUE : pos;
+
+            for (Map.Entry<Thread, Long> e : waiters.entrySet()) {
+                Long val = e.getValue();
+
+                if (val <= pos) {
+                    if (val != Long.MIN_VALUE)
+                        waiters.put(e.getKey(), Long.MIN_VALUE);
+
+                    LockSupport.unpark(e.getKey());
+                }
+            }
+        }
+
+        /**
+         * Forces all made changes to the file.
+         */
+        void force() throws IgniteCheckedException {
+            flushBuffer(FILE_FORCE);
+        }
+
+        /**
+         * Closes file.
+         */
+        void close() throws IgniteCheckedException {
+            flushBuffer(FILE_CLOSE);
+        }
+
+        /**
+         * Flushes all data from the buffer.
+         */
+        void flushAll() throws IgniteCheckedException {
+            flushBuffer(UNCONDITIONAL_FLUSH);
+        }
+
+        /**
+         * Registers the calling thread as a waiter for {@code expPos}, wakes this writer and parks until released.
+         * Does nothing in mmap mode.
+         *
+         * @param expPos Expected position, or one of the {@code UNCONDITIONAL_FLUSH}, {@code FILE_CLOSE},
+         *      {@code FILE_FORCE} sentinels.
+         * @throws IgniteCheckedException If this writer has failed.
+         */
+        void flushBuffer(long expPos) throws IgniteCheckedException {
+            if (mmap)
+                return;
+
+            Throwable prevErr = err;
+
+            if (prevErr != null) {
+                cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, prevErr));
+
+                // The writer already failed (it may even have terminated): waiting for it could block forever.
+                throw new IgniteCheckedException("Flush buffer failed.", prevErr);
+            }
+
+            if (expPos == UNCONDITIONAL_FLUSH)
+                expPos = (currentHandle().buf.tail());
+
+            Thread t = Thread.currentThread();
+
+            waiters.put(t, expPos);
+
+            // Wake this writer instance: it is the one that serves this 'waiters' map.
+            LockSupport.unpark(runner());
+
+            while (true) {
+                Long val = waiters.get(t);
+
+                assert val != null : "Only this thread can remove thread from waiters";
+
+                if (val == Long.MIN_VALUE) {
+                    waiters.remove(t);
+
+                    Throwable walWriterError = err;
+
+                    if (walWriterError != null)
+                        throw new IgniteCheckedException("Flush buffer failed.", walWriterError);
+
+                    return;
+                }
+                else
+                    LockSupport.park();
+            }
+        }
+
+        /**
+         * @param pos Position in file to start write from. May be checked against actual position to wait previous
+         * writes to complete.
+         * @param buf Buffer to write to file.
+         * @throws StorageException If failed.
+         * @throws IgniteCheckedException If failed.
+         */
+        private void writeBuffer(long pos, ByteBuffer buf) throws StorageException, IgniteCheckedException {
+            FileWriteHandleImpl hdl = currentHandle();
+
+            assert hdl.fileIO != null : "Writing to a closed segment.";
+
+            checkNode();
+
+            long lastLogged = U.currentTimeMillis();
+
+            long logBackoff = 2_000;
+
+            // If we were too fast, need to wait previous writes to complete.
+            while (hdl.written != pos) {
+                assert hdl.written < pos : "written = " + hdl.written + ", pos = " + pos; // No one can write further than we are now.
+
+                // Permutation occurred between blocks write operations.
+                // Order of acquiring lock is not the same as order of write.
+                long now = U.currentTimeMillis();
+
+                if (now - lastLogged >= logBackoff) {
+                    if (logBackoff < 60 * 60_000)
+                        logBackoff *= 2;
+
+                    U.warn(log, "Still waiting for a concurrent write to complete [written=" + hdl.written +
+                        ", pos=" + pos + ", lastFsyncPos=" + hdl.lastFsyncPos + ", stop=" + hdl.stop.get() +
+                        ", actualPos=" + hdl.safePosition() + ']');
+
+                    lastLogged = now;
+                }
+
+                checkNode();
+            }
+
+            // Do the write.
+            int size = buf.remaining();
+
+            assert size > 0 : size;
+
+            try {
+                assert hdl.written == hdl.fileIO.position();
+
+                hdl.written += hdl.fileIO.writeFully(buf);
+
+                metrics.onWalBytesWritten(size);
+
+                assert hdl.written == hdl.fileIO.position();
+            }
+            catch (IOException e) {
+                StorageException se = new StorageException("Failed to write buffer.", e);
+
+                cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, se));
+
+                throw se;
+            }
+        }
+    }
+
+    /**
+     * Syncs WAL segment file: repeatedly sleeps for {@link #syncTimeout} and then calls
+     * {@code flush(null, true)}, logging (not propagating) flush failures.
+     */
+    public class WalSegmentSyncer extends GridWorker {
+        /** Pause between two consecutive flushes; taken from {@code IGNITE_WAL_SEGMENT_SYNC_TIMEOUT}, at least 100. */
+        long syncTimeout;
+
+        /**
+         * @param igniteInstanceName Ignite instance name.
+         * @param log Logger.
+         */
+        private WalSegmentSyncer(String igniteInstanceName, IgniteLogger log) {
+            super(igniteInstanceName, "wal-segment-syncer", log);
+
+            syncTimeout = Math.max(IgniteSystemProperties.getLong(IGNITE_WAL_SEGMENT_SYNC_TIMEOUT,
+                DFLT_WAL_SEGMENT_SYNC_TIMEOUT), 100L);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+            while (!isCancelled()) {
+                sleep(syncTimeout);
+
+                try {
+                    flush(null, true);
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Exception when flushing WAL.", e);
+                }
+            }
+        }
+
+        /** Shutted down the worker. */
+        private void shutdown() {
+            synchronized (this) {
+                U.cancel(this);
+            }
+
+            U.join(this, log);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/889ce79b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandle.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandle.java
new file mode 100644
index 0000000..410cd56
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandle.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.filehandle;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.persistence.StorageException;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Write handle for a single WAL segment file: accepts records, flushes and fsyncs them, and closes the segment.
+ */
+public interface FileWriteHandle {
+
+    /**
+     * @return Version of the record serializer used by this handle.
+     */
+    int serializerVersion();
+
+    /**
+     * Notifies the handle that resume logging has finished.
+     */
+    void finishResumeLogging();
+
+    /**
+     * Writes the header to the segment.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    void writeHeader() throws IgniteCheckedException;
+
+    /**
+     * Adds a record to the segment.
+     *
+     * @param rec Record to be added.
+     * @return Pointer to the added record, or {@code null} if a roll over to the next segment is required or has
+     *      already been started by another thread.
+     * @throws StorageException If the storage has failed.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable WALPointer addRecord(WALRecord rec) throws StorageException, IgniteCheckedException;
+
+    /**
+     * Flushes all records.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    void flushAll() throws IgniteCheckedException;
+
+    /**
+     * @param ptr Pointer to check.
+     * @return {@code true} if an fsync is still needed for the given pointer.
+     */
+    boolean needFsync(FileWALPointer ptr);
+
+    /**
+     * @return Pointer to the end of the last written record (probably not fsync-ed).
+     */
+    FileWALPointer position();
+
+    /**
+     * Performs an fsync of the segment.
+     *
+     * @param ptr Pointer up to which the fsync is required.
+     * @throws StorageException If the storage has failed.
+     * @throws IgniteCheckedException If failed.
+     */
+    void fsync(FileWALPointer ptr) throws StorageException, IgniteCheckedException;
+
+    /**
+     * Closes the record buffer.
+     */
+    void closeBuffer();
+
+    /**
+     * Closes the segment.
+     *
+     * @param rollOver {@code true} if the segment is closed because of a roll over to the next segment.
+     * @return {@code true} if this call actually closed the segment.
+     * @throws IgniteCheckedException If failed.
+     * @throws StorageException If the storage has failed.
+     */
+    boolean close(boolean rollOver) throws IgniteCheckedException, StorageException;
+
+    /**
+     * Signals that the next segment is available, waking up other worker threads waiting for WAL to write.
+     */
+    void signalNextAvailable();
+
+    /**
+     * Blocks until the next segment is initialized.
+     */
+    void awaitNext();
+
+    /**
+     * @return Absolute WAL segment file index (incremental counter).
+     */
+    long getSegmentId();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/889ce79b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
new file mode 100644
index 0000000..f582dbd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.filehandle;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
+import org.apache.ignite.internal.pagemem.wal.record.SwitchSegmentRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.StorageException;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.SWITCH_SEGMENT_RECORD;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.prepareSerializerVersionBuffer;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory.LATEST_SERIALIZER_VERSION;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE;
+import static org.apache.ignite.internal.util.IgniteUtils.findField;
+import static org.apache.ignite.internal.util.IgniteUtils.findNonPublicMethod;
+
+/**
+ * File handle for one log segment.
+ */
+@SuppressWarnings("SignalWithoutCorrespondingAwait")
+class FileWriteHandleImpl extends AbstractFileHandle implements FileWriteHandle {
+    /** {@link MappedByteBuffer#force0(java.io.FileDescriptor, long, long)}. */
+    private static final Method force0 = findNonPublicMethod(
+        MappedByteBuffer.class, "force0",
+        java.io.FileDescriptor.class, long.class, long.class
+    );
+    /** {@link FileWriteHandleImpl#written} atomic field updater. */
+    private static final AtomicLongFieldUpdater<FileWriteHandleImpl> WRITTEN_UPD =
+        AtomicLongFieldUpdater.newUpdater(FileWriteHandleImpl.class, "written");
+
+    /** {@link MappedByteBuffer#mappingOffset()}. */
+    private static final Method mappingOffset = findNonPublicMethod(MappedByteBuffer.class, "mappingOffset");
+
+    /** {@link MappedByteBuffer#mappingAddress(long)}. */
+    private static final Method mappingAddress = findNonPublicMethod(
+        MappedByteBuffer.class, "mappingAddress", long.class
+    );
+
+    /** {@link MappedByteBuffer#fd} */
+    private static final Field fd = findField(MappedByteBuffer.class, "fd");
+
+    /** Page size. */
+    private static final int PAGE_SIZE = GridUnsafe.pageSize();
+
+    /** Serializer latest version to use. */
+    private final int serializerVer =
+        IgniteSystemProperties.getInteger(IGNITE_WAL_SERIALIZER_VERSION, LATEST_SERIALIZER_VERSION);
+
+    /** Use mapped byte buffer. */
+    private final boolean mmap;
+
+    /** Created on resume logging. */
+    private volatile boolean resume;
+
+    /**
+     * Position in current file after the end of last written record (incremented after file channel write operation)
+     */
+    volatile long written;
+
+    /** */
+    protected volatile long lastFsyncPos;
+
+    /** Stop guard to provide warranty that only one thread will be successful in calling {@link #close(boolean)}. */
+    protected final AtomicBoolean stop = new AtomicBoolean(false);
+
+    /** */
+    private final Lock lock = new ReentrantLock();
+
+    /** Condition for timed wait of several threads, see {@link DataStorageConfiguration#getWalFsyncDelayNanos()}. */
+    private final Condition fsync = lock.newCondition();
+
+    /**
+     * Next segment available condition. Protection from "spurious wakeup" is provided by predicate {@link
+     * #fileIO}=<code>null</code>.
+     */
+    private final Condition nextSegment = lock.newCondition();
+
+    /** Buffer. */
+    protected final SegmentedRingByteBuffer buf;
+
+    /** */
+    private final WALMode mode;
+
+    /** Fsync delay. */
+    private final long fsyncDelay;
+
+    /** Persistence metrics tracker. */
+    private final DataStorageMetricsImpl metrics;
+
+    /** WAL segment size in bytes. This is maximum value, actual segments may be shorter. */
+    private final long maxWalSegmentSize;
+
+    /** Logger. */
+    protected final IgniteLogger log;
+
+    /** */
+    private final RecordSerializer serializer;
+
+    /** Context. */
+    protected final GridCacheSharedContext cctx;
+
+    /** WAL writer worker. */
+    private final FileHandleManagerImpl.WALWriter walWriter;
+
+    /**
+     * @param cctx Context.
+     * @param fileIO I/O file interface to use
+     * @param serializer Serializer.
+     * @param metrics Data storage metrics.
+     * @param writer WAL writer.
+     * @param pos Initial position.
+     * @param mode WAL mode.
+     * @param mmap Mmap.
+     * @param resume Created on resume logging flag.
+     * @param fsyncDelay Fsync delay.
+     * @param maxWalSegmentSize Max WAL segment size.
+     * @throws IOException If failed.
+     */
+    FileWriteHandleImpl(
+        GridCacheSharedContext cctx, SegmentIO fileIO, SegmentedRingByteBuffer rbuf, RecordSerializer serializer,
+        DataStorageMetricsImpl metrics, FileHandleManagerImpl.WALWriter writer, long pos, WALMode mode, boolean mmap,
+        boolean resume, long fsyncDelay, long maxWalSegmentSize) throws IOException {
+        super(fileIO);
+        assert serializer != null;
+
+        this.mmap = mmap;
+        this.mode = mode;
+        this.fsyncDelay = fsyncDelay;
+        this.metrics = metrics;
+        this.maxWalSegmentSize = maxWalSegmentSize;
+        this.log = cctx.logger(FileWriteHandleImpl.class);
+        this.cctx = cctx;
+        this.walWriter = writer;
+        this.serializer = serializer;
+        this.written = pos;
+        this.lastFsyncPos = pos;
+        this.resume = resume;
+        this.buf = rbuf;
+
+        if (!mmap)
+            fileIO.position(pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int serializerVersion() {
+        return serializer.version();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishResumeLogging() {
+        resume = false;
+    }
+
+    /**
+     * @throws StorageException If node is no longer valid and we missed a WAL operation.
+     */
+    private void checkNode() throws StorageException {
+        if (cctx.kernalContext().invalid())
+            throw new StorageException("Failed to perform WAL operation (environment was invalidated by a " +
+                "previous error)");
+    }
+
+    /**
+     * Write serializer version to current handle.
+     */
+    @Override public void writeHeader() {
+        SegmentedRingByteBuffer.WriteSegment seg = buf.offer(HEADER_RECORD_SIZE);
+
+        assert seg != null && seg.position() > 0;
+
+        prepareSerializerVersionBuffer(getSegmentId(), serializerVer, false, seg.buffer());
+
+        seg.release();
+    }
+
+    /**
+     * @param rec Record to be added to write queue.
+     * @return Pointer or null if roll over to next segment is required or already started by other thread.
+     * @throws StorageException If failed.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Override @Nullable public WALPointer addRecord(WALRecord rec) throws StorageException, IgniteCheckedException {
+        assert rec.size() > 0 : rec;
+
+        for (; ; ) {
+            checkNode();
+
+            SegmentedRingByteBuffer.WriteSegment seg;
+
+            // Buffer can be in open state in case of resuming with different serializer version.
+            if (rec.type() == SWITCH_SEGMENT_RECORD && !resume)
+                seg = buf.offerSafe(rec.size());
+            else
+                seg = buf.offer(rec.size());
+
+            FileWALPointer ptr = null;
+
+            if (seg != null) {
+                try {
+                    int pos = (int)(seg.position() - rec.size());
+
+                    ByteBuffer buf = seg.buffer();
+
+                    if (buf == null)
+                        return null; // Can not write to this segment, need to switch to the next one.
+
+                    ptr = new FileWALPointer(getSegmentId(), pos, rec.size());
+
+                    rec.position(ptr);
+
+                    fillBuffer(buf, rec);
+
+                    if (mmap) {
+                        // written field must grow only, but segment with greater position can be serialized
+                        // earlier than segment with smaller position.
+                        while (true) {
+                            long written0 = written;
+
+                            if (seg.position() > written0) {
+                                if (WRITTEN_UPD.compareAndSet(this, written0, seg.position()))
+                                    break;
+                            }
+                            else
+                                break;
+                        }
+                    }
+
+                    return ptr;
+                }
+                finally {
+                    seg.release();
+
+                    if (mode == WALMode.BACKGROUND && rec instanceof CheckpointRecord)
+                        flushOrWait(ptr);
+                }
+            }
+            else
+                walWriter.flushAll();
+        }
+    }
+
+    /**
+     * Flush or wait for concurrent flush completion.
+     *
+     * @param ptr Pointer.
+     */
+    public void flushOrWait(FileWALPointer ptr) throws IgniteCheckedException {
+        if (ptr != null) {
+            // If requested obsolete file index, it must be already flushed by close.
+            if (ptr.index() != getSegmentId())
+                return;
+        }
+
+        flush(ptr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void flushAll() throws IgniteCheckedException {
+        flush(null);
+    }
+
+    /**
+     * @param ptr Pointer.
+     */
+    public void flush(FileWALPointer ptr) throws IgniteCheckedException {
+        if (ptr == null) { // Unconditional flush.
+            walWriter.flushAll();
+
+            return;
+        }
+
+        assert ptr.index() == getSegmentId();
+
+        walWriter.flushBuffer(ptr.fileOffset());
+    }
+
+    /**
+     * @param buf Buffer.
+     * @param rec WAL record.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void fillBuffer(ByteBuffer buf, WALRecord rec) throws IgniteCheckedException {
+        try {
+            serializer.writeRecord(rec, buf);
+        }
+        catch (RuntimeException e) {
+            throw new IllegalStateException("Failed to write record: " + rec, e);
+        }
+    }
+
+    /**
+     * Non-blocking check if this pointer needs to be sync'ed.
+     *
+     * @param ptr WAL pointer to check.
+     * @return {@code False} if this pointer has been already sync'ed.
+     */
+    @Override public boolean needFsync(FileWALPointer ptr) {
+        // If index has changed, it means that the log was rolled over and already sync'ed.
+        // If requested position is smaller than last sync'ed, it also means all is good.
+        // If position is equal, then our record is the last not synced.
+        return getSegmentId() == ptr.index() && lastFsyncPos <= ptr.fileOffset();
+    }
+
+    /**
+     * @return Pointer to the end of the last written record (probably not fsync-ed).
+     */
+    @Override public FileWALPointer position() {
+        lock.lock();
+
+        try {
+            return new FileWALPointer(getSegmentId(), (int)written, 0);
+        }
+        finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * @param ptr Pointer to sync.
+     * @throws StorageException If failed.
+     */
+    @Override public void fsync(FileWALPointer ptr) throws StorageException, IgniteCheckedException {
+        lock.lock();
+
+        try {
+            if (ptr != null) {
+                if (!needFsync(ptr))
+                    return;
+
+                if (fsyncDelay > 0 && !stop.get()) {
+                    // Delay fsync to collect as many updates as possible: trade latency for throughput.
+                    U.await(fsync, fsyncDelay, TimeUnit.NANOSECONDS);
+
+                    if (!needFsync(ptr))
+                        return;
+                }
+            }
+
+            flushOrWait(ptr);
+
+            if (stop.get())
+                return;
+
+            long lastFsyncPos0 = lastFsyncPos;
+            long written0 = written;
+
+            if (lastFsyncPos0 != written0) {
+                // Fsync position must be behind.
+                assert lastFsyncPos0 < written0 : "lastFsyncPos=" + lastFsyncPos0 + ", written=" + written0;
+
+                boolean metricsEnabled = metrics.metricsEnabled();
+
+                long start = metricsEnabled ? System.nanoTime() : 0;
+
+                if (mmap) {
+                    long pos = ptr == null ? -1 : ptr.fileOffset();
+
+                    List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll(pos);
+
+                    if (segs != null) {
+                        assert segs.size() == 1;
+
+                        SegmentedRingByteBuffer.ReadSegment seg = segs.get(0);
+
+                        int off = seg.buffer().position();
+                        int len = seg.buffer().limit() - off;
+
+                        fsync((MappedByteBuffer)buf.buf, off, len);
+
+                        seg.release();
+                    }
+                }
+                else
+                    walWriter.force();
+
+                lastFsyncPos = written;
+
+                if (fsyncDelay > 0)
+                    fsync.signalAll();
+
+                long end = metricsEnabled ? System.nanoTime() : 0;
+
+                if (metricsEnabled)
+                    metrics.onFsync(end - start);
+            }
+        }
+        finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * @param buf Mapped byte buffer.
+     * @param off Offset.
+     * @param len Length.
+     */
+    private void fsync(MappedByteBuffer buf, int off, int len) throws IgniteCheckedException {
+        try {
+            long mappedOff = (Long)mappingOffset.invoke(buf);
+
+            assert mappedOff == 0 : mappedOff;
+
+            long addr = (Long)mappingAddress.invoke(buf, mappedOff);
+
+            long delta = (addr + off) % PAGE_SIZE;
+
+            long alignedAddr = (addr + off) - delta;
+
+            force0.invoke(buf, fd.get(buf), alignedAddr, len + delta);
+        }
+        catch (IllegalAccessException | InvocationTargetException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void closeBuffer() {
+        buf.close();
+    }
+
+    /**
+     * @return {@code true} If this thread actually closed the segment.
+     * @throws IgniteCheckedException If failed.
+     * @throws StorageException If failed.
+     */
+    @Override public boolean close(boolean rollOver) throws IgniteCheckedException, StorageException {
+        if (stop.compareAndSet(false, true)) {
+            lock.lock();
+
+            try {
+                flushOrWait(null);
+
+                try {
+                    RecordSerializer backwardSerializer = new RecordSerializerFactoryImpl(cctx)
+                        .createSerializer(serializerVer);
+
+                    SwitchSegmentRecord segmentRecord = new SwitchSegmentRecord();
+
+                    int switchSegmentRecSize = backwardSerializer.size(segmentRecord);
+
+                    if (rollOver && written < (maxWalSegmentSize - switchSegmentRecSize)) {
+                        segmentRecord.size(switchSegmentRecSize);
+
+                        WALPointer segRecPtr = addRecord(segmentRecord);
+
+                        if (segRecPtr != null)
+                            fsync((FileWALPointer)segRecPtr);
+                    }
+
+                    if (mmap) {
+                        List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll(maxWalSegmentSize);
+
+                        if (segs != null) {
+                            assert segs.size() == 1;
+
+                            segs.get(0).release();
+                        }
+                    }
+
+                    // Do the final fsync.
+                    if (mode != WALMode.NONE) {
+                        if (mmap)
+                            ((MappedByteBuffer)buf.buf).force();
+                        else
+                            fileIO.force();
+
+                        lastFsyncPos = written;
+                    }
+
+                    if (mmap) {
+                        try {
+                            fileIO.close();
+                        }
+                        catch (IOException ignore) {
+                            // No-op.
+                        }
+                    }
+                    else {
+                        walWriter.close();
+
+                        if (!rollOver)
+                            buf.free();
+                    }
+                }
+                catch (IOException e) {
+                    throw new StorageException("Failed to close WAL write handle [idx=" + getSegmentId() + "]", e);
+                }
+
+                if (log.isDebugEnabled())
+                    log.debug("Closed WAL write handle [idx=" + getSegmentId() + "]");
+
+                return true;
+            }
+            finally {
+                if (mmap)
+                    buf.free();
+
+                lock.unlock();
+            }
+        }
+        else
+            return false;
+    }
+
+    /**
+     * Signals next segment available to wake up other worker threads waiting for WAL to write.
+     */
+    @Override public void signalNextAvailable() {
+        lock.lock();
+
+        try {
+            assert cctx.kernalContext().invalid() ||
+                written == lastFsyncPos || mode != WALMode.FSYNC :
+                "fsync [written=" + written + ", lastFsync=" + lastFsyncPos + ", idx=" + getSegmentId() + ']';
+
+            fileIO = null;
+
+            nextSegment.signalAll();
+        }
+        finally {
+            lock.unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void awaitNext() {
+        lock.lock();
+
+        try {
+            while (fileIO != null)
+                U.awaitQuiet(nextSegment);
+        }
+        finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * @return Safely reads current position of the file channel as String. Will return "null" if channel is null.
+     */
+    public String safePosition() {
+        FileIO io = fileIO;
+
+        if (io == null)
+            return "null";
+
+        try {
+            return String.valueOf(io.position());
+        }
+        catch (IOException e) {
+            return "{Failed to read channel position: " + e.getMessage() + '}';
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/889ce79b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java
new file mode 100644
index 0000000..e456f04
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.filehandle;
+
+import java.io.IOException;
+import java.util.function.Supplier;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.StorageException;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
+
+/**
+ * Implementation of {@link FileHandleManager} for FSYNC mode: every handle it creates is an
+ * {@link FsyncFileWriteHandle}.
+ */
+public class FsyncFileHandleManagerImpl implements FileHandleManager {
+    /** Context. */
+    protected final GridCacheSharedContext cctx;
+    /** Logger. */
+    protected final IgniteLogger log;
+    /** WAL mode; passed to every created handle and checked by {@link #flush} and {@link #onDeactivate}. */
+    private final WALMode mode;
+    /** Persistence metrics tracker. */
+    private final DataStorageMetricsImpl metrics;
+    /** Supplier of the last WAL pointer, used by {@link #flush} when no pointer is given. */
+    private final Supplier<WALPointer> lastWALPtr;
+    /** Record serializer; {@link #flush} does nothing when it is {@code null}. */
+    protected final RecordSerializer serializer;
+    /** Current handle supplier. */
+    private final Supplier<FileWriteHandle> currentHandleSupplier;
+    /** WAL segment size in bytes. This is maximum value, actual segments may be shorter. */
+    private final long maxWalSegmentSize;
+    /** Fsync delay. */
+    private final long fsyncDelay;
+    /** Thread local byte buffer size. */
+    private final int tlbSize;
+
+    /**
+     * @param cctx Context.
+     * @param metrics Data storage metrics.
+     * @param ptr Supplier of the last WAL pointer.
+     * @param serializer Serializer.
+     * @param handle Current handle supplier.
+     * @param mode WAL mode.
+     * @param maxWalSegmentSize Max WAL segment size.
+     * @param fsyncDelay Fsync delay.
+     * @param tlbSize Thread local byte buffer size.
+     */
+    public FsyncFileHandleManagerImpl(GridCacheSharedContext cctx,
+        DataStorageMetricsImpl metrics, Supplier<WALPointer> ptr, RecordSerializer serializer,
+        Supplier<FileWriteHandle> handle, WALMode mode,
+        long maxWalSegmentSize, long fsyncDelay, int tlbSize) {
+        this.cctx = cctx;
+        this.log = cctx.logger(FsyncFileHandleManagerImpl.class);
+        this.mode = mode;
+        this.metrics = metrics;
+        this.lastWALPtr = ptr;
+        this.serializer = serializer;
+        this.currentHandleSupplier = handle;
+        this.maxWalSegmentSize = maxWalSegmentSize;
+        this.fsyncDelay = fsyncDelay;
+        this.tlbSize = tlbSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public FileWriteHandle initHandle(SegmentIO fileIO, long position,
+        RecordSerializer serializer) throws IOException {
+        return newHandle(fileIO, position, serializer);
+    }
+
+    /** {@inheritDoc} */
+    @Override public FileWriteHandle nextHandle(SegmentIO fileIO,
+        RecordSerializer serializer) throws IOException {
+        // Handles for the next segment always start writing at position 0.
+        return newHandle(fileIO, 0, serializer);
+    }
+
+    /**
+     * Creates an {@link FsyncFileWriteHandle} configured with this manager's settings.
+     * Private (not delegating to {@link #initHandle}) so subclass overrides of one factory method
+     * do not silently change the other.
+     *
+     * @param fileIO Segment IO to write to.
+     * @param position Position in the segment to start writing from.
+     * @param serializer Serializer for the handle.
+     * @return New handle.
+     * @throws IOException If the handle could not be created.
+     */
+    private FsyncFileWriteHandle newHandle(SegmentIO fileIO, long position,
+        RecordSerializer serializer) throws IOException {
+        return new FsyncFileWriteHandle(
+            cctx, fileIO, metrics, serializer, position,
+            mode, maxWalSegmentSize, tlbSize, fsyncDelay
+        );
+    }
+
+    /**
+     * @return Current handle (may be {@code null}), cast to {@link FsyncFileWriteHandle}.
+     */
+    private FsyncFileWriteHandle currentHandle() {
+        return (FsyncFileWriteHandle)currentHandleSupplier.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() {
+        //NOOP.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onActivate() {
+        //NOOP.
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Closes the current handle, if any; in BACKGROUND mode {@code flushAllOnStop()} is called on it first.
+     */
+    @Override public void onDeactivate() throws IgniteCheckedException {
+        FsyncFileWriteHandle currHnd = currentHandle();
+
+        // Nothing to flush or close when no handle exists.
+        if (currHnd == null)
+            return;
+
+        if (mode == WALMode.BACKGROUND)
+            currHnd.flushAllOnStop();
+
+        currHnd.close(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void resumeLogging() {
+        //NOOP.
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Fsyncs the current handle up to {@code ptr}, or up to the last WAL pointer when {@code ptr} is
+     * {@code null}. Returns immediately when WAL mode is NONE, no serializer is set, no current handle
+     * exists, or the handle reports that no fsync is needed for the pointer. {@code explicitFsync} is
+     * not consulted by this implementation.
+     */
+    @Override public void flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException {
+        if (serializer == null || mode == WALMode.NONE)
+            return;
+
+        FsyncFileWriteHandle cur = currentHandle();
+
+        // WAL manager was not started (client node).
+        if (cur == null)
+            return;
+
+        FileWALPointer filePtr = (FileWALPointer)(ptr == null ? lastWALPtr.get() : ptr);
+
+        // No need to sync if was rolled over.
+        if (filePtr != null && !cur.needFsync(filePtr))
+            return;
+
+        cur.fsync(filePtr, false);
+    }
+}