You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/26 07:33:38 UTC
[05/12] ignite git commit: ignite-6339 WAL write operations are
optimized and file IO operations are non-interruptible from user thread now
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
index 1844bfe..aa8d57f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
@@ -147,7 +147,6 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
memCfg.setWalAutoArchiveAfterInactivity(archiveIncompleteSegmentAfterInactivityMs);
cfg.setDataStorageConfiguration(memCfg);
-
return cfg;
}
@@ -441,10 +440,9 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
final IgniteCache<Object, Object> entries = txPutDummyRecords(ignite0, cntEntries, txCnt);
- final Map<Object, Object> ctrlMap = new HashMap<>();
- for (Cache.Entry<Object, Object> next : entries) {
- ctrlMap.put(next.getKey(), next.getValue());
- }
+ final Map<Object, Object> ctrlMap = new HashMap<>(); for (Cache.Entry<Object, Object> next : entries)
+ ctrlMap.put(next.getKey(), next.getValue());
+
final String subfolderName = genDbSubfolderName(ignite0, 0);
stopGrid("node0");
@@ -467,14 +465,16 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
if (val instanceof IndexedObject) {
IndexedObject indexedObj = (IndexedObject)val;
+
assertEquals(indexedObj.iVal, indexedObj.jVal);
assertEquals(indexedObj.iVal, key);
- for (byte datum : indexedObj.getData()) {
+
+ for (byte datum : indexedObj.getData())
assertTrue(datum >= 'A' && datum <= 'A' + 10);
- }
}
}
};
+
scanIterateAndCount(factory, workDir, subfolderName, cntEntries, txCnt, objConsumer, null);
assertTrue(" Control Map is not empty after reading entries " + ctrlMap, ctrlMap.isEmpty());
@@ -555,7 +555,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
* @throws Exception if failed.
*/
public void testFillWalWithDifferentTypes() throws Exception {
- int cntEntries = 0;
+ int cntEntries;
final Map<Object, Object> ctrlMap = new HashMap<>();
final Map<Object, Object> ctrlMapForBinaryObjects = new HashMap<>();
@@ -595,14 +595,13 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
ctrlStringsForBinaryObjSearch.add(search3); //validate only string itself
addlCache.put(key, "SearchKey");
- cntEntries = addlCache.size();
- for (Cache.Entry<Object, Object> next : addlCache) {
- ctrlMap.put(next.getKey(), next.getValue());
- }
+ cntEntries = addlCache.size();
+ for (Cache.Entry<Object, Object> next : addlCache)
+ ctrlMap.put(next.getKey(), next.getValue());
+
+ for (Cache.Entry<Object, Object> next : addlCache)
+ ctrlMapForBinaryObjects.put(next.getKey(), next.getValue());
- for (Cache.Entry<Object, Object> next : addlCache) {
- ctrlMapForBinaryObjects.put(next.getKey(), next.getValue());
- }
final String subfolderName = genDbSubfolderName(ignite0, 0);
@@ -693,9 +692,8 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
binaryObj.field("jVal").toString());
byte data[] = binaryObj.field("data");
- for (byte datum : data) {
+ for (byte datum : data)
assertTrue(datum >= 'A' && datum <= 'A' + 10);
- }
}
}
}
@@ -704,15 +702,19 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
final IgniteInClosure<DataRecord> binObjToStrChecker = new IgniteInClosure<DataRecord>() {
@Override public void apply(DataRecord record) {
String strRepresentation = record.toString();
+
for (Iterator<String> iter = ctrlStringsForBinaryObjSearch.iterator(); iter.hasNext(); ) {
final String next = iter.next();
+
if (strRepresentation.contains(next)) {
iter.remove();
+
break;
}
}
}
};
+
scanIterateAndCount(keepBinFactory, workDir, subfolderName, cntEntries, 0, binObjConsumer, binObjToStrChecker);
assertTrue(" Control Map is not empty after reading entries: " + ctrlMapForBinaryObjects,
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java
index 5c9e084..8068b08 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java
@@ -64,7 +64,7 @@ public class MockWalIteratorFactory {
* @param log Logger.
* @param pageSize Page size.
* @param consistentId Consistent id.
- * @param subfolderName
+ * @param subfolderName Subfolder name.
* @param segments Segments.
*/
public MockWalIteratorFactory(@Nullable IgniteLogger log,
@@ -85,13 +85,14 @@ public class MockWalIteratorFactory {
* @return iterator
* @throws IgniteCheckedException if IO failed
*/
+ @SuppressWarnings("unchecked")
public WALIterator iterator(File wal, File walArchive) throws IgniteCheckedException {
final DataStorageConfiguration persistentCfg1 = Mockito.mock(DataStorageConfiguration.class);
when(persistentCfg1.getWalPath()).thenReturn(wal.getAbsolutePath());
when(persistentCfg1.getWalArchivePath()).thenReturn(walArchive.getAbsolutePath());
when(persistentCfg1.getWalSegments()).thenReturn(segments);
- when(persistentCfg1.getWalThreadLocalBufferSize()).thenReturn(DataStorageConfiguration.DFLT_TLB_SIZE);
+ when(persistentCfg1.getWalBufferSize()).thenReturn(DataStorageConfiguration.DFLT_WAL_BUFF_SIZE);
when(persistentCfg1.getWalRecordIteratorBufferSize()).thenReturn(DataStorageConfiguration.DFLT_WAL_RECORD_ITERATOR_BUFFER_SIZE);
final FileIOFactory fileIOFactory = new DataStorageConfiguration().getFileIOFactory();
@@ -121,10 +122,10 @@ public class MockWalIteratorFactory {
when(sctx.discovery()).thenReturn(disco);
when(sctx.gridConfig()).thenReturn(cfg);
- final GridCacheDatabaseSharedManager database = Mockito.mock(GridCacheDatabaseSharedManager.class);
+ final GridCacheDatabaseSharedManager db = Mockito.mock(GridCacheDatabaseSharedManager.class);
- when(database.pageSize()).thenReturn(pageSize);
- when(sctx.database()).thenReturn(database);
+ when(db.pageSize()).thenReturn(pageSize);
+ when(sctx.database()).thenReturn(db);
when(sctx.logger(any(Class.class))).thenReturn(log);
mgr.start(sctx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
index 1875cfb..57fecbe 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
@@ -20,6 +20,7 @@ import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
import java.nio.file.OpenOption;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -229,6 +230,7 @@ public class PagesWriteThrottleSmokeTest extends GridCommonAbstractTest {
private final int v2;
/** */
+ @SuppressWarnings("unused")
private byte[] payload = new byte[400 + ThreadLocalRandom.current().nextInt(20)];
/**
@@ -316,6 +318,11 @@ public class PagesWriteThrottleSmokeTest extends GridCommonAbstractTest {
delegate.write(buf, off, len);
}
+
+ /** {@inheritDoc} */
+ @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
+ return delegate.map(maxWalSegmentSize);
+ }
};
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBufferTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBufferTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBufferTest.java
new file mode 100644
index 0000000..e74761e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBufferTest.java
@@ -0,0 +1,744 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import junit.framework.TestCase;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+
+import static org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.DIRECT;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.ONHEAP;
+
+/**
+ *
+ */
+public class SegmentedRingByteBufferTest extends TestCase {
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAligned() throws Exception {
+ doTestAligned(ONHEAP);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAlignedDirect() throws Exception {
+ doTestAligned(DIRECT);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNotAligned() throws Exception {
+ doTestNotAligned(ONHEAP);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNotAlignedDirect() throws Exception {
+ doTestNotAligned(DIRECT);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNoOverflowMultiThreaded() throws Exception {
+ doTestNoOverflowMultiThreaded(ONHEAP);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNoOverflowMultiThreadedDirect() throws Exception {
+ doTestNoOverflowMultiThreaded(DIRECT);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMultiThreaded() throws Exception {
+ doTestMultiThreaded(ONHEAP);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMultiThreadedDirect() throws Exception {
+ doTestMultiThreaded(DIRECT);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMultiThreaded2() throws Exception {
+ doTestMultiThreaded2(ONHEAP);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMultiThreadedDirect2() throws Exception {
+ doTestMultiThreaded2(DIRECT);
+ }
+
+ /**
+ * @param mode Mode.
+ */
+ private void doTestAligned(SegmentedRingByteBuffer.BufferMode mode) {
+ int cap = 128;
+
+ int size = 8;
+
+ SegmentedRingByteBuffer buf = new SegmentedRingByteBuffer(cap, Long.MAX_VALUE, mode);
+
+ assertNull(buf.poll());
+
+ // Head and tail in initial state.
+ for (int j = 0; j < 2; j++) {
+ for (int i = 0; i < cap / size; i++) {
+ SegmentedRingByteBuffer.WriteSegment seg = buf.offer(size);
+
+ ByteBuffer bbuf = seg.buffer();
+
+ assertEquals(size * i, bbuf.position());
+ assertEquals(size * (i + 1), bbuf.limit());
+
+ bbuf.putLong(i + (j * 10));
+
+ seg.release();
+ }
+
+ assertNull(buf.offer(size));
+
+ List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll();
+
+ ByteBuffer bbuf = segs.get(0).buffer();
+
+ assertEquals(cap, bbuf.remaining());
+
+ for (int i = 0; i < cap / size; i++)
+ assertEquals(i + (j * 10), bbuf.getLong());
+
+ segs.get(0).release();
+
+ assertEquals(0, bbuf.remaining());
+ assertNull(buf.poll());
+ }
+
+ // Move tail.
+ for (int i = 0; i < 2; i++) {
+ SegmentedRingByteBuffer.WriteSegment seg = buf.offer(size);
+
+ ByteBuffer bbuf = seg.buffer();
+
+ assertEquals(size * i, bbuf.position());
+ assertEquals(size * (i + 1), bbuf.limit());
+
+ bbuf.putLong(i);
+
+ seg.release();
+ }
+
+ // Move head to tail.
+ List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll();
+
+ ByteBuffer bbuf = segs.get(0).buffer();
+
+ assertEquals(size * 2, bbuf.remaining());
+
+ for (int i = 0; i < 2; i++)
+ assertEquals(i, bbuf.getLong());
+
+ segs.get(0).release();
+
+ assertEquals(0, bbuf.remaining());
+ assertNull(buf.poll());
+ }
+
+ /**
+ * @param mode Mode.
+ */
+ private void doTestNotAligned(SegmentedRingByteBuffer.BufferMode mode) {
+ int size = 8;
+
+ int cap = 32 - size / 2; // 3.5 long values.
+
+ SegmentedRingByteBuffer buf = new SegmentedRingByteBuffer(cap, Long.MAX_VALUE, mode);
+
+ assertNull(buf.poll());
+
+ // Write 2 segments.
+ SegmentedRingByteBuffer.WriteSegment wseg;
+ List<SegmentedRingByteBuffer.ReadSegment> rsegs;
+ ByteBuffer bbuf;
+
+ wseg = buf.offer(size);
+ bbuf = wseg.buffer();
+
+ bbuf.putLong(1);
+
+ wseg.release();
+
+ wseg = buf.offer(size);
+
+ bbuf = wseg.buffer();
+
+ bbuf.putLong(2);
+
+ wseg.release();
+
+ // Read 2 segments.
+ rsegs = buf.poll();
+ bbuf = rsegs.get(0).buffer();
+
+ assertEquals(1, bbuf.getLong());
+ assertEquals(2, bbuf.getLong());
+
+ rsegs.get(0).release();
+
+ assertNull(buf.poll());
+
+ // Write 2 segments.
+ wseg = buf.offer(size);
+ bbuf = wseg.buffer();
+
+ bbuf.putLong(3);
+
+ wseg.release();
+
+ // This one will overflow buffer.
+ wseg = buf.offer(size);
+ bbuf = wseg.buffer();
+
+ bbuf.putLong(4);
+
+ wseg.release();
+
+ // Ring buffer should return two separate segments instead of one due to an overflow.
+ rsegs = buf.poll();
+ bbuf = rsegs.get(0).buffer();
+
+ // First segment.
+ assertEquals(3, bbuf.getLong());
+ assertEquals(4, bbuf.remaining());
+
+ int pos = bbuf.position();
+
+ byte[] tmp = new byte[8];
+
+ byte[] arr = new byte[bbuf.capacity()];
+
+ bbuf.position(0);
+
+ bbuf.limit(bbuf.capacity());
+
+ bbuf.get(arr);
+
+ System.arraycopy(arr, pos, tmp, 0, 4);
+
+ // One more segment available.
+ bbuf = rsegs.get(1).buffer();
+
+ assertEquals(4, bbuf.remaining());
+
+ bbuf.position(0);
+
+ bbuf.limit(bbuf.capacity());
+
+ bbuf.get(arr);
+
+ System.arraycopy(arr, 0, tmp, 4, 4);
+
+ ByteBuffer bb = ByteBuffer.wrap(tmp);
+
+ bb.order(ByteOrder.nativeOrder());
+
+ assertEquals(4, bb.getLong());
+
+ rsegs.get(1).release();
+
+ assertNull(buf.poll());
+ }
+
+ /**
+ * @param mode Mode.
+ */
+ private void doTestNoOverflowMultiThreaded(SegmentedRingByteBuffer.BufferMode mode) throws org.apache.ignite.IgniteCheckedException {
+ int producerCnt = 16;
+
+ final int cap = 256 * 1024;
+
+ final SegmentedRingByteBuffer buf = new SegmentedRingByteBuffer(cap, Long.MAX_VALUE, mode);
+
+ final AtomicBoolean stop = new AtomicBoolean(false);
+
+ final CyclicBarrier barrier = new CyclicBarrier(producerCnt);
+
+ final AtomicLong totalWritten = new AtomicLong();
+
+ final AtomicInteger cnt = new AtomicInteger();
+
+ final Object mux = new Object();
+
+ IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+ @Override public void run() {
+ try {
+ barrier.await();
+ }
+ catch (InterruptedException | BrokenBarrierException e) {
+ e.printStackTrace();
+
+ fail();
+ }
+
+ while (!stop.get()) {
+ TestObject obj = new TestObject();
+
+ SegmentedRingByteBuffer.WriteSegment seg = buf.offer(obj.size());
+ ByteBuffer bbuf;
+
+ if (seg == null) {
+ cnt.incrementAndGet();
+
+ synchronized (mux) {
+ try {
+ mux.wait();
+ }
+ catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ cnt.decrementAndGet();
+
+ continue;
+ }
+
+ bbuf = seg.buffer();
+
+ assertEquals(obj.size(), bbuf.remaining());
+
+ bbuf.putLong(obj.id);
+ bbuf.putInt(obj.len);
+ bbuf.put(obj.arr);
+
+ assertEquals(0, bbuf.remaining());
+
+ seg.release();
+
+ long total = totalWritten.addAndGet(obj.size());
+
+ assertTrue(total <= cap);
+ }
+ }
+ }, producerCnt, "producer-thread");
+
+ long endTime = System.currentTimeMillis() + 60 * 1000L;
+
+ while (System.currentTimeMillis() < endTime) {
+
+ while (cnt.get() < producerCnt)
+ U.sleep(10);
+
+ synchronized (mux) {
+ List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll();
+
+ if (segs != null) {
+ for (SegmentedRingByteBuffer.ReadSegment seg : segs)
+ seg.release();
+ }
+
+ totalWritten.set(0);
+
+ mux.notifyAll();
+ }
+ }
+
+ stop.set(true);
+
+ synchronized (mux) {
+ mux.notifyAll();
+ }
+
+ fut.get();
+ }
+
+ /**
+ * @param mode Mode.
+ */
+ private void doTestMultiThreaded(SegmentedRingByteBuffer.BufferMode mode) throws org.apache.ignite.IgniteCheckedException {
+ int producerCnt = 16;
+
+ final int cap = 256 * 1024;
+
+ final SegmentedRingByteBuffer buf = new SegmentedRingByteBuffer(cap, Long.MAX_VALUE, mode);
+
+ final AtomicBoolean stop = new AtomicBoolean(false);
+
+ final CyclicBarrier barrier = new CyclicBarrier(producerCnt);
+
+ IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+ @Override public void run() {
+ try {
+ barrier.await();
+ }
+ catch (InterruptedException | BrokenBarrierException e) {
+ e.printStackTrace();
+
+ fail();
+ }
+
+ while (!stop.get()) {
+ TestObject obj = new TestObject();
+
+ SegmentedRingByteBuffer.WriteSegment seg;
+ ByteBuffer bbuf;
+
+ for (;;) {
+ if (stop.get())
+ return;
+
+ seg = buf.offer(obj.size());
+
+ if (seg != null)
+ break;
+ }
+
+ bbuf = seg.buffer();
+
+ assertEquals(obj.size(), bbuf.remaining());
+
+ bbuf.putLong(obj.id);
+ bbuf.putInt(obj.len);
+ bbuf.put(obj.arr);
+
+ assertEquals(0, bbuf.remaining());
+
+ seg.release();
+ }
+ }
+ }, producerCnt, "producer-thread");
+
+ Random rnd = new Random();
+
+ long endTime = System.currentTimeMillis() + 60 * 1000L;
+
+ while (System.currentTimeMillis() < endTime) {
+ try {
+ U.sleep(rnd.nextInt(100) + 1);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ e.printStackTrace();
+ }
+
+ List<SegmentedRingByteBuffer.ReadSegment> segs;
+
+ if ((segs = buf.poll()) != null) {
+ for (SegmentedRingByteBuffer.ReadSegment seg : segs) {
+ assertTrue(seg.buffer().hasRemaining());
+
+ seg.release();
+ }
+ }
+ }
+
+ stop.set(true);
+
+ fut.get();
+ }
+
+ /**
+ * @param mode Mode.
+ */
+ private void doTestMultiThreaded2(SegmentedRingByteBuffer.BufferMode mode) throws org.apache.ignite.IgniteCheckedException {
+ int producerCnt = 16;
+
+ final int cap = 256 * 1024;
+
+ final SegmentedRingByteBuffer buf = new SegmentedRingByteBuffer(cap, Long.MAX_VALUE, mode);
+
+ final AtomicBoolean stop = new AtomicBoolean(false);
+
+ final CyclicBarrier barrier = new CyclicBarrier(producerCnt);
+
+ final Set<TestObject> items = Collections.newSetFromMap(new ConcurrentHashMap<TestObject, Boolean>());
+
+ IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+ @Override public void run() {
+ try {
+ barrier.await();
+ }
+ catch (InterruptedException | BrokenBarrierException e) {
+ e.printStackTrace();
+
+ fail();
+ }
+
+ while (!stop.get()) {
+ TestObject obj = new TestObject();
+
+ SegmentedRingByteBuffer.WriteSegment seg;
+ ByteBuffer bbuf;
+
+ for (;;) {
+ if (stop.get())
+ return;
+
+ seg = buf.offer(obj.size());
+
+ if (seg != null)
+ break;
+ }
+
+ bbuf = seg.buffer();
+
+ assertEquals(obj.size(), bbuf.remaining());
+
+ bbuf.putLong(obj.id);
+ bbuf.putInt(obj.len);
+ bbuf.put(obj.arr);
+
+ assertEquals(0, bbuf.remaining());
+
+ assertTrue("Ooops! The same value is already exist in Set! ", items.add(obj));
+
+ seg.release();
+ }
+ }
+ }, producerCnt, "producer-thread");
+
+ Random rnd = new Random();
+
+ long endTime = System.currentTimeMillis() + 60 * 1000L;
+
+ while (System.currentTimeMillis() < endTime) {
+ try {
+ U.sleep(rnd.nextInt(100) + 1);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ e.printStackTrace();
+ }
+
+ List<SegmentedRingByteBuffer.ReadSegment> segs;
+
+ while ((segs = buf.poll()) != null) {
+ int size = 0;
+
+ for (SegmentedRingByteBuffer.ReadSegment seg : segs) {
+ ByteBuffer bbuf = seg.buffer();
+
+ assertTrue(bbuf.hasRemaining());
+
+ size += bbuf.remaining();
+ }
+
+ byte[] arr = new byte[size];
+
+ int idx = 0;
+
+ for (SegmentedRingByteBuffer.ReadSegment seg : segs) {
+ ByteBuffer bbuf = seg.buffer();
+
+ assertTrue(bbuf.hasRemaining());
+
+ int len = bbuf.remaining();
+
+ bbuf.get(arr, idx, len);
+
+ idx += len;
+ }
+
+ ByteBuffer bbuf = ByteBuffer.wrap(arr);
+
+ bbuf.order(ByteOrder.nativeOrder());
+
+ assertTrue(bbuf.hasRemaining());
+
+ while (bbuf.hasRemaining()) {
+ long id = bbuf.getLong();
+
+ int len = bbuf.getInt();
+
+ arr = new byte[len];
+
+ bbuf.get(arr);
+
+ TestObject obj = new TestObject(id, arr);
+
+ assertTrue(items.remove(obj));
+ }
+
+ for (SegmentedRingByteBuffer.ReadSegment seg : segs)
+ seg.release();
+ }
+ }
+
+ stop.set(true);
+
+ fut.get();
+
+ List<SegmentedRingByteBuffer.ReadSegment> segs;
+
+ while ((segs = buf.poll()) != null) {
+ int size = 0;
+
+ for (SegmentedRingByteBuffer.ReadSegment seg : segs) {
+ ByteBuffer bbuf = seg.buffer();
+
+ assertTrue(bbuf.hasRemaining());
+
+ size += bbuf.remaining();
+ }
+
+ byte[] arr = new byte[size];
+
+ int idx = 0;
+
+ for (SegmentedRingByteBuffer.ReadSegment seg : segs) {
+ ByteBuffer bbuf = seg.buffer();
+
+ assertTrue(bbuf.hasRemaining());
+
+ int len = bbuf.remaining();
+
+ bbuf.get(arr, idx, len);
+
+ idx += len;
+ }
+
+ ByteBuffer bbuf = ByteBuffer.wrap(arr);
+
+ bbuf.order(ByteOrder.nativeOrder());
+
+ assertTrue(bbuf.hasRemaining());
+
+ while (bbuf.hasRemaining()) {
+ long id = bbuf.getLong();
+
+ int len = bbuf.getInt();
+
+ arr = new byte[len];
+
+ bbuf.get(arr);
+
+ TestObject obj = new TestObject(id, arr);
+
+ assertTrue(items.remove(obj));
+ }
+
+ for (SegmentedRingByteBuffer.ReadSegment seg : segs)
+ seg.release();
+ }
+
+ assertNull(buf.poll());
+ assertEquals(0, items.size());
+ }
+
+ /**
+ *
+ */
+ private static class TestObject {
+ /** Id. */
+ private long id;
+
+ /** Length. */
+ private int len;
+
+ /** Array. */
+ private byte[] arr;
+
+ /**
+ * Default constructor.
+ */
+ public TestObject() {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ id = rnd.nextLong();
+ len = rnd.nextInt(32 * 1024);
+ arr = new byte[len];
+
+ rnd.nextBytes(arr);
+ }
+
+ /**
+ * @param id Id.
+ * @param arr Array.
+ */
+ public TestObject(long id, byte[] arr) {
+ this.id = id;
+ this.len = arr.length;
+ this.arr = arr;
+ }
+
+ /**
+ *
+ */
+ public int size() {
+ return 8 + 4 + arr.length;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object that) {
+ if (this == that)
+ return true;
+
+ if (that == null || getClass() != that.getClass())
+ return false;
+
+ TestObject obj = (TestObject)that;
+
+ if (id != obj.id)
+ return false;
+
+ if (len != obj.len)
+ return false;
+
+ return Arrays.equals(arr, obj.arr);
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = (int)(id ^ (id >>> 32));
+
+ res = 31 * res + len;
+
+ res = 31 * res + Arrays.hashCode(arr);
+
+ return res;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsOutOfMemoryTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsOutOfMemoryTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsOutOfMemoryTestSuite.java
deleted file mode 100644
index 3e040f4..0000000
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsOutOfMemoryTestSuite.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.testsuites;
-
-import junit.framework.TestSuite;
-import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgnitePdsWalTlbTest;
-
-/**
- *
- */
-public class IgnitePdsOutOfMemoryTestSuite extends TestSuite {
- /**
- * @return Suite.
- * @throws Exception If failed.
- */
- public static TestSuite suite() throws Exception {
- TestSuite suite = new TestSuite("Ignite Persistent Store OOM Test Suite");
-
- suite.addTestSuite(IgnitePdsWalTlbTest.class);
-
- return suite;
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
index ef7682f..3b071de 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.persistence.pagemem.MetadataS
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImplNoLoadTest;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImplTest;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PagesWriteThrottleSmokeTest;
+import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBufferTest;
import org.apache.ignite.internal.processors.database.IgniteDbClientNearCachePutGetTest;
import org.apache.ignite.internal.processors.database.IgniteDbDynamicCacheSelfTest;
import org.apache.ignite.internal.processors.database.IgniteDbMultiNodePutGetTest;
@@ -81,6 +82,8 @@ public class IgnitePdsTestSuite extends TestSuite {
suite.addTestSuite(DefaultPageSizeBackwardsCompatibilityTest.class);
+ suite.addTestSuite(SegmentedRingByteBufferTest.class);
+
// Write throttling
suite.addTestSuite(PagesWriteThrottleSmokeTest.class);