You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/08/16 15:41:36 UTC
[13/25] ignite git commit: IGNITE-5741 - Replaced HeapByteBuffer with DirectByteBuffer in WAL records iterator - Fixes #2329.
IGNITE-5741 - Replaced HeapByteBuffer with DirectByteBuffer in WAL records iterator - Fixes #2329.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/81195dbe
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/81195dbe
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/81195dbe
Branch: refs/heads/ignite-5901
Commit: 81195dbe74e59acb591b6365ca4f34918956e6a0
Parents: 18e79e5
Author: Dmitriy Govorukhin <dm...@gmail.com>
Authored: Mon Aug 14 16:33:12 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Aug 14 16:33:12 2017 +0300
----------------------------------------------------------------------
.../wal/AbstractWalRecordsIterator.java | 11 ++++-
.../persistence/wal/ByteBufferExpander.java | 22 ++++++---
.../wal/FileWriteAheadLogManager.java | 11 ++---
.../reader/StandaloneWalRecordsIterator.java | 9 ++--
.../apache/ignite/internal/util/GridUnsafe.java | 14 ++++++
.../db/wal/crc/IgniteDataIntegrityTests.java | 52 +++++++++++++++++++-
6 files changed, 100 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81195dbe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
index beed90b..db949c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
@@ -95,7 +95,6 @@ public abstract class AbstractWalRecordsIterator
this.serializer = serializer;
this.ioFactory = ioFactory;
- // Do not allocate direct buffer for iterator.
buf = new ByteBufferExpander(bufSize, ByteOrder.nativeOrder());
}
@@ -128,6 +127,16 @@ public abstract class AbstractWalRecordsIterator
return curRec != null;
}
+ /** {@inheritDoc} */
+ @Override protected void onClose() throws IgniteCheckedException {
+ try {
+ buf.close();
+ }
+ catch (Exception ex) {
+ throw new IgniteCheckedException(ex);
+ }
+ }
+
/**
* Switches records iterator to the next record.
* <ul>
http://git-wip-us.apache.org/repos/asf/ignite/blob/81195dbe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferExpander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferExpander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferExpander.java
index 829cd5c..cf1db84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferExpander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferExpander.java
@@ -19,19 +19,24 @@ package org.apache.ignite.internal.processors.cache.persistence.wal;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import org.apache.ignite.internal.util.GridUnsafe;
/**
* ByteBuffer wrapper that dynamically expands the buffer size.
*/
-public class ByteBufferExpander {
+public class ByteBufferExpander implements AutoCloseable {
/** Byte buffer */
private ByteBuffer buf;
+ /**
+ * @param initSize Initial size.
+ * @param order Byte order.
+ */
public ByteBufferExpander(int initSize, ByteOrder order) {
- ByteBuffer buffer = ByteBuffer.allocate(initSize);
+ ByteBuffer buffer = GridUnsafe.allocateBuffer(initSize);
buffer.order(order);
- this.buf = buffer;
+ buf = buffer;
}
/**
@@ -49,16 +54,17 @@ public class ByteBufferExpander {
* @return ByteBuffer with requested size.
*/
public ByteBuffer expand(int size) {
- ByteBuffer newBuf = ByteBuffer.allocate(size);
+ ByteBuffer newBuf = GridUnsafe.reallocateBuffer(buf, size);
newBuf.order(buf.order());
- newBuf.put(buf);
-
- newBuf.flip();
-
buf = newBuf;
return newBuf;
}
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ GridUnsafe.freeBuffer(buf);
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/81195dbe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 17db8f8..bb1f910 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -1430,12 +1430,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
*/
private int readSerializerVersion(FileIO io, File file, long idx)
throws IOException, IgniteCheckedException {
- try {
- ByteBuffer buf = ByteBuffer.allocate(RecordV1Serializer.HEADER_RECORD_SIZE);
- buf.order(ByteOrder.nativeOrder());
-
- FileInput in = new FileInput(io,
- new ByteBufferExpander(RecordV1Serializer.HEADER_RECORD_SIZE, ByteOrder.nativeOrder()));
+ try (ByteBufferExpander buf = new ByteBufferExpander(RecordV1Serializer.HEADER_RECORD_SIZE, ByteOrder.nativeOrder())){
+ FileInput in = new FileInput(io, buf);
// Header record must be agnostic to the serializer version.
WALRecord rec = serializer.readRecord(in, new FileWALPointer(idx, 0, 0));
@@ -2402,9 +2398,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** {@inheritDoc} */
@Override protected void onClose() throws IgniteCheckedException {
+ super.onClose();
+
curRec = null;
final ReadFileHandle handle = closeCurrentWalSegment();
+
if (handle != null && handle.workDir)
releaseWorkSegment(curWalSegmIdx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/81195dbe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
index 85022ad..cd0f8ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -180,9 +180,11 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
FileWALPointer ptr;
- try (FileIO fileIO = ioFactory.create(file, "r")) {
- final DataInput in = new FileInput(fileIO,
- new ByteBufferExpander(HEADER_RECORD_SIZE, ByteOrder.nativeOrder()));
+ try (
+ FileIO fileIO = ioFactory.create(file, "r");
+ ByteBufferExpander buf = new ByteBufferExpander(HEADER_RECORD_SIZE, ByteOrder.nativeOrder())
+ ) {
+ final DataInput in = new FileInput(fileIO, buf);
// Header record must be agnostic to the serializer version.
final int type = in.readUnsignedByte();
@@ -256,6 +258,7 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
/** {@inheritDoc} */
@Override protected void onClose() throws IgniteCheckedException {
super.onClose();
+
curRec = null;
closeCurrentWalSegment();
http://git-wip-us.apache.org/repos/asf/ignite/blob/81195dbe/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
index 0add64d..15e6f2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
@@ -139,6 +139,20 @@ public abstract class GridUnsafe {
}
/**
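+ * Reallocates the memory at the given buffer's address to the new length and wraps the result in a buffer.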
+ * @param buf Buffer.
+ * @param len New length.
+ * @return Reallocated direct buffer.
+ */
+ public static ByteBuffer reallocateBuffer(ByteBuffer buf, int len) {
+ long ptr = bufferAddress(buf);
+
+ long newPtr = reallocateMemory(ptr, len);
+
+ return wrapPointer(newPtr, len);
+ }
+
+ /**
* Gets boolean value from object field.
*
* @param obj Object.
http://git-wip-us.apache.org/repos/asf/ignite/blob/81195dbe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
index b93c74d..270c560 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
@@ -41,6 +41,9 @@ public class IgniteDataIntegrityTests extends TestCase {
/** Random access file. */
private RandomAccessFile randomAccessFile;
+ /** Buffer expander. */
+ private ByteBufferExpander expBuf;
+
/** {@inheritDoc} */
@Override protected void setUp() throws Exception {
super.setUp();
@@ -50,9 +53,11 @@ public class IgniteDataIntegrityTests extends TestCase {
randomAccessFile = new RandomAccessFile(file, "rw");
+ expBuf = new ByteBufferExpander(1024, ByteOrder.BIG_ENDIAN);
+
fileInput = new FileInput(
new RandomAccessFileIO(randomAccessFile),
- new ByteBufferExpander(1024, ByteOrder.BIG_ENDIAN)
+ expBuf
);
ByteBuffer buf = ByteBuffer.allocate(1024);
@@ -70,6 +75,12 @@ public class IgniteDataIntegrityTests extends TestCase {
randomAccessFile.getFD().sync();
}
+ /** {@inheritDoc} */
+ @Override protected void tearDown() throws Exception {
+ randomAccessFile.close();
+ expBuf.close();
+ }
+
/**
*
*/
@@ -108,6 +119,45 @@ public class IgniteDataIntegrityTests extends TestCase {
}
/**
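+ * Checks that data written before {@code expand()} is readable from the expanded buffer and that its limit grows.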
+ */
+ public void testExpandBuffer() {
+ ByteBufferExpander expBuf = new ByteBufferExpander(16, ByteOrder.nativeOrder());
+
+ ByteBuffer b1 = expBuf.buffer();
+
+ b1.put((byte)1);
+ b1.putInt(2);
+ b1.putLong(3L);
+
+ assertEquals(13, b1.position());
+ assertEquals(16, b1.limit());
+
+ ByteBuffer b2 = expBuf.expand(32);
+
+ assertEquals(0, b2.position());
+ assertEquals((byte)1, b2.get());
+ assertEquals(2, b2.getInt());
+ assertEquals(3L, b2.getLong());
+ assertEquals(13, b2.position());
+ assertEquals(32, b2.limit());
+
+ b2.putInt(4);
+
+ assertEquals(17, b2.position());
+ assertEquals(32, b2.limit());
+
+ b2.flip();
+
+ assertEquals(0, b2.position());
+ assertEquals((byte)1, b2.get());
+ assertEquals(2, b2.getInt());
+ assertEquals(3L, b2.getLong());
+ assertEquals(4, b2.getInt());
+ assertEquals(17, b2.limit());
+ }
+
+ /**
* @param rangeFrom Range from.
* @param rangeTo Range to.
*/