You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/11 10:42:59 UTC
[01/17] ignite git commit: ignite-5937
Repository: ignite
Updated Branches:
refs/heads/ignite-5932 [created] 5318153d6
ignite-5937
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/44ad7011
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/44ad7011
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/44ad7011
Branch: refs/heads/ignite-5932
Commit: 44ad70112fb1063b61fcdc20c5fee893381d2e44
Parents: 27b2be4
Author: sboikov <sb...@gridgain.com>
Authored: Tue Oct 3 16:50:01 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Oct 3 18:00:46 2017 +0300
----------------------------------------------------------------------
.../cache/IgniteCacheOffheapManagerImpl.java | 83 ++--
.../cache/persistence/tree/BPlusTree.java | 376 ++++++++++++++-----
.../cache/tree/MvccVersionBasedSearchRow.java | 80 ++++
.../cache/mvcc/CacheMvccTransactionsTest.java | 79 ++++
.../processors/database/BPlusTreeSelfTest.java | 232 +++++++++++-
5 files changed, 718 insertions(+), 132 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/44ad7011/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index d8c5eaa..76d9649 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -56,6 +56,7 @@ import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
import org.apache.ignite.internal.processors.cache.tree.DataRow;
import org.apache.ignite.internal.processors.cache.tree.MvccDataRow;
import org.apache.ignite.internal.processors.cache.tree.MvccSearchRow;
+import org.apache.ignite.internal.processors.cache.tree.MvccVersionBasedSearchRow;
import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
import org.apache.ignite.internal.processors.cache.tree.PendingRow;
import org.apache.ignite.internal.processors.cache.tree.SearchRow;
@@ -1647,14 +1648,22 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
CacheDataRow row;
if (grp.mvccEnabled()) {
- // TODO IGNITE-3484: need special method.
- GridCursor<CacheDataRow> cur = dataTree.find(new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE),
- new MvccSearchRow(cacheId, key, 1, 1));
+ if (false) {
+ row = dataTree.findOneBounded(
+ new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE),
+ new MvccSearchRow(cacheId, key, 1L, 1L),
+ null,
+ CacheDataRowAdapter.RowData.NO_KEY);
+ }
+ else {
+ GridCursor<CacheDataRow> cur = dataTree.find(new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE),
+ new MvccSearchRow(cacheId, key, 1, 1));
- if (cur.next())
- row = cur.get();
- else
- row = null;
+ if (cur.next())
+ row = cur.get();
+ else
+ row = null;
+ }
}
else
row = dataTree.findOne(new SearchRow(cacheId, key), CacheDataRowAdapter.RowData.NO_KEY);
@@ -1705,41 +1714,53 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
- // TODO IGNITE-3484: need special method.
- GridCursor<CacheDataRow> cur = dataTree.find(
- new MvccSearchRow(cacheId, key, ver.coordinatorVersion(), ver.counter()),
- new MvccSearchRow(cacheId, key, 1, 1));
+ if (false) {
+ MvccVersionBasedSearchRow lower = new MvccVersionBasedSearchRow(cacheId, key, ver);
- CacheDataRow row = null;
+ CacheDataRow row = dataTree.findOneBounded(
+ lower,
+ new MvccSearchRow(cacheId, key, 1L, 1L),
+ lower, // Use the same instance as predicate to do not create extra object.
+ CacheDataRowAdapter.RowData.NO_KEY);
- MvccLongList txs = ver.activeTransactions();
+ afterRowFound(row, key);
- while (cur.next()) {
- CacheDataRow row0 = cur.get();
+ return row;
+ }
+ else {
+ GridCursor<CacheDataRow> cur = dataTree.find(
+ new MvccSearchRow(cacheId, key, ver.coordinatorVersion(), ver.counter()),
+ new MvccSearchRow(cacheId, key, 1, 1));
- assert row0.mvccCoordinatorVersion() > 0 : row0;
+ CacheDataRow row = null;
- boolean visible;
+ MvccLongList txs = ver.activeTransactions();
- if (txs != null) {
- visible = row0.mvccCoordinatorVersion() != ver.coordinatorVersion()
- || !txs.contains(row0.mvccCounter());
- }
- else
- visible = true;
+ while (cur.next()) {
+ CacheDataRow row0 = cur.get();
- if (visible) {
- row = row0;
+ assert row0.mvccCoordinatorVersion() > 0 : row0;
- break;
- }
- }
+ boolean visible;
- assert row == null || key.equals(row.key());
+ if (txs != null) {
+ visible = row0.mvccCoordinatorVersion() != ver.coordinatorVersion()
+ || !txs.contains(row0.mvccCounter());
+ }
+ else
+ visible = true;
- //afterRowFound(row, key);
+ if (visible) {
+ row = row0;
- return row;
+ break;
+ }
+ }
+
+ assert row == null || key.equals(row.key());
+
+ return row;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/44ad7011/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index c73b4c7..d570f1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -2509,14 +2509,14 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
*/
private final class GetCursor extends Get {
/** */
- ForwardCursor cursor;
+ AbstractForwardCursor cursor;
/**
* @param lower Lower bound.
* @param shift Shift.
* @param cursor Cursor.
*/
- GetCursor(L lower, int shift, ForwardCursor cursor) {
+ GetCursor(L lower, int shift, AbstractForwardCursor cursor) {
super(lower, false);
assert shift != 0; // Either handle range of equal rows or find a greater row after concurrent merge.
@@ -4384,52 +4384,85 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
*/
protected abstract T getRow(BPlusIO<L> io, long pageAddr, int idx, Object x) throws IgniteCheckedException;
+ public interface RowPredicate<L, T extends L> {
+ public boolean apply(BPlusTree<L, T> tree, BPlusIO<L> io, long pageAddr, int idx) throws IgniteCheckedException;
+ }
+
+ public T findOneBounded(L lower, L upper, RowPredicate<L, T> p, Object x) throws IgniteCheckedException {
+ checkDestroyed();
+
+ try {
+ FindOneCursor cursor = new FindOneCursor(lower, upper, p, x);
+
+ return cursor.findOne();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteCheckedException("Runtime failure on bounds: [lower=" + lower + ", upper=" + upper + "]", e);
+ }
+ catch (RuntimeException e) {
+ throw new IgniteException("Runtime failure on bounds: [lower=" + lower + ", upper=" + upper + "]", e);
+ }
+ catch (AssertionError e) {
+ throw new AssertionError("Assertion error on bounds: [lower=" + lower + ", upper=" + upper + "]", e);
+ }
+ finally {
+ checkDestroyed();
+ }
+ }
+
/**
- * Forward cursor.
+ *
*/
@SuppressWarnings("unchecked")
- private final class ForwardCursor implements GridCursor<T> {
+ private abstract class AbstractForwardCursor {
/** */
- private T[] rows = (T[])EMPTY;
-
- /** */
- private int row = -1;
+ long nextPageId;
/** */
- private long nextPageId;
-
- /** */
- private L lowerBound;
+ L lowerBound;
/** */
private int lowerShift = -1; // Initially it is -1 to handle multiple equal rows.
/** */
- private final L upperBound;
+ final L upperBound;
/** */
- private final Object x;
+ final Object x;
/**
* @param lowerBound Lower bound.
* @param upperBound Upper bound.
+ * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
*/
- ForwardCursor(L lowerBound, L upperBound) {
+ AbstractForwardCursor(L lowerBound, L upperBound, Object x) {
this.lowerBound = lowerBound;
this.upperBound = upperBound;
- this.x = null;
+ this.x = x;
}
/**
- * @param lowerBound Lower bound.
- * @param upperBound Upper bound.
- * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
+ *
*/
- ForwardCursor(L lowerBound, L upperBound, Object x) {
- this.lowerBound = lowerBound;
- this.upperBound = upperBound;
- this.x = x;
- }
+ abstract void init0();
+
+ /**
+ * @param pageAddr
+ * @param io
+ * @param startIdx
+ * @param cnt
+ * @return
+ * @throws IgniteCheckedException If failed.
+ */
+ abstract boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx, int cnt) throws IgniteCheckedException;
+
+ /**
+ * @return
+ * @throws IgniteCheckedException If failed.
+ */
+ abstract boolean reinitialize0() throws IgniteCheckedException;
+
+ abstract void onNotFound(boolean readDone);
/**
* @param pageAddr Page address.
@@ -4437,9 +4470,10 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
* @param startIdx Start index.
* @throws IgniteCheckedException If failed.
*/
- private void init(long pageAddr, BPlusIO<L> io, int startIdx) throws IgniteCheckedException {
+ final void init(long pageAddr, BPlusIO<L> io, int startIdx) throws IgniteCheckedException {
nextPageId = 0;
- row = -1;
+
+ init0();
int cnt = io.getCount(pageAddr);
@@ -4447,16 +4481,10 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
if (cnt == 0) {
assert io.getForward(pageAddr) == 0L;
- rows = null;
- }
- else if (!fillFromBuffer(pageAddr, io, startIdx, cnt)) {
- if (rows != EMPTY) {
- assert rows.length > 0; // Otherwise it makes no sense to create an array.
-
- // Fake clear.
- rows[0] = null;
- }
+ onNotFound(true);
}
+ else if (!fillFromBuffer(pageAddr, io, startIdx, cnt))
+ onNotFound(false);
}
/**
@@ -4466,7 +4494,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
* @return Adjusted to lower bound start index.
* @throws IgniteCheckedException If failed.
*/
- private int findLowerBound(long pageAddr, BPlusIO<L> io, int cnt) throws IgniteCheckedException {
+ final int findLowerBound(long pageAddr, BPlusIO<L> io, int cnt) throws IgniteCheckedException {
assert io.isLeaf();
// Compare with the first row on the page.
@@ -4491,7 +4519,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
* @return Corrected number of rows with respect to upper bound.
* @throws IgniteCheckedException If failed.
*/
- private int findUpperBound(long pageAddr, BPlusIO<L> io, int low, int cnt) throws IgniteCheckedException {
+ final int findUpperBound(long pageAddr, BPlusIO<L> io, int low, int cnt) throws IgniteCheckedException {
assert io.isLeaf();
// Compare with the last row on the page.
@@ -4530,68 +4558,13 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
nextPageId = io.getForward(pageAddr);
- if (lowerBound != null && startIdx == 0)
- startIdx = findLowerBound(pageAddr, io, cnt);
-
- if (upperBound != null && cnt != startIdx)
- cnt = findUpperBound(pageAddr, io, startIdx, cnt);
-
- cnt -= startIdx;
-
- if (cnt == 0)
- return false;
-
- if (rows == EMPTY)
- rows = (T[])new Object[cnt];
-
- for (int i = 0; i < cnt; i++) {
- T r = getRow(io, pageAddr, startIdx + i, x);
-
- rows = GridArrays.set(rows, i, r);
- }
-
- GridArrays.clearTail(rows, cnt);
-
- return true;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("SimplifiableIfStatement")
- @Override public boolean next() throws IgniteCheckedException {
- if (rows == null)
- return false;
-
- if (++row < rows.length && rows[row] != null) {
- clearLastRow(); // Allow to GC the last returned row.
-
- return true;
- }
-
- return nextPage();
- }
-
- /**
- * @return Cleared last row.
- */
- private T clearLastRow() {
- if (row == 0)
- return null;
-
- int last = row - 1;
-
- T r = rows[last];
-
- assert r != null;
-
- rows[last] = null;
-
- return r;
+ return fillFromBuffer0(pageAddr, io, startIdx, cnt);
}
/**
* @throws IgniteCheckedException If failed.
*/
- private void find() throws IgniteCheckedException {
+ final void find() throws IgniteCheckedException {
assert lowerBound != null;
doFind(new GetCursor(lowerBound, lowerShift, this));
@@ -4607,21 +4580,19 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
// to the previous lower bound.
find();
- return next();
+ return reinitialize0();
}
/**
* @return {@code true} If we have rows to return after reading the next page.
* @throws IgniteCheckedException If failed.
*/
- private boolean nextPage() throws IgniteCheckedException {
- updateLowerBound(clearLastRow());
-
- row = 0;
+ final boolean nextPage(T lastRow) throws IgniteCheckedException {
+ updateLowerBound(lastRow);
for (;;) {
if (nextPageId == 0) {
- rows = null;
+ onNotFound(true);
return false; // Done.
}
@@ -4665,6 +4636,211 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
lowerBound = lower; // Move the lower bound forward for further concurrent merge retries.
}
}
+ }
+
+ /**
+ * Forward cursor.
+ */
+ @SuppressWarnings("unchecked")
+ private final class FindOneCursor extends AbstractForwardCursor {
+ /** */
+ private Object resRow;
+
+ /** */
+ private T lastRow;
+
+ /** */
+ private final RowPredicate<L, T> p;
+
+ /**
+ * @param lowerBound Lower bound.
+ * @param upperBound Upper bound.
+ * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
+ */
+ FindOneCursor(L lowerBound, L upperBound, RowPredicate<L, T> p, Object x) {
+ super(lowerBound, upperBound, x);
+
+ assert lowerBound != null;
+ assert upperBound != null;
+
+ this.p = p;
+ }
+
+ @Override void init0() {
+ // No-op.
+ }
+
+ @Override boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx, int cnt) throws IgniteCheckedException {
+ if (startIdx == 0) // TODO IGNITE-3478: startIdx == 0? can search twice for first item?
+ startIdx = findLowerBound(pageAddr, io, cnt);
+
+ if (cnt == startIdx)
+ return false;
+
+ for (int i = startIdx; i < cnt; i++) {
+ int cmp = compare(0, io, pageAddr, i, upperBound);
+
+ if (cmp > 0) {
+ nextPageId = 0; // The End.
+
+ return false;
+ }
+
+ if (p == null || p.apply(BPlusTree.this, io, pageAddr, i)) {
+ resRow = getRow(io, pageAddr, i, x);
+
+ return true;
+ }
+ }
+
+ if (nextPageId != 0)
+ lastRow = getRow(io, pageAddr, cnt - 1, x); // Need save last row.
+
+ return true;
+ }
+
+ @Override boolean reinitialize0() throws IgniteCheckedException {
+ return true;
+ }
+
+ @Override void onNotFound(boolean readDone) {
+ resRow = EMPTY;
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ * @return Found row.
+ */
+ private T findOne() throws IgniteCheckedException {
+ find();
+
+ if (resRow != null) {
+ if (resRow == EMPTY)
+ return null;
+
+ return (T)resRow;
+ }
+
+ for (;;) {
+ T lastRow0 = lastRow;
+
+ lastRow = null;
+
+ nextPage(lastRow0);
+
+ if (resRow != null) {
+ if (resRow == EMPTY)
+ return null;
+
+ return (T)resRow;
+ }
+ }
+ }
+ }
+
+ /**
+ * Forward cursor.
+ */
+ @SuppressWarnings("unchecked")
+ private final class ForwardCursor extends AbstractForwardCursor implements GridCursor<T> {
+ /** */
+ private T[] rows = (T[])EMPTY;
+
+ /** */
+ private int row = -1;
+
+ /**
+ * @param lowerBound Lower bound.
+ * @param upperBound Upper bound.
+ * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
+ */
+ ForwardCursor(L lowerBound, L upperBound, Object x) {
+ super(lowerBound, upperBound, x);
+ }
+
+ @Override boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx, int cnt) throws IgniteCheckedException {
+ if (lowerBound != null && startIdx == 0)
+ startIdx = findLowerBound(pageAddr, io, cnt);
+
+ if (upperBound != null && cnt != startIdx)
+ cnt = findUpperBound(pageAddr, io, startIdx, cnt);
+
+ cnt -= startIdx;
+
+ if (cnt == 0)
+ return false;
+
+ if (rows == EMPTY)
+ rows = (T[])new Object[cnt];
+
+ for (int i = 0; i < cnt; i++) {
+ T r = getRow(io, pageAddr, startIdx + i, x);
+
+ rows = GridArrays.set(rows, i, r);
+ }
+
+ GridArrays.clearTail(rows, cnt);
+
+ return true;
+ }
+
+ @Override boolean reinitialize0() throws IgniteCheckedException {
+ return next();
+ }
+
+ @Override void onNotFound(boolean readDone) {
+ if (readDone)
+ rows = null;
+ else {
+ if (rows != EMPTY) {
+ assert rows.length > 0; // Otherwise it makes no sense to create an array.
+
+ // Fake clear.
+ rows[0] = null;
+ }
+ }
+ }
+
+ @Override void init0() {
+ row = -1;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("SimplifiableIfStatement")
+ @Override public boolean next() throws IgniteCheckedException {
+ if (rows == null)
+ return false;
+
+ if (++row < rows.length && rows[row] != null) {
+ clearLastRow(); // Allow to GC the last returned row.
+
+ return true;
+ }
+
+ T lastRow = clearLastRow();
+
+ row = 0;
+
+ return nextPage(lastRow);
+ }
+
+ /**
+ * @return Cleared last row.
+ */
+ private T clearLastRow() {
+ if (row == 0)
+ return null;
+
+ int last = row - 1;
+
+ T r = rows[last];
+
+ assert r != null;
+
+ rows[last] = null;
+
+ return r;
+ }
/** {@inheritDoc} */
@Override public T get() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/44ad7011/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
new file mode 100644
index 0000000..f708ffd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.tree;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ *
+ */
+public class MvccVersionBasedSearchRow extends SearchRow implements BPlusTree.RowPredicate<CacheSearchRow, CacheDataRow> {
+ /** */
+ private final MvccCoordinatorVersion ver;
+
+ /**
+ * @param cacheId Cache ID.
+ * @param key Key.
+ * @param ver Mvcc version.
+ */
+ public MvccVersionBasedSearchRow(int cacheId, KeyCacheObject key, MvccCoordinatorVersion ver) {
+ super(cacheId, key);
+
+ assert ver != null;
+
+ this.ver = ver;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(BPlusTree<CacheSearchRow, CacheDataRow> tree,
+ BPlusIO<CacheSearchRow> io,
+ long pageAddr,
+ int idx) throws IgniteCheckedException
+ {
+ if (ver.activeTransactions() == null)
+ return true;
+
+ RowLinkIO rowIo = (RowLinkIO)io;
+
+ if (rowIo.getMvccUpdateTopologyVersion(pageAddr, idx) != ver.coordinatorVersion())
+ return true;
+
+ return !ver.activeTransactions().contains(ver.counter()); // TODO IGNITE-3478 sort active transactions?
+ }
+
+ /** {@inheritDoc} */
+ @Override public long mvccCoordinatorVersion() {
+ return ver.coordinatorVersion();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long mvccCounter() {
+ return ver.counter();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccVersionBasedSearchRow.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44ad7011/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index f28fe2d..7936340 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteTransactions;
@@ -47,16 +48,20 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.TestCacheNodeExcludingFilter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.lang.GridInClosure3;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -2682,6 +2687,80 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
}
/**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testInternalApi() throws Exception {
+ Ignite node = startGrid(0);
+
+ IgniteCache cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 1));
+
+ GridCacheContext cctx =
+ ((IgniteKernal)node).context().cache().context().cacheContext(CU.cacheId(cache.getName()));
+
+ CacheCoordinatorsProcessor crd = cctx.kernalContext().coordinators();
+
+ // Start query to prevent cleanup.
+ IgniteInternalFuture<MvccCoordinatorVersion> fut = crd.requestQueryCounter(crd.currentCoordinator());
+
+ fut.get();
+
+ final Integer key = 0;
+
+ for (int i = 0; i < 10; i++) {
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(key + 1, i);
+
+ tx.commit();
+ }
+ }
+
+ for (int i = 0; i < 10; i++) {
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(key, i);
+
+ tx.commit();
+ }
+ }
+
+ KeyCacheObject key0 = cctx.toCacheKeyObject(key);
+
+ List<T2<Object, MvccCounter>> vers = cctx.offheap().mvccAllVersions(cctx, key0);
+
+ assertEquals(10, vers.size());
+
+ CacheDataRow row = cctx.offheap().read(cctx, key0);
+
+ checkRow(cctx, row, key0, vers.get(0).get1());
+
+ for (T2<Object, MvccCounter> ver : vers) {
+ MvccCounter cntr = ver.get2();
+
+ MvccCoordinatorVersion readVer =
+ new MvccCoordinatorVersionResponse(cntr.coordinatorVersion(), cntr.counter(), 0);
+
+ row = cctx.offheap().mvccRead(cctx, key0, readVer);
+
+ checkRow(cctx, row, key0, ver.get1());
+ }
+
+ checkRow(cctx,
+ cctx.offheap().mvccRead(cctx, key0, new MvccCoordinatorVersionResponse(vers.get(0).get2().coordinatorVersion() + 1, 1, 0)),
+ key0,
+ vers.get(0).get1());
+
+ checkRow(cctx,
+ cctx.offheap().mvccRead(cctx, key0, new MvccCoordinatorVersionResponse(vers.get(0).get2().coordinatorVersion(), vers.get(0).get2().counter() + 1, 0)),
+ key0,
+ vers.get(0).get1());
+ }
+
+ private void checkRow(GridCacheContext cctx, CacheDataRow row, KeyCacheObject expKey, Object expVal) {
+ assertNotNull(row);
+ assertEquals(expKey, row.key());
+ assertEquals(expVal, row.value().value(cctx.cacheObjectContext(), false));
+ }
+
+ /**
* @return Cache configurations.
*/
private List<CacheConfiguration<Object, Object>> cacheConfigurations() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/44ad7011/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
index 9c0d791..e7ab34f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
@@ -25,6 +25,7 @@ import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -570,6 +571,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
assertNoLocks();
assertEquals(x, tree.findOne(x).longValue());
+ assertEquals(x, tree.findOneBounded(x, x, null, null).longValue());
assertNoLocks();
@@ -584,12 +586,15 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
assertNull(tree.findOne(-1L));
- for (long x = 0; x < cnt; x++)
+ for (long x = 0; x < cnt; x++) {
assertEquals(x, tree.findOne(x).longValue());
+ assertEquals(x, tree.findOneBounded(x, x, null, null).longValue());
+ }
assertNoLocks();
assertNull(tree.findOne(cnt));
+ assertNull(tree.findOneBounded(cnt, cnt, null, null));
for (long x = RMV_INC > 0 ? 0 : cnt - 1; x >= 0 && x < cnt; x += RMV_INC) {
X.println(" -- " + x);
@@ -603,6 +608,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
assertNoLocks();
assertNull(tree.findOne(x));
+ assertNull(tree.findOneBounded(x, x, null, null));
assertNoLocks();
@@ -1242,6 +1248,200 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testFindOneBounded() throws Exception {
+ MAX_PER_PAGE = 5;
+
+ TestTree tree = createTestTree(true);
+
+ assertNull(tree.findOneBounded(0L, 100L, null, null));
+
+ for (long idx = 1L; idx <= 10L; ++idx)
+ tree.put(idx);
+
+ for (long idx = 1L; idx <= 10L; ++idx)
+ assertEquals(idx, (Object)tree.findOneBounded(idx, 100L, null, null));
+
+ assertEquals(1L, (Object)tree.findOneBounded(0L, 100L, null, null));
+
+ for (long idx = 1L; idx <= 10L; ++idx)
+ assertEquals(10L, (Object)tree.findOneBounded(idx, 100L, new TestRowPredicate(10L), null));
+
+ assertNull(tree.findOneBounded(0L, 100L, new TestRowPredicate(100L), null));
+
+ for (long idx = 1L; idx <= 10L; ++idx)
+ assertEquals(idx, (Object)tree.findOneBounded(0L, 100L, new TestRowPredicate(idx), null));
+
+ for (long idx = 0L; idx <= 10L; ++idx)
+ assertNull(tree.findOneBounded(idx, 11L, new TestRowPredicate(-1L), null));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testFindOneBoundedConcurrentPutRemove() throws Exception {
+ findOneBoundedConcurrentPutRemove();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testFindOneBoundedConcurrentPutRemove_5() throws Exception {
+ MAX_PER_PAGE = 5;
+
+ findOneBoundedConcurrentPutRemove();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testFindOneBoundedConcurrentPutRemove_10() throws Exception {
+ MAX_PER_PAGE = 10;
+
+ findOneBoundedConcurrentPutRemove();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void findOneBoundedConcurrentPutRemove() throws Exception {
+ final TestTree tree = createTestTree(true);
+
+ final int KEYS = 10_000;
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ for (int i = 0; i < 10; i++) {
+ for (long idx = 0L; idx < KEYS; ++idx)
+ tree.put(idx);
+
+ final Long findKey;
+
+ if (MAX_PER_PAGE > 0) {
+ switch (i) {
+ case 0:
+ findKey = 1L;
+
+ break;
+
+ case 1:
+ findKey = (long)MAX_PER_PAGE;
+
+ break;
+
+ case 2:
+ findKey = (long)MAX_PER_PAGE - 1;
+
+ break;
+
+ case 3:
+ findKey = (long)MAX_PER_PAGE + 1;
+
+ break;
+
+ case 4:
+ findKey = (long)(KEYS / MAX_PER_PAGE / 2) * MAX_PER_PAGE;
+
+ break;
+
+ case 5:
+ findKey = (long)(KEYS / MAX_PER_PAGE / 2) * MAX_PER_PAGE - 1;
+
+ break;
+
+ case 6:
+ findKey = (long)(KEYS / MAX_PER_PAGE / 2) * MAX_PER_PAGE + 1;
+
+ break;
+
+ case 7:
+ findKey = (long)KEYS - 1;
+
+ break;
+
+ default:
+ findKey = rnd.nextLong(KEYS);
+ }
+ }
+ else
+ findKey = rnd.nextLong(KEYS);
+
+ info("Iteration [iter=" + i + ", key=" + findKey + ']');
+
+ assertEquals(findKey, tree.findOne(findKey));
+ assertEquals(findKey, tree.findOneBounded(findKey, findKey, null, null));
+
+ IgniteInternalFuture getFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ TestRowPredicate p = new TestRowPredicate(findKey);
+
+ TestRowPredicate falseP = new TestRowPredicate(-1L);
+
+ int cnt = 0;
+
+ while (!stop.get()) {
+ int shift = MAX_PER_PAGE > 0 ? rnd.nextInt(MAX_PER_PAGE * 2) : rnd.nextInt(100);
+
+ assertEquals(findKey, tree.findOneBounded(findKey, findKey, null, null));
+
+ assertEquals(findKey,
+ tree.findOneBounded(findKey - shift, findKey, p, null));
+
+ assertEquals(findKey,
+ tree.findOneBounded(findKey - shift, findKey + shift, p, null));
+
+ assertEquals(findKey,
+ tree.findOneBounded(findKey, findKey + shift, p, null));
+
+ assertNull(tree.findOneBounded(-100L, KEYS + 100L, falseP, null));
+
+ cnt++;
+ }
+
+ info("Done, read count: " + cnt);
+
+ return null;
+ }
+ }, 10, "find");
+
+ asyncRunFut = new GridCompoundFuture<>();
+
+ asyncRunFut.add(getFut);
+
+ asyncRunFut.markInitialized();
+
+ try {
+ U.sleep(100);
+
+ for (int j = 0; j < 20; j++) {
+ for (long idx = 0L; idx < KEYS / 2; ++idx) {
+ long toRmv = rnd.nextLong(KEYS);
+
+ if (toRmv != findKey)
+ tree.remove(toRmv);
+ }
+
+ for (long idx = 0L; idx < KEYS / 2; ++idx) {
+ long put = rnd.nextLong(KEYS);
+
+ tree.put(put);
+ }
+ }
+ }
+ finally {
+ stop.set(true);
+ }
+
+ asyncRunFut.get();
+
+ stop.set(false);
+ }
+ }
+
+ /**
*
*/
public void testConcurrentGrowDegenerateTreeAndConcurrentRemove() throws Exception {
@@ -1449,6 +1649,13 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
last = c.get();
}
+
+ last = tree.findOneBounded((long)low, (long)high, null, null);
+
+ if (last != null) {
+ assertTrue(low + " <= " + last + " <= " + high, last >= low);
+ assertTrue(low + " <= " + last + " <= " + high, last <= high);
+ }
}
return null;
@@ -1853,4 +2060,27 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
return PageUtils.getLong(pageAddr, offset(idx));
}
}
+
+ /**
+ *
+ */
+ static class TestRowPredicate implements TestTree.RowPredicate<Long, Long> {
+ /** */
+ private final Long expVal;
+
+ /**
+ * @param expVal Expected value.
+ */
+ TestRowPredicate(Long expVal) {
+ this.expVal = expVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx)
+ throws IgniteCheckedException {
+ Long row = io.getLookupRow(tree, pageAddr, idx);
+
+ return row.equals(expVal);
+ }
+ }
}
[14/17] ignite git commit: ignite-5932
Posted by sb...@apache.org.
ignite-5932
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/224f2449
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/224f2449
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/224f2449
Branch: refs/heads/ignite-5932
Commit: 224f244938a325e4fa9f7ab3b5187ee832d0d2fb
Parents: bb969db
Author: sboikov <sb...@gridgain.com>
Authored: Tue Oct 10 16:08:04 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Oct 10 16:08:04 2017 +0300
----------------------------------------------------------------------
.../cache/mvcc/CacheMvccTransactionsTest.java | 48 ++++++++++++++++++--
1 file changed, 43 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/224f2449/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 1abc116..13f8334 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -82,14 +82,18 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
/**
* TODO IGNITE-3478: extend tests to use single/mutiple nodes, all tx types.
@@ -182,7 +186,24 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testPessimisticTx1() throws Exception {
- checkPessimisticTx(new CI1<IgniteCache<Integer, Integer>>() {
+ checkTx1(PESSIMISTIC, REPEATABLE_READ);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSerializableTx1() throws Exception {
+ checkTx1(OPTIMISTIC, SERIALIZABLE);
+ }
+
+ /**
+ * @param concurrency Transaction concurrency.
+ * @param isolation Transaction isolation.
+ * @throws Exception If failed.
+ */
+ private void checkTx1(final TransactionConcurrency concurrency, final TransactionIsolation isolation)
+ throws Exception {
+ checkTxWithAllCaches(new CI1<IgniteCache<Integer, Integer>>() {
@Override public void apply(IgniteCache<Integer, Integer> cache) {
try {
IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
@@ -192,7 +213,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
for (Integer key : keys) {
log.info("Test key: " + key);
- try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ try (Transaction tx = txs.txStart(concurrency, isolation)) {
Integer val = cache.get(key);
assertNull(val);
@@ -222,7 +243,24 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testPessimisticTx2() throws Exception {
- checkPessimisticTx(new CI1<IgniteCache<Integer, Integer>>() {
+ checkTx2(PESSIMISTIC, REPEATABLE_READ);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSerializableTx2() throws Exception {
+ checkTx2(OPTIMISTIC, SERIALIZABLE);
+ }
+
+ /**
+ * @param concurrency Transaction concurrency.
+ * @param isolation Transaction isolation.
+ * @throws Exception If failed.
+ */
+ private void checkTx2(final TransactionConcurrency concurrency, final TransactionIsolation isolation)
+ throws Exception {
+ checkTxWithAllCaches(new CI1<IgniteCache<Integer, Integer>>() {
@Override public void apply(IgniteCache<Integer, Integer> cache) {
try {
IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
@@ -232,7 +270,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
for (Integer key : keys) {
log.info("Test key: " + key);
- try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ try (Transaction tx = txs.txStart(concurrency, isolation)) {
cache.put(key, key);
cache.put(key + 1, key + 1);
@@ -257,7 +295,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
* @param c Closure to run.
* @throws Exception If failed.
*/
- private void checkPessimisticTx(IgniteInClosure<IgniteCache<Integer, Integer>> c) throws Exception {
+ private void checkTxWithAllCaches(IgniteInClosure<IgniteCache<Integer, Integer>> c) throws Exception {
startGridsMultiThreaded(SRVS);
try {
[05/17] ignite git commit: ignite-5937
Posted by sb...@apache.org.
ignite-5937
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f8be46d8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f8be46d8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f8be46d8
Branch: refs/heads/ignite-5932
Commit: f8be46d80875890f95970af34e55a418c7892434
Parents: e6940bd
Author: sboikov <sb...@gridgain.com>
Authored: Wed Oct 4 11:44:31 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Oct 4 11:44:31 2017 +0300
----------------------------------------------------------------------
.../cache/persistence/tree/BPlusTree.java | 62 ++++++++++++--------
1 file changed, 36 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f8be46d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index 05adb41..b6c5c96 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -973,6 +973,35 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
}
}
+ /**
+ * @param lower Lower bound inclusive.
+ * @param upper Upper bound inclusive.
+ * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
+ * @return First found item which meets bounds and pass predicate.
+ * @throws IgniteCheckedException If failed.
+ */
+ public T findOneBounded(L lower, L upper, RowPredicate<L, T> p, Object x) throws IgniteCheckedException {
+ checkDestroyed();
+
+ try {
+ FindOneCursor cursor = new FindOneCursor(lower, upper, p, x);
+
+ return cursor.findOne();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteCheckedException("Runtime failure on bounds: [lower=" + lower + ", upper=" + upper + "]", e);
+ }
+ catch (RuntimeException e) {
+ throw new IgniteException("Runtime failure on bounds: [lower=" + lower + ", upper=" + upper + "]", e);
+ }
+ catch (AssertionError e) {
+ throw new AssertionError("Assertion error on bounds: [lower=" + lower + ", upper=" + upper + "]", e);
+ }
+ finally {
+ checkDestroyed();
+ }
+ }
+
/** {@inheritDoc} */
@Override public T findFirst() throws IgniteCheckedException {
checkDestroyed();
@@ -4385,32 +4414,6 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
*/
protected abstract T getRow(BPlusIO<L> io, long pageAddr, int idx, Object x) throws IgniteCheckedException;
- public interface RowPredicate<L, T extends L> {
- public boolean apply(BPlusTree<L, T> tree, BPlusIO<L> io, long pageAddr, int idx) throws IgniteCheckedException;
- }
-
- public T findOneBounded(L lower, L upper, RowPredicate<L, T> p, Object x) throws IgniteCheckedException {
- checkDestroyed();
-
- try {
- FindOneCursor cursor = new FindOneCursor(lower, upper, p, x);
-
- return cursor.findOne();
- }
- catch (IgniteCheckedException e) {
- throw new IgniteCheckedException("Runtime failure on bounds: [lower=" + lower + ", upper=" + upper + "]", e);
- }
- catch (RuntimeException e) {
- throw new IgniteException("Runtime failure on bounds: [lower=" + lower + ", upper=" + upper + "]", e);
- }
- catch (AssertionError e) {
- throw new AssertionError("Assertion error on bounds: [lower=" + lower + ", upper=" + upper + "]", e);
- }
- finally {
- checkDestroyed();
- }
- }
-
/**
*
*/
@@ -4996,4 +4999,11 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
/** */
DONE
}
+
+ /**
+ *
+ */
+ public interface RowPredicate<L, T extends L> {
+ public boolean apply(BPlusTree<L, T> tree, BPlusIO<L> io, long pageAddr, int idx) throws IgniteCheckedException;
+ }
}
[12/17] ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-3478' into ignite-5937
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-3478' into ignite-5937
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/de3ed0d3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/de3ed0d3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/de3ed0d3
Branch: refs/heads/ignite-5932
Commit: de3ed0d3cb8b1474cc520f1001f9f378e5070699
Parents: 21bc633 61b46c4
Author: sboikov <sb...@gridgain.com>
Authored: Tue Oct 10 14:43:11 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Oct 10 14:43:11 2017 +0300
----------------------------------------------------------------------
.../org/apache/ignite/cache/CacheMetrics.java | 20 +
.../internal/jdbc/thin/JdbcThinTcpIo.java | 20 +-
.../cache/CacheAffinitySharedManager.java | 15 +-
.../cache/CacheClusterMetricsMXBeanImpl.java | 10 +
.../cache/CacheLocalMetricsMXBeanImpl.java | 10 +
.../processors/cache/CacheMetricsImpl.java | 14 +-
.../processors/cache/CacheMetricsSnapshot.java | 10 +
.../processors/cache/ClusterCachesInfo.java | 1 +
.../distributed/dht/GridDhtLocalPartition.java | 59 --
.../dht/preloader/GridDhtPartitionDemander.java | 9 -
.../processors/cluster/ClusterProcessor.java | 39 +-
.../processors/cluster/GridUpdateNotifier.java | 224 +++----
.../cluster/HttpIgniteUpdatesChecker.java | 29 +-
.../platform/client/ClientIntResponse.java | 46 ++
.../platform/client/ClientLongResponse.java | 46 ++
.../platform/client/ClientMessageParser.java | 133 ++++
.../cache/ClientCacheClearKeyRequest.java | 44 ++
.../cache/ClientCacheClearKeysRequest.java | 44 ++
.../client/cache/ClientCacheClearRequest.java | 44 ++
.../cache/ClientCacheContainsKeyRequest.java | 45 ++
.../cache/ClientCacheContainsKeysRequest.java | 45 ++
.../client/cache/ClientCacheGetAllRequest.java | 46 ++
.../client/cache/ClientCacheGetAllResponse.java | 57 ++
.../ClientCacheGetAndPutIfAbsentRequest.java | 45 ++
.../cache/ClientCacheGetAndPutRequest.java | 45 ++
.../cache/ClientCacheGetAndRemoveRequest.java | 45 ++
.../cache/ClientCacheGetAndReplaceRequest.java | 45 ++
.../client/cache/ClientCacheGetRequest.java | 9 +-
.../client/cache/ClientCacheGetSizeRequest.java | 57 ++
.../client/cache/ClientCacheKeyRequest.java | 48 ++
.../cache/ClientCacheKeyValueRequest.java | 48 ++
.../client/cache/ClientCacheKeysRequest.java | 68 +++
.../client/cache/ClientCachePutAllRequest.java | 57 ++
.../cache/ClientCachePutIfAbsentRequest.java | 45 ++
.../client/cache/ClientCachePutRequest.java | 13 +-
.../cache/ClientCacheRemoveAllRequest.java | 44 ++
.../cache/ClientCacheRemoveIfEqualsRequest.java | 45 ++
.../cache/ClientCacheRemoveKeyRequest.java | 45 ++
.../cache/ClientCacheRemoveKeysRequest.java | 44 ++
.../ClientCacheReplaceIfEqualsRequest.java | 50 ++
.../client/cache/ClientCacheReplaceRequest.java | 45 ++
.../client/cache/ClientCacheRequest.java | 2 +-
.../processors/query/GridQueryProcessor.java | 19 +
.../internal/visor/query/VisorQueryTask.java | 15 +-
.../cache/CacheGroupsMetricsRebalanceTest.java | 6 +-
.../CacheMetricsForClusterGroupSelfTest.java | 119 ++--
...heapCacheMetricsForClusterGroupSelfTest.java | 19 +-
.../cluster/GridUpdateNotifierSelfTest.java | 50 +-
.../platform/PlatformCacheWriteMetricsTask.java | 10 +
.../cache/index/H2DynamicTableSelfTest.java | 23 +
.../IgnitePersistentStoreSchemaLoadTest.java | 2 +
.../Client/Cache/CacheTest.cs | 611 ++++++++++++++++++-
.../Client/Cache/CacheTestNoMeta.cs | 4 +-
.../Client/ClientTestBase.cs | 9 +
.../Client/Cache/ICacheClient.cs | 155 +++++
.../Client/IgniteClientException.cs | 8 +
.../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs | 24 +-
.../Impl/Client/Cache/CacheClient.cs | 260 +++++++-
.../Apache.Ignite.Core/Impl/Client/ClientOp.cs | 21 +-
.../Apache.Ignite.Core/Impl/IgniteUtils.cs | 21 +
modules/web-console/backend/app/mongo.js | 1 +
.../frontend/app/helpers/jade/mixins.pug | 9 +-
.../generator/ConfigurationGenerator.js | 6 +-
.../frontend/app/modules/sql/sql.controller.js | 14 +-
.../states/configuration/caches/store.pug | 4 +-
.../configuration/clusters/attributes.pug | 4 +-
.../clusters/collision/job-stealing.pug | 4 +-
.../states/configuration/domains/general.pug | 2 +-
.../states/configuration/domains/query.pug | 8 +-
.../frontend/app/services/JavaTypes.service.js | 15 +
.../app/services/LegacyUtils.service.js | 16 +-
.../frontend/controllers/domains-controller.js | 7 +-
.../frontend/public/stylesheets/style.scss | 8 +
.../web-console/frontend/views/sql/sql.tpl.pug | 10 +
74 files changed, 2867 insertions(+), 427 deletions(-)
----------------------------------------------------------------------
[11/17] ignite git commit: ignite-5937
Posted by sb...@apache.org.
ignite-5937
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/21bc6338
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/21bc6338
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/21bc6338
Branch: refs/heads/ignite-5932
Commit: 21bc6338da4e9d794e624b0c84ae5a5615f86d53
Parents: c1b2c03
Author: sboikov <sb...@gridgain.com>
Authored: Tue Oct 10 13:31:40 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Oct 10 14:41:34 2017 +0300
----------------------------------------------------------------------
.../cache/IgniteCacheOffheapManagerImpl.java | 141 ++++++-----
.../dht/preloader/GridDhtPartitionSupplier.java | 24 +-
.../GridDhtPartitionSupplyMessage.java | 4 +-
.../cache/mvcc/CacheCoordinatorsProcessor.java | 27 +-
.../cache/mvcc/MvccCoordinatorVersion.java | 2 +-
.../cache/persistence/CacheDataRow.java | 5 +
.../cache/persistence/CacheDataRowAdapter.java | 5 +
.../cache/persistence/CacheSearchRow.java | 6 +
.../persistence/GridCacheOffheapManager.java | 5 +
.../processors/cache/persistence/RowStore.java | 2 +
.../persistence/freelist/FreeListImpl.java | 11 +-
.../cache/persistence/tree/io/DataPageIO.java | 22 +-
.../cache/tree/AbstractDataInnerIO.java | 10 +-
.../cache/tree/AbstractDataLeafIO.java | 10 +-
.../cache/tree/CacheDataRowStore.java | 18 +-
.../processors/cache/tree/CacheDataTree.java | 8 +-
.../internal/processors/cache/tree/DataRow.java | 7 +
.../processors/cache/tree/MvccDataRow.java | 32 ++-
.../cache/tree/MvccKeyMaxVersionBound.java | 9 +-
.../processors/cache/tree/MvccRemoveRow.java | 57 +++++
.../processors/cache/tree/MvccUpdateRow.java | 67 +++--
.../cache/tree/MvccVersionBasedSearchRow.java | 16 +-
.../cache/mvcc/CacheMvccTransactionsTest.java | 245 ++++++++++++++++++-
.../database/FreeListImplSelfTest.java | 5 +
.../processors/query/h2/opt/GridH2Row.java | 5 +
25 files changed, 604 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 80d36c1..380ec94 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter;
import org.apache.ignite.internal.processors.cache.mvcc.MvccLongList;
@@ -54,9 +55,9 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore;
import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
import org.apache.ignite.internal.processors.cache.tree.DataRow;
-import org.apache.ignite.internal.processors.cache.tree.MvccDataRow;
import org.apache.ignite.internal.processors.cache.tree.MvccKeyMaxVersionBound;
import org.apache.ignite.internal.processors.cache.tree.MvccKeyMinVersionBound;
+import org.apache.ignite.internal.processors.cache.tree.MvccRemoveRow;
import org.apache.ignite.internal.processors.cache.tree.MvccSearchRow;
import org.apache.ignite.internal.processors.cache.tree.MvccUpdateRow;
import org.apache.ignite.internal.processors.cache.tree.MvccVersionBasedSearchRow;
@@ -85,6 +86,8 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
/**
*
@@ -1374,60 +1377,43 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
// Make sure value bytes initialized.
key.valueBytes(coCtx);
- MvccUpdateRow updateRow = new MvccUpdateRow(
+ MvccRemoveRow updateRow = new MvccRemoveRow(
key,
- null,
- null,
mvccVer,
partId,
cacheId);
- rowStore.addRow(updateRow);
-
- assert updateRow.link() != 0 : updateRow;
-
if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
updateRow.cacheId(cctx.cacheId());
- GridLongList waitTxs = null;
+ dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
- if (mvccVer.initialLoad()) {
- boolean old = dataTree.putx(updateRow);
+ MvccUpdateRow.UpdateResult res = updateRow.updateResult();
- assert !old;
+ if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) {
+ assert !primary : updateRow;
- incrementSize(cctx.cacheId());
+ cleanup(updateRow.cleanupRows(), false);
}
else {
- dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
-
- boolean old = dataTree.putx(updateRow);
-
- assert !old;
+ if (res == MvccUpdateRow.UpdateResult.PREV_NOT_NULL)
+ decrementSize(cacheId);
- if (!updateRow.previousNotNull())
- incrementSize(cctx.cacheId());
+ CacheSearchRow rmvRow = cleanup(updateRow.cleanupRows(), true);
- waitTxs = updateRow.activeTransactions();
-
- List<CacheSearchRow> cleanupRows = updateRow.cleanupRows();
-
- if (cleanupRows != null) {
- for (int i = 0; i < cleanupRows.size(); i++) {
- CacheSearchRow oldRow = cleanupRows.get(i);
-
- assert oldRow.link() != 0L : oldRow;
+ if (rmvRow == null)
+ rowStore.addRow(updateRow);
+ else
+ updateRow.link(rmvRow.link());
- boolean rmvd = dataTree.removex(oldRow);
+ assert updateRow.link() != 0L;
- assert rmvd;
+ boolean old = dataTree.putx(updateRow);
- rowStore.removeRow(oldRow.link());
- }
- }
+ assert !old;
}
- return waitTxs;
+ return updateRow.activeTransactions();
}
finally {
busyLock.leaveBusy();
@@ -1464,16 +1450,14 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
partId,
cacheId);
- rowStore.addRow(updateRow);
-
- assert updateRow.link() != 0 : updateRow;
-
if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
updateRow.cacheId(cctx.cacheId());
GridLongList waitTxs = null;
if (mvccVer.initialLoad()) {
+ rowStore.addRow(updateRow);
+
boolean old = dataTree.putx(updateRow);
assert !old;
@@ -1483,30 +1467,25 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
else {
dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
- boolean old = dataTree.putx(updateRow);
-
- assert !old;
-
- if (!updateRow.previousNotNull())
- incrementSize(cctx.cacheId());
-
- waitTxs = updateRow.activeTransactions();
+ MvccUpdateRow.UpdateResult res = updateRow.updateResult();
- List<CacheSearchRow> cleanupRows = updateRow.cleanupRows();
+ if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) {
+ assert !primary : updateRow;
+ }
+ else {
+ rowStore.addRow(updateRow);
- if (cleanupRows != null) {
- for (int i = 0; i < cleanupRows.size(); i++) {
- CacheSearchRow oldRow = cleanupRows.get(i);
+ boolean old = dataTree.putx(updateRow);
- assert oldRow.link() != 0L : oldRow;
+ assert !old;
- boolean rmvd = dataTree.removex(oldRow);
+ if (res == MvccUpdateRow.UpdateResult.PREV_NULL)
+ incrementSize(cctx.cacheId());
+ }
- assert rmvd;
+ cleanup(updateRow.cleanupRows(), false);
- rowStore.removeRow(oldRow.link());
- }
- }
+ waitTxs = updateRow.activeTransactions();
}
return waitTxs;
@@ -1516,6 +1495,39 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
}
+ /**
+ * @param cleanupRows Rows to cleanup.
+ * @param findRmv {@code True} if need keep removed row entry.
+ * @return Removed row entry if found.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable private CacheSearchRow cleanup(@Nullable List<CacheSearchRow> cleanupRows, boolean findRmv)
+ throws IgniteCheckedException {
+ CacheSearchRow rmvRow = null;
+
+ if (cleanupRows != null) {
+ for (int i = 0; i < cleanupRows.size(); i++) {
+ CacheSearchRow oldRow = cleanupRows.get(i);
+
+ assert oldRow.link() != 0L : oldRow;
+
+ boolean rmvd = dataTree.removex(oldRow);
+
+ assert rmvd;
+
+ if (findRmv &&
+ rmvRow == null &&
+ versionForRemovedValue(oldRow.mvccCoordinatorVersion())) {
+ rmvRow = oldRow;
+ }
+ else
+ rowStore.removeRow(oldRow.link());
+ }
+ }
+
+ return rmvRow;
+ }
+
/** {@inheritDoc} */
@Override public void update(GridCacheContext cctx,
KeyCacheObject key,
@@ -1832,18 +1844,27 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
while (cur.next()) {
CacheDataRow row = cur.get();
- if (row.mvccCoordinatorVersion() > ver.coordinatorVersion()
- || row.mvccCounter() > ver.counter())
+ long rowCrdVerMasked = row.mvccCoordinatorVersion();
+
+ long rowCrdVer = unmaskCoordinatorVersion(rowCrdVerMasked);
+
+ if (rowCrdVer > ver.coordinatorVersion() || row.mvccCounter() > ver.counter())
continue;
MvccLongList txs = ver.activeTransactions();
- if (txs != null && row.mvccCoordinatorVersion() == ver.coordinatorVersion() && txs.contains(row.mvccCounter()))
+ if (txs != null && rowCrdVer == ver.coordinatorVersion() && txs.contains(row.mvccCounter()))
continue;
if (curKey != null && row.key().equals(curKey))
continue;
+ if (CacheCoordinatorsProcessor.versionForRemovedValue(rowCrdVerMasked)) {
+ curKey = row.key();
+
+ continue;
+ }
+
curRow = row;
break;
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 0905917..357fef8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -31,9 +31,9 @@ import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo;
import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.T3;
@@ -43,6 +43,7 @@ import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.IgniteSpiException;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
/**
* Thread pool for supplying partitions to demanding nodes.
@@ -375,13 +376,24 @@ class GridDhtPartitionSupplier {
GridCacheEntryInfo info = grp.mvccEnabled() ?
new GridCacheMvccEntryInfo() : new GridCacheEntryInfo();
+
info.key(row.key());
- info.expireTime(row.expireTime());
- info.version(row.version());
- info.value(row.value());
info.cacheId(row.cacheId());
- info.mvccCoordinatorVersion(row.mvccCoordinatorVersion());
- info.mvccCounter(row.mvccCounter());
+
+ boolean rmvd = false;
+
+ if (grp.mvccEnabled()) {
+ info.mvccCoordinatorVersion(row.mvccCoordinatorVersion());
+ info.mvccCounter(row.mvccCounter());
+
+ rmvd = versionForRemovedValue(row.mvccCoordinatorVersion());
+ }
+
+ if (!rmvd) {
+ info.value(row.value());
+ info.version(row.version());
+ info.expireTime(row.expireTime());
+ }
if (preloadPred == null || preloadPred.apply(info))
s.addEntry0(part, info, grp.shared(), grp.cacheObjectContext());
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index 90d11f5..6675f8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@ -42,6 +42,8 @@ import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemTy
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
+
/**
* Partition supply message.
*/
@@ -217,7 +219,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple
void addEntry0(int p, GridCacheEntryInfo info, GridCacheSharedContext ctx, CacheObjectContext cacheObjCtx) throws IgniteCheckedException {
assert info != null;
assert info.key() != null : info;
- assert info.value() != null : info;
+ assert info.value() != null || versionForRemovedValue(info.coordinatorVersion()): info;
// Need to call this method to initialize info properly.
marshalInfo(info, ctx, cacheObjCtx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index 54fb3c8..9f9a7a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -146,21 +146,30 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
super(ctx);
}
- public static int compareCoordinatorVersions(long crdVer1, long crdVer2) {
- crdVer1 = CRD_VER_MASK & crdVer1;
- crdVer2 = CRD_VER_MASK & crdVer2;
-
- return Long.compare(crdVer1, crdVer2);
- }
-
- public long createVersionForRemovedValue(long crdVer) {
+ /**
+ * @param crdVer Coordinator version.
+ * @return Coordinator version with removed value flag.
+ */
+ public static long createVersionForRemovedValue(long crdVer) {
return crdVer | RMVD_VAL_VER_MASK;
}
- public boolean versionForRemovedValue(long crdVer) {
+ /**
+ * @param crdVer Coordinator version with flags.
+ * @return {@code True} if removed value flag is set.
+ */
+ public static boolean versionForRemovedValue(long crdVer) {
return (crdVer & RMVD_VAL_VER_MASK) != 0;
}
+ /**
+ * @param crdVer Coordinator version with flags.
+ * @return Coordinator version.
+ */
+ public static long unmaskCoordinatorVersion(long crdVer) {
+ return crdVer & CRD_VER_MASK;
+ }
+
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
statCntrs = new StatCounter[7];
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
index a0fd5ee..4003b73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
@@ -44,7 +44,7 @@ public interface MvccCoordinatorVersion extends Message {
public long counter();
/**
- *
+ * @return {@code True} if version for initial load update.
*/
public boolean initialLoad();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
index 57aeaef..b76826f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
@@ -54,4 +54,9 @@ public interface CacheDataRow extends CacheSearchRow {
* @param key Key.
*/
public void key(KeyCacheObject key);
+
+ /**
+ * @return {@code True} if this is row for cache remove operation (used only with mvcc).
+ */
+ public boolean removed();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
index 925431f..d0f2dab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
@@ -582,6 +582,11 @@ public class CacheDataRowAdapter implements CacheDataRow {
return 0;
}
+ /** {@inheritDoc} */
+ @Override public boolean removed() {
+ return false;
+ }
+
/**
*
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
index 5bf53d8..efdc08f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
@@ -43,7 +43,13 @@ public interface CacheSearchRow {
*/
public int cacheId();
+ /**
+ * @return Mvcc coordinator version.
+ */
public long mvccCoordinatorVersion();
+ /**
+ * @return Mvcc counter.
+ */
public long mvccCounter();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 1f52309..ee651c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -837,6 +837,11 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
@Override public long mvccCoordinatorVersion() {
return 0; // TODO IGNITE-3478.
}
+
+ /** {@inheritDoc} */
+ @Override public boolean removed() {
+ return false; // TODO IGNITE-3478.
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
index 9cc5c62..41d2c4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
@@ -82,6 +82,8 @@ public class RowStore {
try {
freeList.insertDataRow(row);
+
+ assert row.link() != 0L;
}
finally {
ctx.database().checkpointReadUnlock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java
index 3eb62ae..9bd27b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java
@@ -590,12 +590,19 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
*/
public static int getRowSize(CacheDataRow row, boolean withCacheId) throws IgniteCheckedException {
KeyCacheObject key = row.key();
- CacheObject val = row.value();
int keyLen = key.valueBytesLength(null);
+
+ int len = keyLen + (withCacheId ? 4 : 0);
+
+ if (row.removed())
+ return len;
+
+ CacheObject val = row.value();
+
int valLen = val.valueBytesLength(null);
- return keyLen + valLen + CacheVersionIO.size(row.version(), false) + 8 + (withCacheId ? 4 : 0);
+ return len + valLen + CacheVersionIO.size(row.version(), false) + 8;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java
index 628ff38..da012e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java
@@ -1040,13 +1040,19 @@ public class DataPageIO extends PageIO {
final int payloadSize
) throws IgniteCheckedException {
final int keySize = row.key().valueBytesLength(null);
- final int valSize = row.value().valueBytesLength(null);
+
+ boolean rmvd = row.removed();
+
+ final int valSize = rmvd ? 0 : row.value().valueBytesLength(null);
int written = writeFragment(row, buf, rowOff, payloadSize, EntryPart.CACHE_ID, keySize, valSize);
written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.KEY, keySize, valSize);
- written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.EXPIRE_TIME, keySize, valSize);
- written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.VALUE, keySize, valSize);
- written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.VERSION, keySize, valSize);
+
+ if (!rmvd) {
+ written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.EXPIRE_TIME, keySize, valSize);
+ written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.VALUE, keySize, valSize);
+ written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.VERSION, keySize, valSize);
+ }
assert written == payloadSize;
}
@@ -1414,9 +1420,15 @@ public class DataPageIO extends PageIO {
}
addr += row.key().putValue(addr);
+
+ if (row.removed())
+ return;
}
- else
+ else {
+ assert !row.removed() : row;
+
addr += (2 + cacheIdSize + row.key().valueBytesLength(null));
+ }
addr += row.value().putValue(addr);
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
index a07d012..fc82cbb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.cache.tree;
import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
@@ -27,6 +26,9 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInne
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.lang.IgniteInClosure;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.COUNTER_NA;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
/**
*
*/
@@ -59,8 +61,8 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
}
if (storeMvccVersion()) {
- assert row.mvccCoordinatorVersion() > 0 : row;
- assert row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA : row;
+ assert unmaskCoordinatorVersion(row.mvccCoordinatorVersion()) > 0 : row;
+ assert row.mvccCounter() != COUNTER_NA : row;
PageUtils.putLong(pageAddr, off, row.mvccCoordinatorVersion());
off += 8;
@@ -123,7 +125,7 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
long mvcCntr = rowIo.getMvccCounter(srcPageAddr, srcIdx);
assert mvccTopVer > 0 : mvccTopVer;
- assert mvcCntr != CacheCoordinatorsProcessor.COUNTER_NA;
+ assert mvcCntr != COUNTER_NA;
PageUtils.putLong(dstPageAddr, off, mvccTopVer);
off += 8;
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
index ef08bec..c956d22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.cache.tree;
import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
@@ -27,6 +26,9 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeaf
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.lang.IgniteInClosure;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.COUNTER_NA;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
/**
*
*/
@@ -61,8 +63,8 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
long mvccCrdVer = row.mvccCoordinatorVersion();
long mvccUpdateCntr = row.mvccCounter();
- assert mvccCrdVer > 0 : mvccCrdVer;
- assert mvccUpdateCntr != CacheCoordinatorsProcessor.COUNTER_NA;
+ assert unmaskCoordinatorVersion(mvccCrdVer) > 0 : mvccCrdVer;
+ assert mvccUpdateCntr != COUNTER_NA;
PageUtils.putLong(pageAddr, off, mvccCrdVer);
off += 8;
@@ -98,7 +100,7 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
long mvccUpdateCntr = ((RowLinkIO)srcIo).getMvccCounter(srcPageAddr, srcIdx);
assert mvccUpdateTopVer >=0 : mvccUpdateCntr;
- assert mvccUpdateCntr != CacheCoordinatorsProcessor.COUNTER_NA;
+ assert mvccUpdateCntr != COUNTER_NA;
PageUtils.putLong(dstPageAddr, off, mvccUpdateTopVer);
off += 8;
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
index f9e1eb3..85624d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
@@ -25,6 +25,8 @@ import org.apache.ignite.internal.processors.cache.persistence.RowStore;
import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
+
/**
*
*/
@@ -65,17 +67,25 @@ public class CacheDataRowStore extends RowStore {
* @param cacheId Cache ID.
* @param hash Hash code.
* @param link Link.
- * @param mvccTopVer
- * @param mvccCntr
+ * @param rowData Required row data.
+ * @param crdVer Mvcc coordinator version.
+ * @param mvccCntr Mvcc counter.
* @return Search row.
*/
- MvccDataRow mvccRow(int cacheId, int hash, long link, CacheDataRowAdapter.RowData rowData, long mvccTopVer, long mvccCntr) {
+ MvccDataRow mvccRow(int cacheId, int hash, long link, CacheDataRowAdapter.RowData rowData, long crdVer, long mvccCntr) {
+ if (rowData != CacheDataRowAdapter.RowData.KEY_ONLY && versionForRemovedValue(crdVer)) {
+ if (rowData == CacheDataRowAdapter.RowData.NO_KEY)
+ return MvccDataRow.removedRowNoKey(partId, cacheId, crdVer, mvccCntr);
+ else
+ rowData = CacheDataRowAdapter.RowData.KEY_ONLY;
+ }
+
MvccDataRow dataRow = new MvccDataRow(grp,
hash,
link,
partId,
rowData,
- mvccTopVer,
+ crdVer,
mvccCntr);
initDataRow(dataRow, cacheId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index eaeefee..6309153 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import static org.apache.ignite.internal.pagemem.PageIdUtils.itemId;
import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
/**
*
@@ -160,7 +161,8 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
long mvccCrdVer = io.getMvccCoordinatorVersion(pageAddr, idx);
- cmp = Long.compare(row.mvccCoordinatorVersion(), mvccCrdVer);
+ cmp = Long.compare(unmaskCoordinatorVersion(row.mvccCoordinatorVersion()),
+ unmaskCoordinatorVersion(mvccCrdVer));
if (cmp != 0)
return cmp;
@@ -188,10 +190,10 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
CacheDataRowAdapter.RowData.FULL;
if (grp.mvccEnabled()) {
- long mvccTopVer = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
+ long mvccCrdVer = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
long mvccCntr = rowIo.getMvccCounter(pageAddr, idx);
- return rowStore.mvccRow(cacheId, hash, link, x, mvccTopVer, mvccCntr);
+ return rowStore.mvccRow(cacheId, hash, link, x, mvccCrdVer, mvccCntr);
}
else
return rowStore.dataRow(cacheId, hash, link, x);
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
index 29bbaaf..d1e90d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
@@ -81,6 +81,13 @@ public class DataRow extends CacheDataRowAdapter {
this.cacheId = cacheId;
}
+ /**
+ *
+ */
+ protected DataRow() {
+ super(0);
+ }
+
/** {@inheritDoc} */
@Override public int partition() {
return part;
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
index eb1ee10..916ea93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
@@ -18,12 +18,11 @@
package org.apache.ignite.internal.processors.cache.tree;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.internal.S;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
/**
*
*/
@@ -46,7 +45,7 @@ public class MvccDataRow extends DataRow {
MvccDataRow(CacheGroupContext grp, int hash, long link, int part, RowData rowData, long crdVer, long mvccCntr) {
super(grp, hash, link, part, rowData);
- assert crdVer > 0 : crdVer;
+ assert unmaskCoordinatorVersion(crdVer) > 0 : crdVer;
assert mvccCntr != CacheCoordinatorsProcessor.COUNTER_NA;
this.crdVer = crdVer;
@@ -54,25 +53,32 @@ public class MvccDataRow extends DataRow {
}
/**
- * @param key Key.
- * @param val Value.
- * @param ver Version.
+ *
+ */
+ private MvccDataRow() {
+ // No-op.
+ }
+
+ /**
* @param part Partition.
* @param cacheId Cache ID.
* @param crdVer Mvcc coordinator version.
* @param mvccCntr Mvcc counter.
+ * @return Row.
*/
- public MvccDataRow(KeyCacheObject key,
- CacheObject val,
- GridCacheVersion ver,
+ static MvccDataRow removedRowNoKey(
int part,
int cacheId,
long crdVer,
long mvccCntr) {
- super(key, val, ver, part, 0L, cacheId);
+ MvccDataRow row = new MvccDataRow();
- this.mvccCntr = mvccCntr;
- this.crdVer = crdVer;
+ row.cacheId = cacheId;
+ row.part = part;
+ row.crdVer = crdVer;
+ row.mvccCntr = mvccCntr;
+
+ return row;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
index aa9422d..007ac09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
@@ -27,6 +27,8 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
+
/**
*
*/
@@ -55,7 +57,12 @@ public class MvccKeyMaxVersionBound extends SearchRow implements BPlusTree.TreeR
int idx)
throws IgniteCheckedException
{
- resRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
+ RowLinkIO rowIo = (RowLinkIO)io;
+
+ if (versionForRemovedValue(rowIo.getMvccCoordinatorVersion(pageAddr, idx)))
+ resRow = null;
+ else
+ resRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
return false; // Stop search.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
new file mode 100644
index 0000000..8fd8a6e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.tree;
+
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ *
+ */
+public class MvccRemoveRow extends MvccUpdateRow {
+ /**
+ * @param key Key.
+ * @param mvccVer Mvcc version.
+ * @param part Partition.
+ * @param cacheId Cache ID.
+ */
+ public MvccRemoveRow(
+ KeyCacheObject key,
+ MvccCoordinatorVersion mvccVer,
+ int part,
+ int cacheId) {
+ super(key, null, null, mvccVer, part, cacheId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long mvccCoordinatorVersion() {
+ return CacheCoordinatorsProcessor.createVersionForRemovedValue(super.mvccCoordinatorVersion());
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean removed() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccRemoveRow.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
index d3303e8..794661d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
@@ -32,12 +33,14 @@ import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
/**
*
*/
public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<CacheSearchRow, CacheDataRow> {
/** */
- private Boolean hasPrev;
+ private UpdateResult res;
/** */
private boolean canCleanup;
@@ -74,8 +77,8 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
/**
* @return {@code True} if previous value was non-null.
*/
- public boolean previousNotNull() {
- return hasPrev != null && hasPrev;
+ public UpdateResult updateResult() {
+ return res == null ? UpdateResult.PREV_NULL : res;
}
/**
@@ -98,17 +101,18 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
* @param idx Item index.
* @return Always {@code true}.
*/
- private boolean assertVersionGreater(RowLinkIO io, long pageAddr, int idx) {
- long rowCrdVer = io.getMvccCoordinatorVersion(pageAddr, idx);
+ private boolean assertVersion(RowLinkIO io, long pageAddr, int idx) {
+ long rowCrdVer = unmaskCoordinatorVersion(io.getMvccCoordinatorVersion(pageAddr, idx));
long rowCntr = io.getMvccCounter(pageAddr, idx);
- int cmp = Long.compare(mvccCoordinatorVersion(), rowCrdVer);
+ int cmp = Long.compare(mvccVer.coordinatorVersion(), rowCrdVer);
if (cmp == 0)
- cmp = Long.compare(mvccCounter(), rowCntr);
+ cmp = Long.compare(mvccVer.counter(), rowCntr);
- assert cmp > 0 : "[updCrd=" + mvccCoordinatorVersion() +
- ", updCntr=" + mvccCounter() +
+ // Can be equals if backup rebalanced value updated on primary.
+ assert cmp >= 0 : "[updCrd=" + mvccVer.coordinatorVersion() +
+ ", updCntr=" + mvccVer.counter() +
", rowCrd=" + rowCrdVer +
", rowCntr=" + rowCntr + ']';
@@ -124,15 +128,31 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
{
RowLinkIO rowIo = (RowLinkIO)io;
- // All previous versions should be less then new one.
- assert assertVersionGreater(rowIo, pageAddr, idx);
+ // Assert version grows.
+ assert assertVersion(rowIo, pageAddr, idx);
boolean checkActive = mvccVer.activeTransactions().size() > 0;
boolean txActive = false;
+ long rowCrdVerMasked = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
+ long rowCrdVer = unmaskCoordinatorVersion(rowCrdVerMasked);
+
+ if (res == null) {
+ int cmp = Long.compare(mvccVer.coordinatorVersion(), rowCrdVer);
+
+ if (cmp == 0)
+ cmp = Long.compare(mvccVer.coordinatorVersion(), rowIo.getMvccCounter(pageAddr, idx));
+
+ if (cmp == 0)
+ res = UpdateResult.VERSION_FOUND;
+ else
+ res = CacheCoordinatorsProcessor.versionForRemovedValue(rowCrdVerMasked) ?
+ UpdateResult.PREV_NULL : UpdateResult.PREV_NOT_NULL;
+ }
+
// Suppose transactions on previous coordinator versions are done.
- if (checkActive && mvccVer.coordinatorVersion() == rowIo.getMvccCoordinatorVersion(pageAddr, idx)) {
+ if (checkActive && mvccVer.coordinatorVersion() == rowCrdVer) {
long rowMvccCntr = rowIo.getMvccCounter(pageAddr, idx);
if (mvccVer.activeTransactions().contains(rowMvccCntr)) {
@@ -145,15 +165,12 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
}
}
- if (hasPrev == null)
- hasPrev = Boolean.TRUE; // TODO IGNITE-3478 support removes.
-
if (!txActive) {
- assert Long.compare(mvccVer.coordinatorVersion(), rowIo.getMvccCoordinatorVersion(pageAddr, idx)) >= 0;
+ assert Long.compare(mvccVer.coordinatorVersion(), rowCrdVer) >= 0;
int cmp;
- if (mvccVer.coordinatorVersion() == rowIo.getMvccCoordinatorVersion(pageAddr, idx))
+ if (mvccVer.coordinatorVersion() == rowCrdVer)
cmp = Long.compare(mvccVer.cleanupVersion(), rowIo.getMvccCounter(pageAddr, idx));
else
cmp = 1;
@@ -163,10 +180,10 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
if (canCleanup) {
CacheSearchRow row = io.getLookupRow(tree, pageAddr, idx);
- assert row.link() != 0 && row.mvccCoordinatorVersion() > 0 : row;
+ assert row.link() != 0 && row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA : row;
// Should not be possible to cleanup active tx.
- assert row.mvccCoordinatorVersion() != mvccVer.coordinatorVersion()
+ assert rowCrdVer != mvccVer.coordinatorVersion()
|| !mvccVer.activeTransactions().contains(row.mvccCounter());
if (cleanupRows == null)
@@ -196,4 +213,16 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
@Override public String toString() {
return S.toString(MvccUpdateRow.class, this, "super", super.toString());
}
+
+ /**
+ *
+ */
+ public enum UpdateResult {
+ /** */
+ VERSION_FOUND,
+ /** */
+ PREV_NULL,
+ /** */
+ PREV_NOT_NULL
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
index c829afb..a1d0127 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
@@ -28,6 +28,9 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
+
/**
*
*/
@@ -66,16 +69,23 @@ public class MvccVersionBasedSearchRow extends SearchRow implements BPlusTree.Tr
{
boolean visible = true;
+ RowLinkIO rowIo = (RowLinkIO)io;
+
+ long crdVerMasked = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
+
if (ver.activeTransactions().size() > 0) {
- RowLinkIO rowIo = (RowLinkIO)io;
+ long rowCrdVer = unmaskCoordinatorVersion(crdVerMasked);
// TODO IGNITE-3478 sort active transactions?
- if (rowIo.getMvccCoordinatorVersion(pageAddr, idx) == ver.coordinatorVersion())
+ if (rowCrdVer == ver.coordinatorVersion())
visible = !ver.activeTransactions().contains(rowIo.getMvccCounter(pageAddr, idx));
}
if (visible) {
- resRow = ((CacheDataTree) tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
+ if (versionForRemovedValue(crdVerMasked))
+ resRow = null;
+ else
+ resRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
return false; // Stop search.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 115e8a2..d45afe7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -17,7 +17,9 @@
package org.apache.ignite.internal.processors.cache.mvcc;
+import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -45,6 +47,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
@@ -119,6 +122,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
/** */
private String nodeAttr;
+ /** */
+ private static final int PAGE_SIZE = MemoryConfiguration.DFLT_PAGE_SIZE;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -137,6 +143,12 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
if (nodeAttr != null)
cfg.setUserAttributes(F.asMap(nodeAttr, true));
+ MemoryConfiguration memCfg = new MemoryConfiguration();
+
+ memCfg.setPageSize(PAGE_SIZE);
+
+ cfg.setMemoryConfiguration(memCfg);
+
return cfg;
}
@@ -376,6 +388,109 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testSimplePutRemove() throws Exception {
+ simplePutRemove(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSimplePutRemove_LargeKeys() throws Exception {
+ simplePutRemove(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ * @param largeKeys {@code True} to use large keys (not fitting in single page).
+ */
+ private void simplePutRemove(boolean largeKeys) throws Exception {
+ Ignite node = startGrid(0);
+
+ IgniteTransactions txs = node.transactions();
+
+ final IgniteCache<Object, Object> cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 1));
+
+ final int KEYS = 100;
+
+ checkValues(new HashMap<>(), cache);
+
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (int k = 0; k < KEYS; k++)
+ cache.remove(testKey(largeKeys, k));
+
+ tx.commit();
+ }
+
+ checkValues(new HashMap<>(), cache);
+
+ Map<Object, Object> expVals = new HashMap<>();
+
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (int k = 0; k < KEYS; k++) {
+ Object key = testKey(largeKeys, k);
+
+ expVals.put(key, k);
+
+ cache.put(key, k);
+ }
+
+ tx.commit();
+ }
+
+ checkValues(expVals, cache);
+
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (int k = 0; k < KEYS; k++) {
+ if (k % 2 == 0) {
+ Object key = testKey(largeKeys, k);
+
+ cache.remove(key);
+
+ expVals.remove(key);
+ }
+ }
+
+ tx.commit();
+ }
+
+ checkValues(expVals, cache);
+ }
+
+ /**
+ * @param largeKeys {@code True} to use large keys (not fitting in single page).
+ * @param idx Index.
+ * @return Key instance.
+ */
+ private static Object testKey(boolean largeKeys, int idx) {
+ if (largeKeys) {
+ int payloadSize = PAGE_SIZE + ThreadLocalRandom.current().nextInt(PAGE_SIZE * 10);
+
+ return new TestKey(idx, payloadSize);
+ }
+ else
+ return idx;
+ }
+
+ /**
+ * @param expVals Expected values.
+ * @param cache Cache.
+ */
+ private void checkValues(Map<Object, Object> expVals, IgniteCache<Object, Object> cache) {
+ Map<Object, Object> res = cache.getAll(expVals.keySet());
+
+ assertEquals(expVals, res);
+
+ res = new HashMap<>();
+
+ for (IgniteCache.Entry<Object, Object> e : cache)
+ res.put(e.getKey(), e.getValue());
+
+ assertEquals(expVals, res);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testThreadUpdatesAreVisibleForThisThread() throws Exception {
final Ignite ignite = startGrid(0);
@@ -1601,7 +1716,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testRebalance1() throws Exception {
+ public void testSimpleRebalance() throws Exception {
Ignite srv0 = startGrid(0);
IgniteCache<Integer, Integer> cache = (IgniteCache)srv0.createCache(
@@ -1664,6 +1779,32 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testSimpleRebalanceWithRemovedValues() throws Exception {
+ Ignite node = startGrid(0);
+
+ IgniteTransactions txs = node.transactions();
+
+ final IgniteCache<Object, Object> cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, 64));
+
+ final int KEYS = 100;
+
+ checkValues(new HashMap<>(), cache);
+
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (int k = 0; k < KEYS; k++)
+ cache.remove(k);
+
+ tx.commit();
+ }
+
+ startGrid(1);
+
+ awaitPartitionMapExchange();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testCoordinatorFailurePessimisticTx() throws Exception {
testSpi = true;
@@ -2722,9 +2863,55 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
assertEquals(KEYS, cache.size());
}
- // TODO IGNITE-3478: test removes.
- }
+ int size = KEYS;
+
+ for (int i = 0; i < KEYS; i++) {
+ if (i % 2 == 0) {
+ final Integer key = i;
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.remove(key);
+ tx.commit();
+ }
+
+ size--;
+
+ assertEquals(size, cache.size());
+ }
+ }
+
+ // Check size does not change if remove already removed keys.
+ for (int i = 0; i < KEYS; i++) {
+ if (i % 2 == 0) {
+ final Integer key = i;
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.remove(key);
+
+ tx.commit();
+ }
+
+ assertEquals(size, cache.size());
+ }
+ }
+
+ for (int i = 0; i < KEYS; i++) {
+ if (i % 2 == 0) {
+ final Integer key = i;
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(key, i);
+
+ tx.commit();
+ }
+
+ size++;
+
+ assertEquals(size, cache.size());
+ }
+ }
+ }
/**
* @throws IgniteCheckedException If failed.
@@ -2792,7 +2979,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
key0,
vers.get(0).get1());
- MvccCoordinatorVersionResponse ver = version(crd.currentCoordinator().coordinatorVersion(), 100000);
+ MvccCoordinatorVersionResponse ver = version(vers.get(0).get2().coordinatorVersion(), 100000);
for (int v = 0; v < vers.size(); v++) {
MvccCounter cntr = vers.get(v).get2();
@@ -3074,4 +3261,54 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
return null;
}
}
+
+ /**
+ *
+ */
+ static class TestKey implements Serializable {
+ /** */
+ private final int key;
+
+ /** */
+ private final byte[] payload;
+
+ /**
+ * @param key Key.
+ * @param payloadSize Payload size.
+ */
+ public TestKey(int key, int payloadSize) {
+ this.key = key;
+ this.payload = new byte[payloadSize];
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ TestKey testKey = (TestKey)o;
+
+ if (key != testKey.key)
+ return false;
+
+ return Arrays.equals(payload, testKey.payload);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = key;
+
+ res = 31 * res + Arrays.hashCode(payload);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "TestKey [k=" + key + ", payloadLen=" + payload.length + ']';
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
index d0897e1..600c8df 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
@@ -435,6 +435,11 @@ public class FreeListImplSelfTest extends GridCommonAbstractTest {
@Override public long mvccCounter() {
return 0;
}
+
+ /** {@inheritDoc} */
+ @Override public boolean removed() {
+ return false;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
index 392301c..1819cfe 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
@@ -174,4 +174,9 @@ public abstract class GridH2Row implements SearchRow, CacheDataRow, Row {
@Override public long mvccCounter() {
throw new UnsupportedOperationException();
}
+
+ /** {@inheritDoc} */
+ @Override public boolean removed() {
+ throw new UnsupportedOperationException();
+ }
}
\ No newline at end of file
[16/17] ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-3478' into ignite-5932
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-3478' into ignite-5932
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
# modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ca82daa9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ca82daa9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ca82daa9
Branch: refs/heads/ignite-5932
Commit: ca82daa9d6f39a62dc432d120be6dece8ad40219
Parents: 5210767
Author: sboikov <sb...@gridgain.com>
Authored: Wed Oct 11 11:28:19 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Oct 11 11:28:19 2017 +0300
----------------------------------------------------------------------
.../cache/IgniteCacheOffheapManager.java | 22 --------------------
1 file changed, 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ca82daa9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index e498b5e..2c070fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -549,22 +549,6 @@ public interface IgniteCacheOffheapManager {
/**
* @param cctx Cache context.
- * @param key Key.
- * @param val Value.
- * @param ver Version.
- * @param mvccVer Mvcc version.
- * @return {@code True} if new value was inserted.
- * @throws IgniteCheckedException If failed.
- */
- boolean mvccInitialValue(
- GridCacheContext cctx,
- KeyCacheObject key,
- @Nullable CacheObject val,
- GridCacheVersion ver,
- MvccCoordinatorVersion mvccVer) throws IgniteCheckedException;
-
- /**
- * @param cctx Cache context.
* @param primary {@code True} if update is executed on primary node.
* @param key Key.
* @param val Value.
@@ -581,12 +565,6 @@ public interface IgniteCacheOffheapManager {
GridCacheVersion ver,
MvccCoordinatorVersion mvccVer) throws IgniteCheckedException;
- @Nullable GridLongList mvccRemove(
- GridCacheContext cctx,
- boolean primary,
- KeyCacheObject key,
- MvccCoordinatorVersion mvccVer) throws IgniteCheckedException;
-
/**
* @param cctx Cache context.
* @param primary {@code True} if update is executed on primary node.
[02/17] ignite git commit: ignite-3478
Posted by sb...@apache.org.
ignite-3478
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d4d2c380
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d4d2c380
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d4d2c380
Branch: refs/heads/ignite-5932
Commit: d4d2c3805e94f00587d8b412b288c24a7f7bc983
Parents: 44ad701
Author: sboikov <sb...@gridgain.com>
Authored: Wed Oct 4 11:20:57 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Oct 4 11:20:57 2017 +0300
----------------------------------------------------------------------
.../cache/IgniteCacheOffheapManagerImpl.java | 4 +-
.../cache/persistence/tree/BPlusTree.java | 2 +-
.../cache/tree/AbstractDataInnerIO.java | 8 +-
.../cache/tree/AbstractDataLeafIO.java | 8 +-
.../processors/cache/tree/CacheDataTree.java | 8 +-
.../cache/tree/CacheIdAwareDataInnerIO.java | 4 +-
.../cache/tree/CacheIdAwareDataLeafIO.java | 4 +-
.../processors/cache/tree/DataInnerIO.java | 4 +-
.../processors/cache/tree/DataLeafIO.java | 4 +-
.../processors/cache/tree/MvccDataInnerIO.java | 4 +-
.../processors/cache/tree/MvccDataLeafIO.java | 4 +-
.../cache/tree/MvccVersionBasedSearchRow.java | 4 +-
.../processors/cache/tree/RowLinkIO.java | 14 ++-
.../cache/mvcc/CacheMvccTransactionsTest.java | 90 +++++++++++++-------
14 files changed, 101 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4d2c380/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 76d9649..eef645d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1648,7 +1648,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
CacheDataRow row;
if (grp.mvccEnabled()) {
- if (false) {
+ if (true) {
row = dataTree.findOneBounded(
new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE),
new MvccSearchRow(cacheId, key, 1L, 1L),
@@ -1714,7 +1714,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
- if (false) {
+ if (true) {
MvccVersionBasedSearchRow lower = new MvccVersionBasedSearchRow(cacheId, key, ver);
CacheDataRow row = dataTree.findOneBounded(
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4d2c380/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index d570f1e..5836340 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -4694,7 +4694,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
}
if (nextPageId != 0)
- lastRow = getRow(io, pageAddr, cnt - 1, x); // Need save last row.
+ lastRow = getRow(io, pageAddr, cnt - 1, null); // Need save last row.
return true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4d2c380/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
index 3fc0962..a07d012 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
@@ -76,8 +76,8 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
long link = getLink(pageAddr, idx);
if (storeMvccVersion()) {
- long mvccTopVer = getMvccUpdateTopologyVersion(pageAddr, idx);
- long mvccCntr = getMvccUpdateCounter(pageAddr, idx);
+ long mvccTopVer = getMvccCoordinatorVersion(pageAddr, idx);
+ long mvccCntr = getMvccCounter(pageAddr, idx);
return ((CacheDataTree)tree).rowStore().mvccRow(cacheId,
hash,
@@ -119,8 +119,8 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
}
if (storeMvccVersion()) {
- long mvccTopVer = rowIo.getMvccUpdateTopologyVersion(srcPageAddr, srcIdx);
- long mvcCntr = rowIo.getMvccUpdateCounter(srcPageAddr, srcIdx);
+ long mvccTopVer = rowIo.getMvccCoordinatorVersion(srcPageAddr, srcIdx);
+ long mvcCntr = rowIo.getMvccCounter(srcPageAddr, srcIdx);
assert mvccTopVer > 0 : mvccTopVer;
assert mvcCntr != CacheCoordinatorsProcessor.COUNTER_NA;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4d2c380/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
index a4eac3e..ef08bec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
@@ -94,8 +94,8 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
}
if (storeMvccVersion()) {
- long mvccUpdateTopVer = ((RowLinkIO)srcIo).getMvccUpdateTopologyVersion(srcPageAddr, srcIdx);
- long mvccUpdateCntr = ((RowLinkIO)srcIo).getMvccUpdateCounter(srcPageAddr, srcIdx);
+ long mvccUpdateTopVer = ((RowLinkIO)srcIo).getMvccCoordinatorVersion(srcPageAddr, srcIdx);
+ long mvccUpdateCntr = ((RowLinkIO)srcIo).getMvccCounter(srcPageAddr, srcIdx);
assert mvccUpdateTopVer >=0 : mvccUpdateCntr;
assert mvccUpdateCntr != CacheCoordinatorsProcessor.COUNTER_NA;
@@ -114,8 +114,8 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
long link = getLink(pageAddr, idx);
if (storeMvccVersion()) {
- long mvccTopVer = getMvccUpdateTopologyVersion(pageAddr, idx);
- long mvccCntr = getMvccUpdateCounter(pageAddr, idx);
+ long mvccTopVer = getMvccCoordinatorVersion(pageAddr, idx);
+ long mvccCntr = getMvccCounter(pageAddr, idx);
return ((CacheDataTree)tree).rowStore().mvccRow(cacheId,
hash,
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4d2c380/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index 767c996..a1bfc9b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -158,14 +158,14 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
if (cmp != 0 || !grp.mvccEnabled())
return 0;
- long mvccCrdVer = io.getMvccUpdateTopologyVersion(pageAddr, idx);
+ long mvccCrdVer = io.getMvccCoordinatorVersion(pageAddr, idx);
cmp = Long.compare(row.mvccCoordinatorVersion(), mvccCrdVer);
if (cmp != 0)
return cmp;
- long mvccCntr = io.getMvccUpdateCounter(pageAddr, idx);
+ long mvccCntr = io.getMvccCounter(pageAddr, idx);
assert row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA;
@@ -188,8 +188,8 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
CacheDataRowAdapter.RowData.FULL;
if (grp.mvccEnabled()) {
- long mvccTopVer = rowIo.getMvccUpdateTopologyVersion(pageAddr, idx);
- long mvccCntr = rowIo.getMvccUpdateCounter(pageAddr, idx);
+ long mvccTopVer = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
+ long mvccCntr = rowIo.getMvccCounter(pageAddr, idx);
return rowStore.mvccRow(cacheId, hash, link, x, mvccTopVer, mvccCntr);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4d2c380/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
index fc9d15d..3d02b27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
@@ -53,12 +53,12 @@ public final class CacheIdAwareDataInnerIO extends AbstractDataInnerIO {
}
/** {@inheritDoc} */
- @Override public long getMvccUpdateTopologyVersion(long pageAddr, int idx) {
+ @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
return 0;
}
/** {@inheritDoc} */
- @Override public long getMvccUpdateCounter(long pageAddr, int idx) {
+ @Override public long getMvccCounter(long pageAddr, int idx) {
return CacheCoordinatorsProcessor.COUNTER_NA;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4d2c380/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
index b328924..58ae9ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
@@ -53,12 +53,12 @@ public final class CacheIdAwareDataLeafIO extends AbstractDataLeafIO {
}
/** {@inheritDoc} */
- @Override public long getMvccUpdateTopologyVersion(long pageAddr, int idx) {
+ @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
return 0;
}
/** {@inheritDoc} */
- @Override public long getMvccUpdateCounter(long pageAddr, int idx) {
+ @Override public long getMvccCounter(long pageAddr, int idx) {
return CacheCoordinatorsProcessor.COUNTER_NA;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4d2c380/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
index 0d424b7..19a5c47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
@@ -53,12 +53,12 @@ public final class DataInnerIO extends AbstractDataInnerIO {
}
/** {@inheritDoc} */
- @Override public long getMvccUpdateTopologyVersion(long pageAddr, int idx) {
+ @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
return 0;
}
/** {@inheritDoc} */
- @Override public long getMvccUpdateCounter(long pageAddr, int idx) {
+ @Override public long getMvccCounter(long pageAddr, int idx) {
return CacheCoordinatorsProcessor.COUNTER_NA;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4d2c380/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
index ff51bc2..ab10b96 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
@@ -53,12 +53,12 @@ public final class DataLeafIO extends AbstractDataLeafIO {
}
/** {@inheritDoc} */
- @Override public long getMvccUpdateTopologyVersion(long pageAddr, int idx) {
+ @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
return 0;
}
/** {@inheritDoc} */
- @Override public long getMvccUpdateCounter(long pageAddr, int idx) {
+ @Override public long getMvccCounter(long pageAddr, int idx) {
return CacheCoordinatorsProcessor.COUNTER_NA;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4d2c380/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataInnerIO.java
index 5f4f44c..51a911d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataInnerIO.java
@@ -53,12 +53,12 @@ public final class MvccDataInnerIO extends AbstractDataInnerIO {
}
/** {@inheritDoc} */
- @Override public long getMvccUpdateTopologyVersion(long pageAddr, int idx) {
+ @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
return PageUtils.getLong(pageAddr, offset(idx) + 12);
}
/** {@inheritDoc} */
- @Override public long getMvccUpdateCounter(long pageAddr, int idx) {
+ @Override public long getMvccCounter(long pageAddr, int idx) {
return PageUtils.getLong(pageAddr, offset(idx) + 20);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4d2c380/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataLeafIO.java
index e7cfca7..84c33a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataLeafIO.java
@@ -53,12 +53,12 @@ public final class MvccDataLeafIO extends AbstractDataLeafIO {
}
/** {@inheritDoc} */
- @Override public long getMvccUpdateTopologyVersion(long pageAddr, int idx) {
+ @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
return PageUtils.getLong(pageAddr, offset(idx) + 12);
}
/** {@inheritDoc} */
- @Override public long getMvccUpdateCounter(long pageAddr, int idx) {
+ @Override public long getMvccCounter(long pageAddr, int idx) {
return PageUtils.getLong(pageAddr, offset(idx) + 20);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4d2c380/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
index f708ffd..6af2c4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
@@ -57,10 +57,10 @@ public class MvccVersionBasedSearchRow extends SearchRow implements BPlusTree.Ro
RowLinkIO rowIo = (RowLinkIO)io;
- if (rowIo.getMvccUpdateTopologyVersion(pageAddr, idx) != ver.coordinatorVersion())
+ if (rowIo.getMvccCoordinatorVersion(pageAddr, idx) != ver.coordinatorVersion())
return true;
- return !ver.activeTransactions().contains(ver.counter()); // TODO IGNITE-3478 sort active transactions?
+ return !ver.activeTransactions().contains(rowIo.getMvccCounter(pageAddr, idx)); // TODO IGNITE-3478 sort active transactions?
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4d2c380/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/RowLinkIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/RowLinkIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/RowLinkIO.java
index 8b341cb..111968d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/RowLinkIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/RowLinkIO.java
@@ -42,7 +42,17 @@ public interface RowLinkIO {
*/
public int getCacheId(long pageAddr, int idx);
- public long getMvccUpdateTopologyVersion(long pageAddr, int idx);
+ /**
+ * @param pageAddr Page address.
+ * @param idx Index.
+ * @return Mvcc coordinator version.
+ */
+ public long getMvccCoordinatorVersion(long pageAddr, int idx);
- public long getMvccUpdateCounter(long pageAddr, int idx);
+ /**
+ * @param pageAddr Page address.
+ * @param idx Index.
+ * @return Mvcc counter.
+ */
+ public long getMvccCounter(long pageAddr, int idx);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4d2c380/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 7936340..89b3df2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -2704,56 +2704,77 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
fut.get();
- final Integer key = 0;
+ final int KEYS = 1000;
for (int i = 0; i < 10; i++) {
- try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
- cache.put(key + 1, i);
+ for (int k = 0; k < KEYS; k++) {
+ final Integer key = k;
- tx.commit();
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(key, i);
+
+ tx.commit();
+ }
}
}
- for (int i = 0; i < 10; i++) {
- try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
- cache.put(key, i);
+ for (int k = 0; k < KEYS; k++) {
+ final Integer key = k;
- tx.commit();
- }
- }
+ KeyCacheObject key0 = cctx.toCacheKeyObject(key);
- KeyCacheObject key0 = cctx.toCacheKeyObject(key);
+ List<T2<Object, MvccCounter>> vers = cctx.offheap().mvccAllVersions(cctx, key0);
- List<T2<Object, MvccCounter>> vers = cctx.offheap().mvccAllVersions(cctx, key0);
+ assertEquals(10, vers.size());
- assertEquals(10, vers.size());
+ CacheDataRow row = cctx.offheap().read(cctx, key0);
- CacheDataRow row = cctx.offheap().read(cctx, key0);
+ checkRow(cctx, row, key0, vers.get(0).get1());
- checkRow(cctx, row, key0, vers.get(0).get1());
+ for (T2<Object, MvccCounter> ver : vers) {
+ MvccCounter cntr = ver.get2();
- for (T2<Object, MvccCounter> ver : vers) {
- MvccCounter cntr = ver.get2();
+ MvccCoordinatorVersion readVer =
+ new MvccCoordinatorVersionResponse(cntr.coordinatorVersion(), cntr.counter(), 0);
- MvccCoordinatorVersion readVer =
- new MvccCoordinatorVersionResponse(cntr.coordinatorVersion(), cntr.counter(), 0);
+ row = cctx.offheap().mvccRead(cctx, key0, readVer);
- row = cctx.offheap().mvccRead(cctx, key0, readVer);
+ checkRow(cctx, row, key0, ver.get1());
+ }
- checkRow(cctx, row, key0, ver.get1());
- }
+ checkRow(cctx,
+ cctx.offheap().mvccRead(cctx, key0, version(vers.get(0).get2().coordinatorVersion() + 1, 1)),
+ key0,
+ vers.get(0).get1());
+
+ checkRow(cctx,
+ cctx.offheap().mvccRead(cctx, key0, version(vers.get(0).get2().coordinatorVersion(), vers.get(0).get2().counter() + 1)),
+ key0,
+ vers.get(0).get1());
- checkRow(cctx,
- cctx.offheap().mvccRead(cctx, key0, new MvccCoordinatorVersionResponse(vers.get(0).get2().coordinatorVersion() + 1, 1, 0)),
- key0,
- vers.get(0).get1());
+ MvccCoordinatorVersionResponse ver = version(crd.currentCoordinator().coordinatorVersion(), 100000);
- checkRow(cctx,
- cctx.offheap().mvccRead(cctx, key0, new MvccCoordinatorVersionResponse(vers.get(0).get2().coordinatorVersion(), vers.get(0).get2().counter() + 1, 0)),
- key0,
- vers.get(0).get1());
+ for (int v = 0; v < vers.size(); v++) {
+ MvccCounter cntr = vers.get(v).get2();
+
+ ver.addTx(cntr.counter());
+
+ row = cctx.offheap().mvccRead(cctx, key0, ver);
+
+ if (v == vers.size() - 1)
+ assertNull(row);
+ else
+ checkRow(cctx, row, key0, vers.get(v + 1).get1());
+ }
+ }
}
+ /**
+ * @param cctx Context.
+ * @param row Row.
+ * @param expKey Expected row key.
+ * @param expVal Expected row value.
+ */
private void checkRow(GridCacheContext cctx, CacheDataRow row, KeyCacheObject expKey, Object expVal) {
assertNotNull(row);
assertEquals(expKey, row.key());
@@ -2761,6 +2782,15 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
}
/**
+ * @param crdVer Coordinator version.
+ * @param cntr Counter.
+ * @return Version.
+ */
+ private MvccCoordinatorVersionResponse version(long crdVer, long cntr) {
+ return new MvccCoordinatorVersionResponse(crdVer, cntr, 0);
+ }
+
+ /**
* @return Cache configurations.
*/
private List<CacheConfiguration<Object, Object>> cacheConfigurations() {
[09/17] ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-3478' into ignite-5937
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-3478' into ignite-5937
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c553638a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c553638a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c553638a
Branch: refs/heads/ignite-5932
Commit: c553638a48594d110472aad27ba8218f7e92f663
Parents: 0bbbbe2 fd53c1a
Author: sboikov <sb...@gridgain.com>
Authored: Mon Oct 9 16:27:14 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Oct 9 16:27:14 2017 +0300
----------------------------------------------------------------------
assembly/release-fabric-base.xml | 1 -
.../internal/jdbc2/JdbcMetadataSelfTest.java | 28 +-
.../internal/jdbc2/JdbcStatementSelfTest.java | 130 +++++++-
.../JdbcThinAbstractDmlStatementSelfTest.java | 2 +-
.../thin/JdbcThinAutoCloseServerCursorTest.java | 8 +-
.../jdbc/thin/JdbcThinComplexQuerySelfTest.java | 2 +-
.../jdbc/thin/JdbcThinConnectionSelfTest.java | 4 +
.../JdbcThinDynamicIndexAbstractSelfTest.java | 2 +-
.../jdbc/thin/JdbcThinEmptyCacheSelfTest.java | 2 +-
.../jdbc/thin/JdbcThinMetadataSelfTest.java | 39 +--
.../JdbcThinMissingLongArrayResultsTest.java | 2 +-
.../jdbc/thin/JdbcThinNoDefaultSchemaTest.java | 5 +-
.../thin/JdbcThinPreparedStatementSelfTest.java | 2 +-
.../jdbc/thin/JdbcThinResultSetSelfTest.java | 2 +-
.../jdbc/thin/JdbcThinStatementSelfTest.java | 46 +--
.../org/apache/ignite/IgniteJdbcDriver.java | 9 +-
.../apache/ignite/configuration/WALMode.java | 9 +-
.../connection/GridClientNioTcpConnection.java | 2 +
.../internal/jdbc/thin/JdbcThinConnection.java | 6 +-
.../jdbc/thin/JdbcThinDatabaseMetadata.java | 3 +-
.../jdbc/thin/JdbcThinPreparedStatement.java | 5 +-
.../internal/jdbc/thin/JdbcThinStatement.java | 9 +-
.../internal/jdbc/thin/JdbcThinTcpIo.java | 4 +-
.../ignite/internal/jdbc2/JdbcConnection.java | 13 +
.../internal/jdbc2/JdbcDatabaseMetadata.java | 124 +++++---
.../jdbc2/JdbcQueryMultipleStatementsTask.java | 167 ++++++++++
.../ignite/internal/jdbc2/JdbcQueryTask.java | 154 +++-------
.../internal/jdbc2/JdbcQueryTaskResult.java | 120 ++++++++
.../ignite/internal/jdbc2/JdbcQueryTaskV3.java | 94 ++++++
.../ignite/internal/jdbc2/JdbcResultSet.java | 175 ++++++++---
.../ignite/internal/jdbc2/JdbcStatement.java | 270 +++++++++--------
.../internal/jdbc2/JdbcStatementResultInfo.java | 73 +++++
.../jdbc2/JdbcStreamedPreparedStatement.java | 19 +-
.../ignite/internal/pagemem/wal/WALPointer.java | 4 +-
.../internal/pagemem/wal/record/DataEntry.java | 13 +-
.../internal/pagemem/wal/record/DataRecord.java | 20 +-
.../pagemem/wal/record/SnapshotRecord.java | 58 ++++
.../pagemem/wal/record/TimeStampRecord.java | 57 ++++
.../internal/pagemem/wal/record/TxRecord.java | 52 ++--
.../pagemem/wal/record/UnwrapDataEntry.java | 22 +-
.../internal/pagemem/wal/record/WALRecord.java | 6 +-
.../processors/cache/GridCacheAdapter.java | 16 +
.../processors/cache/GridCacheMapEntry.java | 2 +
.../GridCacheDatabaseSharedManager.java | 5 +
.../persistence/pagemem/PageMemoryImpl.java | 18 ++
.../cache/persistence/wal/FileWALPointer.java | 3 +
.../wal/FileWriteAheadLogManager.java | 7 +
.../reader/StandaloneWalRecordsIterator.java | 37 ++-
.../wal/serializer/RecordDataV1Serializer.java | 6 +-
.../wal/serializer/RecordDataV2Serializer.java | 49 ++-
.../wal/serializer/RecordV2Serializer.java | 2 +-
.../wal/serializer/TxRecordSerializer.java | 3 +-
.../cache/query/GridCacheQueryManager.java | 87 +++++-
.../query/GridCacheQuerySqlMetadataJobV2.java | 154 ++++++++++
.../query/GridCacheQuerySqlMetadataV2.java | 101 +++++++
.../cache/query/GridCacheSqlMetadata.java | 8 +
.../cache/query/IgniteQueryErrorCode.java | 2 +-
.../cache/transactions/IgniteTxAdapter.java | 3 +-
.../processors/odbc/jdbc/JdbcColumnMeta.java | 10 +
.../processors/odbc/jdbc/JdbcColumnMetaV2.java | 74 +++++
.../odbc/jdbc/JdbcConnectionContext.java | 4 +-
.../odbc/jdbc/JdbcMetaColumnsResult.java | 28 +-
.../odbc/jdbc/JdbcMetaColumnsResultV2.java | 50 +++
.../odbc/jdbc/JdbcRequestHandler.java | 32 +-
.../processors/odbc/jdbc/JdbcResult.java | 8 +
.../utils/PlatformConfigurationUtils.java | 3 +
.../processors/query/GridQueryIndexing.java | 22 +-
.../processors/query/GridQueryProcessor.java | 8 +-
.../query/QueryTypeDescriptorImpl.java | 42 ++-
.../query/property/QueryBinaryProperty.java | 1 -
.../handlers/cache/GridCacheCommandHandler.java | 12 +-
.../internal/visor/query/VisorQueryTask.java | 36 ++-
.../cache/GridCacheAbstractFullApiSelfTest.java | 45 ++-
...IgniteClientCacheInitializationFailTest.java | 18 +-
.../db/wal/IgniteWalSerializerVersionTest.java | 205 ++++++++++++-
.../db/wal/reader/IgniteWalReaderTest.java | 256 +++++++++++++++-
.../query/h2/DmlStatementsProcessor.java | 26 +-
.../internal/processors/query/h2/H2Schema.java | 17 +-
.../internal/processors/query/h2/H2TypeKey.java | 64 ++++
.../processors/query/h2/IgniteH2Indexing.java | 33 +-
.../query/h2/ddl/DdlStatementsProcessor.java | 36 ++-
.../query/h2/sql/GridSqlCreateTable.java | 34 +++
.../query/h2/sql/GridSqlQueryParser.java | 58 +++-
...ynamicColumnsAbstractConcurrentSelfTest.java | 57 ++--
...umnsConcurrentAtomicPartitionedSelfTest.java | 2 +-
...lumnsConcurrentAtomicReplicatedSelfTest.java | 2 +-
...currentTransactionalPartitionedSelfTest.java | 2 +-
...ncurrentTransactionalReplicatedSelfTest.java | 5 +-
.../H2DynamicColumnsAbstractBasicSelfTest.java | 43 +++
.../cache/index/H2DynamicTableSelfTest.java | 301 +++++++++++++++++--
.../query/IgniteSqlNotNullConstraintTest.java | 2 +-
.../h2/GridIndexingSpiAbstractSelfTest.java | 47 +--
.../Cache/Query/CacheDmlQueriesTest.cs | 4 +-
.../Cache/Query/Linq/CacheLinqTest.Strings.cs | 5 +
.../Apache.Ignite.Linq/Impl/MethodVisitor.cs | 4 +
modules/web-console/backend/app/agentSocket.js | 21 +-
.../web-console/backend/app/browsersHandler.js | 9 +-
modules/web-console/backend/package.json | 4 +-
.../app/modules/agent/AgentManager.service.js | 25 +-
.../app/modules/agent/decompress.worker.js | 34 +++
.../frontend/app/modules/sql/sql.controller.js | 36 ++-
.../frontend/app/utils/SimpleWorkerPool.js | 119 ++++++++
modules/web-console/frontend/package.json | 8 +-
.../web-console/frontend/views/sql/sql.tpl.pug | 14 +-
104 files changed, 3445 insertions(+), 687 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c553638a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
[10/17] ignite git commit: ignite-5937
Posted by sb...@apache.org.
ignite-5937
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c1b2c03d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c1b2c03d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c1b2c03d
Branch: refs/heads/ignite-5932
Commit: c1b2c03dc1ee9de222997cba4efcb2e5fb1a5885
Parents: c553638
Author: sboikov <sb...@gridgain.com>
Authored: Mon Oct 9 17:05:02 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Oct 9 17:50:53 2017 +0300
----------------------------------------------------------------------
.../cache/IgniteCacheOffheapManagerImpl.java | 285 +++++++------------
.../GridDhtPartitionsExchangeFuture.java | 2 +-
.../cache/mvcc/CacheCoordinatorsProcessor.java | 32 ++-
.../cache/mvcc/CacheMvccClusterRestartTest.java | 173 +++++++++++
4 files changed, 308 insertions(+), 184 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c1b2c03d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index dd4d7e0..80d36c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1356,15 +1356,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
return dataRow;
}
- private int compare(CacheDataRow row, long crdVer, long mvccCntr) {
- int cmp = Long.compare(row.mvccCoordinatorVersion(), crdVer);
-
- if (cmp != 0)
- return cmp;
-
- return Long.compare(row.mvccCounter(), mvccCntr);
- }
-
/** {@inheritDoc} */
@Override public GridLongList mvccRemove(GridCacheContext cctx,
boolean primary,
@@ -1376,9 +1367,67 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
try {
+ int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
+ CacheObjectContext coCtx = cctx.cacheObjectContext();
- return null;
+ // Make sure value bytes initialized.
+ key.valueBytes(coCtx);
+
+ MvccUpdateRow updateRow = new MvccUpdateRow(
+ key,
+ null,
+ null,
+ mvccVer,
+ partId,
+ cacheId);
+
+ rowStore.addRow(updateRow);
+
+ assert updateRow.link() != 0 : updateRow;
+
+ if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
+ updateRow.cacheId(cctx.cacheId());
+
+ GridLongList waitTxs = null;
+
+ if (mvccVer.initialLoad()) {
+ boolean old = dataTree.putx(updateRow);
+
+ assert !old;
+
+ incrementSize(cctx.cacheId());
+ }
+ else {
+ dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
+
+ boolean old = dataTree.putx(updateRow);
+
+ assert !old;
+
+ if (!updateRow.previousNotNull())
+ incrementSize(cctx.cacheId());
+
+ waitTxs = updateRow.activeTransactions();
+
+ List<CacheSearchRow> cleanupRows = updateRow.cleanupRows();
+
+ if (cleanupRows != null) {
+ for (int i = 0; i < cleanupRows.size(); i++) {
+ CacheSearchRow oldRow = cleanupRows.get(i);
+
+ assert oldRow.link() != 0L : oldRow;
+
+ boolean rmvd = dataTree.removex(oldRow);
+
+ assert rmvd;
+
+ rowStore.removeRow(oldRow.link());
+ }
+ }
+ }
+
+ return waitTxs;
}
finally {
busyLock.leaveBusy();
@@ -1407,135 +1456,60 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
key.valueBytes(coCtx);
val.valueBytes(coCtx);
- if (true) {
- MvccUpdateRow updateRow = new MvccUpdateRow(
- key,
- val,
- ver,
- mvccVer,
- partId,
- cacheId);
-
- rowStore.addRow(updateRow);
-
- assert updateRow.link() != 0 : updateRow;
-
- if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
- updateRow.cacheId(cctx.cacheId());
-
- GridLongList waitTxs = null;
-
- if (mvccVer.initialLoad()) {
- boolean old = dataTree.putx(updateRow);
-
- assert !old;
-
- incrementSize(cctx.cacheId());
- }
- else {
- dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
-
- boolean old = dataTree.putx(updateRow);
-
- assert !old;
-
- if (!updateRow.previousNotNull())
- incrementSize(cctx.cacheId());
-
- waitTxs = updateRow.activeTransactions();
+ MvccUpdateRow updateRow = new MvccUpdateRow(
+ key,
+ val,
+ ver,
+ mvccVer,
+ partId,
+ cacheId);
- List<CacheSearchRow> cleanupRows = updateRow.cleanupRows();
+ rowStore.addRow(updateRow);
- if (cleanupRows != null) {
- for (int i = 0; i < cleanupRows.size(); i++) {
- CacheSearchRow oldRow = cleanupRows.get(i);
+ assert updateRow.link() != 0 : updateRow;
- assert oldRow.link() != 0L : oldRow;
+ if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
+ updateRow.cacheId(cctx.cacheId());
- boolean rmvd = dataTree.removex(oldRow);
+ GridLongList waitTxs = null;
- assert rmvd;
+ if (mvccVer.initialLoad()) {
+ boolean old = dataTree.putx(updateRow);
- rowStore.removeRow(oldRow.link());
- }
- }
- }
+ assert !old;
- return waitTxs;
+ incrementSize(cctx.cacheId());
}
else {
- MvccDataRow dataRow = new MvccDataRow(
- key,
- val,
- ver,
- partId,
- cacheId,
- mvccVer.coordinatorVersion(),
- mvccVer.counter());
-
- rowStore.addRow(dataRow);
+ dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
- assert dataRow.link() != 0 : dataRow;
-
- if (grp.sharedGroup() && dataRow.cacheId() == CU.UNDEFINED_CACHE_ID)
- dataRow.cacheId(cctx.cacheId());
-
- boolean old = dataTree.putx(dataRow);
+ boolean old = dataTree.putx(updateRow);
assert !old;
- GridLongList waitTxs = null;
-
- if (!mvccVer.initialLoad()) {
- MvccLongList activeTxs = mvccVer.activeTransactions();
-
- // TODO IGNITE-3484: need special method.
- GridCursor<CacheDataRow> cur = dataTree.find(
- new MvccSearchRow(cacheId, key, mvccVer.coordinatorVersion(), mvccVer.counter() - 1),
- new MvccSearchRow(cacheId, key, 1, 1));
-
- boolean first = true;
-
- boolean activeTx = false;
-
- while (cur.next()) {
- CacheDataRow oldVal = cur.get();
-
- assert oldVal.link() != 0 : oldVal;
-
- if (activeTxs != null && oldVal.mvccCoordinatorVersion() == mvccVer.coordinatorVersion() &&
- activeTxs.contains(oldVal.mvccCounter())) {
- if (waitTxs == null)
- waitTxs = new GridLongList();
+ if (!updateRow.previousNotNull())
+ incrementSize(cctx.cacheId());
- assert oldVal.mvccCounter() != mvccVer.counter();
+ waitTxs = updateRow.activeTransactions();
- waitTxs.add(oldVal.mvccCounter());
+ List<CacheSearchRow> cleanupRows = updateRow.cleanupRows();
- activeTx = true;
- }
+ if (cleanupRows != null) {
+ for (int i = 0; i < cleanupRows.size(); i++) {
+ CacheSearchRow oldRow = cleanupRows.get(i);
- if (!activeTx) {
- // Should not delete oldest version which is less than cleanup version.
- int cmp = compare(oldVal, mvccVer.coordinatorVersion(), mvccVer.cleanupVersion());
+ assert oldRow.link() != 0L : oldRow;
- if (cmp <= 0) {
- if (first)
- first = false;
- else {
- boolean rmvd = dataTree.removex(oldVal);
+ boolean rmvd = dataTree.removex(oldRow);
- assert rmvd;
+ assert rmvd;
- rowStore.removeRow(oldVal.link());
- }
- }
- }
+ rowStore.removeRow(oldRow.link());
}
}
-
- return waitTxs;
}
+
+ return waitTxs;
}
finally {
busyLock.leaveBusy();
@@ -1746,26 +1720,15 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
CacheDataRow row;
if (grp.mvccEnabled()) {
- if (true) {
- MvccKeyMaxVersionBound searchRow = new MvccKeyMaxVersionBound(cacheId, key);
-
- dataTree.iterate(
- searchRow,
- new MvccKeyMinVersionBound(cacheId, key),
- searchRow // Use the same instance as closure to do not create extra object.
- );
+ MvccKeyMaxVersionBound searchRow = new MvccKeyMaxVersionBound(cacheId, key);
- row = searchRow.row();
- }
- else {
- GridCursor<CacheDataRow> cur = dataTree.find(new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE),
- new MvccSearchRow(cacheId, key, 1, 1));
+ dataTree.iterate(
+ searchRow,
+ new MvccKeyMinVersionBound(cacheId, key),
+ searchRow // Use the same instance as closure to do not create extra object.
+ );
- if (cur.next())
- row = cur.get();
- else
- row = null;
- }
+ row = searchRow.row();
}
else
row = dataTree.findOne(new SearchRow(cacheId, key), CacheDataRowAdapter.RowData.NO_KEY);
@@ -1818,55 +1781,19 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
- if (true) {
- MvccVersionBasedSearchRow lower = new MvccVersionBasedSearchRow(cacheId, key, ver);
-
- dataTree.iterate(
- lower,
- new MvccKeyMinVersionBound(cacheId, key),
- lower // Use the same instance as closure to do not create extra object.
- );
-
- CacheDataRow row = lower.row();
-
- afterRowFound(row, key);
-
- return row;
- }
- else {
- GridCursor<CacheDataRow> cur = dataTree.find(
- new MvccSearchRow(cacheId, key, ver.coordinatorVersion(), ver.counter()),
- new MvccSearchRow(cacheId, key, 1, 1));
-
- CacheDataRow row = null;
+ MvccVersionBasedSearchRow lower = new MvccVersionBasedSearchRow(cacheId, key, ver);
- MvccLongList txs = ver.activeTransactions();
+ dataTree.iterate(
+ lower,
+ new MvccKeyMinVersionBound(cacheId, key),
+ lower // Use the same instance as closure to do not create extra object.
+ );
- while (cur.next()) {
- CacheDataRow row0 = cur.get();
+ CacheDataRow row = lower.row();
- assert row0.mvccCoordinatorVersion() > 0 : row0;
-
- boolean visible;
-
- if (txs != null) {
- visible = row0.mvccCoordinatorVersion() != ver.coordinatorVersion()
- || !txs.contains(row0.mvccCounter());
- }
- else
- visible = true;
-
- if (visible) {
- row = row0;
-
- break;
- }
- }
-
- assert row == null || key.equals(row.key());
+ afterRowFound(row, key);
- return row;
- }
+ return row;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/c1b2c03d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 830d50b..88095ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -560,7 +560,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
MvccCoordinator mvccCrd = firstEvtDiscoCache.mvccCoordinator();
boolean mvccCrdChange = mvccCrd != null &&
- initialVersion().equals(mvccCrd.topologyVersion());
+ (initialVersion().equals(mvccCrd.topologyVersion()) || activateCluster());
cctx.kernalContext().coordinators().currentCoordinator(mvccCrd);
http://git-wip-us.apache.org/repos/asf/ignite/blob/c1b2c03d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index b9b8ea1..54fb3c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -66,6 +66,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
/**
@@ -86,7 +87,13 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
/** */
private static final byte MSG_POLICY = SYSTEM_POOL;
-
+
+ /** */
+ private static final long CRD_VER_MASK = 0x3F_FF_FF_FF_FF_FF_FF_FFL;
+
+ /** */
+ private static final long RMVD_VAL_VER_MASK = 0x80_00_00_00_00_00_00_00L;
+
/** */
private volatile MvccCoordinator curCrd;
@@ -139,6 +146,21 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
super(ctx);
}
+ public static int compareCoordinatorVersions(long crdVer1, long crdVer2) {
+ crdVer1 = CRD_VER_MASK & crdVer1;
+ crdVer2 = CRD_VER_MASK & crdVer2;
+
+ return Long.compare(crdVer1, crdVer2);
+ }
+
+ public long createVersionForRemovedValue(long crdVer) {
+ return crdVer | RMVD_VAL_VER_MASK;
+ }
+
+ public boolean versionForRemovedValue(long crdVer) {
+ return (crdVer & RMVD_VAL_VER_MASK) != 0;
+ }
+
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
statCntrs = new StatCounter[7];
@@ -199,7 +221,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
* @param topVer Topology version.
*/
public void onDiscoveryEvent(int evtType, Collection<ClusterNode> nodes, long topVer) {
- if (evtType == EVT_NODE_METRICS_UPDATED)
+ if (evtType == EVT_NODE_METRICS_UPDATED || evtType == EVT_DISCOVERY_CUSTOM_EVT)
return;
MvccCoordinator crd;
@@ -778,7 +800,9 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
private MvccCoordinatorVersionResponse assignQueryCounter(UUID qryNodeId, long futId) {
assert crdVer != 0;
- return activeQueries.assignQueryCounter(qryNodeId, futId);
+ MvccCoordinatorVersionResponse res = activeQueries.assignQueryCounter(qryNodeId, futId);
+
+ return res;
// MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse();
//
@@ -989,7 +1013,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
log.info("Initialize local node as mvcc coordinator [node=" + ctx.localNodeId() +
", topVer=" + topVer + ']');
- crdVer = topVer.topologyVersion();
+ crdVer = topVer.topologyVersion() + ctx.discovery().gridStartTime();
prevCrdQueries.init(activeQueries, discoCache, ctx.discovery());
http://git-wip-us.apache.org/repos/asf/ignite/blob/c1b2c03d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java
new file mode 100644
index 0000000..ed7b62d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ *
+ */
+public class CacheMvccClusterRestartTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setConsistentId(gridName);
+
+ cfg.setMvccEnabled(true);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ MemoryConfiguration memCfg = new MemoryConfiguration();
+ memCfg.setPageSize(1024);
+ memCfg.setDefaultMemoryPolicySize(100 * 1024 * 1024);
+
+ cfg.setMemoryConfiguration(memCfg);
+
+ cfg.setPersistentStoreConfiguration(new PersistentStoreConfiguration().setWalMode(WALMode.LOG_ONLY));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ GridTestUtils.deleteDbFiles();
+
+ super.afterTestsStopped();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ GridTestUtils.deleteDbFiles();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ GridTestUtils.deleteDbFiles();
+
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRestart1() throws Exception {
+ restart1(3, 3);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRestart2() throws Exception {
+ restart1(1, 3);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRestart3() throws Exception {
+ restart1(3, 1);
+ }
+
+ /**
+ * @param srvBefore Number of servers before restart.
+ * @param srvAfter Number of servers after restart.
+ * @throws Exception If failed.
+ */
+ private void restart1(int srvBefore, int srvAfter) throws Exception {
+ Ignite srv0 = startGridsMultiThreaded(srvBefore);
+
+ srv0.active(true);
+
+ IgniteCache<Object, Object> cache = srv0.createCache(cacheConfiguration());
+
+ Set<Integer> keys = new HashSet<>(primaryKeys(cache, 1, 0));
+
+ try (Transaction tx = srv0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (Integer k : keys)
+ cache.put(k, k);
+
+ tx.commit();
+ }
+
+ stopAllGrids();
+
+ srv0 = startGridsMultiThreaded(srvAfter);
+
+ srv0.active(true);
+
+ cache = srv0.cache(DEFAULT_CACHE_NAME);
+
+ Map<Object, Object> res = cache.getAll(keys);
+
+ assertEquals(keys.size(), res.size());
+
+ for (Integer k : keys)
+ assertEquals(k, cache.get(k));
+
+ try (Transaction tx = srv0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (Integer k : keys)
+ cache.put(k, k + 1);
+
+ tx.commit();
+ }
+
+ for (Integer k : keys)
+ assertEquals(k + 1, cache.get(k));
+ }
+
+ /**
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<Object, Object> cacheConfiguration() {
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+ ccfg.setBackups(2);
+
+ return ccfg;
+ }
+}
[04/17] ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-3478' into ignite-5937
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-3478' into ignite-5937
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e6940bd5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e6940bd5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e6940bd5
Branch: refs/heads/ignite-5932
Commit: e6940bd5fe00cc77d1d27facfc56964c5b03c9fe
Parents: c4f98f3 a1d9ddd
Author: sboikov <sb...@gridgain.com>
Authored: Wed Oct 4 11:38:37 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Oct 4 11:38:37 2017 +0300
----------------------------------------------------------------------
.../JettyRestProcessorAbstractSelfTest.java | 46 +
.../ignite/jdbc/JdbcErrorsAbstractSelfTest.java | 88 +-
.../FoldersReuseCompatibilityTest.java | 224 +
...itePersistenceCompatibilityAbstractTest.java | 3 +
.../IgniteCompatibilityBasicTestSuite.java | 3 +
.../apache/ignite/IgniteSystemProperties.java | 7 +
.../apache/ignite/configuration/WALMode.java | 14 +-
.../ignite/internal/GridKernalContext.java | 8 +-
.../ignite/internal/GridKernalContextImpl.java | 17 +-
.../apache/ignite/internal/IgniteKernal.java | 6 +-
.../internal/binary/BinaryClassDescriptor.java | 4 +-
.../discovery/GridDiscoveryManager.java | 55 +-
.../affinity/GridAffinityAssignmentCache.java | 2 +-
.../cache/GridCacheAffinityManager.java | 2 +-
.../processors/cache/GridCacheUtils.java | 6 +-
.../cache/binary/BinaryMetadataFileStore.java | 6 +-
.../dht/GridPartitionedGetFuture.java | 21 +-
.../dht/GridPartitionedSingleGetFuture.java | 6 +-
.../distributed/near/GridNearGetFuture.java | 6 +-
.../GridCacheDatabaseSharedManager.java | 91 +-
.../IgniteCacheDatabaseSharedManager.java | 13 +-
.../persistence/file/FilePageStoreManager.java | 29 +-
.../filename/PdsConsistentIdProcessor.java | 568 +
.../persistence/filename/PdsFolderSettings.java | 138 +
.../filename/PdsFoldersResolver.java | 33 +
.../wal/FileWriteAheadLogManager.java | 25 +-
.../wal/reader/StandaloneGridKernalContext.java | 31 +-
.../datastreamer/PlatformDataStreamer.java | 14 +
.../processors/query/GridQueryProcessor.java | 6 +-
.../internal/processors/query/QueryUtils.java | 17 +
.../processors/rest/GridRestCommand.java | 2 +-
.../processors/rest/GridRestProcessor.java | 5 +-
.../ignite/internal/util/IgniteUtils.java | 43 +-
.../cache/VisorCacheAggregatedMetrics.java | 24 +-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 4 +-
.../tcp/internal/TcpDiscoveryNode.java | 15 +
.../binary/BinaryMarshallerSelfTest.java | 136 +-
.../IgniteUidAsConsistentIdMigrationTest.java | 712 ++
.../db/wal/IgniteWalRecoveryTest.java | 13 +-
.../db/wal/reader/IgniteWalReaderTest.java | 187 +-
.../db/wal/reader/MockWalIteratorFactory.java | 25 +-
.../ignite/testsuites/IgnitePdsTestSuite2.java | 4 +
.../processors/query/h2/IgniteH2Indexing.java | 3 +
.../query/h2/ddl/DdlStatementsProcessor.java | 15 +-
.../query/IgniteSqlNotNullConstraintTest.java | 179 +-
.../query/IgniteSqlParameterizedQueryTest.java | 392 +
.../IgniteCacheQuerySelfTestSuite.java | 3 +-
modules/platforms/.gitignore | 3 +-
.../Cache/CacheConfigurationTest.cs | 2 +-
.../Dataload/DataStreamerTest.cs | 50 +-
.../Apache.Ignite.Core.Tests/EventsTest.cs | 2 +-
.../Apache.Ignite.Core.csproj | 1 +
.../Datastream/DataStreamerDefaults.cs | 46 +
.../Datastream/IDataStreamer.cs | 21 +-
.../dotnet/Apache.Ignite.Core/Ignition.cs | 2 +
.../Impl/Binary/BinaryProcessorClient.cs | 4 +-
.../Impl/Binary/BinaryReaderExtensions.cs | 10 +-
.../Impl/Binary/BinaryUtils.cs | 14 +
.../Impl/Cache/Query/QueryCursorBase.cs | 3 +
.../Client/Cache/Query/ClientQueryCursor.cs | 2 +
.../Impl/Client/ClientSocket.cs | 2 +
.../Impl/Datastream/DataStreamerImpl.cs | 45 +-
.../Apache.Ignite.Core/Impl/Events/Events.cs | 2 +-
.../Impl/PlatformJniTarget.cs | 6 +
.../Impl/Unmanaged/UnmanagedCallbacks.cs | 3 +-
modules/platforms/dotnet/Apache.Ignite.ndproj | 11139 +++++++++++++++++
.../http/jetty/GridJettyRestHandler.java | 18 +
.../rest/protocols/http/jetty/favicon.ico | Bin 1406 -> 1150 bytes
.../commands/cache/VisorCacheCommand.scala | 38 +-
parent/pom.xml | 1 +
70 files changed, 14294 insertions(+), 371 deletions(-)
----------------------------------------------------------------------
[06/17] ignite git commit: ignite-5937
Posted by sb...@apache.org.
ignite-5937
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6e25b649
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6e25b649
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6e25b649
Branch: refs/heads/ignite-5932
Commit: 6e25b649f758c3aa308118354b08c6899dd50654
Parents: f8be46d
Author: sboikov <sb...@gridgain.com>
Authored: Wed Oct 4 16:57:59 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Oct 5 17:54:17 2017 +0300
----------------------------------------------------------------------
.../cache/IgniteCacheOffheapManager.java | 11 +-
.../cache/IgniteCacheOffheapManagerImpl.java | 179 +++++++++++++------
.../cache/mvcc/CacheCoordinatorsProcessor.java | 12 +-
.../cache/persistence/tree/BPlusTree.java | 106 +++++------
.../processors/cache/tree/CacheDataTree.java | 2 +-
.../cache/tree/MvccKeyMaxVersionBound.java | 77 ++++++++
.../cache/tree/MvccKeyMinVersionBound.java | 49 +++++
.../processors/cache/tree/MvccUpdateRow.java | 177 ++++++++++++++++++
.../cache/tree/MvccVersionBasedSearchRow.java | 36 +++-
.../cache/mvcc/CacheMvccTransactionsTest.java | 40 +++++
.../processors/database/BPlusTreeSelfTest.java | 123 +++++++++----
11 files changed, 662 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 9d03e4a..8967ce8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -492,7 +492,16 @@ public interface IgniteCacheOffheapManager {
long expireTime,
@Nullable CacheDataRow oldRow) throws IgniteCheckedException;
- GridLongList mvccUpdate(
+ /**
+ * @param cctx Cache context.
+ * @param key Key.
+ * @param val Value.
+ * @param ver Version.
+ * @param mvccVer Mvcc version.
+ * @return List of transactions to wait for.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable GridLongList mvccUpdate(
GridCacheContext cctx,
KeyCacheObject key,
CacheObject val,
http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index eef645d..25f36b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -55,7 +55,10 @@ import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore;
import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
import org.apache.ignite.internal.processors.cache.tree.DataRow;
import org.apache.ignite.internal.processors.cache.tree.MvccDataRow;
+import org.apache.ignite.internal.processors.cache.tree.MvccKeyMaxVersionBound;
+import org.apache.ignite.internal.processors.cache.tree.MvccKeyMinVersionBound;
import org.apache.ignite.internal.processors.cache.tree.MvccSearchRow;
+import org.apache.ignite.internal.processors.cache.tree.MvccUpdateRow;
import org.apache.ignite.internal.processors.cache.tree.MvccVersionBasedSearchRow;
import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
import org.apache.ignite.internal.processors.cache.tree.PendingRow;
@@ -1361,83 +1364,141 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
try {
int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
- MvccDataRow dataRow = new MvccDataRow(
- key,
- val,
- ver,
- partId,
- cacheId,
- mvccVer.coordinatorVersion(),
- mvccVer.counter());
-
CacheObjectContext coCtx = cctx.cacheObjectContext();
// Make sure value bytes initialized.
key.valueBytes(coCtx);
val.valueBytes(coCtx);
- rowStore.addRow(dataRow);
+ if (true) {
+ MvccUpdateRow updateRow = new MvccUpdateRow(
+ key,
+ val,
+ ver,
+ mvccVer,
+ partId,
+ cacheId);
- assert dataRow.link() != 0 : dataRow;
+ rowStore.addRow(updateRow);
- if (grp.sharedGroup() && dataRow.cacheId() == CU.UNDEFINED_CACHE_ID)
- dataRow.cacheId(cctx.cacheId());
+ assert updateRow.link() != 0 : updateRow;
- boolean old = dataTree.putx(dataRow);
+ if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
+ updateRow.cacheId(cctx.cacheId());
- assert !old;
+ GridLongList waitTxs = null;
- GridLongList waitTxs = null;
+ if (mvccVer.initialLoad()) {
+ boolean old = dataTree.putx(updateRow);
- if (!mvccVer.initialLoad()) {
- MvccLongList activeTxs = mvccVer.activeTransactions();
+ assert !old;
- // TODO IGNITE-3484: need special method.
- GridCursor<CacheDataRow> cur = dataTree.find(
- new MvccSearchRow(cacheId, key, mvccVer.coordinatorVersion(), mvccVer.counter() - 1),
- new MvccSearchRow(cacheId, key, 1, 1));
+ incrementSize(cctx.cacheId());
+ }
+ else {
+ dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
- boolean first = true;
+ boolean old = dataTree.putx(updateRow);
- boolean activeTx = false;
+ assert !old;
- while (cur.next()) {
- CacheDataRow oldVal = cur.get();
+ if (!updateRow.previousNotNull())
+ incrementSize(cctx.cacheId());
+
+ waitTxs = updateRow.activeTransactions();
+
+ List<CacheSearchRow> cleanupRows = updateRow.cleanupRows();
- assert oldVal.link() != 0 : oldVal;
+ if (cleanupRows != null) {
+ for (int i = 0; i < cleanupRows.size(); i++) {
+ CacheSearchRow oldRow = cleanupRows.get(i);
- if (activeTxs != null && oldVal.mvccCoordinatorVersion() == mvccVer.coordinatorVersion() &&
- activeTxs.contains(oldVal.mvccCounter())) {
- if (waitTxs == null)
- waitTxs = new GridLongList();
+ assert oldRow.link() != 0L : oldRow;
- assert oldVal.mvccCounter() != mvccVer.counter();
+ boolean rmvd = dataTree.removex(oldRow);
- waitTxs.add(oldVal.mvccCounter());
+ assert rmvd;
- activeTx = true;
+ rowStore.removeRow(oldRow.link());
+ }
}
+ }
- if (!activeTx) {
- // Should not delete oldest version which is less than cleanup version.
- int cmp = compare(oldVal, mvccVer.coordinatorVersion(), mvccVer.cleanupVersion());
+ return waitTxs;
+ }
+ else {
+ MvccDataRow dataRow = new MvccDataRow(
+ key,
+ val,
+ ver,
+ partId,
+ cacheId,
+ mvccVer.coordinatorVersion(),
+ mvccVer.counter());
- if (cmp <= 0) {
- if (first)
- first = false;
- else {
- boolean rmvd = dataTree.removex(oldVal);
+ rowStore.addRow(dataRow);
+
+ assert dataRow.link() != 0 : dataRow;
+
+ if (grp.sharedGroup() && dataRow.cacheId() == CU.UNDEFINED_CACHE_ID)
+ dataRow.cacheId(cctx.cacheId());
- assert rmvd;
+ boolean old = dataTree.putx(dataRow);
- rowStore.removeRow(oldVal.link());
+ assert !old;
+
+ GridLongList waitTxs = null;
+
+ if (!mvccVer.initialLoad()) {
+ MvccLongList activeTxs = mvccVer.activeTransactions();
+
+ // TODO IGNITE-3484: need special method.
+ GridCursor<CacheDataRow> cur = dataTree.find(
+ new MvccSearchRow(cacheId, key, mvccVer.coordinatorVersion(), mvccVer.counter() - 1),
+ new MvccSearchRow(cacheId, key, 1, 1));
+
+ boolean first = true;
+
+ boolean activeTx = false;
+
+ while (cur.next()) {
+ CacheDataRow oldVal = cur.get();
+
+ assert oldVal.link() != 0 : oldVal;
+
+ if (activeTxs != null && oldVal.mvccCoordinatorVersion() == mvccVer.coordinatorVersion() &&
+ activeTxs.contains(oldVal.mvccCounter())) {
+ if (waitTxs == null)
+ waitTxs = new GridLongList();
+
+ assert oldVal.mvccCounter() != mvccVer.counter();
+
+ waitTxs.add(oldVal.mvccCounter());
+
+ activeTx = true;
+ }
+
+ if (!activeTx) {
+ // Should not delete oldest version which is less than cleanup version.
+ int cmp = compare(oldVal, mvccVer.coordinatorVersion(), mvccVer.cleanupVersion());
+
+ if (cmp <= 0) {
+ if (first)
+ first = false;
+ else {
+ boolean rmvd = dataTree.removex(oldVal);
+
+ assert rmvd;
+
+ rowStore.removeRow(oldVal.link());
+ }
}
}
}
}
- }
- return waitTxs;
+ return waitTxs;
+ }
}
finally {
busyLock.leaveBusy();
@@ -1649,11 +1710,15 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
if (grp.mvccEnabled()) {
if (true) {
- row = dataTree.findOneBounded(
- new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE),
- new MvccSearchRow(cacheId, key, 1L, 1L),
- null,
- CacheDataRowAdapter.RowData.NO_KEY);
+ MvccKeyMaxVersionBound searchRow = new MvccKeyMaxVersionBound(cacheId, key);
+
+ dataTree.iterate(
+ searchRow,
+ new MvccKeyMinVersionBound(cacheId, key),
+ searchRow // Use the same instance as closure to do not create extra object.
+ );
+
+ row = searchRow.row();
}
else {
GridCursor<CacheDataRow> cur = dataTree.find(new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE),
@@ -1681,6 +1746,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
{
assert grp.mvccEnabled();
+ // Note: this method is intended for testing only.
+
key.valueBytes(cctx.cacheObjectContext());
int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
@@ -1717,11 +1784,13 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
if (true) {
MvccVersionBasedSearchRow lower = new MvccVersionBasedSearchRow(cacheId, key, ver);
- CacheDataRow row = dataTree.findOneBounded(
+ dataTree.iterate(
lower,
- new MvccSearchRow(cacheId, key, 1L, 1L),
- lower, // Use the same instance as predicate to do not create extra object.
- CacheDataRowAdapter.RowData.NO_KEY);
+ new MvccKeyMinVersionBound(cacheId, key),
+ lower // Use the same instance as closure to do not create extra object.
+ );
+
+ CacheDataRow row = lower.row();
afterRowFound(row, key);
http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index 5080c83..b9b8ea1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -614,8 +614,14 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
// TODO IGNITE-3478 sorted? + change GridLongList.writeTo?
MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse();
- for (Long txVer : activeTxs.keySet())
+ long minActive = Long.MAX_VALUE;
+
+ for (Long txVer : activeTxs.keySet()) {
+ if (txVer < minActive)
+ minActive = txVer;
+
res.addTx(txVer);
+ }
Object old = activeTxs.put(nextCtr, txId);
@@ -624,7 +630,9 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
long cleanupVer;
if (prevCrdQueries.previousQueriesDone()) {
- cleanupVer = committedCntr.get() - 1;
+ cleanupVer = Math.min(minActive, committedCntr.get());
+
+ cleanupVer--;
Long qryVer = activeQueries.minimalQueryCounter();
http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index b6c5c96..9752b17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -908,7 +908,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
long pageAddr = readLock(firstPageId, firstPage); // We always merge pages backwards, the first page is never removed.
try {
- cursor.init(pageAddr, io(pageAddr), 0);
+ cursor.init(pageAddr, io(pageAddr), -1);
}
finally {
readUnlock(firstPageId, firstPage, pageAddr);
@@ -976,17 +976,16 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
/**
* @param lower Lower bound inclusive.
* @param upper Upper bound inclusive.
- * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
- * @return First found item which meets bounds and pass predicate.
+ * @param c Closure applied for all found items, iteration is stopped if closure returns {@code false}.
* @throws IgniteCheckedException If failed.
*/
- public T findOneBounded(L lower, L upper, RowPredicate<L, T> p, Object x) throws IgniteCheckedException {
+ public void iterate(L lower, L upper, TreeRowClosure<L, T> c) throws IgniteCheckedException {
checkDestroyed();
try {
- FindOneCursor cursor = new FindOneCursor(lower, upper, p, x);
+ ClosureCursor cursor = new ClosureCursor(lower, upper, c);
- return cursor.findOne();
+ cursor.iterate();
}
catch (IgniteCheckedException e) {
throw new IgniteCheckedException("Runtime failure on bounds: [lower=" + lower + ", upper=" + upper + "]", e);
@@ -4431,18 +4430,13 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
/** */
final L upperBound;
- /** */
- final Object x;
-
/**
* @param lowerBound Lower bound.
* @param upperBound Upper bound.
- * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
*/
- AbstractForwardCursor(L lowerBound, L upperBound, Object x) {
+ AbstractForwardCursor(L lowerBound, L upperBound) {
this.lowerBound = lowerBound;
this.upperBound = upperBound;
- this.x = x;
}
/**
@@ -4559,7 +4553,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
throws IgniteCheckedException {
assert io.isLeaf() : io;
assert cnt != 0 : cnt; // We can not see empty pages (empty tree handled in init).
- assert startIdx >= 0 : startIdx;
+ assert startIdx >= 0 || startIdx == -1: startIdx;
assert cnt >= startIdx;
checkDestroyed();
@@ -4596,7 +4590,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
* @return {@code true} If we have rows to return after reading the next page.
* @throws IgniteCheckedException If failed.
*/
- final boolean nextPage(T lastRow) throws IgniteCheckedException {
+ final boolean nextPage(L lastRow) throws IgniteCheckedException {
updateLowerBound(lastRow);
for (;;) {
@@ -4618,7 +4612,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
try {
BPlusIO<L> io = io(pageAddr);
- if (fillFromBuffer(pageAddr, io, 0, io.getCount(pageAddr)))
+ if (fillFromBuffer(pageAddr, io, -1, io.getCount(pageAddr)))
return true;
// Continue fetching forward.
@@ -4639,7 +4633,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
/**
* @param lower New exact lower bound.
*/
- private void updateLowerBound(T lower) {
+ private void updateLowerBound(L lower) {
if (lower != null) {
lowerShift = 1; // Now we have the full row an need to avoid duplicates.
lowerBound = lower; // Move the lower bound forward for further concurrent merge retries.
@@ -4648,30 +4642,27 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
}
/**
- * Forward cursor.
+ * Closure cursor.
*/
@SuppressWarnings("unchecked")
- private final class FindOneCursor extends AbstractForwardCursor {
- /** */
- private Object resRow;
-
+ private final class ClosureCursor extends AbstractForwardCursor {
/** */
- private T lastRow;
+ private final TreeRowClosure<L, T> p;
/** */
- private final RowPredicate<L, T> p;
+ private L lastRow;
/**
* @param lowerBound Lower bound.
* @param upperBound Upper bound.
* @param p Row predicate.
- * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
*/
- FindOneCursor(L lowerBound, L upperBound, @Nullable RowPredicate<L, T> p, Object x) {
- super(lowerBound, upperBound, x);
+ ClosureCursor(L lowerBound, L upperBound, TreeRowClosure<L, T> p) {
+ super(lowerBound, upperBound);
assert lowerBound != null;
assert upperBound != null;
+ assert p != null;
this.p = p;
}
@@ -4682,8 +4673,9 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
}
/** {@inheritDoc} */
- @Override boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx, int cnt) throws IgniteCheckedException {
- if (startIdx == 0) // TODO IGNITE-3478: startIdx == 0? can search twice for first item?
+ @Override boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx, int cnt)
+ throws IgniteCheckedException {
+ if (startIdx == -1) // TODO IGNITE-3478: startIdx == 0? can search twice for first item?
startIdx = findLowerBound(pageAddr, io, cnt);
if (cnt == startIdx)
@@ -4698,15 +4690,17 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
return false;
}
- if (p == null || p.apply(BPlusTree.this, io, pageAddr, i)) {
- resRow = getRow(io, pageAddr, i, x);
+ boolean stop = !p.apply(BPlusTree.this, io, pageAddr, i);
+
+ if (stop) {
+ nextPageId = 0; // The End.
return true;
}
}
if (nextPageId != 0)
- lastRow = getRow(io, pageAddr, cnt - 1, null); // Need save last row.
+ lastRow = io.getLookupRow(BPlusTree.this, pageAddr, cnt - 1); // Need save last row.
return true;
}
@@ -4718,36 +4712,28 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
/** {@inheritDoc} */
@Override void onNotFound(boolean readDone) {
- resRow = EMPTY;
+ nextPageId = 0;
}
/**
* @throws IgniteCheckedException If failed.
- * @return Found row.
*/
- private T findOne() throws IgniteCheckedException {
+ private void iterate() throws IgniteCheckedException {
find();
- if (resRow != null) {
- if (resRow == EMPTY)
- return null;
-
- return (T)resRow;
+ if (nextPageId == 0) {
+ return;
}
for (;;) {
- T lastRow0 = lastRow;
+ L lastRow0 = lastRow;
lastRow = null;
nextPage(lastRow0);
- if (resRow != null) {
- if (resRow == EMPTY)
- return null;
-
- return (T)resRow;
- }
+ if (nextPageId == 0)
+ return;
}
}
}
@@ -4758,6 +4744,9 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
@SuppressWarnings("unchecked")
private final class ForwardCursor extends AbstractForwardCursor implements GridCursor<T> {
/** */
+ final Object x;
+
+ /** */
private T[] rows = (T[])EMPTY;
/** */
@@ -4769,13 +4758,19 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
* @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
*/
ForwardCursor(L lowerBound, L upperBound, Object x) {
- super(lowerBound, upperBound, x);
+ super(lowerBound, upperBound);
+
+ this.x = x;
}
/** {@inheritDoc} */
@Override boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx, int cnt) throws IgniteCheckedException {
- if (lowerBound != null && startIdx == 0)
- startIdx = findLowerBound(pageAddr, io, cnt);
+ if (startIdx == -1) {
+ if (lowerBound != null)
+ startIdx = findLowerBound(pageAddr, io, cnt);
+ else
+ startIdx = 0;
+ }
if (upperBound != null && cnt != startIdx)
cnt = findUpperBound(pageAddr, io, startIdx, cnt);
@@ -5003,7 +4998,16 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
/**
*
*/
- public interface RowPredicate<L, T extends L> {
- public boolean apply(BPlusTree<L, T> tree, BPlusIO<L> io, long pageAddr, int idx) throws IgniteCheckedException;
+ public interface TreeRowClosure<L, T extends L> {
+ /**
+ * @param tree Tree.
+ * @param io Tree IO.
+ * @param pageAddr Page address.
+ * @param idx Item index.
+ * @return {@code True} if item pass predicate. TODO IGNITE-3478
+ * @throws IgniteCheckedException If failed.
+ */
+ public boolean apply(BPlusTree<L, T> tree, BPlusIO<L> io, long pageAddr, int idx)
+ throws IgniteCheckedException;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index a1bfc9b..eaeefee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -114,7 +114,7 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
/** {@inheritDoc} */
@Override protected int compare(BPlusIO<CacheSearchRow> iox, long pageAddr, int idx, CacheSearchRow row)
throws IgniteCheckedException {
- assert !grp.mvccEnabled() || row.mvccCoordinatorVersion() != 0;// || row.getClass() == SearchRow.class;
+ assert !grp.mvccEnabled() || row.mvccCoordinatorVersion() != 0 : row;
RowLinkIO io = (RowLinkIO)iox;
http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
new file mode 100644
index 0000000..aa9422d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.tree;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
+import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class MvccKeyMaxVersionBound extends SearchRow implements BPlusTree.TreeRowClosure<CacheSearchRow, CacheDataRow> {
+ /** */
+ private CacheDataRow resRow;
+
+ /**
+ * @param cacheId Cache ID.
+ * @param key Key.
+ */
+ public MvccKeyMaxVersionBound(int cacheId, KeyCacheObject key) {
+ super(cacheId, key);
+ }
+
+ /**
+ * @return Found row.
+ */
+ @Nullable public CacheDataRow row() {
+ return resRow;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(BPlusTree<CacheSearchRow, CacheDataRow> tree, BPlusIO<CacheSearchRow> io,
+ long pageAddr,
+ int idx)
+ throws IgniteCheckedException
+ {
+ resRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
+
+ return false; // Stop search.
+ }
+
+ /** {@inheritDoc} */
+ @Override public long mvccCoordinatorVersion() {
+ return Long.MAX_VALUE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long mvccCounter() {
+ return Long.MAX_VALUE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccKeyMaxVersionBound.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMinVersionBound.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMinVersionBound.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMinVersionBound.java
new file mode 100644
index 0000000..f2ac308
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMinVersionBound.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.tree;
+
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ *
+ */
+public class MvccKeyMinVersionBound extends SearchRow {
+ /**
+ * @param cacheId Cache ID.
+ * @param key Key.
+ */
+ public MvccKeyMinVersionBound(int cacheId, KeyCacheObject key) {
+ super(cacheId, key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long mvccCoordinatorVersion() {
+ return 1L;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long mvccCounter() {
+ return 1L;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccKeyMinVersionBound.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
new file mode 100644
index 0000000..79544e6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.tree;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<CacheSearchRow, CacheDataRow> {
+ /** */
+ private Boolean hasPrev;
+
+ /** */
+ private boolean canCleanup;
+
+ /** */
+ private GridLongList activeTxs;
+
+ /** */
+ private List<CacheSearchRow> cleanupRows;
+
+ /** */
+ private final MvccCoordinatorVersion mvccVer;
+
+ /**
+ * @param key Key.
+ * @param val Value.
+ * @param ver Version.
+ * @param mvccVer Mvcc version.
+ * @param part Partition.
+ * @param cacheId Cache ID.
+ */
+ public MvccUpdateRow(
+ KeyCacheObject key,
+ CacheObject val,
+ GridCacheVersion ver,
+ MvccCoordinatorVersion mvccVer,
+ int part,
+ int cacheId) {
+ super(key, val, ver, part, 0L, cacheId);
+
+ this.mvccVer = mvccVer;
+ }
+
+ /**
+ * @return {@code True} if previous value was non-null.
+ */
+ public boolean previousNotNull() {
+ return hasPrev != null && hasPrev;
+ }
+
+ /**
+ * @return Active transactions to wait for.
+ */
+ @Nullable public GridLongList activeTransactions() {
+ return activeTxs;
+ }
+
+ /**
+ * @return Rows which are safe to cleanup.
+ */
+ public List<CacheSearchRow> cleanupRows() {
+ return cleanupRows;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(BPlusTree<CacheSearchRow, CacheDataRow> tree,
+ BPlusIO<CacheSearchRow> io,
+ long pageAddr,
+ int idx)
+ throws IgniteCheckedException
+ {
+ RowLinkIO rowIo = (RowLinkIO)io;
+
+ // All previous version should be less then new one.
+ assert mvccVer.coordinatorVersion() >= rowIo.getMvccCoordinatorVersion(pageAddr, idx);
+ assert mvccVer.coordinatorVersion() > rowIo.getMvccCoordinatorVersion(pageAddr, idx) || mvccVer.counter() > rowIo.getMvccCounter(pageAddr, idx);
+
+ boolean checkActive = mvccVer.activeTransactions().size() > 0;
+
+ boolean txActive = false;
+
+ // Suppose transactions on previous coordinator versions are done.
+ if (checkActive && mvccVer.coordinatorVersion() == rowIo.getMvccCoordinatorVersion(pageAddr, idx)) {
+ long rowMvccCntr = rowIo.getMvccCounter(pageAddr, idx);
+
+ if (mvccVer.activeTransactions().contains(rowMvccCntr)) {
+ txActive = true;
+
+ if (activeTxs == null)
+ activeTxs = new GridLongList();
+
+ activeTxs.add(rowMvccCntr);
+ }
+ }
+
+ if (hasPrev == null)
+ hasPrev = Boolean.TRUE; // TODO IGNITE-3478 support removes.
+
+ if (!txActive) {
+ assert Long.compare(mvccVer.coordinatorVersion(), rowIo.getMvccCoordinatorVersion(pageAddr, idx)) >= 0;
+
+ int cmp;
+
+ if (mvccVer.coordinatorVersion() == rowIo.getMvccCoordinatorVersion(pageAddr, idx))
+ cmp = Long.compare(mvccVer.cleanupVersion(), rowIo.getMvccCounter(pageAddr, idx));
+ else
+ cmp = 1;
+
+ if (cmp >= 0) {
+ // Do not cleanup oldest version.
+ if (canCleanup) {
+ CacheSearchRow row = io.getLookupRow(tree, pageAddr, idx);
+
+ assert row.link() != 0 && row.mvccCoordinatorVersion() > 0 : row;
+
+ // Should not be possible to cleanup active tx.
+ assert row.mvccCoordinatorVersion() != mvccVer.coordinatorVersion()
+ || !mvccVer.activeTransactions().contains(row.mvccCounter());
+
+ if (cleanupRows == null)
+ cleanupRows = new ArrayList<>();
+
+ cleanupRows.add(row);
+ }
+ else
+ canCleanup = true;
+ }
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long mvccCoordinatorVersion() {
+ return mvccVer.coordinatorVersion();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long mvccCounter() {
+ return mvccVer.counter();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccUpdateRow.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
index 6af2c4c..c829afb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
@@ -11,7 +11,7 @@
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ See the License for the specific language governing permissions and
* limitations under the License.
*/
@@ -21,18 +21,23 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
/**
*
*/
-public class MvccVersionBasedSearchRow extends SearchRow implements BPlusTree.RowPredicate<CacheSearchRow, CacheDataRow> {
+public class MvccVersionBasedSearchRow extends SearchRow implements BPlusTree.TreeRowClosure<CacheSearchRow, CacheDataRow> {
/** */
private final MvccCoordinatorVersion ver;
+ /** */
+ private CacheDataRow resRow;
+
/**
* @param cacheId Cache ID.
* @param key Key.
@@ -46,21 +51,36 @@ public class MvccVersionBasedSearchRow extends SearchRow implements BPlusTree.Ro
this.ver = ver;
}
+ /**
+ * @return Found row.
+ */
+ @Nullable public CacheDataRow row() {
+ return resRow;
+ }
+
/** {@inheritDoc} */
@Override public boolean apply(BPlusTree<CacheSearchRow, CacheDataRow> tree,
BPlusIO<CacheSearchRow> io,
long pageAddr,
int idx) throws IgniteCheckedException
{
- if (ver.activeTransactions() == null)
- return true;
+ boolean visible = true;
+
+ if (ver.activeTransactions().size() > 0) {
+ RowLinkIO rowIo = (RowLinkIO)io;
+
+ // TODO IGNITE-3478 sort active transactions?
+ if (rowIo.getMvccCoordinatorVersion(pageAddr, idx) == ver.coordinatorVersion())
+ visible = !ver.activeTransactions().contains(rowIo.getMvccCounter(pageAddr, idx));
+ }
- RowLinkIO rowIo = (RowLinkIO)io;
+ if (visible) {
+ resRow = ((CacheDataTree) tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
- if (rowIo.getMvccCoordinatorVersion(pageAddr, idx) != ver.coordinatorVersion())
- return true;
+ return false; // Stop search.
+ }
- return !ver.activeTransactions().contains(rowIo.getMvccCounter(pageAddr, idx)); // TODO IGNITE-3478 sort active transactions?
+ return true;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 89b3df2..115e8a2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -2539,6 +2539,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
}
/**
+ * @param restartCrd If {@code true} dedicated coordinator node is restarted during test.
* @param srvs Number of server nodes.
* @param clients Number of client nodes.
* @param cacheBackups Number of cache backups.
@@ -2685,6 +2686,45 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
stop.set(true);
}
}
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testSize() throws Exception {
+ Ignite node = startGrid(0);
+
+ IgniteCache cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 1));
+
+ assertEquals(cache.size(), 0);
+
+ final int KEYS = 10;
+
+ for (int i = 0; i < KEYS; i++) {
+ final Integer key = i;
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(key, i);
+
+ tx.commit();
+ }
+
+ assertEquals(i + 1, cache.size());
+ }
+
+ for (int i = 0; i < KEYS; i++) {
+ final Integer key = i;
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(key, i);
+
+ tx.commit();
+ }
+
+ assertEquals(KEYS, cache.size());
+ }
+
+ // TODO IGNITE-3478: test removes.
+ }
+
/**
* @throws IgniteCheckedException If failed.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
index e7ab34f..e2f6b2e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
@@ -571,7 +571,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
assertNoLocks();
assertEquals(x, tree.findOne(x).longValue());
- assertEquals(x, tree.findOneBounded(x, x, null, null).longValue());
+ checkIterate(tree, x, x, x, true);
assertNoLocks();
@@ -588,13 +588,13 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
for (long x = 0; x < cnt; x++) {
assertEquals(x, tree.findOne(x).longValue());
- assertEquals(x, tree.findOneBounded(x, x, null, null).longValue());
+ checkIterate(tree, x, x, x, true);
}
assertNoLocks();
assertNull(tree.findOne(cnt));
- assertNull(tree.findOneBounded(cnt, cnt, null, null));
+ checkIterate(tree, cnt, cnt, null, false);
for (long x = RMV_INC > 0 ? 0 : cnt - 1; x >= 0 && x < cnt; x += RMV_INC) {
X.println(" -- " + x);
@@ -608,7 +608,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
assertNoLocks();
assertNull(tree.findOne(x));
- assertNull(tree.findOneBounded(x, x, null, null));
+ checkIterate(tree, x, x, null, false);
assertNoLocks();
@@ -625,6 +625,32 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
}
/**
+ * @param tree
+ * @param lower
+ * @param upper
+ * @param exp
+ * @param expFound
+ * @throws IgniteCheckedException
+ */
+ private void checkIterate(TestTree tree, long lower, long upper, Long exp, boolean expFound)
+ throws IgniteCheckedException {
+ TestTreeRowClosure c = new TestTreeRowClosure(exp);
+
+ tree.iterate(lower, upper, c);
+
+ assertEquals(expFound, c.found);
+ }
+
+ private void checkIterateC(TestTree tree, long lower, long upper, TestTreeRowClosure c, boolean expFound)
+ throws IgniteCheckedException {
+ c.found = false;
+
+ tree.iterate(lower, upper, c);
+
+ assertEquals(expFound, c.found);
+ }
+
+ /**
* @throws IgniteCheckedException If failed.
*/
public void testRandomInvoke_1_30_1() throws IgniteCheckedException {
@@ -1250,44 +1276,53 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testFindOneBounded() throws Exception {
+ public void testIterate() throws Exception {
MAX_PER_PAGE = 5;
TestTree tree = createTestTree(true);
- assertNull(tree.findOneBounded(0L, 100L, null, null));
+ checkIterate(tree, 0L, 100L, null, false);
for (long idx = 1L; idx <= 10L; ++idx)
tree.put(idx);
for (long idx = 1L; idx <= 10L; ++idx)
- assertEquals(idx, (Object)tree.findOneBounded(idx, 100L, null, null));
+ checkIterate(tree, idx, 100L, idx, true);
- assertEquals(1L, (Object)tree.findOneBounded(0L, 100L, null, null));
+ checkIterate(tree, 0L, 100L, 1L, true);
for (long idx = 1L; idx <= 10L; ++idx)
- assertEquals(10L, (Object)tree.findOneBounded(idx, 100L, new TestRowPredicate(10L), null));
+ checkIterate(tree, idx, 100L, 10L, true);
- assertNull(tree.findOneBounded(0L, 100L, new TestRowPredicate(100L), null));
+ checkIterate(tree, 0L, 100L, 100L, false);
for (long idx = 1L; idx <= 10L; ++idx)
- assertEquals(idx, (Object)tree.findOneBounded(0L, 100L, new TestRowPredicate(idx), null));
+ checkIterate(tree, 0L, 100L, idx, true);
for (long idx = 0L; idx <= 10L; ++idx)
- assertNull(tree.findOneBounded(idx, 11L, new TestRowPredicate(-1L), null));
+ checkIterate(tree, idx, 11L, -1L, false);
}
/**
* @throws Exception If failed.
*/
- public void testFindOneBoundedConcurrentPutRemove() throws Exception {
+ public void testIterateConcurrentPutRemove() throws Exception {
+ findOneBoundedConcurrentPutRemove();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testIterateConcurrentPutRemove_1() throws Exception {
+ MAX_PER_PAGE = 1;
+
findOneBoundedConcurrentPutRemove();
}
/**
* @throws Exception If failed.
*/
- public void testFindOneBoundedConcurrentPutRemove_5() throws Exception {
+ public void testIterateConcurrentPutRemove_5() throws Exception {
MAX_PER_PAGE = 5;
findOneBoundedConcurrentPutRemove();
@@ -1296,7 +1331,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testFindOneBoundedConcurrentPutRemove_10() throws Exception {
+ public void testIteratePutRemove_10() throws Exception {
MAX_PER_PAGE = 10;
findOneBoundedConcurrentPutRemove();
@@ -1370,33 +1405,30 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
info("Iteration [iter=" + i + ", key=" + findKey + ']');
assertEquals(findKey, tree.findOne(findKey));
- assertEquals(findKey, tree.findOneBounded(findKey, findKey, null, null));
+ checkIterate(tree, findKey, findKey, findKey, true);
IgniteInternalFuture getFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
- TestRowPredicate p = new TestRowPredicate(findKey);
+ TestTreeRowClosure p = new TestTreeRowClosure(findKey);
- TestRowPredicate falseP = new TestRowPredicate(-1L);
+ TestTreeRowClosure falseP = new TestTreeRowClosure(-1L);
int cnt = 0;
while (!stop.get()) {
int shift = MAX_PER_PAGE > 0 ? rnd.nextInt(MAX_PER_PAGE * 2) : rnd.nextInt(100);
- assertEquals(findKey, tree.findOneBounded(findKey, findKey, null, null));
+ checkIterateC(tree, findKey, findKey, p, true);
- assertEquals(findKey,
- tree.findOneBounded(findKey - shift, findKey, p, null));
+ checkIterateC(tree, findKey - shift, findKey, p, true);
- assertEquals(findKey,
- tree.findOneBounded(findKey - shift, findKey + shift, p, null));
+ checkIterateC(tree, findKey - shift, findKey + shift, p, true);
- assertEquals(findKey,
- tree.findOneBounded(findKey, findKey + shift, p, null));
+ checkIterateC(tree, findKey, findKey + shift, p, true);
- assertNull(tree.findOneBounded(-100L, KEYS + 100L, falseP, null));
+ checkIterateC(tree, -100L, KEYS + 100L, falseP, false);
cnt++;
}
@@ -1650,7 +1682,11 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
last = c.get();
}
- last = tree.findOneBounded((long)low, (long)high, null, null);
+ TestTreeFindFirstClosure cl = new TestTreeFindFirstClosure();
+
+ tree.iterate((long)low, (long)high, cl);
+
+ last = cl.val;
if (last != null) {
assertTrue(low + " <= " + last + " <= " + high, last >= low);
@@ -2064,23 +2100,46 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
/**
*
*/
- static class TestRowPredicate implements TestTree.RowPredicate<Long, Long> {
+ static class TestTreeRowClosure implements BPlusTree.TreeRowClosure<Long, Long> {
/** */
private final Long expVal;
+ /** */
+ private boolean found;
+
/**
- * @param expVal Expected value.
+ * @param expVal Value to find or {@code null} to find first.
*/
- TestRowPredicate(Long expVal) {
+ TestTreeRowClosure(Long expVal) {
this.expVal = expVal;
}
/** {@inheritDoc} */
@Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx)
throws IgniteCheckedException {
- Long row = io.getLookupRow(tree, pageAddr, idx);
+ assert !found;
+
+ found = expVal == null || io.getLookupRow(tree, pageAddr, idx).equals(expVal);
+
+ return !found;
+ }
+ }
+
+ /**
+ *
+ */
+ static class TestTreeFindFirstClosure implements BPlusTree.TreeRowClosure<Long, Long> {
+ /** */
+ private Long val;
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx)
+ throws IgniteCheckedException {
+ assert val == null;
+
+ val = io.getLookupRow(tree, pageAddr, idx);
- return row.equals(expVal);
+ return false;
}
}
}
[17/17] ignite git commit: ignite-5932
Posted by sb...@apache.org.
ignite-5932
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5318153d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5318153d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5318153d
Branch: refs/heads/ignite-5932
Commit: 5318153d6d0648b8496b7f4b8d02d1b1b7181d4d
Parents: ca82daa
Author: sboikov <sb...@gridgain.com>
Authored: Wed Oct 11 11:37:34 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Oct 11 13:42:46 2017 +0300
----------------------------------------------------------------------
.../GridDistributedTxRemoteAdapter.java | 2 +-
...arOptimisticSerializableTxPrepareFuture.java | 127 ++++++++++++++++++-
.../transactions/IgniteTxLocalAdapter.java | 2 +-
3 files changed, 128 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5318153d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 77039cc..b21a7b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -474,7 +474,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
cctx.database().checkpointReadLock();
try {
- assert !txState.mvccEnabled(cctx) || mvccInfo != null;
+ assert !txState.mvccEnabled(cctx) || mvccInfo != null : "Mvcc is not initialized: " + this;
Collection<IgniteTxEntry> entries = near() ? allEntries() : writeEntries();
http://git-wip-us.apache.org/repos/asf/ignite/blob/5318153d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 9d36bca..86669ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -38,6 +38,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccResponseListener;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -56,6 +60,7 @@ import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteReducer;
import org.jetbrains.annotations.Nullable;
@@ -68,6 +73,10 @@ import static org.apache.ignite.transactions.TransactionState.PREPARING;
*/
public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter {
/** */
+ private static final AtomicIntegerFieldUpdater<MvccVersionFuture> LOCK_CNT_UPD =
+ AtomicIntegerFieldUpdater.newUpdater(MvccVersionFuture.class, "lockCnt");
+
+ /** */
@GridToStringExclude
private KeyLockFuture keyLockFut;
@@ -76,6 +85,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
private ClientRemapFuture remapFut;
/** */
+ @GridToStringExclude
+ private MvccVersionFuture mvccVerFut;
+
+ /** */
private int miniId;
/**
@@ -193,6 +206,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
if (keyLockFut != null)
keyLockFut.onDone(e);
+
+ if (mvccVerFut != null)
+ mvccVerFut.onDone();
}
/** {@inheritDoc} */
@@ -345,11 +361,25 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
boolean hasNearCache = false;
+ MvccCoordinator mvccCrd = null;
+
for (IgniteTxEntry write : writes) {
map(write, topVer, mappings, txMapping, remap, topLocked);
- if (write.context().isNear())
+ GridCacheContext cctx = write.context();
+
+ if (cctx.isNear())
hasNearCache = true;
+
+ if (cctx.mvccEnabled() && mvccCrd == null) {
+ mvccCrd = cctx.affinity().mvccCoordinator(topVer);
+
+ if (mvccCrd == null) {
+ onDone(new IgniteCheckedException("Mvcc coordinator is not assigned: " + topVer));
+
+ return;
+ }
+ }
}
for (IgniteTxEntry read : reads)
@@ -365,6 +395,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
return;
}
+ assert !tx.txState().mvccEnabled(cctx) || mvccCrd != null;
+
tx.addEntryMapping(mappings.values());
cctx.mvcc().recheckPendingLocks();
@@ -376,12 +408,16 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
MiniFuture locNearEntriesFut = null;
+ int lockCnt = keyLockFut != null ? 1 : 0;
+
// Create futures in advance to have all futures when process {@link GridNearTxPrepareResponse#clientRemapVersion}.
for (GridDistributedTxMapping m : mappings.values()) {
assert !m.empty();
MiniFuture fut = new MiniFuture(this, m, ++miniId);
+ lockCnt++;
+
add((IgniteInternalFuture)fut);
if (m.primary().isLocal() && m.hasNearCacheEntries() && m.hasColocatedCacheEntries()) {
@@ -390,7 +426,24 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
locNearEntriesFut = fut;
add((IgniteInternalFuture)new MiniFuture(this, m, ++miniId));
+
+ lockCnt++;
+ }
+ }
+
+ if (mvccCrd != null) {
+ if (!remap) {
+ mvccVerFut = new MvccVersionFuture();
+
+ if (keyLockFut != null)
+ keyLockFut.listen(mvccVerFut);
+
+ add(mvccVerFut);
}
+ else
+ assert mvccVerFut != null;
+
+ mvccVerFut.init(mvccCrd, lockCnt);
}
Collection<IgniteInternalFuture<?>> futs = (Collection)futures();
@@ -722,6 +775,75 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
}
/**
+ *
+ */
+ private class MvccVersionFuture extends GridFutureAdapter implements MvccResponseListener,
+ IgniteInClosure<IgniteInternalFuture<Void>> {
+ /** */
+ MvccCoordinator crd;
+
+ /** */
+ volatile int lockCnt;
+
+ @Override public void apply(IgniteInternalFuture<Void> keyLockFut) {
+ try {
+ keyLockFut.get();
+
+ onLockReceived();
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("MvccVersionFuture ignores key lock future failure: " + e);
+ }
+ }
+
+ /**
+ * @param crd Mvcc coordinator.
+ * @param lockCnt Expected number of lock responses.
+ */
+ void init(MvccCoordinator crd, int lockCnt) {
+ assert crd != null;
+ assert lockCnt > 0;
+
+ this.crd = crd;
+ this.lockCnt = lockCnt;
+
+ assert !isDone();
+ }
+
+ /**
+ *
+ */
+ void onLockReceived() {
+ int remaining = LOCK_CNT_UPD.decrementAndGet(this);
+
+ assert remaining >= 0 : remaining;
+
+ if (remaining == 0) {
+ // TODO IGNTIE-3478: add method to do not create one more future in requestTxCounter.
+ if (cctx.localNodeId().equals(crd.nodeId()))
+ onMvccResponse(crd.nodeId(), cctx.coordinators().requestTxCounterOnCoordinator(tx));
+ else
+ cctx.coordinators().requestTxCounter(crd, this, tx.nearXidVersion());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMvccResponse(UUID crdId, MvccCoordinatorVersion res) {
+ tx.mvccInfo(new TxMvccInfo(crdId, res));
+
+ onDone();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMvccError(IgniteCheckedException e) {
+ ERR_UPD.compareAndSet(GridNearOptimisticSerializableTxPrepareFuture.this, null, e);
+
+ onDone();
+ }
+ }
+
+ /**
* Client remap future.
*/
private static class ClientRemapFuture extends GridCompoundFuture<GridNearTxPrepareResponse, Boolean> {
@@ -963,6 +1085,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
// Finish this mini future (need result only on client node).
onDone(parent.cctx.kernalContext().clientNode() ? res : null);
+
+ if (parent.mvccVerFut != null)
+ parent.mvccVerFut.onLockReceived();
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5318153d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index d8f911c..0ebf2f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -520,7 +520,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
try {
cctx.tm().txContext(this);
- assert !txState.mvccEnabled(cctx) || mvccInfo != null;
+ assert !txState.mvccEnabled(cctx) || mvccInfo != null : "Mvcc is not initialized: " + this;
AffinityTopologyVersion topVer = topologyVersion();
[15/17] ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-3478' into ignite-5932
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-3478' into ignite-5932
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
# modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/52107670
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/52107670
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/52107670
Branch: refs/heads/ignite-5932
Commit: 521076700fde97118f35387a2e4bcfdcc29fd94b
Parents: 224f244 970cf47
Author: sboikov <sb...@gridgain.com>
Authored: Wed Oct 11 11:28:13 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Oct 11 11:28:13 2017 +0300
----------------------------------------------------------------------
.../JettyRestProcessorAbstractSelfTest.java | 4 +-
.../ignite/internal/binary/BinaryMetadata.java | 8 +-
.../cache/IgniteCacheOffheapManager.java | 65 ++++++--
.../cache/IgniteCacheOffheapManagerImpl.java | 32 +++-
.../distributed/dht/GridDhtCacheEntry.java | 5 +-
.../persistence/GridCacheOffheapManager.java | 7 +
.../transactions/IgniteTxLocalAdapter.java | 26 ++-
.../cache/mvcc/CacheMvccTransactionsTest.java | 163 +++++++++++++++----
.../query/h2/opt/GridH2IndexBase.java | 25 ++-
9 files changed, 267 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/52107670/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 9e3d0fb,2c070fc..e498b5e
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@@ -542,10 -549,11 +549,27 @@@ public interface IgniteCacheOffheapMana
/**
* @param cctx Cache context.
+ * @param key Key.
+ * @param val Value.
+ * @param ver Version.
+ * @param mvccVer Mvcc version.
++ * @return {@code True} if new value was inserted.
++ * @throws IgniteCheckedException If failed.
++ */
++ boolean mvccInitialValue(
++ GridCacheContext cctx,
++ KeyCacheObject key,
++ @Nullable CacheObject val,
++ GridCacheVersion ver,
++ MvccCoordinatorVersion mvccVer) throws IgniteCheckedException;
++
++ /**
++ * @param cctx Cache context.
+ * @param primary {@code True} if update is executed on primary node.
+ * @param key Key.
+ * @param val Value.
+ * @param ver Version.
+ * @param mvccVer Mvcc version.
* @return List of transactions to wait for.
* @throws IgniteCheckedException If failed.
*/
@@@ -557,11 -565,26 +581,32 @@@
GridCacheVersion ver,
MvccCoordinatorVersion mvccVer) throws IgniteCheckedException;
+ @Nullable GridLongList mvccRemove(
+ GridCacheContext cctx,
+ boolean primary,
+ KeyCacheObject key,
+ MvccCoordinatorVersion mvccVer) throws IgniteCheckedException;
++
+ /**
+ * @param cctx Cache context.
+ * @param primary {@code True} if update is executed on primary node.
+ * @param key Key.
+ * @param mvccVer Mvcc version.
+ * @return List of transactions to wait for.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable GridLongList mvccRemove(
+ GridCacheContext cctx,
+ boolean primary,
+ KeyCacheObject key,
+ MvccCoordinatorVersion mvccVer) throws IgniteCheckedException;
+
+ /**
+ * @param cctx Cache context.
+ * @param key Key.
+ * @throws IgniteCheckedException If failed.
+ */
+ void mvccRemoveAll(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException;
/**
* @param cctx Cache context.
http://git-wip-us.apache.org/repos/asf/ignite/blob/52107670/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
[07/17] ignite git commit: ignite-5937
Posted by sb...@apache.org.
ignite-5937
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5c195c3b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5c195c3b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5c195c3b
Branch: refs/heads/ignite-5932
Commit: 5c195c3bece5924b03d141d8bdf68da488ababf9
Parents: 6e25b64
Author: sboikov <sb...@gridgain.com>
Authored: Mon Oct 9 13:43:03 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Oct 9 13:43:03 2017 +0300
----------------------------------------------------------------------
.../internal/processors/cache/persistence/tree/BPlusTree.java | 1 -
1 file changed, 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5c195c3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index 9752b17..b31a61f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -62,7 +62,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
-import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.Bool.DONE;
import static org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.Bool.FALSE;
[08/17] ignite git commit: ignite-5937
Posted by sb...@apache.org.
ignite-5937
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0bbbbe29
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0bbbbe29
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0bbbbe29
Branch: refs/heads/ignite-5932
Commit: 0bbbbe2909718a171f2c4e31d39851220f4b646a
Parents: 5c195c3
Author: sboikov <sb...@gridgain.com>
Authored: Mon Oct 9 16:24:30 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Oct 9 16:24:30 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 20 ++++++++--
.../cache/IgniteCacheOffheapManager.java | 20 +++++++++-
.../cache/IgniteCacheOffheapManagerImpl.java | 39 +++++++++++++++++++-
.../persistence/GridCacheOffheapManager.java | 17 ++++++++-
.../processors/cache/tree/MvccUpdateRow.java | 28 ++++++++++++--
5 files changed, 112 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0bbbbe29/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 00307a5..4b71a7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1012,7 +1012,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (cctx.mvccEnabled() && !((IgniteCacheOffheapManagerImpl)cctx.offheap()).IGNITE_FAKE_MVCC_STORAGE) {
assert mvccVer != null;
- mvccWaitTxs = cctx.offheap().mvccUpdate(this, val, newVer, mvccVer);
+ mvccWaitTxs = cctx.offheap().mvccUpdate(tx.local(),
+ this,
+ val,
+ newVer,
+ mvccVer);
}
else
storeValue(val, expireTime, newVer, null);
@@ -1141,6 +1145,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean marked = false;
+ GridLongList mvccWaitTxs = null;
+
synchronized (this) {
checkObsolete();
@@ -1181,7 +1187,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
}
- removeValue();
+ if (cctx.mvccEnabled() && !((IgniteCacheOffheapManagerImpl)cctx.offheap()).IGNITE_FAKE_MVCC_STORAGE) {
+ assert mvccVer != null;
+
+ mvccWaitTxs = cctx.offheap().mvccRemove(tx.local(), this, mvccVer);
+ }
+ else
+ removeValue();
update(null, 0, 0, newVer, true);
@@ -1292,7 +1304,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
cctx.config().getInterceptor().onAfterRemove(entry0);
if (valid)
- return new GridCacheUpdateTxResult(true, updateCntr0, null);
+ return new GridCacheUpdateTxResult(true, updateCntr0, mvccWaitTxs);
else
return new GridCacheUpdateTxResult(false);
}
@@ -2593,7 +2605,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (val != null) {
if (cctx.mvccEnabled())
- cctx.offheap().mvccUpdate(this, val, ver, mvccVer);
+ cctx.offheap().mvccUpdate(false, this, val, ver, mvccVer);
else
storeValue(val, expTime, ver, null);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0bbbbe29/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 8967ce8..bee2108 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -196,10 +196,19 @@ public interface IgniteCacheOffheapManager {
* @return Transactions to wait for before finishing current transaction.
* @throws IgniteCheckedException If failed.
*/
- @Nullable public GridLongList mvccUpdate(GridCacheMapEntry entry,
+ @Nullable public GridLongList mvccUpdate(
+ boolean primary,
+ GridCacheMapEntry entry,
CacheObject val,
GridCacheVersion ver,
- MvccCoordinatorVersion mvccVer) throws IgniteCheckedException;
+ MvccCoordinatorVersion mvccVer
+ ) throws IgniteCheckedException;
+
+ @Nullable public GridLongList mvccRemove(
+ boolean primary,
+ GridCacheMapEntry entry,
+ MvccCoordinatorVersion mvccVer
+ ) throws IgniteCheckedException;
/**
* @param cctx Cache context.
@@ -503,11 +512,18 @@ public interface IgniteCacheOffheapManager {
*/
@Nullable GridLongList mvccUpdate(
GridCacheContext cctx,
+ boolean primary,
KeyCacheObject key,
CacheObject val,
GridCacheVersion ver,
MvccCoordinatorVersion mvccVer) throws IgniteCheckedException;
+ @Nullable GridLongList mvccRemove(
+ GridCacheContext cctx,
+ boolean primary,
+ KeyCacheObject key,
+ MvccCoordinatorVersion mvccVer) throws IgniteCheckedException;
+
/**
* @param cctx Cache context.
* @param key Key.
http://git-wip-us.apache.org/repos/asf/ignite/blob/0bbbbe29/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 25f36b2..dd4d7e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -380,11 +380,14 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public GridLongList mvccUpdate(GridCacheMapEntry entry,
+ @Override public GridLongList mvccUpdate(
+ boolean primary,
+ GridCacheMapEntry entry,
CacheObject val,
GridCacheVersion ver,
MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
return dataStore(entry.localPartition()).mvccUpdate(entry.context(),
+ primary,
entry.key(),
val,
ver,
@@ -392,6 +395,19 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
+ @Override public GridLongList mvccRemove(
+ boolean primary,
+ GridCacheMapEntry entry,
+ MvccCoordinatorVersion mvccVer
+ )
+ throws IgniteCheckedException {
+ return dataStore(entry.localPartition()).mvccRemove(entry.context(),
+ primary,
+ entry.key(),
+ mvccVer);
+ }
+
+ /** {@inheritDoc} */
@Override public void updateIndexes(GridCacheContext cctx, KeyCacheObject key, GridDhtLocalPartition part)
throws IgniteCheckedException {
dataStore(part).updateIndexes(cctx, key);
@@ -1350,8 +1366,29 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
+ @Override public GridLongList mvccRemove(GridCacheContext cctx,
+ boolean primary,
+ KeyCacheObject key,
+ MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
+ assert mvccVer != null;
+
+ if (!busyLock.enterBusy())
+ throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
+
+ try {
+
+
+ return null;
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public GridLongList mvccUpdate(
GridCacheContext cctx,
+ boolean primary,
KeyCacheObject key,
CacheObject val,
GridCacheVersion ver,
http://git-wip-us.apache.org/repos/asf/ignite/blob/0bbbbe29/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index cb01b6c..1f52309 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1251,14 +1251,27 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
- @Override public GridLongList mvccUpdate(GridCacheContext cctx,
+ @Override public GridLongList mvccUpdate(
+ GridCacheContext cctx,
+ boolean primary,
KeyCacheObject key,
CacheObject val,
GridCacheVersion ver,
MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
CacheDataStore delegate = init0(false);
- return delegate.mvccUpdate(cctx, key, val, ver, mvccVer);
+ return delegate.mvccUpdate(cctx, primary, key, val, ver, mvccVer);
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridLongList mvccRemove(
+ GridCacheContext cctx,
+ boolean primary,
+ KeyCacheObject key,
+ MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
+ CacheDataStore delegate = init0(false);
+
+ return delegate.mvccRemove(cctx, primary, key, mvccVer);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/0bbbbe29/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
index 79544e6..d3303e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
@@ -92,6 +92,29 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
return cleanupRows;
}
+ /**
+ * @param io IO.
+ * @param pageAddr Page address.
+ * @param idx Item index.
+ * @return Always {@code true}.
+ */
+ private boolean assertVersionGreater(RowLinkIO io, long pageAddr, int idx) {
+ long rowCrdVer = io.getMvccCoordinatorVersion(pageAddr, idx);
+ long rowCntr = io.getMvccCounter(pageAddr, idx);
+
+ int cmp = Long.compare(mvccCoordinatorVersion(), rowCrdVer);
+
+ if (cmp == 0)
+ cmp = Long.compare(mvccCounter(), rowCntr);
+
+ assert cmp > 0 : "[updCrd=" + mvccCoordinatorVersion() +
+ ", updCntr=" + mvccCounter() +
+ ", rowCrd=" + rowCrdVer +
+ ", rowCntr=" + rowCntr + ']';
+
+ return true;
+ }
+
/** {@inheritDoc} */
@Override public boolean apply(BPlusTree<CacheSearchRow, CacheDataRow> tree,
BPlusIO<CacheSearchRow> io,
@@ -101,9 +124,8 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
{
RowLinkIO rowIo = (RowLinkIO)io;
- // All previous version should be less then new one.
- assert mvccVer.coordinatorVersion() >= rowIo.getMvccCoordinatorVersion(pageAddr, idx);
- assert mvccVer.coordinatorVersion() > rowIo.getMvccCoordinatorVersion(pageAddr, idx) || mvccVer.counter() > rowIo.getMvccCounter(pageAddr, idx);
+ // All previous versions should be less then new one.
+ assert assertVersionGreater(rowIo, pageAddr, idx);
boolean checkActive = mvccVer.activeTransactions().size() > 0;
[13/17] ignite git commit: ignite-5937
Posted by sb...@apache.org.
ignite-5937
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bb969db0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bb969db0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bb969db0
Branch: refs/heads/ignite-5932
Commit: bb969db0457e46fc2db4322927bd3536fdd9fb7b
Parents: de3ed0d
Author: sboikov <sb...@gridgain.com>
Authored: Tue Oct 10 15:36:01 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Oct 10 15:41:21 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheEntryInfo.java | 5 -
.../processors/cache/GridCacheMapEntry.java | 18 ++-
.../cache/IgniteCacheOffheapManager.java | 39 +++++
.../cache/IgniteCacheOffheapManagerImpl.java | 160 +++++++++++++------
.../cache/mvcc/MvccCoordinatorVersion.java | 5 -
.../mvcc/MvccCoordinatorVersionResponse.java | 5 -
.../persistence/GridCacheOffheapManager.java | 14 ++
.../processors/cache/tree/MvccRemoveRow.java | 11 +-
.../processors/cache/tree/MvccUpdateRow.java | 25 ++-
.../datastreamer/DataStreamerImpl.java | 6 +-
.../cache/mvcc/CacheMvccTransactionsTest.java | 34 +++-
11 files changed, 235 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
index 8a5f0df..e09d33c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
@@ -101,11 +101,6 @@ public class GridCacheEntryInfo implements Message, MvccCoordinatorVersion {
return 0;
}
- /** {@inheritDoc} */
- @Override public boolean initialLoad() {
- return true;
- }
-
/**
* @return Cache ID.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index ded9513..a1535e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2581,6 +2581,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean walEnabled = !cctx.isNear() && cctx.shared().wal() != null;
+ // TODO IGNITE-3478: move checks in special initialValue method.
if (cctx.shared().database().persistenceEnabled()) {
unswap(false);
@@ -2603,14 +2604,19 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
- if (val != null) {
- if (cctx.mvccEnabled())
- cctx.offheap().mvccUpdate(false, this, val, ver, mvccVer);
- else
- storeValue(val, expTime, ver, null);
+ if (cctx.mvccEnabled()) {
+ cctx.offheap().mvccInitialValue(this, val, ver, mvccVer);
+
+ if (val != null)
+ update(val, expTime, ttl, ver, true);
}
+ else {
+ if (val != null) {
+ storeValue(val, expTime, ver, null);
- update(val, expTime, ttl, ver, true);
+ update(val, expTime, ttl, ver, true);
+ }
+ }
boolean skipQryNtf = false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index bee2108..9e3d0fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -189,6 +189,7 @@ public interface IgniteCacheOffheapManager {
throws IgniteCheckedException;
/**
+ * @param primary {@code True} if on primary node.
* @param entry Entry.
* @param val Value.
* @param ver Cache version.
@@ -204,6 +205,13 @@ public interface IgniteCacheOffheapManager {
MvccCoordinatorVersion mvccVer
) throws IgniteCheckedException;
+ /**
+ * @param primary {@code True} if on primary node.
+ * @param entry Entry.
+ * @param mvccVer Mvcc update version.
+ * @return Transactions to wait for before finishing current transaction.
+ * @throws IgniteCheckedException If failed.
+ */
@Nullable public GridLongList mvccRemove(
boolean primary,
GridCacheMapEntry entry,
@@ -211,6 +219,21 @@ public interface IgniteCacheOffheapManager {
) throws IgniteCheckedException;
/**
+ * @param entry Entry.
+ * @param val Value.
+ * @param ver Version.
+ * @param mvccVer Mvcc update version.
+ * @return {@code True} if value was inserted.
+ * @throws IgniteCheckedException If failed.
+ */
+ public boolean mvccInitialValue(
+ GridCacheMapEntry entry,
+ @Nullable CacheObject val,
+ GridCacheVersion ver,
+ MvccCoordinatorVersion mvccVer
+ ) throws IgniteCheckedException;
+
+ /**
* @param cctx Cache context.
* @param key Key.
* @param val Value.
@@ -507,6 +530,22 @@ public interface IgniteCacheOffheapManager {
* @param val Value.
* @param ver Version.
* @param mvccVer Mvcc version.
+ * @return {@code True} if new value was inserted.
+ * @throws IgniteCheckedException If failed.
+ */
+ boolean mvccInitialValue(
+ GridCacheContext cctx,
+ KeyCacheObject key,
+ @Nullable CacheObject val,
+ GridCacheVersion ver,
+ MvccCoordinatorVersion mvccVer) throws IgniteCheckedException;
+
+ /**
+ * @param cctx Cache context.
+ * @param key Key.
+ * @param val Value.
+ * @param ver Version.
+ * @param mvccVer Mvcc version.
* @return List of transactions to wait for.
* @throws IgniteCheckedException If failed.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 380ec94..4fb5bfd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -383,6 +383,20 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
+ @Override public boolean mvccInitialValue(
+ GridCacheMapEntry entry,
+ CacheObject val,
+ GridCacheVersion ver,
+ MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
+ return dataStore(entry.localPartition()).mvccInitialValue(
+ entry.context(),
+ entry.key(),
+ val,
+ ver,
+ mvccVer);
+ }
+
+ /** {@inheritDoc} */
@Override public GridLongList mvccUpdate(
boolean primary,
GridCacheMapEntry entry,
@@ -1360,9 +1374,76 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public GridLongList mvccRemove(GridCacheContext cctx,
+ @Override public boolean mvccInitialValue(
+ GridCacheContext cctx,
+ KeyCacheObject key,
+ @Nullable CacheObject val,
+ GridCacheVersion ver,
+ MvccCoordinatorVersion mvccVer)
+ throws IgniteCheckedException
+ {
+ assert mvccVer != null;
+
+ if (!busyLock.enterBusy())
+ throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
+
+ try {
+ assert val != null || CacheCoordinatorsProcessor.versionForRemovedValue(mvccVer.coordinatorVersion());
+
+ int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
+
+ CacheObjectContext coCtx = cctx.cacheObjectContext();
+
+ // Make sure value bytes initialized.
+ key.valueBytes(coCtx);
+
+ MvccUpdateRow updateRow;
+
+ if (val != null) {
+ val.valueBytes(coCtx);
+
+ updateRow = new MvccUpdateRow(
+ key,
+ val,
+ ver,
+ mvccVer,
+ partId,
+ cacheId);
+ }
+ else {
+ updateRow = new MvccRemoveRow(
+ key,
+ mvccVer,
+ partId,
+ cacheId);
+ }
+
+ if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
+ updateRow.cacheId(cctx.cacheId());
+
+ rowStore.addRow(updateRow);
+
+ boolean old = dataTree.putx(updateRow);
+
+ assert !old;
+
+ if (val != null)
+ incrementSize(cctx.cacheId());
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridLongList mvccUpdate(
+ GridCacheContext cctx,
boolean primary,
KeyCacheObject key,
+ CacheObject val,
+ GridCacheVersion ver,
MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
assert mvccVer != null;
@@ -1376,9 +1457,12 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
// Make sure value bytes initialized.
key.valueBytes(coCtx);
+ val.valueBytes(coCtx);
- MvccRemoveRow updateRow = new MvccRemoveRow(
+ MvccUpdateRow updateRow = new MvccUpdateRow(
key,
+ val,
+ ver,
mvccVer,
partId,
cacheId);
@@ -1392,27 +1476,20 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) {
assert !primary : updateRow;
-
- cleanup(updateRow.cleanupRows(), false);
}
else {
- if (res == MvccUpdateRow.UpdateResult.PREV_NOT_NULL)
- decrementSize(cacheId);
-
- CacheSearchRow rmvRow = cleanup(updateRow.cleanupRows(), true);
-
- if (rmvRow == null)
- rowStore.addRow(updateRow);
- else
- updateRow.link(rmvRow.link());
-
- assert updateRow.link() != 0L;
+ rowStore.addRow(updateRow);
boolean old = dataTree.putx(updateRow);
assert !old;
+
+ if (res == MvccUpdateRow.UpdateResult.PREV_NULL)
+ incrementSize(cctx.cacheId());
}
+ cleanup(updateRow.cleanupRows(), false);
+
return updateRow.activeTransactions();
}
finally {
@@ -1421,12 +1498,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public GridLongList mvccUpdate(
- GridCacheContext cctx,
+ @Override public GridLongList mvccRemove(GridCacheContext cctx,
boolean primary,
KeyCacheObject key,
- CacheObject val,
- GridCacheVersion ver,
MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
assert mvccVer != null;
@@ -1440,12 +1514,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
// Make sure value bytes initialized.
key.valueBytes(coCtx);
- val.valueBytes(coCtx);
- MvccUpdateRow updateRow = new MvccUpdateRow(
+ MvccRemoveRow updateRow = new MvccRemoveRow(
key,
- val,
- ver,
mvccVer,
partId,
cacheId);
@@ -1453,42 +1524,34 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
updateRow.cacheId(cctx.cacheId());
- GridLongList waitTxs = null;
-
- if (mvccVer.initialLoad()) {
- rowStore.addRow(updateRow);
+ dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
- boolean old = dataTree.putx(updateRow);
+ MvccUpdateRow.UpdateResult res = updateRow.updateResult();
- assert !old;
+ if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) {
+ assert !primary : updateRow;
- incrementSize(cctx.cacheId());
+ cleanup(updateRow.cleanupRows(), false);
}
else {
- dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
+ if (res == MvccUpdateRow.UpdateResult.PREV_NOT_NULL)
+ decrementSize(cacheId);
- MvccUpdateRow.UpdateResult res = updateRow.updateResult();
+ CacheSearchRow rmvRow = cleanup(updateRow.cleanupRows(), true);
- if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) {
- assert !primary : updateRow;
- }
- else {
+ if (rmvRow == null)
rowStore.addRow(updateRow);
+ else
+ updateRow.link(rmvRow.link());
- boolean old = dataTree.putx(updateRow);
-
- assert !old;
-
- if (res == MvccUpdateRow.UpdateResult.PREV_NULL)
- incrementSize(cctx.cacheId());
- }
+ assert updateRow.link() != 0L;
- cleanup(updateRow.cleanupRows(), false);
+ boolean old = dataTree.putx(updateRow);
- waitTxs = updateRow.activeTransactions();
+ assert !old;
}
- return waitTxs;
+ return updateRow.activeTransactions();
}
finally {
busyLock.leaveBusy();
@@ -1848,7 +1911,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
long rowCrdVer = unmaskCoordinatorVersion(rowCrdVerMasked);
- if (rowCrdVer > ver.coordinatorVersion() || row.mvccCounter() > ver.counter())
+ if (rowCrdVer > ver.coordinatorVersion())
+ continue;
+
+ if (rowCrdVer == ver.coordinatorVersion() && row.mvccCounter() > ver.counter())
continue;
MvccLongList txs = ver.activeTransactions();
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
index 4003b73..d80e43c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
@@ -42,9 +42,4 @@ public interface MvccCoordinatorVersion extends Message {
* @return Counter.
*/
public long counter();
-
- /**
- * @return {@code True} if version for initial load update.
- */
- public boolean initialLoad();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
index 20d23ed..c037226 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
@@ -159,11 +159,6 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M
}
/** {@inheritDoc} */
- @Override public boolean initialLoad() {
- return false;
- }
-
- /** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index ee651c2..45b78d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1256,6 +1256,20 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
+ @Override public boolean mvccInitialValue(
+ GridCacheContext cctx,
+ KeyCacheObject key,
+ @Nullable CacheObject val,
+ GridCacheVersion ver,
+ MvccCoordinatorVersion mvccVer)
+ throws IgniteCheckedException
+ {
+ CacheDataStore delegate = init0(false);
+
+ return delegate.mvccInitialValue(cctx, key, val, ver, mvccVer);
+ }
+
+ /** {@inheritDoc} */
@Override public GridLongList mvccUpdate(
GridCacheContext cctx,
boolean primary,
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
index 8fd8a6e..af11a9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
@@ -18,10 +18,12 @@
package org.apache.ignite.internal.processors.cache.tree;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.util.typedef.internal.S;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.createVersionForRemovedValue;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
/**
*
*/
@@ -42,7 +44,12 @@ public class MvccRemoveRow extends MvccUpdateRow {
/** {@inheritDoc} */
@Override public long mvccCoordinatorVersion() {
- return CacheCoordinatorsProcessor.createVersionForRemovedValue(super.mvccCoordinatorVersion());
+ return createVersionForRemovedValue(super.mvccCoordinatorVersion());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long unmaskedCoordinatorVersion() {
+ return unmaskCoordinatorVersion(super.mvccCoordinatorVersion());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
index 794661d..137ca28 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
@@ -105,13 +105,13 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
long rowCrdVer = unmaskCoordinatorVersion(io.getMvccCoordinatorVersion(pageAddr, idx));
long rowCntr = io.getMvccCounter(pageAddr, idx);
- int cmp = Long.compare(mvccVer.coordinatorVersion(), rowCrdVer);
+ int cmp = Long.compare(unmaskedCoordinatorVersion(), rowCrdVer);
if (cmp == 0)
cmp = Long.compare(mvccVer.counter(), rowCntr);
// Can be equals if backup rebalanced value updated on primary.
- assert cmp >= 0 : "[updCrd=" + mvccVer.coordinatorVersion() +
+ assert cmp >= 0 : "[updCrd=" + unmaskedCoordinatorVersion() +
", updCntr=" + mvccVer.counter() +
", rowCrd=" + rowCrdVer +
", rowCntr=" + rowCntr + ']';
@@ -138,11 +138,13 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
long rowCrdVerMasked = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
long rowCrdVer = unmaskCoordinatorVersion(rowCrdVerMasked);
+ long crdVer = unmaskedCoordinatorVersion();
+
if (res == null) {
- int cmp = Long.compare(mvccVer.coordinatorVersion(), rowCrdVer);
+ int cmp = Long.compare(crdVer, rowCrdVer);
if (cmp == 0)
- cmp = Long.compare(mvccVer.coordinatorVersion(), rowIo.getMvccCounter(pageAddr, idx));
+ cmp = Long.compare(mvccVer.counter(), rowIo.getMvccCounter(pageAddr, idx));
if (cmp == 0)
res = UpdateResult.VERSION_FOUND;
@@ -152,7 +154,7 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
}
// Suppose transactions on previous coordinator versions are done.
- if (checkActive && mvccVer.coordinatorVersion() == rowCrdVer) {
+ if (checkActive && crdVer == rowCrdVer) {
long rowMvccCntr = rowIo.getMvccCounter(pageAddr, idx);
if (mvccVer.activeTransactions().contains(rowMvccCntr)) {
@@ -166,11 +168,11 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
}
if (!txActive) {
- assert Long.compare(mvccVer.coordinatorVersion(), rowCrdVer) >= 0;
+ assert Long.compare(crdVer, rowCrdVer) >= 0;
int cmp;
- if (mvccVer.coordinatorVersion() == rowCrdVer)
+ if (crdVer == rowCrdVer)
cmp = Long.compare(mvccVer.cleanupVersion(), rowIo.getMvccCounter(pageAddr, idx));
else
cmp = 1;
@@ -183,7 +185,7 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
assert row.link() != 0 && row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA : row;
// Should not be possible to cleanup active tx.
- assert rowCrdVer != mvccVer.coordinatorVersion()
+ assert rowCrdVer != crdVer
|| !mvccVer.activeTransactions().contains(row.mvccCounter());
if (cleanupRows == null)
@@ -199,6 +201,13 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
return true;
}
+ /**
+ * @return Coordinator version without flags.
+ */
+ protected long unmaskedCoordinatorVersion() {
+ return mvccVer.coordinatorVersion();
+ }
+
/** {@inheritDoc} */
@Override public long mvccCoordinatorVersion() {
return mvccVer.coordinatorVersion();
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 6ced2f9..30145ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -134,11 +134,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/** Version which is less then any version generated on coordinator. */
private static final MvccCoordinatorVersion ISOLATED_STREAMER_MVCC_VER =
- new MvccCoordinatorVersionResponse(1L, CacheCoordinatorsProcessor.START_VER, 0L) {
- @Override public boolean initialLoad() {
- return true;
- }
- };
+ new MvccCoordinatorVersionResponse(1L, CacheCoordinatorsProcessor.START_VER, 0L);
/** Cache receiver. */
private StreamReceiver<K, V> rcvr = ISOLATED_UPDATER;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index d45afe7..1abc116 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -1786,13 +1786,33 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
final IgniteCache<Object, Object> cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, 64));
- final int KEYS = 100;
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (int k = 0; k < 100; k++)
+ cache.remove(k);
- checkValues(new HashMap<>(), cache);
+ tx.commit();
+ }
+
+ Map<Object, Object> expVals = new HashMap<>();
try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
- for (int k = 0; k < KEYS; k++)
- cache.remove(k);
+ for (int k = 100; k < 200; k++) {
+ cache.put(k, k);
+
+ expVals.put(k, k);
+ }
+
+ tx.commit();
+ }
+
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (int k = 100; k < 200; k++) {
+ if (k % 2 == 0) {
+ cache.remove(k);
+
+ expVals.remove(k);
+ }
+ }
tx.commit();
}
@@ -1800,6 +1820,12 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
startGrid(1);
awaitPartitionMapExchange();
+
+ checkValues(expVals, jcache(1));
+
+ stopGrid(0);
+
+ checkValues(expVals, jcache(1));
}
/**
[03/17] ignite git commit: ignite-5937
Posted by sb...@apache.org.
ignite-5937
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c4f98f36
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c4f98f36
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c4f98f36
Branch: refs/heads/ignite-5932
Commit: c4f98f367ab24ca22ab8b4f3f1213f365846210f
Parents: d4d2c38
Author: sboikov <sb...@gridgain.com>
Authored: Wed Oct 4 11:32:02 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Oct 4 11:32:02 2017 +0300
----------------------------------------------------------------------
.../cache/persistence/tree/BPlusTree.java | 31 +++++++++++++++-----
1 file changed, 23 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c4f98f36/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index 5836340..05adb41 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
+import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.Bool.DONE;
import static org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.Bool.FALSE;
@@ -4447,21 +4448,25 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
abstract void init0();
/**
- * @param pageAddr
- * @param io
- * @param startIdx
- * @param cnt
- * @return
+ * @param pageAddr Page address.
+ * @param io IO.
+ * @param startIdx Start index.
+ * @param cnt Number of rows in the buffer.
+ * @return {@code true} If we were able to fetch rows from this page.
* @throws IgniteCheckedException If failed.
*/
- abstract boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx, int cnt) throws IgniteCheckedException;
+ abstract boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx, int cnt)
+ throws IgniteCheckedException;
/**
- * @return
+ * @return {@code True} If we have rows to return after reading the next page.
* @throws IgniteCheckedException If failed.
*/
abstract boolean reinitialize0() throws IgniteCheckedException;
+ /**
+ * @param readDone {@code True} if traversed all rows.
+ */
abstract void onNotFound(boolean readDone);
/**
@@ -4584,6 +4589,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
}
/**
+ * @param lastRow Last read row (to be used as new lower bound).
* @return {@code true} If we have rows to return after reading the next page.
* @throws IgniteCheckedException If failed.
*/
@@ -4655,9 +4661,10 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
/**
* @param lowerBound Lower bound.
* @param upperBound Upper bound.
+ * @param p Row predicate.
* @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
*/
- FindOneCursor(L lowerBound, L upperBound, RowPredicate<L, T> p, Object x) {
+ FindOneCursor(L lowerBound, L upperBound, @Nullable RowPredicate<L, T> p, Object x) {
super(lowerBound, upperBound, x);
assert lowerBound != null;
@@ -4666,10 +4673,12 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
this.p = p;
}
+ /** {@inheritDoc} */
@Override void init0() {
// No-op.
}
+ /** {@inheritDoc} */
@Override boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx, int cnt) throws IgniteCheckedException {
if (startIdx == 0) // TODO IGNITE-3478: startIdx == 0? can search twice for first item?
startIdx = findLowerBound(pageAddr, io, cnt);
@@ -4699,10 +4708,12 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
return true;
}
+ /** {@inheritDoc} */
@Override boolean reinitialize0() throws IgniteCheckedException {
return true;
}
+ /** {@inheritDoc} */
@Override void onNotFound(boolean readDone) {
resRow = EMPTY;
}
@@ -4758,6 +4769,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
super(lowerBound, upperBound, x);
}
+ /** {@inheritDoc} */
@Override boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx, int cnt) throws IgniteCheckedException {
if (lowerBound != null && startIdx == 0)
startIdx = findLowerBound(pageAddr, io, cnt);
@@ -4784,10 +4796,12 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
return true;
}
+ /** {@inheritDoc} */
@Override boolean reinitialize0() throws IgniteCheckedException {
return next();
}
+ /** {@inheritDoc} */
@Override void onNotFound(boolean readDone) {
if (readDone)
rows = null;
@@ -4801,6 +4815,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
}
}
+ /** {@inheritDoc} */
@Override void init0() {
row = -1;
}