You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/07/11 09:03:34 UTC
[36/49] ignite git commit: GG-12418 - WAL hangs on any error during
segment rollover
GG-12418 - WAL hangs on any error during segment rollover
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/17d881ba
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/17d881ba
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/17d881ba
Branch: refs/heads/ignite-2.1
Commit: 17d881ba0122a7f90cac9846c376300a1d001bdd
Parents: f1c8e59
Author: Pavel Kovalenko <jo...@gmail.com>
Authored: Mon Jul 10 13:55:47 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Jul 10 13:57:51 2017 +0300
----------------------------------------------------------------------
.../PersistentStoreConfiguration.java | 24 +++
.../cache/persistence/file/FileIO.java | 154 +++++++++++++++
.../cache/persistence/file/FileIODecorator.java | 98 ++++++++++
.../cache/persistence/file/FileIOFactory.java | 45 +++++
.../cache/persistence/file/FilePageStore.java | 51 +++--
.../persistence/file/FilePageStoreManager.java | 2 +
.../persistence/file/RandomAccessFileIO.java | 110 +++++++++++
.../file/RandomAccessFileIOFactory.java | 42 ++++
.../wal/AbstractWalRecordsIterator.java | 22 ++-
.../cache/persistence/wal/FileInput.java | 40 ++--
.../wal/FileWriteAheadLogManager.java | 161 ++++++++-------
.../wal/reader/IgniteWalIteratorFactory.java | 13 +-
.../wal/reader/StandaloneGridKernalContext.java | 15 +-
.../reader/StandaloneIgnitePluginProcessor.java | 38 ++++
.../reader/StandaloneWalRecordsIterator.java | 37 ++--
...gnitePdsRecoveryAfterFileCorruptionTest.java | 11 +-
.../db/wal/IgniteWalFlushFailoverTest.java | 195 +++++++++++++++++++
.../db/wal/crc/IgniteDataIntegrityTests.java | 10 +-
.../db/wal/reader/IgniteWalReaderTest.java | 9 +-
.../db/wal/reader/MockWalIteratorFactory.java | 8 +-
.../ignite/testsuites/IgnitePdsTestSuite2.java | 4 +
21 files changed, 919 insertions(+), 170 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
index b531f9d..4792483 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
@@ -16,6 +16,8 @@
*/
package org.apache.ignite.configuration;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import org.apache.ignite.internal.util.typedef.internal.S;
import java.io.Serializable;
@@ -133,6 +135,9 @@ public class PersistentStoreConfiguration implements Serializable {
/** Always write full pages. */
private boolean alwaysWriteFullPages = DFLT_WAL_ALWAYS_WRITE_FULL_PAGES;
+ /** Factory to provide I/O interface for files */
+ private FileIOFactory fileIOFactory = new RandomAccessFileIOFactory();
+
/**
* Number of sub-intervals the whole {@link #setRateTimeInterval(long)} will be split into to calculate
* rate-based metrics.
@@ -539,6 +544,25 @@ public class PersistentStoreConfiguration implements Serializable {
}
/**
+ * Factory to provide implementation of FileIO interface
+ * which is used for any file read/write operations
+ *
+ * @return File I/O factory
+ */
+ public FileIOFactory getFileIOFactory() {
+ return fileIOFactory;
+ }
+
+ /**
+ * @param fileIOFactory File I/O factory
+ */
+ public PersistentStoreConfiguration setFileIOFactory(FileIOFactory fileIOFactory) {
+ this.fileIOFactory = fileIOFactory;
+
+ return this;
+ }
+
+ /**
* <b>Note:</b> setting this value with {@link WALMode#DEFAULT} may generate file size overhead for WAL segments in case
* grid is used rarely.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
new file mode 100644
index 0000000..1e81150
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Interface to perform file I/O operations.
+ */
+public interface FileIO extends AutoCloseable {
+ /**
+ * Returns current file position.
+ *
+ * @return Current file position,
+ * a non-negative integer counting the number of bytes
+ * from the beginning of the file to the current position.
+ *
+ * @throws IOException If some I/O error occurs.
+ */
+ public long position() throws IOException;
+
+ /**
+ * Sets new current file position.
+ *
+ * @param newPosition
+ * The new position, a non-negative integer counting
+ * the number of bytes from the beginning of the file.
+ *
+ * @throws IOException If some I/O error occurs.
+ */
+ public void position(long newPosition) throws IOException;
+
+ /**
+ * Reads a sequence of bytes from this file into the {@code destinationBuffer}.
+ *
+ * @param destinationBuffer Destination byte buffer.
+ *
+ * @return Number of read bytes.
+ *
+ * @throws IOException If some I/O error occurs.
+ */
+ public int read(ByteBuffer destinationBuffer) throws IOException;
+
+ /**
+ * Reads a sequence of bytes from this file into the {@code destinationBuffer}
+ * starting from specified file {@code position}.
+ *
+ * @param destinationBuffer Destination byte buffer.
+ * @param position Starting position of file.
+ *
+ * @return Number of read bytes.
+ *
+ * @throws IOException If some I/O error occurs.
+ */
+ public int read(ByteBuffer destinationBuffer, long position) throws IOException;
+
+ /**
+ * Reads up to {@code length} bytes from this file into the {@code buffer}.
+ *
+ * @param buffer Destination byte array.
+ * @param offset The start offset in array {@code buffer}
+ * at which the data is written.
+ * @param length Maximum number of bytes read.
+ *
+ * @return Number of read bytes.
+ *
+ * @throws IOException If some I/O error occurs.
+ */
+ public int read(byte[] buffer, int offset, int length) throws IOException;
+
+ /**
+ * Writes a sequence of bytes to this file from the {@code sourceBuffer}.
+ *
+ * @param sourceBuffer Source buffer.
+ *
+ * @return Number of written bytes.
+ *
+ * @throws IOException If some I/O error occurs.
+ */
+ public int write(ByteBuffer sourceBuffer) throws IOException;
+
+ /**
+ * Writes a sequence of bytes to this file from the {@code sourceBuffer}
+ * starting from specified file {@code position}
+ *
+ * @param sourceBuffer Source buffer.
+ * @param position Starting file position.
+ *
+ * @return Number of written bytes.
+ *
+ * @throws IOException If some I/O error occurs.
+ */
+ public int write(ByteBuffer sourceBuffer, long position) throws IOException;
+
+ /**
+ * Writes {@code length} bytes from the {@code buffer}
+ * starting at offset {@code offset} to this file.
+ *
+ * @param buffer Source byte array.
+ * @param offset Start offset in the {@code buffer}.
+ * @param length Number of bytes to write.
+ *
+ * @throws IOException If some I/O error occurs.
+ */
+ public void write(byte[] buffer, int offset, int length) throws IOException;
+
+ /**
+ * Forces any updates of this file to be written to the storage
+ * device that contains it.
+ *
+ * @throws IOException If some I/O error occurs.
+ */
+ public void force() throws IOException;
+
+ /**
+ * Returns current file size in bytes.
+ *
+ * @return File size.
+ *
+ * @throws IOException If some I/O error occurs.
+ */
+ public long size() throws IOException;
+
+ /**
+ * Truncates current file to zero length
+ * and resets current file position to zero.
+ *
+ * @throws IOException If some I/O error occurs.
+ */
+ public void clear() throws IOException;
+
+ /**
+ * Closes current file.
+ *
+ * @throws IOException If some I/O error occurs.
+ */
+ @Override public void close() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
new file mode 100644
index 0000000..3e80ef8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Decorator class for File I/O
+ */
+public class FileIODecorator implements FileIO {
+
+ /** File I/O delegate */
+ private final FileIO delegate;
+
+ /**
+ *
+ * @param delegate File I/O delegate
+ */
+ public FileIODecorator(FileIO delegate) {
+ this.delegate = delegate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long position() throws IOException {
+ return delegate.position();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void position(long newPosition) throws IOException {
+ delegate.position(newPosition);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read(ByteBuffer destinationBuffer) throws IOException {
+ return delegate.read(destinationBuffer);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read(ByteBuffer destinationBuffer, long position) throws IOException {
+ return delegate.read(destinationBuffer, position);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read(byte[] buffer, int offset, int length) throws IOException {
+ return delegate.read(buffer, offset, length);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int write(ByteBuffer sourceBuffer) throws IOException {
+ return delegate.write(sourceBuffer);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int write(ByteBuffer sourceBuffer, long position) throws IOException {
+ return delegate.write(sourceBuffer, position);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(byte[] buffer, int offset, int length) throws IOException {
+ delegate.write(buffer, offset, length);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void force() throws IOException {
+ delegate.force();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long size() throws IOException {
+ return delegate.size();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void clear() throws IOException {
+ delegate.clear();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IOException {
+ delegate.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java
new file mode 100644
index 0000000..0ffc653
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+
+public interface FileIOFactory extends Serializable {
+
+ /**
+ * Creates I/O interface for file with default I/O mode
+ *
+ * @param file File
+ * @return File I/O interface
+ * @throws IOException If I/O interface creation failed
+ */
+ FileIO create(File file) throws IOException;
+
+ /**
+ * Creates I/O interface for file with specified mode
+ *
+ * @param file File
+ * @param mode I/O mode string, e.g. {@code "r"} or {@code "rw"}
+ * @return File I/O interface
+ * @throws IOException If I/O interface creation failed
+ */
+ FileIO create(File file, String mode) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
index 6ddc9fc..c827e96 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
@@ -19,10 +19,8 @@ package org.apache.ignite.internal.processors.cache.persistence.file;
import java.io.File;
import java.io.IOException;
-import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-import java.nio.channels.FileChannel;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -61,11 +59,11 @@ public class FilePageStore implements PageStore {
/** Database configuration. */
private final MemoryConfiguration dbCfg;
- /** */
- private RandomAccessFile file;
+ /** Factory to provide I/O interfaces for read/write operations with files */
+ private final FileIOFactory ioFactory;
- /** */
- private FileChannel ch;
+ /** I/O interface for read/write operations with file */
+ private FileIO fileIO;
/** */
private final AtomicLong allocated;
@@ -91,11 +89,12 @@ public class FilePageStore implements PageStore {
/**
* @param file File.
*/
- public FilePageStore(byte type, File file, MemoryConfiguration cfg) {
+ public FilePageStore(byte type, File file, FileIOFactory factory, MemoryConfiguration cfg) {
this.type = type;
cfgFile = file;
dbCfg = cfg;
+ ioFactory = factory;
allocated = new AtomicLong();
@@ -136,7 +135,7 @@ public class FilePageStore implements PageStore {
ByteBuffer hdr = header(type, dbCfg.getPageSize());
while (hdr.remaining() > 0)
- ch.write(hdr);
+ fileIO.write(hdr);
}
catch (IOException e) {
throw new IgniteException("Check file failed.", e);
@@ -154,7 +153,7 @@ public class FilePageStore implements PageStore {
ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN);
while (hdr.remaining() > 0)
- ch.read(hdr);
+ fileIO.read(hdr);
hdr.rewind();
@@ -186,7 +185,7 @@ public class FilePageStore implements PageStore {
" [expectedPageSize=" + dbCfg.getPageSize() +
", filePageSize=" + pageSize + "]");
- long fileSize = file.length();
+ long fileSize = cfgFile.length();
if (fileSize == HEADER_SIZE) // Every file has a special meta page.
fileSize = pageSize + HEADER_SIZE;
@@ -214,9 +213,9 @@ public class FilePageStore implements PageStore {
if (!inited)
return;
- ch.force(false);
+ fileIO.force();
- file.close();
+ fileIO.close();
if (cleanFile)
cfgFile.delete();
@@ -241,9 +240,7 @@ public class FilePageStore implements PageStore {
this.tag = tag;
- ch.position(0);
-
- file.setLength(0);
+ fileIO.clear();
allocated.set(initFile());
}
@@ -277,7 +274,7 @@ public class FilePageStore implements PageStore {
try {
if (inited)
- allocated.set(ch.size());
+ allocated.set(fileIO.size());
recover = false;
}
@@ -303,7 +300,7 @@ public class FilePageStore implements PageStore {
int len = pageSize;
do {
- int n = ch.read(pageBuf, off);
+ int n = fileIO.read(pageBuf, off);
// If page was not written yet, nothing to read.
if (n < 0) {
@@ -330,7 +327,7 @@ public class FilePageStore implements PageStore {
if ((savedCrc32 ^ curCrc32) != 0)
throw new IgniteDataIntegrityViolationException("Failed to read page (CRC validation failed) " +
"[id=" + U.hexLong(pageId) + ", off=" + (off - pageSize) +
- ", file=" + cfgFile.getAbsolutePath() + ", fileSize=" + ch.size() +
+ ", file=" + cfgFile.getAbsolutePath() + ", fileSize=" + fileIO.size() +
", savedCrc=" + U.hexInt(savedCrc32) + ", curCrc=" + U.hexInt(curCrc32) + "]");
}
@@ -356,7 +353,7 @@ public class FilePageStore implements PageStore {
long off = 0;
do {
- int n = ch.read(buf, off);
+ int n = fileIO.read(buf, off);
// If page was not written yet, nothing to read.
if (n < 0)
@@ -382,16 +379,14 @@ public class FilePageStore implements PageStore {
try {
if (!inited) {
- RandomAccessFile rndFile = null;
+ FileIO fileIO = null;
IgniteCheckedException err = null;
try {
- file = rndFile = new RandomAccessFile(cfgFile, "rw");
-
- ch = file.getChannel();
+ this.fileIO = fileIO = ioFactory.create(cfgFile, "rw");
- if (file.length() == 0)
+ if (cfgFile.length() == 0)
allocated.set(initFile());
else
allocated.set(checkFile());
@@ -402,9 +397,9 @@ public class FilePageStore implements PageStore {
throw err = new IgniteCheckedException("Can't open file: " + cfgFile.getName(), e);
}
finally {
- if (err != null && rndFile != null)
+ if (err != null && fileIO != null)
try {
- rndFile.close();
+ fileIO.close();
}
catch (IOException e) {
err.addSuppressed(e);
@@ -447,7 +442,7 @@ public class FilePageStore implements PageStore {
int len = pageSize;
do {
- int n = ch.write(pageBuf, off);
+ int n = fileIO.write(pageBuf, off);
off += n;
@@ -478,7 +473,7 @@ public class FilePageStore implements PageStore {
try {
init();
- ch.force(false);
+ fileIO.force();
}
catch (IOException e) {
throw new IgniteCheckedException("Sync error", e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index 6aa2243..4a56ec7 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -367,6 +367,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
FilePageStore idxStore = new FilePageStore(
PageMemory.FLAG_IDX,
idxFile,
+ pstCfg.getFileIOFactory(),
cctx.kernalContext().config().getMemoryConfiguration());
FilePageStore[] partStores = new FilePageStore[grpDesc.config().getAffinity().partitions()];
@@ -375,6 +376,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
FilePageStore partStore = new FilePageStore(
PageMemory.FLAG_DATA,
new File(cacheWorkDir, String.format(PART_FILE_TEMPLATE, partId)),
+ pstCfg.getFileIOFactory(),
cctx.kernalContext().config().getMemoryConfiguration()
);
http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
new file mode 100644
index 0000000..73a560a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * File I/O implementation based on {@code java.io.RandomAccessFile}.
+ */
+public class RandomAccessFileIO implements FileIO {
+
+ /**
+ * Random access file associated with this I/O
+ */
+ private final RandomAccessFile file;
+
+ /**
+ * File channel associated with {@code file}
+ */
+ private final FileChannel channel;
+
+ /**
+ * Creates I/O implementation for specified {@code file}
+ *
+ * @param file Random access file
+ */
+ public RandomAccessFileIO(RandomAccessFile file) {
+ this.file = file;
+ this.channel = file.getChannel();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long position() throws IOException {
+ return channel.position();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void position(long newPosition) throws IOException {
+ channel.position(newPosition);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read(ByteBuffer destinationBuffer) throws IOException {
+ return channel.read(destinationBuffer);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read(ByteBuffer destinationBuffer, long position) throws IOException {
+ return channel.read(destinationBuffer, position);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read(byte[] buffer, int offset, int length) throws IOException {
+ return file.read(buffer, offset, length);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int write(ByteBuffer sourceBuffer) throws IOException {
+ return channel.write(sourceBuffer);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int write(ByteBuffer sourceBuffer, long position) throws IOException {
+ return channel.write(sourceBuffer, position);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(byte[] buffer, int offset, int length) throws IOException {
+ file.write(buffer, offset, length);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void force() throws IOException {
+ channel.force(false);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long size() throws IOException {
+ return channel.size();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void clear() throws IOException {
+ channel.position(0);
+ file.setLength(0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IOException {
+ file.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java
new file mode 100644
index 0000000..6b731f2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+/**
+ * File I/O factory which provides RandomAccessFileIO implementation of FileIO.
+ */
+public class RandomAccessFileIOFactory implements FileIOFactory {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public FileIO create(File file) throws IOException {
+ return create(file, "rw");
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileIO create(File file, String mode) throws IOException {
+ RandomAccessFile rf = new RandomAccessFile(file, mode);
+
+ return new RandomAccessFileIO(rf);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
index f4bace1..beed90b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
@@ -21,15 +21,15 @@ import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.RandomAccessFile;
import java.nio.ByteOrder;
-import java.nio.channels.FileChannel;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -71,6 +71,9 @@ public abstract class AbstractWalRecordsIterator
/** Serializer of current version to read headers. */
@NotNull private final RecordSerializer serializer;
+ /** Factory to provide I/O interfaces for read/write operations with files */
+ @NotNull protected final FileIOFactory ioFactory;
+
/** Utility buffer for reading records */
private final ByteBufferExpander buf;
@@ -84,11 +87,13 @@ public abstract class AbstractWalRecordsIterator
@NotNull final IgniteLogger log,
@NotNull final GridCacheSharedContext sharedCtx,
@NotNull final RecordSerializer serializer,
+ @NotNull final FileIOFactory ioFactory,
final int bufSize
) {
this.log = log;
this.sharedCtx = sharedCtx;
this.serializer = serializer;
+ this.ioFactory = ioFactory;
// Do not allocate direct buffer for iterator.
buf = new ByteBufferExpander(bufSize, ByteOrder.nativeOrder());
@@ -229,15 +234,14 @@ public abstract class AbstractWalRecordsIterator
@Nullable final FileWALPointer start)
throws IgniteCheckedException, FileNotFoundException {
try {
- RandomAccessFile rf = new RandomAccessFile(desc.file, "r");
+ FileIO fileIO = ioFactory.create(desc.file, "r");
try {
- FileChannel ch = rf.getChannel();
- FileInput in = new FileInput(ch, buf);
+ FileInput in = new FileInput(fileIO, buf);
// Header record must be agnostic to the serializer version.
WALRecord rec = serializer.readRecord(in,
- new FileWALPointer(desc.idx, (int)ch.position(), 0));
+ new FileWALPointer(desc.idx, (int)fileIO.position(), 0));
if (rec == null)
return null;
@@ -252,11 +256,11 @@ public abstract class AbstractWalRecordsIterator
if (start != null && desc.idx == start.index())
in.seek(start.fileOffset());
- return new FileWriteAheadLogManager.ReadFileHandle(rf, desc.idx, sharedCtx.igniteInstanceName(), ser, in);
+ return new FileWriteAheadLogManager.ReadFileHandle(fileIO, desc.idx, sharedCtx.igniteInstanceName(), ser, in);
}
catch (SegmentEofException | EOFException ignore) {
try {
- rf.close();
+ fileIO.close();
}
catch (IOException ce) {
throw new IgniteCheckedException(ce);
@@ -266,7 +270,7 @@ public abstract class AbstractWalRecordsIterator
}
catch (IOException | IgniteCheckedException e) {
try {
- rf.close();
+ fileIO.close();
}
catch (IOException ce) {
e.addSuppressed(ce);
http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
index 00c7c02..6443a7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.cache.persistence.wal;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
import org.jetbrains.annotations.NotNull;
@@ -36,8 +36,8 @@ public final class FileInput implements ByteBufferBackedDataInput {
*/
private ByteBuffer buf;
- /** File channel to read chunks from */
- private FileChannel ch;
+ /** I/O interface for read/write operations with file */
+ private FileIO io;
/** */
private long pos;
@@ -46,28 +46,20 @@ public final class FileInput implements ByteBufferBackedDataInput {
private ByteBufferExpander expBuf;
/**
- * @param ch Channel to read from
- * @param buf Buffer for reading blocks of data into
+ * @param io FileIO to read from.
+ * @param buf Buffer for reading blocks of data into.
*/
- public FileInput(FileChannel ch, ByteBuffer buf) throws IOException {
- assert ch != null;
+ public FileInput(FileIO io, ByteBufferExpander buf) throws IOException {
+ assert io != null;
- this.ch = ch;
- this.buf = buf;
+ this.io = io;
+ this.buf = buf.buffer();
- pos = ch.position();
+ expBuf = buf;
- clearBuffer();
- }
-
- /**
- * @param ch Channel to read from
- * @param expBuf ByteBufferWrapper with ability expand buffer dynamically.
- */
- public FileInput(FileChannel ch, ByteBufferExpander expBuf) throws IOException {
- this(ch, expBuf.buffer());
+ pos = io.position();
- this.expBuf = expBuf;
+ clearBuffer();
}
/**
@@ -84,10 +76,10 @@ public final class FileInput implements ByteBufferBackedDataInput {
* @param pos Position in bytes from file begin.
*/
public void seek(long pos) throws IOException {
- if (pos > ch.size())
+ if (pos > io.size())
throw new EOFException();
- ch.position(pos);
+ io.position(pos);
this.pos = pos;
@@ -118,10 +110,10 @@ public final class FileInput implements ByteBufferBackedDataInput {
buf.compact();
do {
- int read = ch.read(buf);
+ int read = io.read(buf);
if (read == -1)
- throw new EOFException("EOF at position [" + ch.position() + "] expected to read [" + requested + "] bytes");
+ throw new EOFException("EOF at position [" + io.position() + "] expected to read [" + requested + "] bytes");
available += read;
http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 162f43d..5c112fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -22,10 +22,8 @@ import java.io.File;
import java.io.FileFilter;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.sql.Time;
import java.util.Arrays;
@@ -48,6 +46,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.PersistentStoreConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.events.EventType;
+import org.apache.ignite.events.WalSegmentArchivedEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
@@ -61,7 +60,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.PersistenceMetricsImpl;
-import org.apache.ignite.events.WalSegmentArchivedEvent;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
@@ -153,6 +153,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** */
private volatile long oldestArchiveSegmentIdx;
+ /** Factory to provide I/O interfaces for read/write operations with files */
+ private final FileIOFactory ioFactory;
+
/** Updater for {@link #currentHnd}, used for verify there are no concurrent update for current log segment handle */
private static final AtomicReferenceFieldUpdater<FileWriteAheadLogManager, FileWriteHandle> currentHndUpd =
AtomicReferenceFieldUpdater.newUpdater(FileWriteAheadLogManager.class, FileWriteHandle.class, "currentHnd");
@@ -181,6 +184,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** Current log segment handle */
private volatile FileWriteHandle currentHnd;
+ /** Environment failure. */
+ private volatile Throwable envFailed;
+
/**
* Positive (non-0) value indicates WAL can be archived even if not complete<br>
* See {@link PersistentStoreConfiguration#setWalAutoArchiveAfterInactivity(long)}<br>
@@ -225,6 +231,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
flushFreq = psCfg.getWalFlushFrequency();
fsyncDelay = psCfg.getWalFsyncDelay();
alwaysWriteFullPages = psCfg.isAlwaysWriteFullPages();
+ ioFactory = psCfg.getFileIOFactory();
walAutoArchiveAfterInactivity = psCfg.getWalAutoArchiveAfterInactivity();
evt = ctx.event();
}
@@ -322,7 +329,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
archiver.shutdown();
}
catch (Exception e) {
- U.error(log, "Failed to gracefully close WAL segment: " + currHnd.file, e);
+ U.error(log, "Failed to gracefully close WAL segment: " + currentHnd.fileIO, e);
}
}
@@ -493,6 +500,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
return ptr;
}
+ checkEnvironment();
+
if (isStopping())
throw new IgniteCheckedException("Stopping.");
}
@@ -549,6 +558,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
end,
psCfg,
serializer,
+ ioFactory,
archiver,
log,
tlbSize
@@ -800,13 +810,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
int len = lastReadPtr == null ? 0 : lastReadPtr.length();
try {
- RandomAccessFile file = new RandomAccessFile(curFile, "rw");
+ FileIO fileIO = ioFactory.create(curFile);
try {
// readSerializerVersion will change the channel position.
// This is fine because the FileWriteHandle consitructor will move it
// to offset + len anyways.
- int serVer = readSerializerVersion(file, curFile, absIdx);
+ int serVer = readSerializerVersion(fileIO, curFile, absIdx);
RecordSerializer ser = forVersion(cctx, serVer);
@@ -815,7 +825,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
", offset=" + offset + ", ver=" + serVer + ']');
FileWriteHandle hnd = new FileWriteHandle(
- file,
+ fileIO,
absIdx,
cctx.igniteInstanceName(),
offset + len,
@@ -835,7 +845,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
return hnd;
}
catch (IgniteCheckedException | IOException e) {
- file.close();
+ fileIO.close();
throw e;
}
@@ -862,10 +872,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (log.isDebugEnabled())
log.debug("Switching to a new WAL segment: " + nextFile.getAbsolutePath());
- RandomAccessFile file = new RandomAccessFile(nextFile, "rw");
+ FileIO fileIO = ioFactory.create(nextFile);
FileWriteHandle hnd = new FileWriteHandle(
- file,
+ fileIO,
curIdx + 1,
cctx.igniteInstanceName(),
0,
@@ -929,22 +939,22 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (log.isDebugEnabled())
log.debug("Formatting file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']');
- try (RandomAccessFile rnd = new RandomAccessFile(file, "rw")) {
+ try (FileIO fileIO = ioFactory.create(file, "rw")) {
int left = psCfg.getWalSegmentSize();
if (mode == WALMode.DEFAULT) {
while (left > 0) {
int toWrite = Math.min(FILL_BUF.length, left);
- rnd.write(FILL_BUF, 0, toWrite);
+ fileIO.write(FILL_BUF, 0, toWrite);
left -= toWrite;
}
- rnd.getChannel().force(false);
+ fileIO.force();
}
else
- rnd.setLength(0);
+ fileIO.clear();
}
catch (IOException e) {
throw new IgniteCheckedException("Failed to format WAL segment file: " + file.getAbsolutePath(), e);
@@ -1033,6 +1043,15 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
+ * @throws StorageException If environment is no longer valid and we missed a WAL write.
+ */
+ private void checkEnvironment() throws StorageException {
+ if (envFailed != null)
+ throw new StorageException("Failed to flush WAL buffer (environment was invalidated by a " +
+ "previous error)", envFailed);
+ }
+
+ /**
* File archiver operates on absolute segment indexes. For any given absolute segment index N we can calculate
* the work WAL segment: S(N) = N % psCfg.walSegments.
* When a work segment is finished, it is given to the archiver. If the absolute index of last archived segment
@@ -1337,8 +1356,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
Files.move(dstTmpFile.toPath(), dstFile.toPath());
if (mode == WALMode.DEFAULT) {
- try (RandomAccessFile f0 = new RandomAccessFile(dstFile, "rw")) {
- f0.getChannel().force(false);
+ try (FileIO f0 = ioFactory.create(dstFile, "rw")) {
+ f0.force();
}
}
}
@@ -1402,20 +1421,21 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
- * @param rf Random access file.
+ * @param io I/O interface for file.
* @param file File object.
* @param idx File index to read.
* @return Serializer version stored in the file.
* @throws IOException If failed to read serializer version.
* @throws IgniteCheckedException If failed to read serializer version.
*/
- private int readSerializerVersion(RandomAccessFile rf, File file, long idx)
+ private int readSerializerVersion(FileIO io, File file, long idx)
throws IOException, IgniteCheckedException {
try {
ByteBuffer buf = ByteBuffer.allocate(RecordV1Serializer.HEADER_RECORD_SIZE);
buf.order(ByteOrder.nativeOrder());
- FileInput in = new FileInput(rf.getChannel(), buf);
+ FileInput in = new FileInput(io,
+ new ByteBufferExpander(RecordV1Serializer.HEADER_RECORD_SIZE, ByteOrder.nativeOrder()));
// Header record must be agnostic to the serializer version.
WALRecord rec = serializer.readRecord(in, new FileWALPointer(idx, 0, 0));
@@ -1541,11 +1561,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
*
*/
private abstract static class FileHandle {
- /** */
- protected RandomAccessFile file;
-
- /** */
- protected FileChannel ch;
+ /** I/O interface for read/write operations with file */
+ protected FileIO fileIO;
/** Absolute WAL segment file index (incremental counter) */
protected final long idx;
@@ -1554,17 +1571,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
protected String gridName;
/**
- * @param file File.
+ * @param fileIO I/O interface for read/write operations of FileHandle.
* @param idx Absolute WAL segment file index (incremental counter).
*/
- private FileHandle(RandomAccessFile file, long idx, String gridName) {
- this.file = file;
+ private FileHandle(FileIO fileIO, long idx, String gridName) {
+ this.fileIO = fileIO;
this.idx = idx;
this.gridName = gridName;
-
- ch = file.getChannel();
-
- assert ch != null;
}
}
@@ -1585,19 +1598,19 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
private boolean workDir;
/**
- * @param file File to read.
+ * @param fileIO I/O interface for read/write operations of FileHandle.
* @param idx Absolute WAL segment file index (incremental counter).
* @param ser Entry serializer.
* @param in File input.
*/
ReadFileHandle(
- RandomAccessFile file,
- long idx,
- String gridName,
- RecordSerializer ser,
- FileInput in
+ FileIO fileIO,
+ long idx,
+ String gridName,
+ RecordSerializer ser,
+ FileInput in
) {
- super(file, idx, gridName);
+ super(fileIO, idx, gridName);
this.ser = ser;
this.in = in;
@@ -1608,7 +1621,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
*/
public void close() throws IgniteCheckedException {
try {
- file.close();
+ fileIO.close();
}
catch (IOException e) {
throw new IgniteCheckedException(e);
@@ -1644,10 +1657,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** */
private volatile long lastFsyncPos;
- /** Environment failure. */
- private volatile Throwable envFailed;
-
- /** Stop guard to provide warranty that only one thread will be successful in calling {@link #close(boolean)} */
+ /** Stop guard to provide warranty that only one thread will be successful in calling {@link #close(boolean)} */
private final AtomicBoolean stop = new AtomicBoolean(false);
/** */
@@ -1661,12 +1671,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/**
* Next segment available condition.
- * Protection from "spurious wakeup" is provided by predicate {@link #ch}=<code>null</code>
+ * Protection from "spurious wakeup" is provided by predicate {@link #fileIO}=<code>null</code>
*/
private final Condition nextSegment = lock.newCondition();
/**
- * @param file Mapped file to use.
+ * @param fileIO I/O file interface to use.
* @param idx Absolute WAL segment file index for easy access.
* @param pos Position.
* @param maxSegmentSize Max segment size.
@@ -1674,18 +1684,18 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* @throws IOException If failed.
*/
private FileWriteHandle(
- RandomAccessFile file,
+ FileIO fileIO,
long idx,
String gridName,
long pos,
long maxSegmentSize,
RecordSerializer serializer
) throws IOException {
- super(file, idx, gridName);
+ super(fileIO, idx, gridName);
assert serializer != null;
- ch.position(pos);
+ fileIO.position(pos);
this.maxSegmentSize = maxSegmentSize;
this.serializer = serializer;
@@ -1887,6 +1897,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
catch (Throwable e) {
invalidateEnvironment(e);
+ // All workers waiting for a next segment must be woken up and stopped
+ signalNextAvailable();
+
throw e;
}
}
@@ -1990,7 +2003,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
long start = metricsEnabled ? System.nanoTime() : 0;
try {
- ch.force(false);
+ fileIO.force();
}
catch (IOException e) {
throw new StorageException(e);
@@ -2027,20 +2040,24 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
try {
int switchSegmentRecSize = RecordV1Serializer.REC_TYPE_SIZE + RecordV1Serializer.FILE_WAL_POINTER_SIZE;
+
if (rollOver && written < (maxSegmentSize - switchSegmentRecSize)) {
//it is expected there is sufficient space for this record because rollover should run early
final ByteBuffer buf = ByteBuffer.allocate(switchSegmentRecSize);
buf.put((byte)(WALRecord.RecordType.SWITCH_SEGMENT_RECORD.ordinal() + 1));
- final FileWALPointer pointer = new FileWALPointer(idx, (int)ch.position(), -1);
+
+ final FileWALPointer pointer = new FileWALPointer(idx, (int)fileIO.position(), -1);
RecordV1Serializer.putPosition(buf, pointer);
+
buf.rewind();
- ch.write(buf, written);
+
+ fileIO.write(buf, written);
if (mode == WALMode.DEFAULT)
- ch.force(false);
+ fileIO.force();
}
- ch.close();
+ fileIO.close();
}
catch (IOException e) {
throw new IgniteCheckedException(e);
@@ -2064,13 +2081,15 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
try {
WALRecord rec = head.get();
- assert rec instanceof FakeRecord : "Expected head FakeRecord, actual head "
+ if (envFailed == null) {
+ assert rec instanceof FakeRecord : "Expected head FakeRecord, actual head "
+ (rec != null ? rec.getClass().getSimpleName() : "null");
- assert written == lastFsyncPos || mode != WALMode.DEFAULT :
+ assert written == lastFsyncPos || mode != WALMode.DEFAULT :
"fsync [written=" + written + ", lastFsync=" + lastFsyncPos + ']';
+ }
- ch = null;
+ fileIO = null;
nextSegment.signalAll();
}
@@ -2086,7 +2105,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
lock.lock();
try {
- while (ch != null)
+ while (fileIO != null)
U.await(nextSegment);
}
finally {
@@ -2108,7 +2127,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
lock.lock();
try {
- assert ch != null : "Writing to a closed segment.";
+ assert fileIO != null : "Writing to a closed segment.";
checkEnvironment();
@@ -2151,10 +2170,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
assert size > 0 : size;
try {
- assert written == ch.position();
+ assert written == fileIO.position();
do {
- ch.write(buf);
+ fileIO.write(buf);
}
while (buf.hasRemaining());
@@ -2162,7 +2181,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
metrics.onWalBytesWritten(size);
- assert written == ch.position();
+ assert written == fileIO.position();
}
catch (IOException e) {
invalidateEnvironmentLocked(e);
@@ -2215,25 +2234,16 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
- * @throws StorageException If environment is no longer valid and we missed a WAL write.
- */
- private void checkEnvironment() throws StorageException {
- if (envFailed != null)
- throw new StorageException("Failed to flush WAL buffer (environment was invalidated by a " +
- "previous error)", envFailed);
- }
-
- /**
* @return Safely reads current position of the file channel as String. Will return "null" if channel is null.
*/
private String safePosition() {
- FileChannel ch = this.ch;
+ FileIO io = this.fileIO;
- if (ch == null)
+ if (io == null)
return "null";
try {
- return String.valueOf(ch.position());
+ return String.valueOf(io.position());
}
catch (IOException e) {
return "{Failed to read channel position: " + e.getMessage() + "}";
@@ -2320,6 +2330,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
@Nullable FileWALPointer end,
PersistentStoreConfiguration psCfg,
@NotNull RecordSerializer serializer,
+ FileIOFactory ioFactory,
FileArchiver archiver,
IgniteLogger log,
int tlbSize
@@ -2327,6 +2338,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
super(log,
cctx,
serializer,
+ ioFactory,
psCfg.getWalRecordIteratorBufferSize());
this.walWorkDir = walWorkDir;
this.walArchiveDir = walArchiveDir;
@@ -2479,11 +2491,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
*/
private void doFlush() {
final FileWriteHandle hnd = currentHandle();
-
try {
hnd.flush(hnd.head.get());
}
- catch (IgniteCheckedException e) {
+ catch (Exception e) {
U.warn(log, "Failed to flush WAL record queue", e);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
index 8ea0585..4e3998b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
@@ -24,6 +24,7 @@ import org.apache.ignite.configuration.MemoryConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.jetbrains.annotations.NotNull;
/**
@@ -34,15 +35,18 @@ public class IgniteWalIteratorFactory {
private final IgniteLogger log;
/** Page size, in standalone iterator mode this value can't be taken from memory configuration */
private final int pageSize;
+ /** Factory to provide I/O interfaces for read/write operations with files */
+ private final FileIOFactory ioFactory;
/**
* Creates WAL files iterator factory
* @param log Logger.
* @param pageSize Page size, size is validated
*/
- public IgniteWalIteratorFactory(@NotNull final IgniteLogger log, final int pageSize) {
+ public IgniteWalIteratorFactory(@NotNull IgniteLogger log, @NotNull FileIOFactory ioFactory, int pageSize) {
this.log = log;
this.pageSize = pageSize;
+ this.ioFactory = ioFactory;
new MemoryConfiguration().setPageSize(pageSize); // just for validate
}
@@ -57,7 +61,7 @@ public class IgniteWalIteratorFactory {
* @throws IgniteCheckedException if failed to read folder
*/
public WALIterator iteratorArchiveDirectory(@NotNull final File walDirWithConsistentId) throws IgniteCheckedException {
- return new StandaloneWalRecordsIterator(walDirWithConsistentId, log, prepareSharedCtx());
+ return new StandaloneWalRecordsIterator(walDirWithConsistentId, log, prepareSharedCtx(), ioFactory);
}
/**
@@ -69,7 +73,7 @@ public class IgniteWalIteratorFactory {
* @throws IgniteCheckedException if failed to read files
*/
public WALIterator iteratorArchiveFiles(@NotNull final File ...files) throws IgniteCheckedException {
- return new StandaloneWalRecordsIterator(log, prepareSharedCtx(), false, files);
+ return new StandaloneWalRecordsIterator(log, prepareSharedCtx(), ioFactory, false, files);
}
/**
@@ -81,7 +85,7 @@ public class IgniteWalIteratorFactory {
* @throws IgniteCheckedException if failed to read files
*/
public WALIterator iteratorWorkFiles(@NotNull final File ...files) throws IgniteCheckedException {
- return new StandaloneWalRecordsIterator(log, prepareSharedCtx(), true, files);
+ return new StandaloneWalRecordsIterator(log, prepareSharedCtx(), ioFactory, true, files);
}
/**
@@ -93,6 +97,7 @@ public class IgniteWalIteratorFactory {
final StandaloneIgniteCacheDatabaseSharedManager dbMgr = new StandaloneIgniteCacheDatabaseSharedManager();
dbMgr.setPageSize(pageSize);
+
return new GridCacheSharedContext<>(
kernalCtx, null, null, null,
null, null, dbMgr, null,
http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index df932e6..02b9352 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridComponent;
@@ -82,13 +83,25 @@ import org.jetbrains.annotations.Nullable;
* Dummy grid kernal context
*/
public class StandaloneGridKernalContext implements GridKernalContext {
+ /** */
private IgniteLogger log;
+ /** */
+ private IgnitePluginProcessor pluginProc;
+
/**
* @param log Logger.
*/
StandaloneGridKernalContext(IgniteLogger log) {
this.log = log;
+
+ try {
+ pluginProc = new StandaloneIgnitePluginProcessor(
+ this, config());
+ }
+ catch (IgniteCheckedException e) {
+ throw new IllegalStateException("Must not fail on empty providers list.", e);
+ }
}
/** {@inheritDoc} */
@@ -278,7 +291,7 @@ public class StandaloneGridKernalContext implements GridKernalContext {
/** {@inheritDoc} */
@Override public IgnitePluginProcessor plugins() {
- return null;
+ return pluginProc;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgnitePluginProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgnitePluginProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgnitePluginProcessor.java
new file mode 100644
index 0000000..838fc85
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgnitePluginProcessor.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.reader;
+
+import java.util.Collections;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
+import org.apache.ignite.plugin.PluginProvider;
+
+/**
+ * Plugin processor used by the standalone WAL reader; it is always constructed with an empty plugin provider list.
+ */
+public class StandaloneIgnitePluginProcessor extends IgnitePluginProcessor {
+ /**
+ * @param ctx Kernal context.
+ * @param cfg Ignite configuration.
+ */
+ public StandaloneIgnitePluginProcessor(GridKernalContext ctx, IgniteConfiguration cfg) throws IgniteCheckedException {
+ super(ctx, cfg, Collections.<PluginProvider>emptyList());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
index f17c112..ecad70a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -21,10 +21,7 @@ import java.io.DataInput;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -33,7 +30,10 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRecordsIterator;
+import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
@@ -83,14 +83,17 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
* @param walFilesDir Wal files directory. Should already contain node consistent ID as subfolder
* @param log Logger.
* @param sharedCtx Shared context.
+ * @param ioFactory File I/O factory.
*/
StandaloneWalRecordsIterator(
- @NotNull final File walFilesDir,
- @NotNull final IgniteLogger log,
- @NotNull final GridCacheSharedContext sharedCtx) throws IgniteCheckedException {
+ @NotNull File walFilesDir,
+ @NotNull IgniteLogger log,
+ @NotNull GridCacheSharedContext sharedCtx,
+ @NotNull FileIOFactory ioFactory) throws IgniteCheckedException {
super(log,
sharedCtx,
new RecordV1Serializer(sharedCtx),
+ ioFactory,
BUF_SIZE);
init(walFilesDir, false, null);
advance();
@@ -101,17 +104,20 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
*
* @param log Logger.
* @param sharedCtx Shared context.
+ * @param ioFactory File I/O factory.
* @param workDir Work directory is scanned, false - archive
* @param walFiles Wal files.
*/
StandaloneWalRecordsIterator(
- @NotNull final IgniteLogger log,
- @NotNull final GridCacheSharedContext sharedCtx,
- final boolean workDir,
- @NotNull final File... walFiles) throws IgniteCheckedException {
+ @NotNull IgniteLogger log,
+ @NotNull GridCacheSharedContext sharedCtx,
+ @NotNull FileIOFactory ioFactory,
+ boolean workDir,
+ @NotNull File... walFiles) throws IgniteCheckedException {
super(log,
sharedCtx,
new RecordV1Serializer(sharedCtx),
+ ioFactory,
BUF_SIZE);
this.workDir = workDir;
init(null, workDir, walFiles);
@@ -138,10 +144,12 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
}
else {
this.workDir = workDir;
+
if (workDir)
walFileDescriptors = scanIndexesFromFileHeaders(walFiles);
else
walFileDescriptors = new ArrayList<>(Arrays.asList(FileWriteAheadLogManager.scan(walFiles)));
+
curWalSegmIdx = !walFileDescriptors.isEmpty() ? walFileDescriptors.get(0).getIdx() : 0;
}
curWalSegmIdx--;
@@ -172,13 +180,10 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
FileWALPointer ptr;
- try (RandomAccessFile rf = new RandomAccessFile(file, "r");) {
- final FileChannel ch = rf.getChannel();
- final ByteBuffer buf = ByteBuffer.allocate(HEADER_RECORD_SIZE);
-
- buf.order(ByteOrder.nativeOrder());
+ try (FileIO fileIO = ioFactory.create(file, "r")) {
+ final DataInput in = new FileInput(fileIO,
+ new ByteBufferExpander(HEADER_RECORD_SIZE, ByteOrder.nativeOrder()));
- final DataInput in = new FileInput(ch, buf);
// Header record must be agnostic to the serializer version.
final int type = in.readUnsignedByte();
http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
index 6847482..e086258 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.persistence;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-import java.nio.channels.FileChannel;
import java.util.Collection;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
@@ -43,7 +42,7 @@ import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl;
@@ -191,13 +190,13 @@ public class IgnitePdsRecoveryAfterFileCorruptionTest extends GridCommonAbstract
FilePageStore filePageStore = (FilePageStore)store;
- FileChannel ch = U.field(filePageStore, "ch");
+ FileIO fileIO = U.field(filePageStore, "fileIO");
- long size = ch.size();
+ long size = fileIO.size();
- ch.write(ByteBuffer.allocate((int)size - FilePageStore.HEADER_SIZE), FilePageStore.HEADER_SIZE);
+ fileIO.write(ByteBuffer.allocate((int)size - FilePageStore.HEADER_SIZE), FilePageStore.HEADER_SIZE);
- ch.force(false);
+ fileIO.force();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
new file mode 100644
index 0000000..cad10ae
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.MemoryPolicyConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.GridKernalState;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+/**
+ *
+ */
+public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest {
+
+ /** */
+ private static final String TEST_CACHE = "testCache";
+
+ /** */
+ private boolean flushByTimeout;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ deleteWorkFiles();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ deleteWorkFiles();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 30_000L;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ CacheConfiguration cacheCfg = new CacheConfiguration(TEST_CACHE)
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+
+ cfg.setCacheConfiguration(cacheCfg);
+
+ MemoryPolicyConfiguration memPlcCfg = new MemoryPolicyConfiguration()
+ .setName("dfltMemPlc")
+ .setInitialSize(2 * 1024L * 1024L * 1024L);
+
+ MemoryConfiguration memCfg = new MemoryConfiguration()
+ .setMemoryPolicies(memPlcCfg)
+ .setDefaultMemoryPolicyName(memPlcCfg.getName());
+
+ cfg.setMemoryConfiguration(memCfg);
+
+ PersistentStoreConfiguration storeCfg = new PersistentStoreConfiguration()
+ .setFileIOFactory(new FailingFileIOFactory())
+ .setWalMode(WALMode.BACKGROUND)
+ // Setting WAL Segment size to high values forces flushing by timeout.
+ .setWalSegmentSize(flushByTimeout ? 500_000 : 50_000);
+
+ cfg.setPersistentStoreConfiguration(storeCfg);
+
+ return cfg;
+ }
+
+ /**
+ * Test flushing error recovery when flush is triggered asynchronously by timeout
+ *
+ * @throws Exception In case of fail
+ */
+ public void testErrorOnFlushByTimeout() throws Exception {
+ flushByTimeout = true;
+ flushingErrorTest();
+ }
+
+ /**
+ * Test flushing error recovery when flush is triggered directly by transaction commit
+ *
+ * @throws Exception In case of fail
+ */
+ public void testErrorOnDirectFlush() throws Exception {
+ flushByTimeout = false;
+ flushingErrorTest();
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ private void flushingErrorTest() throws Exception {
+ final IgniteEx grid = startGrid(0);
+ grid.active(true);
+
+ IgniteCache<Object, Object> cache = grid.cache(TEST_CACHE);
+
+ final int iterations = 100;
+
+ try {
+ for (int i = 0; i < iterations; i++) {
+ Transaction tx = grid.transactions().txStart(
+ TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED);
+
+ cache.put(i, "testValue" + i);
+
+ Thread.sleep(100L);
+
+ tx.commitAsync().get();
+ }
+ }
+ catch (Exception expected) {
+ // There can be any exception. Do nothing.
+ }
+
+ // We should await successful stop of node.
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override
+ public boolean apply() {
+ return grid.context().gateway().getState() == GridKernalState.STOPPED;
+ }
+ }, getTestTimeout());
+ }
+
+ /**
+ * @throws IgniteCheckedException
+ */
+ private void deleteWorkFiles() throws IgniteCheckedException {
+ deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
+ }
+
+ /**
+ * Create File I/O which fails after second attempt to write to File
+ */
+ private static class FailingFileIOFactory implements FileIOFactory {
+ private static final long serialVersionUID = 0L;
+
+ private final FileIOFactory delegateFactory = new RandomAccessFileIOFactory();
+
+ @Override
+ public FileIO create(File file) throws IOException {
+ return create(file, "rw");
+ }
+
+ @Override
+ public FileIO create(File file, String mode) throws IOException {
+ FileIO delegate = delegateFactory.create(file, mode);
+
+ return new FileIODecorator(delegate) {
+ int writeAttempts = 2;
+
+ @Override
+ public int write(ByteBuffer sourceBuffer) throws IOException {
+ if (--writeAttempts == 0)
+ throw new RuntimeException("Test exception. Unable to write to file.");
+
+ return super.write(sourceBuffer);
+ }
+ };
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
index 303f14e..b93c74d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
@@ -23,7 +23,10 @@ import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
@@ -47,9 +50,10 @@ public class IgniteDataIntegrityTests extends TestCase {
randomAccessFile = new RandomAccessFile(file, "rw");
- fileInput = new FileInput(randomAccessFile.getChannel(), ByteBuffer.allocate(1024));
-
- PureJavaCrc32 pureJavaCrc32 = new PureJavaCrc32();
+ fileInput = new FileInput(
+ new RandomAccessFileIO(randomAccessFile),
+ new ByteBufferExpander(1024, ByteOrder.BIG_ENDIAN)
+ );
ByteBuffer buf = ByteBuffer.allocate(1024);
ThreadLocalRandom curr = ThreadLocalRandom.current();