You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/06 08:04:31 UTC
[16/21] ignite git commit: IGNITE-5604 - Expand WAL iterator buffer
if record size is greater than current buffer size - Fixes #2244.
IGNITE-5604 - Expand WAL iterator buffer if record size is greater than current buffer size - Fixes #2244.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/69357c5d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/69357c5d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/69357c5d
Branch: refs/heads/ignite-gg-12306-1
Commit: 69357c5d8be431aa51fc3add9e345807fe984fee
Parents: 905e34d
Author: Dmitriy Govorukhin <dm...@gmail.com>
Authored: Wed Jul 5 19:24:47 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Jul 5 19:24:47 2017 +0300
----------------------------------------------------------------------
.../rendezvous/RendezvousAffinityFunction.java | 4 -
.../wal/AbstractWalRecordsIterator.java | 14 ++-
.../persistence/wal/ByteBufferExpander.java | 47 +++++++++
.../cache/persistence/wal/FileInput.java | 20 +++-
.../wal/FileWriteAheadLogManager.java | 2 +-
.../db/wal/IgniteWalRecoveryTest.java | 100 ++++++++++++++-----
6 files changed, 146 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/69357c5d/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
index 1bd0587..0fb20ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
@@ -17,10 +17,6 @@
package org.apache.ignite.cache.affinity.rendezvous;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
http://git-wip-us.apache.org/repos/asf/ignite/blob/69357c5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
index 7dc0a28..f4bace1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
@@ -22,7 +22,6 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import org.apache.ignite.IgniteCheckedException;
@@ -41,8 +40,8 @@ import org.jetbrains.annotations.Nullable;
* Iterator over WAL segments. This abstract class provides most functionality for reading records in log.
* Subclasses are to override segment switching functionality
*/
-public abstract class AbstractWalRecordsIterator extends GridCloseableIteratorAdapter<IgniteBiTuple<WALPointer, WALRecord>>
- implements WALIterator {
+public abstract class AbstractWalRecordsIterator
+ extends GridCloseableIteratorAdapter<IgniteBiTuple<WALPointer, WALRecord>> implements WALIterator {
/** */
private static final long serialVersionUID = 0L;
@@ -73,7 +72,7 @@ public abstract class AbstractWalRecordsIterator extends GridCloseableIteratorAd
@NotNull private final RecordSerializer serializer;
/** Utility buffer for reading records */
- private final ByteBuffer buf;
+ private final ByteBufferExpander buf;
/**
* @param log Logger
@@ -85,15 +84,14 @@ public abstract class AbstractWalRecordsIterator extends GridCloseableIteratorAd
@NotNull final IgniteLogger log,
@NotNull final GridCacheSharedContext sharedCtx,
@NotNull final RecordSerializer serializer,
- final int bufSize) {
+ final int bufSize
+ ) {
this.log = log;
this.sharedCtx = sharedCtx;
this.serializer = serializer;
// Do not allocate direct buffer for iterator.
- buf = ByteBuffer.allocate(bufSize);
- buf.order(ByteOrder.nativeOrder());
-
+ buf = new ByteBufferExpander(bufSize, ByteOrder.nativeOrder());
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/69357c5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferExpander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferExpander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferExpander.java
new file mode 100644
index 0000000..75d3a98
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferExpander.java
@@ -0,0 +1,47 @@
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * ByteBuffer wrapper that allows the wrapped buffer to be expanded dynamically.
+ */
+public class ByteBufferExpander {
+ /** Current byte buffer; replaced by a newly allocated one on every {@link #expand(int)} call. */
+ private ByteBuffer buf;
+
+ public ByteBufferExpander(int initSize, ByteOrder order) {
+ ByteBuffer buffer = ByteBuffer.allocate(initSize);
+ buffer.order(order);
+
+ this.buf = buffer;
+ }
+
+ /**
+ * Returns the byte buffer currently held by this wrapper.
+ *
+ * @return Current byte buffer.
+ */
+ public ByteBuffer buffer() {
+ return buf;
+ }
+
+ /**
+ * Allocates a new buffer of the requested size with the same byte order, copies the remaining bytes of the current buffer into it, flips it for reading and makes it the current buffer.
+ * @param size Capacity of the new buffer; must not be less than the number of bytes remaining in the current buffer.
+ * @return Newly allocated ByteBuffer with the requested size, holding the bytes that remained in the previous buffer.
+ */
+ public ByteBuffer expand(int size) {
+ ByteBuffer newBuf = ByteBuffer.allocate(size);
+
+ newBuf.order(buf.order());
+
+ newBuf.put(buf);
+
+ newBuf.flip();
+
+ buf = newBuf;
+
+ return newBuf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/69357c5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
index e2d7cba..00c7c02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
@@ -42,6 +42,9 @@ public final class FileInput implements ByteBufferBackedDataInput {
/** */
private long pos;
+ /** */
+ private ByteBufferExpander expBuf;
+
/**
* @param ch Channel to read from
* @param buf Buffer for reading blocks of data into
@@ -58,6 +61,16 @@ public final class FileInput implements ByteBufferBackedDataInput {
}
/**
+ * @param ch Channel to read from
+ * @param expBuf Byte buffer wrapper ({@link ByteBufferExpander}) able to expand the buffer dynamically.
+ */
+ public FileInput(FileChannel ch, ByteBufferExpander expBuf) throws IOException {
+ this(ch, expBuf.buffer());
+
+ this.expBuf = expBuf;
+ }
+
+ /**
* Clear buffer.
*/
private void clearBuffer() {
@@ -96,8 +109,11 @@ public final class FileInput implements ByteBufferBackedDataInput {
if (available >= requested)
return;
- if (buf.capacity() < requested)
- throw new IOException("Requested size is greater than buffer: " + requested);
+ if (buf.capacity() < requested) {
+ buf = expBuf.expand(requested);
+
+ assert available == buf.remaining();
+ }
buf.compact();
http://git-wip-us.apache.org/repos/asf/ignite/blob/69357c5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 8993112..162f43d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -2327,7 +2327,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
super(log,
cctx,
serializer,
- Math.min(16 * tlbSize, psCfg.getWalRecordIteratorBufferSize()));
+ psCfg.getWalRecordIteratorBufferSize());
this.walWorkDir = walWorkDir;
this.walArchiveDir = walArchiveDir;
this.psCfg = psCfg;
http://git-wip-us.apache.org/repos/asf/ignite/blob/69357c5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
index 6b4907c..843fb5b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
@@ -136,6 +136,8 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
PersistentStoreConfiguration pCfg = new PersistentStoreConfiguration();
+ pCfg.setWalRecordIteratorBufferSize(1024 * 1024);
+
if (logOnly)
pCfg.setWalMode(WALMode.LOG_ONLY);
@@ -180,46 +182,79 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
* @throws Exception if failed.
*/
public void testWalBig() throws Exception {
- try {
- IgniteEx ignite = startGrid(1);
+ IgniteEx ignite = startGrid(1);
- ignite.active(true);
+ ignite.active(true);
- IgniteCache<Object, Object> cache = ignite.cache("partitioned");
+ IgniteCache<Object, Object> cache = ignite.cache("partitioned");
- Random rnd = new Random();
+ Random rnd = new Random();
- Map<Integer, IndexedObject> map = new HashMap<>();
+ Map<Integer, IndexedObject> map = new HashMap<>();
- for (int i = 0; i < 10_000; i++) {
- if (i % 1000 == 0)
- X.println(" >> " + i);
+ for (int i = 0; i < 10_000; i++) {
+ if (i % 1000 == 0)
+ X.println(" >> " + i);
- int k = rnd.nextInt(300_000);
- IndexedObject v = new IndexedObject(rnd.nextInt(10_000));
+ int k = rnd.nextInt(300_000);
+ IndexedObject v = new IndexedObject(rnd.nextInt(10_000));
- cache.put(k, v);
- map.put(k, v);
- }
+ cache.put(k, v);
+ map.put(k, v);
+ }
- // Check.
- for (Integer k : map.keySet())
- assertEquals(map.get(k), cache.get(k));
+ // Check.
+ for (Integer k : map.keySet())
+ assertEquals(map.get(k), cache.get(k));
- stopGrid(1);
+ stopGrid(1);
- ignite = startGrid(1);
+ ignite = startGrid(1);
- ignite.active(true);
+ ignite.active(true);
- cache = ignite.cache("partitioned");
+ cache = ignite.cache("partitioned");
- // Check.
- for (Integer k : map.keySet())
- assertEquals(map.get(k), cache.get(k));
+ // Check.
+ for (Integer k : map.keySet())
+ assertEquals(map.get(k), cache.get(k));
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testWalBigObjectNodeCancel() throws Exception {
+ final int MAX_SIZE_POWER = 21;
+
+ IgniteEx ignite = startGrid(1);
+
+ ignite.active(true);
+
+ IgniteCache<Object, Object> cache = ignite.cache("partitioned");
+
+ for (int i = 0; i < MAX_SIZE_POWER; ++i) {
+ int size = 1 << i;
+
+ cache.put("key_" + i, createTestData(size));
}
- finally {
- stopAllGrids();
+
+ stopGrid(1, true);
+
+ ignite = startGrid(1);
+
+ ignite.active(true);
+
+ cache = ignite.cache("partitioned");
+
+ // Check.
+ for (int i = 0; i < MAX_SIZE_POWER; ++i) {
+ int size = 1 << i;
+
+ int[] data = createTestData(size);
+
+ int[] val = (int[])cache.get("key_" + i);
+
+ assertTrue("Invalid data. [key=key_" + i + ']', Arrays.equals(data, val));
}
}
@@ -977,6 +1012,19 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
}
/**
+ * @param size Number of elements in the generated array.
+ * @return Array of the given size in which every element equals its own index.
+ */
+ private int[] createTestData(int size) {
+ int[] data = new int[size];
+
+ for (int d = 0; d < size; ++d)
+ data[d] = d;
+
+ return data;
+ }
+
+ /**
*
*/
private static class LoadRunnable implements IgniteRunnable {