You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/09 11:42:55 UTC
ignite git commit: ignite-5937 Added BPlusTree.iterate for more
optimal mvcc search
Repository: ignite
Updated Branches:
refs/heads/ignite-3478 921404a6f -> fd53c1a8f
ignite-5937 Added BPlusTree.iterate for more optimal mvcc search
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fd53c1a8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fd53c1a8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fd53c1a8
Branch: refs/heads/ignite-3478
Commit: fd53c1a8f4e905a7aba469eb5decf38c50b7708e
Parents: 921404a
Author: sboikov <sb...@gridgain.com>
Authored: Mon Oct 9 14:42:43 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Oct 9 14:42:43 2017 +0300
----------------------------------------------------------------------
.../cache/IgniteCacheOffheapManager.java | 11 +-
.../cache/IgniteCacheOffheapManagerImpl.java | 244 +++++++----
.../cache/mvcc/CacheCoordinatorsProcessor.java | 12 +-
.../cache/persistence/tree/BPlusTree.java | 416 ++++++++++++++-----
.../cache/tree/AbstractDataInnerIO.java | 8 +-
.../cache/tree/AbstractDataLeafIO.java | 8 +-
.../processors/cache/tree/CacheDataTree.java | 10 +-
.../cache/tree/CacheIdAwareDataInnerIO.java | 4 +-
.../cache/tree/CacheIdAwareDataLeafIO.java | 4 +-
.../processors/cache/tree/DataInnerIO.java | 4 +-
.../processors/cache/tree/DataLeafIO.java | 4 +-
.../processors/cache/tree/MvccDataInnerIO.java | 4 +-
.../processors/cache/tree/MvccDataLeafIO.java | 4 +-
.../cache/tree/MvccKeyMaxVersionBound.java | 77 ++++
.../cache/tree/MvccKeyMinVersionBound.java | 49 +++
.../processors/cache/tree/MvccUpdateRow.java | 177 ++++++++
.../cache/tree/MvccVersionBasedSearchRow.java | 100 +++++
.../processors/cache/tree/RowLinkIO.java | 14 +-
.../cache/mvcc/CacheMvccTransactionsTest.java | 149 +++++++
.../processors/database/BPlusTreeSelfTest.java | 291 ++++++++++++-
20 files changed, 1376 insertions(+), 214 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 9d03e4a..8967ce8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -492,7 +492,16 @@ public interface IgniteCacheOffheapManager {
long expireTime,
@Nullable CacheDataRow oldRow) throws IgniteCheckedException;
- GridLongList mvccUpdate(
+ /**
+ * @param cctx Cache context.
+ * @param key Key.
+ * @param val Value.
+ * @param ver Version.
+ * @param mvccVer Mvcc version.
+ * @return List of transactions to wait for.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable GridLongList mvccUpdate(
GridCacheContext cctx,
KeyCacheObject key,
CacheObject val,
http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index d8c5eaa..25f36b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -55,7 +55,11 @@ import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore;
import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
import org.apache.ignite.internal.processors.cache.tree.DataRow;
import org.apache.ignite.internal.processors.cache.tree.MvccDataRow;
+import org.apache.ignite.internal.processors.cache.tree.MvccKeyMaxVersionBound;
+import org.apache.ignite.internal.processors.cache.tree.MvccKeyMinVersionBound;
import org.apache.ignite.internal.processors.cache.tree.MvccSearchRow;
+import org.apache.ignite.internal.processors.cache.tree.MvccUpdateRow;
+import org.apache.ignite.internal.processors.cache.tree.MvccVersionBasedSearchRow;
import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
import org.apache.ignite.internal.processors.cache.tree.PendingRow;
import org.apache.ignite.internal.processors.cache.tree.SearchRow;
@@ -1360,83 +1364,141 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
try {
int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
- MvccDataRow dataRow = new MvccDataRow(
- key,
- val,
- ver,
- partId,
- cacheId,
- mvccVer.coordinatorVersion(),
- mvccVer.counter());
-
CacheObjectContext coCtx = cctx.cacheObjectContext();
// Make sure value bytes initialized.
key.valueBytes(coCtx);
val.valueBytes(coCtx);
- rowStore.addRow(dataRow);
+ if (true) {
+ MvccUpdateRow updateRow = new MvccUpdateRow(
+ key,
+ val,
+ ver,
+ mvccVer,
+ partId,
+ cacheId);
- assert dataRow.link() != 0 : dataRow;
+ rowStore.addRow(updateRow);
- if (grp.sharedGroup() && dataRow.cacheId() == CU.UNDEFINED_CACHE_ID)
- dataRow.cacheId(cctx.cacheId());
+ assert updateRow.link() != 0 : updateRow;
- boolean old = dataTree.putx(dataRow);
+ if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
+ updateRow.cacheId(cctx.cacheId());
- assert !old;
+ GridLongList waitTxs = null;
- GridLongList waitTxs = null;
+ if (mvccVer.initialLoad()) {
+ boolean old = dataTree.putx(updateRow);
- if (!mvccVer.initialLoad()) {
- MvccLongList activeTxs = mvccVer.activeTransactions();
+ assert !old;
- // TODO IGNITE-3484: need special method.
- GridCursor<CacheDataRow> cur = dataTree.find(
- new MvccSearchRow(cacheId, key, mvccVer.coordinatorVersion(), mvccVer.counter() - 1),
- new MvccSearchRow(cacheId, key, 1, 1));
+ incrementSize(cctx.cacheId());
+ }
+ else {
+ dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
- boolean first = true;
+ boolean old = dataTree.putx(updateRow);
- boolean activeTx = false;
+ assert !old;
- while (cur.next()) {
- CacheDataRow oldVal = cur.get();
+ if (!updateRow.previousNotNull())
+ incrementSize(cctx.cacheId());
- assert oldVal.link() != 0 : oldVal;
+ waitTxs = updateRow.activeTransactions();
- if (activeTxs != null && oldVal.mvccCoordinatorVersion() == mvccVer.coordinatorVersion() &&
- activeTxs.contains(oldVal.mvccCounter())) {
- if (waitTxs == null)
- waitTxs = new GridLongList();
+ List<CacheSearchRow> cleanupRows = updateRow.cleanupRows();
- assert oldVal.mvccCounter() != mvccVer.counter();
+ if (cleanupRows != null) {
+ for (int i = 0; i < cleanupRows.size(); i++) {
+ CacheSearchRow oldRow = cleanupRows.get(i);
- waitTxs.add(oldVal.mvccCounter());
+ assert oldRow.link() != 0L : oldRow;
- activeTx = true;
+ boolean rmvd = dataTree.removex(oldRow);
+
+ assert rmvd;
+
+ rowStore.removeRow(oldRow.link());
+ }
}
+ }
- if (!activeTx) {
- // Should not delete oldest version which is less than cleanup version.
- int cmp = compare(oldVal, mvccVer.coordinatorVersion(), mvccVer.cleanupVersion());
+ return waitTxs;
+ }
+ else {
+ MvccDataRow dataRow = new MvccDataRow(
+ key,
+ val,
+ ver,
+ partId,
+ cacheId,
+ mvccVer.coordinatorVersion(),
+ mvccVer.counter());
- if (cmp <= 0) {
- if (first)
- first = false;
- else {
- boolean rmvd = dataTree.removex(oldVal);
+ rowStore.addRow(dataRow);
- assert rmvd;
+ assert dataRow.link() != 0 : dataRow;
+
+ if (grp.sharedGroup() && dataRow.cacheId() == CU.UNDEFINED_CACHE_ID)
+ dataRow.cacheId(cctx.cacheId());
+
+ boolean old = dataTree.putx(dataRow);
+
+ assert !old;
+
+ GridLongList waitTxs = null;
+
+ if (!mvccVer.initialLoad()) {
+ MvccLongList activeTxs = mvccVer.activeTransactions();
+
+ // TODO IGNITE-3484: need special method.
+ GridCursor<CacheDataRow> cur = dataTree.find(
+ new MvccSearchRow(cacheId, key, mvccVer.coordinatorVersion(), mvccVer.counter() - 1),
+ new MvccSearchRow(cacheId, key, 1, 1));
+
+ boolean first = true;
+
+ boolean activeTx = false;
+
+ while (cur.next()) {
+ CacheDataRow oldVal = cur.get();
+
+ assert oldVal.link() != 0 : oldVal;
+
+ if (activeTxs != null && oldVal.mvccCoordinatorVersion() == mvccVer.coordinatorVersion() &&
+ activeTxs.contains(oldVal.mvccCounter())) {
+ if (waitTxs == null)
+ waitTxs = new GridLongList();
+
+ assert oldVal.mvccCounter() != mvccVer.counter();
+
+ waitTxs.add(oldVal.mvccCounter());
+
+ activeTx = true;
+ }
- rowStore.removeRow(oldVal.link());
+ if (!activeTx) {
+ // Should not delete oldest version which is less than cleanup version.
+ int cmp = compare(oldVal, mvccVer.coordinatorVersion(), mvccVer.cleanupVersion());
+
+ if (cmp <= 0) {
+ if (first)
+ first = false;
+ else {
+ boolean rmvd = dataTree.removex(oldVal);
+
+ assert rmvd;
+
+ rowStore.removeRow(oldVal.link());
+ }
}
}
}
}
- }
- return waitTxs;
+ return waitTxs;
+ }
}
finally {
busyLock.leaveBusy();
@@ -1647,14 +1709,26 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
CacheDataRow row;
if (grp.mvccEnabled()) {
- // TODO IGNITE-3484: need special method.
- GridCursor<CacheDataRow> cur = dataTree.find(new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE),
- new MvccSearchRow(cacheId, key, 1, 1));
+ if (true) {
+ MvccKeyMaxVersionBound searchRow = new MvccKeyMaxVersionBound(cacheId, key);
+
+ dataTree.iterate(
+ searchRow,
+ new MvccKeyMinVersionBound(cacheId, key),
+ searchRow // Use the same instance as the closure to avoid creating an extra object.
+ );
- if (cur.next())
- row = cur.get();
- else
- row = null;
+ row = searchRow.row();
+ }
+ else {
+ GridCursor<CacheDataRow> cur = dataTree.find(new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE),
+ new MvccSearchRow(cacheId, key, 1, 1));
+
+ if (cur.next())
+ row = cur.get();
+ else
+ row = null;
+ }
}
else
row = dataTree.findOne(new SearchRow(cacheId, key), CacheDataRowAdapter.RowData.NO_KEY);
@@ -1672,6 +1746,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
{
assert grp.mvccEnabled();
+ // Note: this method is intended for testing only.
+
key.valueBytes(cctx.cacheObjectContext());
int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
@@ -1705,41 +1781,55 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
- // TODO IGNITE-3484: need special method.
- GridCursor<CacheDataRow> cur = dataTree.find(
- new MvccSearchRow(cacheId, key, ver.coordinatorVersion(), ver.counter()),
- new MvccSearchRow(cacheId, key, 1, 1));
+ if (true) {
+ MvccVersionBasedSearchRow lower = new MvccVersionBasedSearchRow(cacheId, key, ver);
- CacheDataRow row = null;
+ dataTree.iterate(
+ lower,
+ new MvccKeyMinVersionBound(cacheId, key),
+ lower // Use the same instance as the closure to avoid creating an extra object.
+ );
- MvccLongList txs = ver.activeTransactions();
+ CacheDataRow row = lower.row();
- while (cur.next()) {
- CacheDataRow row0 = cur.get();
+ afterRowFound(row, key);
- assert row0.mvccCoordinatorVersion() > 0 : row0;
+ return row;
+ }
+ else {
+ GridCursor<CacheDataRow> cur = dataTree.find(
+ new MvccSearchRow(cacheId, key, ver.coordinatorVersion(), ver.counter()),
+ new MvccSearchRow(cacheId, key, 1, 1));
- boolean visible;
+ CacheDataRow row = null;
- if (txs != null) {
- visible = row0.mvccCoordinatorVersion() != ver.coordinatorVersion()
- || !txs.contains(row0.mvccCounter());
- }
- else
- visible = true;
+ MvccLongList txs = ver.activeTransactions();
- if (visible) {
- row = row0;
+ while (cur.next()) {
+ CacheDataRow row0 = cur.get();
- break;
- }
- }
+ assert row0.mvccCoordinatorVersion() > 0 : row0;
- assert row == null || key.equals(row.key());
+ boolean visible;
- //afterRowFound(row, key);
+ if (txs != null) {
+ visible = row0.mvccCoordinatorVersion() != ver.coordinatorVersion()
+ || !txs.contains(row0.mvccCounter());
+ }
+ else
+ visible = true;
- return row;
+ if (visible) {
+ row = row0;
+
+ break;
+ }
+ }
+
+ assert row == null || key.equals(row.key());
+
+ return row;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index 5080c83..b9b8ea1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -614,8 +614,14 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
// TODO IGNITE-3478 sorted? + change GridLongList.writeTo?
MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse();
- for (Long txVer : activeTxs.keySet())
+ long minActive = Long.MAX_VALUE;
+
+ for (Long txVer : activeTxs.keySet()) {
+ if (txVer < minActive)
+ minActive = txVer;
+
res.addTx(txVer);
+ }
Object old = activeTxs.put(nextCtr, txId);
@@ -624,7 +630,9 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
long cleanupVer;
if (prevCrdQueries.previousQueriesDone()) {
- cleanupVer = committedCntr.get() - 1;
+ cleanupVer = Math.min(minActive, committedCntr.get());
+
+ cleanupVer--;
Long qryVer = activeQueries.minimalQueryCounter();
http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index c73b4c7..b31a61f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -907,7 +907,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
long pageAddr = readLock(firstPageId, firstPage); // We always merge pages backwards, the first page is never removed.
try {
- cursor.init(pageAddr, io(pageAddr), 0);
+ cursor.init(pageAddr, io(pageAddr), -1);
}
finally {
readUnlock(firstPageId, firstPage, pageAddr);
@@ -972,6 +972,34 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
}
}
+ /**
+ * @param lower Lower bound inclusive.
+ * @param upper Upper bound inclusive.
+ * @param c Closure applied to every found item; iteration stops when the closure returns {@code false}.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void iterate(L lower, L upper, TreeRowClosure<L, T> c) throws IgniteCheckedException {
+ checkDestroyed();
+
+ try {
+ ClosureCursor cursor = new ClosureCursor(lower, upper, c);
+
+ cursor.iterate();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteCheckedException("Runtime failure on bounds: [lower=" + lower + ", upper=" + upper + "]", e);
+ }
+ catch (RuntimeException e) {
+ throw new IgniteException("Runtime failure on bounds: [lower=" + lower + ", upper=" + upper + "]", e);
+ }
+ catch (AssertionError e) {
+ throw new AssertionError("Assertion error on bounds: [lower=" + lower + ", upper=" + upper + "]", e);
+ }
+ finally {
+ checkDestroyed();
+ }
+ }
+
/** {@inheritDoc} */
@Override public T findFirst() throws IgniteCheckedException {
checkDestroyed();
@@ -2509,14 +2537,14 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
*/
private final class GetCursor extends Get {
/** */
- ForwardCursor cursor;
+ AbstractForwardCursor cursor;
/**
* @param lower Lower bound.
* @param shift Shift.
* @param cursor Cursor.
*/
- GetCursor(L lower, int shift, ForwardCursor cursor) {
+ GetCursor(L lower, int shift, AbstractForwardCursor cursor) {
super(lower, false);
assert shift != 0; // Either handle range of equal rows or find a greater row after concurrent merge.
@@ -4385,51 +4413,57 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
protected abstract T getRow(BPlusIO<L> io, long pageAddr, int idx, Object x) throws IgniteCheckedException;
/**
- * Forward cursor.
+ *
*/
@SuppressWarnings("unchecked")
- private final class ForwardCursor implements GridCursor<T> {
- /** */
- private T[] rows = (T[])EMPTY;
-
- /** */
- private int row = -1;
-
+ private abstract class AbstractForwardCursor {
/** */
- private long nextPageId;
+ long nextPageId;
/** */
- private L lowerBound;
+ L lowerBound;
/** */
private int lowerShift = -1; // Initially it is -1 to handle multiple equal rows.
/** */
- private final L upperBound;
-
- /** */
- private final Object x;
+ final L upperBound;
/**
* @param lowerBound Lower bound.
* @param upperBound Upper bound.
*/
- ForwardCursor(L lowerBound, L upperBound) {
+ AbstractForwardCursor(L lowerBound, L upperBound) {
this.lowerBound = lowerBound;
this.upperBound = upperBound;
- this.x = null;
}
/**
- * @param lowerBound Lower bound.
- * @param upperBound Upper bound.
- * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
+ *
*/
- ForwardCursor(L lowerBound, L upperBound, Object x) {
- this.lowerBound = lowerBound;
- this.upperBound = upperBound;
- this.x = x;
- }
+ abstract void init0();
+
+ /**
+ * @param pageAddr Page address.
+ * @param io IO.
+ * @param startIdx Start index.
+ * @param cnt Number of rows in the buffer.
+ * @return {@code true} If we were able to fetch rows from this page.
+ * @throws IgniteCheckedException If failed.
+ */
+ abstract boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx, int cnt)
+ throws IgniteCheckedException;
+
+ /**
+ * @return {@code True} If we have rows to return after reading the next page.
+ * @throws IgniteCheckedException If failed.
+ */
+ abstract boolean reinitialize0() throws IgniteCheckedException;
+
+ /**
+ * @param readDone {@code True} if traversed all rows.
+ */
+ abstract void onNotFound(boolean readDone);
/**
* @param pageAddr Page address.
@@ -4437,9 +4471,10 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
* @param startIdx Start index.
* @throws IgniteCheckedException If failed.
*/
- private void init(long pageAddr, BPlusIO<L> io, int startIdx) throws IgniteCheckedException {
+ final void init(long pageAddr, BPlusIO<L> io, int startIdx) throws IgniteCheckedException {
nextPageId = 0;
- row = -1;
+
+ init0();
int cnt = io.getCount(pageAddr);
@@ -4447,16 +4482,10 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
if (cnt == 0) {
assert io.getForward(pageAddr) == 0L;
- rows = null;
- }
- else if (!fillFromBuffer(pageAddr, io, startIdx, cnt)) {
- if (rows != EMPTY) {
- assert rows.length > 0; // Otherwise it makes no sense to create an array.
-
- // Fake clear.
- rows[0] = null;
- }
+ onNotFound(true);
}
+ else if (!fillFromBuffer(pageAddr, io, startIdx, cnt))
+ onNotFound(false);
}
/**
@@ -4466,7 +4495,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
* @return Adjusted to lower bound start index.
* @throws IgniteCheckedException If failed.
*/
- private int findLowerBound(long pageAddr, BPlusIO<L> io, int cnt) throws IgniteCheckedException {
+ final int findLowerBound(long pageAddr, BPlusIO<L> io, int cnt) throws IgniteCheckedException {
assert io.isLeaf();
// Compare with the first row on the page.
@@ -4491,7 +4520,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
* @return Corrected number of rows with respect to upper bound.
* @throws IgniteCheckedException If failed.
*/
- private int findUpperBound(long pageAddr, BPlusIO<L> io, int low, int cnt) throws IgniteCheckedException {
+ final int findUpperBound(long pageAddr, BPlusIO<L> io, int low, int cnt) throws IgniteCheckedException {
assert io.isLeaf();
// Compare with the last row on the page.
@@ -4523,75 +4552,20 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
throws IgniteCheckedException {
assert io.isLeaf() : io;
assert cnt != 0 : cnt; // We can not see empty pages (empty tree handled in init).
- assert startIdx >= 0 : startIdx;
+ assert startIdx >= 0 || startIdx == -1: startIdx;
assert cnt >= startIdx;
checkDestroyed();
nextPageId = io.getForward(pageAddr);
- if (lowerBound != null && startIdx == 0)
- startIdx = findLowerBound(pageAddr, io, cnt);
-
- if (upperBound != null && cnt != startIdx)
- cnt = findUpperBound(pageAddr, io, startIdx, cnt);
-
- cnt -= startIdx;
-
- if (cnt == 0)
- return false;
-
- if (rows == EMPTY)
- rows = (T[])new Object[cnt];
-
- for (int i = 0; i < cnt; i++) {
- T r = getRow(io, pageAddr, startIdx + i, x);
-
- rows = GridArrays.set(rows, i, r);
- }
-
- GridArrays.clearTail(rows, cnt);
-
- return true;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("SimplifiableIfStatement")
- @Override public boolean next() throws IgniteCheckedException {
- if (rows == null)
- return false;
-
- if (++row < rows.length && rows[row] != null) {
- clearLastRow(); // Allow to GC the last returned row.
-
- return true;
- }
-
- return nextPage();
- }
-
- /**
- * @return Cleared last row.
- */
- private T clearLastRow() {
- if (row == 0)
- return null;
-
- int last = row - 1;
-
- T r = rows[last];
-
- assert r != null;
-
- rows[last] = null;
-
- return r;
+ return fillFromBuffer0(pageAddr, io, startIdx, cnt);
}
/**
* @throws IgniteCheckedException If failed.
*/
- private void find() throws IgniteCheckedException {
+ final void find() throws IgniteCheckedException {
assert lowerBound != null;
doFind(new GetCursor(lowerBound, lowerShift, this));
@@ -4607,21 +4581,20 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
// to the previous lower bound.
find();
- return next();
+ return reinitialize0();
}
/**
+ * @param lastRow Last read row (to be used as new lower bound).
* @return {@code true} If we have rows to return after reading the next page.
* @throws IgniteCheckedException If failed.
*/
- private boolean nextPage() throws IgniteCheckedException {
- updateLowerBound(clearLastRow());
-
- row = 0;
+ final boolean nextPage(L lastRow) throws IgniteCheckedException {
+ updateLowerBound(lastRow);
for (;;) {
if (nextPageId == 0) {
- rows = null;
+ onNotFound(true);
return false; // Done.
}
@@ -4638,7 +4611,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
try {
BPlusIO<L> io = io(pageAddr);
- if (fillFromBuffer(pageAddr, io, 0, io.getCount(pageAddr)))
+ if (fillFromBuffer(pageAddr, io, -1, io.getCount(pageAddr)))
return true;
// Continue fetching forward.
@@ -4659,12 +4632,227 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
/**
* @param lower New exact lower bound.
*/
- private void updateLowerBound(T lower) {
+ private void updateLowerBound(L lower) {
if (lower != null) {
lowerShift = 1; // Now we have the full row an need to avoid duplicates.
lowerBound = lower; // Move the lower bound forward for further concurrent merge retries.
}
}
+ }
+
+ /**
+ * Closure cursor.
+ */
+ @SuppressWarnings("unchecked")
+ private final class ClosureCursor extends AbstractForwardCursor {
+ /** */
+ private final TreeRowClosure<L, T> p;
+
+ /** */
+ private L lastRow;
+
+ /**
+ * @param lowerBound Lower bound.
+ * @param upperBound Upper bound.
+ * @param p Row predicate.
+ */
+ ClosureCursor(L lowerBound, L upperBound, TreeRowClosure<L, T> p) {
+ super(lowerBound, upperBound);
+
+ assert lowerBound != null;
+ assert upperBound != null;
+ assert p != null;
+
+ this.p = p;
+ }
+
+ /** {@inheritDoc} */
+ @Override void init0() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx, int cnt)
+ throws IgniteCheckedException {
+ if (startIdx == -1) // TODO IGNITE-3478: startIdx == 0? can search twice for first item?
+ startIdx = findLowerBound(pageAddr, io, cnt);
+
+ if (cnt == startIdx)
+ return false;
+
+ for (int i = startIdx; i < cnt; i++) {
+ int cmp = compare(0, io, pageAddr, i, upperBound);
+
+ if (cmp > 0) {
+ nextPageId = 0; // The End.
+
+ return false;
+ }
+
+ boolean stop = !p.apply(BPlusTree.this, io, pageAddr, i);
+
+ if (stop) {
+ nextPageId = 0; // The End.
+
+ return true;
+ }
+ }
+
+ if (nextPageId != 0)
+ lastRow = io.getLookupRow(BPlusTree.this, pageAddr, cnt - 1); // Save the last row to use as the next lower bound.
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override boolean reinitialize0() throws IgniteCheckedException {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override void onNotFound(boolean readDone) {
+ nextPageId = 0;
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ private void iterate() throws IgniteCheckedException {
+ find();
+
+ if (nextPageId == 0) {
+ return;
+ }
+
+ for (;;) {
+ L lastRow0 = lastRow;
+
+ lastRow = null;
+
+ nextPage(lastRow0);
+
+ if (nextPageId == 0)
+ return;
+ }
+ }
+ }
+
+ /**
+ * Forward cursor.
+ */
+ @SuppressWarnings("unchecked")
+ private final class ForwardCursor extends AbstractForwardCursor implements GridCursor<T> {
+ /** */
+ final Object x;
+
+ /** */
+ private T[] rows = (T[])EMPTY;
+
+ /** */
+ private int row = -1;
+
+ /**
+ * @param lowerBound Lower bound.
+ * @param upperBound Upper bound.
+ * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
+ */
+ ForwardCursor(L lowerBound, L upperBound, Object x) {
+ super(lowerBound, upperBound);
+
+ this.x = x;
+ }
+
+ /** {@inheritDoc} */
+ @Override boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx, int cnt) throws IgniteCheckedException {
+ if (startIdx == -1) {
+ if (lowerBound != null)
+ startIdx = findLowerBound(pageAddr, io, cnt);
+ else
+ startIdx = 0;
+ }
+
+ if (upperBound != null && cnt != startIdx)
+ cnt = findUpperBound(pageAddr, io, startIdx, cnt);
+
+ cnt -= startIdx;
+
+ if (cnt == 0)
+ return false;
+
+ if (rows == EMPTY)
+ rows = (T[])new Object[cnt];
+
+ for (int i = 0; i < cnt; i++) {
+ T r = getRow(io, pageAddr, startIdx + i, x);
+
+ rows = GridArrays.set(rows, i, r);
+ }
+
+ GridArrays.clearTail(rows, cnt);
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override boolean reinitialize0() throws IgniteCheckedException {
+ return next();
+ }
+
+ /** {@inheritDoc} */
+ @Override void onNotFound(boolean readDone) {
+ if (readDone)
+ rows = null;
+ else {
+ if (rows != EMPTY) {
+ assert rows.length > 0; // Otherwise it makes no sense to create an array.
+
+ // Fake clear.
+ rows[0] = null;
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override void init0() {
+ row = -1;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("SimplifiableIfStatement")
+ @Override public boolean next() throws IgniteCheckedException {
+ if (rows == null)
+ return false;
+
+ if (++row < rows.length && rows[row] != null) {
+ clearLastRow(); // Allow to GC the last returned row.
+
+ return true;
+ }
+
+ T lastRow = clearLastRow();
+
+ row = 0;
+
+ return nextPage(lastRow);
+ }
+
+ /**
+ * @return Cleared last row.
+ */
+ private T clearLastRow() {
+ if (row == 0)
+ return null;
+
+ int last = row - 1;
+
+ T r = rows[last];
+
+ assert r != null;
+
+ rows[last] = null;
+
+ return r;
+ }
/** {@inheritDoc} */
@Override public T get() {
@@ -4805,4 +4993,20 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
/** */
DONE
}
+
+ /**
+ *
+ */
+ public interface TreeRowClosure<L, T extends L> {
+ /**
+ * @param tree Tree.
+ * @param io Tree IO.
+ * @param pageAddr Page address.
+ * @param idx Item index.
+ * @return {@code True} to continue iteration, {@code false} to stop it. TODO IGNITE-3478
+ * @throws IgniteCheckedException If failed.
+ */
+ public boolean apply(BPlusTree<L, T> tree, BPlusIO<L> io, long pageAddr, int idx)
+ throws IgniteCheckedException;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
index 3fc0962..a07d012 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
@@ -76,8 +76,8 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
long link = getLink(pageAddr, idx);
if (storeMvccVersion()) {
- long mvccTopVer = getMvccUpdateTopologyVersion(pageAddr, idx);
- long mvccCntr = getMvccUpdateCounter(pageAddr, idx);
+ long mvccTopVer = getMvccCoordinatorVersion(pageAddr, idx);
+ long mvccCntr = getMvccCounter(pageAddr, idx);
return ((CacheDataTree)tree).rowStore().mvccRow(cacheId,
hash,
@@ -119,8 +119,8 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
}
if (storeMvccVersion()) {
- long mvccTopVer = rowIo.getMvccUpdateTopologyVersion(srcPageAddr, srcIdx);
- long mvcCntr = rowIo.getMvccUpdateCounter(srcPageAddr, srcIdx);
+ long mvccTopVer = rowIo.getMvccCoordinatorVersion(srcPageAddr, srcIdx);
+ long mvcCntr = rowIo.getMvccCounter(srcPageAddr, srcIdx);
assert mvccTopVer > 0 : mvccTopVer;
assert mvcCntr != CacheCoordinatorsProcessor.COUNTER_NA;
http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
index a4eac3e..ef08bec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
@@ -94,8 +94,8 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
}
if (storeMvccVersion()) {
- long mvccUpdateTopVer = ((RowLinkIO)srcIo).getMvccUpdateTopologyVersion(srcPageAddr, srcIdx);
- long mvccUpdateCntr = ((RowLinkIO)srcIo).getMvccUpdateCounter(srcPageAddr, srcIdx);
+ long mvccUpdateTopVer = ((RowLinkIO)srcIo).getMvccCoordinatorVersion(srcPageAddr, srcIdx);
+ long mvccUpdateCntr = ((RowLinkIO)srcIo).getMvccCounter(srcPageAddr, srcIdx);
assert mvccUpdateTopVer >=0 : mvccUpdateCntr;
assert mvccUpdateCntr != CacheCoordinatorsProcessor.COUNTER_NA;
@@ -114,8 +114,8 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
long link = getLink(pageAddr, idx);
if (storeMvccVersion()) {
- long mvccTopVer = getMvccUpdateTopologyVersion(pageAddr, idx);
- long mvccCntr = getMvccUpdateCounter(pageAddr, idx);
+ long mvccTopVer = getMvccCoordinatorVersion(pageAddr, idx);
+ long mvccCntr = getMvccCounter(pageAddr, idx);
return ((CacheDataTree)tree).rowStore().mvccRow(cacheId,
hash,
http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index 767c996..eaeefee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -114,7 +114,7 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
/** {@inheritDoc} */
@Override protected int compare(BPlusIO<CacheSearchRow> iox, long pageAddr, int idx, CacheSearchRow row)
throws IgniteCheckedException {
- assert !grp.mvccEnabled() || row.mvccCoordinatorVersion() != 0;// || row.getClass() == SearchRow.class;
+ assert !grp.mvccEnabled() || row.mvccCoordinatorVersion() != 0 : row;
RowLinkIO io = (RowLinkIO)iox;
@@ -158,14 +158,14 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
if (cmp != 0 || !grp.mvccEnabled())
return 0;
- long mvccCrdVer = io.getMvccUpdateTopologyVersion(pageAddr, idx);
+ long mvccCrdVer = io.getMvccCoordinatorVersion(pageAddr, idx);
cmp = Long.compare(row.mvccCoordinatorVersion(), mvccCrdVer);
if (cmp != 0)
return cmp;
- long mvccCntr = io.getMvccUpdateCounter(pageAddr, idx);
+ long mvccCntr = io.getMvccCounter(pageAddr, idx);
assert row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA;
@@ -188,8 +188,8 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
CacheDataRowAdapter.RowData.FULL;
if (grp.mvccEnabled()) {
- long mvccTopVer = rowIo.getMvccUpdateTopologyVersion(pageAddr, idx);
- long mvccCntr = rowIo.getMvccUpdateCounter(pageAddr, idx);
+ long mvccTopVer = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
+ long mvccCntr = rowIo.getMvccCounter(pageAddr, idx);
return rowStore.mvccRow(cacheId, hash, link, x, mvccTopVer, mvccCntr);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
index fc9d15d..3d02b27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
@@ -53,12 +53,12 @@ public final class CacheIdAwareDataInnerIO extends AbstractDataInnerIO {
}
/** {@inheritDoc} */
- @Override public long getMvccUpdateTopologyVersion(long pageAddr, int idx) {
+ @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
return 0;
}
/** {@inheritDoc} */
- @Override public long getMvccUpdateCounter(long pageAddr, int idx) {
+ @Override public long getMvccCounter(long pageAddr, int idx) {
return CacheCoordinatorsProcessor.COUNTER_NA;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
index b328924..58ae9ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
@@ -53,12 +53,12 @@ public final class CacheIdAwareDataLeafIO extends AbstractDataLeafIO {
}
/** {@inheritDoc} */
- @Override public long getMvccUpdateTopologyVersion(long pageAddr, int idx) {
+ @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
return 0;
}
/** {@inheritDoc} */
- @Override public long getMvccUpdateCounter(long pageAddr, int idx) {
+ @Override public long getMvccCounter(long pageAddr, int idx) {
return CacheCoordinatorsProcessor.COUNTER_NA;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
index 0d424b7..19a5c47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
@@ -53,12 +53,12 @@ public final class DataInnerIO extends AbstractDataInnerIO {
}
/** {@inheritDoc} */
- @Override public long getMvccUpdateTopologyVersion(long pageAddr, int idx) {
+ @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
return 0;
}
/** {@inheritDoc} */
- @Override public long getMvccUpdateCounter(long pageAddr, int idx) {
+ @Override public long getMvccCounter(long pageAddr, int idx) {
return CacheCoordinatorsProcessor.COUNTER_NA;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
index ff51bc2..ab10b96 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
@@ -53,12 +53,12 @@ public final class DataLeafIO extends AbstractDataLeafIO {
}
/** {@inheritDoc} */
- @Override public long getMvccUpdateTopologyVersion(long pageAddr, int idx) {
+ @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
return 0;
}
/** {@inheritDoc} */
- @Override public long getMvccUpdateCounter(long pageAddr, int idx) {
+ @Override public long getMvccCounter(long pageAddr, int idx) {
return CacheCoordinatorsProcessor.COUNTER_NA;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataInnerIO.java
index 5f4f44c..51a911d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataInnerIO.java
@@ -53,12 +53,12 @@ public final class MvccDataInnerIO extends AbstractDataInnerIO {
}
/** {@inheritDoc} */
- @Override public long getMvccUpdateTopologyVersion(long pageAddr, int idx) {
+ @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
return PageUtils.getLong(pageAddr, offset(idx) + 12);
}
/** {@inheritDoc} */
- @Override public long getMvccUpdateCounter(long pageAddr, int idx) {
+ @Override public long getMvccCounter(long pageAddr, int idx) {
return PageUtils.getLong(pageAddr, offset(idx) + 20);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataLeafIO.java
index e7cfca7..84c33a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataLeafIO.java
@@ -53,12 +53,12 @@ public final class MvccDataLeafIO extends AbstractDataLeafIO {
}
/** {@inheritDoc} */
- @Override public long getMvccUpdateTopologyVersion(long pageAddr, int idx) {
+ @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
return PageUtils.getLong(pageAddr, offset(idx) + 12);
}
/** {@inheritDoc} */
- @Override public long getMvccUpdateCounter(long pageAddr, int idx) {
+ @Override public long getMvccCounter(long pageAddr, int idx) {
return PageUtils.getLong(pageAddr, offset(idx) + 20);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
new file mode 100644
index 0000000..aa9422d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.tree;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
+import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Search row for a key that reports {@code Long.MAX_VALUE} as both its mvcc coordinator version and mvcc counter.
+ * When applied as a tree row closure it reads the row it is applied to and stops the search.
+ */
+public class MvccKeyMaxVersionBound extends SearchRow implements BPlusTree.TreeRowClosure<CacheSearchRow, CacheDataRow> {
+ /** Row read by {@link #apply}, if it has been called. */
+ private CacheDataRow resRow;
+
+ /**
+ * @param cacheId Cache ID.
+ * @param key Key.
+ */
+ public MvccKeyMaxVersionBound(int cacheId, KeyCacheObject key) {
+ super(cacheId, key);
+ }
+
+ /**
+ * @return Found row.
+ */
+ @Nullable public CacheDataRow row() {
+ return resRow;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Reads the row at the given position without its key ({@code RowData.NO_KEY}), remembers it and
+ * returns {@code false} so that the search stops at the first row this closure is applied to.
+ */
+ @Override public boolean apply(BPlusTree<CacheSearchRow, CacheDataRow> tree, BPlusIO<CacheSearchRow> io,
+ long pageAddr,
+ int idx)
+ throws IgniteCheckedException
+ {
+ resRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
+
+ return false; // Stop search.
+ }
+
+ /** {@inheritDoc} */
+ @Override public long mvccCoordinatorVersion() {
+ return Long.MAX_VALUE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long mvccCounter() {
+ return Long.MAX_VALUE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccKeyMaxVersionBound.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMinVersionBound.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMinVersionBound.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMinVersionBound.java
new file mode 100644
index 0000000..f2ac308
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMinVersionBound.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.tree;
+
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Search row for a key that reports {@code 1} as both its mvcc coordinator version and mvcc counter
+ * (presumably the smallest valid mvcc version; confirm against the coordinator implementation).
+ */
+public class MvccKeyMinVersionBound extends SearchRow {
+ /**
+ * @param cacheId Cache ID.
+ * @param key Key.
+ */
+ public MvccKeyMinVersionBound(int cacheId, KeyCacheObject key) {
+ super(cacheId, key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long mvccCoordinatorVersion() {
+ return 1L;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long mvccCounter() {
+ return 1L;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccKeyMinVersionBound.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
new file mode 100644
index 0000000..79544e6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.tree;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Data row for an mvcc update. Applied as a tree row closure to existing rows, it records which of them
+ * were written by transactions still listed as active in the update's mvcc version and which of them
+ * can be cleaned up.
+ */
+public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<CacheSearchRow, CacheDataRow> {
+ /** Set to {@code TRUE} by the first {@link #apply} call, {@code null} until then. */
+ private Boolean hasPrev;
+
+ /** Becomes {@code true} once the first row eligible for cleanup has been seen; that first row itself is kept. */
+ private boolean canCleanup;
+
+ /** Counters of visited rows that are contained in the active transactions of {@link #mvccVer}, created lazily. */
+ private GridLongList activeTxs;
+
+ /** Lookup rows collected by {@link #apply} as safe to cleanup, created lazily. */
+ private List<CacheSearchRow> cleanupRows;
+
+ /** Mvcc version of this update. */
+ private final MvccCoordinatorVersion mvccVer;
+
+ /**
+ * @param key Key.
+ * @param val Value.
+ * @param ver Version.
+ * @param mvccVer Mvcc version.
+ * @param part Partition.
+ * @param cacheId Cache ID.
+ */
+ public MvccUpdateRow(
+ KeyCacheObject key,
+ CacheObject val,
+ GridCacheVersion ver,
+ MvccCoordinatorVersion mvccVer,
+ int part,
+ int cacheId) {
+ super(key, val, ver, part, 0L, cacheId);
+
+ this.mvccVer = mvccVer;
+ }
+
+ /**
+ * @return {@code True} if previous value was non-null.
+ */
+ public boolean previousNotNull() {
+ return hasPrev != null && hasPrev;
+ }
+
+ /**
+ * @return Active transactions to wait for.
+ */
+ @Nullable public GridLongList activeTransactions() {
+ return activeTxs;
+ }
+
+ /**
+ * @return Rows which are safe to cleanup, {@code null} if no such rows were collected.
+ */
+ public List<CacheSearchRow> cleanupRows() {
+ return cleanupRows;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * If the row has the same coordinator version as this update and its counter is among the update's
+ * active transactions, the counter is recorded in the active transactions list. Otherwise, when the
+ * row belongs to another coordinator version or its counter does not exceed the cleanup version,
+ * the row is collected for cleanup, except for the first such row seen. Always returns {@code true}.
+ */
+ @Override public boolean apply(BPlusTree<CacheSearchRow, CacheDataRow> tree,
+ BPlusIO<CacheSearchRow> io,
+ long pageAddr,
+ int idx)
+ throws IgniteCheckedException
+ {
+ RowLinkIO rowIo = (RowLinkIO)io;
+
+ // All previous versions should be less than the new one.
+ assert mvccVer.coordinatorVersion() >= rowIo.getMvccCoordinatorVersion(pageAddr, idx);
+ assert mvccVer.coordinatorVersion() > rowIo.getMvccCoordinatorVersion(pageAddr, idx) || mvccVer.counter() > rowIo.getMvccCounter(pageAddr, idx);
+
+ boolean checkActive = mvccVer.activeTransactions().size() > 0;
+
+ boolean txActive = false;
+
+ // Suppose transactions on previous coordinator versions are done.
+ if (checkActive && mvccVer.coordinatorVersion() == rowIo.getMvccCoordinatorVersion(pageAddr, idx)) {
+ long rowMvccCntr = rowIo.getMvccCounter(pageAddr, idx);
+
+ if (mvccVer.activeTransactions().contains(rowMvccCntr)) {
+ txActive = true;
+
+ if (activeTxs == null)
+ activeTxs = new GridLongList();
+
+ activeTxs.add(rowMvccCntr);
+ }
+ }
+
+ if (hasPrev == null)
+ hasPrev = Boolean.TRUE; // TODO IGNITE-3478 support removes.
+
+ if (!txActive) {
+ assert Long.compare(mvccVer.coordinatorVersion(), rowIo.getMvccCoordinatorVersion(pageAddr, idx)) >= 0;
+
+ int cmp;
+
+ if (mvccVer.coordinatorVersion() == rowIo.getMvccCoordinatorVersion(pageAddr, idx))
+ cmp = Long.compare(mvccVer.cleanupVersion(), rowIo.getMvccCounter(pageAddr, idx));
+ else
+ cmp = 1;
+
+ if (cmp >= 0) {
+ // Do not cleanup oldest version.
+ if (canCleanup) {
+ CacheSearchRow row = io.getLookupRow(tree, pageAddr, idx);
+
+ assert row.link() != 0 && row.mvccCoordinatorVersion() > 0 : row;
+
+ // Should not be possible to cleanup active tx.
+ assert row.mvccCoordinatorVersion() != mvccVer.coordinatorVersion()
+ || !mvccVer.activeTransactions().contains(row.mvccCounter());
+
+ if (cleanupRows == null)
+ cleanupRows = new ArrayList<>();
+
+ cleanupRows.add(row);
+ }
+ else
+ canCleanup = true;
+ }
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long mvccCoordinatorVersion() {
+ return mvccVer.coordinatorVersion();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long mvccCounter() {
+ return mvccVer.counter();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccUpdateRow.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
new file mode 100644
index 0000000..c829afb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.tree;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
+import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Search row for a key at a given mvcc version. Applied as a tree row closure, it skips rows whose counter
+ * belongs to a transaction listed as active in that version and captures the first row that is visible.
+ */
+public class MvccVersionBasedSearchRow extends SearchRow implements BPlusTree.TreeRowClosure<CacheSearchRow, CacheDataRow> {
+ /** Mvcc version the search is performed for. */
+ private final MvccCoordinatorVersion ver;
+
+ /** First visible row read by {@link #apply}, if any. */
+ private CacheDataRow resRow;
+
+ /**
+ * @param cacheId Cache ID.
+ * @param key Key.
+ * @param ver Mvcc version.
+ */
+ public MvccVersionBasedSearchRow(int cacheId, KeyCacheObject key, MvccCoordinatorVersion ver) {
+ super(cacheId, key);
+
+ assert ver != null;
+
+ this.ver = ver;
+ }
+
+ /**
+ * @return Found row.
+ */
+ @Nullable public CacheDataRow row() {
+ return resRow;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * A row is treated as invisible only when the search version has active transactions, the row has the
+ * same coordinator version as the search version and the row counter is among those active transactions.
+ * For a visible row the row is read without its key ({@code RowData.NO_KEY}), remembered, and
+ * {@code false} is returned to stop the search; for an invisible row {@code true} is returned.
+ */
+ @Override public boolean apply(BPlusTree<CacheSearchRow, CacheDataRow> tree,
+ BPlusIO<CacheSearchRow> io,
+ long pageAddr,
+ int idx) throws IgniteCheckedException
+ {
+ boolean visible = true;
+
+ if (ver.activeTransactions().size() > 0) {
+ RowLinkIO rowIo = (RowLinkIO)io;
+
+ // TODO IGNITE-3478 sort active transactions?
+ if (rowIo.getMvccCoordinatorVersion(pageAddr, idx) == ver.coordinatorVersion())
+ visible = !ver.activeTransactions().contains(rowIo.getMvccCounter(pageAddr, idx));
+ }
+
+ if (visible) {
+ resRow = ((CacheDataTree) tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
+
+ return false; // Stop search.
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long mvccCoordinatorVersion() {
+ return ver.coordinatorVersion();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long mvccCounter() {
+ return ver.counter();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccVersionBasedSearchRow.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/RowLinkIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/RowLinkIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/RowLinkIO.java
index 8b341cb..111968d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/RowLinkIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/RowLinkIO.java
@@ -42,7 +42,17 @@ public interface RowLinkIO {
*/
public int getCacheId(long pageAddr, int idx);
- public long getMvccUpdateTopologyVersion(long pageAddr, int idx);
+ /**
+ * @param pageAddr Page address.
+ * @param idx Index.
+ * @return Mvcc coordinator version.
+ */
+ public long getMvccCoordinatorVersion(long pageAddr, int idx);
- public long getMvccUpdateCounter(long pageAddr, int idx);
+ /**
+ * @param pageAddr Page address.
+ * @param idx Index.
+ * @return Mvcc counter.
+ */
+ public long getMvccCounter(long pageAddr, int idx);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index f28fe2d..115e8a2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteTransactions;
@@ -47,16 +48,20 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.TestCacheNodeExcludingFilter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.lang.GridInClosure3;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -2534,6 +2539,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
}
/**
+ * @param restartCrd If {@code true} dedicated coordinator node is restarted during test.
* @param srvs Number of server nodes.
* @param clients Number of client nodes.
* @param cacheBackups Number of cache backups.
@@ -2680,6 +2686,149 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
stop.set(true);
}
}
+ /**
+ * Checks that cache size grows by one for every newly inserted key and stays the same
+ * when already existing keys are overwritten.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSize() throws Exception {
+ Ignite node = startGrid(0);
+
+ IgniteCache cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 1));
+
+ assertEquals(cache.size(), 0);
+
+ final int KEYS = 10;
+
+ for (int i = 0; i < KEYS; i++) {
+ final Integer key = i;
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(key, i);
+
+ tx.commit();
+ }
+
+ assertEquals(i + 1, cache.size());
+ }
+
+ for (int i = 0; i < KEYS; i++) {
+ final Integer key = i;
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(key, i);
+
+ tx.commit();
+ }
+
+ assertEquals(KEYS, cache.size());
+ }
+
+ // TODO IGNITE-3478: test removes.
+ }
+
+
+ /**
+ * Writes 10 versions for each of 1000 keys while a query counter request prevents cleanup, then checks
+ * the results of {@code mvccAllVersions}, {@code read} and {@code mvccRead} of the offheap manager
+ * for different read versions, including versions with some counters marked as active transactions.
+ *
+ * @throws Exception If failed.
+ */
+ public void testInternalApi() throws Exception {
+ Ignite node = startGrid(0);
+
+ IgniteCache cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 1));
+
+ GridCacheContext cctx =
+ ((IgniteKernal)node).context().cache().context().cacheContext(CU.cacheId(cache.getName()));
+
+ CacheCoordinatorsProcessor crd = cctx.kernalContext().coordinators();
+
+ // Start query to prevent cleanup.
+ IgniteInternalFuture<MvccCoordinatorVersion> fut = crd.requestQueryCounter(crd.currentCoordinator());
+
+ fut.get();
+
+ final int KEYS = 1000;
+
+ for (int i = 0; i < 10; i++) {
+ for (int k = 0; k < KEYS; k++) {
+ final Integer key = k;
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(key, i);
+
+ tx.commit();
+ }
+ }
+ }
+
+ for (int k = 0; k < KEYS; k++) {
+ final Integer key = k;
+
+ KeyCacheObject key0 = cctx.toCacheKeyObject(key);
+
+ List<T2<Object, MvccCounter>> vers = cctx.offheap().mvccAllVersions(cctx, key0);
+
+ assertEquals(10, vers.size());
+
+ CacheDataRow row = cctx.offheap().read(cctx, key0);
+
+ checkRow(cctx, row, key0, vers.get(0).get1());
+
+ for (T2<Object, MvccCounter> ver : vers) {
+ MvccCounter cntr = ver.get2();
+
+ MvccCoordinatorVersion readVer =
+ new MvccCoordinatorVersionResponse(cntr.coordinatorVersion(), cntr.counter(), 0);
+
+ row = cctx.offheap().mvccRead(cctx, key0, readVer);
+
+ checkRow(cctx, row, key0, ver.get1());
+ }
+
+ checkRow(cctx,
+ cctx.offheap().mvccRead(cctx, key0, version(vers.get(0).get2().coordinatorVersion() + 1, 1)),
+ key0,
+ vers.get(0).get1());
+
+ checkRow(cctx,
+ cctx.offheap().mvccRead(cctx, key0, version(vers.get(0).get2().coordinatorVersion(), vers.get(0).get2().counter() + 1)),
+ key0,
+ vers.get(0).get1());
+
+ MvccCoordinatorVersionResponse ver = version(crd.currentCoordinator().coordinatorVersion(), 100000);
+
+ for (int v = 0; v < vers.size(); v++) {
+ MvccCounter cntr = vers.get(v).get2();
+
+ ver.addTx(cntr.counter());
+
+ row = cctx.offheap().mvccRead(cctx, key0, ver);
+
+ if (v == vers.size() - 1)
+ assertNull(row);
+ else
+ checkRow(cctx, row, key0, vers.get(v + 1).get1());
+ }
+ }
+ }
+
+ /**
+ * Asserts that the row is not {@code null} and that its key and unwrapped value equal the expected ones.
+ *
+ * @param cctx Context.
+ * @param row Row.
+ * @param expKey Expected row key.
+ * @param expVal Expected row value.
+ */
+ private void checkRow(GridCacheContext cctx, CacheDataRow row, KeyCacheObject expKey, Object expVal) {
+ assertNotNull(row);
+ assertEquals(expKey, row.key());
+ assertEquals(expVal, row.value().value(cctx.cacheObjectContext(), false));
+ }
+
+ /**
+ * Creates a version response from the given coordinator version and counter, passing {@code 0}
+ * as the third constructor argument.
+ *
+ * @param crdVer Coordinator version.
+ * @param cntr Counter.
+ * @return Version.
+ */
+ private MvccCoordinatorVersionResponse version(long crdVer, long cntr) {
+ return new MvccCoordinatorVersionResponse(crdVer, cntr, 0);
+ }
/**
* @return Cache configurations.
http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
index 9c0d791..e2f6b2e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
@@ -25,6 +25,7 @@ import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -570,6 +571,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
assertNoLocks();
assertEquals(x, tree.findOne(x).longValue());
+ checkIterate(tree, x, x, x, true);
assertNoLocks();
@@ -584,12 +586,15 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
assertNull(tree.findOne(-1L));
- for (long x = 0; x < cnt; x++)
+ for (long x = 0; x < cnt; x++) {
assertEquals(x, tree.findOne(x).longValue());
+ checkIterate(tree, x, x, x, true);
+ }
assertNoLocks();
assertNull(tree.findOne(cnt));
+ checkIterate(tree, cnt, cnt, null, false);
for (long x = RMV_INC > 0 ? 0 : cnt - 1; x >= 0 && x < cnt; x += RMV_INC) {
X.println(" -- " + x);
@@ -603,6 +608,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
assertNoLocks();
assertNull(tree.findOne(x));
+ checkIterate(tree, x, x, null, false);
assertNoLocks();
@@ -619,6 +625,32 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
}
/**
+ * Runs {@code tree.iterate(lower, upper, ...)} with a fresh {@link TestTreeRowClosure} built for
+ * {@code exp} and asserts whether the closure reported a match.
+ *
+ * @param tree Tree to iterate over.
+ * @param lower Lower bound passed to {@code iterate}. Callers in this test pass {@code lower == upper}
+ * and expect a match, so the bound appears to be inclusive - confirm against {@code BPlusTree.iterate}.
+ * @param upper Upper bound passed to {@code iterate} (same remark on inclusiveness).
+ * @param exp Value the closure looks for, or {@code null} to match the first row the closure is applied to.
+ * @param expFound Expected state of the closure's {@code found} flag after the iteration.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void checkIterate(TestTree tree, long lower, long upper, Long exp, boolean expFound)
+ throws IgniteCheckedException {
+ TestTreeRowClosure c = new TestTreeRowClosure(exp);
+
+ tree.iterate(lower, upper, c);
+
+ assertEquals(expFound, c.found);
+ }
+
+ private void checkIterateC(TestTree tree, long lower, long upper, TestTreeRowClosure c, boolean expFound)
+ throws IgniteCheckedException { // Variant of checkIterate that reuses a caller-supplied closure instead of allocating a new one.
+ c.found = false; // Reset the flag left by a previous call; TestTreeRowClosure.apply() asserts that it is not already set.
+
+ tree.iterate(lower, upper, c);
+
+ assertEquals(expFound, c.found); // expFound is the expected state of the closure's flag after the iteration.
+ }
+
+ /**
* @throws IgniteCheckedException If failed.
*/
public void testRandomInvoke_1_30_1() throws IgniteCheckedException {
@@ -1242,6 +1274,206 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
}
/**
+ * Checks {@code iterate} on an empty tree and on a tree holding the keys 1..10, using different
+ * lower bounds and different values to search for.
+ *
+ * @throws Exception If failed.
+ */
+ public void testIterate() throws Exception {
+ MAX_PER_PAGE = 5; // Test-wide setting declared outside this method; presumably limits items per tree page - confirm.
+
+ TestTree tree = createTestTree(true);
+
+ checkIterate(tree, 0L, 100L, null, false); // Nothing has been put yet: even a "match first row" closure must not fire.
+
+ for (long idx = 1L; idx <= 10L; ++idx)
+ tree.put(idx);
+
+ for (long idx = 1L; idx <= 10L; ++idx)
+ checkIterate(tree, idx, 100L, idx, true); // Searched value equals the lower bound.
+
+ checkIterate(tree, 0L, 100L, 1L, true); // Lower bound below the smallest key.
+
+ for (long idx = 1L; idx <= 10L; ++idx)
+ checkIterate(tree, idx, 100L, 10L, true); // Searched value is the largest key, for every lower bound.
+
+ checkIterate(tree, 0L, 100L, 100L, false); // 100 was never put, so it must not be found.
+
+ for (long idx = 1L; idx <= 10L; ++idx)
+ checkIterate(tree, 0L, 100L, idx, true); // Every stored key is found when the range spans all keys.
+
+ for (long idx = 0L; idx <= 10L; ++idx)
+ checkIterate(tree, idx, 11L, -1L, false); // -1 was never put, so no range may report it.
+ }
+
+ /**
+ * Runs the concurrent iterate/put/remove scenario without changing {@code MAX_PER_PAGE} first.
+ *
+ * @throws Exception If failed.
+ */
+ public void testIterateConcurrentPutRemove() throws Exception {
+ findOneBoundedConcurrentPutRemove();
+ }
+
+ /**
+ * Runs the concurrent iterate/put/remove scenario with {@code MAX_PER_PAGE} set to 1.
+ *
+ * @throws Exception If failed.
+ */
+ public void testIterateConcurrentPutRemove_1() throws Exception {
+ MAX_PER_PAGE = 1;
+
+ findOneBoundedConcurrentPutRemove();
+ }
+
+ /**
+ * Runs the concurrent iterate/put/remove scenario with {@code MAX_PER_PAGE} set to 5.
+ *
+ * @throws Exception If failed.
+ */
+ public void testIterateConcurrentPutRemove_5() throws Exception {
+ MAX_PER_PAGE = 5;
+
+ findOneBoundedConcurrentPutRemove();
+ }
+
+ /**
+ * Runs the concurrent iterate/put/remove scenario with {@code MAX_PER_PAGE} set to 10.
+ * NOTE(review): unlike the sibling tests, the method name omits "Concurrent"; it still runs the
+ * same concurrent scenario.
+ *
+ * @throws Exception If failed.
+ */
+ public void testIteratePutRemove_10() throws Exception {
+ MAX_PER_PAGE = 10;
+
+ findOneBoundedConcurrentPutRemove();
+ }
+
+ /**
+ * Concurrent scenario shared by the {@code testIterate*PutRemove*} tests. In each of ten rounds the
+ * tree is filled with keys {@code 0..KEYS-1}, one key is chosen as {@code findKey}, and reader
+ * threads repeatedly call {@code iterate} over ranges containing {@code findKey} while this thread
+ * removes and re-inserts random keys (never removing {@code findKey}). Readers must always find
+ * {@code findKey} and must never find {@code -1}.
+ *
+ * @throws Exception If failed.
+ */
+ private void findOneBoundedConcurrentPutRemove() throws Exception {
+ final TestTree tree = createTestTree(true);
+
+ final int KEYS = 10_000;
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ for (int i = 0; i < 10; i++) {
+ for (long idx = 0L; idx < KEYS; ++idx)
+ tree.put(idx); // (Re)insert every key at the start of the round.
+
+ final Long findKey;
+
+ if (MAX_PER_PAGE > 0) {
+ switch (i) { // Rounds 0-7 use fixed keys derived from MAX_PER_PAGE and KEYS (presumably to hit page boundaries - confirm); later rounds use a random key.
+ case 0:
+ findKey = 1L;
+
+ break;
+
+ case 1:
+ findKey = (long)MAX_PER_PAGE;
+
+ break;
+
+ case 2:
+ findKey = (long)MAX_PER_PAGE - 1;
+
+ break;
+
+ case 3:
+ findKey = (long)MAX_PER_PAGE + 1;
+
+ break;
+
+ case 4:
+ findKey = (long)(KEYS / MAX_PER_PAGE / 2) * MAX_PER_PAGE;
+
+ break;
+
+ case 5:
+ findKey = (long)(KEYS / MAX_PER_PAGE / 2) * MAX_PER_PAGE - 1;
+
+ break;
+
+ case 6:
+ findKey = (long)(KEYS / MAX_PER_PAGE / 2) * MAX_PER_PAGE + 1;
+
+ break;
+
+ case 7:
+ findKey = (long)KEYS - 1;
+
+ break;
+
+ default:
+ findKey = rnd.nextLong(KEYS);
+ }
+ }
+ else
+ findKey = rnd.nextLong(KEYS);
+
+ info("Iteration [iter=" + i + ", key=" + findKey + ']');
+
+ assertEquals(findKey, tree.findOne(findKey)); // Sanity check before any concurrent modification starts.
+ checkIterate(tree, findKey, findKey, findKey, true);
+
+ IgniteInternalFuture getFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ TestTreeRowClosure p = new TestTreeRowClosure(findKey); // Created per call() invocation and reused across loop iterations via checkIterateC.
+
+ TestTreeRowClosure falseP = new TestTreeRowClosure(-1L); // -1 is never put by this test, so this closure must never match.
+
+ int cnt = 0;
+
+ while (!stop.get()) {
+ int shift = MAX_PER_PAGE > 0 ? rnd.nextInt(MAX_PER_PAGE * 2) : rnd.nextInt(100);
+
+ checkIterateC(tree, findKey, findKey, p, true); // Range is exactly the key.
+
+ checkIterateC(tree, findKey - shift, findKey, p, true); // Key is the upper bound.
+
+ checkIterateC(tree, findKey - shift, findKey + shift, p, true); // Key lies inside the range.
+
+ checkIterateC(tree, findKey, findKey + shift, p, true); // Key is the lower bound.
+
+ checkIterateC(tree, -100L, KEYS + 100L, falseP, false); // Range wider than all keys; nothing may match.
+
+ cnt++;
+ }
+
+ info("Done, read count: " + cnt);
+
+ return null;
+ }
+ }, 10, "find"); // NOTE(review): 10 and "find" look like the thread count and thread name - confirm against GridTestUtils.
+
+ asyncRunFut = new GridCompoundFuture<>();
+
+ asyncRunFut.add(getFut);
+
+ asyncRunFut.markInitialized();
+
+ try {
+ U.sleep(100);
+
+ for (int j = 0; j < 20; j++) {
+ for (long idx = 0L; idx < KEYS / 2; ++idx) {
+ long toRmv = rnd.nextLong(KEYS);
+
+ if (toRmv != findKey) // Never remove the key the readers expect to find.
+ tree.remove(toRmv);
+ }
+
+ for (long idx = 0L; idx < KEYS / 2; ++idx) {
+ long put = rnd.nextLong(KEYS);
+
+ tree.put(put);
+ }
+ }
+ }
+ finally {
+ stop.set(true); // Signal the reader loops to exit even if the modification phase failed.
+ }
+
+ asyncRunFut.get(); // Wait for the readers to finish.
+
+ stop.set(false); // Re-arm the flag for the next round.
+ }
+ }
+
+ /**
*
*/
public void testConcurrentGrowDegenerateTreeAndConcurrentRemove() throws Exception {
@@ -1449,6 +1681,17 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
last = c.get();
}
+
+ TestTreeFindFirstClosure cl = new TestTreeFindFirstClosure();
+
+ tree.iterate((long)low, (long)high, cl);
+
+ last = cl.val;
+
+ if (last != null) {
+ assertTrue(low + " <= " + last + " <= " + high, last >= low);
+ assertTrue(low + " <= " + last + " <= " + high, last <= high);
+ }
}
return null;
@@ -1853,4 +2096,50 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
return PageUtils.getLong(pageAddr, offset(idx));
}
}
+
+ /**
+ * Row closure used with {@code tree.iterate(...)} in these tests. It records in {@code found}
+ * whether a row equal to {@code expVal} was seen, or whether any row was seen at all when
+ * {@code expVal} is {@code null}.
+ */
+ static class TestTreeRowClosure implements BPlusTree.TreeRowClosure<Long, Long> {
+ /** Value to look for; {@code null} means the first row applied is accepted. */
+ private final Long expVal;
+
+ /** Set to {@code true} by {@link #apply} once a row has matched. */
+ private boolean found;
+
+ /**
+ * @param expVal Value to find or {@code null} to find first.
+ */
+ TestTreeRowClosure(Long expVal) {
+ this.expVal = expVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx)
+ throws IgniteCheckedException {
+ assert !found; // The closure is not expected to be applied again after a match.
+
+ found = expVal == null || io.getLookupRow(tree, pageAddr, idx).equals(expVal);
+
+ return !found; // true while nothing matched; presumably tells the tree to keep iterating - confirm against TreeRowClosure.
+ }
+ }
+
+ /**
+ * Row closure that stores the first row it is applied to in {@code val} and then returns
+ * {@code false}.
+ */
+ static class TestTreeFindFirstClosure implements BPlusTree.TreeRowClosure<Long, Long> {
+ /** Row captured by {@link #apply}; stays {@code null} if the closure was never applied. */
+ private Long val;
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx)
+ throws IgniteCheckedException {
+ assert val == null; // The closure is expected to be applied at most once per instance.
+
+ val = io.getLookupRow(tree, pageAddr, idx);
+
+ return false; // Presumably stops the iteration after the first row - confirm against TreeRowClosure.
+ }
+ }
}