You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/08/21 08:40:14 UTC

[05/50] [abbrv] ignite git commit: IGNITE-5741 - Replaced HeapByteBuffer with DirectByteBuffer in WAL records iterator - Fixes #2329.

IGNITE-5741 - Replaced HeapByteBuffer with DirectByteBuffer in WAL records iterator - Fixes #2329.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/81195dbe
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/81195dbe
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/81195dbe

Branch: refs/heads/ignite-5578
Commit: 81195dbe74e59acb591b6365ca4f34918956e6a0
Parents: 18e79e5
Author: Dmitriy Govorukhin <dm...@gmail.com>
Authored: Mon Aug 14 16:33:12 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Aug 14 16:33:12 2017 +0300

----------------------------------------------------------------------
 .../wal/AbstractWalRecordsIterator.java         | 11 ++++-
 .../persistence/wal/ByteBufferExpander.java     | 22 ++++++---
 .../wal/FileWriteAheadLogManager.java           | 11 ++---
 .../reader/StandaloneWalRecordsIterator.java    |  9 ++--
 .../apache/ignite/internal/util/GridUnsafe.java | 14 ++++++
 .../db/wal/crc/IgniteDataIntegrityTests.java    | 52 +++++++++++++++++++-
 6 files changed, 100 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/81195dbe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
index beed90b..db949c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
@@ -95,7 +95,6 @@ public abstract class AbstractWalRecordsIterator
         this.serializer = serializer;
         this.ioFactory = ioFactory;
 
-        // Do not allocate direct buffer for iterator.
         buf = new ByteBufferExpander(bufSize, ByteOrder.nativeOrder());
     }
 
@@ -128,6 +127,16 @@ public abstract class AbstractWalRecordsIterator
         return curRec != null;
     }
 
+    /** {@inheritDoc} */
+    @Override protected void onClose() throws IgniteCheckedException {
+        try {
+            buf.close();
+        }
+        catch (Exception ex) {
+            throw new IgniteCheckedException(ex);
+        }
+    }
+
     /**
      * Switches records iterator to the next record.
      * <ul>

http://git-wip-us.apache.org/repos/asf/ignite/blob/81195dbe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferExpander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferExpander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferExpander.java
index 829cd5c..cf1db84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferExpander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferExpander.java
@@ -19,19 +19,24 @@ package org.apache.ignite.internal.processors.cache.persistence.wal;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import org.apache.ignite.internal.util.GridUnsafe;
 
 /**
  * ByteBuffer wrapper that dynamically expands the buffer size.
  */
-public class ByteBufferExpander {
+public class ByteBufferExpander implements AutoCloseable {
     /** Byte buffer */
     private ByteBuffer buf;
 
+    /**
+     * @param initSize Initial size.
+     * @param order Byte order.
+     */
     public ByteBufferExpander(int initSize, ByteOrder order) {
-        ByteBuffer buffer = ByteBuffer.allocate(initSize);
+        ByteBuffer buffer = GridUnsafe.allocateBuffer(initSize);
         buffer.order(order);
 
-        this.buf = buffer;
+        buf = buffer;
     }
 
     /**
@@ -49,16 +54,17 @@ public class ByteBufferExpander {
      * @return ByteBuffer with requested size.
      */
     public ByteBuffer expand(int size) {
-        ByteBuffer newBuf = ByteBuffer.allocate(size);
+        ByteBuffer newBuf = GridUnsafe.reallocateBuffer(buf, size);
 
         newBuf.order(buf.order());
 
-        newBuf.put(buf);
-
-        newBuf.flip();
-
         buf = newBuf;
 
         return newBuf;
     }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        GridUnsafe.freeBuffer(buf);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/81195dbe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 17db8f8..bb1f910 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -1430,12 +1430,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
      */
     private int readSerializerVersion(FileIO io, File file, long idx)
         throws IOException, IgniteCheckedException {
-        try {
-            ByteBuffer buf = ByteBuffer.allocate(RecordV1Serializer.HEADER_RECORD_SIZE);
-            buf.order(ByteOrder.nativeOrder());
-
-            FileInput in = new FileInput(io,
-                new ByteBufferExpander(RecordV1Serializer.HEADER_RECORD_SIZE, ByteOrder.nativeOrder()));
+        try (ByteBufferExpander buf = new ByteBufferExpander(RecordV1Serializer.HEADER_RECORD_SIZE, ByteOrder.nativeOrder())){
+            FileInput in = new FileInput(io, buf);
 
             // Header record must be agnostic to the serializer version.
             WALRecord rec = serializer.readRecord(in, new FileWALPointer(idx, 0, 0));
@@ -2402,9 +2398,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
         /** {@inheritDoc} */
         @Override protected void onClose() throws IgniteCheckedException {
+            super.onClose();
+
             curRec = null;
 
             final ReadFileHandle handle = closeCurrentWalSegment();
+
             if (handle != null && handle.workDir)
                 releaseWorkSegment(curWalSegmIdx);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/81195dbe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
index 85022ad..cd0f8ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -180,9 +180,11 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
 
             FileWALPointer ptr;
 
-            try (FileIO fileIO = ioFactory.create(file, "r")) {
-                final DataInput in = new FileInput(fileIO,
-                    new ByteBufferExpander(HEADER_RECORD_SIZE, ByteOrder.nativeOrder()));
+            try (
+                FileIO fileIO = ioFactory.create(file, "r");
+                ByteBufferExpander buf = new ByteBufferExpander(HEADER_RECORD_SIZE, ByteOrder.nativeOrder())
+            ) {
+                final DataInput in = new FileInput(fileIO, buf);
 
                 // Header record must be agnostic to the serializer version.
                 final int type = in.readUnsignedByte();
@@ -256,6 +258,7 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
     /** {@inheritDoc} */
     @Override protected void onClose() throws IgniteCheckedException {
         super.onClose();
+
         curRec = null;
 
         closeCurrentWalSegment();

http://git-wip-us.apache.org/repos/asf/ignite/blob/81195dbe/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
index 0add64d..15e6f2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
@@ -139,6 +139,20 @@ public abstract class GridUnsafe {
     }
 
     /**
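+     * Reallocates the memory at the given buffer's address to the new length and wraps the resulting pointer in a buffer.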
+     * @param buf Buffer.
+     * @param len New length.
+     * @return Reallocated direct buffer.
+     */
+    public static ByteBuffer reallocateBuffer(ByteBuffer buf, int len) {
+        long ptr = bufferAddress(buf);
+
+        long newPtr = reallocateMemory(ptr, len);
+
+        return wrapPointer(newPtr, len);
+    }
+
+    /**
      * Gets boolean value from object field.
      *
      * @param obj Object.

http://git-wip-us.apache.org/repos/asf/ignite/blob/81195dbe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
index b93c74d..270c560 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
@@ -41,6 +41,9 @@ public class IgniteDataIntegrityTests extends TestCase {
     /** Random access file. */
     private RandomAccessFile randomAccessFile;
 
+    /** Buffer expander. */
+    private ByteBufferExpander expBuf;
+
     /** {@inheritDoc} */
     @Override protected void setUp() throws Exception {
         super.setUp();
@@ -50,9 +53,11 @@ public class IgniteDataIntegrityTests extends TestCase {
 
         randomAccessFile = new RandomAccessFile(file, "rw");
 
+        expBuf = new ByteBufferExpander(1024, ByteOrder.BIG_ENDIAN);
+
         fileInput = new FileInput(
             new RandomAccessFileIO(randomAccessFile),
-            new ByteBufferExpander(1024, ByteOrder.BIG_ENDIAN)
+            expBuf
         );
 
         ByteBuffer buf = ByteBuffer.allocate(1024);
@@ -70,6 +75,12 @@ public class IgniteDataIntegrityTests extends TestCase {
         randomAccessFile.getFD().sync();
     }
 
+    /** {@inheritDoc} */
+    @Override protected void tearDown() throws Exception {
+        randomAccessFile.close();
+        expBuf.close();
+    }
+
     /**
      *
      */
@@ -108,6 +119,45 @@ public class IgniteDataIntegrityTests extends TestCase {
     }
 
     /**
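+     * Checks that expanding the buffer keeps the bytes already written and returns a buffer positioned at zero with the new limit.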
+     */
+    public void testExpandBuffer() {
+        ByteBufferExpander expBuf = new ByteBufferExpander(16, ByteOrder.nativeOrder());
+
+        ByteBuffer b1 = expBuf.buffer();
+
+        b1.put((byte)1);
+        b1.putInt(2);
+        b1.putLong(3L);
+
+        assertEquals(13, b1.position());
+        assertEquals(16, b1.limit());
+
+        ByteBuffer b2 = expBuf.expand(32);
+
+        assertEquals(0, b2.position());
+        assertEquals((byte)1, b2.get());
+        assertEquals(2, b2.getInt());
+        assertEquals(3L, b2.getLong());
+        assertEquals(13, b2.position());
+        assertEquals(32, b2.limit());
+
+        b2.putInt(4);
+
+        assertEquals(17, b2.position());
+        assertEquals(32, b2.limit());
+
+        b2.flip();
+
+        assertEquals(0, b2.position());
+        assertEquals((byte)1, b2.get());
+        assertEquals(2, b2.getInt());
+        assertEquals(3L, b2.getLong());
+        assertEquals(4, b2.getInt());
+        assertEquals(17, b2.limit());
+    }
+
+    /**
      * @param rangeFrom Range from.
      * @param rangeTo Range to.
      */