You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2018/11/09 12:07:49 UTC
[2/4] ignite git commit: IGNITE-9909 Merge FsyncWalManager and
WalManager - Fixes #5013.
http://git-wip-us.apache.org/repos/asf/ignite/blob/889ce79b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/AbstractFileHandle.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/AbstractFileHandle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/AbstractFileHandle.java
new file mode 100644
index 0000000..127d737
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/AbstractFileHandle.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.filehandle;
+
+import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Common base of WAL file handles: keeps the {@link SegmentIO} of a segment file together with the
+ * segment index obtained from that I/O when the handle is created.
+ */
+public abstract class AbstractFileHandle {
+    /** Segment index taken from {@code fileIO} at construction time. */
+    private final long segmentIdx;
+
+    /** I/O interface for read/write operations with the segment file. */
+    protected SegmentIO fileIO;
+
+    /**
+     * @param fileIO I/O interface for read/write operations of this handle; its segment id becomes
+     *      the id reported by {@link #getSegmentId()}.
+     */
+    public AbstractFileHandle(@NotNull SegmentIO fileIO) {
+        segmentIdx = fileIO.getSegmentId();
+
+        this.fileIO = fileIO;
+    }
+
+    /**
+     * @return Absolute WAL segment file index (incremental counter).
+     */
+    public long getSegmentId() {
+        return segmentIdx;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/889ce79b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManager.java
new file mode 100644
index 0000000..6597e46
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManager.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.filehandle;
+
+import java.io.IOException;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.StorageException;
+import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
+
+/**
+ * Manager of {@link FileWriteHandle}: creates write handles for WAL segments, receives lifecycle
+ * notifications (start, activation, deactivation, resume of logging) and flushes the log on request.
+ */
+public interface FileHandleManager {
+ /**
+ * Initializes a {@link FileWriteHandle} for the first time.
+ *
+ * @param fileIO I/O of the segment file the handle is created for.
+ * @param position Initial position of the handle.
+ * @param serializer Serializer for the file handle.
+ * @return Created file handle.
+ * @throws IOException If the handle could not be created.
+ */
+ FileWriteHandle initHandle(SegmentIO fileIO, long position, RecordSerializer serializer) throws IOException;
+
+ /**
+ * Creates the next file handle.
+ *
+ * @param fileIO I/O of the segment file the handle is created for.
+ * @param serializer Serializer for the file handle.
+ * @return Created file handle.
+ * @throws IOException If the handle could not be created.
+ */
+ FileWriteHandle nextHandle(SegmentIO fileIO, RecordSerializer serializer) throws IOException;
+
+ /**
+ * Starts the manager.
+ */
+ void start();
+
+ /**
+ * Callback invoked on activation.
+ */
+ void onActivate();
+
+ /**
+ * Callback invoked on deactivation.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ void onDeactivate() throws IgniteCheckedException;
+
+ /**
+ * Callback invoked when logging is resumed.
+ */
+ void resumeLogging();
+
+ /**
+ * Flushes the log up to the given pointer.
+ *
+ * @param ptr Pointer up to which the flush is needed.
+ * @param explicitFsync {@code true} if fsync is required.
+ * @throws IgniteCheckedException If failed.
+ * @throws StorageException If the storage failed.
+ */
+ void flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/889ce79b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java
new file mode 100644
index 0000000..b0c456e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.filehandle;
+
+import java.util.function.Supplier;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
+
+/**
+ * Factory of {@link FileHandleManager}.
+ */
+public class FileHandleManagerFactory {
+ /** */
+ private final boolean walFsyncWithDedicatedWorker =
+ IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_WAL_FSYNC_WITH_DEDICATED_WORKER, false);
+
+ /** Data storage configuration. */
+ private final DataStorageConfiguration dsConf;
+
+ /**
+ * @param conf Data storage configuration.
+ */
+ public FileHandleManagerFactory(DataStorageConfiguration conf) {
+ dsConf = conf;
+ }
+
+ /**
+ * @param cctx Cache context.
+ * @param metrics Data storage metrics.
+ * @param mmap Using mmap.
+ * @param lastWALPtr Last WAL pointer.
+ * @param serializer Serializer.
+ * @param currHandleSupplier Supplier of current handle.
+ * @return One of implementation of {@link FileHandleManager}.
+ */
+ public FileHandleManager build(
+ GridCacheSharedContext cctx,
+ DataStorageMetricsImpl metrics,
+ boolean mmap,
+ Supplier<WALPointer> lastWALPtr,
+ RecordSerializer serializer,
+ Supplier<FileWriteHandle> currHandleSupplier
+ ) {
+ if (dsConf.getWalMode() == WALMode.FSYNC && !walFsyncWithDedicatedWorker)
+ return new FsyncFileHandleManagerImpl(
+ cctx,
+ metrics,
+ lastWALPtr,
+ serializer,
+ currHandleSupplier,
+ dsConf.getWalMode(),
+ dsConf.getWalSegmentSize(),
+ dsConf.getWalFsyncDelayNanos(),
+ dsConf.getWalThreadLocalBufferSize()
+ );
+ else
+ return new FileHandleManagerImpl(
+ cctx,
+ metrics,
+ mmap,
+ lastWALPtr,
+ serializer,
+ currHandleSupplier,
+ dsConf.getWalMode(),
+ dsConf.getWalBufferSize(),
+ dsConf.getWalSegmentSize(),
+ dsConf.getWalFsyncDelayNanos()
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/889ce79b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
new file mode 100644
index 0000000..1daac31
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
@@ -0,0 +1,603 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.filehandle;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.LockSupport;
+import java.util.function.Supplier;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.StorageException;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.thread.IgniteThread;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SEGMENT_SYNC_TIMEOUT;
+import static org.apache.ignite.configuration.WALMode.LOG_ONLY;
+import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
+import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.DIRECT;
+import static org.apache.ignite.internal.util.IgniteUtils.sleep;
+
+/**
+ * Manager for {@link FileWriteHandleImpl}.
+ */
+public class FileHandleManagerImpl implements FileHandleManager {
+    /** Default wal segment sync timeout. */
+    private static final long DFLT_WAL_SEGMENT_SYNC_TIMEOUT = 500L;
+
+    /** Context. */
+    protected final GridCacheSharedContext cctx;
+
+    /** Logger, obtained from the context for this class. */
+    private final IgniteLogger log;
+
+    /** WAL mode this manager works in. */
+    private final WALMode mode;
+
+    /** Persistence metrics tracker. */
+    private final DataStorageMetricsImpl metrics;
+
+    /** Use mapped byte buffer. */
+    private final boolean mmap;
+
+    /** Last WAL pointer. */
+    private final Supplier<WALPointer> lastWALPtr;
+
+    /** Record serializer; {@link #flush(WALPointer, boolean)} does nothing while it is {@code null}. */
+    private final RecordSerializer serializer;
+
+    /** Current handle supplier. */
+    private final Supplier<FileWriteHandle> currentHandleSupplier;
+
+    /** WAL buffer size. */
+    private final int walBufferSize;
+
+    /** WAL segment size in bytes. This is maximum value, actual segments may be shorter. */
+    private final long maxWalSegmentSize;
+
+    /** Fsync delay. */
+    private final long fsyncDelay;
+
+    /** WAL writer worker, created by {@link #resumeLogging()}. */
+    private WALWriter walWriter;
+
+    /** Wal segment sync worker, created by {@link #start()} for some WAL modes only. */
+    private WalSegmentSyncer walSegmentSyncWorker;
+
+    /**
+     * @param cctx Context.
+     * @param metrics Data storage metrics.
+     * @param mmap Mmap.
+     * @param lastWALPtr Last WAL pointer.
+     * @param serializer Serializer.
+     * @param currentHandleSupplier Current handle supplier.
+     * @param mode WAL mode.
+     * @param walBufferSize WAL buffer size.
+     * @param maxWalSegmentSize Max WAL segment size.
+     * @param fsyncDelay Fsync delay.
+     */
+    public FileHandleManagerImpl(
+        GridCacheSharedContext cctx,
+        DataStorageMetricsImpl metrics,
+        boolean mmap,
+        Supplier<WALPointer> lastWALPtr,
+        RecordSerializer serializer,
+        Supplier<FileWriteHandle> currentHandleSupplier,
+        WALMode mode,
+        int walBufferSize,
+        long maxWalSegmentSize,
+        long fsyncDelay
+    ) {
+        // Collaborators.
+        this.cctx = cctx;
+        this.metrics = metrics;
+        this.serializer = serializer;
+        this.lastWALPtr = lastWALPtr;
+        this.currentHandleSupplier = currentHandleSupplier;
+
+        // Settings.
+        this.mode = mode;
+        this.mmap = mmap;
+        this.walBufferSize = walBufferSize;
+        this.maxWalSegmentSize = maxWalSegmentSize;
+        this.fsyncDelay = fsyncDelay;
+
+        log = cctx.logger(FileHandleManagerImpl.class);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * In mmap mode the ring buffer wraps a mapping of the segment file, otherwise a new direct buffer of
+     * the configured WAL buffer size is used. The buffer is initialized with {@code position} before the
+     * handle is created.
+     */
+    @Override public FileWriteHandle initHandle(
+        SegmentIO fileIO,
+        long position,
+        RecordSerializer serializer
+    ) throws IOException {
+        SegmentedRingByteBuffer ringBuf = mmap
+            ? new SegmentedRingByteBuffer(fileIO.map((int)maxWalSegmentSize), metrics)
+            : new SegmentedRingByteBuffer(walBufferSize, maxWalSegmentSize, DIRECT, metrics);
+
+        ringBuf.init(position);
+
+        return new FileWriteHandleImpl(
+            cctx,
+            fileIO,
+            ringBuf,
+            serializer,
+            metrics,
+            walWriter,
+            position,
+            mode,
+            mmap,
+            true,
+            fsyncDelay,
+            maxWalSegmentSize
+        );
+    }
+
+ /** {@inheritDoc} */
+ @Override public FileWriteHandle nextHandle(SegmentIO fileIO, RecordSerializer serializer) throws IOException {
+ SegmentedRingByteBuffer rbuf;
+
+ if (mmap) {
+ MappedByteBuffer buf = fileIO.map((int)maxWalSegmentSize);
+
+ rbuf = new SegmentedRingByteBuffer(buf, metrics);
+ }
+ else
+ rbuf = currentHandle().buf.reset();
+
+ try {
+ return new FileWriteHandleImpl(
+ cctx, fileIO, rbuf, serializer, metrics, walWriter, 0,
+ mode, mmap, false, fsyncDelay, maxWalSegmentSize
+ );
+ }
+ catch (ClosedByInterruptException e) {
+ if (rbuf != null)
+ rbuf.free();
+ }
+
+ return null;
+ }
+
+ /**
+ * Takes the current handle from the supplier passed to the constructor and casts it to
+ * {@link FileWriteHandleImpl}.
+ *
+ * @return Current handle; callers in this class treat {@code null} as "no handle available".
+ */
+ private FileWriteHandleImpl currentHandle() {
+ return (FileWriteHandleImpl)currentHandleSupplier.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() {
+ if (mode != WALMode.NONE && mode != WALMode.FSYNC) {
+ walSegmentSyncWorker = new WalSegmentSyncer(cctx.igniteInstanceName(),
+ cctx.kernalContext().log(WalSegmentSyncer.class));
+
+ if (log.isInfoEnabled())
+ log.info("Started write-ahead log manager [mode=" + mode + ']');
+ }
+ else
+ U.quietAndWarn(log, "Started write-ahead log manager in NONE mode, persisted data may be lost in " +
+ "a case of unexpected node failure. Make sure to deactivate the cluster before shutdown.");
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onActivate() {
+ if (!cctx.kernalContext().clientNode()) {
+ if (walSegmentSyncWorker != null)
+ new IgniteThread(walSegmentSyncWorker).start();
+ }
+ }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Flushes the current handle (BACKGROUND mode only) and closes it without rollover, then shuts down
+     * the segment sync worker and the WAL writer if they exist.
+     */
+    @Override public void onDeactivate() throws IgniteCheckedException {
+        FileWriteHandleImpl hnd = currentHandle();
+
+        if (hnd != null) {
+            if (mode == WALMode.BACKGROUND)
+                hnd.flush(null);
+
+            hnd.close(false);
+        }
+
+        if (walSegmentSyncWorker != null)
+            walSegmentSyncWorker.shutdown();
+
+        if (walWriter != null)
+            walWriter.shutdown();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Creates a new WAL writer. Its thread is started only when mmap is not used.
+     */
+    @Override public void resumeLogging() {
+        walWriter = new WALWriter(log);
+
+        if (mmap)
+            return;
+
+        new IgniteThread(walWriter).start();
+    }
+
+ /** {@inheritDoc} */
+ @Override public void flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException {
+ if (serializer == null || mode == WALMode.NONE)
+ return;
+
+ FileWriteHandleImpl cur = currentHandle();
+
+ // WAL manager was not started (client node).
+ if (cur == null)
+ return;
+
+ FileWALPointer filePtr = (FileWALPointer)(ptr == null ? lastWALPtr.get() : ptr);
+
+ if (mode == LOG_ONLY)
+ cur.flushOrWait(filePtr);
+
+ if (!explicitFsync && mode != WALMode.FSYNC)
+ return; // No need to sync in LOG_ONLY or BACKGROUND unless explicit fsync is required.
+
+ // No need to sync if was rolled over.
+ if (filePtr != null && !cur.needFsync(filePtr))
+ return;
+
+ cur.fsync(filePtr);
+ }
+
+    /**
+     * Verifies that the kernal context has not been invalidated.
+     *
+     * @throws StorageException If node is no longer valid and we missed a WAL operation.
+     */
+    private void checkNode() throws StorageException {
+        boolean invalidated = cctx.kernalContext().invalid();
+
+        if (!invalidated)
+            return;
+
+        throw new StorageException("Failed to perform WAL operation (environment was invalidated by a " +
+            "previous error)");
+    }
+
+ /**
+ * WAL writer worker.
+ */
+ public class WALWriter extends GridWorker {
+ /** Request code: flush everything up to the current tail of the buffer (see {@link #flushBuffer(long)}). */
+ private static final long UNCONDITIONAL_FLUSH = -1L;
+
+ /** Request code: close the file I/O of the current handle. */
+ private static final long FILE_CLOSE = -2L;
+
+ /** Request code: call {@code force()} on the file I/O of the current handle. */
+ private static final long FILE_FORCE = -3L;
+
+ /** Failure of the writer; checked by {@link #flushBuffer(long)} before and after waiting. */
+ private volatile Throwable err;
+
+ //TODO: replace with GC free data structure.
+ /**
+ * Parked threads mapped to the value they wait for: a position or one of the request codes above.
+ * {@code Long.MIN_VALUE} marks a request that has already been served.
+ */
+ final Map<Thread, Long> waiters = new ConcurrentHashMap<>();
+
+ /**
+ * Creates the worker named {@code "wal-write-worker%" + igniteInstanceName} and passes the workers
+ * registry of the kernal context to the superclass.
+ *
+ * @param log Logger.
+ */
+ WALWriter(IgniteLogger log) {
+ super(cctx.igniteInstanceName(), "wal-write-worker%" + cctx.igniteInstanceName(), log,
+ cctx.kernalContext().workersRegistry());
+ }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Main loop of the writer: parks while there are no waiters, picks a requested value from
+         * {@link #waiters}, serves {@code FILE_CLOSE}/{@code FILE_FORCE} requests on the current handle,
+         * then polls the ring buffer of the current handle and writes the returned segments to the file,
+         * unparking waiters as their requests are served.
+         * <p>
+         * Every failure is stored in the volatile field {@link #err} before waiters are unparked, so that
+         * {@link #flushBuffer(long)} can observe it. There is deliberately no local variable named
+         * {@code err}: such a local would shadow the field and the failure would never become visible to
+         * other threads. On exit a recorded failure (or an unexpected termination) is passed to the
+         * failure processor.
+         */
+        @Override protected void body() {
+            try {
+                while (!isCancelled()) {
+                    onIdle();
+
+                    while (waiters.isEmpty()) {
+                        if (!isCancelled()) {
+                            blockingSectionBegin();
+
+                            try {
+                                LockSupport.park();
+                            }
+                            finally {
+                                blockingSectionEnd();
+                            }
+                        }
+                        else {
+                            unparkWaiters(Long.MAX_VALUE);
+
+                            return;
+                        }
+                    }
+
+                    Long pos = null;
+
+                    // Pick a request that has not been served yet (served ones hold Long.MIN_VALUE).
+                    for (Long val : waiters.values()) {
+                        if (val > Long.MIN_VALUE)
+                            pos = val;
+                    }
+
+                    updateHeartbeat();
+
+                    if (pos == null)
+                        continue;
+                    else if (pos < UNCONDITIONAL_FLUSH) {
+                        try {
+                            assert pos == FILE_CLOSE || pos == FILE_FORCE : pos;
+
+                            if (pos == FILE_CLOSE)
+                                currentHandle().fileIO.close();
+                            else if (pos == FILE_FORCE)
+                                currentHandle().fileIO.force();
+                        }
+                        catch (IOException e) {
+                            log.error("Exception in WAL writer thread: ", e);
+
+                            // Publish the failure before waking the waiters up.
+                            err = e;
+
+                            unparkWaiters(Long.MAX_VALUE);
+
+                            return;
+                        }
+
+                        unparkWaiters(pos);
+                    }
+
+                    updateHeartbeat();
+
+                    List<SegmentedRingByteBuffer.ReadSegment> segs = currentHandle().buf.poll(pos);
+
+                    if (segs == null) {
+                        unparkWaiters(pos);
+
+                        continue;
+                    }
+
+                    for (int i = 0; i < segs.size(); i++) {
+                        SegmentedRingByteBuffer.ReadSegment seg = segs.get(i);
+
+                        updateHeartbeat();
+
+                        try {
+                            writeBuffer(seg.position(), seg.buffer());
+                        }
+                        catch (Throwable e) {
+                            log.error("Exception in WAL writer thread:", e);
+
+                            err = e;
+                        }
+                        finally {
+                            seg.release();
+
+                            // After a failure, or for a special request, release every waiter.
+                            long p = pos <= UNCONDITIONAL_FLUSH || err != null ? Long.MAX_VALUE : currentHandle().written;
+
+                            unparkWaiters(p);
+                        }
+                    }
+                }
+            }
+            catch (Throwable t) {
+                err = t;
+            }
+            finally {
+                unparkWaiters(Long.MAX_VALUE);
+
+                if (err == null && !isCancelled)
+                    err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly");
+
+                if (err instanceof OutOfMemoryError)
+                    cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err));
+                else if (err != null)
+                    cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
+            }
+        }
+
+ /**
+ * Shuts the writer down: cancels the worker, unparks its thread so that a parked {@link #body()} can
+ * notice the cancellation, then joins the thread.
+ *
+ * @throws IgniteInterruptedCheckedException Presumably if the calling thread is interrupted while
+ * joining - confirm against {@code U.join}.
+ */
+ private void shutdown() throws IgniteInterruptedCheckedException {
+ U.cancel(this);
+
+ LockSupport.unpark(runner());
+
+ U.join(runner());
+ }
+
+ /**
+ * Unparks waiting threads whose requested value is not greater than {@code pos}. A request is marked
+ * as served by replacing its value in {@link #waiters} with {@code Long.MIN_VALUE}; the entry itself is
+ * not removed here.
+ *
+ * @param pos Inclusive upper bound of the requests to release; {@code Long.MAX_VALUE} releases every
+ * waiter. Must be greater than {@code Long.MIN_VALUE}.
+ */
+ private void unparkWaiters(long pos) {
+ assert pos > Long.MIN_VALUE : pos;
+
+ for (Map.Entry<Thread, Long> e : waiters.entrySet()) {
+ Long val = e.getValue();
+
+ if (val <= pos) {
+ // Entries already marked as served are not rewritten, but their threads are unparked again.
+ if (val != Long.MIN_VALUE)
+ waiters.put(e.getKey(), Long.MIN_VALUE);
+
+ LockSupport.unpark(e.getKey());
+ }
+ }
+ }
+
+ /**
+ * Forces all made changes to the file: submits a {@code FILE_FORCE} request through
+ * {@link #flushBuffer(long)} and waits until it is served. Does nothing in mmap mode.
+ *
+ * @throws IgniteCheckedException If {@link #flushBuffer(long)} failed.
+ */
+ void force() throws IgniteCheckedException {
+ flushBuffer(FILE_FORCE);
+ }
+
+ /**
+ * Closes file: submits a {@code FILE_CLOSE} request through {@link #flushBuffer(long)} and waits until
+ * it is served. Does nothing in mmap mode.
+ *
+ * @throws IgniteCheckedException If {@link #flushBuffer(long)} failed.
+ */
+ void close() throws IgniteCheckedException {
+ flushBuffer(FILE_CLOSE);
+ }
+
+ /**
+ * Flushes all data from the buffer: submits an {@code UNCONDITIONAL_FLUSH} request, which
+ * {@link #flushBuffer(long)} replaces with the current tail of the buffer. Does nothing in mmap mode.
+ *
+ * @throws IgniteCheckedException If {@link #flushBuffer(long)} failed.
+ */
+ void flushAll() throws IgniteCheckedException {
+ flushBuffer(UNCONDITIONAL_FLUSH);
+ }
+
+ /**
+ * Registers the calling thread as a waiter for {@code expPos}, wakes the writer thread up and parks
+ * until the writer marks the request as served. Returns immediately in mmap mode.
+ *
+ * @param expPos Expected position, or one of the request codes ({@code UNCONDITIONAL_FLUSH},
+ * {@code FILE_CLOSE}, {@code FILE_FORCE}).
+ * @throws IgniteCheckedException If an error of the writer is found once the request is marked as served.
+ */
+ void flushBuffer(long expPos) throws IgniteCheckedException {
+ if (mmap)
+ return;
+
+ Throwable err = walWriter.err;
+
+ // A known writer error is handed to the failure processor; execution continues below.
+ if (err != null)
+ cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err));
+
+ // An unconditional flush waits for everything up to the current tail of the buffer.
+ if (expPos == UNCONDITIONAL_FLUSH)
+ expPos = (currentHandle().buf.tail());
+
+ Thread t = Thread.currentThread();
+
+ waiters.put(t, expPos);
+
+ LockSupport.unpark(walWriter.runner());
+
+ // Re-check the entry after every wake-up: only Long.MIN_VALUE means the request was served.
+ while (true) {
+ Long val = waiters.get(t);
+
+ assert val != null : "Only this thread can remove thread from waiters";
+
+ if (val == Long.MIN_VALUE) {
+ waiters.remove(t);
+
+ Throwable walWriterError = walWriter.err;
+
+ if (walWriterError != null)
+ throw new IgniteCheckedException("Flush buffer failed.", walWriterError);
+
+ return;
+ }
+ else
+ LockSupport.park();
+ }
+ }
+
+ /**
+ * Writes the buffer to the file of the current handle and advances the handle's {@code written}
+ * counter by the number of bytes written.
+ *
+ * @param pos Position in file to start write from. May be checked against actual position to wait previous
+ * writes to complete.
+ * @param buf Buffer to write to file.
+ * @throws StorageException If the node was invalidated or the write failed with {@link IOException}
+ * (the latter is also passed to the failure processor).
+ * @throws IgniteCheckedException If failed.
+ */
+ private void writeBuffer(long pos, ByteBuffer buf) throws StorageException, IgniteCheckedException {
+ FileWriteHandleImpl hdl = currentHandle();
+
+ assert hdl.fileIO != null : "Writing to a closed segment.";
+
+ checkNode();
+
+ long lastLogged = U.currentTimeMillis();
+
+ // Initial interval between "still waiting" warnings; doubled after each warning while it is below 60 * 60_000.
+ long logBackoff = 2_000;
+
+ // If we were too fast, need to wait previous writes to complete.
+ while (hdl.written != pos) {
+ assert hdl.written < pos : "written = " + hdl.written + ", pos = " + pos; // No one can write further than we are now.
+
+ // Permutation occurred between blocks write operations.
+ // Order of acquiring lock is not the same as order of write.
+ long now = U.currentTimeMillis();
+
+ if (now - lastLogged >= logBackoff) {
+ if (logBackoff < 60 * 60_000)
+ logBackoff *= 2;
+
+ U.warn(log, "Still waiting for a concurrent write to complete [written=" + hdl.written +
+ ", pos=" + pos + ", lastFsyncPos=" + hdl.lastFsyncPos + ", stop=" + hdl.stop.get() +
+ ", actualPos=" + hdl.safePosition() + ']');
+
+ lastLogged = now;
+ }
+
+ // Leave the wait loop with an exception if the node became invalid in the meantime.
+ checkNode();
+ }
+
+ // Do the write.
+ int size = buf.remaining();
+
+ assert size > 0 : size;
+
+ try {
+ assert hdl.written == hdl.fileIO.position();
+
+ hdl.written += hdl.fileIO.writeFully(buf);
+
+ metrics.onWalBytesWritten(size);
+
+ assert hdl.written == hdl.fileIO.position();
+ }
+ catch (IOException e) {
+ StorageException se = new StorageException("Failed to write buffer.", e);
+
+ cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, se));
+
+ throw se;
+ }
+ }
+ }
+
+ /**
+ * Syncs WAL segment file.
+ */
+ public class WalSegmentSyncer extends GridWorker {
+ /** Sync timeout. */
+ long syncTimeout;
+
+ /**
+ * @param igniteInstanceName Ignite instance name.
+ * @param log Logger.
+ */
+ private WalSegmentSyncer(String igniteInstanceName, IgniteLogger log) {
+ super(igniteInstanceName, "wal-segment-syncer", log);
+
+ syncTimeout = Math.max(IgniteSystemProperties.getLong(IGNITE_WAL_SEGMENT_SYNC_TIMEOUT,
+ DFLT_WAL_SEGMENT_SYNC_TIMEOUT), 100L);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+ while (!isCancelled()) {
+ sleep(syncTimeout);
+
+ try {
+ flush(null, true);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Exception when flushing WAL.", e);
+ }
+ }
+ }
+
+ /** Shutted down the worker. */
+ private void shutdown() {
+ synchronized (this) {
+ U.cancel(this);
+ }
+
+ U.join(this, log);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/889ce79b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandle.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandle.java
new file mode 100644
index 0000000..410cd56
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandle.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.filehandle;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.persistence.StorageException;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * File write handle: the write-side operations available on a WAL segment file.
+ */
+public interface FileWriteHandle {
+
+ /**
+ * @return Version of serializer.
+ */
+ int serializerVersion();
+
+ /**
+ * Performs the actions required after resume of logging has finished.
+ */
+ void finishResumeLogging();
+
+ /**
+ * Writes the header to the segment.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ void writeHeader() throws IgniteCheckedException;
+
+ /**
+ * Adds a record to the segment.
+ *
+ * @param rec Record to be added.
+ * @return Pointer or null if roll over to next segment is required or already started by other thread.
+ * @throws StorageException If the storage failed.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable WALPointer addRecord(WALRecord rec) throws StorageException, IgniteCheckedException;
+
+ /**
+ * Flushes all records.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ void flushAll() throws IgniteCheckedException;
+
+ /**
+ * Tells whether an fsync is needed for the given pointer.
+ *
+ * @param ptr Pointer.
+ * @return {@code true} if fsync needed.
+ */
+ boolean needFsync(FileWALPointer ptr);
+
+ /**
+ * @return Pointer to the end of the last written record (probably not fsync-ed).
+ */
+ FileWALPointer position();
+
+ /**
+ * Performs fsync.
+ *
+ * @param ptr Pointer up to which fsync is required.
+ * @throws StorageException If the storage failed.
+ * @throws IgniteCheckedException If failed.
+ */
+ void fsync(FileWALPointer ptr) throws StorageException, IgniteCheckedException;
+
+ /**
+ * Closes the buffer.
+ */
+ void closeBuffer();
+
+ /**
+ * Closes the segment.
+ *
+ * @param rollOver {@code true} if the segment is closed for rollover.
+ * @return {@code true} if the close succeeded.
+ * @throws IgniteCheckedException If failed.
+ * @throws StorageException If the storage failed.
+ */
+ boolean close(boolean rollOver) throws IgniteCheckedException, StorageException;
+
+ /**
+ * Signals next segment available to wake up other worker threads waiting for WAL to write.
+ */
+ void signalNextAvailable();
+
+ /**
+ * Waits until the next segment is initialized.
+ */
+ void awaitNext();
+
+ /**
+ * @return Absolute WAL segment file index (incremental counter).
+ */
+ long getSegmentId();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/889ce79b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
new file mode 100644
index 0000000..f582dbd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.filehandle;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
+import org.apache.ignite.internal.pagemem.wal.record.SwitchSegmentRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.StorageException;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.SWITCH_SEGMENT_RECORD;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.prepareSerializerVersionBuffer;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory.LATEST_SERIALIZER_VERSION;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE;
+import static org.apache.ignite.internal.util.IgniteUtils.findField;
+import static org.apache.ignite.internal.util.IgniteUtils.findNonPublicMethod;
+
+/**
+ * File handle for one log segment.
+ */
+@SuppressWarnings("SignalWithoutCorrespondingAwait")
+class FileWriteHandleImpl extends AbstractFileHandle implements FileWriteHandle {
+ /** {@link MappedByteBuffer#force0(java.io.FileDescriptor, long, long)}, looked up reflectively because it is not public. */
+ private static final Method force0 = findNonPublicMethod(
+ MappedByteBuffer.class, "force0",
+ java.io.FileDescriptor.class, long.class, long.class
+ );
+
+ /** {@link FileWriteHandleImpl#written} atomic field updater. */
+ private static final AtomicLongFieldUpdater<FileWriteHandleImpl> WRITTEN_UPD =
+ AtomicLongFieldUpdater.newUpdater(FileWriteHandleImpl.class, "written");
+
+ /** {@link MappedByteBuffer#mappingOffset()}. */
+ private static final Method mappingOffset = findNonPublicMethod(MappedByteBuffer.class, "mappingOffset");
+
+ /** {@link MappedByteBuffer#mappingAddress(long)}. */
+ private static final Method mappingAddress = findNonPublicMethod(
+ MappedByteBuffer.class, "mappingAddress", long.class
+ );
+
+ /** {@link MappedByteBuffer#fd}. */
+ private static final Field fd = findField(MappedByteBuffer.class, "fd");
+
+ /** Page size, used to align the address passed to {@link #force0}. */
+ private static final int PAGE_SIZE = GridUnsafe.pageSize();
+
+ /**
+ * Serializer latest version to use. Taken from the {@code IGNITE_WAL_SERIALIZER_VERSION} system property,
+ * falling back to {@code LATEST_SERIALIZER_VERSION}.
+ */
+ private final int serializerVer =
+ IgniteSystemProperties.getInteger(IGNITE_WAL_SERIALIZER_VERSION, LATEST_SERIALIZER_VERSION);
+
+ /** Use mapped byte buffer. */
+ private final boolean mmap;
+
+ /** Created on resume logging. Reset to {@code false} by {@link #finishResumeLogging()}. */
+ private volatile boolean resume;
+
+ /**
+ * Position in current file after the end of last written record (incremented after file channel write operation).
+ * In mmap mode it is advanced by {@link #addRecord(WALRecord)} through {@link #WRITTEN_UPD} and only ever grows.
+ */
+ volatile long written;
+
+ /**
+ * Position up to which the segment has been fsync-ed. Initialized with the start position and set to
+ * {@link #written} after each completed fsync.
+ */
+ protected volatile long lastFsyncPos;
+
+ /** Stop guard to provide warranty that only one thread will be successful in calling {@link #close(boolean)}. */
+ protected final AtomicBoolean stop = new AtomicBoolean(false);
+
+ /**
+ * Lock taken by {@link #position()}, {@link #fsync(FileWALPointer)}, {@link #close(boolean)},
+ * {@link #signalNextAvailable()} and {@link #awaitNext()}; it also owns the conditions below.
+ */
+ private final Lock lock = new ReentrantLock();
+
+ /**
+ * Condition for timed wait of several threads, see {@link DataStorageConfiguration#getWalFsyncDelayNanos()}.
+ * Signalled after a completed fsync when the fsync delay is positive.
+ */
+ private final Condition fsync = lock.newCondition();
+
+ /**
+ * Next segment available condition. Protection from "spurious wakeup" is provided by predicate {@link
+ * #fileIO}=<code>null</code>.
+ */
+ private final Condition nextSegment = lock.newCondition();
+
+ /** Buffer that records are serialized into before being written out. */
+ protected final SegmentedRingByteBuffer buf;
+
+ /** WAL mode. */
+ private final WALMode mode;
+
+ /** Fsync delay in nanoseconds; a non-positive value disables the delayed (batched) fsync. */
+ private final long fsyncDelay;
+
+ /** Persistence metrics tracker. */
+ private final DataStorageMetricsImpl metrics;
+
+ /** WAL segment size in bytes. This is maximum value, actual segments may be shorter. */
+ private final long maxWalSegmentSize;
+
+ /** Logger. */
+ protected final IgniteLogger log;
+
+ /** Serializer used to write records into the buffer. */
+ private final RecordSerializer serializer;
+
+ /** Context. */
+ protected final GridCacheSharedContext cctx;
+
+ /** WAL writer worker. */
+ private final FileHandleManagerImpl.WALWriter walWriter;
+
+ /**
+ * Creates a write handle for a single WAL segment.
+ *
+ * @param cctx Context.
+ * @param fileIO I/O file interface to use.
+ * @param rbuf Ring buffer that records are serialized into.
+ * @param serializer Serializer, must not be {@code null}.
+ * @param metrics Data storage metrics.
+ * @param writer WAL writer.
+ * @param pos Initial position; used both as the written and as the last fsync-ed position.
+ * @param mode WAL mode.
+ * @param mmap Whether a mapped byte buffer is used.
+ * @param resume Created on resume logging flag.
+ * @param fsyncDelay Fsync delay.
+ * @param maxWalSegmentSize Max WAL segment size.
+ * @throws IOException If positioning of the file failed.
+ */
+ FileWriteHandleImpl(
+ GridCacheSharedContext cctx, SegmentIO fileIO, SegmentedRingByteBuffer rbuf, RecordSerializer serializer,
+ DataStorageMetricsImpl metrics, FileHandleManagerImpl.WALWriter writer, long pos, WALMode mode, boolean mmap,
+ boolean resume, long fsyncDelay, long maxWalSegmentSize) throws IOException {
+ super(fileIO);
+
+ assert serializer != null;
+
+ // Collaborators.
+ this.log = cctx.logger(FileWriteHandleImpl.class);
+ this.cctx = cctx;
+ this.metrics = metrics;
+ this.walWriter = writer;
+ this.serializer = serializer;
+ this.buf = rbuf;
+
+ // Configuration.
+ this.mode = mode;
+ this.mmap = mmap;
+ this.fsyncDelay = fsyncDelay;
+ this.maxWalSegmentSize = maxWalSegmentSize;
+
+ // Mutable state: nothing beyond the initial position is written or synced yet.
+ this.resume = resume;
+ this.written = pos;
+ this.lastFsyncPos = pos;
+
+ // Only the non-mmap path positions the file explicitly.
+ if (!mmap)
+ fileIO.position(pos);
+ }
+
+ /** {@inheritDoc} Reports the version of the serializer this handle was created with. */
+ @Override public int serializerVersion() {
+ final RecordSerializer ser = serializer;
+
+ return ser.version();
+ }
+
+ /** {@inheritDoc} Clears the "created on resume logging" flag of this handle. */
+ @Override public void finishResumeLogging() {
+ this.resume = false;
+ }
+
+ /**
+ * Verifies that the kernal context has not been invalidated.
+ *
+ * @throws StorageException If node is no longer valid and we missed a WAL operation.
+ */
+ private void checkNode() throws StorageException {
+ boolean invalidated = cctx.kernalContext().invalid();
+
+ if (!invalidated)
+ return;
+
+ throw new StorageException("Failed to perform WAL operation (environment was invalidated by a " +
+ "previous error)");
+ }
+
+ /**
+ * Write serializer version to current handle.
+ */
+ @Override public void writeHeader() {
+ SegmentedRingByteBuffer.WriteSegment seg = buf.offer(HEADER_RECORD_SIZE);
+
+ assert seg != null && seg.position() > 0;
+
+ prepareSerializerVersionBuffer(getSegmentId(), serializerVer, false, seg.buffer());
+
+ seg.release();
+ }
+
+ /**
+ * Serializes the record into the ring buffer of this handle.
+ * <p>
+ * The method loops until the buffer hands out a write segment: when no segment is available it asks the
+ * WAL writer to flush everything and retries. Node validity is re-checked on every iteration.
+ *
+ * @param rec Record to be added to write queue.
+ * @return Pointer or null if roll over to next segment is required or already started by other thread.
+ * @throws StorageException If failed.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Override @Nullable public WALPointer addRecord(WALRecord rec) throws StorageException, IgniteCheckedException {
+ assert rec.size() > 0 : rec;
+
+ for (; ; ) {
+ checkNode();
+
+ SegmentedRingByteBuffer.WriteSegment seg;
+
+ // Buffer can be in open state in case of resuming with different serializer version.
+ if (rec.type() == SWITCH_SEGMENT_RECORD && !resume)
+ seg = buf.offerSafe(rec.size());
+ else
+ seg = buf.offer(rec.size());
+
+ FileWALPointer ptr = null;
+
+ if (seg != null) {
+ try {
+ // NOTE(review): presumably seg.position() is the end offset of the reserved region, so this
+ // yields the record start offset - confirm against SegmentedRingByteBuffer.
+ int pos = (int)(seg.position() - rec.size());
+
+ // Local variable intentionally shadows the ring buffer field within this block.
+ ByteBuffer buf = seg.buffer();
+
+ if (buf == null)
+ return null; // Can not write to this segment, need to switch to the next one.
+
+ ptr = new FileWALPointer(getSegmentId(), pos, rec.size());
+
+ rec.position(ptr);
+
+ fillBuffer(buf, rec);
+
+ if (mmap) {
+ // written field must grow only, but segment with greater position can be serialized
+ // earlier than segment with smaller position.
+ while (true) {
+ long written0 = written;
+
+ if (seg.position() > written0) {
+ // Retry on a lost race with a concurrent writer.
+ if (WRITTEN_UPD.compareAndSet(this, written0, seg.position()))
+ break;
+ }
+ else
+ break;
+ }
+ }
+
+ return ptr;
+ }
+ finally {
+ // Runs on every exit from the try block, including the early "return null" above.
+ seg.release();
+
+ // ptr is still null here if the segment could not be written; flushOrWait(null) then
+ // performs an unconditional flush.
+ if (mode == WALMode.BACKGROUND && rec instanceof CheckpointRecord)
+ flushOrWait(ptr);
+ }
+ }
+ else
+ // No room in the buffer: flush everything and retry.
+ walWriter.flushAll();
+ }
+ }
+
+ /**
+ * Flushes up to the given pointer unless the pointer refers to another segment.
+ *
+ * @param ptr Pointer, {@code null} requests an unconditional flush.
+ * @throws IgniteCheckedException If the flush failed.
+ */
+ public void flushOrWait(FileWALPointer ptr) throws IgniteCheckedException {
+ // If requested obsolete file index, it must be already flushed by close.
+ if (ptr != null && ptr.index() != getSegmentId())
+ return;
+
+ flush(ptr);
+ }
+
+ /** {@inheritDoc} Implemented as a flush without a target pointer. */
+ @Override public void flushAll() throws IgniteCheckedException {
+ final FileWALPointer noTarget = null;
+
+ flush(noTarget);
+ }
+
+ /**
+ * Asks the WAL writer to flush the buffer.
+ *
+ * @param ptr Pointer within this segment to flush up to, or {@code null} to flush everything.
+ * @throws IgniteCheckedException If the flush failed.
+ */
+ public void flush(FileWALPointer ptr) throws IgniteCheckedException {
+ if (ptr != null) {
+ assert ptr.index() == getSegmentId();
+
+ walWriter.flushBuffer(ptr.fileOffset());
+ }
+ else
+ walWriter.flushAll(); // Unconditional flush.
+ }
+
+ /**
+ * Serializes the record into the given buffer. A runtime failure of the serializer is rethrown as
+ * {@link IllegalStateException} carrying the record in its message.
+ *
+ * @param buf Buffer.
+ * @param rec WAL record.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void fillBuffer(ByteBuffer buf, WALRecord rec) throws IgniteCheckedException {
+ try {
+ serializer.writeRecord(rec, buf);
+ }
+ catch (RuntimeException err) {
+ String msg = "Failed to write record: " + rec;
+
+ throw new IllegalStateException(msg, err);
+ }
+ }
+
+ /**
+ * Non-blocking check if this pointer needs to be sync'ed.
+ *
+ * @param ptr WAL pointer to check.
+ * @return {@code False} if this pointer has been already sync'ed.
+ */
+ @Override public boolean needFsync(FileWALPointer ptr) {
+ // If index has changed, it means that the log was rolled over and already sync'ed.
+ // If requested position is smaller than last sync'ed, it also means all is good.
+ // If position is equal, then our record is the last not synced.
+ return getSegmentId() == ptr.index() && lastFsyncPos <= ptr.fileOffset();
+ }
+
+ /**
+ * Builds a zero-length pointer at the current written position of this segment, under the handle lock.
+ *
+ * @return Pointer to the end of the last written record (probably not fsync-ed).
+ */
+ @Override public FileWALPointer position() {
+ lock.lock();
+
+ try {
+ long segIdx = getSegmentId();
+ int off = (int)written;
+
+ return new FileWALPointer(segIdx, off, 0);
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Flushes pending data and forces it to disk, all under the handle lock.
+ * <p>
+ * For a non-null pointer the call returns early when {@link #needFsync(FileWALPointer)} reports that no
+ * sync is required, optionally after waiting for the configured fsync delay. Nothing is forced if the
+ * handle has been stopped or if nothing was written since the last fsync.
+ *
+ * @param ptr Pointer to sync, {@code null} skips the pointer checks.
+ * @throws StorageException If failed.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Override public void fsync(FileWALPointer ptr) throws StorageException, IgniteCheckedException {
+ lock.lock();
+
+ try {
+ if (ptr != null) {
+ if (!needFsync(ptr))
+ return;
+
+ if (fsyncDelay > 0 && !stop.get()) {
+ // Delay fsync to collect as many updates as possible: trade latency for throughput.
+ U.await(fsync, fsyncDelay, TimeUnit.NANOSECONDS);
+
+ // Another thread may have completed the fsync while this one was waiting.
+ if (!needFsync(ptr))
+ return;
+ }
+ }
+
+ flushOrWait(ptr);
+
+ // A stopped handle is being closed; close performs the final fsync itself.
+ if (stop.get())
+ return;
+
+ long lastFsyncPos0 = lastFsyncPos;
+ long written0 = written;
+
+ if (lastFsyncPos0 != written0) {
+ // Fsync position must be behind.
+ assert lastFsyncPos0 < written0 : "lastFsyncPos=" + lastFsyncPos0 + ", written=" + written0;
+
+ boolean metricsEnabled = metrics.metricsEnabled();
+
+ long start = metricsEnabled ? System.nanoTime() : 0;
+
+ if (mmap) {
+ // NOTE(review): -1 presumably asks the buffer for everything available - confirm against
+ // SegmentedRingByteBuffer#poll.
+ long pos = ptr == null ? -1 : ptr.fileOffset();
+
+ List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll(pos);
+
+ if (segs != null) {
+ assert segs.size() == 1;
+
+ SegmentedRingByteBuffer.ReadSegment seg = segs.get(0);
+
+ int off = seg.buffer().position();
+ int len = seg.buffer().limit() - off;
+
+ // Force only the polled range of the mapped buffer.
+ fsync((MappedByteBuffer)buf.buf, off, len);
+
+ seg.release();
+ }
+ }
+ else
+ walWriter.force();
+
+ // Note: re-reads the volatile field instead of reusing written0.
+ lastFsyncPos = written;
+
+ // Wake up threads waiting out the fsync delay above.
+ if (fsyncDelay > 0)
+ fsync.signalAll();
+
+ long end = metricsEnabled ? System.nanoTime() : 0;
+
+ if (metricsEnabled)
+ metrics.onFsync(end - start);
+ }
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Forces a range of the mapped buffer through the reflectively obtained {@code force0} method. The start
+ * address is rounded down to a page boundary and the length is extended by the same amount.
+ *
+ * @param buf Mapped byte buffer.
+ * @param off Offset.
+ * @param len Length.
+ * @throws IgniteCheckedException If a reflective call failed.
+ */
+ private void fsync(MappedByteBuffer buf, int off, int len) throws IgniteCheckedException {
+ try {
+ long mappedOff = (Long)mappingOffset.invoke(buf);
+
+ assert mappedOff == 0 : mappedOff;
+
+ long baseAddr = (Long)mappingAddress.invoke(buf, mappedOff);
+
+ long from = baseAddr + off;
+
+ // Distance from the preceding page boundary.
+ long shift = from % PAGE_SIZE;
+
+ force0.invoke(buf, fd.get(buf), from - shift, len + shift);
+ }
+ catch (IllegalAccessException | InvocationTargetException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ /** {@inheritDoc} Delegates to the ring buffer of this handle. */
+ @Override public void closeBuffer() {
+ this.buf.close();
+ }
+
+ /**
+ * Closes this handle. Only the first caller that flips the {@link #stop} guard performs the close; every
+ * other call returns {@code false} immediately.
+ * <p>
+ * The closing thread flushes the buffer, optionally appends a switch segment record, performs the final
+ * fsync (unless the WAL mode is {@code NONE}) and closes the underlying file.
+ *
+ * @param rollOver {@code true} if the segment is closed for rollover.
+ * @return {@code true} If this thread actually closed the segment.
+ * @throws IgniteCheckedException If failed.
+ * @throws StorageException If failed.
+ */
+ @Override public boolean close(boolean rollOver) throws IgniteCheckedException, StorageException {
+ if (stop.compareAndSet(false, true)) {
+ lock.lock();
+
+ try {
+ // Unconditional flush of everything buffered so far.
+ flushOrWait(null);
+
+ try {
+ RecordSerializer backwardSerializer = new RecordSerializerFactoryImpl(cctx)
+ .createSerializer(serializerVer);
+
+ SwitchSegmentRecord segmentRecord = new SwitchSegmentRecord();
+
+ int switchSegmentRecSize = backwardSerializer.size(segmentRecord);
+
+ // The switch segment record is appended only on rollover and only if it still fits.
+ if (rollOver && written < (maxWalSegmentSize - switchSegmentRecSize)) {
+ segmentRecord.size(switchSegmentRecSize);
+
+ WALPointer segRecPtr = addRecord(segmentRecord);
+
+ if (segRecPtr != null)
+ fsync((FileWALPointer)segRecPtr);
+ }
+
+ if (mmap) {
+ // Poll whatever is left in the buffer and release it without writing.
+ List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll(maxWalSegmentSize);
+
+ if (segs != null) {
+ assert segs.size() == 1;
+
+ segs.get(0).release();
+ }
+ }
+
+ // Do the final fsync.
+ if (mode != WALMode.NONE) {
+ if (mmap)
+ ((MappedByteBuffer)buf.buf).force();
+ else
+ fileIO.force();
+
+ lastFsyncPos = written;
+ }
+
+ if (mmap) {
+ try {
+ fileIO.close();
+ }
+ catch (IOException ignore) {
+ // No-op.
+ }
+ }
+ else {
+ walWriter.close();
+
+ // On rollover the buffer is not freed here.
+ if (!rollOver)
+ buf.free();
+ }
+ }
+ catch (IOException e) {
+ throw new StorageException("Failed to close WAL write handle [idx=" + getSegmentId() + "]", e);
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Closed WAL write handle [idx=" + getSegmentId() + "]");
+
+ return true;
+ }
+ finally {
+ // The mapped buffer is freed regardless of whether the close succeeded.
+ if (mmap)
+ buf.free();
+
+ lock.unlock();
+ }
+ }
+ else
+ return false;
+ }
+
+ /**
+ * Signals next segment available to wake up other worker threads waiting for WAL to write.
+ * <p>
+ * Under the handle lock, clears {@code fileIO} (the predicate checked by {@link #awaitNext()}) and signals
+ * all threads waiting on the next segment condition.
+ */
+ @Override public void signalNextAvailable() {
+ lock.lock();
+
+ try {
+ // In FSYNC mode everything written must already be synced, unless the node was invalidated.
+ assert cctx.kernalContext().invalid() ||
+ written == lastFsyncPos || mode != WALMode.FSYNC :
+ "fsync [written=" + written + ", lastFsync=" + lastFsyncPos + ", idx=" + getSegmentId() + ']';
+
+ fileIO = null;
+
+ nextSegment.signalAll();
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} Waits on the next segment condition until {@code fileIO} becomes {@code null}. */
+ @Override public void awaitNext() {
+ lock.lock();
+
+ try {
+ for (;;) {
+ // Predicate re-check guards against spurious wakeups.
+ if (fileIO == null)
+ break;
+
+ U.awaitQuiet(nextSegment);
+ }
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Reads the current position of the file channel without throwing.
+ *
+ * @return Current position of the file channel as String, "null" if the channel is null, or a failure
+ * description if the position could not be read.
+ */
+ public String safePosition() {
+ // Single read of the field: it may be cleared concurrently.
+ FileIO ch = fileIO;
+
+ if (ch != null) {
+ try {
+ return String.valueOf(ch.position());
+ }
+ catch (IOException e) {
+ return "{Failed to read channel position: " + e.getMessage() + '}';
+ }
+ }
+
+ return "null";
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/889ce79b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java
new file mode 100644
index 0000000..e456f04
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.filehandle;
+
+import java.io.IOException;
+import java.util.function.Supplier;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.StorageException;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
+
+/**
+ * Implementation of {@link FileWriteHandle} for FSYNC mode.
+ */
+public class FsyncFileHandleManagerImpl implements FileHandleManager {
+ /** Context. */
+ protected final GridCacheSharedContext cctx;
+ /** Logger. */
+ protected final IgniteLogger log;
+ /** */
+ private final WALMode mode;
+ /** Persistence metrics tracker. */
+ private final DataStorageMetricsImpl metrics;
+ /** Last WAL pointer. */
+ private final Supplier<WALPointer> lastWALPtr;
+ /** */
+ protected final RecordSerializer serializer;
+ /** Current handle supplier. */
+ private final Supplier<FileWriteHandle> currentHandleSupplier;
+ /** WAL segment size in bytes. This is maximum value, actual segments may be shorter. */
+ private final long maxWalSegmentSize;
+ /** Fsync delay. */
+ private final long fsyncDelay;
+ /** Thread local byte buffer size. */
+ private final int tlbSize;
+
+ /**
+ * @param cctx Context.
+ * @param metrics Data storage metrics.
+ * @param ptr Last WAL pointer.
+ * @param serializer Serializer.
+ * @param handle Current handle supplier.
+ * @param mode WAL mode.
+ * @param maxWalSegmentSize Max WAL segment size.
+ * @param fsyncDelay Fsync delay.
+ * @param tlbSize Thread local byte buffer size.
+ */
+ public FsyncFileHandleManagerImpl(GridCacheSharedContext cctx,
+ DataStorageMetricsImpl metrics, Supplier<WALPointer> ptr, RecordSerializer serializer,
+ Supplier<FileWriteHandle> handle, WALMode mode,
+ long maxWalSegmentSize, long fsyncDelay, int tlbSize) {
+ this.cctx = cctx;
+ this.log = cctx.logger(FsyncFileHandleManagerImpl.class);
+ this.mode = mode;
+ this.metrics = metrics;
+ lastWALPtr = ptr;
+ this.serializer = serializer;
+ currentHandleSupplier = handle;
+ this.maxWalSegmentSize = maxWalSegmentSize;
+ this.fsyncDelay = fsyncDelay;
+ this.tlbSize = tlbSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileWriteHandle initHandle(SegmentIO fileIO, long position,
+ RecordSerializer serializer) throws IOException {
+ return new FsyncFileWriteHandle(
+ cctx, fileIO, metrics, serializer, position,
+ mode, maxWalSegmentSize, tlbSize, fsyncDelay
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileWriteHandle nextHandle(SegmentIO fileIO,
+ RecordSerializer serializer) throws IOException {
+ return new FsyncFileWriteHandle(
+ cctx, fileIO, metrics, serializer, 0,
+ mode, maxWalSegmentSize, tlbSize, fsyncDelay
+ );
+ }
+
+ /**
+ * @return Current handle.
+ */
+ private FsyncFileWriteHandle currentHandle() {
+ return (FsyncFileWriteHandle)currentHandleSupplier.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() {
+ //NOOP.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onActivate() {
+ //NOOP.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDeactivate() throws IgniteCheckedException {
+ FsyncFileWriteHandle currHnd = currentHandle();
+
+ if (mode == WALMode.BACKGROUND) {
+ if (currHnd != null)
+ currHnd.flushAllOnStop();
+ }
+
+ if (currHnd != null)
+ currHnd.close(false);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void resumeLogging() {
+ //NOOP.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException {
+ if (serializer == null || mode == WALMode.NONE)
+ return;
+
+ FsyncFileWriteHandle cur = currentHandle();
+
+ // WAL manager was not started (client node).
+ if (cur == null)
+ return;
+
+ FileWALPointer filePtr = (FileWALPointer)(ptr == null ? lastWALPtr.get() : ptr);
+
+ // No need to sync if was rolled over.
+ if (filePtr != null && !cur.needFsync(filePtr))
+ return;
+
+ cur.fsync(filePtr, false);
+ }
+}