You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/01/17 11:52:13 UTC
[6/7] ignite git commit: ignite-3477 PageMemory optimizations - use
page address instead of ByteBuffer to work with page memory - got rid of
pages pin/unpin - do not copy byte array for cache key comparison - reduced
size of data tree search row
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/SplitExistingPageRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/SplitExistingPageRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/SplitExistingPageRecord.java
index 8ec5f8f..418d28b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/SplitExistingPageRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/SplitExistingPageRecord.java
@@ -17,8 +17,8 @@
package org.apache.ignite.internal.pagemem.wal.record.delta;
-import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.processors.cache.database.tree.io.BPlusIO;
import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -50,10 +50,10 @@ public class SplitExistingPageRecord extends PageDeltaRecord {
}
/** {@inheritDoc} */
- @Override public void applyDelta(ByteBuffer buf) throws IgniteCheckedException {
- BPlusIO<?> io = PageIO.getBPlusIO(buf);
+ @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException {
+ BPlusIO<?> io = PageIO.getBPlusIO(pageAddr);
- io.splitExistingPage(buf, mid, fwdId);
+ io.splitExistingPage(pageAddr, mid, fwdId);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/SplitForwardPageRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/SplitForwardPageRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/SplitForwardPageRecord.java
index b4487fa..39f2669 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/SplitForwardPageRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/SplitForwardPageRecord.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.pagemem.wal.record.delta;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.pagemem.PageMemory;
/**
* Split forward page record.
@@ -79,7 +80,7 @@ public class SplitForwardPageRecord extends PageDeltaRecord {
}
/** {@inheritDoc} */
- @Override public void applyDelta(ByteBuffer fwdBuf) throws IgniteCheckedException {
+ @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException {
throw new IgniteCheckedException("Split forward page record should not be logged.");
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/TrackingPageDeltaRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/TrackingPageDeltaRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/TrackingPageDeltaRecord.java
index 9d00d77..7cd0948 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/TrackingPageDeltaRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/TrackingPageDeltaRecord.java
@@ -17,8 +17,8 @@
package org.apache.ignite.internal.pagemem.wal.record.delta;
-import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.processors.cache.database.tree.io.TrackingPageIO;
/**
@@ -70,8 +70,12 @@ public class TrackingPageDeltaRecord extends PageDeltaRecord {
}
/** {@inheritDoc} */
- @Override public void applyDelta(ByteBuffer buf) throws IgniteCheckedException {
- TrackingPageIO.VERSIONS.forPage(buf).markChanged(buf, pageIdToMark, nextSnapshotId, lastSuccessfulSnapshotId, buf.capacity());
+ @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException {
+ TrackingPageIO.VERSIONS.forPage(pageAddr).markChanged(pageMem.pageBuffer(pageAddr),
+ pageIdToMark,
+ nextSnapshotId,
+ lastSuccessfulSnapshotId,
+ pageMem.pageSize());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
index 92b72ce..c226ba2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
@@ -68,6 +68,13 @@ public interface CacheObject extends Message {
public boolean putValue(ByteBuffer buf) throws IgniteCheckedException;
/**
+ * @param addr Address tp write value to.
+ * @return Number of bytes written.
+ * @throws IgniteCheckedException If failed.
+ */
+ public int putValue(long addr) throws IgniteCheckedException;
+
+ /**
* @param buf Buffer to write value to.
* @param off Offset in source binary data.
* @param len Length of the data to write.
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
index 1f13c6f..688b92f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
@@ -24,6 +24,7 @@ import java.io.ObjectOutput;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -78,6 +79,35 @@ public abstract class CacheObjectAdapter implements CacheObject, Externalizable
}
/** {@inheritDoc} */
+ @Override public int putValue(long addr) throws IgniteCheckedException {
+ assert valBytes != null : "Value bytes must be initialized before object is stored";
+
+ return putValue(addr, cacheObjectType(), valBytes, 0);
+ }
+
+ /**
+ * @param addr Write address.
+ * @param type Object type.
+ * @param valBytes Value bytes array.
+ * @param valOff Value bytes array offset.
+ * @return
+ */
+ public static int putValue(long addr, byte type, byte[] valBytes, int valOff) {
+ int off = 0;
+
+ PageUtils.putInt(addr, off, valBytes.length);
+ off += 4;
+
+ PageUtils.putByte(addr, off, type);
+ off++;
+
+ PageUtils.putBytes(addr, off, valBytes, valOff);
+ off += valBytes.length - valOff;
+
+ return off;
+ }
+
+ /** {@inheritDoc} */
@Override public boolean putValue(final ByteBuffer buf, int off, int len) throws IgniteCheckedException {
assert valBytes != null : "Value bytes must be initialized before object is stored";
@@ -167,8 +197,14 @@ public abstract class CacheObjectAdapter implements CacheObject, Externalizable
* @return {@code True} if data were successfully written.
* @throws IgniteCheckedException If failed.
*/
- public static boolean putValue(byte cacheObjType, final ByteBuffer buf, int off, int len,
- byte[] valBytes, final int start) throws IgniteCheckedException {
+ public static boolean putValue(byte cacheObjType,
+ final ByteBuffer buf,
+ int off,
+ int len,
+ byte[] valBytes,
+ final int start)
+ throws IgniteCheckedException
+ {
int dataLen = valBytes.length;
if (buf.remaining() < len)
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java
index b3a4117..eee6fcc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java
@@ -82,6 +82,11 @@ public class CacheObjectByteArrayImpl implements CacheObject, Externalizable {
}
/** {@inheritDoc} */
+ @Override public int putValue(long addr) throws IgniteCheckedException {
+ return CacheObjectAdapter.putValue(addr, cacheObjectType(), val, 0);
+ }
+
+ /** {@inheritDoc} */
@Override public boolean putValue(final ByteBuffer buf, int off, int len) throws IgniteCheckedException {
assert val != null : "Value is not initialized";
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 76450fb..f7e46d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -23,7 +23,6 @@ import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.cache.Cache;
import org.apache.ignite.IgniteCheckedException;
@@ -31,11 +30,14 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.Page;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
import org.apache.ignite.internal.processors.cache.database.CacheDataRowAdapter;
+import org.apache.ignite.internal.processors.cache.database.CacheSearchRow;
import org.apache.ignite.internal.processors.cache.database.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.database.RootPage;
import org.apache.ignite.internal.processors.cache.database.RowStore;
@@ -44,6 +46,8 @@ import org.apache.ignite.internal.processors.cache.database.tree.BPlusTree;
import org.apache.ignite.internal.processors.cache.database.tree.io.BPlusIO;
import org.apache.ignite.internal.processors.cache.database.tree.io.BPlusInnerIO;
import org.apache.ignite.internal.processors.cache.database.tree.io.BPlusLeafIO;
+import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO;
+import org.apache.ignite.internal.processors.cache.database.tree.io.DataPagePayload;
import org.apache.ignite.internal.processors.cache.database.tree.io.IOVersions;
import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
@@ -69,6 +73,8 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.pagemem.PageIdUtils.itemId;
+import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId;
/**
*
@@ -880,7 +886,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
CacheObject val,
GridCacheVersion ver,
long expireTime) throws IgniteCheckedException {
- DataRow dataRow = new DataRow(key.hashCode(), key, val, ver, p, expireTime);
+ DataRow dataRow = new DataRow(key, val, ver, p, expireTime);
// Make sure value bytes initialized.
key.valueBytes(cctx.cacheObjectContext());
@@ -894,7 +900,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
assert dataRow.link() != 0 : dataRow;
- DataRow old = dataTree.put(dataRow);
+ CacheDataRow old = dataTree.put(dataRow);
if (old == null)
storageSize.incrementAndGet();
@@ -933,7 +939,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
try {
- DataRow dataRow = dataTree.remove(new KeySearchRow(key.hashCode(), key, 0));
+ CacheDataRow dataRow = dataTree.remove(new SearchRow(key));
CacheObject val = null;
GridCacheVersion ver = null;
@@ -970,7 +976,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
/** {@inheritDoc} */
@Override public CacheDataRow find(KeyCacheObject key)
throws IgniteCheckedException {
- return dataTree.findOne(new KeySearchRow(key.hashCode(), key, 0));
+ return dataTree.findOne(new SearchRow(key));
}
/** {@inheritDoc} */
@@ -981,14 +987,14 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
/** {@inheritDoc} */
@Override public GridCursor<? extends CacheDataRow> cursor(KeyCacheObject lower,
KeyCacheObject upper) throws IgniteCheckedException {
- KeySearchRow lowerRow = null;
- KeySearchRow upperRow = null;
+ SearchRow lowerRow = null;
+ SearchRow upperRow = null;
if (lower != null)
- lowerRow = new KeySearchRow(lower.hashCode(), lower, 0);
+ lowerRow = new SearchRow(lower);
if (upper != null)
- upperRow = new KeySearchRow(upper.hashCode(), upper, 0);
+ upperRow = new SearchRow(upper);
return dataTree.find(lowerRow, upperRow);
}
@@ -1034,82 +1040,81 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
/**
*
*/
- private class KeySearchRow extends CacheDataRowAdapter {
+ private static class SearchRow implements CacheSearchRow {
/** */
- protected int hash;
+ private final KeyCacheObject key;
+
+ /** */
+ private final int hash;
/**
- * @param hash Hash code.
* @param key Key.
- * @param link Link.
*/
- KeySearchRow(int hash, KeyCacheObject key, long link) {
- super(link);
-
+ SearchRow(KeyCacheObject key) {
this.key = key;
- this.hash = hash;
- }
- /**
- * Init data.
- *
- * @param keyOnly Initialize only key.
- */
- protected final void initData(boolean keyOnly) {
- if (key != null)
- return;
-
- assert link() != 0;
-
- try {
- initFromLink(cctx, keyOnly);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e.getMessage(), e);
- }
+ hash = key.hashCode();
}
- /**
- * @return Key.
- */
+ /** {@inheritDoc} */
@Override public KeyCacheObject key() {
- initData(true);
-
return key;
}
+
+ /** {@inheritDoc} */
+ @Override public long link() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hash() {
+ return hash;
+ }
}
/**
*
*/
- private class DataRow extends KeySearchRow {
+ private class DataRow extends CacheDataRowAdapter {
/** */
protected int part = -1;
+ /** */
+ protected int hash;
+
/**
* @param hash Hash code.
* @param link Link.
+ * @param keyOnly If {@code true} initializes only key.
*/
- DataRow(int hash, long link) {
- super(hash, null, link);
+ DataRow(int hash, long link, boolean keyOnly) {
+ super(link);
+
+ this.hash = hash;
part = PageIdUtils.partId(link);
- // We can not init data row lazily because underlying buffer can be concurrently cleared.
- initData(false);
+ try {
+ // We can not init data row lazily because underlying buffer can be concurrently cleared.
+ initFromLink(cctx, keyOnly);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
}
/**
- * @param hash Hash code.
* @param key Key.
* @param val Value.
* @param ver Version.
* @param part Partition.
* @param expireTime Expire time.
*/
- DataRow(int hash, KeyCacheObject key, CacheObject val, GridCacheVersion ver, int part, long expireTime) {
- super(hash, key, 0);
+ DataRow(KeyCacheObject key, CacheObject val, GridCacheVersion ver, int part, long expireTime) {
+ super(0);
+ this.hash = key.hashCode();
+ this.key = key;
this.val = val;
this.ver = ver;
this.part = part;
@@ -1122,6 +1127,11 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/** {@inheritDoc} */
+ @Override public int hash() {
+ return hash;
+ }
+
+ /** {@inheritDoc} */
@Override public void link(long link) {
this.link = link;
}
@@ -1130,7 +1140,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
/**
*
*/
- protected static class CacheDataTree extends BPlusTree<KeySearchRow, DataRow> {
+ protected static class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
/** */
private final CacheDataRowStore rowStore;
@@ -1169,38 +1179,76 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/** {@inheritDoc} */
- @Override protected int compare(BPlusIO<KeySearchRow> io, ByteBuffer buf, int idx, KeySearchRow row)
+ @Override protected int compare(BPlusIO<CacheSearchRow> io, long pageAddr, int idx, CacheSearchRow row)
throws IgniteCheckedException {
- int hash = ((RowLinkIO)io).getHash(buf, idx);
+ int hash = ((RowLinkIO)io).getHash(pageAddr, idx);
- int cmp = Integer.compare(hash, row.hash);
+ int cmp = Integer.compare(hash, row.hash());
if (cmp != 0)
return cmp;
- KeySearchRow row0 = io.getLookupRow(this, buf, idx);
+ long link = ((RowLinkIO)io).getLink(pageAddr, idx);
- return compareKeys(row0.key(), row.key());
- }
+ assert row.key() != null : row;
- /** {@inheritDoc} */
- @Override protected DataRow getRow(BPlusIO<KeySearchRow> io, ByteBuffer buf, int idx)
- throws IgniteCheckedException {
- int hash = ((RowLinkIO)io).getHash(buf, idx);
- long link = ((RowLinkIO)io).getLink(buf, idx);
-
- return rowStore.dataRow(hash, link);
+ return compareKeys(row.key(), link);
}
/**
- * @param key1 First key.
- * @param key2 Second key.
+ * @param key Key.
+ * @param link Link.
* @return Compare result.
* @throws IgniteCheckedException If failed.
*/
- private int compareKeys(CacheObject key1, CacheObject key2) throws IgniteCheckedException {
- byte[] bytes1 = key1.valueBytes(cctx.cacheObjectContext());
- byte[] bytes2 = key2.valueBytes(cctx.cacheObjectContext());
+ private int compareKeys(KeyCacheObject key, final long link) throws IgniteCheckedException {
+ byte[] bytes = key.valueBytes(cctx.cacheObjectContext());
+
+ PageMemory pageMem = cctx.shared().database().pageMemory();
+
+ try (Page page = page(pageId(link))) {
+ long pageAddr = page.getForReadPointer(); // Non-empty data page must not be recycled.
+
+ assert pageAddr != 0L : link;
+
+ try {
+ DataPageIO io = DataPageIO.VERSIONS.forPage(pageAddr);
+
+ DataPagePayload data = io.readPayload(pageAddr,
+ itemId(link),
+ pageMem.pageSize());
+
+ if (data.nextLink() == 0) {
+ long addr = pageAddr + data.offset();
+
+ int len = PageUtils.getInt(addr, 0);
+
+ int size = Math.min(bytes.length, len);
+
+ addr += 5; // Skip length and type byte.
+
+ for (int i = 0; i < size; i++) {
+ byte b1 = PageUtils.getByte(addr, i);
+ byte b2 = bytes[i];
+
+ if (b1 != b2)
+ return b1 > b2 ? 1 : -1;
+ }
+
+ return Integer.compare(len, bytes.length);
+ }
+ }
+ finally {
+ page.releaseRead();
+ }
+ }
+
+ // TODO GG-11768.
+ CacheDataRowAdapter other = new CacheDataRowAdapter(link);
+ other.initFromLink(cctx, true);
+
+ byte[] bytes1 = other.key().valueBytes(cctx.cacheObjectContext());
+ byte[] bytes2 = key.valueBytes(cctx.cacheObjectContext());
int len = Math.min(bytes1.length, bytes2.length);
@@ -1214,6 +1262,15 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
return Integer.compare(bytes1.length, bytes2.length);
}
+
+ /** {@inheritDoc} */
+ @Override protected CacheDataRow getRow(BPlusIO<CacheSearchRow> io, long pageAddr, int idx)
+ throws IgniteCheckedException {
+ int hash = ((RowLinkIO)io).getHash(pageAddr, idx);
+ long link = ((RowLinkIO)io).getLink(pageAddr, idx);
+
+ return rowStore.dataRow(hash, link);
+ }
}
/**
@@ -1233,8 +1290,8 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @param link Link.
* @return Search row.
*/
- private KeySearchRow keySearchRow(int hash, long link) {
- return new KeySearchRow(hash, null, link);
+ private CacheSearchRow keySearchRow(int hash, long link) {
+ return new DataRow(hash, link, true);
}
/**
@@ -1242,8 +1299,8 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @param link Link.
* @return Data row.
*/
- private DataRow dataRow(int hash, long link) {
- return new DataRow(hash, link);
+ private CacheDataRow dataRow(int hash, long link) {
+ return new DataRow(hash, link, false);
}
}
@@ -1259,28 +1316,39 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/**
+ * @param pageAddr Page address.
+ * @param off Offset.
+ * @param link Link.
+ * @param hash Hash.
+ */
+ private static void store0(long pageAddr, int off, long link, int hash) {
+ PageUtils.putLong(pageAddr, off, link);
+ PageUtils.putInt(pageAddr, off + 8, hash);
+ }
+
+ /**
*
*/
private interface RowLinkIO {
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param idx Index.
* @return Row link.
*/
- public long getLink(ByteBuffer buf, int idx);
+ public long getLink(long pageAddr, int idx);
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param idx Index.
* @return Key hash code.
*/
- public int getHash(ByteBuffer buf, int idx);
+ public int getHash(long pageAddr, int idx);
}
/**
*
*/
- public static final class DataInnerIO extends BPlusInnerIO<KeySearchRow> implements RowLinkIO {
+ public static final class DataInnerIO extends BPlusInnerIO<CacheSearchRow> implements RowLinkIO {
/** */
public static final IOVersions<DataInnerIO> VERSIONS = new IOVersions<>(
new DataInnerIO(1)
@@ -1294,46 +1362,53 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/** {@inheritDoc} */
- @Override public void storeByOffset(ByteBuffer buf, int off, KeySearchRow row) {
+ @Override public void storeByOffset(ByteBuffer buf, int off, CacheSearchRow row) throws IgniteCheckedException {
assert row.link() != 0;
- store0(buf, off, row.link(), row.hash);
+ store0(buf, off, row.link(), row.hash());
}
/** {@inheritDoc} */
- @Override public KeySearchRow getLookupRow(BPlusTree<KeySearchRow, ?> tree, ByteBuffer buf, int idx) {
- int hash = getHash(buf, idx);
- long link = getLink(buf, idx);
+ @Override public void storeByOffset(long pageAddr, int off, CacheSearchRow row) {
+ assert row.link() != 0;
+
+ store0(pageAddr, off, row.link(), row.hash());
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheSearchRow getLookupRow(BPlusTree<CacheSearchRow, ?> tree, long pageAddr, int idx) {
+ int hash = getHash(pageAddr, idx);
+ long link = getLink(pageAddr, idx);
return ((CacheDataTree)tree).rowStore.keySearchRow(hash, link);
}
/** {@inheritDoc} */
- @Override public void store(ByteBuffer dst, int dstIdx, BPlusIO<KeySearchRow> srcIo, ByteBuffer src,
+ @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<CacheSearchRow> srcIo, long srcPageAddr,
int srcIdx) {
- int hash = ((RowLinkIO)srcIo).getHash(src, srcIdx);
- long link = ((RowLinkIO)srcIo).getLink(src, srcIdx);
+ int hash = ((RowLinkIO)srcIo).getHash(srcPageAddr, srcIdx);
+ long link = ((RowLinkIO)srcIo).getLink(srcPageAddr, srcIdx);
- store0(dst, offset(dstIdx), link, hash);
+ store0(dstPageAddr, offset(dstIdx), link, hash);
}
/** {@inheritDoc} */
- @Override public long getLink(ByteBuffer buf, int idx) {
- assert idx < getCount(buf) : idx;
+ @Override public long getLink(long pageAddr, int idx) {
+ assert idx < getCount(pageAddr) : idx;
- return buf.getLong(offset(idx));
+ return PageUtils.getLong(pageAddr, offset(idx));
}
/** {@inheritDoc} */
- @Override public int getHash(ByteBuffer buf, int idx) {
- return buf.getInt(offset(idx) + 8);
+ @Override public int getHash(long pageAddr, int idx) {
+ return PageUtils.getInt(pageAddr, offset(idx) + 8);
}
}
/**
*
*/
- public static final class DataLeafIO extends BPlusLeafIO<KeySearchRow> implements RowLinkIO {
+ public static final class DataLeafIO extends BPlusLeafIO<CacheSearchRow> implements RowLinkIO {
/** */
public static final IOVersions<DataLeafIO> VERSIONS = new IOVersions<>(
new DataLeafIO(1)
@@ -1347,20 +1422,27 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/** {@inheritDoc} */
- @Override public void storeByOffset(ByteBuffer buf, int off, KeySearchRow row) {
+ @Override public void storeByOffset(ByteBuffer buf, int off, CacheSearchRow row) throws IgniteCheckedException {
+ assert row.link() != 0;
+
+ store0(buf, off, row.link(), row.hash());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void storeByOffset(long pageAddr, int off, CacheSearchRow row) {
assert row.link() != 0;
- store0(buf, off, row.link(), row.hash);
+ store0(pageAddr, off, row.link(), row.hash());
}
/** {@inheritDoc} */
- @Override public void store(ByteBuffer dst, int dstIdx, BPlusIO<KeySearchRow> srcIo, ByteBuffer src,
+ @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<CacheSearchRow> srcIo, long srcPageAddr,
int srcIdx) {
- store0(dst, offset(dstIdx), getLink(src, srcIdx), getHash(src, srcIdx));
+ store0(dstPageAddr, offset(dstIdx), getLink(srcPageAddr, srcIdx), getHash(srcPageAddr, srcIdx));
}
/** {@inheritDoc} */
- @Override public KeySearchRow getLookupRow(BPlusTree<KeySearchRow, ?> tree, ByteBuffer buf, int idx) {
+ @Override public CacheSearchRow getLookupRow(BPlusTree<CacheSearchRow, ?> tree, long buf, int idx) {
int hash = getHash(buf, idx);
long link = getLink(buf, idx);
@@ -1368,15 +1450,15 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/** {@inheritDoc} */
- @Override public long getLink(ByteBuffer buf, int idx) {
- assert idx < getCount(buf) : idx;
+ @Override public long getLink(long pageAddr, int idx) {
+ assert idx < getCount(pageAddr) : idx;
- return buf.getLong(offset(idx));
+ return PageUtils.getLong(pageAddr, offset(idx));
}
/** {@inheritDoc} */
- @Override public int getHash(ByteBuffer buf, int idx) {
- return buf.getInt(offset(idx) + 8);
+ @Override public int getHash(long pageAddr, int idx) {
+ return PageUtils.getInt(pageAddr, offset(idx) + 8);
}
}
@@ -1471,9 +1553,9 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/** {@inheritDoc} */
- @Override protected int compare(BPlusIO<PendingRow> io, ByteBuffer buf, int idx, PendingRow row)
+ @Override protected int compare(BPlusIO<PendingRow> io, long pageAddr, int idx, PendingRow row)
throws IgniteCheckedException {
- long expireTime = ((PendingRowIO)io).getExpireTime(buf, idx);
+ long expireTime = ((PendingRowIO)io).getExpireTime(pageAddr, idx);
int cmp = Long.compare(expireTime, row.expireTime);
@@ -1483,15 +1565,15 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
if (row.link == 0L)
return 0;
- long link = ((PendingRowIO)io).getLink(buf, idx);
+ long link = ((PendingRowIO)io).getLink(pageAddr, idx);
return Long.compare(link, row.link);
}
/** {@inheritDoc} */
- @Override protected PendingRow getRow(BPlusIO<PendingRow> io, ByteBuffer buf, int idx)
+ @Override protected PendingRow getRow(BPlusIO<PendingRow> io, long pageAddr, int idx)
throws IgniteCheckedException {
- return io.getLookupRow(this, buf, idx);
+ return io.getLookupRow(this, pageAddr, idx);
}
}
@@ -1500,18 +1582,18 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
*/
private interface PendingRowIO {
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param idx Index.
* @return Expire time.
*/
- long getExpireTime(ByteBuffer buf, int idx);
+ long getExpireTime(long pageAddr, int idx);
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param idx Index.
* @return Link.
*/
- long getLink(ByteBuffer buf, int idx);
+ long getLink(long pageAddr, int idx);
}
/**
@@ -1540,34 +1622,45 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/** {@inheritDoc} */
- @Override public void store(ByteBuffer dst,
+ @Override public void storeByOffset(long pageAddr, int off, PendingRow row) throws IgniteCheckedException {
+ assert row.link != 0;
+ assert row.expireTime != 0;
+
+ PageUtils.putLong(pageAddr, off, row.expireTime);
+ PageUtils.putLong(pageAddr, off + 8, row.link);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void store(long dstPageAddr,
int dstIdx,
BPlusIO<PendingRow> srcIo,
- ByteBuffer src,
+ long srcPageAddr,
int srcIdx) throws IgniteCheckedException {
int dstOff = offset(dstIdx);
- long link = ((PendingRowIO)srcIo).getLink(src, srcIdx);
- long expireTime = ((PendingRowIO)srcIo).getExpireTime(src, srcIdx);
+ long link = ((PendingRowIO)srcIo).getLink(srcPageAddr, srcIdx);
+ long expireTime = ((PendingRowIO)srcIo).getExpireTime(srcPageAddr, srcIdx);
- dst.putLong(dstOff, expireTime);
- dst.putLong(dstOff + 8, link);
+ PageUtils.putLong(dstPageAddr, dstOff, expireTime);
+ PageUtils.putLong(dstPageAddr, dstOff + 8, link);
}
/** {@inheritDoc} */
- @Override public PendingRow getLookupRow(BPlusTree<PendingRow, ?> tree, ByteBuffer buf, int idx)
+ @Override public PendingRow getLookupRow(BPlusTree<PendingRow, ?> tree, long pageAddr, int idx)
throws IgniteCheckedException {
- return PendingRow.createRowWithKey(((PendingEntriesTree)tree).cctx, getExpireTime(buf, idx), getLink(buf, idx));
+ return PendingRow.createRowWithKey(((PendingEntriesTree)tree).cctx,
+ getExpireTime(pageAddr, idx),
+ getLink(pageAddr, idx));
}
/** {@inheritDoc} */
- @Override public long getExpireTime(ByteBuffer buf, int idx) {
- return buf.getLong(offset(idx));
+ @Override public long getExpireTime(long pageAddr, int idx) {
+ return PageUtils.getLong(pageAddr, offset(idx));
}
/** {@inheritDoc} */
- @Override public long getLink(ByteBuffer buf, int idx) {
- return buf.getLong(offset(idx) + 8);
+ @Override public long getLink(long pageAddr, int idx) {
+ return PageUtils.getLong(pageAddr, offset(idx) + 8);
}
}
@@ -1597,34 +1690,45 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/** {@inheritDoc} */
- @Override public void store(ByteBuffer dst,
+ @Override public void storeByOffset(long pageAddr, int off, PendingRow row) throws IgniteCheckedException {
+ assert row.link != 0;
+ assert row.expireTime != 0;
+
+ PageUtils.putLong(pageAddr, off, row.expireTime);
+ PageUtils.putLong(pageAddr, off + 8, row.link);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void store(long dstPageAddr,
int dstIdx,
BPlusIO<PendingRow> srcIo,
- ByteBuffer src,
+ long srcPageAddr,
int srcIdx) throws IgniteCheckedException {
int dstOff = offset(dstIdx);
- long link = ((PendingRowIO)srcIo).getLink(src, srcIdx);
- long expireTime = ((PendingRowIO)srcIo).getExpireTime(src, srcIdx);
+ long link = ((PendingRowIO)srcIo).getLink(srcPageAddr, srcIdx);
+ long expireTime = ((PendingRowIO)srcIo).getExpireTime(srcPageAddr, srcIdx);
- dst.putLong(dstOff, expireTime);
- dst.putLong(dstOff + 8, link);
+ PageUtils.putLong(dstPageAddr, dstOff, expireTime);
+ PageUtils.putLong(dstPageAddr, dstOff + 8, link);
}
/** {@inheritDoc} */
- @Override public PendingRow getLookupRow(BPlusTree<PendingRow, ?> tree, ByteBuffer buf, int idx)
+ @Override public PendingRow getLookupRow(BPlusTree<PendingRow, ?> tree, long pageAddr, int idx)
throws IgniteCheckedException {
- return PendingRow.createRowWithKey(((PendingEntriesTree)tree).cctx, getExpireTime(buf, idx), getLink(buf, idx));
+ return PendingRow.createRowWithKey(((PendingEntriesTree)tree).cctx,
+ getExpireTime(pageAddr, idx),
+ getLink(pageAddr, idx));
}
/** {@inheritDoc} */
- @Override public long getExpireTime(ByteBuffer buf, int idx) {
- return buf.getLong(offset(idx));
+ @Override public long getExpireTime(long pageAddr, int idx) {
+ return PageUtils.getLong(pageAddr, offset(idx));
}
/** {@inheritDoc} */
- @Override public long getLink(ByteBuffer buf, int idx) {
- return buf.getLong(offset(idx) + 8);
+ @Override public long getLink(long pageAddr, int idx) {
+ return PageUtils.getLong(pageAddr, offset(idx) + 8);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java
index d4d7020..75ab8e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java
@@ -18,18 +18,12 @@
package org.apache.ignite.internal.processors.cache.database;
import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
/**
* Cache data row.
*/
-public interface CacheDataRow {
- /**
- * @return Cache key.
- */
- public KeyCacheObject key();
-
+public interface CacheDataRow extends CacheSearchRow {
/**
* @return Cache value.
*/
@@ -51,11 +45,6 @@ public interface CacheDataRow {
public int partition();
/**
- * @return Link for this row.
- */
- public long link();
-
- /**
* @param link Link for this row.
*/
public void link(long link);
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
index b5babc4..5288aad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
@@ -21,14 +21,17 @@ import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.Page;
import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IncompleteCacheObject;
import org.apache.ignite.internal.processors.cache.IncompleteObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO;
import org.apache.ignite.internal.processors.cache.database.tree.io.CacheVersionIO;
+import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO;
+import org.apache.ignite.internal.processors.cache.database.tree.io.DataPagePayload;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -88,20 +91,26 @@ public class CacheDataRowAdapter implements CacheDataRow {
boolean first = true;
do {
+ PageMemory pageMem = cctx.shared().database().pageMemory();
+
try (Page page = page(pageId(nextLink), cctx)) {
- ByteBuffer buf = page.getForRead(); // Non-empty data page must not be recycled.
+ long pageAddr = page.getForReadPointer(); // Non-empty data page must not be recycled.
- assert buf != null: nextLink;
+ assert pageAddr != 0L : nextLink;
try {
- DataPageIO io = DataPageIO.VERSIONS.forPage(buf);
+ DataPageIO io = DataPageIO.VERSIONS.forPage(pageAddr);
- nextLink = io.setPositionAndLimitOnPayload(buf, itemId(nextLink));
+ DataPagePayload data = io.readPayload(pageAddr,
+ itemId(nextLink),
+ pageMem.pageSize());
+
+ nextLink = data.nextLink();
if (first) {
if (nextLink == 0) {
// Fast path for a single page row.
- readFullRow(coctx, buf, keyOnly);
+ readFullRow(coctx, pageAddr + data.offset(), keyOnly);
return;
}
@@ -109,6 +118,11 @@ public class CacheDataRowAdapter implements CacheDataRow {
first = false;
}
+ ByteBuffer buf = pageMem.pageBuffer(pageAddr);
+
+ buf.position(data.offset());
+ buf.limit(data.offset() + data.payloadSize());
+
incomplete = readFragment(coctx, buf, keyOnly, incomplete);
if (keyOnly && key != null)
@@ -121,7 +135,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
}
while(nextLink != 0);
- assert isReady(): "ready";
+ assert isReady() : "ready";
}
/**
@@ -130,6 +144,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
* @param keyOnly {@code true} If need to read only key object.
* @param incomplete Incomplete object.
* @throws IgniteCheckedException If failed.
+ * @return Read object.
*/
private IncompleteObject<?> readFragment(
CacheObjectContext coctx,
@@ -175,12 +190,23 @@ public class CacheDataRowAdapter implements CacheDataRow {
/**
* @param coctx Cache object context.
- * @param buf Buffer.
+ * @param addr Address.
* @param keyOnly {@code true} If need to read only key object.
* @throws IgniteCheckedException If failed.
*/
- private void readFullRow(CacheObjectContext coctx, ByteBuffer buf, boolean keyOnly) throws IgniteCheckedException {
- key = coctx.processor().toKeyCacheObject(coctx, buf);
+ private void readFullRow(CacheObjectContext coctx, long addr, boolean keyOnly) throws IgniteCheckedException {
+ int off = 0;
+
+ int len = PageUtils.getInt(addr, off);
+ off += 4;
+
+ byte type = PageUtils.getByte(addr, off);
+ off++;
+
+ byte[] bytes = PageUtils.getBytes(addr, off, len);
+ off += len;
+
+ key = coctx.processor().toKeyCacheObject(coctx, type, bytes);
if (keyOnly) {
assert key != null: "key";
@@ -188,9 +214,22 @@ public class CacheDataRowAdapter implements CacheDataRow {
return;
}
- val = coctx.processor().toCacheObject(coctx, buf);
- ver = CacheVersionIO.read(buf, false);
- expireTime = buf.getLong();
+ len = PageUtils.getInt(addr, off);
+ off += 4;
+
+ type = PageUtils.getByte(addr, off);
+ off++;
+
+ bytes = PageUtils.getBytes(addr, off, len);
+ off += len;
+
+ val = coctx.processor().toCacheObject(coctx, type, bytes);
+
+ ver = CacheVersionIO.read(addr + off, false);
+
+ off += CacheVersionIO.size(ver, false);
+
+ expireTime = PageUtils.getLong(addr, off);
assert isReady(): "ready";
}
@@ -249,6 +288,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
* @param buf Buffer.
* @param incomplete Incomplete object.
* @return Incomplete object.
+ * @throws IgniteCheckedException If failed.
*/
private IncompleteObject<?> readIncompleteExpireTime(
ByteBuffer buf,
@@ -292,6 +332,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
* @param buf Buffer.
* @param incomplete Incomplete object.
* @return Incomplete object.
+ * @throws IgniteCheckedException If failed.
*/
private IncompleteObject<?> readIncompleteVersion(
ByteBuffer buf,
@@ -385,6 +426,11 @@ public class CacheDataRowAdapter implements CacheDataRow {
}
/** {@inheritDoc} */
+ @Override public int hash() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheDataRowAdapter.class, this, "link", U.hexLong(link));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java
new file mode 100644
index 0000000..d51cf0e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.database;
+
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+
+/**
+ *
+ */
+public interface CacheSearchRow {
+ /**
+ * @return Cache key.
+ */
+ public KeyCacheObject key();
+
+ /**
+ * @return Link for this row.
+ */
+ public long link();
+
+ /**
+ * @return Key hash code.
+ */
+ public int hash();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/DataStructure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/DataStructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/DataStructure.java
index 5fd64b0..f47a697 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/DataStructure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/DataStructure.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.cache.database;
-import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.ignite.IgniteCheckedException;
@@ -131,35 +130,35 @@ public abstract class DataStructure implements PageLockListener {
/**
* @param page Page.
- * @return Buffer.
+ * @return Page address.
*/
- protected final ByteBuffer tryWriteLock(Page page) {
+ protected final long tryWriteLock(Page page) {
return PageHandler.writeLock(page, this, true);
}
/**
* @param page Page.
- * @return Buffer.
+ * @return Page address.
*/
- protected final ByteBuffer writeLock(Page page) {
+ protected final long writeLock(Page page) {
return PageHandler.writeLock(page, this, false);
}
/**
* @param page Page.
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param dirty Dirty page.
*/
- protected final void writeUnlock(Page page, ByteBuffer buf, boolean dirty) {
- PageHandler.writeUnlock(page, buf, this, dirty);
+ protected final void writeUnlock(Page page, long pageAddr, boolean dirty) {
+ PageHandler.writeUnlock(page, pageAddr, this, dirty);
}
/**
* @param page Page.
- * @return Buffer.
+ * @return Page address.
*/
- protected final ByteBuffer readLock(Page page) {
+ protected final long readLock(Page page) {
return PageHandler.readLock(page, this);
}
@@ -167,22 +166,29 @@ public abstract class DataStructure implements PageLockListener {
* @param page Page.
* @param buf Buffer.
*/
- protected final void readUnlock(Page page, ByteBuffer buf) {
+ protected final void readUnlock(Page page, long buf) {
PageHandler.readUnlock(page, buf, this);
}
+ /**
+ * @return Page size.
+ */
+ protected final int pageSize() {
+ return pageMem.pageSize();
+ }
+
/** {@inheritDoc} */
@Override public void onBeforeWriteLock(Page page) {
// No-op.
}
/** {@inheritDoc} */
- @Override public void onWriteLock(Page page, ByteBuffer buf) {
+ @Override public void onWriteLock(Page page, long pageAddr) {
// No-op.
}
/** {@inheritDoc} */
- @Override public void onWriteUnlock(Page page, ByteBuffer buf) {
+ @Override public void onWriteUnlock(Page page, long pageAddr) {
// No-op.
}
@@ -192,12 +198,12 @@ public abstract class DataStructure implements PageLockListener {
}
/** {@inheritDoc} */
- @Override public void onReadLock(Page page, ByteBuffer buf) {
+ @Override public void onReadLock(Page page, long pageAddr) {
// No-op.
}
/** {@inheritDoc} */
- @Override public void onReadUnlock(Page page, ByteBuffer buf) {
+ @Override public void onReadUnlock(Page page, long pageAddr) {
// No-op.
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java
index 18b3a1f..9c10057 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java
@@ -256,7 +256,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
true,
sizes);
- return new PageMemoryNoStoreImpl(log, memProvider, cctx, dbCfg.getPageSize());
+ return new PageMemoryNoStoreImpl(log, memProvider, cctx, dbCfg.getPageSize(), false);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java
index 26151ac..cf6decb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java
@@ -24,6 +24,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.database.tree.BPlusTree;
import org.apache.ignite.internal.processors.cache.database.tree.io.BPlusIO;
@@ -31,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.database.tree.io.BPlusInnerIO
import org.apache.ignite.internal.processors.cache.database.tree.io.BPlusLeafIO;
import org.apache.ignite.internal.processors.cache.database.tree.io.IOVersions;
import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList;
+import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler;
import org.apache.ignite.internal.util.typedef.internal.U;
/**
@@ -190,19 +192,19 @@ public class MetadataStorage implements MetaStore {
}
/** {@inheritDoc} */
- @Override protected int compare(final BPlusIO<IndexItem> io, final ByteBuffer buf, final int idx,
+ @Override protected int compare(final BPlusIO<IndexItem> io, final long pageAddr, final int idx,
final IndexItem row) throws IgniteCheckedException {
final int off = ((IndexIO)io).getOffset(idx);
int shift = 0;
// Compare index names.
- final byte len = buf.get(off + shift);
+ final byte len = PageUtils.getByte(pageAddr, off + shift);
shift += BYTE_LEN;
for (int i = 0; i < len && i < row.idxName.length; i++) {
- final int cmp = Byte.compare(buf.get(off + i + shift), row.idxName[i]);
+ final int cmp = Byte.compare(PageUtils.getByte(pageAddr, off + i + shift), row.idxName[i]);
if (cmp != 0)
return cmp;
@@ -212,9 +214,9 @@ public class MetadataStorage implements MetaStore {
}
/** {@inheritDoc} */
- @Override protected IndexItem getRow(final BPlusIO<IndexItem> io, final ByteBuffer buf,
+ @Override protected IndexItem getRow(final BPlusIO<IndexItem> io, final long pageAddr,
final int idx) throws IgniteCheckedException {
- return readRow(buf, ((IndexIO)io).getOffset(idx));
+ return readRow(pageAddr, ((IndexIO)io).getOffset(idx));
}
}
@@ -275,78 +277,78 @@ public class MetadataStorage implements MetaStore {
}
/**
- * Copy row data.
+ * Store row to buffer.
*
- * @param dst Destination buffer.
- * @param dstOff Destination buf offset.
- * @param src Source buffer.
- * @param srcOff Src buf offset.
+ * @param pageAddr Page address.
+ * @param off Offset in buf.
+ * @param row Row to store.
*/
private static void storeRow(
- final ByteBuffer dst,
- final int dstOff,
- final ByteBuffer src,
- final int srcOff
+ final long pageAddr,
+ int off,
+ final IndexItem row
) {
- int srcOrigPos = src.position();
- int dstOrigPos = dst.position();
-
- try {
- src.position(srcOff);
- dst.position(dstOff);
-
- // Index name length.
- final byte len = src.get();
+ // Index name length.
+ PageUtils.putByte(pageAddr, off, (byte)row.idxName.length);
+ off++;
- dst.put(len);
+ // Index name.
+ PageUtils.putBytes(pageAddr, off, row.idxName);
+ off += row.idxName.length;
- int lim = src.limit();
+ // Page ID.
+ PageUtils.putLong(pageAddr, off, row.pageId);
+ }
- src.limit(src.position() + len);
+ /**
+ * Copy row data.
+ *
+ * @param dstPageAddr Destination page address.
+ * @param dstOff Destination buf offset.
+ * @param srcPageAddr Source page address.
+ * @param srcOff Src buf offset.
+ */
+ private static void storeRow(
+ final long dstPageAddr,
+ int dstOff,
+ final long srcPageAddr,
+ int srcOff
+ ) {
+ // Index name length.
+ final byte len = PageUtils.getByte(srcPageAddr, srcOff);
+ srcOff++;
- // Index name.
- dst.put(src);
+ PageUtils.putByte(dstPageAddr, dstOff, len);
+ dstOff++;
- src.limit(lim);
+ PageHandler.copyMemory(srcPageAddr, srcOff, dstPageAddr, dstOff, len);
+ srcOff += len;
+ dstOff += len;
- // Page ID.
- dst.putLong(src.getLong());
- }
- finally {
- src.position(srcOrigPos);
- dst.position(dstOrigPos);
- }
+ // Page ID.
+ PageUtils.putLong(dstPageAddr, dstOff, PageUtils.getLong(srcPageAddr, srcOff));
}
/**
* Read row from buffer.
*
- * @param buf Buffer to read.
- * @param off Offset in buf.
+ * @param pageAddr Page address.
+ * @param off Offset.
* @return Read row.
*/
- private static IndexItem readRow(final ByteBuffer buf, final int off) {
- int origOff = buf.position();
-
- try {
- buf.position(off);
-
- // Index name length.
- final int len = buf.get() & 0xFF;
-
- // Index name.
- final byte[] idxName = new byte[len];
+ private static IndexItem readRow(final long pageAddr, int off) {
+ // Index name length.
+ final int len = PageUtils.getByte(pageAddr, off) & 0xFF;
+ off++;
- buf.get(idxName);
+ // Index name.
+ final byte[] idxName = PageUtils.getBytes(pageAddr, off, len);
+ off += len;
- // Page ID.
- final long pageId = buf.getLong();
+ // Page ID.
+ final long pageId = PageUtils.getLong(pageAddr, off);
- return new IndexItem(idxName, pageId);
- }
- finally {
- buf.position(origOff);
- }
+ return new IndexItem(idxName, pageId);
}
/**
@@ -383,16 +385,21 @@ public class MetadataStorage implements MetaStore {
}
/** {@inheritDoc} */
- @Override public void store(final ByteBuffer dst, final int dstIdx, final BPlusIO<IndexItem> srcIo,
- final ByteBuffer src,
+ @Override public void storeByOffset(long pageAddr, int off, IndexItem row) throws IgniteCheckedException {
+ storeRow(pageAddr, off, row);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void store(final long dstPageAddr, final int dstIdx, final BPlusIO<IndexItem> srcIo,
+ final long srcPageAddr,
final int srcIdx) throws IgniteCheckedException {
- storeRow(dst, offset(dstIdx), src, ((IndexIO)srcIo).getOffset(srcIdx));
+ storeRow(dstPageAddr, offset(dstIdx), srcPageAddr, ((IndexIO)srcIo).getOffset(srcIdx));
}
/** {@inheritDoc} */
- @Override public IndexItem getLookupRow(final BPlusTree<IndexItem, ?> tree, final ByteBuffer buf,
+ @Override public IndexItem getLookupRow(final BPlusTree<IndexItem, ?> tree, final long pageAddr,
final int idx) throws IgniteCheckedException {
- return readRow(buf, offset(idx));
+ return readRow(pageAddr, offset(idx));
}
/** {@inheritDoc} */
@@ -424,16 +431,24 @@ public class MetadataStorage implements MetaStore {
}
/** {@inheritDoc} */
- @Override public void store(final ByteBuffer dst, final int dstIdx, final BPlusIO<IndexItem> srcIo,
- final ByteBuffer src,
+ @Override public void storeByOffset(long buf, int off, IndexItem row) throws IgniteCheckedException {
+ storeRow(buf, off, row);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void store(final long dstPageAddr,
+ final int dstIdx,
+ final BPlusIO<IndexItem> srcIo,
+ final long srcPageAddr,
final int srcIdx) throws IgniteCheckedException {
- storeRow(dst, offset(dstIdx), src, ((IndexIO)srcIo).getOffset(srcIdx));
+ storeRow(dstPageAddr, offset(dstIdx), srcPageAddr, ((IndexIO)srcIo).getOffset(srcIdx));
}
/** {@inheritDoc} */
- @Override public IndexItem getLookupRow(final BPlusTree<IndexItem, ?> tree, final ByteBuffer buf,
+ @Override public IndexItem getLookupRow(final BPlusTree<IndexItem, ?> tree,
+ final long pageAddr,
final int idx) throws IgniteCheckedException {
- return readRow(buf, offset(idx));
+ return readRow(pageAddr, offset(idx));
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java
index 6a29027..6c1b21b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java
@@ -17,13 +17,13 @@
package org.apache.ignite.internal.processors.cache.database.freelist;
-import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.Page;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertRecord;
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageRemoveRecord;
import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
import org.apache.ignite.internal.processors.cache.database.tree.io.CacheVersionIO;
import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO;
+import org.apache.ignite.internal.processors.cache.database.tree.io.DataPagePayload;
import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseBag;
import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList;
@@ -72,26 +73,26 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
/** */
private final PageHandler<CacheDataRow, Integer> writeRow =
new PageHandler<CacheDataRow, Integer>() {
- @Override public Integer run(Page page, PageIO iox, ByteBuffer buf, CacheDataRow row, int written)
+ @Override public Integer run(Page page, PageIO iox, long pageAddr, CacheDataRow row, int written)
throws IgniteCheckedException {
DataPageIO io = (DataPageIO)iox;
int rowSize = getRowSize(row);
- int oldFreeSpace = io.getFreeSpace(buf);
+ int oldFreeSpace = io.getFreeSpace(pageAddr);
assert oldFreeSpace > 0 : oldFreeSpace;
// If the full row does not fit into this page write only a fragment.
- written = (written == 0 && oldFreeSpace >= rowSize) ? addRow(page, buf, io, row, rowSize):
- addRowFragment(page, buf, io, row, written, rowSize);
+ written = (written == 0 && oldFreeSpace >= rowSize) ? addRow(page, pageAddr, io, row, rowSize):
+ addRowFragment(page, pageAddr, io, row, written, rowSize);
// Reread free space after update.
- int newFreeSpace = io.getFreeSpace(buf);
+ int newFreeSpace = io.getFreeSpace(pageAddr);
if (newFreeSpace > MIN_PAGE_FREE_SPACE) {
int bucket = bucket(newFreeSpace, false);
- put(null, page, buf, bucket);
+ put(null, page, pageAddr, bucket);
}
// Avoid boxing with garbage generation for usual case.
@@ -109,24 +110,22 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
*/
private int addRow(
Page page,
- ByteBuffer buf,
+ long buf,
DataPageIO io,
CacheDataRow row,
int rowSize
) throws IgniteCheckedException {
- // TODO: context parameter.
- io.addRow(buf, row, rowSize);
+ io.addRow(buf, row, rowSize, pageSize());
if (isWalDeltaRecordNeeded(wal, page)) {
// TODO This record must contain only a reference to a logical WAL record with the actual data.
byte[] payload = new byte[rowSize];
- io.setPositionAndLimitOnPayload(buf, PageIdUtils.itemId(row.link()));
+ DataPagePayload data = io.readPayload(buf, PageIdUtils.itemId(row.link()), pageSize());
- assert buf.remaining() == rowSize;
+ assert data.payloadSize() == rowSize;
- buf.get(payload);
- buf.position(0);
+ PageUtils.getBytes(buf, data.offset(), payload, 0, rowSize);
wal.log(new DataPageInsertRecord(
cacheId,
@@ -149,7 +148,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
*/
private int addRowFragment(
Page page,
- ByteBuffer buf,
+ long buf,
DataPageIO io,
CacheDataRow row,
int written,
@@ -158,17 +157,17 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
// Read last link before the fragment write, because it will be updated there.
long lastLink = row.link();
- int payloadSize = io.addRowFragment(buf, row, written, rowSize);
+ int payloadSize = io.addRowFragment(pageMem, buf, row, written, rowSize, pageSize());
- assert payloadSize > 0: payloadSize;
+ assert payloadSize > 0 : payloadSize;
if (isWalDeltaRecordNeeded(wal, page)) {
// TODO This record must contain only a reference to a logical WAL record with the actual data.
byte[] payload = new byte[payloadSize];
- io.setPositionAndLimitOnPayload(buf, PageIdUtils.itemId(row.link()));
- buf.get(payload);
- buf.position(0);
+ DataPagePayload data = io.readPayload(buf, PageIdUtils.itemId(row.link()), pageSize());
+
+ PageUtils.getBytes(buf, data.offset(), payload, 0, payloadSize);
wal.log(new DataPageInsertFragmentRecord(cacheId, page.id(), payload, lastLink));
}
@@ -179,15 +178,15 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
/** */
private final PageHandler<Void, Long> rmvRow = new PageHandler<Void, Long>() {
- @Override public Long run(Page page, PageIO iox, ByteBuffer buf, Void arg, int itemId)
+ @Override public Long run(Page page, PageIO iox, long pageAddr, Void arg, int itemId)
throws IgniteCheckedException {
DataPageIO io = (DataPageIO)iox;
- int oldFreeSpace = io.getFreeSpace(buf);
+ int oldFreeSpace = io.getFreeSpace(pageAddr);
assert oldFreeSpace >= 0: oldFreeSpace;
- long nextLink = io.removeRow(buf, itemId);
+ long nextLink = io.removeRow(pageAddr, itemId, pageSize());
if (isWalDeltaRecordNeeded(wal, page))
wal.log(new DataPageRemoveRecord(cacheId, page.id(), itemId));
@@ -200,7 +199,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
// put(null, page, buf, REUSE_BUCKET);
// }
- int newFreeSpace = io.getFreeSpace(buf);
+ int newFreeSpace = io.getFreeSpace(pageAddr);
if (newFreeSpace > MIN_PAGE_FREE_SPACE) {
int newBucket = bucket(newFreeSpace, false);
@@ -210,12 +209,12 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
if (oldBucket != newBucket) {
// It is possible that page was concurrently taken for put, in this case put will handle bucket change.
- if (removeDataPage(page, buf, io, oldBucket))
- put(null, page, buf, newBucket);
+ if (removeDataPage(page, pageAddr, io, oldBucket))
+ put(null, page, pageAddr, newBucket);
}
}
else
- put(null, page, buf, newBucket);
+ put(null, page, pageAddr, newBucket);
}
// For common case boxed 0L will be cached inside of Long, so no garbage will be produced.
@@ -326,7 +325,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
// If it is an existing page, we do not need to initialize it.
DataPageIO init = reuseBucket || pageId == 0L ? DataPageIO.VERSIONS.latest() : null;
- written = writePage(page, this, writeRow, init, wal, row, written, FAIL_I);
+ written = writePage(pageMem, page, this, writeRow, init, wal, row, written, FAIL_I);
assert written != FAIL_I; // We can't fail here.
}
@@ -344,7 +343,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
long nextLink;
try (Page page = pageMem.page(cacheId, pageId)) {
- nextLink = writePage(page, this, rmvRow, null, itemId, FAIL_L);
+ nextLink = writePage(pageMem, page, this, rmvRow, null, itemId, FAIL_L);
assert nextLink != FAIL_L; // Can't fail here.
}
@@ -354,7 +353,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
pageId = PageIdUtils.pageId(nextLink);
try (Page page = pageMem.page(cacheId, pageId)) {
- nextLink = writePage(page, this, rmvRow, null, itemId, FAIL_L);
+ nextLink = writePage(pageMem, page, this, rmvRow, null, itemId, FAIL_L);
assert nextLink != FAIL_L; // Can't fail here.
}
@@ -380,7 +379,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
@Override public void addForRecycle(ReuseBag bag) throws IgniteCheckedException {
assert reuseList == this: "not allowed to be a reuse list";
- put(bag, null, null, REUSE_BUCKET);
+ put(bag, null, 0L, REUSE_BUCKET);
}
/** {@inheritDoc} */