You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/01/18 13:43:57 UTC
[40/50] [abbrv] ignite git commit: ignite-3477 PageMemory
optimizations - use page address instead of ByteBuffer to work with page
memory - got rid of pages pin/unpin - do not copy byte array for cache key
comparison - reduced size of data tree search row
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusIO.java
index 5fc3d25..c34296a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusIO.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.database.tree.io;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.processors.cache.database.tree.BPlusTree;
/**
@@ -72,56 +73,56 @@ public abstract class BPlusIO<L> extends PageIO {
}
/** {@inheritDoc} */
- @Override public void initNewPage(ByteBuffer buf, long pageId) {
- super.initNewPage(buf, pageId);
+ @Override public void initNewPage(long pageAddr, long pageId, int pageSize) {
+ super.initNewPage(pageAddr, pageId, pageSize);
- setCount(buf, 0);
- setForward(buf, 0);
- setRemoveId(buf, 0);
+ setCount(pageAddr, 0);
+ setForward(pageAddr, 0);
+ setRemoveId(pageAddr, 0);
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @return Forward page ID.
*/
- public final long getForward(ByteBuffer buf) {
- return buf.getLong(FORWARD_OFF);
+ public final long getForward(long pageAddr) {
+ return PageUtils.getLong(pageAddr, FORWARD_OFF);
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param pageId Forward page ID.
*/
- public final void setForward(ByteBuffer buf, long pageId) {
- buf.putLong(FORWARD_OFF, pageId);
+ public final void setForward(long pageAddr, long pageId) {
+ PageUtils.putLong(pageAddr, FORWARD_OFF, pageId);
- assert getForward(buf) == pageId;
+ assert getForward(pageAddr) == pageId;
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @return Remove ID.
*/
- public final long getRemoveId(ByteBuffer buf) {
- return buf.getLong(REMOVE_ID_OFF);
+ public final long getRemoveId(long pageAddr) {
+ return PageUtils.getLong(pageAddr, REMOVE_ID_OFF);
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param rmvId Remove ID.
*/
- public final void setRemoveId(ByteBuffer buf, long rmvId) {
- buf.putLong(REMOVE_ID_OFF, rmvId);
+ public final void setRemoveId(long pageAddr, long rmvId) {
+ PageUtils.putLong(pageAddr, REMOVE_ID_OFF, rmvId);
- assert getRemoveId(buf) == rmvId;
+ assert getRemoveId(pageAddr) == rmvId;
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @return Items count in the page.
*/
- public final int getCount(ByteBuffer buf) {
- int cnt = buf.getShort(CNT_OFF) & 0xFFFF;
+ public final int getCount(long pageAddr) {
+ int cnt = PageUtils.getShort(pageAddr, CNT_OFF) & 0xFFFF;
assert cnt >= 0: cnt;
@@ -129,20 +130,20 @@ public abstract class BPlusIO<L> extends PageIO {
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param cnt Count.
*/
- public final void setCount(ByteBuffer buf, int cnt) {
+ public final void setCount(long pageAddr, int cnt) {
assert cnt >= 0: cnt;
- buf.putShort(CNT_OFF, (short)cnt);
+ PageUtils.putShort(pageAddr, CNT_OFF, (short)cnt);
- assert getCount(buf) == cnt;
+ assert getCount(pageAddr) == cnt;
}
/**
* @return {@code true} If we can get the full row from this page using
- * method {@link BPlusTree#getRow(BPlusIO, ByteBuffer, int)}.
+ * method {@link BPlusTree#getRow(BPlusIO, long, int)}.
* Must always be {@code true} for leaf pages.
*/
public final boolean canGetRow() {
@@ -157,27 +158,28 @@ public abstract class BPlusIO<L> extends PageIO {
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
+ * @param pageSize Page size.
* @return Max items count.
*/
- public abstract int getMaxCount(ByteBuffer buf);
+ public abstract int getMaxCount(long pageAddr, int pageSize);
/**
* Store the needed info about the row in the page. Leaf and inner pages can store different info.
*
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param idx Index.
* @param row Lookup or full row.
* @param rowBytes Row bytes.
* @throws IgniteCheckedException If failed.
*/
- public final void store(ByteBuffer buf, int idx, L row, byte[] rowBytes) throws IgniteCheckedException {
+ public final void store(long pageAddr, int idx, L row, byte[] rowBytes) throws IgniteCheckedException {
int off = offset(idx);
if (rowBytes == null)
- storeByOffset(buf, off, row);
+ storeByOffset(pageAddr, off, row);
else
- putBytes(buf, off, rowBytes);
+ putBytes(pageAddr, off, rowBytes);
}
/**
@@ -189,6 +191,17 @@ public abstract class BPlusIO<L> extends PageIO {
/**
* Store the needed info about the row in the page. Leaf and inner pages can store different info.
*
+ * @param pageAddr Page address.
+ * @param off Offset in bytes.
+ * @param row Lookup or full row.
+ * @throws IgniteCheckedException If failed.
+ */
+ public abstract void storeByOffset(long pageAddr, int off, L row) throws IgniteCheckedException;
+
+
+ /**
+ * Store the needed info about the row in the page. Leaf and inner pages can store different info.
+ *
* @param buf Buffer.
* @param off Offset in bytes.
* @param row Lookup or full row.
@@ -199,137 +212,141 @@ public abstract class BPlusIO<L> extends PageIO {
/**
* Store row info from the given source.
*
- * @param dst Destination buffer
+ * @param dstPageAddr Destination page address.
* @param dstIdx Destination index.
* @param srcIo Source IO.
- * @param src Source buffer.
+ * @param srcPageAddr Source page address.
* @param srcIdx Source index.
* @throws IgniteCheckedException If failed.
*/
- public abstract void store(ByteBuffer dst, int dstIdx, BPlusIO<L> srcIo, ByteBuffer src, int srcIdx)
+ public abstract void store(long dstPageAddr, int dstIdx, BPlusIO<L> srcIo, long srcPageAddr, int srcIdx)
throws IgniteCheckedException;
/**
* Get lookup row.
*
* @param tree Tree.
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param idx Index.
* @return Lookup row.
* @throws IgniteCheckedException If failed.
*/
- public abstract L getLookupRow(BPlusTree<L, ?> tree, ByteBuffer buf, int idx) throws IgniteCheckedException;
+ public abstract L getLookupRow(BPlusTree<L, ?> tree, long pageAddr, int idx) throws IgniteCheckedException;
/**
- * Copy items from source buffer to destination buffer.
+ * Copy items from source page to destination page.
* Both pages must be of the same type and the same version.
*
- * @param src Source buffer.
- * @param dst Destination buffer.
+ * @param srcPageAddr Source page address.
+ * @param dstPageAddr Destination page address.
* @param srcIdx Source begin index.
* @param dstIdx Destination begin index.
* @param cnt Items count.
* @param cpLeft Copy leftmost link (makes sense only for inner pages).
* @throws IgniteCheckedException If failed.
*/
- public abstract void copyItems(ByteBuffer src, ByteBuffer dst, int srcIdx, int dstIdx, int cnt, boolean cpLeft)
+ public abstract void copyItems(long srcPageAddr, long dstPageAddr, int srcIdx, int dstIdx, int cnt, boolean cpLeft)
throws IgniteCheckedException;
// Methods for B+Tree logic.
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param idx Index.
* @param row Row to insert.
* @param rowBytes Row bytes.
* @param rightId Page ID which will be to the right child for the inserted item.
* @throws IgniteCheckedException If failed.
*/
- public void insert(ByteBuffer buf, int idx, L row, byte[] rowBytes, long rightId)
+ public void insert(long pageAddr, int idx, L row, byte[] rowBytes, long rightId)
throws IgniteCheckedException {
- int cnt = getCount(buf);
+ int cnt = getCount(pageAddr);
// Move right all the greater elements to make a free slot for a new row link.
- copyItems(buf, buf, idx, idx + 1, cnt - idx, false);
+ copyItems(pageAddr, pageAddr, idx, idx + 1, cnt - idx, false);
- setCount(buf, cnt + 1);
+ setCount(pageAddr, cnt + 1);
- store(buf, idx, row, rowBytes);
+ store(pageAddr, idx, row, rowBytes);
}
/**
- * @param buf Splitting buffer.
+ * @param pageAddr Splitting page address.
* @param fwdId Forward page ID.
- * @param fwdBuf Forward buffer.
+ * @param fwdPageAddr Forward page address.
* @param mid Bisection index.
* @param cnt Initial elements count in the page being split.
+ * @param pageSize Page size.
* @throws IgniteCheckedException If failed.
*/
public void splitForwardPage(
- ByteBuffer buf,
+ long pageAddr,
long fwdId,
- ByteBuffer fwdBuf,
+ long fwdPageAddr,
int mid,
- int cnt
+ int cnt,
+ int pageSize
) throws IgniteCheckedException {
- initNewPage(fwdBuf, fwdId);
+ initNewPage(fwdPageAddr, fwdId, pageSize);
cnt -= mid;
- copyItems(buf, fwdBuf, mid, 0, cnt, true);
+ copyItems(pageAddr, fwdPageAddr, mid, 0, cnt, true);
- setCount(fwdBuf, cnt);
- setForward(fwdBuf, getForward(buf));
+ setCount(fwdPageAddr, cnt);
+ setForward(fwdPageAddr, getForward(pageAddr));
// Copy remove ID to make sure that if inner remove touched this page, then retry
// will happen even for newly allocated forward page.
- setRemoveId(fwdBuf, getRemoveId(buf));
+ setRemoveId(fwdPageAddr, getRemoveId(pageAddr));
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param mid Bisection index.
* @param fwdId New forward page ID.
*/
- public void splitExistingPage(ByteBuffer buf, int mid, long fwdId) {
- setCount(buf, mid);
- setForward(buf, fwdId);
+ public void splitExistingPage(long pageAddr, int mid, long fwdId) {
+ setCount(pageAddr, mid);
+ setForward(pageAddr, fwdId);
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param idx Index.
* @param cnt Count.
* @throws IgniteCheckedException If failed.
*/
- public void remove(ByteBuffer buf, int idx, int cnt) throws IgniteCheckedException {
+ public void remove(long pageAddr, int idx, int cnt) throws IgniteCheckedException {
cnt--;
- copyItems(buf, buf, idx + 1, idx, cnt - idx, false);
- setCount(buf, cnt);
+ copyItems(pageAddr, pageAddr, idx + 1, idx, cnt - idx, false);
+ setCount(pageAddr, cnt);
}
/**
* @param prntIo Parent IO.
- * @param prnt Parent buffer.
+ * @param prntPageAddr Parent page address.
* @param prntIdx Split key index in parent.
- * @param left Left buffer.
- * @param right Right buffer.
+ * @param leftPageAddr Left page address.
+ * @param rightPageAddr Right page address.
* @param emptyBranch We are merging an empty branch.
+ * @param pageSize Page size.
* @return {@code false} If we were not able to merge.
* @throws IgniteCheckedException If failed.
*/
public boolean merge(
BPlusIO<L> prntIo,
- ByteBuffer prnt,
+ long prntPageAddr,
int prntIdx,
- ByteBuffer left,
- ByteBuffer right,
- boolean emptyBranch
+ long leftPageAddr,
+ long rightPageAddr,
+ boolean emptyBranch,
+ int pageSize
) throws IgniteCheckedException {
- int prntCnt = prntIo.getCount(prnt);
- int leftCnt = getCount(left);
- int rightCnt = getCount(right);
+ int prntCnt = prntIo.getCount(prntPageAddr);
+ int leftCnt = getCount(leftPageAddr);
+ int rightCnt = getCount(rightPageAddr);
int newCnt = leftCnt + rightCnt;
@@ -337,13 +354,13 @@ public abstract class BPlusIO<L> extends PageIO {
if (!isLeaf() && !emptyBranch)
newCnt++;
- if (newCnt > getMaxCount(left)) {
+ if (newCnt > getMaxCount(leftPageAddr, pageSize)) {
assert !emptyBranch;
return false;
}
- setCount(left, newCnt);
+ setCount(leftPageAddr, newCnt);
// Move down split key in inner pages.
if (!isLeaf() && !emptyBranch) {
@@ -351,37 +368,29 @@ public abstract class BPlusIO<L> extends PageIO {
// We can be sure that we have enough free space to store split key here,
// because we've done remove already and did not release child locks.
- store(left, leftCnt, prntIo, prnt, prntIdx);
+ store(leftPageAddr, leftCnt, prntIo, prntPageAddr, prntIdx);
leftCnt++;
}
- copyItems(right, left, 0, leftCnt, rightCnt, !emptyBranch);
- setForward(left, getForward(right));
+ copyItems(rightPageAddr, leftPageAddr, 0, leftCnt, rightCnt, !emptyBranch);
+ setForward(leftPageAddr, getForward(rightPageAddr));
- long rmvId = getRemoveId(right);
+ long rmvId = getRemoveId(rightPageAddr);
// Need to have maximum remove ID.
- if (rmvId > getRemoveId(left))
- setRemoveId(left, rmvId);
+ if (rmvId > getRemoveId(leftPageAddr))
+ setRemoveId(leftPageAddr, rmvId);
return true;
}
/**
- * @param buf Buffer.
- * @param pos Position in buffer.
+ * @param pageAddr Page address.
+ * @param pos Position in page.
* @param bytes Bytes.
*/
- private static void putBytes(ByteBuffer buf, int pos, byte[] bytes) {
- int oldPos = buf.position();
-
- try {
- buf.position(pos);
- buf.put(bytes);
- }
- finally {
- buf.position(oldPos);
- }
+ private static void putBytes(long pageAddr, int pos, byte[] bytes) {
+ PageUtils.putBytes(pageAddr, pos, bytes);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusInnerIO.java
index 90b0f37..c7018bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusInnerIO.java
@@ -17,8 +17,8 @@
package org.apache.ignite.internal.processors.cache.database.tree.io;
-import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler;
/**
@@ -45,71 +45,71 @@ public abstract class BPlusInnerIO<L> extends BPlusIO<L> {
}
/** {@inheritDoc} */
- @Override public int getMaxCount(ByteBuffer buf) {
+ @Override public int getMaxCount(long pageAddr, int pageSize) {
// The structure of the page is the following:
// |ITEMS_OFF|w|A|x|B|y|C|z|
// where capital letters are data items, lowercase letters are 8 byte page references.
- return (buf.capacity() - ITEMS_OFF - 8) / (itemSize + 8);
+ return (pageSize - ITEMS_OFF - 8) / (itemSize + 8);
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param idx Index.
* @return Page ID.
*/
- public final long getLeft(ByteBuffer buf, int idx) {
- return buf.getLong(offset(idx, SHIFT_LEFT));
+ public final long getLeft(long pageAddr, int idx) {
+ return PageUtils.getLong(pageAddr, (offset(idx, SHIFT_LEFT)));
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param idx Index.
* @param pageId Page ID.
*/
- public final void setLeft(ByteBuffer buf, int idx, long pageId) {
- buf.putLong(offset(idx, SHIFT_LEFT), pageId);
+ public final void setLeft(long pageAddr, int idx, long pageId) {
+ PageUtils.putLong(pageAddr, offset(idx, SHIFT_LEFT), pageId);
- assert pageId == getLeft(buf, idx);
+ assert pageId == getLeft(pageAddr, idx);
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param idx Index.
* @return Page ID.
*/
- public final long getRight(ByteBuffer buf, int idx) {
- return buf.getLong(offset(idx, SHIFT_RIGHT));
+ public final long getRight(long pageAddr, int idx) {
+ return PageUtils.getLong(pageAddr, offset(idx, SHIFT_RIGHT));
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param idx Index.
* @param pageId Page ID.
*/
- public final void setRight(ByteBuffer buf, int idx, long pageId) {
- buf.putLong(offset(idx, SHIFT_RIGHT), pageId);
+ private void setRight(long pageAddr, int idx, long pageId) {
+ PageUtils.putLong(pageAddr, offset(idx, SHIFT_RIGHT), pageId);
- assert pageId == getRight(buf, idx);
+ assert pageId == getRight(pageAddr, idx);
}
/** {@inheritDoc} */
- @Override public final void copyItems(ByteBuffer src, ByteBuffer dst, int srcIdx, int dstIdx, int cnt,
+ @Override public final void copyItems(long srcPageAddr, long dstPageAddr, int srcIdx, int dstIdx, int cnt,
boolean cpLeft) throws IgniteCheckedException {
- assert srcIdx != dstIdx || src != dst;
+ assert srcIdx != dstIdx || srcPageAddr != dstPageAddr;
cnt *= itemSize + 8; // From items to bytes.
if (dstIdx > srcIdx) {
- PageHandler.copyMemory(src, dst, offset(srcIdx), offset(dstIdx), cnt);
+ PageHandler.copyMemory(srcPageAddr, dstPageAddr, offset(srcIdx), offset(dstIdx), cnt);
if (cpLeft)
- dst.putLong(offset(dstIdx, SHIFT_LEFT), src.getLong(offset(srcIdx, SHIFT_LEFT)));
+ PageUtils.putLong(dstPageAddr, offset(dstIdx, SHIFT_LEFT), PageUtils.getLong(srcPageAddr, (offset(srcIdx, SHIFT_LEFT))));
}
else {
if (cpLeft)
- dst.putLong(offset(dstIdx, SHIFT_LEFT), src.getLong(offset(srcIdx, SHIFT_LEFT)));
+ PageUtils.putLong(dstPageAddr, offset(dstIdx, SHIFT_LEFT), PageUtils.getLong(srcPageAddr, (offset(srcIdx, SHIFT_LEFT))));
- PageHandler.copyMemory(src, dst, offset(srcIdx), offset(dstIdx), cnt);
+ PageHandler.copyMemory(srcPageAddr, dstPageAddr, offset(srcIdx), offset(dstIdx), cnt);
}
}
@@ -131,39 +131,42 @@ public abstract class BPlusInnerIO<L> extends BPlusIO<L> {
/** {@inheritDoc} */
@Override public void insert(
- ByteBuffer buf,
+ long pageAddr,
int idx,
L row,
byte[] rowBytes,
long rightId
) throws IgniteCheckedException {
- super.insert(buf, idx, row, rowBytes, rightId);
+ super.insert(pageAddr, idx, row, rowBytes, rightId);
// Setup reference to the right page on split.
- setRight(buf, idx, rightId);
+ setRight(pageAddr, idx, rightId);
}
/**
- * @param newRootBuf New root buffer.
+ * @param newRootPageAddr New root page address.
* @param newRootId New root ID.
* @param leftChildId Left child ID.
* @param row Moved up row.
+ * @param rowBytes Bytes.
* @param rightChildId Right child ID.
+ * @param pageSize Page size.
* @throws IgniteCheckedException If failed.
*/
public void initNewRoot(
- ByteBuffer newRootBuf,
+ long newRootPageAddr,
long newRootId,
long leftChildId,
L row,
byte[] rowBytes,
- long rightChildId
+ long rightChildId,
+ int pageSize
) throws IgniteCheckedException {
- initNewPage(newRootBuf, newRootId);
+ initNewPage(newRootPageAddr, newRootId, pageSize);
- setCount(newRootBuf, 1);
- setLeft(newRootBuf, 0, leftChildId);
- store(newRootBuf, 0, row, rowBytes);
- setRight(newRootBuf, 0, rightChildId);
+ setCount(newRootPageAddr, 1);
+ setLeft(newRootPageAddr, 0, leftChildId);
+ store(newRootPageAddr, 0, row, rowBytes);
+ setRight(newRootPageAddr, 0, rightChildId);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusLeafIO.java
index 824c943..f3dccee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusLeafIO.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.cache.database.tree.io;
-import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler;
@@ -35,16 +34,16 @@ public abstract class BPlusLeafIO<L> extends BPlusIO<L> {
}
/** {@inheritDoc} */
- @Override public int getMaxCount(ByteBuffer buf) {
- return (buf.capacity() - ITEMS_OFF) / itemSize;
+ @Override public int getMaxCount(long pageAddr, int pageSize) {
+ return (pageSize - ITEMS_OFF) / itemSize;
}
/** {@inheritDoc} */
- @Override public final void copyItems(ByteBuffer src, ByteBuffer dst, int srcIdx, int dstIdx, int cnt,
+ @Override public final void copyItems(long srcPageAddr, long dstPageAddr, int srcIdx, int dstIdx, int cnt,
boolean cpLeft) throws IgniteCheckedException {
- assert srcIdx != dstIdx || src != dst;
+ assert srcIdx != dstIdx || srcPageAddr != dstPageAddr;
- PageHandler.copyMemory(src, dst, offset(srcIdx), offset(dstIdx), cnt * itemSize);
+ PageHandler.copyMemory(srcPageAddr, dstPageAddr, offset(srcIdx), offset(dstIdx), cnt * itemSize);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusMetaIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusMetaIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusMetaIO.java
index 15a49ef..5925547 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusMetaIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusMetaIO.java
@@ -17,7 +17,7 @@
package org.apache.ignite.internal.processors.cache.database.tree.io;
-import java.nio.ByteBuffer;
+import org.apache.ignite.internal.pagemem.PageUtils;
/**
* IO routines for B+Tree meta pages.
@@ -37,45 +37,48 @@ public class BPlusMetaIO extends PageIO {
/**
* @param ver Page format version.
*/
- protected BPlusMetaIO(int ver) {
+ private BPlusMetaIO(int ver) {
super(T_BPLUS_META, ver);
}
/**
- * @param buf Buffer.
+ * @param pageAdrr Page address.
* @param rootId Root page ID.
+ * @param pageSize Page size.
*/
- public void initRoot(ByteBuffer buf, long rootId) {
- setLevelsCount(buf, 1);
- setFirstPageId(buf, 0, rootId);
+ public void initRoot(long pageAdrr, long rootId, int pageSize) {
+ setLevelsCount(pageAdrr, 1, pageSize);
+ setFirstPageId(pageAdrr, 0, rootId);
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @return Number of levels in this tree.
*/
- public int getLevelsCount(ByteBuffer buf) {
- return buf.get(LVLS_OFF);
+ public int getLevelsCount(long pageAddr) {
+ return PageUtils.getByte(pageAddr, LVLS_OFF);
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
+ * @param pageSize Page size.
* @return Max levels possible for this page size.
*/
- public int getMaxLevels(ByteBuffer buf) {
- return (buf.capacity() - REFS_OFF) / 8;
+ private int getMaxLevels(long pageAddr, int pageSize) {
+ return (pageSize - REFS_OFF) / 8;
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param lvls Number of levels in this tree.
+ * @param pageSize Page size.
*/
- public void setLevelsCount(ByteBuffer buf, int lvls) {
- assert lvls >= 0 && lvls <= getMaxLevels(buf): lvls;
+ private void setLevelsCount(long pageAddr, int lvls, int pageSize) {
+ assert lvls >= 0 && lvls <= getMaxLevels(pageAddr, pageSize) : lvls;
- buf.put(LVLS_OFF, (byte)lvls);
+ PageUtils.putByte(pageAddr, LVLS_OFF, (byte)lvls);
- assert getLevelsCount(buf) == lvls;
+ assert getLevelsCount(pageAddr) == lvls;
}
/**
@@ -87,33 +90,33 @@ public class BPlusMetaIO extends PageIO {
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param lvl Level.
* @return First page ID at that level.
*/
- public long getFirstPageId(ByteBuffer buf, int lvl) {
- return buf.getLong(offset(lvl));
+ public long getFirstPageId(long pageAddr, int lvl) {
+ return PageUtils.getLong(pageAddr, offset(lvl));
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param lvl Level.
* @param pageId Page ID.
*/
- public void setFirstPageId(ByteBuffer buf, int lvl, long pageId) {
- assert lvl >= 0 && lvl < getLevelsCount(buf);
+ private void setFirstPageId(long pageAddr, int lvl, long pageId) {
+ assert lvl >= 0 && lvl < getLevelsCount(pageAddr);
- buf.putLong(offset(lvl), pageId);
+ PageUtils.putLong(pageAddr, offset(lvl), pageId);
- assert getFirstPageId(buf, lvl) == pageId;
+ assert getFirstPageId(pageAddr, lvl) == pageId;
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @return Root level.
*/
- public int getRootLevel(ByteBuffer buf) {
- int lvls = getLevelsCount(buf); // The highest level page is root.
+ public int getRootLevel(long pageAddr) {
+ int lvls = getLevelsCount(pageAddr); // The highest level page is root.
assert lvls > 0 : lvls;
@@ -121,22 +124,24 @@ public class BPlusMetaIO extends PageIO {
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param rootPageId New root page ID.
+ * @param pageSize Page size.
*/
- public void addRoot(ByteBuffer buf, long rootPageId) {
- int lvl = getLevelsCount(buf);
+ public void addRoot(long pageAddr, long rootPageId, int pageSize) {
+ int lvl = getLevelsCount(pageAddr);
- setLevelsCount(buf, lvl + 1);
- setFirstPageId(buf, lvl, rootPageId);
+ setLevelsCount(pageAddr, lvl + 1, pageSize);
+ setFirstPageId(pageAddr, lvl, rootPageId);
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
+ * @param pageSize Page size.
*/
- public void cutRoot(ByteBuffer buf) {
- int lvl = getRootLevel(buf);
+ public void cutRoot(long pageAddr, int pageSize) {
+ int lvl = getRootLevel(pageAddr);
- setLevelsCount(buf, lvl); // Decrease tree height.
+ setLevelsCount(pageAddr, lvl, pageSize); // Decrease tree height.
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/CacheVersionIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/CacheVersionIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/CacheVersionIO.java
index 5e9fd6d..8a630cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/CacheVersionIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/CacheVersionIO.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.database.tree.io;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
/**
@@ -77,6 +78,29 @@ public class CacheVersionIO {
}
/**
+ * @param addr Write address.
+ * @param ver Version to write.
+ * @param allowNull Is {@code null} version allowed.
+ */
+ public static void write(long addr, GridCacheVersion ver, boolean allowNull) {
+ if (ver == null) {
+ if (allowNull)
+ PageUtils.putByte(addr, 0, NULL_PROTO_VER);
+ else
+ throw new IllegalStateException("Cache version is null");
+ }
+ else {
+ byte protoVer = 1; // Version of serialization protocol.
+
+ PageUtils.putByte(addr, 0, protoVer);
+ PageUtils.putInt(addr, 1, ver.topologyVersion());
+ PageUtils.putInt(addr, 5, ver.nodeOrderAndDrIdRaw());
+ PageUtils.putLong(addr, 9, ver.globalTime());
+ PageUtils.putLong(addr, 17, ver.order());
+ }
+ }
+
+ /**
* @param protoVer Serialization protocol version.
* @param allowNull Is {@code null} version allowed.
* @return Protocol version.
@@ -139,4 +163,26 @@ public class CacheVersionIO {
return new GridCacheVersion(topVer, nodeOrderDrId, globalTime, order);
}
+
+ /**
+ * Reads GridCacheVersion instance from the given address.
+ *
+ * @param pageAddr Page address.
+ * @param allowNull Is {@code null} version allowed.
+ * @return Version.
+ * @throws IgniteCheckedException If failed.
+ */
+ public static GridCacheVersion read(long pageAddr, boolean allowNull) throws IgniteCheckedException {
+ byte protoVer = checkProtocolVersion(PageUtils.getByte(pageAddr, 0), allowNull);
+
+ if (protoVer == NULL_PROTO_VER)
+ return null;
+
+ int topVer = PageUtils.getInt(pageAddr, 1);
+ int nodeOrderDrId = PageUtils.getInt(pageAddr, 5);
+ long globalTime = PageUtils.getLong(pageAddr, 9);
+ long order = PageUtils.getLong(pageAddr, 17);
+
+ return new GridCacheVersion(topVer, nodeOrderDrId, globalTime, order);
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java
index a69caab..548e300 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java
@@ -23,6 +23,8 @@ import java.util.Collection;
import java.util.HashSet;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler;
@@ -88,47 +90,48 @@ public class DataPageIO extends PageIO {
}
/** {@inheritDoc} */
- @Override public void initNewPage(ByteBuffer buf, long pageId) {
- super.initNewPage(buf, pageId);
+ @Override public void initNewPage(long pageAddr, long pageId, int pageSize) {
+ super.initNewPage(pageAddr, pageId, pageSize);
- setEmptyPage(buf);
- setFreeListPageId(buf, 0L);
+ setEmptyPage(pageAddr, pageSize);
+ setFreeListPageId(pageAddr, 0L);
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
+ * @param pageSize Page size.
*/
- private void setEmptyPage(ByteBuffer buf) {
- setDirectCount(buf, 0);
- setIndirectCount(buf, 0);
- setFirstEntryOffset(buf, buf.capacity());
- setRealFreeSpace(buf, buf.capacity() - ITEMS_OFF);
+ private void setEmptyPage(long pageAddr, int pageSize) {
+ setDirectCount(pageAddr, 0);
+ setIndirectCount(pageAddr, 0);
+ setFirstEntryOffset(pageAddr, pageSize, pageSize);
+ setRealFreeSpace(pageAddr, pageSize - ITEMS_OFF, pageSize);
}
/**
- * @param buf Byte buffer.
+ * @param pageAddr Page address.
* @param freeListPageId Free list page ID.
*/
- public void setFreeListPageId(ByteBuffer buf, long freeListPageId) {
- buf.putLong(FREE_LIST_PAGE_ID_OFF, freeListPageId);
+ public void setFreeListPageId(long pageAddr, long freeListPageId) {
+ PageUtils.putLong(pageAddr, FREE_LIST_PAGE_ID_OFF, freeListPageId);
}
/**
- * @param buf Byte buffer.
+ * @param pageAddr Page address.
* @return Free list page ID.
*/
- public long getFreeListPageId(ByteBuffer buf) {
- return buf.getLong(FREE_LIST_PAGE_ID_OFF);
+ public long getFreeListPageId(long pageAddr) {
+ return PageUtils.getLong(pageAddr, FREE_LIST_PAGE_ID_OFF);
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param dataOff Data offset.
* @param show What elements of data page entry to show in the result.
* @return Data page entry size.
*/
- private int getPageEntrySize(ByteBuffer buf, int dataOff, int show) {
- int payloadLen = buf.getShort(dataOff) & 0xFFFF;
+ private int getPageEntrySize(long pageAddr, int dataOff, int show) {
+ int payloadLen = PageUtils.getShort(pageAddr, dataOff) & 0xFFFF;
if ((payloadLen & FRAGMENTED_FLAG) != 0)
payloadLen &= ~FRAGMENTED_FLAG; // We are fragmented and have a link.
@@ -144,7 +147,7 @@ public class DataPageIO extends PageIO {
* @return Data page entry size.
*/
private int getPageEntrySize(int payloadLen, int show) {
- assert payloadLen > 0: payloadLen;
+ assert payloadLen > 0 : payloadLen;
int res = payloadLen;
@@ -161,45 +164,47 @@ public class DataPageIO extends PageIO {
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param dataOff Entry data offset.
+ * @param pageSize Page size.
*/
- private void setFirstEntryOffset(ByteBuffer buf, int dataOff) {
- assert dataOff >= ITEMS_OFF + ITEM_SIZE && dataOff <= buf.capacity(): dataOff;
+ private void setFirstEntryOffset(long pageAddr, int dataOff, int pageSize) {
+ assert dataOff >= ITEMS_OFF + ITEM_SIZE && dataOff <= pageSize : dataOff;
- buf.putShort(FIRST_ENTRY_OFF, (short)dataOff);
+ PageUtils.putShort(pageAddr, FIRST_ENTRY_OFF, (short)dataOff);
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @return Entry data offset.
*/
- private int getFirstEntryOffset(ByteBuffer buf) {
- return buf.getShort(FIRST_ENTRY_OFF) & 0xFFFF;
+ private int getFirstEntryOffset(long pageAddr) {
+ return PageUtils.getShort(pageAddr, FIRST_ENTRY_OFF) & 0xFFFF;
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param freeSpace Free space.
+ * @param pageSize Page size.
*/
- private void setRealFreeSpace(ByteBuffer buf, int freeSpace) {
- assert freeSpace == actualFreeSpace(buf): freeSpace + " != " + actualFreeSpace(buf);
+ private void setRealFreeSpace(long pageAddr, int freeSpace, int pageSize) {
+ assert freeSpace == actualFreeSpace(pageAddr, pageSize) : freeSpace + " != " + actualFreeSpace(pageAddr, pageSize);
- buf.putShort(FREE_SPACE_OFF, (short)freeSpace);
+ PageUtils.putShort(pageAddr, FREE_SPACE_OFF, (short)freeSpace);
}
/**
* Free space refers to a "max row size (without any data page specific overhead) which is
* guaranteed to fit into this data page".
*
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @return Free space.
*/
- public int getFreeSpace(ByteBuffer buf) {
- if (getFreeItemSlots(buf) == 0)
+ public int getFreeSpace(long pageAddr) {
+ if (getFreeItemSlots(pageAddr) == 0)
return 0;
- int freeSpace = getRealFreeSpace(buf);
+ int freeSpace = getRealFreeSpace(pageAddr);
// We reserve size here because of getFreeSpace() method semantics (see method javadoc).
// It means that we must be able to accommodate a row of size which is equal to getFreeSpace(),
@@ -211,48 +216,49 @@ public class DataPageIO extends PageIO {
}
/**
+ * @param pageAddr Page address.
* @return {@code true} If there is no useful data in this page.
*/
- public boolean isEmpty(ByteBuffer buf) {
- return getDirectCount(buf) == 0;
+ public boolean isEmpty(long pageAddr) {
+ return getDirectCount(pageAddr) == 0;
}
/**
- * Equivalent for {@link #actualFreeSpace(ByteBuffer)} but reads saved value.
+ * Equivalent for {@link #actualFreeSpace(long, int)} but reads saved value.
*
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @return Free space.
*/
- private int getRealFreeSpace(ByteBuffer buf) {
- return buf.getShort(FREE_SPACE_OFF);
+ private int getRealFreeSpace(long pageAddr) {
+ return PageUtils.getShort(pageAddr, FREE_SPACE_OFF);
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param cnt Direct count.
*/
- private void setDirectCount(ByteBuffer buf, int cnt) {
+ private void setDirectCount(long pageAddr, int cnt) {
assert checkCount(cnt): cnt;
- buf.put(DIRECT_CNT_OFF, (byte)cnt);
+ PageUtils.putByte(pageAddr, DIRECT_CNT_OFF, (byte)cnt);
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @return Direct count.
*/
- private int getDirectCount(ByteBuffer buf) {
- return buf.get(DIRECT_CNT_OFF) & 0xFF;
+ private int getDirectCount(long pageAddr) {
+ return PageUtils.getByte(pageAddr, DIRECT_CNT_OFF) & 0xFF;
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param cnt Indirect count.
*/
- private void setIndirectCount(ByteBuffer buf, int cnt) {
+ private void setIndirectCount(long pageAddr, int cnt) {
assert checkCount(cnt): cnt;
- buf.put(INDIRECT_CNT_OFF, (byte)cnt);
+ PageUtils.putByte(pageAddr, INDIRECT_CNT_OFF, (byte)cnt);
}
/**
@@ -272,36 +278,36 @@ public class DataPageIO extends PageIO {
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @return Indirect count.
*/
- private int getIndirectCount(ByteBuffer buf) {
- return buf.get(INDIRECT_CNT_OFF) & 0xFF;
+ private int getIndirectCount(long pageAddr) {
+ return PageUtils.getByte(pageAddr, INDIRECT_CNT_OFF) & 0xFF;
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @return Number of free entry slots.
*/
- private int getFreeItemSlots(ByteBuffer buf) {
- return 0xFF - getDirectCount(buf);
+ private int getFreeItemSlots(long pageAddr) {
+ return 0xFF - getDirectCount(pageAddr);
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param itemId Fixed item ID (the index used for referencing an entry from the outside).
* @param directCnt Direct items count.
* @param indirectCnt Indirect items count.
* @return Found index of indirect item.
*/
- private int findIndirectItemIndex(ByteBuffer buf, int itemId, int directCnt, int indirectCnt) {
+ private int findIndirectItemIndex(long pageAddr, int itemId, int directCnt, int indirectCnt) {
int low = directCnt;
int high = directCnt + indirectCnt - 1;
while (low <= high) {
int mid = (low + high) >>> 1;
- int cmp = Integer.compare(itemId(getItem(buf, mid)), itemId);
+ int cmp = Integer.compare(itemId(getItem(pageAddr, mid)), itemId);
if (cmp < 0)
low = mid + 1;
@@ -315,19 +321,20 @@ public class DataPageIO extends PageIO {
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
+ * @param pageSize Page size.
* @return String representation.
*/
- public String printPageLayout(ByteBuffer buf) {
- int directCnt = getDirectCount(buf);
- int indirectCnt = getIndirectCount(buf);
- int free = getRealFreeSpace(buf);
+ private String printPageLayout(long pageAddr, int pageSize) {
+ int directCnt = getDirectCount(pageAddr);
+ int indirectCnt = getIndirectCount(pageAddr);
+ int free = getRealFreeSpace(pageAddr);
boolean valid = directCnt >= indirectCnt;
SB b = new SB();
- b.appendHex(PageIO.getPageId(buf)).a(" [");
+ b.appendHex(PageIO.getPageId(pageAddr)).a(" [");
int entriesSize = 0;
@@ -335,12 +342,12 @@ public class DataPageIO extends PageIO {
if (i != 0)
b.a(", ");
- short item = getItem(buf, i);
+ short item = getItem(pageAddr, i);
- if (item < ITEMS_OFF || item >= buf.capacity())
+ if (item < ITEMS_OFF || item >= pageSize)
valid = false;
- entriesSize += getPageEntrySize(buf, item, SHOW_PAYLOAD_LEN | SHOW_LINK);
+ entriesSize += getPageEntrySize(pageAddr, item, SHOW_PAYLOAD_LEN | SHOW_LINK);
b.a(item);
}
@@ -353,7 +360,7 @@ public class DataPageIO extends PageIO {
if (i != directCnt)
b.a(", ");
- short item = getItem(buf, i);
+ short item = getItem(pageAddr, i);
int itemId = itemId(item);
int directIdx = directItemIndex(item);
@@ -366,7 +373,7 @@ public class DataPageIO extends PageIO {
if (itemId < directCnt || directIdx < 0 || directIdx >= directCnt)
valid = false;
- if (i > directCnt && itemId(getItem(buf, i - 1)) >= itemId)
+ if (i > directCnt && itemId(getItem(pageAddr, i - 1)) >= itemId)
valid = false;
@@ -375,7 +382,7 @@ public class DataPageIO extends PageIO {
b.a("][free=").a(free);
- int actualFree = buf.capacity() - ITEMS_OFF - (entriesSize + (directCnt + indirectCnt) * ITEM_SIZE);
+ int actualFree = pageSize - ITEMS_OFF - (entriesSize + (directCnt + indirectCnt) * ITEM_SIZE);
if (free != actualFree) {
b.a(", actualFree=").a(actualFree);
@@ -391,94 +398,94 @@ public class DataPageIO extends PageIO {
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param itemId Fixed item ID (the index used for referencing an entry from the outside).
+ * @param pageSize Page size.
* @return Data entry offset in bytes.
*/
- private int getDataOffset(ByteBuffer buf, int itemId) {
+ private int getDataOffset(long pageAddr, int itemId, int pageSize) {
assert checkIndex(itemId): itemId;
- int directCnt = getDirectCount(buf);
+ int directCnt = getDirectCount(pageAddr);
- assert directCnt > 0: "itemId=" + itemId + ", directCnt=" + directCnt + ", page=" + printPageLayout(buf);
+ assert directCnt > 0: "itemId=" + itemId + ", directCnt=" + directCnt + ", page=" + printPageLayout(pageAddr, pageSize);
if (itemId >= directCnt) { // Need to do indirect lookup.
- int indirectCnt = getIndirectCount(buf);
+ int indirectCnt = getIndirectCount(pageAddr);
// Must have indirect items here.
assert indirectCnt > 0: "itemId=" + itemId + ", directCnt=" + directCnt + ", indirectCnt=" + indirectCnt +
- ", page=" + printPageLayout(buf);
+ ", page=" + printPageLayout(pageAddr, pageSize);
- int indirectItemIdx = findIndirectItemIndex(buf, itemId, directCnt, indirectCnt);
+ int indirectItemIdx = findIndirectItemIndex(pageAddr, itemId, directCnt, indirectCnt);
assert indirectItemIdx >= directCnt : indirectItemIdx + " " + directCnt;
assert indirectItemIdx < directCnt + indirectCnt: indirectItemIdx + " " + directCnt + " " + indirectCnt;
- itemId = directItemIndex(getItem(buf, indirectItemIdx));
+ itemId = directItemIndex(getItem(pageAddr, indirectItemIdx));
assert itemId >= 0 && itemId < directCnt: itemId + " " + directCnt + " " + indirectCnt; // Direct item.
}
- return directItemToOffset(getItem(buf, itemId));
+ return directItemToOffset(getItem(pageAddr, itemId));
}
/**
- * @param buf Byte buffer.
+ * @param pageAddr Page address.
* @param dataOff Points to the entry start.
* @return Link to the next entry fragment or 0 if no fragments left or if entry is not fragmented.
*/
- private long getNextFragmentLink(ByteBuffer buf, int dataOff) {
- assert isFragmented(buf, dataOff);
+ private long getNextFragmentLink(long pageAddr, int dataOff) {
+ assert isFragmented(pageAddr, dataOff);
- return buf.getLong(dataOff + PAYLOAD_LEN_SIZE);
+ return PageUtils.getLong(pageAddr, dataOff + PAYLOAD_LEN_SIZE);
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param dataOff Data offset.
* @return {@code true} If the data row is fragmented across multiple pages.
*/
- private boolean isFragmented(ByteBuffer buf, int dataOff) {
- return (buf.getShort(dataOff) & FRAGMENTED_FLAG) != 0;
+ private boolean isFragmented(long pageAddr, int dataOff) {
+ return (PageUtils.getShort(pageAddr, dataOff) & FRAGMENTED_FLAG) != 0;
}
/**
* Sets position to start of actual fragment data and limit to it's end.
*
- * @param buf Byte buffer.
+ * @param pageAddr Page address.
* @param itemId Item to position on.
+ * @param pageSize Page size.
* @return Link to the next fragment or {@code 0} if it is the last fragment or the data row is not fragmented.
*/
- public long setPositionAndLimitOnPayload(final ByteBuffer buf, final int itemId) {
- int dataOff = getDataOffset(buf, itemId);
-
- boolean fragmented = isFragmented(buf, dataOff);
- long nextLink = fragmented ? getNextFragmentLink(buf, dataOff) : 0;
- int payloadSize = getPageEntrySize(buf, dataOff, 0);
+ public DataPagePayload readPayload(final long pageAddr, final int itemId, final int pageSize) {
+ int dataOff = getDataOffset(pageAddr, itemId, pageSize);
- buf.position(dataOff + PAYLOAD_LEN_SIZE + (fragmented ? LINK_SIZE : 0));
+ boolean fragmented = isFragmented(pageAddr, dataOff);
+ long nextLink = fragmented ? getNextFragmentLink(pageAddr, dataOff) : 0;
+ int payloadSize = getPageEntrySize(pageAddr, dataOff, 0);
- buf.limit(buf.position() + payloadSize);
-
- return nextLink;
+ return new DataPagePayload(dataOff + PAYLOAD_LEN_SIZE + (fragmented ? LINK_SIZE : 0),
+ payloadSize,
+ nextLink);
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param idx Item index.
* @return Item.
*/
- private short getItem(ByteBuffer buf, int idx) {
- return buf.getShort(itemOffset(idx));
+ private short getItem(long pageAddr, int idx) {
+ return PageUtils.getShort(pageAddr, itemOffset(idx));
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param idx Item index.
* @param item Item.
*/
- private void setItem(ByteBuffer buf, int idx, short item) {
- buf.putShort(itemOffset(idx), item);
+ private void setItem(long pageAddr, int idx, short item) {
+ PageUtils.putShort(pageAddr, itemOffset(idx), item);
}
/**
@@ -540,14 +547,14 @@ public class DataPageIO extends PageIO {
/**
* Move the last direct item to the free slot and reference it with indirect item on the same place.
*
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param freeDirectIdx Free slot.
* @param directCnt Direct items count.
* @param indirectCnt Indirect items count.
* @return {@code true} If the last direct item already had corresponding indirect item.
*/
- private boolean moveLastItem(ByteBuffer buf, int freeDirectIdx, int directCnt, int indirectCnt) {
- int lastIndirectId = findIndirectIndexForLastDirect(buf, directCnt, indirectCnt);
+ private boolean moveLastItem(long pageAddr, int freeDirectIdx, int directCnt, int indirectCnt) {
+ int lastIndirectId = findIndirectIndexForLastDirect(pageAddr, directCnt, indirectCnt);
int lastItemId = directCnt - 1;
@@ -557,13 +564,13 @@ public class DataPageIO extends PageIO {
assert itemId(indirectItem) == lastItemId && directItemIndex(indirectItem) == freeDirectIdx;
- setItem(buf, freeDirectIdx, getItem(buf, lastItemId));
- setItem(buf, lastItemId, indirectItem);
+ setItem(pageAddr, freeDirectIdx, getItem(pageAddr, lastItemId));
+ setItem(pageAddr, lastItemId, indirectItem);
- assert getItem(buf, lastItemId) == indirectItem;
+ assert getItem(pageAddr, lastItemId) == indirectItem;
if (lastIndirectId != -1) { // Fix pointer to direct item.
- setItem(buf, lastIndirectId, indirectItem(itemId(getItem(buf, lastIndirectId)), freeDirectIdx));
+ setItem(pageAddr, lastIndirectId, indirectItem(itemId(getItem(pageAddr, lastIndirectId)), freeDirectIdx));
return true;
}
@@ -572,16 +579,16 @@ public class DataPageIO extends PageIO {
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param directCnt Direct items count.
* @param indirectCnt Indirect items count.
* @return Index of indirect item for the last direct item.
*/
- private int findIndirectIndexForLastDirect(ByteBuffer buf, int directCnt, int indirectCnt) {
+ private int findIndirectIndexForLastDirect(long pageAddr, int directCnt, int indirectCnt) {
int lastDirectId = directCnt - 1;
for (int i = directCnt, end = directCnt + indirectCnt; i < end; i++) {
- short item = getItem(buf, i);
+ short item = getItem(pageAddr, i);
if (directItemIndex(item) == lastDirectId)
return i;
@@ -591,20 +598,21 @@ public class DataPageIO extends PageIO {
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param itemId Fixed item ID (the index used for referencing an entry from the outside).
+ * @param pageSize Page size.
* @return Next link for fragmented entries or {@code 0} if none.
* @throws IgniteCheckedException If failed.
*/
- public long removeRow(ByteBuffer buf, int itemId) throws IgniteCheckedException {
+ public long removeRow(long pageAddr, int itemId, int pageSize) throws IgniteCheckedException {
assert checkIndex(itemId) : itemId;
- final int dataOff = getDataOffset(buf, itemId);
- final long nextLink = isFragmented(buf, dataOff) ? getNextFragmentLink(buf, dataOff) : 0;
+ final int dataOff = getDataOffset(pageAddr, itemId, pageSize);
+ final long nextLink = isFragmented(pageAddr, dataOff) ? getNextFragmentLink(pageAddr, dataOff) : 0;
// Record original counts to calculate delta in free space in the end of remove.
- final int directCnt = getDirectCount(buf);
- final int indirectCnt = getIndirectCount(buf);
+ final int directCnt = getDirectCount(pageAddr);
+ final int indirectCnt = getIndirectCount(pageAddr);
int curIndirectCnt = indirectCnt;
@@ -613,24 +621,24 @@ public class DataPageIO extends PageIO {
// Remove the last item on the page.
if (directCnt == 1) {
assert (indirectCnt == 0 && itemId == 0) ||
- (indirectCnt == 1 && itemId == itemId(getItem(buf, 1))) : itemId;
+ (indirectCnt == 1 && itemId == itemId(getItem(pageAddr, 1))) : itemId;
- setEmptyPage(buf);
+ setEmptyPage(pageAddr, pageSize);
}
else {
// Get the entry size before the actual remove.
- int rmvEntrySize = getPageEntrySize(buf, dataOff, SHOW_PAYLOAD_LEN | SHOW_LINK);
+ int rmvEntrySize = getPageEntrySize(pageAddr, dataOff, SHOW_PAYLOAD_LEN | SHOW_LINK);
int indirectId = 0;
if (itemId >= directCnt) { // Need to remove indirect item.
assert indirectCnt > 0;
- indirectId = findIndirectItemIndex(buf, itemId, directCnt, indirectCnt);
+ indirectId = findIndirectItemIndex(pageAddr, itemId, directCnt, indirectCnt);
assert indirectId >= directCnt;
- itemId = directItemIndex(getItem(buf, indirectId));
+ itemId = directItemIndex(getItem(pageAddr, indirectId));
assert itemId < directCnt;
}
@@ -638,48 +646,50 @@ public class DataPageIO extends PageIO {
boolean dropLast = true;
if (itemId + 1 < directCnt) // It is not the last direct item.
- dropLast = moveLastItem(buf, itemId, directCnt, indirectCnt);
+ dropLast = moveLastItem(pageAddr, itemId, directCnt, indirectCnt);
if (indirectId == 0) {// For the last direct item with no indirect item.
if (dropLast)
- moveItems(buf, directCnt, indirectCnt, -1);
+ moveItems(pageAddr, directCnt, indirectCnt, -1, pageSize);
else
curIndirectCnt++;
}
else {
if (dropLast)
- moveItems(buf, directCnt, indirectId - directCnt, -1);
+ moveItems(pageAddr, directCnt, indirectId - directCnt, -1, pageSize);
- moveItems(buf, indirectId + 1, directCnt + indirectCnt - indirectId - 1, dropLast ? -2 : -1);
+ moveItems(pageAddr, indirectId + 1, directCnt + indirectCnt - indirectId - 1, dropLast ? -2 : -1, pageSize);
if (dropLast)
curIndirectCnt--;
}
- setIndirectCount(buf, curIndirectCnt);
- setDirectCount(buf, directCnt - 1);
+ setIndirectCount(pageAddr, curIndirectCnt);
+ setDirectCount(pageAddr, directCnt - 1);
- assert getIndirectCount(buf) <= getDirectCount(buf);
+ assert getIndirectCount(pageAddr) <= getDirectCount(pageAddr);
// Increase free space.
- setRealFreeSpace(buf, getRealFreeSpace(buf) + rmvEntrySize +
- ITEM_SIZE * (directCnt - getDirectCount(buf) + indirectCnt - getIndirectCount(buf)));
+ setRealFreeSpace(pageAddr,
+ getRealFreeSpace(pageAddr) + rmvEntrySize + ITEM_SIZE * (directCnt - getDirectCount(pageAddr) + indirectCnt - getIndirectCount(pageAddr)),
+ pageSize);
}
return nextLink;
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param idx Index.
* @param cnt Count.
* @param step Step.
+ * @param pageSize Page size.
*/
- private void moveItems(ByteBuffer buf, int idx, int cnt, int step) {
+ private void moveItems(long pageAddr, int idx, int cnt, int step, int pageSize) {
assert cnt >= 0: cnt;
if (cnt != 0)
- moveBytes(buf, itemOffset(idx), cnt * ITEM_SIZE, step * ITEM_SIZE);
+ moveBytes(pageAddr, itemOffset(idx), cnt * ITEM_SIZE, step * ITEM_SIZE, pageSize);
}
/**
@@ -696,73 +706,80 @@ public class DataPageIO extends PageIO {
/**
* Adds row to this data page and sets respective link to the given row object.
*
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param row Cache data row.
* @param rowSize Row size.
+ * @param pageSize Page size.
* @throws IgniteCheckedException If failed.
*/
public void addRow(
- ByteBuffer buf,
+ final long pageAddr,
CacheDataRow row,
- int rowSize
+ final int rowSize,
+ final int pageSize
) throws IgniteCheckedException {
- assert rowSize <= getFreeSpace(buf): "can't call addRow if not enough space for the whole row";
+ assert rowSize <= getFreeSpace(pageAddr): "can't call addRow if not enough space for the whole row";
int fullEntrySize = getPageEntrySize(rowSize, SHOW_PAYLOAD_LEN | SHOW_ITEM);
- int directCnt = getDirectCount(buf);
- int indirectCnt = getIndirectCount(buf);
+ int directCnt = getDirectCount(pageAddr);
+ int indirectCnt = getIndirectCount(pageAddr);
- int dataOff = getDataOffsetForWrite(buf, fullEntrySize, directCnt, indirectCnt);
+ int dataOff = getDataOffsetForWrite(pageAddr, fullEntrySize, directCnt, indirectCnt, pageSize);
- writeRowData(buf, dataOff, rowSize, row);
+ writeRowData(pageAddr, dataOff, rowSize, row);
- int itemId = addItem(buf, fullEntrySize, directCnt, indirectCnt, dataOff);
+ int itemId = addItem(pageAddr, fullEntrySize, directCnt, indirectCnt, dataOff, pageSize);
- setLink(row, buf, itemId);
+ setLink(row, pageAddr, itemId);
}
/**
* Adds row to this data page and sets respective link to the given row object.
*
- * @param buf Buffer.
+ * @param pageAddr Page address.
+ * @param payload Payload.
+ * @param pageSize Page size.
* @throws IgniteCheckedException If failed.
*/
public void addRow(
- ByteBuffer buf,
- byte[] payload
+ long pageAddr,
+ byte[] payload,
+ int pageSize
) throws IgniteCheckedException {
- assert payload.length <= getFreeSpace(buf): "can't call addRow if not enough space for the whole row";
+ assert payload.length <= getFreeSpace(pageAddr): "can't call addRow if not enough space for the whole row";
int fullEntrySize = getPageEntrySize(payload.length, SHOW_PAYLOAD_LEN | SHOW_ITEM);
- int directCnt = getDirectCount(buf);
- int indirectCnt = getIndirectCount(buf);
+ int directCnt = getDirectCount(pageAddr);
+ int indirectCnt = getIndirectCount(pageAddr);
- int dataOff = getDataOffsetForWrite(buf, fullEntrySize, directCnt, indirectCnt);
+ int dataOff = getDataOffsetForWrite(pageAddr, fullEntrySize, directCnt, indirectCnt, pageSize);
- writeRowData(buf, dataOff, payload);
+ writeRowData(pageAddr, dataOff, payload);
- addItem(buf, fullEntrySize, directCnt, indirectCnt, dataOff);
+ addItem(pageAddr, fullEntrySize, directCnt, indirectCnt, dataOff, pageSize);
}
/**
- * @param buf Byte buffer.
+ * @param pageAddr Page address.
* @param entryFullSize New entry full size (with item, length and link).
* @param directCnt Direct items count.
* @param indirectCnt Indirect items count.
* @param dataOff First entry offset.
+ * @param pageSize Page size.
* @return First entry offset after compaction.
*/
private int compactIfNeed(
- final ByteBuffer buf,
+ final long pageAddr,
final int entryFullSize,
final int directCnt,
final int indirectCnt,
- int dataOff
+ int dataOff,
+ int pageSize
) {
if (!isEnoughSpace(entryFullSize, dataOff, directCnt, indirectCnt)) {
- dataOff = compactDataEntries(buf, directCnt);
+ dataOff = compactDataEntries(pageAddr, directCnt, pageSize);
assert isEnoughSpace(entryFullSize, dataOff, directCnt, indirectCnt);
}
@@ -773,41 +790,49 @@ public class DataPageIO extends PageIO {
/**
* Put item reference on entry.
*
- * @param buf Byte buffer.
+ * @param pageAddr Page address.
* @param fullEntrySize Full entry size (with link, payload size and item).
* @param directCnt Direct items count.
* @param indirectCnt Indirect items count.
* @param dataOff Data offset.
+ * @param pageSize Page size.
* @return Item ID.
*/
- private int addItem(final ByteBuffer buf, final int fullEntrySize, final int directCnt,
- final int indirectCnt, final int dataOff) {
- setFirstEntryOffset(buf, dataOff);
+ private int addItem(final long pageAddr,
+ final int fullEntrySize,
+ final int directCnt,
+ final int indirectCnt,
+ final int dataOff,
+ final int pageSize)
+ {
+ setFirstEntryOffset(pageAddr, dataOff, pageSize);
- int itemId = insertItem(buf, dataOff, directCnt, indirectCnt);
+ int itemId = insertItem(pageAddr, dataOff, directCnt, indirectCnt, pageSize);
assert checkIndex(itemId): itemId;
- assert getIndirectCount(buf) <= getDirectCount(buf);
+ assert getIndirectCount(pageAddr) <= getDirectCount(pageAddr);
// Update free space. If number of indirect items changed, then we were able to reuse an item slot.
- setRealFreeSpace(buf, getRealFreeSpace(buf) - fullEntrySize +
- (getIndirectCount(buf) != indirectCnt ? ITEM_SIZE : 0));
+ setRealFreeSpace(pageAddr,
+ getRealFreeSpace(pageAddr) - fullEntrySize + (getIndirectCount(pageAddr) != indirectCnt ? ITEM_SIZE : 0),
+ pageSize);
return itemId;
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param fullEntrySize Full entry size.
* @param directCnt Direct items count.
* @param indirectCnt Indirect items count.
+ * @param pageSize Page size.
* @return Offset in the buffer where the entry must be written.
*/
- private int getDataOffsetForWrite(ByteBuffer buf, int fullEntrySize, int directCnt, int indirectCnt) {
- int dataOff = getFirstEntryOffset(buf);
+ private int getDataOffsetForWrite(long pageAddr, int fullEntrySize, int directCnt, int indirectCnt, int pageSize) {
+ int dataOff = getFirstEntryOffset(pageAddr);
// Compact if we do not have enough space for entry.
- dataOff = compactIfNeed(buf, fullEntrySize, directCnt, indirectCnt, dataOff);
+ dataOff = compactIfNeed(pageAddr, fullEntrySize, directCnt, indirectCnt, dataOff, pageSize);
// We will write data right before the first entry.
dataOff -= fullEntrySize - ITEM_SIZE;
@@ -818,91 +843,105 @@ public class DataPageIO extends PageIO {
/**
* Adds maximum possible fragment of the given row to this data page and sets respective link to the row.
*
- * @param buf Byte buffer.
+ * @param pageMem Page memory.
+ * @param pageAddr Page address.
* @param row Cache data row.
* @param written Number of bytes of row size that was already written.
* @param rowSize Row size.
+ * @param pageSize Page size.
* @return Written payload size.
* @throws IgniteCheckedException If failed.
*/
public int addRowFragment(
- ByteBuffer buf,
+ PageMemory pageMem,
+ long pageAddr,
CacheDataRow row,
int written,
- int rowSize
+ int rowSize,
+ int pageSize
) throws IgniteCheckedException {
- return addRowFragment(buf, written, rowSize, row.link(), row, null);
+ return addRowFragment(pageMem, pageAddr, written, rowSize, row.link(), row, null, pageSize);
}
/**
* Adds this payload as a fragment to this data page.
*
- * @param buf Byte buffer.
+ * @param pageAddr Page address.
* @param payload Payload bytes.
* @param lastLink Link to the previous written fragment (link to the tail).
+ * @param pageSize Page size.
* @throws IgniteCheckedException If failed.
*/
public void addRowFragment(
- ByteBuffer buf,
+ long pageAddr,
byte[] payload,
- long lastLink
+ long lastLink,
+ int pageSize
) throws IgniteCheckedException {
- addRowFragment(buf, 0, 0, lastLink, null, payload);
+ addRowFragment(null, pageAddr, 0, 0, lastLink, null, payload, pageSize);
}
/**
* Adds maximum possible fragment of the given row to this data page and sets respective link to the row.
*
- * @param buf Byte buffer.
+ * @param pageMem Page memory.
+ * @param pageAddr Page address.
* @param written Number of bytes of row size that was already written.
* @param rowSize Row size.
* @param lastLink Link to the previous written fragment (link to the tail).
* @param row Row.
* @param payload Payload bytes.
+ * @param pageSize Page size.
* @return Written payload size.
* @throws IgniteCheckedException If failed.
*/
private int addRowFragment(
- ByteBuffer buf,
+ PageMemory pageMem,
+ long pageAddr,
int written,
int rowSize,
long lastLink,
CacheDataRow row,
- byte[] payload
+ byte[] payload,
+ int pageSize
) throws IgniteCheckedException {
assert payload == null ^ row == null;
- int directCnt = getDirectCount(buf);
- int indirectCnt = getIndirectCount(buf);
+ int directCnt = getDirectCount(pageAddr);
+ int indirectCnt = getIndirectCount(pageAddr);
int payloadSize = payload != null ? payload.length :
- Math.min(rowSize - written, getFreeSpace(buf));
+ Math.min(rowSize - written, getFreeSpace(pageAddr));
int fullEntrySize = getPageEntrySize(payloadSize, SHOW_PAYLOAD_LEN | SHOW_LINK | SHOW_ITEM);
- int dataOff = getDataOffsetForWrite(buf, fullEntrySize, directCnt, indirectCnt);
+ int dataOff = getDataOffsetForWrite(pageAddr, fullEntrySize, directCnt, indirectCnt, pageSize);
+
+ if (payload == null) {
+ ByteBuffer buf = pageMem.pageBuffer(pageAddr);
- try {
buf.position(dataOff);
- buf.putShort((short) (payloadSize | FRAGMENTED_FLAG));
+ short p = (short)(payloadSize | FRAGMENTED_FLAG);
+
+ buf.putShort(p);
buf.putLong(lastLink);
- if (payload == null) {
- int rowOff = rowSize - written - payloadSize;
+ int rowOff = rowSize - written - payloadSize;
- writeFragmentData(row, buf, rowOff, payloadSize);
- }
- else
- buf.put(payload);
+ writeFragmentData(row, buf, rowOff, payloadSize);
}
- finally {
- buf.position(0);
+ else {
+ PageUtils.putShort(pageAddr, dataOff, (short)(payloadSize | FRAGMENTED_FLAG));
+
+ PageUtils.putLong(pageAddr, dataOff + 2, lastLink);
+
+ PageUtils.putBytes(pageAddr, dataOff + 10, payload);
}
- int itemId = addItem(buf, fullEntrySize, directCnt, indirectCnt, dataOff);
+ int itemId = addItem(pageAddr, fullEntrySize, directCnt, indirectCnt, dataOff, pageSize);
if (row != null)
- setLink(row, buf, itemId);
+ setLink(row, pageAddr, itemId);
return payloadSize;
}
@@ -912,7 +951,7 @@ public class DataPageIO extends PageIO {
* @param buf Page buffer.
* @param itemId Item ID.
*/
- private void setLink(CacheDataRow row, ByteBuffer buf, int itemId) {
+ private void setLink(CacheDataRow row, long buf, int itemId) {
row.link(PageIdUtils.link(getPageId(buf), itemId));
}
@@ -1083,54 +1122,56 @@ public class DataPageIO extends PageIO {
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param dataOff Data offset.
* @param directCnt Direct items count.
* @param indirectCnt Indirect items count.
+ * @param pageSize Page size.
* @return Item ID (insertion index).
*/
- private int insertItem(ByteBuffer buf, int dataOff, int directCnt, int indirectCnt) {
+ private int insertItem(long pageAddr, int dataOff, int directCnt, int indirectCnt, int pageSize) {
if (indirectCnt > 0) {
// If the first indirect item is on correct place to become the last direct item, do the transition
// and insert the new item into the free slot which was referenced by this first indirect item.
- short item = getItem(buf, directCnt);
+ short item = getItem(pageAddr, directCnt);
if (itemId(item) == directCnt) {
int directItemIdx = directItemIndex(item);
- setItem(buf, directCnt, getItem(buf, directItemIdx));
- setItem(buf, directItemIdx, directItemFromOffset(dataOff));
+ setItem(pageAddr, directCnt, getItem(pageAddr, directItemIdx));
+ setItem(pageAddr, directItemIdx, directItemFromOffset(dataOff));
- setDirectCount(buf, directCnt + 1);
- setIndirectCount(buf, indirectCnt - 1);
+ setDirectCount(pageAddr, directCnt + 1);
+ setIndirectCount(pageAddr, indirectCnt - 1);
return directItemIdx;
}
}
// Move all the indirect items forward to make a free slot and insert new item at the end of direct items.
- moveItems(buf, directCnt, indirectCnt, +1);
+ moveItems(pageAddr, directCnt, indirectCnt, +1, pageSize);
- setItem(buf, directCnt, directItemFromOffset(dataOff));
+ setItem(pageAddr, directCnt, directItemFromOffset(dataOff));
- setDirectCount(buf, directCnt + 1);
- assert getDirectCount(buf) == directCnt + 1;
+ setDirectCount(pageAddr, directCnt + 1);
+ assert getDirectCount(pageAddr) == directCnt + 1;
return directCnt; // Previous directCnt will be our itemId.
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param directCnt Direct items count.
+ * @param pageSize Page size.
* @return New first entry offset.
*/
- private int compactDataEntries(ByteBuffer buf, int directCnt) {
+ private int compactDataEntries(long pageAddr, int directCnt, int pageSize) {
assert checkCount(directCnt): directCnt;
int[] offs = new int[directCnt];
for (int i = 0; i < directCnt; i++) {
- int off = directItemToOffset(getItem(buf, i));
+ int off = directItemToOffset(getItem(pageAddr, i));
offs[i] = (off << 8) | i; // This way we'll be able to sort by offset using Arrays.sort(...).
}
@@ -1138,27 +1179,27 @@ public class DataPageIO extends PageIO {
Arrays.sort(offs);
// Move right all of the entries if possible to make the page as compact as possible to its tail.
- int prevOff = buf.capacity();
+ int prevOff = pageSize;
for (int i = directCnt - 1; i >= 0; i--) {
int off = offs[i] >>> 8;
assert off < prevOff: off;
- int entrySize = getPageEntrySize(buf, off, SHOW_PAYLOAD_LEN | SHOW_LINK);
+ int entrySize = getPageEntrySize(pageAddr, off, SHOW_PAYLOAD_LEN | SHOW_LINK);
int delta = prevOff - (off + entrySize);
if (delta != 0) { // Move right.
assert delta > 0: delta;
- moveBytes(buf, off, entrySize, delta);
+ moveBytes(pageAddr, off, entrySize, delta, pageSize);
int itemId = offs[i] & 0xFF;
off += delta;
- setItem(buf, itemId, directItemFromOffset(off));
+ setItem(pageAddr, itemId, directItemFromOffset(off));
}
prevOff = off;
@@ -1170,94 +1211,82 @@ public class DataPageIO extends PageIO {
/**
* Full-scan free space calculation procedure.
*
- * @param buf Buffer to scan.
+ * @param pageAddr Page to scan.
+ * @param pageSize Page size.
* @return Actual free space in the buffer.
*/
- private int actualFreeSpace(ByteBuffer buf) {
- int directCnt = getDirectCount(buf);
+ private int actualFreeSpace(long pageAddr, int pageSize) {
+ int directCnt = getDirectCount(pageAddr);
int entriesSize = 0;
for (int i = 0; i < directCnt; i++) {
- int off = directItemToOffset(getItem(buf, i));
+ int off = directItemToOffset(getItem(pageAddr, i));
- int entrySize = getPageEntrySize(buf, off, SHOW_PAYLOAD_LEN | SHOW_LINK);
+ int entrySize = getPageEntrySize(pageAddr, off, SHOW_PAYLOAD_LEN | SHOW_LINK);
entriesSize += entrySize;
}
- return buf.capacity() - ITEMS_OFF - entriesSize - (directCnt + getIndirectCount(buf)) * ITEM_SIZE;
+ return pageSize - ITEMS_OFF - entriesSize - (directCnt + getIndirectCount(pageAddr)) * ITEM_SIZE;
}
/**
- * @param buf Buffer.
+ * @param addr Address.
* @param off Offset.
* @param cnt Count.
* @param step Step.
+ * @param pageSize Page size.
*/
- private void moveBytes(ByteBuffer buf, int off, int cnt, int step) {
+ private void moveBytes(long addr, int off, int cnt, int step, int pageSize) {
assert step != 0: step;
assert off + step >= 0;
- assert off + step + cnt <= buf.capacity() : "[off=" + off + ", step=" + step + ", cnt=" + cnt +
- ", cap=" + buf.capacity() + ']';
+ assert off + step + cnt <= pageSize : "[off=" + off + ", step=" + step + ", cnt=" + cnt +
+ ", cap=" + pageSize + ']';
- PageHandler.copyMemory(buf, buf, off, off + step, cnt);
+ PageHandler.copyMemory(addr, addr, off, off + step, cnt);
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param dataOff Data offset.
* @param payloadSize Payload size.
* @param row Data row.
* @throws IgniteCheckedException If failed.
*/
private void writeRowData(
- ByteBuffer buf,
+ long pageAddr,
int dataOff,
int payloadSize,
CacheDataRow row
) throws IgniteCheckedException {
- try {
- buf.position(dataOff);
+ long addr = pageAddr + dataOff;
- buf.putShort((short)payloadSize);
+ PageUtils.putShort(addr, 0, (short)payloadSize);
+ addr += 2;
- boolean ok = row.key().putValue(buf);
+ addr += row.key().putValue(addr);
+ addr += row.value().putValue(addr);
- assert ok;
+ CacheVersionIO.write(addr, row.version(), false);
+ addr += CacheVersionIO.size(row.version(), false);
- ok = row.value().putValue(buf);
-
- assert ok;
-
- CacheVersionIO.write(buf, row.version(), false);
-
- buf.putLong(row.expireTime());
- }
- finally {
- buf.position(0);
- }
+ PageUtils.putLong(addr, 0, row.expireTime());
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param dataOff Data offset.
* @param payload Payload
*/
private void writeRowData(
- ByteBuffer buf,
+ long pageAddr,
int dataOff,
byte[] payload
) {
- try {
- buf.position(dataOff);
-
- buf.putShort((short)payload.length);
+ PageUtils.putShort(pageAddr, dataOff, (short)payload.length);
+ dataOff += 2;
- buf.put(payload);
- }
- finally {
- buf.position(0);
- }
+ PageUtils.putBytes(pageAddr, dataOff, payload);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPagePayload.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPagePayload.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPagePayload.java
new file mode 100644
index 0000000..7dedc00
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPagePayload.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.database.tree.io;
+
+/**
+ * Immutable holder of an offset, a payload size and a next link. NOTE(review): presumably describes a payload fragment inside a data page - confirm against callers.
+ */
+public class DataPagePayload {
+ /** Offset, exposed through {@link #offset()}. */
+ private final int off;
+
+ /** Payload size, exposed through {@link #payloadSize()}. */
+ private final int payloadSize;
+
+ /** Next link, exposed through {@link #nextLink()}. */
+ private final long nextLink;
+
+ /**
+ * @param off Offset.
+ * @param payloadSize Payload size.
+ * @param nextLink Next link.
+ */
+ DataPagePayload(int off, int payloadSize, long nextLink) {
+ this.off = off;
+ this.payloadSize = payloadSize;
+ this.nextLink = nextLink;
+ }
+
+ /**
+ * @return Offset.
+ */
+ public int offset() {
+ return off;
+ }
+
+ /**
+ * @return Payload size.
+ */
+ public int payloadSize() {
+ return payloadSize;
+ }
+
+ /**
+ * @return Next link.
+ */
+ public long nextLink() {
+ return nextLink;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/IOVersions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/IOVersions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/IOVersions.java
index bd1139d..428cb3f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/IOVersions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/IOVersions.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.processors.cache.database.tree.io;
-import java.nio.ByteBuffer;
-
/**
* Registry for IO versions.
*/
@@ -80,16 +78,16 @@ public final class IOVersions<V extends PageIO> {
}
/**
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @return IO.
*/
- public V forPage(ByteBuffer buf) {
- int ver = PageIO.getVersion(buf);
+ public V forPage(long pageAddr) {
+ int ver = PageIO.getVersion(pageAddr);
V res = forVersion(ver);
- assert res.getType() == PageIO.getType(buf) : "resType=" + res.getType() +
- ", pageType=" + PageIO.getType(buf);
+ assert res.getType() == PageIO.getType(pageAddr) : "resType=" + res.getType() +
+ ", pageType=" + PageIO.getType(pageAddr);
return res;
}