You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/02/22 06:56:00 UTC
[1/3] ignite git commit: ignite-4652 Implemented BPlusTree.invoke
Repository: ignite
Updated Branches:
refs/heads/ignite-3477 40f015d18 -> ee28b9cb8
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
index 0de3754..22bc17a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
@@ -50,17 +50,22 @@ import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridRandom;
import org.apache.ignite.internal.util.GridStripedLock;
+import org.apache.ignite.internal.util.IgniteTree;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
import org.jsr166.ConcurrentLinkedHashMap;
import static org.apache.ignite.internal.pagemem.PageIdUtils.effectivePageId;
import static org.apache.ignite.internal.processors.cache.database.tree.BPlusTree.rnd;
+import static org.apache.ignite.internal.util.IgniteTree.OperationType.NOOP;
+import static org.apache.ignite.internal.util.IgniteTree.OperationType.PUT;
+import static org.apache.ignite.internal.util.IgniteTree.OperationType.REMOVE;
/**
*/
@@ -189,6 +194,40 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
}
/**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void _testBenchInvoke() throws IgniteCheckedException {
+ MAX_PER_PAGE = 10;
+
+ TestTree tree = createTestTree(true);
+
+ long start = System.nanoTime();
+
+ for (int i = 0; i < 10_000_000; i++) {
+ final long key = BPlusTree.randomInt(1000);
+
+// tree.findOne(key); // 39
+// tree.putx(key); // 22
+
+ tree.invoke(key, null, new IgniteTree.InvokeClosure<Long>() { // 25
+ @Override public void call(@Nullable Long row) throws IgniteCheckedException {
+ // No-op.
+ }
+
+ @Override public Long newRow() {
+ return key;
+ }
+
+ @Override public IgniteTree.OperationType operationType() {
+ return PUT;
+ }
+ });
+ }
+
+ X.println(" __ time: " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+ }
+
+ /**
* @param cursor cursor to check.
* @param iterator iterator with expected result.
* @throws IgniteCheckedException If failed
@@ -561,6 +600,127 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
/**
* @throws IgniteCheckedException If failed.
*/
+ public void testRandomInvoke_1_30_1() throws IgniteCheckedException {
+ MAX_PER_PAGE = 1;
+ CNT = 30;
+
+ doTestRandomInvoke(true);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testRandomInvoke_1_30_0() throws IgniteCheckedException {
+ MAX_PER_PAGE = 1;
+ CNT = 30;
+
+ doTestRandomInvoke(false);
+ }
+
+ /**
+ * @param canGetRow Can get row from inner page.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void doTestRandomInvoke(boolean canGetRow) throws IgniteCheckedException {
+ TestTree tree = createTestTree(canGetRow);
+
+ Map<Long,Long> map = new HashMap<>();
+
+ int loops = reuseList == null ? 100_000 : 300_000;
+
+ for (int i = 0 ; i < loops; i++) {
+ final Long x = (long)BPlusTree.randomInt(CNT);
+ final int rnd = BPlusTree.randomInt(11);
+
+ if (i % 10_000 == 0) {
+// X.println(tree.printTree());
+ X.println(" --> " + i + " ++> " + x);
+ }
+
+ // Update map.
+ if (!map.containsKey(x)) {
+ if (rnd % 2 == 0) {
+ map.put(x, x);
+
+// X.println("put0: " + x);
+ }
+ else {
+// X.println("noop0: " + x);
+ }
+ }
+ else {
+ if (rnd % 2 == 0) {
+// X.println("put1: " + x);
+ }
+ else if (rnd % 3 == 0) {
+ map.remove(x);
+
+// X.println("rmv1: " + x);
+ }
+ else {
+// X.println("noop1: " + x);
+ }
+ }
+
+ // Consistently update tree.
+ tree.invoke(x, null, new IgniteTree.InvokeClosure<Long>() {
+
+ IgniteTree.OperationType op;
+
+ Long newRow;
+
+ @Override public void call(@Nullable Long row) throws IgniteCheckedException {
+ if (row == null) {
+ if (rnd % 2 == 0) {
+ op = PUT;
+ newRow = x;
+ }
+ else {
+ op = NOOP;
+ newRow = null;
+ }
+ }
+ else {
+ assertEquals(x, row);
+
+ if (rnd % 2 == 0) {
+ op = PUT;
+ newRow = x; // We can not replace x with y here, because keys must be equal.
+ }
+ else if (rnd % 3 == 0) {
+ op = REMOVE;
+ newRow = null;
+ }
+ else {
+ op = NOOP;
+ newRow = null;
+ }
+ }
+ }
+
+ @Override public Long newRow() {
+ return newRow;
+ }
+
+ @Override public IgniteTree.OperationType operationType() {
+ return op;
+ }
+ });
+
+ assertNoLocks();
+
+// X.println(tree.printTree());
+
+ tree.validateTree();
+
+ if (i % 100 == 0)
+ assertEqualContents(tree, map);
+ }
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
public void testRandomPutRemove_1_30_0() throws IgniteCheckedException {
MAX_PER_PAGE = 1;
CNT = 30;
@@ -840,24 +1000,32 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
// X.println(tree.printTree());
tree.validateTree();
- if (i % 100 == 0) {
- GridCursor<Long> cursor = tree.find(null, null);
-
- while (cursor.next()) {
- x = cursor.get();
+ if (i % 100 == 0)
+ assertEqualContents(tree, map);
+ }
+ }
- assert x != null;
+ /**
+ * @param tree Tree.
+ * @param map Map.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void assertEqualContents(IgniteTree<Long, Long> tree, Map<Long,Long> map) throws IgniteCheckedException {
+ GridCursor<Long> cursor = tree.find(null, null);
- assertEquals(map.get(x), x);
+ while (cursor.next()) {
+ Long x = cursor.get();
- assertNoLocks();
- }
+ assert x != null;
- assertEquals(map.size(), tree.size());
+ assertEquals(map.get(x), x);
- assertNoLocks();
- }
+ assertNoLocks();
}
+
+ assertEquals(map.size(), tree.size());
+
+ assertNoLocks();
}
/**
@@ -1039,27 +1207,28 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
final GridStripedLock lock = new GridStripedLock(256);
+ final String[] ops = {"put", "rmv", "inv_put", "inv_rmv"};
+
IgniteInternalFuture<?> fut = multithreadedAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
for (int i = 0; i < loops; i++) {
- Long x = (long)DataStructure.randomInt(CNT);
-
- boolean put = DataStructure.randomInt(2) == 0;
+ final Long x = (long)DataStructure.randomInt(CNT);
+ final int op = DataStructure.randomInt(4);
if (i % 10000 == 0)
- X.println(" --> " + (put ? "put " : "rmv ") + i + " " + x);
+ X.println(" --> " + ops[op] + "_" + i + " " + x);
Lock l = lock.getLock(x.longValue());
l.lock();
try {
- if (put) {
+ if (op == 0) { // Put.
assertEquals(map.put(x, x), tree.put(x));
assertNoLocks();
}
- else {
+ else if (op == 1) { // Remove.
if (map.remove(x) != null) {
assertEquals(x, tree.remove(x));
@@ -1070,6 +1239,54 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
assertNoLocks();
}
+ else if (op == 2) {
+ tree.invoke(x, null, new IgniteTree.InvokeClosure<Long>() {
+ IgniteTree.OperationType opType;
+
+ @Override public void call(@Nullable Long row) throws IgniteCheckedException {
+ opType = PUT;
+
+ if (row != null)
+ assertEquals(x, row);
+ }
+
+ @Override public Long newRow() {
+ return x;
+ }
+
+ @Override public IgniteTree.OperationType operationType() {
+ return opType;
+ }
+ });
+
+ map.put(x,x);
+ }
+ else if (op == 3) {
+ tree.invoke(x, null, new IgniteTree.InvokeClosure<Long>() {
+ IgniteTree.OperationType opType;
+
+ @Override public void call(@Nullable Long row) throws IgniteCheckedException {
+ if (row != null) {
+ assertEquals(x, row);
+ opType = REMOVE;
+ }
+ else
+ opType = NOOP;
+ }
+
+ @Override public Long newRow() {
+ return null;
+ }
+
+ @Override public IgniteTree.OperationType operationType() {
+ return opType;
+ }
+ });
+
+ map.remove(x);
+ }
+ else
+ fail();
}
finally {
l.unlock();
@@ -1240,7 +1457,8 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override protected Long getRow(BPlusIO<Long> io, long pageAddr, int idx) throws IgniteCheckedException {
+ @Override protected Long getRow(BPlusIO<Long> io, long pageAddr, int idx, Object ignore)
+ throws IgniteCheckedException {
assert io.canGetRow() : io;
return io.getLookupRow(this, pageAddr, idx);
@@ -1276,11 +1494,17 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override public void onBeforeReadLock(Page page) {
+// X.println(" onBeforeReadLock: " + U.hexLong(page.id()));
+//
+// U.dumpStack();
+
assertNull(beforeReadLock.put(threadId(), page.id()));
}
/** {@inheritDoc} */
@Override public void onReadLock(Page page, long pageAddr) {
+// X.println(" onReadLock: " + U.hexLong(page.id()));
+
if (pageAddr != 0L) {
long pageId = PageIO.getPageId(pageAddr);
@@ -1294,6 +1518,8 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override public void onReadUnlock(Page page, long pageAddr) {
+// X.println(" onReadUnlock: " + U.hexLong(page.id()));
+
checkPageId(page, pageAddr);
long pageId = PageIO.getPageId(pageAddr);
@@ -1303,11 +1529,17 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override public void onBeforeWriteLock(Page page) {
+// X.println(" onBeforeWriteLock: " + U.hexLong(page.id()));
+
assertNull(beforeWriteLock.put(threadId(), page.id()));
}
/** {@inheritDoc} */
@Override public void onWriteLock(Page page, long pageAddr) {
+// X.println(" onWriteLock: " + U.hexLong(page.id()));
+//
+// U.dumpStack();
+
if (pageAddr != 0L) {
checkPageId(page, pageAddr);
@@ -1324,6 +1556,8 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override public void onWriteUnlock(Page page, long pageAddr) {
+// X.println(" onWriteUnlock: " + U.hexLong(page.id()));
+
assertEquals(effectivePageId(page.id()), effectivePageId(PageIO.getPageId(pageAddr)));
assertEquals(Long.valueOf(page.id()), locks(false).remove(page.id()));
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
index d0d495e..ebf4aee 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
@@ -368,6 +368,11 @@ public class FreeListImplSelfTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
+ @Override public void key(KeyCacheObject key) {
+ this.key = (TestCacheObject)key;
+ }
+
+ /** {@inheritDoc} */
@Override public CacheObject value() {
return val;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
index 2442722..d9b820a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
@@ -90,7 +90,7 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
}
/** {@inheritDoc} */
- @Override protected GridH2Row getRow(BPlusIO<SearchRow> io, long pageAddr, int idx)
+ @Override protected GridH2Row getRow(BPlusIO<SearchRow> io, long pageAddr, int idx, Object ignore)
throws IgniteCheckedException {
return (GridH2Row)io.getLookupRow(this, pageAddr, idx);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
index 6d4e97a..ce10cdb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
@@ -66,6 +66,11 @@ public abstract class GridH2Row extends Row implements GridSearchRowPointer, Cac
}
/** {@inheritDoc} */
+ @Override public void key(KeyCacheObject key) {
+ this.key = key;
+ }
+
+ /** {@inheritDoc} */
@Override public CacheObject value() {
return val;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index 729309a..9c59faf 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -499,6 +499,11 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
}
/** {@inheritDoc} */
+ @Override public void invoke(GridSearchRowPointer key, Object x, InvokeClosure<GridH2Row> c) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
@Override public GridH2Row put(GridH2Row val) {
return tree.put(val, val);
}
[2/3] ignite git commit: ignite-4652 Implemented BPlusTree.invoke
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index a869b21..4a98f6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.IgniteTree;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.lang.GridIterator;
@@ -82,6 +83,12 @@ public interface IgniteCacheOffheapManager extends GridCacheManager {
public Iterable<CacheDataStore> cacheDataStores();
/**
+ * @param part Partition.
+ * @return Data store.
+ */
+ public CacheDataStore dataStore(GridDhtLocalPartition part);
+
+ /**
* @param p Partition ID.
* @param store Data store.
*/
@@ -107,6 +114,15 @@ public interface IgniteCacheOffheapManager extends GridCacheManager {
public long expiredSize() throws IgniteCheckedException;
/**
+ * @param key Key.
+ * @param part Partition.
+ * @param c Tree update closure.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void invoke(KeyCacheObject key, GridDhtLocalPartition part, OffheapInvokeClosure c)
+ throws IgniteCheckedException;
+
+ /**
* @param key Key.
* @param val Value.
* @param ver Version.
@@ -253,6 +269,16 @@ public interface IgniteCacheOffheapManager extends GridCacheManager {
/**
*
*/
+ interface OffheapInvokeClosure extends IgniteTree.InvokeClosure<CacheDataRow> {
+ /**
+ * @return Old row.
+ */
+ @Nullable public CacheDataRow oldRow();
+ }
+
+ /**
+ *
+ */
interface CacheDataStore {
/**
* @return Partition ID.
@@ -297,6 +323,21 @@ public interface IgniteCacheOffheapManager extends GridCacheManager {
/**
* @param key Key.
+ * @param val Value.
+ * @param ver Version.
+ * @param expireTime Expire time.
+ * @param oldRow Old row.
+ * @return New row.
+ * @throws IgniteCheckedException If failed.
+ */
+ CacheDataRow createRow(KeyCacheObject key,
+ CacheObject val,
+ GridCacheVersion ver,
+ long expireTime,
+ @Nullable CacheDataRow oldRow) throws IgniteCheckedException;
+
+ /**
+ * @param key Key.
* @param part Partition.
* @param val Value.
* @param ver Version.
@@ -313,6 +354,13 @@ public interface IgniteCacheOffheapManager extends GridCacheManager {
/**
* @param key Key.
+ * @param c Closure.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void invoke(KeyCacheObject key, OffheapInvokeClosure c) throws IgniteCheckedException;
+
+ /**
+ * @param key Key.
* @param partId Partition number.
* @throws IgniteCheckedException If failed.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 5df99b6..b863edd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -207,7 +207,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @param part Partition.
* @return Data store for given entry.
*/
- private CacheDataStore dataStore(GridDhtLocalPartition part) {
+ public CacheDataStore dataStore(GridDhtLocalPartition part) {
if (cctx.isLocal())
return locCacheDataStore;
else {
@@ -329,6 +329,14 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/** {@inheritDoc} */
+ @Override public void invoke(KeyCacheObject key,
+ GridDhtLocalPartition part,
+ OffheapInvokeClosure c)
+ throws IgniteCheckedException {
+ dataStore(part).invoke(key, c);
+ }
+
+ /** {@inheritDoc} */
@Override public void update(
KeyCacheObject key,
CacheObject val,
@@ -838,6 +846,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
protected Long initCntr = 0L;
/**
+ * @param partId Partition number.
* @param name Name.
* @param rowStore Row store.
* @param dataTree Data tree.
@@ -900,6 +909,9 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
if (oldRow == null || indexingEnabled)
return false;
+ if (oldRow.expireTime() != dataRow.expireTime())
+ return false;
+
CacheObjectContext coCtx = cctx.cacheObjectContext();
int oldLen = oldRow.key().valueBytesLength(coCtx) + oldRow.value().valueBytesLength(coCtx);
@@ -913,6 +925,71 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/** {@inheritDoc} */
+ @Override public void invoke(KeyCacheObject key, OffheapInvokeClosure c)
+ throws IgniteCheckedException {
+ if (!busyLock.enterBusy())
+ throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
+
+ try {
+ dataTree.invoke(new SearchRow(key), CacheDataRowAdapter.RowData.NO_KEY, c);
+
+ switch (c.operationType()) {
+ case PUT: {
+ assert c.newRow() != null : c;
+
+ CacheDataRow oldRow = c.oldRow();
+
+ finishUpdate(c.newRow(), oldRow);
+
+ break;
+ }
+
+ case REMOVE: {
+ CacheDataRow oldRow = c.oldRow();
+
+ finishRemove(key, oldRow);
+
+ break;
+ }
+
+ case NOOP:
+ break;
+
+ default:
+ assert false : c.operationType();
+ }
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheDataRow createRow(KeyCacheObject key,
+ CacheObject val,
+ GridCacheVersion ver,
+ long expireTime,
+ @Nullable CacheDataRow oldRow) throws IgniteCheckedException
+ {
+ DataRow dataRow = new DataRow(key, val, ver, partId, expireTime);
+
+ if (canUpdateOldRow(oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow))
+ dataRow.link(oldRow.link());
+ else {
+ CacheObjectContext coCtx = cctx.cacheObjectContext();
+
+ key.valueBytes(coCtx);
+ val.valueBytes(coCtx);
+
+ rowStore.addRow(dataRow);
+ }
+
+ assert dataRow.link() != 0 : dataRow;
+
+ return dataRow;
+ }
+
+ /** {@inheritDoc} */
@Override public void update(KeyCacheObject key,
int p,
CacheObject val,
@@ -935,14 +1012,10 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
CacheDataRow old;
- boolean rmvOld = true;
-
if (canUpdateOldRow(oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow)) {
old = oldRow;
dataRow.link(oldRow.link());
-
- rmvOld = false;
}
else {
rowStore.addRow(dataRow);
@@ -956,43 +1029,68 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
else
old = dataTree.put(dataRow);
-
- if (old == null)
- storageSize.incrementAndGet();
}
- if (indexingEnabled) {
- GridCacheQueryManager qryMgr = cctx.queries();
+ finishUpdate(dataRow, old);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
- assert qryMgr.enabled();
+ /**
+ * @param newRow New row.
+ * @param oldRow Old row if available.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void finishUpdate(CacheDataRow newRow, @Nullable CacheDataRow oldRow) throws IgniteCheckedException {
+ if (oldRow == null)
+ storageSize.incrementAndGet();
- if (old != null)
- qryMgr.store(key, p, old.value(), old.version(), val, ver, expireTime, dataRow.link());
- else
- qryMgr.store(key, p, null, null, val, ver, expireTime, dataRow.link());
- }
+ KeyCacheObject key = newRow.key();
+
+ long expireTime = newRow.expireTime();
- if (old != null) {
- assert old.link() != 0 : old;
+ if (indexingEnabled) {
+ GridCacheQueryManager qryMgr = cctx.queries();
- if (pendingEntries != null && old.expireTime() != 0)
- pendingEntries.removex(new PendingRow(old.expireTime(), old.link()));
+ assert qryMgr.enabled();
- if (rmvOld)
- rowStore.removeRow(old.link());
+ if (oldRow != null) {
+ qryMgr.store(key,
+ partId,
+ oldRow.value(), oldRow.version(),
+ newRow.value(), newRow.version(),
+ expireTime,
+ newRow.link());
+ }
+ else {
+ qryMgr.store(key,
+ partId,
+ null, null,
+ newRow.value(), newRow.version(),
+ expireTime,
+ newRow.link());
}
+ }
- if (pendingEntries != null && expireTime != 0) {
- pendingEntries.putx(new PendingRow(expireTime, dataRow.link()));
+ if (oldRow != null) {
+ assert oldRow.link() != 0 : oldRow;
- hasPendingEntries = true;
- }
+ if (pendingEntries != null && oldRow.expireTime() != 0)
+ pendingEntries.removex(new PendingRow(oldRow.expireTime(), oldRow.link()));
- updateIgfsMetrics(key, (old != null ? old.value() : null), val);
+ if (newRow.link() != oldRow.link())
+ rowStore.removeRow(oldRow.link());
}
- finally {
- busyLock.leaveBusy();
+
+ if (pendingEntries != null && expireTime != 0) {
+ pendingEntries.putx(new PendingRow(expireTime, newRow.link()));
+
+ hasPendingEntries = true;
}
+
+ updateIgfsMetrics(key, (oldRow != null ? oldRow.value() : null), newRow.value());
}
/** {@inheritDoc} */
@@ -1001,50 +1099,59 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
try {
- CacheDataRow dataRow = dataTree.remove(new SearchRow(key));
-
- CacheObject val = null;
- GridCacheVersion ver = null;
+ CacheDataRow oldRow = dataTree.remove(new SearchRow(key));
- if (dataRow != null) {
- assert dataRow.link() != 0 : dataRow;
+ finishRemove(key, oldRow);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
- if (pendingEntries != null && dataRow.expireTime() != 0)
- pendingEntries.removex(new PendingRow(dataRow.expireTime(), dataRow.link()));
+ /**
+ * @param key Key.
+ * @param oldRow Removed row.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void finishRemove(KeyCacheObject key, @Nullable CacheDataRow oldRow) throws IgniteCheckedException {
+ CacheObject val = null;
+ GridCacheVersion ver = null;
- storageSize.decrementAndGet();
+ if (oldRow != null) {
+ assert oldRow.link() != 0 : oldRow;
- val = dataRow.value();
+ if (pendingEntries != null && oldRow.expireTime() != 0)
+ pendingEntries.removex(new PendingRow(oldRow.expireTime(), oldRow.link()));
- ver = dataRow.version();
- }
+ storageSize.decrementAndGet();
- if (indexingEnabled) {
- GridCacheQueryManager qryMgr = cctx.queries();
+ val = oldRow.value();
- assert qryMgr.enabled();
+ ver = oldRow.version();
+ }
- qryMgr.remove(key, partId, val, ver);
- }
+ if (indexingEnabled) {
+ GridCacheQueryManager qryMgr = cctx.queries();
- if (dataRow != null)
- rowStore.removeRow(dataRow.link());
+ assert qryMgr.enabled();
- updateIgfsMetrics(key, (dataRow != null ? dataRow.value() : null), null);
- }
- finally {
- busyLock.leaveBusy();
+ qryMgr.remove(key, partId, val, ver);
}
+
+ if (oldRow != null)
+ rowStore.removeRow(oldRow.link());
+
+ updateIgfsMetrics(key, (oldRow != null ? oldRow.value() : null), null);
}
/** {@inheritDoc} */
@Override public CacheDataRow find(KeyCacheObject key) throws IgniteCheckedException {
key.valueBytes(cctx.cacheObjectContext());
- CacheDataRow row = dataTree.findOne(new SearchRow(key), dataTree.noKeyC);
+ CacheDataRow row = dataTree.findOne(new SearchRow(key), CacheDataRowAdapter.RowData.NO_KEY);
if (row != null)
- ((CacheDataRowAdapter)row).key(key);
+ row.key(key);
return row;
}
@@ -1261,17 +1368,6 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
/** */
private final GridCacheContext cctx;
- /** */
- private final RowClosure<CacheSearchRow, CacheDataRow> noKeyC = new RowClosure<CacheSearchRow, CacheDataRow>() {
- @Override public CacheDataRow row(BPlusIO<CacheSearchRow> io, long pageAddr, int idx)
- throws IgniteCheckedException {
- int hash = ((RowLinkIO)io).getHash(pageAddr, idx);
- long link = ((RowLinkIO)io).getLink(pageAddr, idx);
-
- return rowStore.dataRow(hash, link, CacheDataRowAdapter.RowData.NO_KEY);
- }
- };
-
/**
* @param name Tree name.
* @param reuseList Reuse list.
@@ -1320,12 +1416,16 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/** {@inheritDoc} */
- @Override protected CacheDataRow getRow(BPlusIO<CacheSearchRow> io, long pageAddr, int idx)
+ @Override protected CacheDataRow getRow(BPlusIO<CacheSearchRow> io, long pageAddr, int idx, Object flags)
throws IgniteCheckedException {
int hash = ((RowLinkIO)io).getHash(pageAddr, idx);
long link = ((RowLinkIO)io).getLink(pageAddr, idx);
- return rowStore.dataRow(hash, link, CacheDataRowAdapter.RowData.FULL);
+ CacheDataRowAdapter.RowData x = flags != null ?
+ (CacheDataRowAdapter.RowData)flags :
+ CacheDataRowAdapter.RowData.FULL;
+
+ return rowStore.dataRow(hash, link, x);
}
/**
@@ -1705,7 +1805,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/** {@inheritDoc} */
- @Override protected PendingRow getRow(BPlusIO<PendingRow> io, long pageAddr, int idx)
+ @Override protected PendingRow getRow(BPlusIO<PendingRow> io, long pageAddr, int idx, Object ignore)
throws IgniteCheckedException {
return io.getLookupRow(this, pageAddr, idx);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java
index 75ab8e4..cc26b21 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.database;
import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
/**
@@ -48,4 +49,9 @@ public interface CacheDataRow extends CacheSearchRow {
* @param link Link for this row.
*/
public void link(long link);
+
+ /**
+ * @param key Key.
+ */
+ public void key(KeyCacheObject key);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
index 4bfdd99..5a62e75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
@@ -73,6 +73,19 @@ public class CacheDataRowAdapter implements CacheDataRow {
}
/**
+ * @param key Key.
+ * @param val Value.
+ * @param expireTime Expire time.
+ * @param ver Version.
+ */
+ public CacheDataRowAdapter(KeyCacheObject key, CacheObject val, GridCacheVersion ver, long expireTime) {
+ this.key = key;
+ this.val = val;
+ this.ver = ver;
+ this.expireTime = expireTime;
+ }
+
+ /**
* Read row from data pages.
*
* @param cctx Cache context.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java
index 47c3254..ca4ad05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java
@@ -213,7 +213,7 @@ public class MetadataStorage implements MetaStore {
/** {@inheritDoc} */
@Override protected IndexItem getRow(final BPlusIO<IndexItem> io, final long pageAddr,
- final int idx) throws IgniteCheckedException {
+ final int idx, Object ignore) throws IgniteCheckedException {
return readRow(pageAddr, ((IndexIO)io).getOffset(pageAddr, idx));
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
index aa61fbd..8827407 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
@@ -120,20 +120,6 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
/** */
private volatile TreeMetaData treeMeta;
- /**
- *
- */
- public static interface RowClosure<L, R> {
- /**
- * @param io IO.
- * @param pageAddr Page address.
- * @param idx Index.
- * @return Result.
- * @throws IgniteCheckedException If failed.
- */
- public R row(BPlusIO<L> io, long pageAddr, int idx) throws IgniteCheckedException;
- }
-
/** */
private final GridTreePrinter<Long> treePrinter = new GridTreePrinter<Long>() {
/** */
@@ -224,22 +210,20 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
long res = doAskNeighbor(io, pageAddr, back);
if (back) {
- assert g.getClass() == Remove.class;
-
if (io.getForward(pageAddr) != g.backId) // See how g.backId is setup in removeDown for this check.
return RETRY;
- g.backId = res;
+ g.backId(res);
}
else {
assert isBack == FALSE.ordinal() : isBack;
- g.fwdId = res;
+ g.fwdId(res);
}
return FOUND;
}
- };
+ }
/** */
private final GetPageHandler<Get> search = new Search();
@@ -257,7 +241,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
boolean needBackIfRouting = g.backId != 0;
- g.backId = 0; // Usually we'll go left down and don't need it.
+ g.backId(0L); // Usually we'll go left down and don't need it.
int cnt = io.getCount(pageAddr);
int idx = findInsertionPoint(io, pageAddr, 0, cnt, g.row, g.shift);
@@ -282,13 +266,13 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
assert !io.isLeaf() : io;
// If idx == cnt then we go right down, else left down: getLeft(cnt) == getRight(cnt - 1).
- g.pageId = inner(io).getLeft(pageAddr, idx);
+ g.pageId(inner(io).getLeft(pageAddr, idx));
// If we see the tree in consistent state, then our right down page must be forward for our left down page,
// we need to setup fwdId and/or backId to be able to check this invariant on lower level.
if (idx < cnt) {
// Go left down here.
- g.fwdId = inner(io).getRight(pageAddr, idx);
+ g.fwdId(inner(io).getRight(pageAddr, idx));
}
else {
// Go right down here or it is an empty branch.
@@ -301,7 +285,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
// Setup fwdId.
if (fwdId == 0)
- g.fwdId = 0;
+ g.fwdId(0L);
else {
// We can do askNeighbor on forward page here because we always take locks in forward direction.
Result res = askNeighbor(fwdId, g, false);
@@ -312,7 +296,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
// Setup backId.
if (cnt != 0) // It is not a routing page and we are going to the right, can get backId here.
- g.backId = inner(io).getLeft(pageAddr, cnt - 1);
+ g.backId(inner(io).getLeft(pageAddr, cnt - 1));
else if (needBackIfRouting) {
// Can't get backId here because of possible deadlock and it is only needed for remove operation.
return GO_DOWN_X;
@@ -321,7 +305,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
return GO_DOWN;
}
- };
+ }
/** */
private final GetPageHandler<Put> replace = new Replace();
@@ -331,6 +315,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
*/
private class Replace extends GetPageHandler<Put> {
/** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
@Override public Result run0(Page page, long pageAddr, BPlusIO<L> io, Put p, int lvl)
throws IgniteCheckedException {
// Check the triangle invariant.
@@ -351,15 +336,26 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
// Detach the old row if we are on a leaf page.
if (lvl == 0) {
- assert p.oldRow == null;
+ assert p.oldRow == null; // The old row must be set only once.
+
+ // Inner replace state must be consistent by the end of the operation.
+ assert p.needReplaceInner == FALSE || p.needReplaceInner == DONE : p.needReplaceInner;
+
+ // Need to replace inner key if now we are replacing the rightmost row and have a forward page.
+ if (canGetRowFromInner && idx + 1 == cnt && p.fwdId != 0L && p.needReplaceInner == FALSE) {
+ // Can happen only for invoke, otherwise inner key must be replaced on the way down.
+ assert p.invoke != null;
+
+ // We need to restart the operation from root to perform inner replace.
+ // On the second pass we will not get here (will avoid infinite loop) because
+ // needReplaceInner will be DONE or our key will not be the rightmost anymore.
+ return RETRY_ROOT;
+ }
// Get old row in leaf page to reduce contention at upper level.
p.oldRow = p.needOld ? getRow(io, pageAddr, idx) : (T)Boolean.TRUE;
p.finish();
-
- // Inner replace state must be consistent by the end of the operation.
- assert p.needReplaceInner == FALSE || p.needReplaceInner == DONE : p.needReplaceInner;
}
boolean needWal = needWalDeltaRecord(page);
@@ -371,7 +367,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
return FOUND;
}
- };
+ }
/** */
private final GetPageHandler<Put> insert = new Insert();
@@ -405,6 +401,9 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
p.btmLvl++; // Get high.
p.row = moveUpRow;
+ if (p.invoke != null)
+ p.invoke.row = moveUpRow;
+
// Here forward page can't be concurrently removed because we keep write lock on tail which is the only
// page who knows about the forward page, because it was just produced by split.
p.rightId = io.getForward(pageAddr);
@@ -417,7 +416,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
return FOUND;
}
- };
+ }
/** */
private final GetPageHandler<Remove> rmvFromLeaf = new RemoveFromLeaf();
@@ -437,15 +436,8 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
int idx = findInsertionPoint(io, pageAddr, 0, cnt, r.row, 0);
- if (idx < 0) {
- if (!r.ceil) // We've found exact match on search but now it's gone.
- return RETRY;
-
- idx = fix(idx);
-
- if (idx == cnt) // We can not remove ceiling row here. Bad luck.
- return NOT_FOUND;
- }
+ if (idx < 0)
+ return RETRY; // We've found exact match on search but now it's gone.
assert idx >= 0 && idx < cnt: idx;
@@ -495,7 +487,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
return FOUND;
}
- };
+ }
/** */
private final GetPageHandler<Remove> lockBackAndRmvFromLeaf = new LockBackAndRmvFromLeaf();
@@ -520,7 +512,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
return res;
}
- };
+ }
/** */
private final GetPageHandler<Remove> lockBackAndTail = new LockBackAndTail();
@@ -544,7 +536,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
return res;
}
- };
+ }
/** */
private final GetPageHandler<Remove> lockTailForward = new LockTailForward();
@@ -560,7 +552,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
return FOUND;
}
- };
+ }
/** */
private final GetPageHandler<Remove> lockTail = new LockTail();
@@ -590,7 +582,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
return FOUND;
}
- };
+ }
/** */
private final PageHandler<Void, Bool> cutRoot = new CutRoot();
@@ -620,7 +612,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
return TRUE;
}
- };
+ }
/** */
private final PageHandler<Long, Bool> addRoot = new AddRoot();
@@ -651,7 +643,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
return TRUE;
}
- };
+ }
/** */
private final PageHandler<Long, Bool> initRoot = new InitRoot();
@@ -681,7 +673,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
return TRUE;
}
- };
+ }
/**
* @param name Tree name.
@@ -947,15 +939,16 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
/**
* @param row Lookup row for exact match.
- * @param c Found row closure.
- * @return Found result.
+ * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
+ * @return Found result or {@code null}.
* @throws IgniteCheckedException If failed.
*/
- public final <R> R findOne(L row, RowClosure<L, R> c) throws IgniteCheckedException {
+ @SuppressWarnings("unchecked")
+ public final <R> R findOne(L row, Object x) throws IgniteCheckedException {
checkDestroyed();
try {
- GetOne g = new GetOne(row, c);
+ GetOne g = new GetOne(row, x);
doFind(g);
@@ -1036,7 +1029,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
case RETRY:
checkInterrupted();
- continue; // The child page got splitted, need to reread our page.
+ continue; // The child page got split, need to reread our page.
default:
return res;
@@ -1437,48 +1430,183 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
* @return Removed row.
* @throws IgniteCheckedException If failed.
*/
- @SuppressWarnings("unused")
- public final T removeCeil(L row) throws IgniteCheckedException {
- return doRemove(row, true, true);
+ @Override public final T remove(L row) throws IgniteCheckedException {
+ return doRemove(row, true);
}
/**
* @param row Lookup row.
- * @return Removed row.
* @throws IgniteCheckedException If failed.
+ * @return {@code True} if the row was removed.
*/
- @Override public final T remove(L row) throws IgniteCheckedException {
- return doRemove(row, false, true);
+ public final boolean removex(L row) throws IgniteCheckedException {
+ Boolean res = (Boolean)doRemove(row, false);
+
+ return res != null ? res : false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void invoke(L row, Object z, InvokeClosure<T> c) throws IgniteCheckedException {
+ checkDestroyed();
+
+ Invoke x = new Invoke(row, z, c);
+
+ try {
+ for (;;) {
+ x.init();
+
+ Result res = invokeDown(x, x.rootId, 0L, 0L, x.rootLvl);
+
+ switch (res) {
+ case RETRY:
+ case RETRY_ROOT:
+ checkInterrupted();
+
+ continue;
+
+ default:
+ if (!x.isFinished()) {
+ res = x.tryFinish();
+
+ if (res == RETRY || res == RETRY_ROOT) {
+ checkInterrupted();
+
+ continue;
+ }
+
+ assert x.isFinished(): res;
+ }
+
+ return;
+ }
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteCheckedException("Runtime failure on search row: " + row, e);
+ }
+ catch (RuntimeException e) {
+ throw new IgniteException("Runtime failure on search row: " + row, e);
+ }
+ catch (AssertionError e) {
+ throw new AssertionError("Assertion error on search row: " + row, e);
+ }
+ finally {
+ x.releaseAll();
+ }
}
/**
- * @param row Lookup row.
+ * @param x Invoke operation.
+ * @param pageId Page ID.
+ * @param backId Expected backward page ID if we are going to the right.
+ * @param fwdId Expected forward page ID.
+ * @param lvl Level.
+ * @return Result code.
* @throws IgniteCheckedException If failed.
- * @return {@code True} if removed row.
*/
- public final boolean removex(L row) throws IgniteCheckedException {
- Boolean res = (Boolean)doRemove(row, false, false);
+ private Result invokeDown(final Invoke x, final long pageId, final long backId, final long fwdId, final int lvl)
+ throws IgniteCheckedException {
+ assert lvl >= 0 : lvl;
- return res != null ? res : false;
+ if (x.isTail(pageId, lvl))
+ return FOUND; // We've already locked this page, so return that we are ok.
+
+ final Page page = page(pageId);
+
+ try {
+ for (;;) {
+ // Init args.
+ x.pageId(pageId);
+ x.fwdId(fwdId);
+ x.backId(backId);
+
+ Result res = readPage(page, this, search, x, lvl, RETRY);
+
+ switch (res) {
+ case GO_DOWN_X:
+ assert backId != 0;
+ assert x.backId == 0; // We have not set it up yet.
+
+ x.backId(pageId); // Dirty hack to set up a check inside of askNeighbor.
+
+ // We need to get backId here for our child page, it must be the last child of our back.
+ res = askNeighbor(backId, x, true);
+
+ if (res != FOUND)
+ return res; // Retry.
+
+ assert x.backId != pageId; // It must be updated in askNeighbor.
+
+ // Intentional fallthrough.
+ case GO_DOWN:
+ res = x.tryReplaceInner(page, pageId, fwdId, lvl);
+
+ if (res != RETRY)
+ res = invokeDown(x, x.pageId, x.backId, x.fwdId, lvl - 1);
+
+ if (res == RETRY_ROOT || x.isFinished())
+ return res;
+
+ if (res == RETRY) {
+ checkInterrupted();
+
+ continue;
+ }
+
+ // Unfinished Put does insertion on the same level.
+ if (x.isPut())
+ continue;
+
+ assert x.isRemove(); // Guarded by isFinished.
+
+ res = x.finishOrLockTail(page, pageId, backId, fwdId, lvl);
+
+ return res;
+
+ case NOT_FOUND:
+ if (lvl == 0)
+ x.invokeClosure();
+
+ return x.onNotFound(page, pageId, fwdId, lvl);
+
+ case FOUND:
+ if (lvl == 0)
+ x.invokeClosure();
+
+ return x.onFound(page, pageId, backId, fwdId, lvl);
+
+ default:
+ return res;
+ }
+ }
+ }
+ finally {
+ x.levelExit();
+
+ if (x.canRelease(page, lvl))
+ page.close();
+ }
}
+
/**
* @param row Lookup row.
- * @param ceil If we can remove ceil row when we can not find exact.
* @param needOld {@code True} if need return removed row.
* @return Removed row.
* @throws IgniteCheckedException If failed.
*/
- private T doRemove(L row, boolean ceil, boolean needOld) throws IgniteCheckedException {
+ private T doRemove(L row, boolean needOld) throws IgniteCheckedException {
checkDestroyed();
- Remove r = new Remove(row, ceil, needOld);
+ Remove r = new Remove(row, needOld);
try {
for (;;) {
r.init();
- switch (removeDown(r, r.rootId, 0L, 0L, r.rootLvl)) {
+ Result res = removeDown(r, r.rootId, 0L, 0L, r.rootLvl);
+
+ switch (res) {
case RETRY:
case RETRY_ROOT:
checkInterrupted();
@@ -1487,15 +1615,11 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
default:
if (!r.isFinished()) {
- Result res = r.finishTail();
+ res = r.finishTail();
// If not found, then the tree grew beyond our call stack -> retry from the actual root.
if (res == RETRY || res == NOT_FOUND) {
- int root = getRootLevel();
-
- boolean checkRes = r.checkTailLevel(root);
-
- assert checkRes : "tail=" + r.tail + ", root=" + root + ", res=" + res;
+ assert r.checkTailLevel(getRootLevel()) : "tail=" + r.tail + ", res=" + res;
checkInterrupted();
@@ -1521,9 +1645,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
throw new AssertionError("Assertion error on search row: " + row, e);
}
finally {
- r.releaseTail();
-
- r.reuseFreePages();
+ r.releaseAll();
}
}
@@ -1579,12 +1701,10 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
continue;
}
- if (res != RETRY_ROOT && !r.isFinished()) {
- res = r.finishTail();
+ if (res == RETRY_ROOT || r.isFinished())
+ return res;
- if (res == NOT_FOUND)
- res = r.lockTail(pageId, page, backId, fwdId, lvl);
- }
+ res = r.finishOrLockTail(page, pageId, backId, fwdId, lvl);
return res;
@@ -1592,31 +1712,12 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
// We are at the bottom.
assert lvl == 0 : lvl;
- if (!r.ceil) {
- r.finish();
-
- return res;
- }
+ r.finish();
- // Intentional fallthrough for ceiling remove.
+ return res;
case FOUND:
- // We must be at the bottom here, just need to remove row from the current page.
- assert lvl == 0 : lvl;
-
- res = r.removeFromLeaf(pageId, page, backId, fwdId);
-
- if (res == NOT_FOUND) {
- assert r.ceil : "must be a retry if not a ceiling remove";
-
- r.finish();
- }
- else if (res == FOUND && r.tail == null) {
- // Finish if we don't need to do any merges.
- r.finish();
- }
-
- return res;
+ return r.tryRemoveFromLeaf(page, pageId, backId, fwdId, lvl);
default:
return res;
@@ -1716,7 +1817,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
* {@inheritDoc}
*/
@Override public final T put(T row) throws IgniteCheckedException {
- return put(row, true);
+ return doPut(row, true);
}
/**
@@ -1725,7 +1826,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
* @return {@code True} if replaced existing row.
*/
public boolean putx(T row) throws IgniteCheckedException {
- Boolean res = (Boolean)put(row, false);
+ Boolean res = (Boolean)doPut(row, false);
return res != null ? res : false;
}
@@ -1736,7 +1837,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
* @return Old row.
* @throws IgniteCheckedException If failed.
*/
- private T put(T row, boolean needOld) throws IgniteCheckedException {
+ private T doPut(T row, boolean needOld) throws IgniteCheckedException {
checkDestroyed();
Put p = new Put(row, needOld);
@@ -1858,7 +1959,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
/**
* @return {@code True} if state was changed.
*/
- private final boolean markDestroyed() {
+ private boolean markDestroyed() {
return destroyed.compareAndSet(false, true);
}
@@ -1997,31 +2098,10 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
assert p.pageId != pageId;
assert p.fwdId != fwdId || fwdId == 0;
- // Need to replace key in inner page. There is no race because we keep tail lock after split.
- if (p.needReplaceInner == TRUE) {
- p.needReplaceInner = FALSE; // Protect from retries.
-
- long oldFwdId = p.fwdId;
- long oldPageId = p.pageId;
-
- // Set old args.
- p.fwdId = fwdId;
- p.pageId = pageId;
-
- res = writePage(pageMem, page, this, replace, p, lvl, RETRY);
+ res = p.tryReplaceInner(page, pageId, fwdId, lvl);
- // Restore args.
- p.pageId = oldPageId;
- p.fwdId = oldFwdId;
-
- if (res != FOUND)
- return res; // Need to retry.
-
- p.needReplaceInner = DONE; // We can have only single matching inner key.
- }
-
- // Go down recursively.
- res = putDown(p, p.pageId, p.fwdId, lvl - 1);
+ if (res != RETRY) // Go down recursively.
+ res = putDown(p, p.pageId, p.fwdId, lvl - 1);
if (res == RETRY_ROOT || p.isFinished())
return res;
@@ -2034,21 +2114,13 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
case FOUND: // Do replace.
assert lvl == 0 : "This replace can happen only at the bottom level.";
- // Init args.
- p.pageId = pageId;
- p.fwdId = fwdId;
-
- return writePage(pageMem, page, this, replace, p, lvl, RETRY);
+ return p.tryReplace(page, pageId, fwdId, lvl);
case NOT_FOUND: // Do insert.
assert lvl == p.btmLvl : "must insert at the bottom level";
assert p.needReplaceInner == FALSE : p.needReplaceInner + " " + lvl;
- // Init args.
- p.pageId = pageId;
- p.fwdId = fwdId;
-
- return writePage(pageMem, page, this, insert, p, lvl, RETRY);
+ return p.tryInsert(page, pageId, fwdId, lvl);
default:
return res;
@@ -2096,29 +2168,32 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
*/
private abstract class Get {
/** */
- protected long rmvId;
+ long rmvId;
/** Starting point root level. May be outdated. Must be modified only in {@link Get#init()}. */
- protected int rootLvl;
+ int rootLvl;
/** Starting point root ID. May be outdated. Must be modified only in {@link Get#init()}. */
- protected long rootId;
+ long rootId;
/** */
- protected L row;
+ L row;
/** In/Out parameter: Page ID. */
- protected long pageId;
+ long pageId;
/** In/Out parameter: expected forward page ID. */
- protected long fwdId;
+ long fwdId;
/** In/Out parameter: in case of right turn this field will contain backward page ID for the child. */
- protected long backId;
+ long backId;
/** */
int shift;
+ /** Enclosing invoke operation if this operation is a part of invoke, otherwise {@code null}. */
+ Invoke invoke;
+
/**
* @param row Row.
*/
@@ -2129,6 +2204,21 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
}
/**
+ * @param g Other operation to copy from.
+ * @return {@code this}.
+ */
+ final Get copyFrom(Get g) {
+ rmvId = g.rmvId;
+ rootLvl = g.rootLvl;
+ pageId = g.pageId;
+ fwdId = g.fwdId;
+ backId = g.backId;
+ shift = g.shift;
+
+ return this;
+ }
+
+ /**
* Initialize operation.
*
* @throws IgniteCheckedException If failed.
@@ -2146,7 +2236,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
* @param rootLvl Root level.
* @param rmvId Remove ID to be afraid of.
*/
- final void restartFromRoot(long rootId, int rootLvl, long rmvId) {
+ void restartFromRoot(long rootId, int rootLvl, long rmvId) {
this.rootId = rootId;
this.rootLvl = rootLvl;
this.rmvId = rmvId;
@@ -2188,6 +2278,34 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
boolean canRelease(Page page, int lvl) {
return page != null;
}
+
+ /**
+ * @param backId Back page ID.
+ */
+ void backId(long backId) {
+ this.backId = backId;
+ }
+
+ /**
+ * @param pageId Page ID.
+ */
+ void pageId(long pageId) {
+ this.pageId = pageId;
+ }
+
+ /**
+ * @param fwdId Forward page ID.
+ */
+ void fwdId(long fwdId) {
+ this.fwdId = fwdId;
+ }
+
+ /**
+ * @return {@code true} If the operation is finished.
+ */
+ boolean isFinished() {
+ throw new IllegalStateException();
+ }
}
/**
@@ -2195,25 +2313,26 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
*/
private final class GetOne extends Get {
/** */
- private final RowClosure<L, ?> c;
+ Object x;
/**
* @param row Row.
- * @param c Row closure.
+ * @param x Implementation specific argument.
*/
- private GetOne(L row, RowClosure<L, ?> c) {
+ private GetOne(L row, Object x) {
super(row);
- this.c = c;
+ this.x = x;
}
/** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
@Override boolean found(BPlusIO<L> io, long pageAddr, int idx, int lvl) throws IgniteCheckedException {
// Check if we are on an inner page and can't get row from it.
if (lvl != 0 && !canGetRowFromInner)
return false;
- row = c != null ? (L)c.row(io, pageAddr, idx) : getRow(io, pageAddr, idx);
+ row = getRow(io, pageAddr, idx, x);
return true;
}
@@ -2224,7 +2343,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
*/
private final class GetCursor extends Get {
/** */
- private ForwardCursor cursor;
+ ForwardCursor cursor;
/**
* @param lower Lower bound.
@@ -2261,31 +2380,31 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
*/
private final class Put extends Get {
/** Right child page ID for split row. */
- private long rightId;
+ long rightId;
/** Replaced row if any. */
- private T oldRow;
+ T oldRow;
/**
* This page is kept locked after split until insert to the upper level will not be finished.
* It is needed because split row will be "in flight" and if we'll release tail, remove on
* split row may fail.
*/
- private Page tail;
+ Page tail;
/** */
- private long tailPageAddr;
+ long tailPageAddr;
/**
* Bottom level for insertion (insert can't go deeper). Will be incremented on split on each level.
*/
- private short btmLvl;
+ short btmLvl;
/** */
Bool needReplaceInner = FALSE;
/** */
- private final boolean needOld;
+ final boolean needOld;
/**
* @param row Row.
@@ -2302,6 +2421,8 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
if (lvl == 0) // Leaf: need to stop.
return true;
+ assert btmLvl == 0; // It can not be insert.
+
// If we can get full row from the inner page, we have to replace it with the new one. On the way down
// we can not miss inner key even in presence of concurrent operations because of `triangle` invariant +
// concurrent inner replace handling by retrying from root.
@@ -2348,10 +2469,8 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
tail(null, 0L);
}
- /**
- * @return {@code true} If finished.
- */
- private boolean isFinished() {
+ /** {@inheritDoc} */
+ @Override boolean isFinished() {
return row == null;
}
@@ -2505,45 +2624,404 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
}
}
}
- }
- /**
- * Remove operation.
- */
- private final class Remove extends Get implements ReuseBag {
- /** */
- private boolean ceil;
+ /**
+ * @param page Page.
+ * @param pageId Page ID.
+ * @param fwdId Forward ID.
+ * @param lvl Level.
+ * @return Result.
+ * @throws IgniteCheckedException If failed.
+ */
+ private Result tryReplaceInner(Page page, long pageId, long fwdId, int lvl)
+ throws IgniteCheckedException {
+ // Need to replace key in inner page. There is no race because we keep tail lock after split.
+ if (needReplaceInner == TRUE) {
+ needReplaceInner = FALSE; // Protect from retries.
- /** We may need to lock part of the tree branch from the bottom to up for multiple levels. */
- private Tail<L> tail;
+ long oldFwdId = this.fwdId;
+ long oldPageId = this.pageId;
- /** */
- Bool needReplaceInner = FALSE;
+ // Set old args.
+ this.fwdId = fwdId;
+ this.pageId = pageId;
- /** */
- Bool needMergeEmptyBranch = FALSE;
+ Result res = writePage(pageMem, page, BPlusTree.this, replace, this, lvl, RETRY);
- /** Removed row. */
- private T rmvd;
+ // Restore args.
+ this.pageId = oldPageId;
+ this.fwdId = oldFwdId;
- /** Current page. */
- private Page page;
+ if (res == RETRY)
+ return RETRY;
+
+ needReplaceInner = DONE; // We can have only a single matching inner key.
+
+ return FOUND;
+ }
+
+ return NOT_FOUND;
+ }
+
+ /**
+ * @param page Page.
+ * @param pageId Page ID.
+ * @param fwdId Forward ID.
+ * @param lvl Level.
+ * @return Result.
+ * @throws IgniteCheckedException If failed.
+ */
+ private Result tryInsert(Page page, long pageId, long fwdId, int lvl) throws IgniteCheckedException {
+ // Init args.
+ this.pageId = pageId;
+ this.fwdId = fwdId;
+
+ return writePage(pageMem, page, BPlusTree.this, insert, this, lvl, RETRY);
+ }
+
+ /**
+ * @param page Page.
+ * @param pageId Page ID.
+ * @param fwdId Forward ID.
+ * @param lvl Level.
+ * @return Result.
+ * @throws IgniteCheckedException If failed.
+ */
+ public Result tryReplace(Page page, long pageId, long fwdId, int lvl) throws IgniteCheckedException {
+ // Init args.
+ this.pageId = pageId;
+ this.fwdId = fwdId;
+
+ return writePage(pageMem, page, BPlusTree.this, replace, this, lvl, RETRY);
+ }
+ }
+
+ /**
+ * Invoke operation.
+ */
+ private final class Invoke extends Get {
+ /** */
+ Object x;
+
+ /** */
+ InvokeClosure<T> clo;
+
+ /** */
+ Bool closureInvoked = FALSE;
+
+ /** */
+ T foundRow;
+
+ /** */
+ Get op;
+
+ /**
+ * @param row Row.
+ * @param x Implementation specific argument.
+ * @param clo Closure.
+ */
+ private Invoke(L row, Object x, final InvokeClosure<T> clo) {
+ super(row);
+
+ assert clo != null;
+
+ this.clo = clo;
+ this.x = x;
+ }
+
+ /** {@inheritDoc} */
+ @Override void pageId(long pageId) {
+ this.pageId = pageId;
+
+ if (op != null)
+ op.pageId = pageId;
+ }
+
+ /** {@inheritDoc} */
+ @Override void fwdId(long fwdId) {
+ this.fwdId = fwdId;
+
+ if (op != null)
+ op.fwdId = fwdId;
+ }
+
+ /** {@inheritDoc} */
+ @Override void backId(long backId) {
+ this.backId = backId;
+
+ if (op != null)
+ op.backId = backId;
+ }
+
+ /** {@inheritDoc} */
+ @Override void restartFromRoot(long rootId, int rootLvl, long rmvId) {
+ super.restartFromRoot(rootId, rootLvl, rmvId);
+
+ if (op != null)
+ op.restartFromRoot(rootId, rootLvl, rmvId);
+ }
+
+ /** {@inheritDoc} */
+ @Override boolean found(BPlusIO<L> io, long pageAddr, int idx, int lvl) throws IgniteCheckedException {
+ // If the operation is initialized, then the closure has been called already.
+ if (op != null)
+ return op.found(io, pageAddr, idx, lvl);
+
+ if (lvl == 0) {
+ if (closureInvoked == FALSE) {
+ closureInvoked = READY;
+
+ foundRow = getRow(io, pageAddr, idx, x);
+ }
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override boolean notFound(BPlusIO<L> io, long pageAddr, int idx, int lvl) throws IgniteCheckedException {
+ // If the operation is initialized, then the closure has been called already.
+ if (op != null)
+ return op.notFound(io, pageAddr, idx, lvl);
+
+ if (lvl == 0) {
+ if (closureInvoked == FALSE)
+ closureInvoked = READY;
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ private void invokeClosure() throws IgniteCheckedException {
+ if (closureInvoked != READY)
+ return;
+
+ closureInvoked = DONE;
+
+ clo.call(foundRow);
+
+ switch (clo.operationType()) {
+ case PUT:
+ T newRow = clo.newRow();
+
+ assert newRow != null;
+
+ op = new Put(newRow, false);
+
+ break;
+
+ case REMOVE:
+ assert foundRow != null;
+
+ op = new Remove(row, false);
+
+ break;
+
+ case NOOP:
+ return;
+
+ default:
+ throw new IllegalStateException();
+ }
+
+ op.copyFrom(this);
+
+ op.invoke = this;
+ }
+
+ /** {@inheritDoc} */
+ @Override boolean canRelease(Page page, int lvl) {
+ if (page == null)
+ return false;
+
+ if (op == null)
+ return true;
+
+ return op.canRelease(page, lvl);
+ }
+
+ /**
+ * @return {@code true} If it is a {@link Put} operation internally.
+ */
+ private boolean isPut() {
+ return op != null && op.getClass() == Put.class;
+ }
+
+ /**
+ * @return {@code true} If it is a {@link Remove} operation internally.
+ */
+ private boolean isRemove() {
+ return op != null && op.getClass() == Remove.class;
+ }
+
+ /**
+ * @param pageId Page ID.
+ * @param lvl Level.
+ * @return {@code true} If it is a {@link Remove} and the page is in tail.
+ */
+ private boolean isTail(long pageId, int lvl) {
+ return isRemove() && ((Remove)op).isTail(pageId, lvl);
+ }
+
+ /**
+ */
+ private void levelExit() {
+ if (isRemove())
+ ((Remove)op).page = null;
+ }
+
+ /**
+ * Release all the resources by the end of operation.
+ */
+ private void releaseAll() throws IgniteCheckedException {
+ if (isRemove())
+ ((Remove)op).releaseAll();
+ }
+
+ /**
+ * @param page Page.
+ * @param pageId Page ID.
+ * @param fwdId Forward ID.
+ * @param lvl Level.
+ * @return Result.
+ * @throws IgniteCheckedException If failed.
+ */
+ private Result onNotFound(Page page, long pageId, long fwdId, int lvl)
+ throws IgniteCheckedException {
+ if (op == null)
+ return NOT_FOUND;
+
+ if (isRemove()) {
+ assert lvl == 0;
+
+ ((Remove)op).finish();
+
+ return NOT_FOUND;
+ }
+
+ return ((Put)op).tryInsert(page, pageId, fwdId, lvl);
+ }
+
+ /**
+ * @param page Page.
+ * @param pageId Page ID.
+ * @param backId Back page ID.
+ * @param fwdId Forward ID.
+ * @param lvl Level.
+ * @return Result.
+ * @throws IgniteCheckedException If failed.
+ */
+ private Result onFound(Page page, long pageId, long backId, long fwdId, int lvl)
+ throws IgniteCheckedException {
+ if (op == null)
+ return FOUND;
+
+ if (isRemove())
+ return ((Remove)op).tryRemoveFromLeaf(page, pageId, backId, fwdId, lvl);
+
+ return ((Put)op).tryReplace(page, pageId, fwdId, lvl);
+ }
+
+ /**
+ * @return Result.
+ * @throws IgniteCheckedException If failed.
+ */
+ private Result tryFinish() throws IgniteCheckedException {
+ assert op != null; // Must be guarded by isFinished.
+
+ if (isPut())
+ return RETRY;
+
+ Result res = ((Remove)op).finishTail();
+
+ if (res == NOT_FOUND)
+ res = RETRY;
+
+ assert res == FOUND || res == RETRY: res;
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override boolean isFinished() {
+ if (closureInvoked != DONE)
+ return false;
+
+ if (op == null)
+ return true;
+
+ return op.isFinished();
+ }
+
+ /**
+ * @param page Page.
+ * @param pageId Page ID.
+ * @param fwdId Forward ID.
+ * @param lvl Level.
+ * @return Result.
+ * @throws IgniteCheckedException If failed.
+ */
+ Result tryReplaceInner(Page page, long pageId, long fwdId, int lvl) throws IgniteCheckedException {
+ if (!isPut())
+ return NOT_FOUND;
+
+ return ((Put)op).tryReplaceInner(page, pageId, fwdId, lvl);
+ }
+
+ /**
+ * @param page Page.
+ * @param pageId Page ID.
+ * @param backId Back page ID.
+ * @param fwdId Forward ID.
+ * @param lvl Level.
+ * @return Result.
+ * @throws IgniteCheckedException If failed.
+ */
+ public Result finishOrLockTail(Page page, long pageId, long backId, long fwdId, int lvl)
+ throws IgniteCheckedException {
+ return ((Remove)op).finishOrLockTail(page, pageId, backId, fwdId, lvl);
+ }
+ }
+
+ /**
+ * Remove operation.
+ */
+ private final class Remove extends Get implements ReuseBag {
+ /** We may need to lock part of the tree branch from the bottom to up for multiple levels. */
+ Tail<L> tail;
+
+ /** */
+ Bool needReplaceInner = FALSE;
+
+ /** */
+ Bool needMergeEmptyBranch = FALSE;
+
+ /** Removed row. */
+ T rmvd;
+
+ /** Current page. */
+ Page page;
/** */
- private Object freePages;
+ Object freePages;
/** */
- private final boolean needOld;
+ final boolean needOld;
/**
* @param row Row.
- * @param ceil If we can remove ceil row when we can not find exact.
* @param needOld {@code True} If need return old value.
*/
- private Remove(L row, boolean ceil, boolean needOld) {
+ private Remove(L row, boolean needOld) {
super(row);
- this.ceil = ceil;
this.needOld = needOld;
}
@@ -2551,12 +3029,12 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
@SuppressWarnings("unchecked")
@Override public long pollFreePage() {
if (freePages == null)
- return 0;
+ return 0L;
if (freePages.getClass() == GridLongList.class) {
GridLongList list = ((GridLongList)freePages);
- return list.isEmpty() ? 0 : list.remove();
+ return list.isEmpty() ? 0L : list.remove();
}
long res = (long)freePages;
@@ -2569,7 +3047,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void addFreePage(long pageId) {
- assert pageId != 0;
+ assert pageId != 0L;
if (freePages == null)
freePages = pageId;
@@ -2957,6 +3435,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
* @param idx Index to remove.
* @throws IgniteCheckedException If failed.
*/
+ @SuppressWarnings("unchecked")
private void removeDataRowFromLeaf(Page page, BPlusIO<L> io, long pageAddr, int cnt, int idx)
throws IgniteCheckedException {
assert idx >= 0 && idx < cnt: idx;
@@ -3280,10 +3759,8 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
return true;
}
- /**
- * @return {@code true} If finished.
- */
- private boolean isFinished() {
+ /** {@inheritDoc} */
+ @Override boolean isFinished() {
return row == null;
}
@@ -3438,9 +3915,58 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
* @param rootLvl Actual root level.
* @return {@code true} If tail level is correct.
*/
- public boolean checkTailLevel(int rootLvl) {
+ private boolean checkTailLevel(int rootLvl) {
return tail == null || tail.lvl < rootLvl;
}
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ private void releaseAll() throws IgniteCheckedException {
+ releaseTail();
+ reuseFreePages();
+ }
+
+ /**
+ * @param page Page.
+ * @param pageId Page ID.
+ * @param backId Back page ID.
+ * @param fwdId Forward ID.
+ * @param lvl Level.
+ * @return Result.
+ * @throws IgniteCheckedException If failed.
+ */
+ private Result finishOrLockTail(Page page, long pageId, long backId, long fwdId, int lvl)
+ throws IgniteCheckedException {
+ Result res = finishTail();
+
+ if (res == NOT_FOUND)
+ res = lockTail(pageId, page, backId, fwdId, lvl);
+
+ return res;
+ }
+
+ /**
+ * @param page Page.
+ * @param pageId Page ID.
+ * @param backId Back page ID.
+ * @param fwdId Forward ID.
+ * @param lvl Level.
+ * @return Result.
+ * @throws IgniteCheckedException If failed.
+ */
+ private Result tryRemoveFromLeaf(Page page, long pageId, long backId, long fwdId, int lvl)
+ throws IgniteCheckedException {
+ // We must be at the bottom here, just need to remove row from the current page.
+ assert lvl == 0 : lvl;
+
+ Result res = removeFromLeaf(pageId, page, backId, fwdId);
+
+ if (res == FOUND && tail == null) // Finish if we don't need to do any merges.
+ finish();
+
+ return res;
+ }
}
/**
@@ -3619,7 +4145,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
protected abstract int compare(BPlusIO<L> io, long pageAddr, int idx, L row) throws IgniteCheckedException;
/**
- * Get the full detached row. Can be called on inner page only if {@link #canGetRowFromInner} is {@code true}.
+ * Get a full detached data row.
*
* @param io IO.
* @param pageAddr Page address.
@@ -3627,7 +4153,21 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
* @return Full detached data row.
* @throws IgniteCheckedException If failed.
*/
- protected abstract T getRow(BPlusIO<L> io, long pageAddr, int idx) throws IgniteCheckedException;
+ protected final T getRow(BPlusIO<L> io, long pageAddr, int idx) throws IgniteCheckedException {
+ return getRow(io, pageAddr, idx, null);
+ }
+
+ /**
+ * Get data row. Can be called on inner page only if {@link #canGetRowFromInner} is {@code true}.
+ *
+ * @param io IO.
+ * @param pageAddr Page address.
+ * @param idx Index.
+ * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
+ * @return Data row.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected abstract T getRow(BPlusIO<L> io, long pageAddr, int idx, Object x) throws IgniteCheckedException;
/**
* Forward cursor.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index fc78f69..0c71731 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -97,6 +97,11 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
}
/** {@inheritDoc} */
+ @Override protected long nextPartCounter() {
+ return locPart.nextUpdateCounter();
+ }
+
+ /** {@inheritDoc} */
@Override public int memorySize() throws IgniteCheckedException {
int rdrsOverhead;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java
index 8dcd205..7eae0d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.util;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.util.lang.GridCursor;
+import org.jetbrains.annotations.Nullable;
/**
* Interface for ignite internal tree.
@@ -34,6 +35,14 @@ public interface IgniteTree<L, T> {
public T put(T val) throws IgniteCheckedException;
/**
+ * @param key Key.
+ * @param x Implementation specific argument, {@code null} always means that we need a full detached data row.
+ * @param c Closure.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void invoke(L key, Object x, InvokeClosure<T> c) throws IgniteCheckedException;
+
+ /**
* Returns the value to which the specified key is mapped, or {@code null} if this tree contains no mapping for the
* key.
*
@@ -70,4 +79,42 @@ public interface IgniteTree<L, T> {
* @throws IgniteCheckedException If failed.
*/
public long size() throws IgniteCheckedException;
+
+ /**
+ *
+ */
+ interface InvokeClosure<T> {
+ /**
+ *
+ * @param row Old row or {@code null} if old row not found.
+ * @throws IgniteCheckedException If failed.
+ */
+ void call(@Nullable T row) throws IgniteCheckedException;
+
+ /**
+ * @return New row for {@link OperationType#PUT} operation.
+ */
+ T newRow();
+
+ /**
+ * @return Operation type for this closure or {@code null} if it is unknown.
+ * After method {@link #call(Object)} has been called, operation type must
+ * be known and this method cannot return {@code null}.
+ */
+ OperationType operationType();
+ }
+
+ /**
+ *
+ */
+ enum OperationType {
+ /** */
+ NOOP,
+
+ /** */
+ REMOVE,
+
+ /** */
+ PUT
+ }
}
[3/3] ignite git commit: ignite-4652 Implemented BPlusTree.invoke
Posted by sb...@apache.org.
ignite-4652 Implemented BPlusTree.invoke
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ee28b9cb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ee28b9cb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ee28b9cb
Branch: refs/heads/ignite-3477
Commit: ee28b9cb89400af6fcddd89a52fcd1adbcd5d4ff
Parents: 40f015d
Author: sboikov <sb...@gridgain.com>
Authored: Wed Feb 22 09:55:50 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Feb 22 09:55:50 2017 +0300
----------------------------------------------------------------------
.../benchmarks/jmh/tree/BPlusTreeBenchmark.java | 3 +-
.../internal/pagemem/wal/record/DataRecord.java | 10 +-
.../processors/cache/GridCacheMapEntry.java | 1729 +++++++++++-------
.../cache/GridCacheUpdateAtomicResult.java | 96 +-
.../cache/IgniteCacheOffheapManager.java | 48 +
.../cache/IgniteCacheOffheapManagerImpl.java | 236 ++-
.../processors/cache/database/CacheDataRow.java | 6 +
.../cache/database/CacheDataRowAdapter.java | 13 +
.../cache/database/MetadataStorage.java | 2 +-
.../cache/database/tree/BPlusTree.java | 936 ++++++++--
.../distributed/dht/GridDhtCacheEntry.java | 5 +
.../apache/ignite/internal/util/IgniteTree.java | 47 +
.../processors/database/BPlusTreeSelfTest.java | 272 ++-
.../database/FreeListImplSelfTest.java | 5 +
.../processors/query/h2/database/H2Tree.java | 2 +-
.../processors/query/h2/opt/GridH2Row.java | 5 +
.../query/h2/opt/GridH2TreeIndex.java | 5 +
17 files changed, 2462 insertions(+), 958 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/tree/BPlusTreeBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/tree/BPlusTreeBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/tree/BPlusTreeBenchmark.java
index 7355850..dc74363 100644
--- a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/tree/BPlusTreeBenchmark.java
+++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/tree/BPlusTreeBenchmark.java
@@ -190,7 +190,8 @@ public class BPlusTreeBenchmark extends JmhAbstractBenchmark {
}
/** {@inheritDoc} */
- @Override protected Long getRow(BPlusIO<Long> io, long pageAddr, int idx) throws IgniteCheckedException {
+ @Override protected Long getRow(BPlusIO<Long> io, long pageAddr, int idx, Object ignore)
+ throws IgniteCheckedException {
assert io.canGetRow() : io;
return io.getLookupRow(this, pageAddr, idx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
index 6592852..d2747f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
@@ -17,15 +17,10 @@
package org.apache.ignite.internal.pagemem.wal.record;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
/**
*
@@ -68,6 +63,7 @@ public class DataRecord extends WALRecord {
return writeEntries == null ? Collections.<DataEntry>emptyList() : writeEntries;
}
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(DataRecord.class, this, super.toString());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 6dc1d04..d28ea25 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -40,7 +40,9 @@ import org.apache.ignite.internal.pagemem.wal.StorageException;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult.UpdateOutcome;
import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.database.CacheDataRowAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
@@ -60,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConfl
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.processors.dr.GridDrType;
+import org.apache.ignite.internal.util.IgniteTree;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
import org.apache.ignite.internal.util.lang.GridTuple;
@@ -83,6 +86,8 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_UNLOCKED;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
/**
@@ -1535,11 +1540,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public GridCacheUpdateAtomicResult innerUpdate(
- GridCacheVersion newVer,
+ final GridCacheVersion newVer,
final UUID evtNodeId,
final UUID affNodeId,
- GridCacheOperation op,
- @Nullable Object writeObj,
+ final GridCacheOperation op,
+ @Nullable final Object writeObj,
@Nullable final Object[] invokeArgs,
final boolean writeThrough,
final boolean readThrough,
@@ -1555,42 +1560,22 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
final GridDrType drType,
final long explicitTtl,
final long explicitExpireTime,
- @Nullable GridCacheVersion conflictVer,
+ @Nullable final GridCacheVersion conflictVer,
final boolean conflictResolve,
final boolean intercept,
@Nullable final UUID subjId,
final String taskName,
@Nullable final CacheObject prevVal,
@Nullable final Long updateCntr,
- @Nullable GridDhtAtomicAbstractUpdateFuture fut
+ @Nullable final GridDhtAtomicAbstractUpdateFuture fut
) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException {
- assert cctx.atomic();
-
- boolean res = true;
-
- CacheObject oldVal;
- CacheObject updated;
-
- GridCacheVersion enqueueVer = null;
+ assert cctx.atomic() && !detached();
- GridCacheVersionConflictContext<?, ?> conflictCtx = null;
-
- IgniteBiTuple<Object, Exception> invokeRes = null;
-
- // System TTL/ET which may have special values.
- long newSysTtl;
- long newSysExpireTime;
-
- // TTL/ET which will be passed to entry on update.
- long newTtl;
- long newExpireTime;
-
- Object key0 = null;
- Object updated0 = null;
-
- Long updateCntr0 = null;
+ AtomicCacheUpdateClosure c;
synchronized (this) {
+ checkObsolete();
+
boolean internal = isInternal() || !context().userCache();
Map<UUID, CacheContinuousQueryListener> lsnrs = cctx.continuousQueries().updateListeners(internal, false);
@@ -1598,679 +1583,270 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM
|| !F.isEmptyOrNulls(filter);
- checkObsolete();
-
- CacheDataRow oldRow = null;
-
- // Load and remove from swap if it is new.
- if (isStartVersion())
- oldRow = unswap(retval, false);
-
- // Prepare old value.
- oldVal = val;
-
// Possibly read value from store.
- boolean readFromStore = false;
-
- Object old0 = null;
-
- if (readThrough && needVal && oldVal == null && (cctx.readThrough() &&
- (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) {
- old0 = readThrough(null, key, false, subjId, taskName);
-
- oldVal = cctx.toCacheObject(old0);
-
- readFromStore = true;
-
- // Detach value before index update.
- oldVal = cctx.kernalContext().cacheObjects().prepareForCache(oldVal, cctx);
-
- // Calculate initial TTL and expire time.
- long initTtl;
- long initExpireTime;
-
- if (expiryPlc != null && oldVal != null) {
- IgniteBiTuple<Long, Long> initTtlAndExpireTime = initialTtlAndExpireTime(expiryPlc);
-
- initTtl = initTtlAndExpireTime.get1();
- initExpireTime = initTtlAndExpireTime.get2();
- }
- else {
- initTtl = CU.TTL_ETERNAL;
- initExpireTime = CU.EXPIRE_TIME_ETERNAL;
- }
+ boolean readFromStore = readThrough && needVal && (cctx.readThrough() &&
+ (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()));
+
+ c = new AtomicCacheUpdateClosure(this,
+ newVer,
+ op,
+ writeObj,
+ invokeArgs,
+ readFromStore,
+ writeThrough,
+ keepBinary,
+ expiryPlc,
+ primary,
+ verCheck,
+ filter,
+ explicitTtl,
+ explicitExpireTime,
+ conflictVer,
+ conflictResolve,
+ intercept,
+ updateCntr);
+
+ key.valueBytes(cctx.cacheObjectContext());
- if (oldVal != null)
- storeValue(oldVal, initExpireTime, ver, oldRow);
- // else nothing to do, real old value was null.
-
- update(oldVal, initExpireTime, initTtl, ver, true);
+ if (isNear()) {
+ CacheDataRow dataRow = val != null ? new CacheDataRowAdapter(key, val, ver, expireTimeExtras()) : null;
- if (deletedUnlocked() && oldVal != null && !isInternal())
- deletedUnlocked(false);
+ c.call(dataRow);
}
+ else
+ cctx.offheap().invoke(key, localPartition(), c);
- Object transformClo = null;
-
- // Request-level conflict resolution is needed, i.e. we do not know who will win in advance.
- if (conflictResolve) {
- GridCacheVersion oldConflictVer = version().conflictVersion();
-
- // Cache is conflict-enabled.
- if (cctx.conflictNeedResolve()) {
- GridCacheVersionedEntryEx newEntry;
-
- GridTuple3<Long, Long, Boolean> expiration = ttlAndExpireTime(expiryPlc,
- explicitTtl,
- explicitExpireTime);
-
- // Prepare old and new entries for conflict resolution.
- GridCacheVersionedEntryEx oldEntry = versionedEntry(keepBinary);
-
- if (op == GridCacheOperation.TRANSFORM) {
- transformClo = writeObj;
+ GridCacheUpdateAtomicResult updateRes = c.updateRes;
- EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj;
+ assert updateRes != null : c;
- oldVal = this.val;
+ CacheObject oldVal = c.oldRow != null ? c.oldRow.value() : null;
+ CacheObject updateVal = null;
+ GridCacheVersion updateVer = c.newVer;
- CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(key, oldVal, version(),
- keepBinary, this);
+ // Apply metrics.
+ if (metrics &&
+ updateRes.outcome().updateReadMetrics() &&
+ cctx.cache().configuration().isStatisticsEnabled() &&
+ needVal) {
+ // PutIfAbsent methods must not update hit/miss statistics.
+ if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || !cctx.putIfAbsentFilter(filter))
+ cctx.cache().metrics0().onRead(oldVal != null);
+ }
- try {
- Object computed = entryProcessor.process(entry, invokeArgs);
+ switch (updateRes.outcome()) {
+ case VERSION_CHECK_FAILED: {
+ if (!cctx.isNear()) {
+ CacheObject evtVal;
- if (entry.modified())
- writeObj = cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue()));
- else
- writeObj = oldVal;
+ if (op == GridCacheOperation.TRANSFORM) {
+ EntryProcessor<Object, Object, ?> entryProcessor =
+ (EntryProcessor<Object, Object, ?>)writeObj;
- key0 = entry.key();
+ CacheInvokeEntry<Object, Object> entry =
+ new CacheInvokeEntry<>(key, prevVal, version(), keepBinary, this);
- if (computed != null)
- invokeRes = new IgniteBiTuple(cctx.unwrapTemporary(computed), null);
- }
- catch (Exception e) {
- invokeRes = new IgniteBiTuple(null, e);
+ try {
+ entryProcessor.process(entry, invokeArgs);
- writeObj = oldVal;
+ evtVal = entry.modified() ?
+ cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())) : prevVal;
+ }
+ catch (Exception ignore) {
+ evtVal = prevVal;
+ }
}
- }
-
- newEntry = new GridCacheLazyPlainVersionedEntry<>(
- cctx,
- key,
- (CacheObject)writeObj,
- expiration.get1(),
- expiration.get2(),
- conflictVer != null ? conflictVer : newVer,
- keepBinary);
-
- // Resolve conflict.
- conflictCtx = cctx.conflictResolve(oldEntry, newEntry, verCheck);
-
- assert conflictCtx != null;
-
- boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
-
- // Use old value?
- if (conflictCtx.isUseOld()) {
- GridCacheVersion newConflictVer = conflictVer != null ? conflictVer : newVer;
+ else
+ evtVal = (CacheObject)writeObj;
- // Handle special case with atomic comparator.
- if (!isNew() && // Not initial value,
- verCheck && // and atomic version check,
- oldConflictVer.dataCenterId() == newConflictVer.dataCenterId() && // and data centers are equal,
- ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer, ignoreTime) == 0 && // and both versions are equal,
- cctx.writeThrough() && // and store is enabled,
- primary) // and we are primary.
- {
- CacheObject val = this.val;
+ long updateCntr0 = nextPartCounter();
- if (val == null) {
- assert deletedUnlocked();
+ if (updateCntr != null)
+ updateCntr0 = updateCntr;
- cctx.store().remove(null, key);
- }
- else
- cctx.store().put(null, key, val, ver);
- }
+ onUpdateFinished(updateCntr0);
- return new GridCacheUpdateAtomicResult(false,
- retval ? this.val : null,
- null,
- invokeRes,
- CU.TTL_ETERNAL,
- CU.EXPIRE_TIME_ETERNAL,
- null,
- null,
+ cctx.continuousQueries().onEntryUpdated(
+ key,
+ evtVal,
+ prevVal,
+ isInternal() || !context().userCache(),
+ partition(),
+ primary,
false,
- updateCntr0 == null ? 0 : updateCntr0);
+ updateCntr0,
+ null,
+ topVer);
}
- // Will update something.
- else {
- // Merge is a local update which override passed value bytes.
- if (conflictCtx.isMerge()) {
- writeObj = cctx.toCacheObject(conflictCtx.mergeValue());
-
- conflictVer = null;
- }
- else
- assert conflictCtx.isUseNew();
- // Update value is known at this point, so update operation type.
- op = writeObj != null ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE;
- }
+ return updateRes;
}
- else
- // Nullify conflict version on this update, so that we will use regular version during next updates.
- conflictVer = null;
- }
-
- boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
-
- // Perform version check only in case there was no explicit conflict resolution.
- if (conflictCtx == null) {
- if (verCheck) {
- if (!isNew() && ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) >= 0) {
- if (ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) == 0 && cctx.writeThrough() && primary) {
- if (log.isDebugEnabled())
- log.debug("Received entry update with same version as current (will update store) " +
- "[entry=" + this + ", newVer=" + newVer + ']');
- CacheObject val = this.val;
+ case CONFLICT_USE_OLD:
+ case FILTER_FAILED:
+ case INVOKE_NO_OP:
+ case INTERCEPTOR_CANCEL:
+ return updateRes;
+ }
- if (val == null) {
- assert deletedUnlocked();
+ assert updateRes.outcome() == UpdateOutcome.SUCCESS || updateRes.outcome() == UpdateOutcome.REMOVE_NO_VAL;
- cctx.store().remove(null, key);
- }
- else
- cctx.store().put(null, key, val, ver);
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Received entry update with smaller version than current (will ignore) " +
- "[entry=" + this + ", newVer=" + newVer + ']');
- }
+ CacheObject evtOld = null;
- if (!cctx.isNear()) {
- CacheObject evtVal;
+ if (evt && op == TRANSFORM && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
+ assert writeObj instanceof EntryProcessor : writeObj;
- if (op == GridCacheOperation.TRANSFORM) {
- EntryProcessor<Object, Object, ?> entryProcessor =
- (EntryProcessor<Object, Object, ?>)writeObj;
+ evtOld = cctx.unwrapTemporary(oldVal);
- CacheInvokeEntry<Object, Object> entry =
- new CacheInvokeEntry<>(key, prevVal, version(), keepBinary, this);
+ Object transformClo = EntryProcessorResourceInjectorProxy.unwrap(writeObj);
- try {
- entryProcessor.process(entry, invokeArgs);
+ cctx.events().addEvent(partition(),
+ key,
+ evtNodeId,
+ null,
+ newVer,
+ EVT_CACHE_OBJECT_READ,
+ evtOld, evtOld != null,
+ evtOld, evtOld != null,
+ subjId,
+ transformClo.getClass().getName(),
+ taskName,
+ keepBinary);
+ }
- evtVal = entry.modified() ?
- cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())) : prevVal;
- }
- catch (Exception ignore) {
- evtVal = prevVal;
- }
- }
- else
- evtVal = (CacheObject)writeObj;
-
- updateCntr0 = nextPartCounter(topVer);
-
- if (updateCntr != null)
- updateCntr0 = updateCntr;
-
- onUpdateFinished(updateCntr0);
-
- cctx.continuousQueries().onEntryUpdated(
- key,
- evtVal,
- prevVal,
- isInternal() || !context().userCache(),
- partition(),
- primary,
- false,
- updateCntr0,
- null,
- topVer);
- }
+ if (c.op == GridCacheOperation.UPDATE) {
+ updateVal = val;
- return new GridCacheUpdateAtomicResult(false,
- retval ? this.val : null,
- null,
- invokeRes,
- CU.TTL_ETERNAL,
- CU.EXPIRE_TIME_ETERNAL,
- null,
- null,
- false,
- 0);
- }
- }
- else
- assert isNew() || ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) <= 0 :
- "Invalid version for inner update [isNew=" + isNew() + ", entry=" + this + ", newVer=" + newVer + ']';
- }
+ assert updateVal != null : c;
- // Apply metrics.
- if (metrics && cctx.cache().configuration().isStatisticsEnabled() && needVal) {
- // PutIfAbsent methods mustn't update hit/miss statistics
- if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || !cctx.putIfAbsentFilter(filter))
- cctx.cache().metrics0().onRead(oldVal != null);
- }
+ drReplicate(drType, updateVal, updateVer, topVer);
- // Check filter inside of synchronization.
- if (!F.isEmptyOrNulls(filter)) {
- boolean pass = cctx.isAllLocked(this, filter);
+ recordNodeId(affNodeId, topVer);
- if (!pass) {
- if (expiryPlc != null && !readFromStore && hasValueUnlocked() && !cctx.putIfAbsentFilter(filter))
- updateTtl(expiryPlc);
+ if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) {
+ if (evtOld == null)
+ evtOld = cctx.unwrapTemporary(oldVal);
- return new GridCacheUpdateAtomicResult(false,
- retval ? oldVal : null,
- null,
- invokeRes,
- CU.TTL_ETERNAL,
- CU.EXPIRE_TIME_ETERNAL,
+ cctx.events().addEvent(partition(),
+ key,
+ evtNodeId,
null,
+ newVer,
+ EVT_CACHE_OBJECT_PUT,
+ updateVal,
+ true,
+ evtOld,
+ evtOld != null,
+ subjId,
null,
- false,
- updateCntr0 == null ? 0 : updateCntr0);
+ taskName,
+ keepBinary);
}
}
+ else {
+ assert c.op == GridCacheOperation.DELETE : c.op;
- // Calculate new value in case we met transform.
- if (op == GridCacheOperation.TRANSFORM) {
- assert conflictCtx == null : "Cannot be TRANSFORM here if conflict resolution was performed earlier.";
-
- transformClo = writeObj;
-
- EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj;
-
- CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(key, oldVal, version(), keepBinary, this);
-
- try {
- Object computed = entryProcessor.process(entry, invokeArgs);
-
- if (entry.modified()) {
- updated0 = cctx.unwrapTemporary(entry.getValue());
- updated = cctx.toCacheObject(updated0);
- }
- else
- updated = oldVal;
-
- key0 = entry.key();
+ clearReaders();
- if (computed != null)
- invokeRes = new IgniteBiTuple(cctx.unwrapTemporary(computed), null);
- }
- catch (Exception e) {
- invokeRes = new IgniteBiTuple(null, e);
+ drReplicate(drType, null, newVer, topVer);
- updated = oldVal;
- }
+ recordNodeId(affNodeId, topVer);
- if (!entry.modified()) {
- if (expiryPlc != null && !readFromStore && hasValueUnlocked())
- updateTtl(expiryPlc);
+ if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) {
+ if (evtOld == null)
+ evtOld = cctx.unwrapTemporary(oldVal);
- return new GridCacheUpdateAtomicResult(false,
- retval ? oldVal : null,
- null,
- invokeRes,
- CU.TTL_ETERNAL,
- CU.EXPIRE_TIME_ETERNAL,
- null,
+ cctx.events().addEvent(partition(),
+ key,
+ evtNodeId,
+ null, newVer,
+ EVT_CACHE_OBJECT_REMOVED,
+ null, false,
+ evtOld, evtOld != null,
+ subjId,
null,
- false,
- updateCntr0 == null ? 0 : updateCntr0);
+ taskName,
+ keepBinary);
}
}
- else
- updated = (CacheObject)writeObj;
-
- op = updated == null ? GridCacheOperation.DELETE : GridCacheOperation.UPDATE;
-
- assert op == GridCacheOperation.UPDATE || (op == GridCacheOperation.DELETE && updated == null);
-
- boolean hadVal = hasValueUnlocked();
-
- // Incorporate conflict version into new version if needed.
- if (conflictVer != null && conflictVer != newVer)
- newVer = new GridCacheVersionEx(newVer.topologyVersion(),
- newVer.globalTime(),
- newVer.order(),
- newVer.nodeOrder(),
- newVer.dataCenterId(),
- conflictVer);
- if (op == GridCacheOperation.UPDATE) {
- // Conflict context is null if there were no explicit conflict resolution.
- if (conflictCtx == null) {
- // Calculate TTL and expire time for local update.
- if (explicitTtl != CU.TTL_NOT_CHANGED) {
- // If conflict existed, expire time must be explicit.
- assert conflictVer == null || explicitExpireTime != CU.EXPIRE_TIME_CALCULATE;
-
- newSysTtl = newTtl = explicitTtl;
- newSysExpireTime = explicitExpireTime;
-
- newExpireTime = explicitExpireTime != CU.EXPIRE_TIME_CALCULATE ?
- explicitExpireTime : CU.toExpireTime(explicitTtl);
- }
- else {
- newSysTtl = expiryPlc == null ? CU.TTL_NOT_CHANGED :
- hadVal ? expiryPlc.forUpdate() : expiryPlc.forCreate();
+ if (updateRes.success())
+ updateMetrics(c.op, metrics);
- if (newSysTtl == CU.TTL_NOT_CHANGED) {
- newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
- newTtl = ttlExtras();
- newExpireTime = expireTimeExtras();
- }
- else if (newSysTtl == CU.TTL_ZERO) {
- op = GridCacheOperation.DELETE;
+ // Continuous query filter should be performed under lock.
+ if (lsnrs != null) {
+ CacheObject evtVal = cctx.unwrapTemporary(updateVal);
+ CacheObject evtOldVal = cctx.unwrapTemporary(oldVal);
- newSysTtl = CU.TTL_NOT_CHANGED;
- newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ cctx.continuousQueries().onEntryUpdated(lsnrs,
+ key,
+ evtVal,
+ evtOldVal,
+ internal,
+ partition(),
+ primary,
+ false,
+ c.updateRes.updateCounter(),
+ fut,
+ topVer);
+ }
- newTtl = CU.TTL_ETERNAL;
- newExpireTime = CU.EXPIRE_TIME_ETERNAL;
+ cctx.dataStructures().onEntryUpdated(key, c.op == GridCacheOperation.DELETE, keepBinary);
- updated = null;
- }
- else {
- newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
- newTtl = newSysTtl;
- newExpireTime = CU.toExpireTime(newTtl);
- }
- }
+ if (intercept) {
+ if (c.op == GridCacheOperation.UPDATE) {
+ cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(
+ cctx,
+ key,
+ null,
+ updateVal,
+ null,
+ keepBinary,
+ c.updateRes.updateCounter()));
}
else {
- newSysTtl = newTtl = conflictCtx.ttl();
- newSysExpireTime = newExpireTime = conflictCtx.expireTime();
+ assert c.op == GridCacheOperation.DELETE : c.op;
+
+ cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(
+ cctx,
+ key,
+ null,
+ oldVal,
+ null,
+ keepBinary,
+ c.updateRes.updateCounter()));
}
}
- else {
- assert op == GridCacheOperation.DELETE;
-
- newSysTtl = CU.TTL_NOT_CHANGED;
- newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
- newTtl = CU.TTL_ETERNAL;
- newExpireTime = CU.EXPIRE_TIME_ETERNAL;
- }
+ onUpdateFinished(c.updateRes.updateCounter());
- // TTL and expire time must be resolved at this point.
- assert newTtl != CU.TTL_NOT_CHANGED && newTtl != CU.TTL_ZERO && newTtl >= 0;
- assert newExpireTime != CU.EXPIRE_TIME_CALCULATE && newExpireTime >= 0;
+ return c.updateRes;
+ }
- IgniteBiTuple<Boolean, Object> interceptRes = null;
+ /**
+ * Returns the already available value if there is one, otherwise unwraps the given cache object.
+ *
+ * @param val Value; returned as is when it is not {@code null}.
+ * @param cacheObj Cache object to unwrap when {@code val} is {@code null}.
+ * @param keepBinary Keep binary flag, passed through to {@code cctx.unwrapBinaryIfNeeded()}.
+ * @param cpy Copy flag, passed through to {@code cctx.unwrapBinaryIfNeeded()}.
+ * @return {@code val} if it is not {@code null}, otherwise the result of unwrapping {@code cacheObj}.
+ */
+ @Nullable private Object value(@Nullable Object val, @Nullable CacheObject cacheObj, boolean keepBinary, boolean cpy) {
+ if (val != null)
+ return val;
- // Actual update.
- if (op == GridCacheOperation.UPDATE) {
- if (log.isTraceEnabled()) {
- log.trace("innerUpdate [key=" + key +
- ", entry=" + System.identityHashCode(this) + ']');
- }
+ return cctx.unwrapBinaryIfNeeded(cacheObj, keepBinary, cpy);
+ }
- if (intercept) {
- updated0 = value(updated0, updated, keepBinary, false);
+ /**
+ * @param expiry Expiration policy.
+ * @return Tuple holding initial TTL and expire time with the given expiry.
+ */
+ private static IgniteBiTuple<Long, Long> initialTtlAndExpireTime(IgniteCacheExpiryPolicy expiry) {
+ assert expiry != null;
- Object interceptorVal = cctx.config().getInterceptor()
- .onBeforePut(new CacheLazyEntry(cctx, key, key0, oldVal, old0, keepBinary), updated0);
-
- if (interceptorVal == null)
- return new GridCacheUpdateAtomicResult(false,
- retval ? oldVal : null,
- null,
- invokeRes,
- CU.TTL_ETERNAL,
- CU.EXPIRE_TIME_ETERNAL,
- null,
- null,
- false,
- updateCntr0 == null ? 0 : updateCntr0);
- else if (interceptorVal != updated0) {
- updated0 = cctx.unwrapTemporary(interceptorVal);
-
- updated = cctx.toCacheObject(updated0);
- }
- }
-
- // Try write-through.
- if (writeThrough)
- // Must persist inside synchronization in non-tx mode.
- cctx.store().put(null, key, updated, newVer);
-
- if (!hadVal) {
- boolean new0 = isNew();
-
- assert deletedUnlocked() || new0 || isInternal(): "Invalid entry [entry=" + this + ", locNodeId=" +
- cctx.localNodeId() + ']';
-
- if (!new0 && !isInternal())
- deletedUnlocked(false);
- }
- else {
- assert !deletedUnlocked() : "Invalid entry [entry=" + this +
- ", locNodeId=" + cctx.localNodeId() + ']';
-
- // Do not change size.
- }
-
- updated = cctx.kernalContext().cacheObjects().prepareForCache(updated, cctx);
-
- updateCntr0 = nextPartCounter(topVer);
-
- if (updateCntr != null)
- updateCntr0 = updateCntr;
-
- logUpdate(op, updated, newVer, newExpireTime, updateCntr0);
-
- storeValue(updated, newExpireTime, newVer, oldRow);
-
- update(updated, newExpireTime, newTtl, newVer, true);
-
- drReplicate(drType, updated, newVer, topVer);
-
- recordNodeId(affNodeId, topVer);
-
- if (evt) {
- CacheObject evtOld = null;
-
- if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
- evtOld = cctx.unwrapTemporary(oldVal);
-
- transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo);
-
- cctx.events().addEvent(partition(), key, evtNodeId, null,
- newVer, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld,
- evtOld != null || hadVal, subjId, transformClo.getClass().getName(), taskName,
- keepBinary);
- }
-
- if (newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) {
- if (evtOld == null)
- evtOld = cctx.unwrapTemporary(oldVal);
-
- cctx.events().addEvent(partition(), key, evtNodeId, null,
- newVer, EVT_CACHE_OBJECT_PUT, updated, updated != null, evtOld,
- evtOld != null || hadVal, subjId, null, taskName, keepBinary);
- }
- }
- }
- else {
- if (intercept) {
- interceptRes = cctx.config().getInterceptor().onBeforeRemove(new CacheLazyEntry(cctx, key, key0,
- oldVal, old0, keepBinary, updateCntr0));
-
- if (cctx.cancelRemove(interceptRes))
- return new GridCacheUpdateAtomicResult(false,
- cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2())),
- null,
- invokeRes,
- CU.TTL_ETERNAL,
- CU.EXPIRE_TIME_ETERNAL,
- null,
- null,
- false,
- updateCntr0 == null ? 0 : updateCntr0);
- }
-
- if (writeThrough)
- // Must persist inside synchronization in non-tx mode.
- cctx.store().remove(null, key);
-
- updateCntr0 = nextPartCounter(topVer);
-
- if (updateCntr != null)
- updateCntr0 = updateCntr;
-
- logUpdate(op, null, newVer, 0, updateCntr0);
-
- removeValue();
-
- if (hadVal) {
- assert !deletedUnlocked();
-
- if (!isInternal())
- deletedUnlocked(true);
- }
- else {
- boolean new0 = isNew();
-
- assert deletedUnlocked() || new0 || isInternal() : "Invalid entry [entry=" + this + ", locNodeId=" +
- cctx.localNodeId() + ']';
-
- if (new0) {
- if (!isInternal())
- deletedUnlocked(true);
- }
- }
-
- enqueueVer = newVer;
-
- // Clear value on backup. Entry will be removed from cache when it got evicted from queue.
- update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer, true);
-
- assert newSysTtl == CU.TTL_NOT_CHANGED;
- assert newSysExpireTime == CU.EXPIRE_TIME_CALCULATE;
-
- clearReaders();
-
- recordNodeId(affNodeId, topVer);
-
- drReplicate(drType, null, newVer, topVer);
-
- if (evt) {
- CacheObject evtOld = null;
-
- if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
- evtOld = cctx.unwrapTemporary(oldVal);
-
- transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo);
-
- cctx.events().addEvent(partition(), key, evtNodeId, null,
- newVer, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld,
- evtOld != null || hadVal, subjId, transformClo.getClass().getName(), taskName,
- keepBinary);
- }
-
- if (newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) {
- if (evtOld == null)
- evtOld = cctx.unwrapTemporary(oldVal);
-
- cctx.events().addEvent(partition(), key, evtNodeId, null, newVer,
- EVT_CACHE_OBJECT_REMOVED, null, false, evtOld, evtOld != null || hadVal,
- subjId, null, taskName, keepBinary);
- }
- }
-
- res = hadVal;
- }
-
- if (res)
- updateMetrics(op, metrics);
-
- // Continuous query filter should be perform under lock.
- if (lsnrs != null) {
- CacheObject evtVal = cctx.unwrapTemporary(updated);
- CacheObject evtOldVal = cctx.unwrapTemporary(oldVal);
-
- cctx.continuousQueries().onEntryUpdated(lsnrs, key, evtVal, evtOldVal, internal,
- partition(), primary, false, updateCntr0, fut, topVer);
- }
-
- cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary);
-
- if (intercept) {
- if (op == GridCacheOperation.UPDATE)
- cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(
- cctx,
- key,
- key0,
- updated,
- updated0,
- keepBinary,
- updateCntr0));
- else
- cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(
- cctx,
- key,
- key0,
- oldVal,
- old0,
- keepBinary,
- updateCntr0));
-
- if (interceptRes != null)
- oldVal = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2()));
- }
- }
-
- onUpdateFinished(updateCntr0);
-
- if (log.isDebugEnabled())
- log.debug("Updated cache entry [val=" + val + ", old=" + oldVal + ", entry=" + this + ']');
-
- return new GridCacheUpdateAtomicResult(res,
- oldVal,
- updated,
- invokeRes,
- newSysTtl,
- newSysExpireTime,
- enqueueVer,
- conflictCtx,
- true,
- updateCntr0 == null ? 0 : updateCntr0);
- }
-
- /**
- * @param val Value.
- * @param cacheObj Cache object.
- * @param keepBinary Keep binary flag.
- * @param cpy Copy flag.
- * @return Cache object value.
- */
- @Nullable private Object value(@Nullable Object val, @Nullable CacheObject cacheObj, boolean keepBinary, boolean cpy) {
- if (val != null)
- return val;
-
- return cctx.unwrapBinaryIfNeeded(cacheObj, keepBinary, cpy);
- }
-
- /**
- * @param expiry Expiration policy.
- * @return Tuple holding initial TTL and expire time with the given expiry.
- */
- private static IgniteBiTuple<Long, Long> initialTtlAndExpireTime(IgniteCacheExpiryPolicy expiry) {
- assert expiry != null;
-
- long initTtl = expiry.forCreate();
- long initExpireTime;
+ long initTtl = expiry.forCreate();
+ long initExpireTime;
if (initTtl == CU.TTL_ZERO) {
initTtl = CU.TTL_MINIMUM;
@@ -2294,8 +1870,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
* @param expireTime Explicit expire time.
* @return Result.
*/
- private GridTuple3<Long, Long, Boolean> ttlAndExpireTime(IgniteCacheExpiryPolicy expiry, long ttl, long expireTime)
- throws GridCacheEntryRemovedException {
+ private GridTuple3<Long, Long, Boolean> ttlAndExpireTime(IgniteCacheExpiryPolicy expiry, long ttl, long expireTime) {
+ assert !obsolete();
+
boolean rmv = false;
// 1. If TTL is not changed, then calculate it based on expiry.
@@ -2313,7 +1890,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
// 3. If TTL is still not changed, then either use old entry TTL or set it to "ETERNAL".
if (ttl == CU.TTL_NOT_CHANGED) {
- if (isNew())
+ if (isStartVersion())
ttl = CU.TTL_ETERNAL;
else {
ttl = ttlExtras();
@@ -3027,6 +2604,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
/**
+ * This implementation always returns {@code 0}.
+ * NOTE(review): the method is protected and non-final, so subclasses presumably override it - confirm.
+ *
+ * @return Update counter.
+ */
+ protected long nextPartCounter() {
+ return 0;
+ }
+
+ /**
* @param topVer Topology version.
* @return Update counter.
*/
@@ -3566,13 +3150,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
assert Thread.holdsLock(this);
assert val != null : "null values in update for key: " + key;
- cctx.offheap().update(key,
- val,
- ver,
- expireTime,
- partition(),
- localPartition(),
- oldRow);
+ cctx.offheap().invoke(key, localPartition(), new UpdateClosure(this, val, ver, expireTime));
}
/**
@@ -4177,6 +3755,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
/**
* @param key Key.
+ * @param keepBinary Keep binary flag.
*/
private LazyValueEntry(KeyCacheObject key, boolean keepBinary) {
this.key = key;
@@ -4223,4 +3802,854 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
return "IteratorEntry [key=" + key + ']';
}
}
+
+ /**
+ * Offheap invoke closure that stores a new value for an entry: {@code call()} creates the new data row
+ * from the value, version and expire time given at construction and chooses the tree operation.
+ */
+ private static class UpdateClosure implements IgniteCacheOffheapManager.OffheapInvokeClosure {
+ /** Entry whose value is being stored. */
+ private final GridCacheMapEntry entry;
+
+ /** New value. */
+ private final CacheObject val;
+
+ /** New version. */
+ private final GridCacheVersion ver;
+
+ /** New expire time. */
+ private final long expireTime;
+
+ /** Row created in {@code call()}; {@code null} until then. */
+ private CacheDataRow newRow;
+
+ /** Row passed to {@code call()}, if any. */
+ private CacheDataRow oldRow;
+
+ /** Tree operation; PUT until {@code call()} recomputes it. */
+ private IgniteTree.OperationType treeOp = IgniteTree.OperationType.PUT;
+
+ /**
+ * @param entry Entry.
+ * @param val New value.
+ * @param ver New version.
+ * @param expireTime New expire time.
+ */
+ UpdateClosure(GridCacheMapEntry entry, CacheObject val, GridCacheVersion ver, long expireTime) {
+ this.entry = entry;
+ this.val = val;
+ this.ver = ver;
+ this.expireTime = expireTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void call(@Nullable CacheDataRow oldRow) throws IgniteCheckedException {
+ this.oldRow = oldRow;
+
+ // The found row gets the entry's key object assigned before it is handed to createRow().
+ if (oldRow != null)
+ oldRow.key(entry.key);
+
+ newRow = entry.cctx.offheap().dataStore(entry.localPartition()).createRow(entry.key,
+ val,
+ ver,
+ expireTime,
+ oldRow);
+
+ // Same link for old and new row means the tree itself does not need to change (NOOP); otherwise PUT.
+ // NOTE(review): presumably createRow() updated the existing row in place in that case - confirm.
+ treeOp = oldRow != null && oldRow.link() == newRow.link() ?
+ IgniteTree.OperationType.NOOP : IgniteTree.OperationType.PUT;
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheDataRow newRow() {
+ return newRow;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteTree.OperationType operationType() {
+ return treeOp;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public CacheDataRow oldRow() {
+ return oldRow;
+ }
+ }
+
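+ /**
+ * Offheap invoke closure that performs an atomic cache update (UPDATE, DELETE or TRANSFORM) for an entry.
+ */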
+ private static class AtomicCacheUpdateClosure implements IgniteCacheOffheapManager.OffheapInvokeClosure {
+ /** */
+ private final GridCacheMapEntry entry;
+
+ /** */
+ private GridCacheVersion newVer;
+
+ /** */
+ private GridCacheOperation op;
+
+ /** */
+ private Object writeObj;
+
+ /** */
+ private Object[] invokeArgs;
+
+ /** */
+ private final boolean readThrough;
+
+ /** */
+ private final boolean writeThrough;
+
+ /** */
+ private final boolean keepBinary;
+
+ /** */
+ private final IgniteCacheExpiryPolicy expiryPlc;
+
+ /** */
+ private final boolean primary;
+
+ /** */
+ private final boolean verCheck;
+
+ /** */
+ private final CacheEntryPredicate[] filter;
+
+ /** */
+ private final long explicitTtl;
+
+ /** */
+ private final long explicitExpireTime;
+
+ /** */
+ private GridCacheVersion conflictVer;
+
+ /** */
+ private final boolean conflictResolve;
+
+ /** */
+ private final boolean intercept;
+
+ /** */
+ private final Long updateCntr;
+
+ /** */
+ private GridCacheUpdateAtomicResult updateRes;
+
+ /** */
+ private IgniteTree.OperationType treeOp;
+
+ /** */
+ private CacheDataRow newRow;
+
+ /** */
+ private CacheDataRow oldRow;
+
+ /**
+ * Stores the update parameters and pre-selects the tree operation for UPDATE (PUT) and DELETE (REMOVE).
+ * For TRANSFORM the tree operation stays {@code null} here and is decided later in {@code call()}.
+ *
+ * @param entry Entry being updated.
+ * @param newVer New version.
+ * @param op Operation; must be UPDATE, DELETE or TRANSFORM.
+ * @param writeObj Object to write; cast to an entry processor for TRANSFORM, to a cache object otherwise.
+ * @param invokeArgs Arguments passed to the entry processor.
+ * @param readThrough If {@code true} and there is no old value, the value is loaded from the store.
+ * @param writeThrough If {@code true}, the update or removal is also written to the store.
+ * @param keepBinary Keep binary flag.
+ * @param expiryPlc Expiry policy, may be {@code null}.
+ * @param primary Primary flag; used in the "same version" store write condition.
+ * @param verCheck Whether the version check should be performed.
+ * @param filter Optional filter; when it does not pass, the update is cancelled.
+ * @param explicitTtl Explicit TTL, or {@code CU.TTL_NOT_CHANGED}.
+ * @param explicitExpireTime Explicit expire time, or {@code CU.EXPIRE_TIME_CALCULATE}.
+ * @param conflictVer Conflict version, may be {@code null}.
+ * @param conflictResolve Whether conflict resolution should be attempted.
+ * @param intercept Whether the cache interceptor should be called.
+ * @param updateCntr Update counter to use instead of the entry's next counter, may be {@code null}.
+ */
+ AtomicCacheUpdateClosure(GridCacheMapEntry entry,
+ GridCacheVersion newVer,
+ GridCacheOperation op,
+ Object writeObj,
+ Object[] invokeArgs,
+ boolean readThrough,
+ boolean writeThrough,
+ boolean keepBinary,
+ @Nullable IgniteCacheExpiryPolicy expiryPlc,
+ boolean primary,
+ boolean verCheck,
+ @Nullable CacheEntryPredicate[] filter,
+ long explicitTtl,
+ long explicitExpireTime,
+ @Nullable GridCacheVersion conflictVer,
+ boolean conflictResolve,
+ boolean intercept,
+ @Nullable Long updateCntr) {
+ assert op == UPDATE || op == DELETE || op == TRANSFORM : op;
+
+ this.entry = entry;
+ this.newVer = newVer;
+ this.op = op;
+ this.writeObj = writeObj;
+ this.invokeArgs = invokeArgs;
+ this.readThrough = readThrough;
+ this.writeThrough = writeThrough;
+ this.keepBinary = keepBinary;
+ this.expiryPlc = expiryPlc;
+ this.primary = primary;
+ this.verCheck = verCheck;
+ this.filter = filter;
+ this.explicitTtl = explicitTtl;
+ this.explicitExpireTime = explicitExpireTime;
+ this.conflictVer = conflictVer;
+ this.conflictResolve = conflictResolve;
+ this.intercept = intercept;
+ this.updateCntr = updateCntr;
+
+ // No case for TRANSFORM: its tree operation is only known after the entry processor has run.
+ switch (op) {
+ case UPDATE:
+ treeOp = IgniteTree.OperationType.PUT;
+
+ break;
+
+ case DELETE:
+ treeOp = IgniteTree.OperationType.REMOVE;
+
+ break;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public CacheDataRow oldRow() {
+ // Row that was passed to call(); null before call() runs or when no row was found.
+ return oldRow;
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheDataRow newRow() {
+ // Only assigned by update() and initResultOnCancelUpdate(); stays null on the other paths.
+ return newRow;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteTree.OperationType operationType() {
+ // Pre-selected in the constructor for UPDATE/DELETE and finalized while call() runs.
+ return treeOp;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Runs the whole atomic update against the row found in the tree: syncs the entry with the row, optionally
+ * reads through, runs the entry processor, resolves conflicts or checks versions, applies the filter and
+ * finally delegates to {@code update()} or {@code remove()}. Every path must leave both {@code updateRes}
+ * and {@code treeOp} set.
+ */
+ @Override public void call(@Nullable CacheDataRow oldRow) throws IgniteCheckedException {
+ assert entry.isNear() || oldRow == null || oldRow.link() != 0 : oldRow;
+
+ if (oldRow != null)
+ oldRow.key(entry.key());
+
+ this.oldRow = oldRow;
+
+ GridCacheContext cctx = entry.context();
+
+ CacheObject oldVal;
+ CacheObject storeLoadedVal = null;
+
+ // Bring the in-memory entry in line with the row found in the tree before any check runs.
+ if (oldRow != null) {
+ oldVal = oldRow.value();
+
+ entry.update(oldVal, oldRow.expireTime(), 0, oldRow.version(), false);
+ }
+ else
+ oldVal = null;
+
+ // Read-through: no value in the tree, so try the store and treat a loaded value as the old value.
+ if (oldVal == null && readThrough) {
+ storeLoadedVal = cctx.toCacheObject(cctx.store().load(null, entry.key));
+
+ if (storeLoadedVal != null) {
+ oldVal = cctx.kernalContext().cacheObjects().prepareForCache(storeLoadedVal, cctx);
+
+ entry.val = oldVal;
+
+ if (entry.deletedUnlocked())
+ entry.deletedUnlocked(false);
+ }
+ }
+
+ CacheInvokeEntry<Object, Object> invokeEntry = null;
+ IgniteBiTuple<Object, Exception> invokeRes = null;
+
+ boolean invoke = op == TRANSFORM;
+
+ // TRANSFORM: run the entry processor; it replaces writeObj with the computed value, which in turn
+ // turns the operation into a plain UPDATE or DELETE.
+ if (invoke) {
+ invokeEntry = new CacheInvokeEntry<>(entry.key, oldVal, entry.ver, keepBinary, entry);
+
+ invokeRes = runEntryProcessor(invokeEntry);
+
+ op = writeObj == null ? DELETE : UPDATE;
+ }
+
+ CacheObject newVal = (CacheObject)writeObj;
+
+ GridCacheVersionConflictContext<?, ?> conflictCtx = null;
+
+ if (conflictResolve) {
+ conflictCtx = resolveConflict(newVal, invokeRes);
+
+ // A result set by resolveConflict() means "use old": nothing to write.
+ if (updateRes != null) {
+ assert conflictCtx != null && conflictCtx.isUseOld() : conflictCtx;
+ assert treeOp == IgniteTree.OperationType.NOOP : treeOp;
+
+ return;
+ }
+ }
+
+ if (conflictCtx == null) {
+ // Perform version check only in case there was no explicit conflict resolution.
+ versionCheck(invokeRes);
+
+ // A result set by versionCheck() means the check failed: nothing to write.
+ if (updateRes != null) {
+ assert treeOp == IgniteTree.OperationType.NOOP : treeOp;
+
+ return;
+ }
+ }
+
+ if (!F.isEmptyOrNulls(filter)) {
+ boolean pass = cctx.isAllLocked(entry, filter);
+
+ if (!pass) {
+ // Update is cancelled, but a store-loaded value or a changed expire time may still need a row.
+ initResultOnCancelUpdate(storeLoadedVal, !cctx.putIfAbsentFilter(filter));
+
+ updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.FILTER_FAILED,
+ oldVal,
+ null,
+ invokeRes,
+ CU.TTL_ETERNAL,
+ CU.EXPIRE_TIME_ETERNAL,
+ null,
+ null,
+ 0);
+
+ return;
+ }
+ }
+
+ if (invoke) {
+ // Entry processor did not modify the entry: report INVOKE_NO_OP.
+ if (!invokeEntry.modified()) {
+ initResultOnCancelUpdate(storeLoadedVal, true);
+
+ updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.INVOKE_NO_OP,
+ oldVal,
+ null,
+ invokeRes,
+ CU.TTL_ETERNAL,
+ CU.EXPIRE_TIME_ETERNAL,
+ null,
+ null,
+ 0);
+
+ return;
+ }
+
+ // NOTE(review): op was already derived from writeObj right after the entry processor ran, and
+ // resolveConflict() re-derives it when it changes writeObj, so this looks redundant - confirm.
+ op = writeObj == null ? DELETE : UPDATE;
+ }
+
+ // Incorporate conflict version into new version if needed.
+ if (conflictVer != null && conflictVer != newVer) {
+ newVer = new GridCacheVersionEx(newVer.topologyVersion(),
+ newVer.globalTime(),
+ newVer.order(),
+ newVer.nodeOrder(),
+ newVer.dataCenterId(),
+ conflictVer);
+ }
+
+ if (op == UPDATE) {
+ assert writeObj != null;
+
+ update(conflictCtx, invokeRes, storeLoadedVal != null);
+ }
+ else {
+ assert op == DELETE && writeObj == null : op;
+
+ remove(conflictCtx, invokeRes, storeLoadedVal != null);
+ }
+
+ assert updateRes != null && treeOp != null;
+ }
+
+ /**
+ * Prepares the tree operation for a cancelled update (filter failed or entry processor made no change).
+ * Even then a row has to be written when a value was loaded from the store, or when the expiry policy
+ * changed the expire time on access; otherwise the tree operation is NOOP.
+ *
+ * @param storeLoadedVal Value loaded from store.
+ * @param updateExpireTime {@code True} if need update expire time.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void initResultOnCancelUpdate(@Nullable CacheObject storeLoadedVal, boolean updateExpireTime)
+ throws IgniteCheckedException {
+ boolean needUpdate = false;
+
+ if (storeLoadedVal != null) {
+ long initTtl;
+ long initExpireTime;
+
+ if (expiryPlc != null) {
+ IgniteBiTuple<Long, Long> initTtlAndExpireTime = initialTtlAndExpireTime(expiryPlc);
+
+ initTtl = initTtlAndExpireTime.get1();
+ initExpireTime = initTtlAndExpireTime.get2();
+ }
+ else {
+ initTtl = CU.TTL_ETERNAL;
+ initExpireTime = CU.EXPIRE_TIME_ETERNAL;
+ }
+
+ entry.update(storeLoadedVal, initExpireTime, initTtl, entry.ver, true);
+
+ needUpdate = true;
+ }
+ else if (updateExpireTime && expiryPlc != null && entry.val != null) {
+ long ttl = expiryPlc.forAccess();
+
+ if (ttl != CU.TTL_NOT_CHANGED) {
+ long expireTime;
+
+ if (ttl == CU.TTL_ZERO) {
+ ttl = CU.TTL_MINIMUM;
+ expireTime = CU.expireTimeInPast();
+ }
+ else
+ expireTime = CU.toExpireTime(ttl);
+
+ if (entry.expireTimeExtras() != expireTime) {
+ entry.update(entry.val, expireTime, ttl, entry.ver, true);
+
+ expiryPlc.ttlUpdated(entry.key, entry.ver, null);
+
+ needUpdate = true;
+ }
+ }
+ }
+
+ if (needUpdate) {
+ // In the expire-time-only branch nothing was loaded from the store, so the row must carry
+ // the entry's current value (checked non-null above) instead of a null value.
+ CacheObject rowVal = storeLoadedVal != null ? storeLoadedVal : entry.val;
+
+ newRow = entry.localPartition().dataStore().createRow(entry.key,
+ rowVal,
+ newVer,
+ entry.expireTimeExtras(),
+ oldRow);
+
+ treeOp = IgniteTree.OperationType.PUT;
+ }
+ else
+ treeOp = IgniteTree.OperationType.NOOP;
+ }
+
+ /**
+ * Applies an UPDATE: resolves TTL and expire time, lets the interceptor veto or replace the value,
+ * writes through to the store if configured, creates the new data row and fills {@code updateRes}.
+ * Falls back to {@code remove()} when the expiry policy yields a zero TTL.
+ *
+ * @param conflictCtx Conflict context.
+ * @param invokeRes Entry processor result (for invoke operation).
+ * @param readFromStore {@code True} if initial entry value was {@code null} and it was read from store.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void update(@Nullable GridCacheVersionConflictContext<?, ?> conflictCtx,
+ @Nullable IgniteBiTuple<Object, Exception> invokeRes,
+ boolean readFromStore)
+ throws IgniteCheckedException
+ {
+ GridCacheContext cctx = entry.context();
+
+ final CacheObject oldVal = entry.val;
+ CacheObject updated = (CacheObject)writeObj;
+
+ long newSysTtl;
+ long newSysExpireTime;
+
+ long newTtl;
+ long newExpireTime;
+
+ // Conflict context is null if there were no explicit conflict resolution.
+ if (conflictCtx == null) {
+ // Calculate TTL and expire time for local update.
+ if (explicitTtl != CU.TTL_NOT_CHANGED) {
+ // If conflict existed, expire time must be explicit.
+ assert conflictVer == null || explicitExpireTime != CU.EXPIRE_TIME_CALCULATE;
+
+ newSysTtl = newTtl = explicitTtl;
+ newSysExpireTime = explicitExpireTime;
+
+ newExpireTime = explicitExpireTime != CU.EXPIRE_TIME_CALCULATE ?
+ explicitExpireTime : CU.toExpireTime(explicitTtl);
+ }
+ else {
+ newSysTtl = expiryPlc == null ? CU.TTL_NOT_CHANGED :
+ entry.val != null ? expiryPlc.forUpdate() : expiryPlc.forCreate();
+
+ if (newSysTtl == CU.TTL_NOT_CHANGED) {
+ newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ newTtl = entry.ttlExtras();
+ newExpireTime = entry.expireTimeExtras();
+ }
+ else if (newSysTtl == CU.TTL_ZERO) {
+ // Zero TTL from the expiry policy: the update is carried out as a removal.
+ op = GridCacheOperation.DELETE;
+
+ writeObj = null;
+
+ remove(conflictCtx, invokeRes, readFromStore);
+
+ return;
+ }
+ else {
+ newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ newTtl = newSysTtl;
+ newExpireTime = CU.toExpireTime(newTtl);
+ }
+ }
+ }
+ else {
+ newSysTtl = newTtl = conflictCtx.ttl();
+ newSysExpireTime = newExpireTime = conflictCtx.expireTime();
+ }
+
+ if (intercept) {
+ Object updated0 = cctx.unwrapBinaryIfNeeded(updated, keepBinary, false);
+
+ CacheLazyEntry<Object, Object> interceptEntry = new CacheLazyEntry<>(cctx,
+ entry.key,
+ null,
+ oldVal,
+ null,
+ keepBinary);
+
+ Object interceptorVal = cctx.config().getInterceptor().onBeforePut(interceptEntry, updated0);
+
+ // A null value from the interceptor cancels the update.
+ if (interceptorVal == null) {
+ treeOp = IgniteTree.OperationType.NOOP;
+
+ updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.INTERCEPTOR_CANCEL,
+ oldVal,
+ null,
+ invokeRes,
+ CU.TTL_ETERNAL,
+ CU.EXPIRE_TIME_ETERNAL,
+ null,
+ null,
+ 0);
+
+ return;
+ }
+ else if (interceptorVal != updated0) {
+ updated0 = cctx.unwrapTemporary(interceptorVal);
+
+ updated = cctx.toCacheObject(updated0);
+ }
+ }
+
+ updated = cctx.kernalContext().cacheObjects().prepareForCache(updated, cctx);
+
+ if (writeThrough)
+ // Must persist inside synchronization in non-tx mode.
+ cctx.store().put(null, entry.key, updated, newVer);
+
+ if (entry.val == null) {
+ boolean new0 = entry.isStartVersion();
+
+ assert entry.deletedUnlocked() || new0 || entry.isInternal(): "Invalid entry [entry=" + entry +
+ ", locNodeId=" + cctx.localNodeId() + ']';
+
+ if (!new0 && !entry.isInternal())
+ entry.deletedUnlocked(false);
+ }
+ else {
+ // Report the entry itself; "this" is the closure in this nested class.
+ assert !entry.deletedUnlocked() : "Invalid entry [entry=" + entry +
+ ", locNodeId=" + cctx.localNodeId() + ']';
+ }
+
+ long updateCntr0 = entry.nextPartCounter();
+
+ // An explicitly passed update counter takes precedence over the entry's next counter.
+ if (updateCntr != null)
+ updateCntr0 = updateCntr;
+
+ entry.logUpdate(op, updated, newVer, newExpireTime, updateCntr0);
+
+ if (!entry.isNear()) {
+ newRow = entry.localPartition().dataStore().createRow(entry.key,
+ updated,
+ newVer,
+ newExpireTime,
+ oldRow);
+
+ // Same link for old and new row: the tree itself does not need to change.
+ treeOp = oldRow != null && oldRow.link() == newRow.link() ?
+ IgniteTree.OperationType.NOOP : IgniteTree.OperationType.PUT;
+ }
+ else
+ treeOp = IgniteTree.OperationType.PUT;
+
+ entry.update(updated, newExpireTime, newTtl, newVer, true);
+
+ updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.SUCCESS,
+ oldVal,
+ updated,
+ invokeRes,
+ newSysTtl,
+ newSysExpireTime,
+ null,
+ conflictCtx,
+ updateCntr0);
+ }
+
+ /**
+ * Applies a DELETE: lets the interceptor veto the removal, writes through to the store if configured,
+ * marks the entry as deleted, clears its value and fills {@code updateRes}.
+ *
+ * @param conflictCtx Conflict context.
+ * @param invokeRes Entry processor result (for invoke operation).
+ * @param readFromStore {@code True} if initial entry value was {@code null} and it was read from store.
+ * @throws IgniteCheckedException If failed.
+ */
+ @SuppressWarnings("unchecked")
+ private void remove(@Nullable GridCacheVersionConflictContext<?, ?> conflictCtx,
+ @Nullable IgniteBiTuple<Object, Exception> invokeRes,
+ boolean readFromStore)
+ throws IgniteCheckedException
+ {
+ GridCacheContext cctx = entry.context();
+
+ CacheObject oldVal = entry.val;
+
+ IgniteBiTuple<Boolean, Object> interceptRes = null;
+
+ if (intercept) {
+ CacheLazyEntry<Object, Object> interceptEntry = new CacheLazyEntry<>(cctx,
+ entry.key,
+ null,
+ oldVal,
+ null,
+ keepBinary);
+
+ interceptRes = cctx.config().getInterceptor().onBeforeRemove(interceptEntry);
+
+ if (cctx.cancelRemove(interceptRes)) {
+ treeOp = IgniteTree.OperationType.NOOP;
+
+ updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.INTERCEPTOR_CANCEL,
+ cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2())),
+ null,
+ invokeRes,
+ CU.TTL_ETERNAL,
+ CU.EXPIRE_TIME_ETERNAL,
+ null,
+ null,
+ 0);
+
+ return;
+ }
+ }
+
+ if (writeThrough)
+ // Must persist inside synchronization in non-tx mode.
+ cctx.store().remove(null, entry.key);
+
+ long updateCntr0 = entry.nextPartCounter();
+
+ // An explicitly passed update counter takes precedence over the entry's next counter.
+ if (updateCntr != null)
+ updateCntr0 = updateCntr;
+
+ if (oldVal != null) {
+ assert !entry.deletedUnlocked();
+
+ if (!entry.isInternal())
+ entry.deletedUnlocked(true);
+ }
+ else {
+ boolean new0 = entry.isStartVersion();
+
+ // Report the entry itself; "this" is the closure in this nested class.
+ assert entry.deletedUnlocked() || new0 || entry.isInternal() : "Invalid entry [entry=" + entry +
+ ", locNodeId=" + cctx.localNodeId() + ']';
+
+ if (new0) {
+ if (!entry.isInternal())
+ entry.deletedUnlocked(true);
+ }
+ }
+
+ GridCacheVersion enqueueVer = newVer;
+
+ // Argument order is (val, expireTime, ttl, ver, ...), as in every other entry.update() call here.
+ entry.update(null, CU.EXPIRE_TIME_ETERNAL, CU.TTL_ETERNAL, newVer, true);
+
+ // Nothing to remove from the tree if there was no value or it only came from the store.
+ treeOp = (oldVal == null || readFromStore) ? IgniteTree.OperationType.NOOP :
+ IgniteTree.OperationType.REMOVE;
+
+ UpdateOutcome outcome = oldVal != null ? UpdateOutcome.SUCCESS : UpdateOutcome.REMOVE_NO_VAL;
+
+ // The interceptor may substitute the value reported as "old" to the caller.
+ if (interceptRes != null)
+ oldVal = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2()));
+
+ updateRes = new GridCacheUpdateAtomicResult(outcome,
+ oldVal,
+ null,
+ invokeRes,
+ CU.TTL_NOT_CHANGED,
+ CU.EXPIRE_TIME_CALCULATE,
+ enqueueVer,
+ conflictCtx,
+ updateCntr0);
+ }
+
+ /**
+ * Runs conflict resolution between the current entry state and the incoming update, if the cache needs it.
+ * On "use old" the tree operation becomes NOOP and {@code updateRes} is set to CONFLICT_USE_OLD; on merge
+ * the merged value replaces {@code writeObj}. When the cache does not need conflict resolution the
+ * conflict version is reset and {@code null} is returned.
+ *
+ * @param newVal New entry value.
+ * @param invokeRes Entry processor result (for invoke operation).
+ * @return Conflict context, or {@code null} if the cache does not need conflict resolution.
+ * @throws IgniteCheckedException If failed.
+ */
+ private GridCacheVersionConflictContext<?, ?> resolveConflict(
+ CacheObject newVal,
+ @Nullable IgniteBiTuple<Object, Exception> invokeRes)
+ throws IgniteCheckedException
+ {
+ GridCacheContext cctx = entry.context();
+
+ // Cache is conflict-enabled.
+ if (cctx.conflictNeedResolve()) {
+ GridCacheVersion oldConflictVer = entry.ver.conflictVersion();
+
+ // Prepare old and new entries for conflict resolution.
+ GridCacheVersionedEntryEx oldEntry = new GridCacheLazyPlainVersionedEntry<>(cctx,
+ entry.key,
+ entry.val,
+ entry.ttlExtras(),
+ entry.expireTimeExtras(),
+ entry.ver.conflictVersion(),
+ entry.isStartVersion(),
+ keepBinary);
+
+ GridTuple3<Long, Long, Boolean> expiration = entry.ttlAndExpireTime(expiryPlc,
+ explicitTtl,
+ explicitExpireTime);
+
+ GridCacheVersionedEntryEx newEntry = new GridCacheLazyPlainVersionedEntry<>(
+ cctx,
+ entry.key,
+ newVal,
+ expiration.get1(),
+ expiration.get2(),
+ conflictVer != null ? conflictVer : newVer,
+ keepBinary);
+
+ // Resolve conflict.
+ GridCacheVersionConflictContext<?, ?> conflictCtx = cctx.conflictResolve(oldEntry, newEntry, verCheck);
+
+ assert conflictCtx != null;
+
+ boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
+
+ // Use old value?
+ if (conflictCtx.isUseOld()) {
+ GridCacheVersion newConflictVer = conflictVer != null ? conflictVer : newVer;
+
+ // Handle special case with atomic comparator.
+ if (!entry.isStartVersion() && // Not initial value,
+ verCheck && // and atomic version check,
+ oldConflictVer.dataCenterId() == newConflictVer.dataCenterId() && // and data centers are equal,
+ ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer, ignoreTime) == 0 && // and both versions are equal,
+ cctx.writeThrough() && // and store is enabled,
+ primary) // and we are primary.
+ {
+ // Same version arrived again: re-apply the current entry state to the store.
+ CacheObject val = entry.val;
+
+ if (val == null) {
+ assert entry.deletedUnlocked();
+
+ cctx.store().remove(null, entry.key);
+ }
+ else
+ cctx.store().put(null, entry.key, val, entry.ver);
+ }
+
+ treeOp = IgniteTree.OperationType.NOOP;
+
+ updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.CONFLICT_USE_OLD,
+ entry.val,
+ null,
+ invokeRes,
+ CU.TTL_ETERNAL,
+ CU.EXPIRE_TIME_ETERNAL,
+ null,
+ null,
+ 0);
+ }
+ // Will update something.
+ else {
+ // Merge is a local update which override passed value bytes.
+ if (conflictCtx.isMerge()) {
+ writeObj = cctx.toCacheObject(conflictCtx.mergeValue());
+
+ conflictVer = null;
+ }
+ else
+ assert conflictCtx.isUseNew();
+
+ // Update value is known at this point, so update operation type.
+ op = writeObj != null ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE;
+ }
+
+ return conflictCtx;
+ }
+ else
+ // Nullify conflict version on this update, so that we will use regular version during next updates.
+ conflictVer = null;
+
+ return null;
+ }
+
+ /**
+ * Compares the entry's current version with the new one. If version check is enabled and the entry
+ * already has an equal or greater version, the update is rejected: the tree operation becomes NOOP and
+ * {@code updateRes} is set to VERSION_CHECK_FAILED. For an equal version on primary with write-through
+ * enabled the current entry state is additionally re-applied to the store.
+ *
+ * @param invokeRes Entry processor result (for invoke operation).
+ * @throws IgniteCheckedException If failed.
+ */
+ private void versionCheck(@Nullable IgniteBiTuple<Object, Exception> invokeRes) throws IgniteCheckedException {
+ GridCacheContext cctx = entry.context();
+
+ boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
+
+ if (verCheck) {
+ if (!entry.isStartVersion() && ATOMIC_VER_COMPARATOR.compare(entry.ver, newVer, ignoreTime) >= 0) {
+ if (ATOMIC_VER_COMPARATOR.compare(entry.ver, newVer, ignoreTime) == 0 && cctx.writeThrough() && primary) {
+ // Messages report the entry itself; "this" is the closure in this nested class.
+ if (log.isDebugEnabled())
+ log.debug("Received entry update with same version as current (will update store) " +
+ "[entry=" + entry + ", newVer=" + newVer + ']');
+
+ CacheObject val = entry.val;
+
+ if (val == null) {
+ assert entry.deletedUnlocked();
+
+ cctx.store().remove(null, entry.key);
+ }
+ else
+ cctx.store().put(null, entry.key, val, entry.ver);
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Received entry update with smaller version than current (will ignore) " +
+ "[entry=" + entry + ", newVer=" + newVer + ']');
+ }
+
+ treeOp = IgniteTree.OperationType.NOOP;
+
+ updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.VERSION_CHECK_FAILED,
+ entry.val,
+ null,
+ invokeRes,
+ CU.TTL_ETERNAL,
+ CU.EXPIRE_TIME_ETERNAL,
+ null,
+ null,
+ 0);
+ }
+ }
+ else
+ assert entry.isStartVersion() || ATOMIC_VER_COMPARATOR.compare(entry.ver, newVer, ignoreTime) <= 0 :
+ "Invalid version for inner update [isNew=" + entry.isStartVersion() + ", entry=" + entry + ", newVer=" + newVer + ']';
+ }
+
+ /**
+ * Runs the entry processor held in {@code writeObj} and replaces {@code writeObj} with the value to write:
+ * the processor's new value if the invoke entry was modified, otherwise the invoke entry's original value
+ * object. An exception thrown by the processor is not rethrown; it is returned in the result tuple.
+ *
+ * @param invokeEntry Entry for {@link EntryProcessor}.
+ * @return Entry processor return value, or {@code null} if the processor returned {@code null}.
+ */
+ @SuppressWarnings("unchecked")
+ private IgniteBiTuple<Object, Exception> runEntryProcessor(CacheInvokeEntry<Object, Object> invokeEntry) {
+ EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj;
+
+ try {
+ Object computed = entryProcessor.process(invokeEntry, invokeArgs);
+
+ if (invokeEntry.modified()) {
+ GridCacheContext cctx = entry.context();
+
+ writeObj = cctx.toCacheObject(cctx.unwrapTemporary(invokeEntry.getValue()));
+ }
+ else
+ writeObj = invokeEntry.valObj;
+
+ if (computed != null)
+ return new IgniteBiTuple<>(entry.cctx.unwrapTemporary(computed), null);
+
+ return null;
+ }
+ catch (Exception e) {
+ // Processor failed: fall back to the invoke entry's original value object and hand the
+ // exception back to the caller inside the result tuple.
+ writeObj = invokeEntry.valObj;
+
+ return new IgniteBiTuple<>(null, e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ // Delegates to S.toString() for this closure class.
+ return S.toString(AtomicCacheUpdateClosure.class, this);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
index 2355b7c..97cb534 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
@@ -29,8 +29,8 @@ import org.jetbrains.annotations.Nullable;
* Cache entry atomic update result.
*/
public class GridCacheUpdateAtomicResult {
- /** Success flag.*/
- private final boolean success;
+ /** Update operation outcome. */
+ private final UpdateOutcome outcome;
/** Old value. */
@GridToStringInclude
@@ -54,9 +54,6 @@ public class GridCacheUpdateAtomicResult {
@GridToStringInclude
private final GridCacheVersionConflictContext<?, ?> conflictRes;
- /** Whether update should be propagated to DHT node. */
- private final boolean sndToDht;
-
/** */
private final long updateCntr;
@@ -66,7 +63,7 @@ public class GridCacheUpdateAtomicResult {
/**
* Constructor.
*
- * @param success Success flag.
+ * @param outcome Update outcome.
* @param oldVal Old value.
* @param newVal New value.
* @param res Value computed by the {@link EntryProcessor}.
@@ -74,10 +71,9 @@ public class GridCacheUpdateAtomicResult {
* @param conflictExpireTime Explicit DR expire time (if any).
* @param rmvVer Version for deferred delete.
* @param conflictRes DR resolution result.
- * @param sndToDht Whether update should be propagated to DHT node.
* @param updateCntr Partition update counter.
*/
- public GridCacheUpdateAtomicResult(boolean success,
+ GridCacheUpdateAtomicResult(UpdateOutcome outcome,
@Nullable CacheObject oldVal,
@Nullable CacheObject newVal,
@Nullable IgniteBiTuple<Object, Exception> res,
@@ -85,9 +81,10 @@ public class GridCacheUpdateAtomicResult {
long conflictExpireTime,
@Nullable GridCacheVersion rmvVer,
@Nullable GridCacheVersionConflictContext<?, ?> conflictRes,
- boolean sndToDht,
long updateCntr) {
- this.success = success;
+ assert outcome != null;
+
+ this.outcome = outcome;
this.oldVal = oldVal;
this.newVal = newVal;
this.res = res;
@@ -95,11 +92,17 @@ public class GridCacheUpdateAtomicResult {
this.conflictExpireTime = conflictExpireTime;
this.rmvVer = rmvVer;
this.conflictRes = conflictRes;
- this.sndToDht = sndToDht;
this.updateCntr = updateCntr;
}
/**
+ * Returns the outcome this result was constructed with.
+ *
+ * @return Update operation outcome (the constructor asserts that it is not {@code null}).
+ */
+ UpdateOutcome outcome() {
+ return outcome;
+ }
+
+ /**
* @return Value computed by the {@link EntryProcessor}.
*/
@Nullable public IgniteBiTuple<Object, Exception> computedResult() {
@@ -110,7 +113,7 @@ public class GridCacheUpdateAtomicResult {
* @return Success flag.
*/
public boolean success() {
- return success;
+ return outcome.success();
}
/**
@@ -167,7 +170,74 @@ public class GridCacheUpdateAtomicResult {
* @return Whether update should be propagated to DHT node.
*/
public boolean sendToDht() {
- return sndToDht;
+ return outcome.sendToDht();
+ }
+
+ /**
+ * Possible outcomes of an atomic cache update.
+ * Each constant carries three flags, in constructor order: success, whether the update should be
+ * propagated to DHT node, and the read metrics update flag.
+ */
+ public enum UpdateOutcome {
+ /**
+ * Not successful, not propagated to DHT, read metrics flag off.
+ * NOTE(review): presumably conflict resolution kept the old value; confirm at the call site.
+ */
+ CONFLICT_USE_OLD(false, false, false),
+
+ /**
+ * Not successful, not propagated to DHT, read metrics flag off.
+ * NOTE(review): presumably the entry version check rejected the update; confirm at the call site.
+ */
+ VERSION_CHECK_FAILED(false, false, false),
+
+ /**
+ * Not successful, not propagated to DHT, read metrics flag on.
+ * NOTE(review): presumably the update filter did not pass; confirm at the call site.
+ */
+ FILTER_FAILED(false, false, true),
+
+ /**
+ * Not successful, not propagated to DHT, read metrics flag on.
+ * NOTE(review): presumably the entry processor left the entry unmodified; confirm at the call site.
+ */
+ INVOKE_NO_OP(false, false, true),
+
+ /**
+ * Not successful, not propagated to DHT, read metrics flag on.
+ * NOTE(review): presumably a cache interceptor cancelled the update; confirm at the call site.
+ */
+ INTERCEPTOR_CANCEL(false, false, true),
+
+ /**
+ * Not successful, but propagated to DHT, read metrics flag on.
+ * NOTE(review): presumably a remove that found no value; confirm at the call site.
+ */
+ REMOVE_NO_VAL(false, true, true),
+
+ /**
+ * Successful, propagated to DHT, read metrics flag on.
+ */
+ SUCCESS(true, true, true);
+
+ /** Success flag. */
+ private final boolean success;
+
+ /** Whether update should be propagated to DHT node. */
+ private final boolean sndToDht;
+
+ /** Metrics update flag. */
+ private final boolean updateReadMetrics;
+
+ /**
+ * @param success Success flag.
+ * @param sndToDht Whether update should be propagated to DHT node.
+ * @param updateReadMetrics Metrics update flag.
+ */
+ UpdateOutcome(boolean success, boolean sndToDht, boolean updateReadMetrics) {
+ this.success = success;
+ this.sndToDht = sndToDht;
+ this.updateReadMetrics = updateReadMetrics;
+ }
+
+ /**
+ * @return Success flag.
+ */
+ public boolean success() {
+ return success;
+ }
+
+ /**
+ * @return Whether update should be propagated to DHT node.
+ */
+ public boolean sendToDht() {
+ return sndToDht;
+ }
+
+ /**
+ * @return Metrics update flag.
+ */
+ public boolean updateReadMetrics() {
+ return updateReadMetrics;
+ }
}
/** {@inheritDoc} */