You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/26 07:33:38 UTC

[05/12] ignite git commit: ignite-6339 WAL write operations are optimized and file IO operations are non-interruptible from user thread now

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
index 1844bfe..aa8d57f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
@@ -147,7 +147,6 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
             memCfg.setWalAutoArchiveAfterInactivity(archiveIncompleteSegmentAfterInactivityMs);
 
         cfg.setDataStorageConfiguration(memCfg);
-
         return cfg;
     }
 
@@ -441,10 +440,9 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
 
         final IgniteCache<Object, Object> entries = txPutDummyRecords(ignite0, cntEntries, txCnt);
 
-        final Map<Object, Object> ctrlMap = new HashMap<>();
-        for (Cache.Entry<Object, Object> next : entries) {
-            ctrlMap.put(next.getKey(), next.getValue());
-        }
+        final Map<Object, Object> ctrlMap = new HashMap<>();    for (Cache.Entry<Object, Object> next : entries)
+                ctrlMap.put(next.getKey(), next.getValue());
+
 
         final String subfolderName = genDbSubfolderName(ignite0, 0);
         stopGrid("node0");
@@ -467,14 +465,16 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
 
                 if (val instanceof IndexedObject) {
                     IndexedObject indexedObj = (IndexedObject)val;
+
                     assertEquals(indexedObj.iVal, indexedObj.jVal);
                     assertEquals(indexedObj.iVal, key);
-                    for (byte datum : indexedObj.getData()) {
+
+                    for (byte datum : indexedObj.getData())
                         assertTrue(datum >= 'A' && datum <= 'A' + 10);
-                    }
                 }
             }
         };
+
         scanIterateAndCount(factory, workDir, subfolderName, cntEntries, txCnt, objConsumer, null);
 
         assertTrue(" Control Map is not empty after reading entries " + ctrlMap, ctrlMap.isEmpty());
@@ -555,7 +555,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
      * @throws Exception if failed.
      */
     public void testFillWalWithDifferentTypes() throws Exception {
-        int cntEntries = 0;
+        int cntEntries;
 
         final Map<Object, Object> ctrlMap = new HashMap<>();
         final Map<Object, Object> ctrlMapForBinaryObjects = new HashMap<>();
@@ -595,14 +595,13 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
         ctrlStringsForBinaryObjSearch.add(search3); //validate only string itself
         addlCache.put(key, "SearchKey");
 
-        cntEntries = addlCache.size();
-        for (Cache.Entry<Object, Object> next : addlCache) {
-            ctrlMap.put(next.getKey(), next.getValue());
-        }
+            cntEntries = addlCache.size();
+            for (Cache.Entry<Object, Object> next : addlCache)
+                ctrlMap.put(next.getKey(), next.getValue());
+
+            for (Cache.Entry<Object, Object> next : addlCache)
+                ctrlMapForBinaryObjects.put(next.getKey(), next.getValue());
 
-        for (Cache.Entry<Object, Object> next : addlCache) {
-            ctrlMapForBinaryObjects.put(next.getKey(), next.getValue());
-        }
 
         final String subfolderName = genDbSubfolderName(ignite0, 0);
 
@@ -693,9 +692,8 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
                             binaryObj.field("jVal").toString());
 
                         byte data[] = binaryObj.field("data");
-                        for (byte datum : data) {
+                        for (byte datum : data)
                             assertTrue(datum >= 'A' && datum <= 'A' + 10);
-                        }
                     }
                 }
             }
@@ -704,15 +702,19 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
         final IgniteInClosure<DataRecord> binObjToStrChecker = new IgniteInClosure<DataRecord>() {
             @Override public void apply(DataRecord record) {
                 String strRepresentation = record.toString();
+
                 for (Iterator<String> iter = ctrlStringsForBinaryObjSearch.iterator(); iter.hasNext(); ) {
                     final String next = iter.next();
+
                     if (strRepresentation.contains(next)) {
                         iter.remove();
+
                         break;
                     }
                 }
             }
         };
+
         scanIterateAndCount(keepBinFactory, workDir, subfolderName, cntEntries, 0, binObjConsumer, binObjToStrChecker);
 
         assertTrue(" Control Map is not empty after reading entries: " + ctrlMapForBinaryObjects,

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java
index 5c9e084..8068b08 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java
@@ -64,7 +64,7 @@ public class MockWalIteratorFactory {
      * @param log Logger.
      * @param pageSize Page size.
      * @param consistentId Consistent id.
-     * @param subfolderName
+     * @param subfolderName Subfolder name.
      * @param segments Segments.
      */
     public MockWalIteratorFactory(@Nullable IgniteLogger log,
@@ -85,13 +85,14 @@ public class MockWalIteratorFactory {
      * @return iterator
      * @throws IgniteCheckedException if IO failed
      */
+    @SuppressWarnings("unchecked")
     public WALIterator iterator(File wal, File walArchive) throws IgniteCheckedException {
         final DataStorageConfiguration persistentCfg1 = Mockito.mock(DataStorageConfiguration.class);
 
         when(persistentCfg1.getWalPath()).thenReturn(wal.getAbsolutePath());
         when(persistentCfg1.getWalArchivePath()).thenReturn(walArchive.getAbsolutePath());
         when(persistentCfg1.getWalSegments()).thenReturn(segments);
-        when(persistentCfg1.getWalThreadLocalBufferSize()).thenReturn(DataStorageConfiguration.DFLT_TLB_SIZE);
+        when(persistentCfg1.getWalBufferSize()).thenReturn(DataStorageConfiguration.DFLT_WAL_BUFF_SIZE);
         when(persistentCfg1.getWalRecordIteratorBufferSize()).thenReturn(DataStorageConfiguration.DFLT_WAL_RECORD_ITERATOR_BUFFER_SIZE);
 
         final FileIOFactory fileIOFactory = new DataStorageConfiguration().getFileIOFactory();
@@ -121,10 +122,10 @@ public class MockWalIteratorFactory {
         when(sctx.discovery()).thenReturn(disco);
         when(sctx.gridConfig()).thenReturn(cfg);
 
-        final GridCacheDatabaseSharedManager database = Mockito.mock(GridCacheDatabaseSharedManager.class);
+        final GridCacheDatabaseSharedManager db = Mockito.mock(GridCacheDatabaseSharedManager.class);
 
-        when(database.pageSize()).thenReturn(pageSize);
-        when(sctx.database()).thenReturn(database);
+        when(db.pageSize()).thenReturn(pageSize);
+        when(sctx.database()).thenReturn(db);
         when(sctx.logger(any(Class.class))).thenReturn(log);
 
         mgr.start(sctx);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
index 1875cfb..57fecbe 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
@@ -20,6 +20,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
 import java.nio.file.OpenOption;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -229,6 +230,7 @@ public class PagesWriteThrottleSmokeTest extends GridCommonAbstractTest {
         private final int v2;
 
         /** */
+        @SuppressWarnings("unused")
         private byte[] payload = new byte[400 + ThreadLocalRandom.current().nextInt(20)];
 
         /**
@@ -316,6 +318,11 @@ public class PagesWriteThrottleSmokeTest extends GridCommonAbstractTest {
 
                     delegate.write(buf, off, len);
                 }
+
+                /** {@inheritDoc} */
+                @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
+                    return delegate.map(maxWalSegmentSize);
+                }
             };
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBufferTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBufferTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBufferTest.java
new file mode 100644
index 0000000..e74761e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBufferTest.java
@@ -0,0 +1,744 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import junit.framework.TestCase;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+
+import static org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.DIRECT;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.ONHEAP;
+
+/**
+ *
+ */
+public class SegmentedRingByteBufferTest extends TestCase {
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAligned() throws Exception {
+        doTestAligned(ONHEAP);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAlignedDirect() throws Exception {
+        doTestAligned(DIRECT);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNotAligned() throws Exception {
+        doTestNotAligned(ONHEAP);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNotAlignedDirect() throws Exception {
+        doTestNotAligned(DIRECT);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNoOverflowMultiThreaded() throws Exception {
+        doTestNoOverflowMultiThreaded(ONHEAP);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNoOverflowMultiThreadedDirect() throws Exception {
+        doTestNoOverflowMultiThreaded(DIRECT);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultiThreaded() throws Exception {
+        doTestMultiThreaded(ONHEAP);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultiThreadedDirect() throws Exception {
+        doTestMultiThreaded(DIRECT);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultiThreaded2() throws Exception {
+        doTestMultiThreaded2(ONHEAP);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultiThreadedDirect2() throws Exception {
+        doTestMultiThreaded2(DIRECT);
+    }
+
+    /**
+     * @param mode Mode.
+     */
+    private void doTestAligned(SegmentedRingByteBuffer.BufferMode mode) {
+        int cap = 128;
+
+        int size = 8;
+
+        SegmentedRingByteBuffer buf = new SegmentedRingByteBuffer(cap, Long.MAX_VALUE, mode);
+
+        assertNull(buf.poll());
+
+        // Head and tail in initial state.
+        for (int j = 0; j < 2; j++) {
+            for (int i = 0; i < cap / size; i++) {
+                SegmentedRingByteBuffer.WriteSegment seg = buf.offer(size);
+
+                ByteBuffer bbuf = seg.buffer();
+
+                assertEquals(size * i, bbuf.position());
+                assertEquals(size * (i + 1), bbuf.limit());
+
+                bbuf.putLong(i + (j * 10));
+
+                seg.release();
+            }
+
+            assertNull(buf.offer(size));
+
+            List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll();
+
+            ByteBuffer bbuf = segs.get(0).buffer();
+
+            assertEquals(cap, bbuf.remaining());
+
+            for (int i = 0; i < cap / size; i++)
+                assertEquals(i + (j * 10), bbuf.getLong());
+
+            segs.get(0).release();
+
+            assertEquals(0, bbuf.remaining());
+            assertNull(buf.poll());
+        }
+
+        // Move tail.
+        for (int i = 0; i < 2; i++) {
+            SegmentedRingByteBuffer.WriteSegment seg = buf.offer(size);
+
+            ByteBuffer bbuf = seg.buffer();
+
+            assertEquals(size * i, bbuf.position());
+            assertEquals(size * (i + 1), bbuf.limit());
+
+            bbuf.putLong(i);
+
+            seg.release();
+        }
+
+        // Move head to tail.
+        List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll();
+
+        ByteBuffer bbuf = segs.get(0).buffer();
+
+        assertEquals(size * 2, bbuf.remaining());
+
+        for (int i = 0; i < 2; i++)
+            assertEquals(i, bbuf.getLong());
+
+        segs.get(0).release();
+
+        assertEquals(0, bbuf.remaining());
+        assertNull(buf.poll());
+    }
+
+    /**
+     * @param mode Mode.
+     */
+    private void doTestNotAligned(SegmentedRingByteBuffer.BufferMode mode) {
+        int size = 8;
+
+        int cap = 32 - size / 2; // 3.5 long values.
+
+        SegmentedRingByteBuffer buf = new SegmentedRingByteBuffer(cap, Long.MAX_VALUE, mode);
+
+        assertNull(buf.poll());
+
+        // Write 2 segments.
+        SegmentedRingByteBuffer.WriteSegment wseg;
+        List<SegmentedRingByteBuffer.ReadSegment> rsegs;
+        ByteBuffer bbuf;
+
+        wseg = buf.offer(size);
+        bbuf = wseg.buffer();
+
+        bbuf.putLong(1);
+
+        wseg.release();
+
+        wseg = buf.offer(size);
+
+        bbuf = wseg.buffer();
+
+        bbuf.putLong(2);
+
+        wseg.release();
+
+        // Read 2 segments.
+        rsegs = buf.poll();
+        bbuf = rsegs.get(0).buffer();
+
+        assertEquals(1, bbuf.getLong());
+        assertEquals(2, bbuf.getLong());
+
+        rsegs.get(0).release();
+
+        assertNull(buf.poll());
+
+        // Write 2 segments.
+        wseg = buf.offer(size);
+        bbuf = wseg.buffer();
+
+        bbuf.putLong(3);
+
+        wseg.release();
+
+        // This one will overflow buffer.
+        wseg = buf.offer(size);
+        bbuf = wseg.buffer();
+
+        bbuf.putLong(4);
+
+        wseg.release();
+
+        // Ring buffer should return two separate segments instead of one due to an overflow.
+        rsegs = buf.poll();
+        bbuf = rsegs.get(0).buffer();
+
+        // First segment.
+        assertEquals(3, bbuf.getLong());
+        assertEquals(4, bbuf.remaining());
+
+        int pos = bbuf.position();
+
+        byte[] tmp = new byte[8];
+
+        byte[] arr = new byte[bbuf.capacity()];
+
+        bbuf.position(0);
+
+        bbuf.limit(bbuf.capacity());
+
+        bbuf.get(arr);
+
+        System.arraycopy(arr, pos, tmp, 0, 4);
+
+        // One more segment available.
+        bbuf = rsegs.get(1).buffer();
+
+        assertEquals(4, bbuf.remaining());
+
+        bbuf.position(0);
+
+        bbuf.limit(bbuf.capacity());
+
+        bbuf.get(arr);
+
+        System.arraycopy(arr, 0, tmp, 4, 4);
+
+        ByteBuffer bb = ByteBuffer.wrap(tmp);
+
+        bb.order(ByteOrder.nativeOrder());
+
+        assertEquals(4, bb.getLong());
+
+        rsegs.get(1).release();
+
+        assertNull(buf.poll());
+    }
+
+    /**
+     * @param mode Mode.
+     */
+    private void doTestNoOverflowMultiThreaded(SegmentedRingByteBuffer.BufferMode mode) throws org.apache.ignite.IgniteCheckedException {
+        int producerCnt = 16;
+
+        final int cap = 256 * 1024;
+
+        final SegmentedRingByteBuffer buf = new SegmentedRingByteBuffer(cap, Long.MAX_VALUE, mode);
+
+        final AtomicBoolean stop = new AtomicBoolean(false);
+
+        final CyclicBarrier barrier = new CyclicBarrier(producerCnt);
+
+        final AtomicLong totalWritten = new AtomicLong();
+
+        final AtomicInteger cnt = new AtomicInteger();
+
+        final Object mux = new Object();
+
+        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    barrier.await();
+                }
+                catch (InterruptedException | BrokenBarrierException e) {
+                    e.printStackTrace();
+
+                    fail();
+                }
+
+                while (!stop.get()) {
+                    TestObject obj = new TestObject();
+
+                    SegmentedRingByteBuffer.WriteSegment seg = buf.offer(obj.size());
+                    ByteBuffer bbuf;
+
+                    if (seg == null) {
+                        cnt.incrementAndGet();
+
+                        synchronized (mux) {
+                            try {
+                                mux.wait();
+                            }
+                            catch (InterruptedException e) {
+                                e.printStackTrace();
+                            }
+                        }
+
+                        cnt.decrementAndGet();
+
+                        continue;
+                    }
+
+                    bbuf = seg.buffer();
+
+                    assertEquals(obj.size(), bbuf.remaining());
+
+                    bbuf.putLong(obj.id);
+                    bbuf.putInt(obj.len);
+                    bbuf.put(obj.arr);
+
+                    assertEquals(0, bbuf.remaining());
+
+                    seg.release();
+
+                    long total = totalWritten.addAndGet(obj.size());
+
+                    assertTrue(total <= cap);
+                }
+            }
+        }, producerCnt, "producer-thread");
+
+        long endTime = System.currentTimeMillis() + 60 * 1000L;
+
+        while (System.currentTimeMillis() < endTime) {
+
+            while (cnt.get() < producerCnt)
+                U.sleep(10);
+
+            synchronized (mux) {
+                List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll();
+
+                if (segs != null) {
+                    for (SegmentedRingByteBuffer.ReadSegment seg : segs)
+                        seg.release();
+                }
+
+                totalWritten.set(0);
+
+                mux.notifyAll();
+            }
+        }
+
+        stop.set(true);
+
+        synchronized (mux) {
+            mux.notifyAll();
+        }
+
+        fut.get();
+    }
+
+    /**
+     * @param mode Mode.
+     */
+    private void doTestMultiThreaded(SegmentedRingByteBuffer.BufferMode mode) throws org.apache.ignite.IgniteCheckedException {
+        int producerCnt = 16;
+
+        final int cap = 256 * 1024;
+
+        final SegmentedRingByteBuffer buf = new SegmentedRingByteBuffer(cap, Long.MAX_VALUE, mode);
+
+        final AtomicBoolean stop = new AtomicBoolean(false);
+
+        final CyclicBarrier barrier = new CyclicBarrier(producerCnt);
+
+        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    barrier.await();
+                }
+                catch (InterruptedException | BrokenBarrierException e) {
+                    e.printStackTrace();
+
+                    fail();
+                }
+
+                while (!stop.get()) {
+                    TestObject obj = new TestObject();
+
+                    SegmentedRingByteBuffer.WriteSegment seg;
+                    ByteBuffer bbuf;
+
+                    for (;;) {
+                        if (stop.get())
+                            return;
+
+                        seg = buf.offer(obj.size());
+
+                        if (seg != null)
+                            break;
+                    }
+
+                    bbuf = seg.buffer();
+
+                    assertEquals(obj.size(), bbuf.remaining());
+
+                    bbuf.putLong(obj.id);
+                    bbuf.putInt(obj.len);
+                    bbuf.put(obj.arr);
+
+                    assertEquals(0, bbuf.remaining());
+
+                    seg.release();
+                }
+            }
+        }, producerCnt, "producer-thread");
+
+        Random rnd = new Random();
+
+        long endTime = System.currentTimeMillis() + 60 * 1000L;
+
+        while (System.currentTimeMillis() < endTime) {
+            try {
+                U.sleep(rnd.nextInt(100) + 1);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                e.printStackTrace();
+            }
+
+            List<SegmentedRingByteBuffer.ReadSegment> segs;
+
+            if ((segs = buf.poll()) != null) {
+                for (SegmentedRingByteBuffer.ReadSegment seg : segs) {
+                    assertTrue(seg.buffer().hasRemaining());
+
+                    seg.release();
+                }
+            }
+        }
+
+        stop.set(true);
+
+        fut.get();
+    }
+
+    /**
+     * @param mode Mode.
+     */
+    private void doTestMultiThreaded2(SegmentedRingByteBuffer.BufferMode mode) throws org.apache.ignite.IgniteCheckedException {
+        int producerCnt = 16;
+
+        final int cap = 256 * 1024;
+
+        final SegmentedRingByteBuffer buf = new SegmentedRingByteBuffer(cap, Long.MAX_VALUE, mode);
+
+        final AtomicBoolean stop = new AtomicBoolean(false);
+
+        final CyclicBarrier barrier = new CyclicBarrier(producerCnt);
+
+        final Set<TestObject> items = Collections.newSetFromMap(new ConcurrentHashMap<TestObject, Boolean>());
+
+        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    barrier.await();
+                }
+                catch (InterruptedException | BrokenBarrierException e) {
+                    e.printStackTrace();
+
+                    fail();
+                }
+
+                while (!stop.get()) {
+                    TestObject obj = new TestObject();
+
+                    SegmentedRingByteBuffer.WriteSegment seg;
+                    ByteBuffer bbuf;
+
+                    for (;;) {
+                        if (stop.get())
+                            return;
+
+                        seg = buf.offer(obj.size());
+
+                        if (seg != null)
+                            break;
+                    }
+
+                    bbuf = seg.buffer();
+
+                    assertEquals(obj.size(), bbuf.remaining());
+
+                    bbuf.putLong(obj.id);
+                    bbuf.putInt(obj.len);
+                    bbuf.put(obj.arr);
+
+                    assertEquals(0, bbuf.remaining());
+
+                    assertTrue("Ooops! The same value is already exist in Set! ", items.add(obj));
+
+                    seg.release();
+                }
+            }
+        }, producerCnt, "producer-thread");
+
+        Random rnd = new Random();
+
+        long endTime = System.currentTimeMillis() + 60 * 1000L;
+
+        while (System.currentTimeMillis() < endTime) {
+            try {
+                U.sleep(rnd.nextInt(100) + 1);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                e.printStackTrace();
+            }
+
+            List<SegmentedRingByteBuffer.ReadSegment> segs;
+
+            while ((segs = buf.poll()) != null) {
+                int size = 0;
+
+                for (SegmentedRingByteBuffer.ReadSegment seg : segs) {
+                    ByteBuffer bbuf = seg.buffer();
+
+                    assertTrue(bbuf.hasRemaining());
+
+                    size += bbuf.remaining();
+                }
+
+                byte[] arr = new byte[size];
+
+                int idx = 0;
+
+                for (SegmentedRingByteBuffer.ReadSegment seg : segs) {
+                    ByteBuffer bbuf = seg.buffer();
+
+                    assertTrue(bbuf.hasRemaining());
+
+                    int len = bbuf.remaining();
+
+                    bbuf.get(arr, idx, len);
+
+                    idx += len;
+                }
+
+                ByteBuffer bbuf = ByteBuffer.wrap(arr);
+
+                bbuf.order(ByteOrder.nativeOrder());
+
+                assertTrue(bbuf.hasRemaining());
+
+                while (bbuf.hasRemaining()) {
+                    long id = bbuf.getLong();
+
+                    int len = bbuf.getInt();
+
+                    arr = new byte[len];
+
+                    bbuf.get(arr);
+
+                    TestObject obj = new TestObject(id, arr);
+
+                    assertTrue(items.remove(obj));
+                }
+
+                for (SegmentedRingByteBuffer.ReadSegment seg : segs)
+                    seg.release();
+            }
+        }
+
+        stop.set(true);
+
+        fut.get();
+
+        List<SegmentedRingByteBuffer.ReadSegment> segs;
+
+        while ((segs = buf.poll()) != null) {
+            int size = 0;
+
+            for (SegmentedRingByteBuffer.ReadSegment seg : segs) {
+                ByteBuffer bbuf = seg.buffer();
+
+                assertTrue(bbuf.hasRemaining());
+
+                size += bbuf.remaining();
+            }
+
+            byte[] arr = new byte[size];
+
+            int idx = 0;
+
+            for (SegmentedRingByteBuffer.ReadSegment seg : segs) {
+                ByteBuffer bbuf = seg.buffer();
+
+                assertTrue(bbuf.hasRemaining());
+
+                int len = bbuf.remaining();
+
+                bbuf.get(arr, idx, len);
+
+                idx += len;
+            }
+
+            ByteBuffer bbuf = ByteBuffer.wrap(arr);
+
+            bbuf.order(ByteOrder.nativeOrder());
+
+            assertTrue(bbuf.hasRemaining());
+
+            while (bbuf.hasRemaining()) {
+                long id = bbuf.getLong();
+
+                int len = bbuf.getInt();
+
+                arr = new byte[len];
+
+                bbuf.get(arr);
+
+                TestObject obj = new TestObject(id, arr);
+
+                assertTrue(items.remove(obj));
+            }
+
+            for (SegmentedRingByteBuffer.ReadSegment seg : segs)
+                seg.release();
+        }
+
+        assertNull(buf.poll());
+        assertEquals(0, items.size());
+    }
+
+    /**
+     *
+     */
+    private static class TestObject {
+        /** Id. */
+        private long id;
+
+        /** Length. */
+        private int len;
+
+        /** Array. */
+        private byte[] arr;
+
+        /**
+         * Default constructor.
+         */
+        public TestObject() {
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+            id = rnd.nextLong();
+            len = rnd.nextInt(32 * 1024);
+            arr = new byte[len];
+
+            rnd.nextBytes(arr);
+        }
+
+        /**
+         * @param id Id.
+         * @param arr Array.
+         */
+        public TestObject(long id, byte[] arr) {
+            this.id = id;
+            this.len = arr.length;
+            this.arr = arr;
+        }
+
+        /**
+         *
+         */
+        public int size() {
+            return 8 + 4 + arr.length;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object that) {
+            if (this == that)
+                return true;
+
+            if (that == null || getClass() != that.getClass())
+                return false;
+
+            TestObject obj = (TestObject)that;
+
+            if (id != obj.id)
+                return false;
+
+            if (len != obj.len)
+                return false;
+
+            return Arrays.equals(arr, obj.arr);
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = (int)(id ^ (id >>> 32));
+
+            res = 31 * res + len;
+
+            res = 31 * res + Arrays.hashCode(arr);
+
+            return res;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsOutOfMemoryTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsOutOfMemoryTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsOutOfMemoryTestSuite.java
deleted file mode 100644
index 3e040f4..0000000
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsOutOfMemoryTestSuite.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.testsuites;
-
-import junit.framework.TestSuite;
-import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgnitePdsWalTlbTest;
-
-/**
- *
- */
-public class IgnitePdsOutOfMemoryTestSuite extends TestSuite {
-    /**
-     * @return Suite.
-     * @throws Exception If failed.
-     */
-    public static TestSuite suite() throws Exception {
-        TestSuite suite = new TestSuite("Ignite Persistent Store OOM Test Suite");
-
-        suite.addTestSuite(IgnitePdsWalTlbTest.class);
-
-        return suite;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5ffd4eb/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
index ef7682f..3b071de 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.persistence.pagemem.MetadataS
 import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImplNoLoadTest;
 import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImplTest;
 import org.apache.ignite.internal.processors.cache.persistence.pagemem.PagesWriteThrottleSmokeTest;
+import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBufferTest;
 import org.apache.ignite.internal.processors.database.IgniteDbClientNearCachePutGetTest;
 import org.apache.ignite.internal.processors.database.IgniteDbDynamicCacheSelfTest;
 import org.apache.ignite.internal.processors.database.IgniteDbMultiNodePutGetTest;
@@ -81,6 +82,8 @@ public class IgnitePdsTestSuite extends TestSuite {
 
         suite.addTestSuite(DefaultPageSizeBackwardsCompatibilityTest.class);
 
+        suite.addTestSuite(SegmentedRingByteBufferTest.class);
+
         // Write throttling
         suite.addTestSuite(PagesWriteThrottleSmokeTest.class);