You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/10/13 17:43:06 UTC
[21/50] [abbrv] ignite git commit: IGNITE-6228 Avoid closing page
store by thread interruption. Fixes #2715
IGNITE-6228 Avoid closing page store by thread interruption. Fixes #2715
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/111d8abb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/111d8abb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/111d8abb
Branch: refs/heads/ignite-2.1.5-p1
Commit: 111d8abbe6ec7710c7f0e7ebe6d43f3ccb904dcb
Parents: be8afd4
Author: Alexei Scherbakov <al...@gmail.com>
Authored: Thu Sep 21 17:40:16 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Thu Sep 21 18:10:22 2017 +0300
----------------------------------------------------------------------
.../apache/ignite/IgniteSystemProperties.java | 5 +
.../PersistentStoreConfiguration.java | 6 +-
.../GridCacheDatabaseSharedManager.java | 7 +-
.../cache/persistence/file/AsyncFileIO.java | 218 +++++++++++++++++++
.../persistence/file/AsyncFileIOFactory.java | 52 +++++
.../cache/persistence/file/FileIOFactory.java | 25 ++-
.../cache/persistence/file/FilePageStore.java | 5 +-
.../file/FileVersionCheckingFactory.java | 2 +-
.../persistence/file/RandomAccessFileIO.java | 48 ++--
.../file/RandomAccessFileIOFactory.java | 14 +-
.../wal/AbstractWalRecordsIterator.java | 2 +-
.../cache/persistence/wal/FileInput.java | 7 +
.../wal/FileWriteAheadLogManager.java | 8 +-
.../reader/StandaloneWalRecordsIterator.java | 4 +-
.../internal/util/future/GridFutureAdapter.java | 16 ++
.../resources/META-INF/classnames.properties | 2 +
.../file/IgnitePdsThreadInterruptionTest.java | 205 +++++++++++++++++
.../db/wal/IgniteWalFlushFailoverTest.java | 22 +-
.../db/wal/crc/IgniteDataIntegrityTests.java | 39 ++--
.../development/utils/IgniteWalConverter.java | 1 -
.../IgnitePdsWithIndexingCoreTestSuite.java | 2 +
21 files changed, 606 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 30d5339..628b165 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -706,6 +706,11 @@ public final class IgniteSystemProperties {
public static final String IGNITE_ENABLE_FORCIBLE_NODE_KILL = "IGNITE_ENABLE_FORCIBLE_NODE_KILL";
/**
+ * If this property is set, then Ignite will use Async File IO factory by default.
+ */
+ public static final String IGNITE_USE_ASYNC_FILE_IO_FACTORY = "IGNITE_USE_ASYNC_FILE_IO_FACTORY";
+
+ /**
* If the property is set {@link org.apache.ignite.internal.pagemem.wal.record.TxRecord} records
* will be logged to WAL.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
index 888bf42..abca5a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
@@ -17,6 +17,8 @@
package org.apache.ignite.configuration;
import java.io.Serializable;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -137,7 +139,9 @@ public class PersistentStoreConfiguration implements Serializable {
private boolean alwaysWriteFullPages = DFLT_WAL_ALWAYS_WRITE_FULL_PAGES;
/** Factory to provide I/O interface for files */
- private FileIOFactory fileIOFactory = new RandomAccessFileIOFactory();
+ private FileIOFactory fileIOFactory =
+ IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_USE_ASYNC_FILE_IO_FACTORY, false) ?
+ new AsyncFileIOFactory() : new RandomAccessFileIOFactory();
/**
* Number of sub-intervals the whole {@link #setRateTimeInterval(long)} will be split into to calculate
http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index eef667e..277143c 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -144,6 +144,7 @@ import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import static java.nio.file.StandardOpenOption.READ;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
@@ -742,7 +743,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
* @param partFile Partition file.
*/
private int resolvePageSizeFromPartitionFile(Path partFile) throws IOException, IgniteCheckedException {
- try (FileIO fileIO = persistenceCfg.getFileIOFactory().create(partFile.toFile(), "r")) {
+ try (FileIO fileIO = persistenceCfg.getFileIOFactory().create(partFile.toFile())) {
int minimalHdr = FilePageStore.HEADER_SIZE;
if (fileIO.size() < minimalHdr)
@@ -961,7 +962,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
checkpointLock.readLock().unlock();
try {
- checkpointer.wakeupForCheckpoint(0, "too many dirty pages").cpBeginFut.get();
+ checkpointer.wakeupForCheckpoint(0, "too many dirty pages").cpBeginFut.getUninterruptibly();
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to wait for checkpoint begin.", e);
@@ -1390,7 +1391,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
private WALPointer readPointer(File cpMarkerFile, ByteBuffer buf) throws IgniteCheckedException {
buf.position(0);
- try (FileChannel ch = FileChannel.open(cpMarkerFile.toPath(), StandardOpenOption.READ)) {
+ try (FileChannel ch = FileChannel.open(cpMarkerFile.toPath(), READ)) {
ch.read(buf);
buf.flip();
http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java
new file mode 100644
index 0000000..8fad7a5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.channels.CompletionHandler;
+import java.nio.file.OpenOption;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+
+/**
+ * File I/O implementation based on {@link AsynchronousFileChannel}.
+ */
+public class AsyncFileIO implements FileIO {
+ /**
+ * File channel associated with {@code file}
+ */
+ private final AsynchronousFileChannel ch;
+
+ /**
+ * Channel's position.
+ */
+ private volatile long position;
+
+ /** */
+ private final ThreadLocal<ChannelOpFuture> holder;
+
+ /** */
+ private GridConcurrentHashSet<ChannelOpFuture> asyncFuts = new GridConcurrentHashSet<>();
+
+ /**
+ * Creates I/O implementation for specified {@code file}
+ * @param file Random access file
+ * @param modes Open modes.
+ */
+ public AsyncFileIO(File file, ThreadLocal<ChannelOpFuture> holder, OpenOption... modes) throws IOException {
+ this.ch = AsynchronousFileChannel.open(file.toPath(), modes);
+
+ this.holder = holder;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long position() throws IOException {
+ return position;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void position(long newPosition) throws IOException {
+ this.position = newPosition;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read(ByteBuffer destinationBuffer) throws IOException {
+ ChannelOpFuture fut = holder.get();
+ fut.reset();
+
+ ch.read(destinationBuffer, position, this, fut);
+
+ try {
+ return fut.getUninterruptibly();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read(ByteBuffer destinationBuffer, long position) throws IOException {
+ ChannelOpFuture fut = holder.get();
+ fut.reset();
+
+ ch.read(destinationBuffer, position, null, fut);
+
+ try {
+ return fut.getUninterruptibly();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IOException(e);
+ }
+ finally {
+ asyncFuts.remove(fut);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read(byte[] buffer, int offset, int length) throws IOException {
+ ChannelOpFuture fut = holder.get();
+ fut.reset();
+
+ ch.read(ByteBuffer.wrap(buffer, offset, length), position, this, fut);
+
+ try {
+ return fut.getUninterruptibly();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public int write(ByteBuffer sourceBuffer) throws IOException {
+ ChannelOpFuture fut = holder.get();
+ fut.reset();
+
+ ch.write(sourceBuffer, position, this, fut);
+
+ try {
+ return fut.getUninterruptibly();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public int write(ByteBuffer sourceBuffer, long position) throws IOException {
+ ChannelOpFuture fut = holder.get();
+ fut.reset();
+
+ asyncFuts.add(fut);
+
+ ch.write(sourceBuffer, position, null, fut);
+
+ try {
+ return fut.getUninterruptibly();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IOException(e);
+ }
+ finally {
+ asyncFuts.remove(fut);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(byte[] buffer, int offset, int length) throws IOException {
+ ChannelOpFuture fut = holder.get();
+ fut.reset();
+
+ ch.write(ByteBuffer.wrap(buffer, offset, length), position, this, fut);
+
+ try {
+ fut.getUninterruptibly();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void force() throws IOException {
+ ch.force(false);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long size() throws IOException {
+ return ch.size();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void clear() throws IOException {
+ ch.truncate(0);
+
+ this.position = 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IOException {
+ for (ChannelOpFuture asyncFut : asyncFuts) {
+ try {
+ asyncFut.getUninterruptibly(); // Ignore interrupts while waiting for channel close.
+ }
+ catch (IgniteCheckedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ ch.close();
+ }
+
+ /** */
+ static class ChannelOpFuture extends GridFutureAdapter<Integer> implements CompletionHandler<Integer, AsyncFileIO> {
+ /** {@inheritDoc} */
+ @Override public void completed(Integer result, AsyncFileIO attachment) {
+ if (attachment != null) {
+ if (result != -1)
+ attachment.position += result;
+ }
+
+ // Release waiter and allow next operation to begin.
+ super.onDone(result, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void failed(Throwable exc, AsyncFileIO attachment) {
+ super.onDone(exc);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIOFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIOFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIOFactory.java
new file mode 100644
index 0000000..0fb3052
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIOFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.file.OpenOption;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
+
+/**
+ * File I/O factory which uses {@link AsynchronousFileChannel} based implementation of FileIO.
+ */
+public class AsyncFileIOFactory implements FileIOFactory {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public FileIO create(File file) throws IOException {
+ return create(file, CREATE, READ, WRITE);
+ }
+
+ /** */
+ private ThreadLocal<AsyncFileIO.ChannelOpFuture> holder = new ThreadLocal<AsyncFileIO.ChannelOpFuture>() {
+ @Override protected AsyncFileIO.ChannelOpFuture initialValue() {
+ return new AsyncFileIO.ChannelOpFuture();
+ }
+ };
+
+ /** {@inheritDoc} */
+ @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+ return new AsyncFileIO(file, holder, modes);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java
index 0ffc653..c3a75f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java
@@ -20,26 +20,29 @@ package org.apache.ignite.internal.processors.cache.persistence.file;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
+import java.nio.file.OpenOption;
+/**
+ * {@link FileIO} factory definition.
+ */
public interface FileIOFactory extends Serializable {
-
/**
- * Creates I/O interface for file with default I/O mode
+ * Creates I/O interface for file with default I/O mode.
*
- * @param file File
- * @return File I/O interface
- * @throws IOException If I/O interface creation was failed
+ * @param file File.
+ * @return File I/O interface.
+ * @throws IOException If I/O interface creation was failed.
*/
- FileIO create(File file) throws IOException;
+ public FileIO create(File file) throws IOException;
/**
- * Creates I/O interface for file with specified mode
+ * Creates I/O interface for file with specified mode.
*
* @param file File
- * @param mode I/O mode in
- * @return File I/O interface
- * @throws IOException If I/O interface creation was failed
+ * @param modes Open modes.
+ * @return File I/O interface.
+ * @throws IOException If I/O interface creation was failed.
*/
- FileIO create(File file, String mode) throws IOException;
+ public FileIO create(File file, OpenOption... modes) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
index 6052a7c..98764a2 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
@@ -35,6 +35,9 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDat
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
import org.apache.ignite.internal.util.typedef.internal.U;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
/**
@@ -400,7 +403,7 @@ public class FilePageStore implements PageStore {
IgniteCheckedException err = null;
try {
- this.fileIO = fileIO = ioFactory.create(cfgFile, "rw");
+ this.fileIO = fileIO = ioFactory.create(cfgFile, CREATE, READ, WRITE);
if (cfgFile.length() == 0)
allocated.set(initFile());
http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java
index 53bd802..40870dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java
@@ -55,7 +55,7 @@ public class FileVersionCheckingFactory implements FilePageStoreFactory {
if (!file.exists())
return createPageStore(type, file, latestVersion());
- try (FileIO fileIO = fileIOFactory.create(file, "r")) {
+ try (FileIO fileIO = fileIOFactory.create(file)) {
int minHdr = FilePageStore.HEADER_SIZE;
if (fileIO.size() < minHdr)
http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
index 73a560a..55849fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
@@ -17,94 +17,88 @@
package org.apache.ignite.internal.processors.cache.persistence.file;
+import java.io.File;
import java.io.IOException;
-import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.nio.file.OpenOption;
/**
- * File I/O implementation based on {@code java.io.RandomAccessFile}.
+ * File I/O implementation based on {@link FileChannel}.
*/
public class RandomAccessFileIO implements FileIO {
-
- /**
- * Random access file associated with this I/O
- */
- private final RandomAccessFile file;
-
/**
- * File channel associated with {@code file}
+ * File channel.
*/
- private final FileChannel channel;
+ private final FileChannel ch;
/**
* Creates I/O implementation for specified {@code file}
*
- * @param file Random access file
+ * @param file File.
+ * @param modes Open modes.
*/
- public RandomAccessFileIO(RandomAccessFile file) {
- this.file = file;
- this.channel = file.getChannel();
+ public RandomAccessFileIO(File file, OpenOption... modes) throws IOException {
+ ch = FileChannel.open(file.toPath(), modes);
}
/** {@inheritDoc} */
@Override public long position() throws IOException {
- return channel.position();
+ return ch.position();
}
/** {@inheritDoc} */
@Override public void position(long newPosition) throws IOException {
- channel.position(newPosition);
+ ch.position(newPosition);
}
/** {@inheritDoc} */
@Override public int read(ByteBuffer destinationBuffer) throws IOException {
- return channel.read(destinationBuffer);
+ return ch.read(destinationBuffer);
}
/** {@inheritDoc} */
@Override public int read(ByteBuffer destinationBuffer, long position) throws IOException {
- return channel.read(destinationBuffer, position);
+ return ch.read(destinationBuffer, position);
}
/** {@inheritDoc} */
@Override public int read(byte[] buffer, int offset, int length) throws IOException {
- return file.read(buffer, offset, length);
+ return ch.read(ByteBuffer.wrap(buffer, offset, length));
}
/** {@inheritDoc} */
@Override public int write(ByteBuffer sourceBuffer) throws IOException {
- return channel.write(sourceBuffer);
+ return ch.write(sourceBuffer);
}
/** {@inheritDoc} */
@Override public int write(ByteBuffer sourceBuffer, long position) throws IOException {
- return channel.write(sourceBuffer, position);
+ return ch.write(sourceBuffer, position);
}
/** {@inheritDoc} */
@Override public void write(byte[] buffer, int offset, int length) throws IOException {
- file.write(buffer, offset, length);
+ ch.write(ByteBuffer.wrap(buffer, offset, length));
}
/** {@inheritDoc} */
@Override public void force() throws IOException {
- channel.force(false);
+ ch.force(false);
}
/** {@inheritDoc} */
@Override public long size() throws IOException {
- return channel.size();
+ return ch.size();
}
/** {@inheritDoc} */
@Override public void clear() throws IOException {
- channel.position(0);
- file.setLength(0);
+ ch.truncate(0);
}
/** {@inheritDoc} */
@Override public void close() throws IOException {
- file.close();
+ ch.close();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java
index 6b731f2..856ba1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java
@@ -19,7 +19,11 @@ package org.apache.ignite.internal.processors.cache.persistence.file;
import java.io.File;
import java.io.IOException;
-import java.io.RandomAccessFile;
+import java.nio.file.OpenOption;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
/**
* File I/O factory which provides RandomAccessFileIO implementation of FileIO.
@@ -30,13 +34,11 @@ public class RandomAccessFileIOFactory implements FileIOFactory {
/** {@inheritDoc} */
@Override public FileIO create(File file) throws IOException {
- return create(file, "rw");
+ return create(file, CREATE, READ, WRITE);
}
/** {@inheritDoc} */
- @Override public FileIO create(File file, String mode) throws IOException {
- RandomAccessFile rf = new RandomAccessFile(file, mode);
-
- return new RandomAccessFileIO(rf);
+ @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+ return new RandomAccessFileIO(file, modes);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
index 2749d5c..d5a2555 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
@@ -258,7 +258,7 @@ public abstract class AbstractWalRecordsIterator
@Nullable final FileWALPointer start)
throws IgniteCheckedException, FileNotFoundException {
try {
- FileIO fileIO = ioFactory.create(desc.file, "r");
+ FileIO fileIO = ioFactory.create(desc.file);
try {
FileInput in = new FileInput(fileIO, buf);
http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
index 74edbfa..3b20fce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
@@ -63,6 +63,13 @@ public final class FileInput implements ByteBufferBackedDataInput {
}
/**
+ * File I/O.
+ */
+ public FileIO io() {
+ return io;
+ }
+
+ /**
* Clear buffer.
*/
private void clearBuffer() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 18584a8..87069d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -79,6 +79,10 @@ import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
+
/**
* File WAL manager.
*/
@@ -940,7 +944,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (log.isDebugEnabled())
log.debug("Formatting file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']');
- try (FileIO fileIO = ioFactory.create(file, "rw")) {
+ try (FileIO fileIO = ioFactory.create(file, CREATE, READ, WRITE)) {
int left = psCfg.getWalSegmentSize();
if (mode == WALMode.DEFAULT) {
@@ -1365,7 +1369,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
Files.move(dstTmpFile.toPath(), dstFile.toPath());
if (mode == WALMode.DEFAULT) {
- try (FileIO f0 = ioFactory.create(dstFile, "rw")) {
+ try (FileIO f0 = ioFactory.create(dstFile, CREATE, READ, WRITE)) {
f0.force();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
index 900aab5..c92d572 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -181,7 +181,7 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
/**
* This methods checks all provided files to be correct WAL segment.
- * Header record and its position is checked. WAL position is used to deremine real index.
+ * Header record and its position is checked. WAL position is used to determine real index.
* File index from file name is ignored.
*
* @param allFiles files to scan
@@ -202,7 +202,7 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
FileWALPointer ptr;
try (
- FileIO fileIO = ioFactory.create(file, "r");
+ FileIO fileIO = ioFactory.create(file);
ByteBufferExpander buf = new ByteBufferExpander(HEADER_RECORD_SIZE, ByteOrder.nativeOrder())
) {
final DataInput in = new FileInput(fileIO, buf);
http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index 323babd..f8c0b14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.LockSupport;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
@@ -499,6 +500,21 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
}
/**
+ * Resets future for subsequent reuse.
+ */
+ public void reset() {
+ final Object oldState = state;
+
+ if (oldState == INIT)
+ return;
+
+ if (!isDone(oldState))
+ throw new IgniteException("Illegal state");
+
+ compareAndSetState(oldState, INIT);
+ }
+
+ /**
* Callback to notify that future is cancelled.
*
* @return {@code True} if cancel flag was set by this call.
http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 2fb8f4b..ad3846f 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -2090,3 +2090,5 @@ org.apache.ignite.transactions.TransactionRollbackException
org.apache.ignite.transactions.TransactionState
org.apache.ignite.transactions.TransactionTimeoutException
org.apache.ignite.util.AttributeNodeFilter
+org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIO
+org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIOFactory
http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java
new file mode 100644
index 0000000..aab569a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.file;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.MemoryPolicyConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIOFactory;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jsr166.ThreadLocalRandom8;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Test what interruptions of writing threads do not affect PDS.
+ */
+public class IgnitePdsThreadInterruptionTest extends GridCommonAbstractTest {
+ /** */
+ private static final int PAGE_SIZE = 1 << 12; // 4096
+
+ /** */
+ public static final int THREADS_CNT = 1;
+
+ /**
+ * Cache name.
+ */
+ private final String cacheName = "cache";
+
+ /** */
+ private volatile boolean stop = false;
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ final IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setPersistentStoreConfiguration(storeConfiguration());
+
+ cfg.setMemoryConfiguration(memoryConfiguration());
+
+ cfg.setCacheConfiguration(new CacheConfiguration<>(cacheName));
+
+ return cfg;
+ }
+
+ /**
+ * @return Store config.
+ */
+ private PersistentStoreConfiguration storeConfiguration() {
+ PersistentStoreConfiguration cfg = new PersistentStoreConfiguration();
+
+ cfg.setWalMode(WALMode.LOG_ONLY);
+
+ cfg.setWalFsyncDelayNanos(0);
+
+ cfg.setFileIOFactory(new AsyncFileIOFactory());
+
+ return cfg;
+ }
+
+ /**
+ * @return Memory config.
+ */
+ private MemoryConfiguration memoryConfiguration() {
+ final MemoryConfiguration memCfg = new MemoryConfiguration();
+
+ MemoryPolicyConfiguration memPlcCfg = new MemoryPolicyConfiguration();
+ // memPlcCfg.setPageEvictionMode(RANDOM_LRU); TODO Fix NPE on start.
+ memPlcCfg.setName("dfltMemPlc");
+
+ memCfg.setPageSize(PAGE_SIZE);
+ memCfg.setConcurrencyLevel(1);
+ memCfg.setMemoryPolicies(memPlcCfg);
+ memCfg.setDefaultMemoryPolicyName("dfltMemPlc");
+
+ return memCfg;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ deleteWorkFiles();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+
+ deleteWorkFiles();
+ }
+
+ /**
+ * Tests interruptions on WAL write.
+ *
+ * @throws Exception
+ */
+ public void testInterruptsOnWALWrite() throws Exception {
+ final Ignite ignite = startGrid();
+
+ ignite.active(true);
+
+ final int valLen = 8192;
+
+ final byte[] payload = new byte[valLen];
+
+ final int maxKey = 100_000;
+
+ Thread[] workers = new Thread[THREADS_CNT];
+
+ final AtomicReference<Throwable> fail = new AtomicReference<>();
+
+ Runnable clo = new Runnable() {
+ @Override public void run() {
+ IgniteCache<Object, Object> cache = ignite.cache(cacheName);
+
+ while (!stop)
+ cache.put(ThreadLocalRandom8.current().nextInt(maxKey), payload);
+ }
+ };
+
+ for (int i = 0; i < workers.length; i++) {
+ workers[i] = new Thread(clo);
+ workers[i].setName("writer-" + i);
+ workers[i].setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+ @Override public void uncaughtException(Thread t, Throwable e) {
+ fail.compareAndSet(null, e);
+ }
+ });
+ }
+
+ for (Thread worker : workers)
+ worker.start();
+
+ Thread.sleep(3_000);
+
+ // Interrupts should not affect writes.
+ for (Thread worker : workers)
+ worker.interrupt();
+
+ Thread.sleep(3_000);
+
+ stop = true;
+
+ for (Thread worker : workers)
+ worker.join();
+
+ Throwable t = fail.get();
+
+ assert t == null : t;
+
+ IgniteCache<Object, Object> cache = ignite.cache(cacheName);
+
+ int verifiedKeys = 0;
+
+ // Post check.
+ for (int i = 0; i < maxKey; i++) {
+ byte[] val = (byte[]) cache.get(i);
+
+ if (val != null) {
+ assertEquals("Illegal length", valLen, val.length);
+
+ verifiedKeys++;
+ }
+ }
+
+ log.info("Verified keys: " + verifiedKeys);
+ }
+
+ /**
+ * @throws IgniteCheckedException If fail.
+ */
+ private void deleteWorkFiles() throws IgniteCheckedException {
+ deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
index cad10ae..048e8bf 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
@@ -42,12 +42,16 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
+import java.nio.file.OpenOption;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
/**
*
*/
public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest {
-
/** */
private static final String TEST_CACHE = "testCache";
@@ -168,22 +172,22 @@ public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest {
private static class FailingFileIOFactory implements FileIOFactory {
private static final long serialVersionUID = 0L;
+ /** */
private final FileIOFactory delegateFactory = new RandomAccessFileIOFactory();
- @Override
- public FileIO create(File file) throws IOException {
- return create(file, "rw");
+ /** {@inheritDoc} */
+ @Override public FileIO create(File file) throws IOException {
+ return create(file, CREATE, READ, WRITE);
}
- @Override
- public FileIO create(File file, String mode) throws IOException {
- FileIO delegate = delegateFactory.create(file, mode);
+ /** {@inheritDoc} */
+ @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+ FileIO delegate = delegateFactory.create(file, modes);
return new FileIODecorator(delegate) {
int writeAttempts = 2;
- @Override
- public int write(ByteBuffer sourceBuffer) throws IOException {
+ @Override public int write(ByteBuffer sourceBuffer) throws IOException {
if (--writeAttempts == 0)
throw new RuntimeException("Test exception. Unable to write to file.");
http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
index e4874d9..3d52507 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
@@ -21,11 +21,11 @@ import junit.framework.TestCase;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
-import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.ThreadLocalRandom;
-import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
@@ -38,9 +38,6 @@ public class IgniteDataIntegrityTests extends TestCase {
/** File input. */
private FileInput fileInput;
- /** Random access file. */
- private RandomAccessFile randomAccessFile;
-
/** Buffer expander. */
private ByteBufferExpander expBuf;
@@ -51,13 +48,13 @@ public class IgniteDataIntegrityTests extends TestCase {
File file = File.createTempFile("integrity", "dat");
file.deleteOnExit();
- randomAccessFile = new RandomAccessFile(file, "rw");
-
expBuf = new ByteBufferExpander(1024, ByteOrder.BIG_ENDIAN);
+ FileIOFactory factory = new RandomAccessFileIOFactory();
+
fileInput = new FileInput(
- new RandomAccessFileIO(randomAccessFile),
- expBuf
+ factory.create(file),
+ expBuf
);
ByteBuffer buf = ByteBuffer.allocate(1024);
@@ -71,13 +68,15 @@ public class IgniteDataIntegrityTests extends TestCase {
buf.putInt(PureJavaCrc32.calcCrc32(buf, 12));
}
- randomAccessFile.write(buf.array());
- randomAccessFile.getFD().sync();
+ buf.rewind();
+
+ fileInput.io().write(buf);
+ fileInput.io().force();
}
/** {@inheritDoc} */
@Override protected void tearDown() throws Exception {
- randomAccessFile.close();
+ fileInput.io().close();
expBuf.close();
}
@@ -177,22 +176,24 @@ public class IgniteDataIntegrityTests extends TestCase {
*/
private void toggleOneRandomBit(int rangeFrom, int rangeTo) throws IOException {
int pos = ThreadLocalRandom.current().nextInt(rangeFrom, rangeTo);
- randomAccessFile.seek(pos);
+ fileInput.io().position(pos);
+
+ byte[] buf = new byte[1];
- byte b = randomAccessFile.readByte();
+ fileInput.io().read(buf, 0, 1);
- b ^= (1 << 3);
+ buf[0] ^= (1 << 3);
- randomAccessFile.seek(pos);
- randomAccessFile.writeByte(b);
- randomAccessFile.getFD().sync();
+ fileInput.io().position(pos);
+ fileInput.io().write(buf, 0, 1);
+ fileInput.io().force();
}
/**
*
*/
private void checkIntegrity() throws Exception {
- randomAccessFile.seek(0);
+ fileInput.io().position(0);
for (int i = 0; i < 1024 / 16; i++) {
try(FileInput.Crc32CheckingFileInput in = fileInput.startRead(false)) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/IgniteWalConverter.java
----------------------------------------------------------------------
diff --git a/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/IgniteWalConverter.java b/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/IgniteWalConverter.java
index b7c3cb7..f3268d9 100644
--- a/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/IgniteWalConverter.java
+++ b/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/IgniteWalConverter.java
@@ -21,7 +21,6 @@ import java.io.File;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
-import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
index ae8ea18..cfbe2e0 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsMulti
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPageEvictionTest;
import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsCacheIntegrationTest;
import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsNoActualWalHistoryTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsThreadInterruptionTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoveryTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRecoveryTxLogicalRecordsTest;
@@ -53,6 +54,7 @@ public class IgnitePdsWithIndexingCoreTestSuite extends TestSuite {
suite.addTestSuite(IgnitePdsBinaryMetadataOnClusterRestartTest.class);
suite.addTestSuite(IgnitePdsMarshallerMappingRestoreOnNodeStartTest.class);
+ suite.addTestSuite(IgnitePdsThreadInterruptionTest.class);
return suite;
}