You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/07/07 09:37:26 UTC

[25/50] [abbrv] ignite git commit: IGNITE-5604 - Expand WAL iterator buffer if record size is greater than current buffer size - Fixes #2244.

IGNITE-5604 - Expand WAL iterator buffer if record size is greater than current buffer size - Fixes #2244.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/69357c5d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/69357c5d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/69357c5d

Branch: refs/heads/master
Commit: 69357c5d8be431aa51fc3add9e345807fe984fee
Parents: 905e34d
Author: Dmitriy Govorukhin <dm...@gmail.com>
Authored: Wed Jul 5 19:24:47 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Jul 5 19:24:47 2017 +0300

----------------------------------------------------------------------
 .../rendezvous/RendezvousAffinityFunction.java  |   4 -
 .../wal/AbstractWalRecordsIterator.java         |  14 ++-
 .../persistence/wal/ByteBufferExpander.java     |  47 +++++++++
 .../cache/persistence/wal/FileInput.java        |  20 +++-
 .../wal/FileWriteAheadLogManager.java           |   2 +-
 .../db/wal/IgniteWalRecoveryTest.java           | 100 ++++++++++++++-----
 6 files changed, 146 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/69357c5d/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
index 1bd0587..0fb20ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
@@ -17,10 +17,6 @@
 
 package org.apache.ignite.cache.affinity.rendezvous;
 
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;

http://git-wip-us.apache.org/repos/asf/ignite/blob/69357c5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
index 7dc0a28..f4bace1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
@@ -22,7 +22,6 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.FileChannel;
 import org.apache.ignite.IgniteCheckedException;
@@ -41,8 +40,8 @@ import org.jetbrains.annotations.Nullable;
  * Iterator over WAL segments. This abstract class provides most functionality for reading records in log.
  * Subclasses are to override segment switching functionality
  */
-public abstract class AbstractWalRecordsIterator extends GridCloseableIteratorAdapter<IgniteBiTuple<WALPointer, WALRecord>>
-    implements WALIterator {
+public abstract class AbstractWalRecordsIterator
+    extends GridCloseableIteratorAdapter<IgniteBiTuple<WALPointer, WALRecord>> implements WALIterator {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -73,7 +72,7 @@ public abstract class AbstractWalRecordsIterator extends GridCloseableIteratorAd
     @NotNull private final RecordSerializer serializer;
 
     /** Utility buffer for reading records */
-    private final ByteBuffer buf;
+    private final ByteBufferExpander buf;
 
     /**
      * @param log Logger
@@ -85,15 +84,14 @@ public abstract class AbstractWalRecordsIterator extends GridCloseableIteratorAd
         @NotNull final IgniteLogger log,
         @NotNull final GridCacheSharedContext sharedCtx,
         @NotNull final RecordSerializer serializer,
-        final int bufSize) {
+        final int bufSize
+    ) {
         this.log = log;
         this.sharedCtx = sharedCtx;
         this.serializer = serializer;
 
         // Do not allocate direct buffer for iterator.
-        buf = ByteBuffer.allocate(bufSize);
-        buf.order(ByteOrder.nativeOrder());
-
+        buf = new ByteBufferExpander(bufSize, ByteOrder.nativeOrder());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/69357c5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferExpander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferExpander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferExpander.java
new file mode 100644
index 0000000..75d3a98
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferExpander.java
@@ -0,0 +1,47 @@
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * ByteBuffer wrapper for dynamically expand buffer size.
+ */
+public class ByteBufferExpander {
+    /** Byte buffer */
+    private ByteBuffer buf;
+
+    public ByteBufferExpander(int initSize, ByteOrder order) {
+        ByteBuffer buffer = ByteBuffer.allocate(initSize);
+        buffer.order(order);
+
+        this.buf = buffer;
+    }
+
+    /**
+     * Current byte buffer.
+     *
+     * @return Current byteBuffer.
+     */
+    public ByteBuffer buffer() {
+        return buf;
+    }
+
+    /**
+     * Expands current byte buffer to the requested size.
+     *
+     * @return ByteBuffer with requested size.
+     */
+    public ByteBuffer expand(int size) {
+        ByteBuffer newBuf = ByteBuffer.allocate(size);
+
+        newBuf.order(buf.order());
+
+        newBuf.put(buf);
+
+        newBuf.flip();
+
+        buf = newBuf;
+
+        return newBuf;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/69357c5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
index e2d7cba..00c7c02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
@@ -42,6 +42,9 @@ public final class FileInput implements ByteBufferBackedDataInput {
     /** */
     private long pos;
 
+    /** */
+    private ByteBufferExpander expBuf;
+
     /**
      * @param ch  Channel to read from
      * @param buf Buffer for reading blocks of data into
@@ -58,6 +61,16 @@ public final class FileInput implements ByteBufferBackedDataInput {
     }
 
     /**
+     * @param ch Channel to read from
+     * @param expBuf ByteBufferWrapper with ability expand buffer dynamically.
+     */
+    public FileInput(FileChannel ch, ByteBufferExpander expBuf) throws IOException {
+        this(ch, expBuf.buffer());
+
+        this.expBuf = expBuf;
+    }
+
+    /**
      * Clear buffer.
      */
     private void clearBuffer() {
@@ -96,8 +109,11 @@ public final class FileInput implements ByteBufferBackedDataInput {
         if (available >= requested)
             return;
 
-        if (buf.capacity() < requested)
-            throw new IOException("Requested size is greater than buffer: " + requested);
+        if (buf.capacity() < requested) {
+            buf = expBuf.expand(requested);
+
+            assert available == buf.remaining();
+        }
 
         buf.compact();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/69357c5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 8993112..162f43d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -2327,7 +2327,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             super(log,
                 cctx,
                 serializer,
-                Math.min(16 * tlbSize, psCfg.getWalRecordIteratorBufferSize()));
+                psCfg.getWalRecordIteratorBufferSize());
             this.walWorkDir = walWorkDir;
             this.walArchiveDir = walArchiveDir;
             this.psCfg = psCfg;

http://git-wip-us.apache.org/repos/asf/ignite/blob/69357c5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
index 6b4907c..843fb5b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
@@ -136,6 +136,8 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
 
         PersistentStoreConfiguration pCfg = new PersistentStoreConfiguration();
 
+        pCfg.setWalRecordIteratorBufferSize(1024 * 1024);
+
         if (logOnly)
             pCfg.setWalMode(WALMode.LOG_ONLY);
 
@@ -180,46 +182,79 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
      * @throws Exception if failed.
      */
     public void testWalBig() throws Exception {
-        try {
-            IgniteEx ignite = startGrid(1);
+        IgniteEx ignite = startGrid(1);
 
-            ignite.active(true);
+        ignite.active(true);
 
-            IgniteCache<Object, Object> cache = ignite.cache("partitioned");
+        IgniteCache<Object, Object> cache = ignite.cache("partitioned");
 
-            Random rnd = new Random();
+        Random rnd = new Random();
 
-            Map<Integer, IndexedObject> map = new HashMap<>();
+        Map<Integer, IndexedObject> map = new HashMap<>();
 
-            for (int i = 0; i < 10_000; i++) {
-                if (i % 1000 == 0)
-                    X.println(" >> " + i);
+        for (int i = 0; i < 10_000; i++) {
+            if (i % 1000 == 0)
+                X.println(" >> " + i);
 
-                int k = rnd.nextInt(300_000);
-                IndexedObject v = new IndexedObject(rnd.nextInt(10_000));
+            int k = rnd.nextInt(300_000);
+            IndexedObject v = new IndexedObject(rnd.nextInt(10_000));
 
-                cache.put(k, v);
-                map.put(k, v);
-            }
+            cache.put(k, v);
+            map.put(k, v);
+        }
 
-            // Check.
-            for (Integer k : map.keySet())
-                assertEquals(map.get(k), cache.get(k));
+        // Check.
+        for (Integer k : map.keySet())
+            assertEquals(map.get(k), cache.get(k));
 
-            stopGrid(1);
+        stopGrid(1);
 
-            ignite = startGrid(1);
+        ignite = startGrid(1);
 
-            ignite.active(true);
+        ignite.active(true);
 
-            cache = ignite.cache("partitioned");
+        cache = ignite.cache("partitioned");
 
-            // Check.
-            for (Integer k : map.keySet())
-                assertEquals(map.get(k), cache.get(k));
+        // Check.
+        for (Integer k : map.keySet())
+            assertEquals(map.get(k), cache.get(k));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testWalBigObjectNodeCancel() throws Exception {
+        final int MAX_SIZE_POWER = 21;
+
+        IgniteEx ignite = startGrid(1);
+
+        ignite.active(true);
+
+        IgniteCache<Object, Object> cache = ignite.cache("partitioned");
+
+        for (int i = 0; i < MAX_SIZE_POWER; ++i) {
+            int size = 1 << i;
+
+            cache.put("key_" + i, createTestData(size));
         }
-        finally {
-            stopAllGrids();
+
+        stopGrid(1, true);
+
+        ignite = startGrid(1);
+
+        ignite.active(true);
+
+        cache = ignite.cache("partitioned");
+
+        // Check.
+        for (int i = 0; i < MAX_SIZE_POWER; ++i) {
+            int size = 1 << i;
+
+            int[] data = createTestData(size);
+
+            int[] val = (int[])cache.get("key_" + i);
+
+            assertTrue("Invalid data. [key=key_" + i + ']', Arrays.equals(data, val));
         }
     }
 
@@ -977,6 +1012,19 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @param size Size of data.
+     * @return Test data.
+     */
+    private int[] createTestData(int size) {
+        int[] data = new int[size];
+
+        for (int d = 0; d < size; ++d)
+            data[d] = d;
+
+        return data;
+    }
+
+    /**
      *
      */
     private static class LoadRunnable implements IgniteRunnable {