You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/25 12:16:04 UTC
[1/4] ignite git commit: ignite-3478 Mvcc support for sql indexes
Repository: ignite
Updated Branches:
refs/heads/ignite-3478 00bd4794a -> 6150f3a0a
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java
index 7ba1b32..e77a3f1 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java
@@ -17,11 +17,42 @@
package org.apache.ignite.internal.processors.cache.mvcc;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.lang.GridInClosure3;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
- *
+ * TODO IGNITE-3478: text/spatial indexes with mvcc.
+ * TODO IGNITE-3478: indexingSpi with mvcc.
+ * TODO IGNITE-3478: setQueryParallelism with mvcc.
+ * TODO IGNITE-3478: dynamic index create.
*/
@SuppressWarnings("unchecked")
public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest {
@@ -29,11 +60,1538 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest {
* @throws Exception If failed.
*/
public void testAccountsTxSql_SingleNode_SinglePartition() throws Exception {
- accountsTxReadAll(1, 0, 0, 1, new IgniteInClosure<CacheConfiguration>() {
- @Override public void apply(CacheConfiguration ccfg) {
- ccfg.setIndexedTypes(Integer.class, MvccTestAccount.class);
+ accountsTxReadAll(1, 0, 0, 1, new InitIndexing(Integer.class, MvccTestAccount.class), false, ReadMode.SQL_ALL);
+ }
+
+ /**
+ * Runs {@code accountsTxReadAll} with an {@code InitIndexing} closure for {@code Integer}/{@code MvccTestAccount},
+ * the boolean argument set to {@code true} and {@code ReadMode.SQL_ALL}.
+ * NOTE(review): judging by the test name, the numeric arguments (1, 0, 0, 1) presumably are servers, clients,
+ * backups and partitions, and the boolean enables removes; confirm against {@code accountsTxReadAll}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testAccountsTxSql_WithRemoves_SingleNode_SinglePartition() throws Exception {
+ accountsTxReadAll(1, 0, 0, 1, new InitIndexing(Integer.class, MvccTestAccount.class), true, ReadMode.SQL_ALL);
+ }
+
+ /**
+ * Runs {@code accountsTxReadAll} with an {@code InitIndexing} closure for {@code Integer}/{@code MvccTestAccount},
+ * the boolean argument set to {@code false} and {@code ReadMode.SQL_ALL}.
+ * NOTE(review): judging by the test name, the numeric arguments (1, 0, 0, 64) presumably are servers, clients,
+ * backups and partitions; confirm against {@code accountsTxReadAll}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testAccountsTxSql_SingleNode() throws Exception {
+ accountsTxReadAll(1, 0, 0, 64, new InitIndexing(Integer.class, MvccTestAccount.class), false, ReadMode.SQL_ALL);
+ }
+
+ /**
+ * Runs {@code accountsTxReadAll} with an {@code InitIndexing} closure for {@code Integer}/{@code MvccTestAccount},
+ * the boolean argument set to {@code false} and {@code ReadMode.SQL_SUM}.
+ * NOTE(review): judging by the test name, the numeric arguments (1, 0, 0, 64) presumably are servers, clients,
+ * backups and partitions; confirm against {@code accountsTxReadAll}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testAccountsTxSumSql_SingleNode() throws Exception {
+ accountsTxReadAll(1, 0, 0, 64, new InitIndexing(Integer.class, MvccTestAccount.class), false, ReadMode.SQL_SUM);
+ }
+
+ /**
+ * Runs {@code accountsTxReadAll} with an {@code InitIndexing} closure for {@code Integer}/{@code MvccTestAccount},
+ * the boolean argument set to {@code true} and {@code ReadMode.SQL_ALL}.
+ * NOTE(review): judging by the test name, the numeric arguments (1, 0, 0, 64) presumably are servers, clients,
+ * backups and partitions, and the boolean enables removes; confirm against {@code accountsTxReadAll}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testAccountsTxSql_WithRemoves_SingleNode() throws Exception {
+ accountsTxReadAll(1, 0, 0, 64, new InitIndexing(Integer.class, MvccTestAccount.class), true, ReadMode.SQL_ALL);
+ }
+
+ /**
+ * Runs {@code accountsTxReadAll} with an {@code InitIndexing} closure for {@code Integer}/{@code MvccTestAccount},
+ * the boolean argument set to {@code false} and {@code ReadMode.SQL_ALL}.
+ * NOTE(review): judging by the test name, the numeric arguments (4, 2, 2, 64) presumably are servers, clients,
+ * backups and partitions; confirm against {@code accountsTxReadAll}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testAccountsTxSql_ClientServer_Backups2() throws Exception {
+ accountsTxReadAll(4, 2, 2, 64, new InitIndexing(Integer.class, MvccTestAccount.class), false, ReadMode.SQL_ALL);
+ }
+
+ /**
+ * Runs {@code updateSingleValue} on a single node ({@code singleNode = true}) with non-local queries
+ * ({@code locQry = false}).
+ *
+ * @throws Exception If failed.
+ */
+ public void testUpdateSingleValue_SingleNode() throws Exception {
+ updateSingleValue(true, false);
+ }
+
+ /**
+ * Runs {@code updateSingleValue} on a single node ({@code singleNode = true}) with local queries
+ * ({@code locQry = true}).
+ *
+ * @throws Exception If failed.
+ */
+ public void testUpdateSingleValue_LocalQuery_SingleNode() throws Exception {
+ updateSingleValue(true, true);
+ }
+
+ /**
+ * Runs {@code updateSingleValue} in the multi-node configuration ({@code singleNode = false}) with non-local
+ * queries ({@code locQry = false}).
+ *
+ * @throws Exception If failed.
+ */
+ public void testUpdateSingleValue_ClientServer() throws Exception {
+ updateSingleValue(false, false);
+ }
+
+    /**
+     * Runs concurrent writers and SQL readers over a fixed set of {@code VALS} keys. Each writer flips the indexed
+     * field of a random key between {@code key} and {@code key + INC_BY}; each reader checks that queries by indexed
+     * value or by key return at most one row for the key and that the row holds one of these two values.
+     * After the run, active queries cleanup is verified on all started nodes.
+     *
+     * @param singleNode {@code True} for test with single node.
+     * @param locQry Local query flag.
+     * @throws Exception If failed.
+     */
+    private void updateSingleValue(boolean singleNode, final boolean locQry) throws Exception {
+        final int VALS = 100;
+
+        final int writers = 4;
+
+        final int readers = 4;
+
+        // Larger than VALS: a value below INC_BY is always the original key and shifted values never equal a key.
+        final int INC_BY = 110;
+
+        // Initial state: every key maps to a value whose indexed field equals the key.
+        final IgniteInClosure<IgniteCache<Object, Object>> init = new IgniteInClosure<IgniteCache<Object, Object>>() {
+            @Override public void apply(IgniteCache<Object, Object> cache) {
+                Map<Integer, MvccTestSqlIndexValue> vals = new HashMap<>();
+
+                for (int i = 0; i < VALS; i++)
+                    vals.put(i, new MvccTestSqlIndexValue(i));
+
+                cache.putAll(vals);
+            }
+        };
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int cnt = 0;
+
+                    while (!stop.get()) {
+                        TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd);
+
+                        try {
+                            Integer key = rnd.nextInt(VALS);
+
+                            cache.cache.invoke(key, new CacheEntryProcessor<Integer, MvccTestSqlIndexValue, Object>() {
+                                @Override public Object process(MutableEntry<Integer, MvccTestSqlIndexValue> e, Object... args) {
+                                    Integer key = e.getKey();
+
+                                    MvccTestSqlIndexValue val = e.getValue();
+
+                                    int newIdxVal;
+
+                                    // Toggle between the two legal states: 'key' and 'key + INC_BY'.
+                                    if (val.idxVal1 < INC_BY) {
+                                        assertEquals(key.intValue(), val.idxVal1);
+
+                                        newIdxVal = val.idxVal1 + INC_BY;
+                                    }
+                                    else {
+                                        assertEquals(INC_BY + key, val.idxVal1);
+
+                                        newIdxVal = key;
+                                    }
+
+                                    e.setValue(new MvccTestSqlIndexValue(newIdxVal));
+
+                                    return null;
+                                }
+                            });
+
+                            // Count only completed updates so that the summary log below is meaningful.
+                            cnt++;
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+
+                    info("Writer finished, updates: " + cnt);
+                }
+            };
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    // Query index 0: by current value, 1: by either legal value, 2: by key.
+                    List<SqlFieldsQuery> fieldsQrys = new ArrayList<>();
+
+                    fieldsQrys.add(
+                        new SqlFieldsQuery("select _key, idxVal1 from MvccTestSqlIndexValue where idxVal1=?").setLocal(locQry));
+
+                    fieldsQrys.add(new SqlFieldsQuery("select _key, idxVal1 from MvccTestSqlIndexValue where idxVal1=? or idxVal1=?").setLocal(locQry));
+
+                    fieldsQrys.add(new SqlFieldsQuery("select _key, idxVal1 from MvccTestSqlIndexValue where _key=?").setLocal(locQry));
+
+                    // Same three predicates expressed as SqlQuery.
+                    List<SqlQuery<Integer, MvccTestSqlIndexValue>> sqlQrys = new ArrayList<>();
+
+                    sqlQrys.add(new SqlQuery<Integer, MvccTestSqlIndexValue>(MvccTestSqlIndexValue.class, "idxVal1=?").setLocal(locQry));
+
+                    sqlQrys.add(new SqlQuery<Integer, MvccTestSqlIndexValue>(MvccTestSqlIndexValue.class, "idxVal1=? or idxVal1=?").setLocal(locQry));
+
+                    sqlQrys.add(new SqlQuery<Integer, MvccTestSqlIndexValue>(MvccTestSqlIndexValue.class, "_key=?").setLocal(locQry));
+
+                    while (!stop.get()) {
+                        Integer key = rnd.nextInt(VALS);
+
+                        int qryIdx = rnd.nextInt(3);
+
+                        TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd);
+
+                        List<List<?>> res;
+
+                        try {
+                            if (rnd.nextBoolean()) {
+                                SqlFieldsQuery qry = fieldsQrys.get(qryIdx);
+
+                                if (qryIdx == 1)
+                                    qry.setArgs(key, key + INC_BY);
+                                else
+                                    qry.setArgs(key);
+
+                                res = cache.cache.query(qry).getAll();
+                            }
+                            else {
+                                SqlQuery<Integer, MvccTestSqlIndexValue> qry = sqlQrys.get(qryIdx);
+
+                                if (qryIdx == 1)
+                                    qry.setArgs(key, key + INC_BY);
+                                else
+                                    qry.setArgs(key);
+
+                                // Convert entries to the same [key, idxVal1] row shape the fields queries return.
+                                res = new ArrayList<>();
+
+                                for (IgniteCache.Entry<Integer, MvccTestSqlIndexValue> e : cache.cache.query(qry).getAll()) {
+                                    List<Object> row = new ArrayList<>(2);
+
+                                    row.add(e.getKey());
+                                    row.add(e.getValue().idxVal1);
+
+                                    res.add(row);
+                                }
+                            }
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+
+                        // Only the query by current value (index 0) may miss: the value can be 'key + INC_BY'.
+                        assertTrue(qryIdx == 0 || !res.isEmpty());
+
+                        if (!res.isEmpty()) {
+                            assertEquals(1, res.size());
+
+                            List<?> resVals = res.get(0);
+
+                            Integer key0 = (Integer)resVals.get(0);
+                            Integer val0 = (Integer)resVals.get(1);
+
+                            assertEquals(key, key0);
+                            assertTrue(val0.equals(key) || val0.equals(key + INC_BY));
+                        }
+                    }
+
+                    // The first reader dumps the final state and checks that no key was lost.
+                    if (idx == 0) {
+                        SqlFieldsQuery qry = new SqlFieldsQuery("select _key, idxVal1 from MvccTestSqlIndexValue");
+
+                        TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd);
+
+                        List<List<?>> res;
+
+                        try {
+                            res = cache.cache.query(qry).getAll();
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+
+                        assertEquals(VALS, res.size());
+
+                        for (List<?> vals : res)
+                            info("Value: " + vals);
+                    }
+                }
+            };
+
+        int srvs = singleNode ? 1 : 4;
+        int clients = singleNode ? 0 : 2;
+
+        readWriteTest(
+            null,
+            srvs,
+            clients,
+            0,
+            DFLT_PARTITION_COUNT,
+            writers,
+            readers,
+            DFLT_TEST_TIME,
+            new InitIndexing(Integer.class, MvccTestSqlIndexValue.class),
+            init,
+            writer,
+            reader);
+
+        for (Ignite node : G.allGrids())
+            checkActiveQueriesCleanup(node);
+    }
+
+ /**
+ * Runs {@code joinTransactional} on a single node ({@code singleNode = true}) without distributed joins
+ * ({@code distributedJoin = false}).
+ *
+ * @throws Exception If failed.
+ */
+ public void testJoinTransactional_SingleNode() throws Exception {
+ joinTransactional(true, false);
+ }
+
+ /**
+ * Runs {@code joinTransactional} in the multi-node configuration ({@code singleNode = false}) without
+ * distributed joins ({@code distributedJoin = false}).
+ *
+ * @throws Exception If failed.
+ */
+ public void testJoinTransactional_ClientServer() throws Exception {
+ joinTransactional(false, false);
+ }
+
+ /**
+ * Runs {@code joinTransactional} in the multi-node configuration ({@code singleNode = false}) with
+ * distributed joins enabled ({@code distributedJoin = true}).
+ *
+ * @throws Exception If failed.
+ */
+ public void testJoinTransactional_DistributedJoins_ClientServer() throws Exception {
+ joinTransactional(false, true);
+ }
+
+    /**
+     * Writers transactionally create or delete a child together with its parent; readers run left outer joins
+     * of children to parents and assert that every returned row has a non-null parent id.
+     *
+     * @param singleNode {@code True} for test with single node.
+     * @param distributedJoin {@code True} to test distributed joins.
+     * @throws Exception If failed.
+     */
+    private void joinTransactional(boolean singleNode, final boolean distributedJoin) throws Exception {
+        final int KEYS = 100;
+
+        final int writers = 4;
+
+        final int readers = 4;
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int updates = 0;
+
+                    while (!stop.get()) {
+                        TestCache<Object, Object> testCache = randomCache(caches, rnd);
+
+                        IgniteTransactions txs = testCache.cache.unwrap(Ignite.class).transactions();
+
+                        try {
+                            try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                                Integer key = rnd.nextInt(KEYS);
+
+                                JoinTestChildKey childKey = new JoinTestChildKey(key);
+
+                                JoinTestChild existing = (JoinTestChild)testCache.cache.get(childKey);
+
+                                if (existing != null) {
+                                    // Child is present: delete it together with its parent.
+                                    testCache.cache.remove(childKey);
+
+                                    testCache.cache.remove(new JoinTestParentKey(existing.parentId));
+                                }
+                                else {
+                                    // Child is absent: create the child first, then its parent.
+                                    Integer parentKey = distributedJoin ? key + 100 : key;
+
+                                    testCache.cache.put(childKey, new JoinTestChild(parentKey));
+
+                                    testCache.cache.put(new JoinTestParentKey(parentKey), new JoinTestParent(parentKey));
+                                }
+
+                                tx.commit();
+                            }
+
+                            updates++;
+                        }
+                        finally {
+                            testCache.readUnlock();
+                        }
+                    }
+
+                    info("Writer finished, updates: " + updates);
+                }
+            };
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    // Common join clause; the where-clause variants are appended below.
+                    String joinSql = "select c.parentId, p.id from " +
+                        "JoinTestChild c left outer join JoinTestParent p on (c.parentId = p.id)";
+
+                    List<SqlFieldsQuery> qrys = new ArrayList<>();
+
+                    qrys.add(new SqlFieldsQuery(joinSql).setDistributedJoins(distributedJoin));
+
+                    qrys.add(new SqlFieldsQuery(joinSql + " where p.id = 10").setDistributedJoins(distributedJoin));
+
+                    qrys.add(new SqlFieldsQuery(joinSql + " where p.id != 10").setDistributedJoins(distributedJoin));
+
+                    while (!stop.get()) {
+                        TestCache<Object, Object> testCache = randomCache(caches, rnd);
+
+                        try {
+                            for (SqlFieldsQuery qry : qrys) {
+                                List<List<?>> rows = testCache.cache.query(qry).getAll();
+
+                                // Second column is p.id.
+                                for (List<?> row : rows) {
+                                    Integer parentId = (Integer)row.get(1);
+
+                                    assertNotNull(parentId);
+                                }
+                            }
+                        }
+                        finally {
+                            testCache.readUnlock();
+                        }
+                    }
+
+                    // Only the first reader logs the final join result.
+                    if (idx == 0) {
+                        TestCache<Object, Object> testCache = randomCache(caches, rnd);
+
+                        try {
+                            List<List<?>> rows = testCache.cache.query(qrys.get(0)).getAll();
+
+                            info("Reader finished, result: " + rows);
+                        }
+                        finally {
+                            testCache.readUnlock();
+                        }
+                    }
+                }
+            };
+
+        int srvs = singleNode ? 1 : 4;
+        int clients = singleNode ? 0 : 2;
+
+        readWriteTest(
+            null,
+            srvs,
+            clients,
+            0,
+            DFLT_PARTITION_COUNT,
+            writers,
+            readers,
+            DFLT_TEST_TIME,
+            new InitIndexing(JoinTestParentKey.class, JoinTestParent.class,
+                JoinTestChildKey.class, JoinTestChild.class),
+            null,
+            writer,
+            reader);
+    }
+
+    /**
+     * A single writer transactionally creates or deletes a parent together with {@code CHILDREN_CNT} children;
+     * readers run a distributed left outer join filtered by parent id and assert that a non-empty result
+     * contains exactly {@code CHILDREN_CNT} rows.
+     *
+     * @throws Exception If failed.
+     */
+    public void testJoinTransactional_DistributedJoins_ClientServer2() throws Exception {
+        final int KEYS = 100;
+
+        final int writers = 1;
+
+        final int readers = 4;
+
+        final int CHILDREN_CNT = 10;
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int updates = 0;
+
+                    while (!stop.get()) {
+                        TestCache<Object, Object> testCache = randomCache(caches, rnd);
+
+                        IgniteTransactions txs = testCache.cache.unwrap(Ignite.class).transactions();
+
+                        try {
+                            try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                                Integer key = rnd.nextInt(KEYS);
+
+                                JoinTestParentKey parentKey = new JoinTestParentKey(key);
+
+                                boolean parentExists = (JoinTestParent)testCache.cache.get(parentKey) != null;
+
+                                if (parentExists) {
+                                    // Delete all children first, then the parent.
+                                    for (int child = 0; child < CHILDREN_CNT; child++)
+                                        testCache.cache.remove(new JoinTestChildKey(key * 10_000 + child));
+
+                                    testCache.cache.remove(parentKey);
+                                }
+                                else {
+                                    // Create all children first, then the parent.
+                                    for (int child = 0; child < CHILDREN_CNT; child++)
+                                        testCache.cache.put(new JoinTestChildKey(key * 10_000 + child), new JoinTestChild(key));
+
+                                    testCache.cache.put(parentKey, new JoinTestParent(key));
+                                }
+
+                                tx.commit();
+                            }
+
+                            updates++;
+                        }
+                        finally {
+                            testCache.readUnlock();
+                        }
+                    }
+
+                    info("Writer finished, updates: " + updates);
+                }
+            };
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    SqlFieldsQuery joinQry = new SqlFieldsQuery("select c.parentId, p.id from " +
+                        "JoinTestChild c left outer join JoinTestParent p on (c.parentId = p.id) where p.id=?");
+
+                    joinQry.setDistributedJoins(true);
+
+                    int reads = 0;
+
+                    while (!stop.get()) {
+                        TestCache<Object, Object> testCache = randomCache(caches, rnd);
+
+                        joinQry.setArgs(rnd.nextInt(KEYS));
+
+                        try {
+                            int rows = testCache.cache.query(joinQry).getAll().size();
+
+                            // Either the parent is absent (no rows) or all its children are visible.
+                            if (rows != 0)
+                                assertEquals(CHILDREN_CNT, rows);
+
+                            reads++;
+                        }
+                        finally {
+                            testCache.readUnlock();
+                        }
+                    }
+
+                    info("Reader finished, read count: " + reads);
+                }
+            };
+
+        readWriteTest(
+            null,
+            4,
+            2,
+            0,
+            DFLT_PARTITION_COUNT,
+            writers,
+            readers,
+            DFLT_TEST_TIME,
+            new InitIndexing(JoinTestParentKey.class, JoinTestParent.class,
+                JoinTestChildKey.class, JoinTestChild.class),
+            null,
+            writer,
+            reader);
+    }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDistributedJoinSimple() throws Exception {
+ startGridsMultiThreaded(4);
+
+ Ignite srv0 = ignite(0);
+
+ int[] backups = {0, 1, 2};
+
+ for (int b : backups) {
+ IgniteCache<Object, Object> cache = srv0.createCache(
+ cacheConfiguration(PARTITIONED, FULL_SYNC, b, DFLT_PARTITION_COUNT).
+ setIndexedTypes(JoinTestParentKey.class, JoinTestParent.class, JoinTestChildKey.class, JoinTestChild.class));
+
+ int cntr = 0;
+
+ int expCnt = 0;
+
+ for (int i = 0; i < 10; i++) {
+ JoinTestParentKey parentKey = new JoinTestParentKey(i);
+
+ cache.put(parentKey, new JoinTestParent(i));
+
+ for (int c = 0; c < i; c++) {
+ JoinTestChildKey childKey = new JoinTestChildKey(cntr++);
+
+ cache.put(childKey, new JoinTestChild(i));
+
+ expCnt++;
+ }
+ }
+
+ SqlFieldsQuery qry = new SqlFieldsQuery("select c.parentId, p.id from " +
+ "JoinTestChild c join JoinTestParent p on (c.parentId = p.id)").
+ setDistributedJoins(true);
+
+ Map<Integer, Integer> resMap = new HashMap<>();
+
+ List<List<?>> res = cache.query(qry).getAll();
+
+ assertEquals(expCnt, res.size());
+
+ for (List<?> resRow : res) {
+ Integer parentId = (Integer)resRow.get(0);
+
+ Integer cnt = resMap.get(parentId);
+
+ if (cnt == null)
+ resMap.put(parentId, 1);
+ else
+ resMap.put(parentId, cnt + 1);
+ }
+
+ for (int i = 1; i < 10; i++)
+ assertEquals(i, (Object)resMap.get(i));
+
+ srv0.destroyCache(cache.getName());
+ }
+ }
+
+ /**
+ * Runs {@code cacheRecreate} with an {@code InitIndexing} closure for {@code Integer}/{@code MvccTestAccount}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testCacheRecreate() throws Exception {
+ cacheRecreate(new InitIndexing(Integer.class, MvccTestAccount.class));
+ }
+
+ /**
+ * Creates, fills, queries and destroys a cache three times in a row on the same node, each time with a
+ * different indexed type pair: {@code Integer}/{@code MvccTestAccount}, {@code Integer}/{@code MvccTestSqlIndexValue}
+ * and {@code Long}/{@code Long}. In every round each key is first checked to be absent, then overwritten
+ * one to three times; the last written value must be returned by {@code get} and the SQL query must return
+ * {@code PARTS * 2} entries.
+ *
+ * @throws Exception If failed.
+ */
+ public void testCacheRecreateChangeIndexedType() throws Exception {
+ Ignite srv0 = startGrid(0);
+
+ final int PARTS = 64;
+
+ {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, PARTS).
+ setIndexedTypes(Integer.class, MvccTestAccount.class);
+
+ IgniteCache<Integer, MvccTestAccount> cache = (IgniteCache)srv0.createCache(ccfg);
+
+ for (int k = 0; k < PARTS * 2; k++) {
+ assertNull(cache.get(k));
+
+ int vals = k % 3 + 1;
+
+ for (int v = 0; v < vals; v++)
+ cache.put(k, new MvccTestAccount(v, 1));
+
+ assertEquals(vals - 1, cache.get(k).val);
+ }
+
+ assertEquals(PARTS * 2, cache.query(new SqlQuery<>(MvccTestAccount.class, "true")).getAll().size());
+
+ srv0.destroyCache(cache.getName());
+ }
+
+ {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, PARTS).
+ setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class);
+
+ IgniteCache<Integer, MvccTestSqlIndexValue> cache = (IgniteCache)srv0.createCache(ccfg);
+
+ for (int k = 0; k < PARTS * 2; k++) {
+ assertNull(cache.get(k));
+
+ int vals = k % 3 + 1;
+
+ for (int v = 0; v < vals; v++)
+ cache.put(k, new MvccTestSqlIndexValue(v));
+
+ assertEquals(vals - 1, cache.get(k).idxVal1);
+ }
+
+ assertEquals(PARTS * 2, cache.query(new SqlQuery<>(MvccTestSqlIndexValue.class, "true")).getAll().size());
+
+ srv0.destroyCache(cache.getName());
+ }
+
+ {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, PARTS).
+ setIndexedTypes(Long.class, Long.class);
+
+ IgniteCache<Long, Long> cache = (IgniteCache)srv0.createCache(ccfg);
+
+ for (int k = 0; k < PARTS * 2; k++) {
+ assertNull(cache.get((long)k));
+
+ int vals = k % 3 + 1;
+
+ for (int v = 0; v < vals; v++)
+ cache.put((long)k, (long)v);
+
+ assertEquals((long)(vals - 1), (Object)cache.get((long)k));
}
- }, false, ReadMode.SQL_ALL);
+
+ assertEquals(PARTS * 2, cache.query(new SqlQuery<>(Long.class, "true")).getAll().size());
+
+ srv0.destroyCache(cache.getName());
+ }
}
+    /**
+     * Checks SQL results when the value stored under one key is overwritten with a value of another indexed
+     * type: the entry must disappear from the {@code MvccTestSqlIndexValue} table and show up in the
+     * {@code Integer} table.
+     *
+     * @throws Exception If failed.
+     */
+    public void testChangeValueType1() throws Exception {
+        final String idxValSql = "select idxVal1 from MvccTestSqlIndexValue";
+        final String intValSql = "select _val from Integer";
+
+        IgniteCache<Object, Object> cache = startGrid(0).createCache(
+            cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT).
+                setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class, Integer.class, Integer.class));
+
+        // Two versions of the same key: only the latest one must be visible.
+        cache.put(1, new MvccTestSqlIndexValue(1));
+        cache.put(1, new MvccTestSqlIndexValue(2));
+
+        checkSingleResult(cache, new SqlFieldsQuery(idxValSql), 2);
+
+        // Overwrite with a value of the other indexed type.
+        cache.put(1, 1);
+
+        assertEquals(0, cache.query(new SqlFieldsQuery(idxValSql)).getAll().size());
+
+        checkSingleResult(cache, new SqlFieldsQuery(intValSql), 1);
+
+        cache.put(1, 2);
+
+        checkSingleResult(cache, new SqlFieldsQuery(intValSql), 2);
+    }
+
+    /**
+     * Checks SQL results when the value stored under one key is removed and then re-created with a value of
+     * another indexed type: the {@code MvccTestSqlIndexValue} table must stay empty after the remove and the
+     * entry must show up in the {@code Integer} table.
+     *
+     * @throws Exception If failed.
+     */
+    public void testChangeValueType2() throws Exception {
+        final String idxValSql = "select idxVal1 from MvccTestSqlIndexValue";
+        final String intValSql = "select _val from Integer";
+
+        IgniteCache<Object, Object> cache = startGrid(0).createCache(
+            cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT).
+                setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class, Integer.class, Integer.class));
+
+        // Two versions of the same key: only the latest one must be visible.
+        cache.put(1, new MvccTestSqlIndexValue(1));
+        cache.put(1, new MvccTestSqlIndexValue(2));
+
+        checkSingleResult(cache, new SqlFieldsQuery(idxValSql), 2);
+
+        cache.remove(1);
+
+        assertEquals(0, cache.query(new SqlFieldsQuery(idxValSql)).getAll().size());
+
+        // Re-create the key with a value of the other indexed type.
+        cache.put(1, 1);
+
+        assertEquals(0, cache.query(new SqlFieldsQuery(idxValSql)).getAll().size());
+
+        checkSingleResult(cache, new SqlFieldsQuery(intValSql), 1);
+
+        cache.put(1, 2);
+
+        checkSingleResult(cache, new SqlFieldsQuery(intValSql), 2);
+    }
+
+ /**
+ * Runs {@code countTransactional} on a single node ({@code singleNode = true}).
+ *
+ * @throws Exception If failed.
+ */
+ public void testCountTransactional_SingleNode() throws Exception {
+ countTransactional(true);
+ }
+
+ /**
+ * Runs {@code countTransactional} in the multi-node configuration ({@code singleNode = false}).
+ *
+ * @throws Exception If failed.
+ */
+ public void testCountTransactional_ClientServer() throws Exception {
+ countTransactional(false);
+ }
+
+    /**
+     * Each writer owns a private key range and repeatedly adds or removes keys in batches of {@code BATCH}
+     * using a single {@code putAll}/{@code removeAll}; readers run {@code count(*)} queries and assert that
+     * every observed count is a multiple of the batch size.
+     *
+     * @param singleNode {@code True} for test with single node.
+     * @throws Exception If failed.
+     */
+    private void countTransactional(boolean singleNode) throws Exception {
+        final int writers = 4;
+
+        final int readers = 4;
+
+        final int THREAD_KEY_RANGE = 100;
+
+        final int VAL_RANGE = 10;
+
+        // Number of keys added or removed by one cache operation; THREAD_KEY_RANGE is a multiple of it.
+        final int BATCH = 10;
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int min = idx * THREAD_KEY_RANGE;
+                    int max = min + THREAD_KEY_RANGE;
+
+                    info("Thread range [min=" + min + ", max=" + max + ']');
+
+                    int cnt = 0;
+
+                    // Keys this writer currently has in the cache, in insertion order.
+                    Set<Integer> keys = new LinkedHashSet<>();
+
+                    while (!stop.get()) {
+                        TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd);
+
+                        try {
+                            // Remove a batch when the range is full or randomly one time out of three; add otherwise.
+                            if (!keys.isEmpty() && (keys.size() == THREAD_KEY_RANGE || rnd.nextInt(3) == 0)) {
+                                Set<Integer> rmvKeys = new HashSet<>();
+
+                                for (Integer key : keys) {
+                                    rmvKeys.add(key);
+
+                                    if (rmvKeys.size() == BATCH)
+                                        break;
+                                }
+
+                                assertEquals(BATCH, rmvKeys.size());
+
+                                cache.cache.removeAll(rmvKeys);
+
+                                keys.removeAll(rmvKeys);
+                            }
+                            else {
+                                TreeMap<Integer, MvccTestSqlIndexValue> map = new TreeMap<>();
+
+                                // Pick random keys of this thread's range which are not stored yet.
+                                while (map.size() != BATCH) {
+                                    Integer key = rnd.nextInt(min, max);
+
+                                    if (keys.add(key))
+                                        map.put(key, new MvccTestSqlIndexValue(rnd.nextInt(VAL_RANGE)));
+                                }
+
+                                assertEquals(BATCH, map.size());
+
+                                cache.cache.putAll(map);
+                            }
+
+                            // Count only completed batches so that the summary log below is meaningful.
+                            cnt++;
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+
+                    info("Writer finished, updates: " + cnt);
+                }
+            };
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    List<SqlFieldsQuery> qrys = new ArrayList<>();
+
+                    qrys.add(new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue"));
+
+                    // The range covers every value the writers store, so both queries count all entries.
+                    qrys.add(new SqlFieldsQuery(
+                        "select count(*) from MvccTestSqlIndexValue where idxVal1 >= 0 and idxVal1 <= " + VAL_RANGE));
+
+                    while (!stop.get()) {
+                        TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd);
+
+                        try {
+                            for (SqlFieldsQuery qry : qrys) {
+                                List<List<?>> res = cache.cache.query(qry).getAll();
+
+                                assertEquals(1, res.size());
+
+                                Long cnt = (Long)res.get(0).get(0);
+
+                                // A partially visible batch would break divisibility by the batch size.
+                                assertTrue("Unexpected count: " + cnt, cnt % BATCH == 0);
+                            }
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+                }
+            };
+
+        int srvs = singleNode ? 1 : 4;
+        int clients = singleNode ? 0 : 2;
+
+        readWriteTest(
+            null,
+            srvs,
+            clients,
+            0,
+            DFLT_PARTITION_COUNT,
+            writers,
+            readers,
+            DFLT_TEST_TIME,
+            new InitIndexing(Integer.class, MvccTestSqlIndexValue.class),
+            null,
+            writer,
+            reader);
+    }
+
+ /**
+ * Runs {@code maxMinTransactional} on a single node ({@code singleNode = true}).
+ *
+ * @throws Exception If failed.
+ */
+ public void testMaxMinTransactional_SingleNode() throws Exception {
+ maxMinTransactional(true);
+ }
+
+ /**
+ * Runs {@code maxMinTransactional} in the multi-node configuration ({@code singleNode = false}).
+ *
+ * @throws Exception If failed.
+ */
+ public void testMaxMinTransactional_ClientServer() throws Exception {
+ maxMinTransactional(false);
+ }
+
+    /**
+     * The writer adds {@code THREAD_OPS} consecutive batches of {@code OP_RANGE} keys (value equals key) and then
+     * removes them batch by batch in the same order, each batch with a single {@code putAll}/{@code removeAll}.
+     * Batch keys run from {@code startKey + 1} to {@code startKey + OP_RANGE}, so readers assert that every
+     * observed {@code max(idxVal1)} is a multiple of {@code OP_RANGE} and every observed {@code min(idxVal1)}
+     * has remainder 1.
+     *
+     * @param singleNode {@code True} for test with single node.
+     * @throws Exception If failed.
+     */
+    private void maxMinTransactional(boolean singleNode) throws Exception {
+        final int writers = 1;
+
+        final int readers = 1;
+
+        final int THREAD_OPS = 10;
+
+        final int OP_RANGE = 10;
+
+        final int THREAD_KEY_RANGE = OP_RANGE * THREAD_OPS;
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int min = idx * THREAD_KEY_RANGE;
+
+                    info("Thread range [start=" + min + ']');
+
+                    int cnt = 0;
+
+                    // Current phase: add batches or remove them.
+                    boolean add = true;
+
+                    // Index of the batch within the current phase.
+                    int op = 0;
+
+                    while (!stop.get()) {
+                        TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd);
+
+                        try {
+                            int startKey = min + op * OP_RANGE;
+
+                            if (add) {
+                                Map<Integer, MvccTestSqlIndexValue> vals = new HashMap<>();
+
+                                for (int i = 0; i < OP_RANGE; i++) {
+                                    Integer key = startKey + i + 1;
+
+                                    vals.put(key, new MvccTestSqlIndexValue(key));
+                                }
+
+                                cache.cache.putAll(vals);
+                            }
+                            else {
+                                Set<Integer> rmvKeys = new HashSet<>();
+
+                                for (int i = 0; i < OP_RANGE; i++)
+                                    rmvKeys.add(startKey + i + 1);
+
+                                cache.cache.removeAll(rmvKeys);
+                            }
+
+                            // Count only completed batches so that the summary log below is meaningful.
+                            cnt++;
+
+                            // Switch the phase once all batches of the thread range were processed.
+                            if (++op == THREAD_OPS) {
+                                add = !add;
+
+                                op = 0;
+                            }
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+
+                    info("Writer finished, updates: " + cnt);
+                }
+            };
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    List<SqlFieldsQuery> maxQrys = new ArrayList<>();
+                    List<SqlFieldsQuery> minQrys = new ArrayList<>();
+
+                    maxQrys.add(new SqlFieldsQuery("select max(idxVal1) from MvccTestSqlIndexValue"));
+                    maxQrys.add(new SqlFieldsQuery("select max(idxVal1) from MvccTestSqlIndexValue where idxVal1 >= 0"));
+
+                    minQrys.add(new SqlFieldsQuery("select min(idxVal1) from MvccTestSqlIndexValue"));
+                    minQrys.add(new SqlFieldsQuery("select min(idxVal1) from MvccTestSqlIndexValue where idxVal1 >= 0"));
+
+                    while (!stop.get()) {
+                        TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd);
+
+                        try {
+                            for (SqlFieldsQuery qry : maxQrys) {
+                                List<List<?>> res = cache.cache.query(qry).getAll();
+
+                                assertEquals(1, res.size());
+
+                                Integer m = (Integer)res.get(0).get(0);
+
+                                // Null means an empty table; otherwise max is the last key of some batch.
+                                assertTrue("Unexpected max: " + m, m == null || m % OP_RANGE == 0);
+                            }
+
+                            for (SqlFieldsQuery qry : minQrys) {
+                                List<List<?>> res = cache.cache.query(qry).getAll();
+
+                                assertEquals(1, res.size());
+
+                                Integer m = (Integer)res.get(0).get(0);
+
+                                // Null means an empty table; otherwise min is the first key of some batch.
+                                assertTrue("Unexpected min: " + m, m == null || m % OP_RANGE == 1);
+                            }
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+                }
+            };
+
+        int srvs = singleNode ? 1 : 4;
+        int clients = singleNode ? 0 : 2;
+
+        readWriteTest(
+            null,
+            srvs,
+            clients,
+            0,
+            DFLT_PARTITION_COUNT,
+            writers,
+            readers,
+            DFLT_TEST_TIME,
+            new InitIndexing(Integer.class, MvccTestSqlIndexValue.class),
+            null,
+            writer,
+            reader);
+    }
+
+    /**
+     * Stores ten values on a single node and runs the {@code sqlQueriesWithMvcc} checks with local and
+     * non-local queries, then starts a second node and repeats the non-local checks.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSqlQueriesWithMvcc() throws Exception {
+        Ignite srv0 = startGrid(0);
+
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT).
+            setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class);
+
+        IgniteCache<Integer, MvccTestSqlIndexValue> cache = (IgniteCache)srv0.createCache(ccfg);
+
+        for (int key = 0; key < 10; key++)
+            cache.put(key, new MvccTestSqlIndexValue(key));
+
+        // Single node: local first, then non-local.
+        sqlQueriesWithMvcc(cache, true);
+        sqlQueriesWithMvcc(cache, false);
+
+        startGrid(1);
+
+        awaitPartitionMapExchange();
+
+        // Two nodes: non-local only.
+        sqlQueriesWithMvcc(cache, false);
+    }
+
+    /**
+     * Runs a fixed set of select/max/min/count queries and compares every result with its expected value.
+     * The expected values match a cache holding indexed values 0 to 9, as stored by the caller in this file.
+     *
+     * @param cache Cache.
+     * @param loc Local query flag.
+     */
+    private void sqlQueriesWithMvcc(IgniteCache<Integer, MvccTestSqlIndexValue> cache, boolean loc) {
+        assertEquals(10,
+            cache.query(new SqlQuery<>(MvccTestSqlIndexValue.class, "true").setLocal(loc)).getAll().size());
+
+        assertEquals(10,
+            cache.query(new SqlFieldsQuery("select idxVal1 from MvccTestSqlIndexValue").setLocal(loc)).getAll().size());
+
+        // Pairs of [SQL, expected single value]; max/min yield Integer, count(*) yields Long.
+        Object[][] checks = {
+            {"select max(idxVal1) from MvccTestSqlIndexValue", 9},
+            {"select max(idxVal1) from MvccTestSqlIndexValue where idxVal1 > 0", 9},
+            {"select max(idxVal1) from MvccTestSqlIndexValue where idxVal1 < 5", 4},
+            {"select min(idxVal1) from MvccTestSqlIndexValue", 0},
+            {"select min(idxVal1) from MvccTestSqlIndexValue where idxVal1 < 100", 0},
+            {"select min(idxVal1) from MvccTestSqlIndexValue where idxVal1 < 5", 0},
+            {"select min(idxVal1) from MvccTestSqlIndexValue where idxVal1 > 5", 6},
+            {"select count(*) from MvccTestSqlIndexValue", 10L},
+            {"select count(*) from MvccTestSqlIndexValue where idxVal1 >= 0", 10L},
+            {"select count(*) from MvccTestSqlIndexValue where idxVal1 >= 0 and idxVal1 < 100", 10L},
+            {"select count(*) from MvccTestSqlIndexValue where idxVal1 >0 and idxVal1 < 5", 4L},
+            {"select count(*) from MvccTestSqlIndexValue where idxVal1 >= 1", 9L},
+            {"select count(*) from MvccTestSqlIndexValue where idxVal1 > 100", 0L},
+            {"select count(*) from MvccTestSqlIndexValue where idxVal1 = 1", 1L}
+        };
+
+        for (Object[] check : checks)
+            checkSingleResult(cache, new SqlFieldsQuery((String)check[0]).setLocal(loc), check[1]);
+    }
+
+    /**
+     * Executes the query and asserts that it returns exactly one row with exactly one column holding the
+     * expected value.
+     *
+     * @param cache Cache.
+     * @param qry Query.
+     * @param exp Expected value.
+     */
+    private void checkSingleResult(IgniteCache cache, SqlFieldsQuery qry, Object exp) {
+        List<List<?>> rows = cache.query(qry).getAll();
+
+        assertEquals(1, rows.size());
+
+        List<?> singleRow = rows.get(0);
+
+        assertEquals(1, singleRow.size());
+
+        Object actual = singleRow.get(0);
+
+        assertEquals(exp, actual);
+    }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSqlSimple() throws Exception {
+ startGrid(0);
+
+ for (int i = 0; i < 4; i++)
+ sqlSimple(i * 512);
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ for (int i = 0; i < 5; i++)
+ sqlSimple(rnd.nextInt(2048));
+ }
+
+ /**
+ * Creates a cache with the given index inline size, applies a fixed sequence of puts, an overwrite
+ * and a remove, checks SQL results after every step and finally destroys the cache.
+ *
+ * @param inlineSize Value passed to {@code setSqlIndexMaxInlineSize} of the cache configuration.
+ * @throws Exception If failed.
+ */
+ private void sqlSimple(int inlineSize) throws Exception {
+ Ignite node = ignite(0);
+
+ IgniteCache<Integer, MvccTestSqlIndexValue> cache = (IgniteCache)node.createCache(
+ cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT).
+ setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class).
+ setSqlIndexMaxInlineSize(inlineSize));
+
+ Map<Integer, Integer> expected = new HashMap<>();
+
+ // Nothing has been stored yet.
+ checkValues(expected, cache);
+
+ // Insert a single entry.
+ cache.put(1, new MvccTestSqlIndexValue(1));
+ expected.put(1, 1);
+
+ checkValues(expected, cache);
+
+ // Overwrite it with another indexed value.
+ cache.put(1, new MvccTestSqlIndexValue(2));
+ expected.put(1, 2);
+
+ checkValues(expected, cache);
+
+ // Add keys 2, 3 and 4, all sharing the same indexed value.
+ for (int key = 2; key <= 4; key++) {
+ cache.put(key, new MvccTestSqlIndexValue(1));
+ expected.put(key, 1);
+ }
+
+ checkValues(expected, cache);
+
+ // Remove the first key and make sure it is not visible any more.
+ cache.remove(1);
+ expected.remove(1);
+
+ checkValues(expected, cache);
+
+ checkNoValue(1, cache);
+
+ // Put the removed key back with a new value.
+ cache.put(1, new MvccTestSqlIndexValue(10));
+ expected.put(1, 10);
+
+ checkValues(expected, cache);
+
+ checkActiveQueriesCleanup(node);
+
+ node.destroyCache(cache.getName());
+ }
+
+ /**
+ * Starts a single node and runs the random put/remove SQL scenario with inline size {@code 0}
+ * and then with three randomly chosen inline sizes.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSqlSimplePutRemoveRandom() throws Exception {
+ startGrid(0);
+
+ testSqlSimplePutRemoveRandom(0);
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ for (int run = 1; run <= 3; run++)
+ testSqlSimplePutRemoveRandom(rnd.nextInt(2048));
+ }
+
+ /**
+ * Creates a cache with the given index inline size and applies random puts and removes to it,
+ * checking SQL results after every operation. The loop ends after 100 000 operations or about
+ * five seconds, whichever comes first; the cache is destroyed at the end.
+ *
+ * @param inlineSize Value passed to {@code setSqlIndexMaxInlineSize} of the cache configuration.
+ * @throws Exception If failed.
+ */
+ private void testSqlSimplePutRemoveRandom(int inlineSize) throws Exception {
+ Ignite node = grid(0);
+
+ IgniteCache<Integer, MvccTestSqlIndexValue> cache = (IgniteCache)node.createCache(
+ cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT).
+ setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class).
+ setSqlIndexMaxInlineSize(inlineSize));
+
+ Map<Integer, Integer> expected = new HashMap<>();
+
+ final int KEYS = 100;
+ final int VALS = 10;
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ long deadline = System.currentTimeMillis() + 5_000;
+
+ for (int iter = 0; iter < 100_000; iter++) {
+ Integer key = rnd.nextInt(KEYS);
+
+ // Roughly one operation out of five is a remove, the rest are puts.
+ boolean rmv = rnd.nextInt(5) == 0;
+
+ if (!rmv) {
+ Integer val = rnd.nextInt(VALS);
+
+ cache.put(key, new MvccTestSqlIndexValue(val));
+
+ expected.put(key, val);
+ }
+ else {
+ cache.remove(key);
+
+ expected.remove(key);
+ }
+
+ checkValues(expected, cache);
+
+ if (System.currentTimeMillis() > deadline) {
+ info("Stop test, iteration: " + iter);
+
+ break;
+ }
+ }
+
+ // Every key that is not expected to exist must be invisible to SQL as well.
+ for (int key = 0; key < KEYS; key++) {
+ if (expected.containsKey(key))
+ continue;
+
+ checkNoValue(key, cache);
+ }
+
+ checkActiveQueriesCleanup(node);
+
+ node.destroyCache(cache.getName());
+ }
+
+ /**
+ * Asserts that an SQL query by {@code _key} returns no rows for the given key.
+ *
+ * @param key Key that must not be visible to SQL queries.
+ * @param cache Cache.
+ */
+ private void checkNoValue(Object key, IgniteCache<?, ?> cache) {
+ SqlQuery<Integer, MvccTestSqlIndexValue> qry = new SqlQuery<>(MvccTestSqlIndexValue.class, "_key = ?");
+
+ qry.setArgs(key);
+
+ List<IgniteCache.Entry<Integer, MvccTestSqlIndexValue>> res = cache.query(qry).getAll();
+
+ assertTrue("Unexpected rows for key " + key + ": " + res, res.isEmpty());
+ }
+
+ /**
+ * Verifies that SQL queries over {@code MvccTestSqlIndexValue} return exactly the expected content.
+ * The following is checked: the {@code count(*)} result, three whole-table queries with different
+ * conditions, a lookup by {@code _key} for every expected key (both as an entry query and as a fields
+ * query) and a lookup by {@code idxVal1} for every expected indexed value.
+ *
+ * @param expVals Expected content: key mapped to the expected {@code idxVal1}.
+ * @param cache Cache.
+ */
+ private void checkValues(Map<Integer, Integer> expVals, IgniteCache<Integer, MvccTestSqlIndexValue> cache) {
+ SqlFieldsQuery cntQry = new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue");
+
+ Long cnt = (Long)cache.query(cntQry).getAll().get(0).get(0);
+
+ assertEquals((long)expVals.size(), (Object)cnt);
+
+ // The same content has to be returned whatever condition is used to select all rows.
+ for (String cond : new String[] {"true", "_key >= 0", "idxVal1 >= 0"})
+ assertEquals("Condition: " + cond, expVals, queryIdxVals(cache, cond));
+
+ Map<Integer, Set<Integer>> expIdxVals = new HashMap<>();
+
+ for (Map.Entry<Integer, Integer> e : expVals.entrySet()) {
+ SqlQuery<Integer, MvccTestSqlIndexValue> qry = new SqlQuery<>(MvccTestSqlIndexValue.class, "_key = ?");
+
+ qry.setArgs(e.getKey());
+
+ List<IgniteCache.Entry<Integer, MvccTestSqlIndexValue>> res = cache.query(qry).getAll();
+
+ assertEquals(1, res.size());
+ assertEquals(e.getKey(), res.get(0).getKey());
+ assertEquals(e.getValue(), (Integer)res.get(0).getValue().idxVal1);
+
+ SqlFieldsQuery fieldsQry = new SqlFieldsQuery("select _key, idxVal1 from MvccTestSqlIndexValue where _key=?");
+ fieldsQry.setArgs(e.getKey());
+
+ List<List<?>> fieldsRes = cache.query(fieldsQry).getAll();
+
+ assertEquals(1, fieldsRes.size());
+ assertEquals(e.getKey(), fieldsRes.get(0).get(0));
+ assertEquals(e.getValue(), fieldsRes.get(0).get(1));
+
+ // Group expected keys by indexed value for the lookups below.
+ Integer val = e.getValue();
+
+ Set<Integer> keys = expIdxVals.get(val);
+
+ if (keys == null)
+ expIdxVals.put(val, keys = new HashSet<>());
+
+ assertTrue(keys.add(e.getKey()));
+ }
+
+ for (Map.Entry<Integer, Set<Integer>> expE : expIdxVals.entrySet()) {
+ Map<Integer, Integer> vals = queryIdxVals(cache, "idxVal1 = ?", expE.getKey());
+
+ for (Map.Entry<Integer, Integer> e : vals.entrySet()) {
+ assertEquals(expE.getKey(), e.getValue());
+
+ assertTrue(expE.getValue().contains(e.getKey()));
+ }
+
+ assertEquals(expE.getValue().size(), vals.size());
+ }
+ }
+
+ /**
+ * Runs an SQL query over {@code MvccTestSqlIndexValue} with the given condition and collects the
+ * result, asserting that no key is returned more than once.
+ *
+ * @param cache Cache.
+ * @param cond SQL condition.
+ * @param args Query arguments; nothing is set on the query if there are none.
+ * @return Returned keys mapped to their {@code idxVal1}.
+ */
+ private Map<Integer, Integer> queryIdxVals(
+ IgniteCache<Integer, MvccTestSqlIndexValue> cache,
+ String cond,
+ Object... args
+ ) {
+ SqlQuery<Integer, MvccTestSqlIndexValue> qry = new SqlQuery<>(MvccTestSqlIndexValue.class, cond);
+
+ if (args.length > 0)
+ qry.setArgs(args);
+
+ Map<Integer, Integer> vals = new HashMap<>();
+
+ for (IgniteCache.Entry<Integer, MvccTestSqlIndexValue> e : cache.query(qry).getAll())
+ assertNull(vals.put(e.getKey(), e.getValue().idxVal1));
+
+ return vals;
+ }
+
+ /**
+ * Serializable key class wrapping a single {@code int}; equality and hash code are based on that value.
+ */
+ static class JoinTestParentKey implements Serializable {
+ /** Wrapped key value. */
+ private int key;
+
+ /**
+ * @param key Key.
+ */
+ JoinTestParentKey(int key) {
+ this.key = key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (o == this)
+ return true;
+
+ if (o == null || o.getClass() != getClass())
+ return false;
+
+ return ((JoinTestParentKey)o).key == key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return key;
+ }
+ }
+
+ /**
+ * Value class with a single {@code id} field annotated as an indexed SQL field.
+ */
+ static class JoinTestParent {
+ /** Identifier, annotated with {@code @QuerySqlField(index = true)}. */
+ @QuerySqlField(index = true)
+ private int id;
+
+ /**
+ * @param id ID.
+ */
+ JoinTestParent(int id) {
+ this.id = id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(JoinTestParent.class, this);
+ }
+ }
+
+ /**
+ * Serializable key class wrapping a single {@code int} that is also annotated as an indexed SQL field;
+ * equality and hash code are based on that value.
+ */
+ static class JoinTestChildKey implements Serializable {
+ /** Wrapped key value. */
+ @QuerySqlField(index = true)
+ private int key;
+
+ /**
+ * @param key Key.
+ */
+ JoinTestChildKey(int key) {
+ this.key = key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (o == this)
+ return true;
+
+ if (o == null || o.getClass() != getClass())
+ return false;
+
+ return ((JoinTestChildKey)o).key == key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return key;
+ }
+ }
+
+ /**
+ * Value class with a single {@code parentId} field annotated as an indexed SQL field.
+ */
+ static class JoinTestChild {
+ /** Parent identifier, annotated with {@code @QuerySqlField(index = true)}. */
+ @QuerySqlField(index = true)
+ private int parentId;
+
+ /**
+ * @param parentId Parent ID.
+ */
+ JoinTestChild(int parentId) {
+ this.parentId = parentId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(JoinTestChild.class, this);
+ }
+ }
+
+ /**
+ * Serializable cache value with a single {@code idxVal1} field annotated as an indexed SQL field.
+ */
+ static class MvccTestSqlIndexValue implements Serializable {
+ /** Indexed value, annotated with {@code @QuerySqlField(index = true)}. */
+ @QuerySqlField(index = true)
+ private int idxVal1;
+
+ /**
+ * @param idxVal1 Indexed value 1.
+ */
+ MvccTestSqlIndexValue(int idxVal1) {
+ this.idxVal1 = idxVal1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccTestSqlIndexValue.class, this);
+ }
+ }
+
+ /**
+ * Closure that sets the given indexed types on the cache configuration it is applied to.
+ */
+ static class InitIndexing implements IgniteInClosure<CacheConfiguration> {
+ /** Indexed types passed to {@code setIndexedTypes} of the configuration. */
+ private final Class<?>[] idxTypes;
+
+ /**
+ * @param idxTypes Indexed types.
+ */
+ InitIndexing(Class<?>... idxTypes) {
+ this.idxTypes = idxTypes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void apply(CacheConfiguration cfg) {
+ cfg.setIndexedTypes(idxTypes);
+ }
+ }
}
[4/4] ignite git commit: ignite-3478 Mvcc support for sql indexes
Posted by sb...@apache.org.
ignite-3478 Mvcc support for sql indexes
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6150f3a0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6150f3a0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6150f3a0
Branch: refs/heads/ignite-3478
Commit: 6150f3a0ad310810606ec5bafbd007804808ff25
Parents: 00bd479
Author: sboikov <sb...@gridgain.com>
Authored: Wed Oct 25 15:15:56 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Oct 25 15:15:56 2017 +0300
----------------------------------------------------------------------
.../cache/IgniteCacheOffheapManagerImpl.java | 181 +-
.../cache/mvcc/CacheCoordinatorsProcessor.java | 26 +-
.../cache/mvcc/CoordinatorAckRequestTx.java | 2 +-
.../cache/mvcc/PreviousCoordinatorQueries.java | 4 +-
.../cache/persistence/CacheDataRowAdapter.java | 6 +-
.../cache/persistence/tree/BPlusTree.java | 42 +-
.../cache/persistence/tree/io/IOVersions.java | 7 +
.../cache/persistence/tree/io/PageIO.java | 85 +-
.../cache/query/GridCacheQueryManager.java | 11 +-
.../cache/tree/AbstractDataInnerIO.java | 8 +-
.../cache/tree/AbstractDataLeafIO.java | 6 +-
.../cache/tree/CacheDataRowStore.java | 6 +-
.../processors/cache/tree/CacheDataTree.java | 2 +-
.../cache/tree/CacheIdAwareDataInnerIO.java | 2 +-
.../cache/tree/CacheIdAwareDataLeafIO.java | 2 +-
.../processors/cache/tree/DataInnerIO.java | 2 +-
.../processors/cache/tree/DataLeafIO.java | 2 +-
.../internal/processors/cache/tree/DataRow.java | 17 +-
.../processors/cache/tree/MvccCleanupRow.java | 48 +
.../processors/cache/tree/MvccDataRow.java | 25 +-
.../processors/cache/tree/MvccUpdateRow.java | 23 +-
.../processors/cache/tree/SearchRow.java | 2 +-
.../datastreamer/DataStreamerImpl.java | 2 +-
.../processors/query/GridQueryIndexing.java | 8 +-
.../processors/query/GridQueryProcessor.java | 43 +-
...IgniteClientCacheInitializationFailTest.java | 4 +-
.../cache/mvcc/CacheMvccAbstractTest.java | 123 +-
.../cache/mvcc/CacheMvccTransactionsTest.java | 78 +-
.../processors/database/BPlusTreeSelfTest.java | 106 +-
.../query/h2/opt/GridH2SpatialIndex.java | 5 +
.../cache/query/GridCacheTwoStepQuery.java | 18 +
.../processors/query/h2/IgniteH2Indexing.java | 41 +-
.../query/h2/database/H2PkHashIndex.java | 11 +-
.../query/h2/database/H2RowFactory.java | 30 +-
.../processors/query/h2/database/H2Tree.java | 102 +-
.../query/h2/database/H2TreeIndex.java | 74 +-
.../h2/database/H2TreeMvccFilterClosure.java | 106 ++
.../h2/database/io/AbstractH2ExtrasInnerIO.java | 190 +++
.../h2/database/io/AbstractH2ExtrasLeafIO.java | 187 +++
.../query/h2/database/io/AbstractH2InnerIO.java | 106 ++
.../query/h2/database/io/AbstractH2LeafIO.java | 108 ++
.../query/h2/database/io/H2ExtrasInnerIO.java | 115 +-
.../query/h2/database/io/H2ExtrasLeafIO.java | 111 +-
.../query/h2/database/io/H2IOUtils.java | 113 ++
.../query/h2/database/io/H2InnerIO.java | 41 +-
.../query/h2/database/io/H2LeafIO.java | 41 +-
.../h2/database/io/H2MvccExtrasInnerIO.java | 77 +
.../h2/database/io/H2MvccExtrasLeafIO.java | 76 +
.../query/h2/database/io/H2MvccInnerIO.java | 42 +
.../query/h2/database/io/H2MvccLeafIO.java | 42 +
.../query/h2/database/io/H2RowLinkIO.java | 33 +
.../query/h2/opt/GridH2IndexBase.java | 27 +-
.../query/h2/opt/GridH2KeyValueRowOnheap.java | 30 +-
.../query/h2/opt/GridH2MetaTable.java | 5 +
.../query/h2/opt/GridH2PlainRowFactory.java | 17 +-
.../query/h2/opt/GridH2QueryContext.java | 27 +-
.../processors/query/h2/opt/GridH2Row.java | 24 +-
.../query/h2/opt/GridH2RowDescriptor.java | 12 +-
.../query/h2/opt/GridH2SearchRow.java | 41 +
.../query/h2/opt/GridH2SearchRowAdapter.java | 13 +-
.../processors/query/h2/opt/GridH2Table.java | 53 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 38 +-
.../h2/twostep/GridMergeIndexIterator.java | 16 +-
.../h2/twostep/GridReduceQueryExecutor.java | 46 +-
.../h2/twostep/msg/GridH2QueryRequest.java | 83 +-
.../cache/mvcc/CacheMvccSqlQueriesTest.java | 1568 +++++++++++++++++-
66 files changed, 3955 insertions(+), 587 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 1280e75..8ce47bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -39,9 +39,7 @@ import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionResponse;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionWithoutTxs;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter;
import org.apache.ignite.internal.processors.cache.mvcc.MvccLongList;
@@ -57,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore;
import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
import org.apache.ignite.internal.processors.cache.tree.DataRow;
+import org.apache.ignite.internal.processors.cache.tree.MvccCleanupRow;
import org.apache.ignite.internal.processors.cache.tree.MvccKeyMaxVersionBound;
import org.apache.ignite.internal.processors.cache.tree.MvccKeyMinVersionBound;
import org.apache.ignite.internal.processors.cache.tree.MvccRemoveRow;
@@ -88,6 +87,7 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.MVCC_START_CNTR;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
@@ -1419,12 +1419,12 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
// TODO IGNITE-3478: null is passed for loaded from store, need handle better.
if (mvccVer == null) {
- mvccVer = new MvccCoordinatorVersionWithoutTxs(1L, CacheCoordinatorsProcessor.START_VER, 0L);
+ mvccVer = new MvccCoordinatorVersionWithoutTxs(1L, MVCC_START_CNTR, 0L);
newVal = true;
}
else
- assert val != null || CacheCoordinatorsProcessor.versionForRemovedValue(mvccVer.coordinatorVersion());
+ assert val != null || versionForRemovedValue(mvccVer.coordinatorVersion());
if (val != null) {
val.valueBytes(coCtx);
@@ -1476,8 +1476,12 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
assert !old;
- if (val != null)
+ if (val != null) {
incrementSize(cctx.cacheId());
+
+ if (cctx.queries().enabled())
+ cctx.queries().store(updateRow, mvccVer, null);
+ }
}
finally {
busyLock.leaveBusy();
@@ -1531,6 +1535,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) {
assert !primary : updateRow;
+
+ cleanup(cctx, updateRow.cleanupRows(), false);
+
+ return null;
}
else {
rowStore.addRow(updateRow);
@@ -1543,7 +1551,19 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
incrementSize(cctx.cacheId());
}
- cleanup(updateRow.cleanupRows(), false);
+ CacheDataRow oldRow = updateRow.oldRow();
+
+ if (oldRow != null)
+ oldRow.key(key);
+
+ GridCacheQueryManager qryMgr = cctx.queries();
+
+ if (qryMgr.enabled())
+ qryMgr.store(updateRow, mvccVer, oldRow);
+
+ updatePendingEntries(cctx, updateRow, oldRow);
+
+ cleanup(cctx, updateRow.cleanupRows(), false);
return updateRow.activeTransactions();
}
@@ -1590,18 +1610,20 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) {
assert !primary : updateRow;
- cleanup(updateRow.cleanupRows(), false);
+ cleanup(cctx, updateRow.cleanupRows(), false);
+
+ return null;
}
else {
if (res == MvccUpdateRow.UpdateResult.PREV_NOT_NULL)
decrementSize(cacheId);
- CacheSearchRow rmvRow = cleanup(updateRow.cleanupRows(), true);
+ long rmvRowLink = cleanup(cctx, updateRow.cleanupRows(), true);
- if (rmvRow == null)
+ if (rmvRowLink == 0)
rowStore.addRow(updateRow);
else
- updateRow.link(rmvRow.link());
+ updateRow.link(rmvRowLink);
assert updateRow.link() != 0L;
@@ -1610,6 +1632,21 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
assert !old;
}
+ CacheDataRow oldRow = updateRow.oldRow();
+
+ if (oldRow != null) {
+ assert oldRow.link() != 0 : oldRow;
+
+ oldRow.key(key);
+
+ GridCacheQueryManager qryMgr = cctx.queries();
+
+ if (qryMgr.enabled())
+ qryMgr.remove(key, oldRow, mvccVer);
+
+ clearPendingEntries(cctx, oldRow);
+ }
+
return updateRow.activeTransactions();
}
finally {
@@ -1623,26 +1660,40 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
+ boolean cleanup = cctx.queries().enabled() || hasPendingEntries;
+
GridCursor<CacheDataRow> cur = dataTree.find(
new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE),
new MvccSearchRow(cacheId, key, 1, 1),
- CacheDataRowAdapter.RowData.KEY_ONLY);
+ cleanup ? CacheDataRowAdapter.RowData.NO_KEY : CacheDataRowAdapter.RowData.LINK_ONLY);
boolean first = true;
while (cur.next()) {
CacheDataRow row = cur.get();
+ row.key(key);
+
assert row.link() != 0 : row;
boolean rmvd = dataTree.removex(row);
- assert rmvd;
+ assert rmvd : row;
+
+ boolean rmvdVal = versionForRemovedValue(row.mvccCoordinatorVersion());
+
+ if (cleanup && !rmvdVal) {
+ if (cctx.queries().enabled())
+ cctx.queries().remove(key, row, null);
+
+ if (first)
+ clearPendingEntries(cctx, row);
+ }
rowStore.removeRow(row.link());
if (first) {
- if (!versionForRemovedValue(row.mvccCoordinatorVersion()))
+ if (!rmvdVal)
decrementSize(cctx.cacheId());
first = false;
@@ -1651,36 +1702,48 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/**
+ * @param cctx Cache context.
* @param cleanupRows Rows to cleanup.
* @param findRmv {@code True} if need keep removed row entry.
- * @return Removed row entry if found.
+ * @return Removed row link or {@code 0} if not found.
* @throws IgniteCheckedException If failed.
*/
- @Nullable private CacheSearchRow cleanup(@Nullable List<CacheSearchRow> cleanupRows, boolean findRmv)
+ private long cleanup(GridCacheContext cctx, @Nullable List<MvccCleanupRow> cleanupRows, boolean findRmv)
throws IgniteCheckedException {
- CacheSearchRow rmvRow = null;
+ long rmvRowLink = 0;
if (cleanupRows != null) {
+ GridCacheQueryManager qryMgr = cctx.queries();
+
for (int i = 0; i < cleanupRows.size(); i++) {
- CacheSearchRow oldRow = cleanupRows.get(i);
+ MvccCleanupRow cleanupRow = cleanupRows.get(i);
+
+ assert cleanupRow.link() != 0 : cleanupRow;
- assert oldRow.link() != 0L : oldRow;
+ if (qryMgr.enabled() && !versionForRemovedValue(cleanupRow.mvccCoordinatorVersion())) {
+ CacheDataRow oldRow = dataTree.remove(cleanupRow);
- boolean rmvd = dataTree.removex(oldRow);
+ assert oldRow != null : cleanupRow;
- assert rmvd;
+ qryMgr.remove(oldRow.key(), oldRow, null);
+ }
+ else {
+ boolean rmvd = dataTree.removex(cleanupRow);
+
+ assert rmvd;
+ }
if (findRmv &&
- rmvRow == null &&
- versionForRemovedValue(oldRow.mvccCoordinatorVersion())) {
- rmvRow = oldRow;
+ rmvRowLink == 0 &&
+ versionForRemovedValue(cleanupRow.mvccCoordinatorVersion())) {
+ rmvRowLink = cleanupRow.link();
}
else
- rowStore.removeRow(oldRow.link());
+ rowStore.removeRow(cleanupRow.link());
}
}
- return rmvRow;
+ return rmvRowLink;
}
/** {@inheritDoc} */
@@ -1753,32 +1816,48 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
KeyCacheObject key = newRow.key();
- long expireTime = newRow.expireTime();
-
GridCacheQueryManager qryMgr = cctx.queries();
- int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
-
if (qryMgr.enabled())
- qryMgr.store(newRow, oldRow);
+ qryMgr.store(newRow, null, oldRow);
+
+ updatePendingEntries(cctx, newRow, oldRow);
if (oldRow != null) {
assert oldRow.link() != 0 : oldRow;
- if (pendingEntries != null && oldRow.expireTime() != 0)
- pendingEntries.removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link()));
-
if (newRow.link() != oldRow.link())
rowStore.removeRow(oldRow.link());
}
+ updateIgfsMetrics(cctx, key, (oldRow != null ? oldRow.value() : null), newRow.value());
+ }
+
+ /**
+ * @param cctx Cache context.
+ * @param newRow New row.
+ * @param oldRow Old row ({@code null} if there was none).
+ * @throws IgniteCheckedException If failed.
+ */
+ private void updatePendingEntries(GridCacheContext cctx, CacheDataRow newRow, @Nullable CacheDataRow oldRow)
+ throws IgniteCheckedException
+ {
+ long expireTime = newRow.expireTime();
+
+ int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
+
+ if (oldRow != null) {
+ assert oldRow.link() != 0 : oldRow;
+
+ if (pendingEntries != null && oldRow.expireTime() != 0)
+ pendingEntries.removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link()));
+ }
+
if (pendingEntries != null && expireTime != 0) {
pendingEntries.putx(new PendingRow(cacheId, expireTime, newRow.link()));
hasPendingEntries = true;
}
-
- updateIgfsMetrics(cctx, key, (oldRow != null ? oldRow.value() : null), newRow.value());
}
/** {@inheritDoc} */
@@ -1792,7 +1871,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
GridCacheQueryManager qryMgr = cctx.queries();
- qryMgr.store(row, null);
+ qryMgr.store(row, null, null); // TODO IGNITE-3478.
}
}
@@ -1821,14 +1900,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
*/
private void finishRemove(GridCacheContext cctx, KeyCacheObject key, @Nullable CacheDataRow oldRow) throws IgniteCheckedException {
if (oldRow != null) {
- int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
-
- assert oldRow.link() != 0 : oldRow;
- assert cacheId == CU.UNDEFINED_CACHE_ID || oldRow.cacheId() == cacheId :
- "Incorrect cache ID [expected=" + cacheId + ", actual=" + oldRow.cacheId() + "].";
-
- if (pendingEntries != null && oldRow.expireTime() != 0)
- pendingEntries.removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link()));
+ clearPendingEntries(cctx, oldRow);
decrementSize(cctx.cacheId());
}
@@ -1836,7 +1908,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
GridCacheQueryManager qryMgr = cctx.queries();
if (qryMgr.enabled())
- qryMgr.remove(key, oldRow);
+ qryMgr.remove(key, oldRow, null);
if (oldRow != null)
rowStore.removeRow(oldRow.link());
@@ -1844,6 +1916,23 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
updateIgfsMetrics(cctx, key, (oldRow != null ? oldRow.value() : null), null);
}
+ /**
+ * @param cctx Cache context.
+ * @param oldRow Old row.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void clearPendingEntries(GridCacheContext cctx, CacheDataRow oldRow)
+ throws IgniteCheckedException {
+ int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
+
+ assert oldRow.link() != 0 : oldRow;
+ assert cacheId == CU.UNDEFINED_CACHE_ID || oldRow.cacheId() == cacheId :
+ "Incorrect cache ID [expected=" + cacheId + ", actual=" + oldRow.cacheId() + "].";
+
+ if (pendingEntries != null && oldRow.expireTime() != 0)
+ pendingEntries.removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link()));
+ }
+
/** {@inheritDoc} */
@Override public CacheDataRow find(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException {
key.valueBytes(cctx.cacheObjectContext());
@@ -1985,7 +2074,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
if (curKey != null && row.key().equals(curKey))
continue;
- if (CacheCoordinatorsProcessor.versionForRemovedValue(rowCrdVerMasked)) {
+ if (versionForRemovedValue(rowCrdVerMasked)) {
curKey = row.key();
continue;
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index fd3c2af..07e30d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -75,10 +75,10 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS
*/
public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
/** */
- public static final long COUNTER_NA = 0L;
+ public static final long MVCC_COUNTER_NA = 0L;
/** */
- public static final long START_VER = 1L;
+ public static final long MVCC_START_CNTR = 1L;
/** */
private static final boolean STAT_CNTRS = false;
@@ -99,7 +99,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
private volatile MvccCoordinator curCrd;
/** */
- private final AtomicLong mvccCntr = new AtomicLong(START_VER);
+ private final AtomicLong mvccCntr = new AtomicLong(MVCC_START_CNTR);
/** */
private final GridAtomicLong committedCntr = new GridAtomicLong(1L);
@@ -148,6 +148,18 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
}
/**
+ * @param crdVer Mvcc coordinator version.
+ * @param cntr Counter.
+ * @return Always {@code true}.
+ */
+ public static boolean assertMvccVersionValid(long crdVer, long cntr) {
+ assert unmaskCoordinatorVersion(crdVer) > 0;
+ assert cntr != MVCC_COUNTER_NA;
+
+ return true;
+ }
+
+ /**
* @param crdVer Coordinator version.
* @return Coordinator version with removed value flag.
*/
@@ -651,7 +663,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorAckRequestTx msg) {
onTxDone(msg.txCounter());
- if (msg.queryCounter() != COUNTER_NA) {
+ if (msg.queryCounter() != MVCC_COUNTER_NA) {
if (msg.queryCoordinatorVersion() == 0)
onQueryDone(nodeId, msg.queryCounter());
else
@@ -824,7 +836,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
else
qryCnt.incrementAndGet();
- res.init(futId, crdVer, mvccCntr, COUNTER_NA);
+ res.init(futId, crdVer, mvccCntr, MVCC_COUNTER_NA);
return res;
}
@@ -909,7 +921,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
// }
// }
//
-// res.init(futId, crdVer, mvccCntr, COUNTER_NA);
+// res.init(futId, crdVer, mvccCntr, MVCC_COUNTER_NA);
//
// return res;
}
@@ -1197,7 +1209,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
* @param res Response.
*/
void onResponse(MvccCoordinatorVersionResponse res) {
- assert res.counter() != COUNTER_NA;
+ assert res.counter() != MVCC_COUNTER_NA;
if (lsnr != null)
lsnr.onMvccResponse(crd.nodeId(), res);
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
index c0512f0..5ab3d3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
@@ -60,7 +60,7 @@ public class CoordinatorAckRequestTx implements MvccCoordinatorMessage {
/** {@inheritDoc} */
long queryCounter() {
- return CacheCoordinatorsProcessor.COUNTER_NA;
+ return CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
index 5c56f40..521e989 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
@@ -26,9 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
import org.jetbrains.annotations.Nullable;
/**
@@ -165,7 +163,7 @@ class PreviousCoordinatorQueries {
*/
void onQueryDone(UUID nodeId, long crdVer, long cntr) {
assert crdVer != 0;
- assert cntr != CacheCoordinatorsProcessor.COUNTER_NA;
+ assert cntr != CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
synchronized (this) {
MvccCounter mvccCntr = new MvccCounter(crdVer, cntr);
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
index 1e3a229..29bb6bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
@@ -60,6 +60,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
protected CacheObject val;
/** */
+ @GridToStringInclude
protected long expireTime = -1;
/** */
@@ -599,7 +600,10 @@ public class CacheDataRowAdapter implements CacheDataRow {
KEY_ONLY,
/** */
- NO_KEY
+ NO_KEY,
+
+ /** Row data is not read from the link: {@code DataRow} skips {@code initFromLink} for this mode. */
+ LINK_ONLY,
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index b31a61f..1ebb1e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -884,12 +884,13 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
/**
* @param upper Upper bound.
+ * @param c Filter closure, rows it rejects are skipped; {@code null} disables filtering.
* @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
* @return Cursor.
* @throws IgniteCheckedException If failed.
*/
- private GridCursor<T> findLowerUnbounded(L upper, Object x) throws IgniteCheckedException {
- ForwardCursor cursor = new ForwardCursor(null, upper, x);
+ private GridCursor<T> findLowerUnbounded(L upper, TreeRowClosure<L, T> c, Object x) throws IgniteCheckedException {
+ ForwardCursor cursor = new ForwardCursor(null, upper, c, x);
long firstPageId;
@@ -946,13 +947,25 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
* @throws IgniteCheckedException If failed.
*/
public final GridCursor<T> find(L lower, L upper, Object x) throws IgniteCheckedException {
+ return find(lower, upper, null, x);
+ }
+
+ /**
+ * @param lower Lower bound inclusive or {@code null} if unbounded.
+ * @param upper Upper bound inclusive or {@code null} if unbounded.
+ * @param c Filter closure, rows it rejects are skipped; {@code null} disables filtering.
+ * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
+ * @return Cursor.
+ * @throws IgniteCheckedException If failed.
+ */
+ public final GridCursor<T> find(L lower, L upper, TreeRowClosure<L, T> c, Object x) throws IgniteCheckedException {
checkDestroyed();
try {
if (lower == null)
- return findLowerUnbounded(upper, x);
+ return findLowerUnbounded(upper, c, x);
- ForwardCursor cursor = new ForwardCursor(lower, upper, x);
+ ForwardCursor cursor = new ForwardCursor(lower, upper, c, x);
cursor.find();
@@ -4751,14 +4764,19 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
/** */
private int row = -1;
+ /** Optional row filter applied before a row is read; {@code null} means every row is accepted. */
+ private final TreeRowClosure<L, T> c;
+
/**
* @param lowerBound Lower bound.
* @param upperBound Upper bound.
+ * @param c Filter closure, rows it rejects are skipped; {@code null} disables filtering.
* @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
*/
- ForwardCursor(L lowerBound, L upperBound, Object x) {
+ ForwardCursor(L lowerBound, L upperBound, TreeRowClosure<L, T> c, Object x) {
super(lowerBound, upperBound);
+ this.c = c;
this.x = x;
}
@@ -4782,15 +4800,21 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
if (rows == EMPTY)
rows = (T[])new Object[cnt];
+ int resCnt = 0;
+
for (int i = 0; i < cnt; i++) {
- T r = getRow(io, pageAddr, startIdx + i, x);
+ int itemIdx = startIdx + i;
- rows = GridArrays.set(rows, i, r);
+ if (c == null || c.apply(BPlusTree.this, io, pageAddr, itemIdx)) {
+ T r = getRow(io, pageAddr, itemIdx, x);
+
+ rows = GridArrays.set(rows, resCnt++, r);
+ }
}
- GridArrays.clearTail(rows, cnt);
+ GridArrays.clearTail(rows, resCnt);
- return true;
+ return resCnt > 0;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/IOVersions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/IOVersions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/IOVersions.java
index d74d344..9dcad9b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/IOVersions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/IOVersions.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.processors.cache.persistence.tree.io;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
/**
* Registry for IO versions.
*/
@@ -99,4 +101,9 @@ public final class IOVersions<V extends PageIO> {
return res;
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IOVersions.class, this);
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
index 2de0b8c..0a42129 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
@@ -88,6 +88,12 @@ public abstract class PageIO {
/** */
private static IOVersions<? extends BPlusLeafIO<?>> h2LeafIOs;
+ /** Inner IO versions with mvcc enabled, set by {@code registerH2}. */
+ private static IOVersions<? extends BPlusInnerIO<?>> h2MvccInnerIOs;
+
+ /** Leaf IO versions with mvcc enabled, set by {@code registerH2}. */
+ private static IOVersions<? extends BPlusLeafIO<?>> h2MvccLeafIOs;
+
/** Maximum payload size. */
public static final short MAX_PAYLOAD_SIZE = 2048;
@@ -98,6 +104,12 @@ public abstract class PageIO {
private static List<IOVersions<? extends BPlusLeafIO<?>>> h2ExtraLeafIOs = new ArrayList<>(MAX_PAYLOAD_SIZE);
/** */
+ private static List<IOVersions<? extends BPlusInnerIO<?>>> h2ExtraMvccInnerIOs = new ArrayList<>(MAX_PAYLOAD_SIZE);
+
+ /** */
+ private static List<IOVersions<? extends BPlusLeafIO<?>>> h2ExtraMvccLeafIOs = new ArrayList<>(MAX_PAYLOAD_SIZE);
+
+ /** */
public static final int TYPE_OFF = 0;
/** */
@@ -184,24 +196,42 @@ public abstract class PageIO {
public static final short T_PART_CNTRS = 20;
/** Index for payload == 1. */
- public static final short T_H2_EX_REF_LEAF_START = 10000;
+ public static final short T_H2_EX_REF_LEAF_START = 10_000;
/** */
public static final short T_H2_EX_REF_LEAF_END = T_H2_EX_REF_LEAF_START + MAX_PAYLOAD_SIZE - 1;
/** */
- public static final short T_H2_EX_REF_INNER_START = 20000;
+ public static final short T_H2_EX_REF_INNER_START = 20_000;
/** */
public static final short T_H2_EX_REF_INNER_END = T_H2_EX_REF_INNER_START + MAX_PAYLOAD_SIZE - 1;
/** */
+ public static final short T_H2_EX_REF_MVCC_LEAF_START = 23_000;
+
+ /** */
+ public static final short T_H2_EX_REF_MVCC_LEAF_END = T_H2_EX_REF_MVCC_LEAF_START + MAX_PAYLOAD_SIZE - 1;
+
+ /** */
+ public static final short T_H2_EX_REF_MVCC_INNER_START = 26_000;
+
+ /** */
+ public static final short T_H2_EX_REF_MVCC_INNER_END = T_H2_EX_REF_MVCC_INNER_START + MAX_PAYLOAD_SIZE - 1;
+
+ /** */
public static final short T_DATA_REF_MVCC_INNER = 21;
/** */
public static final short T_DATA_REF_MVCC_LEAF = 22;
/** */
+ public static final short T_H2_MVCC_REF_LEAF = 23;
+
+ /** */
+ public static final short T_H2_MVCC_REF_INNER = 24;
+
+ /** */
private final int ver;
/** */
@@ -334,13 +364,19 @@ public abstract class PageIO {
*
* @param innerIOs Inner IO versions.
* @param leafIOs Leaf IO versions.
+ * @param mvccInnerIOs Inner IO versions with mvcc enabled.
+ * @param mvccLeafIOs Leaf IO versions with mvcc enabled.
*/
public static void registerH2(
IOVersions<? extends BPlusInnerIO<?>> innerIOs,
- IOVersions<? extends BPlusLeafIO<?>> leafIOs
+ IOVersions<? extends BPlusLeafIO<?>> leafIOs,
+ IOVersions<? extends BPlusInnerIO<?>> mvccInnerIOs,
+ IOVersions<? extends BPlusLeafIO<?>> mvccLeafIOs
) {
h2InnerIOs = innerIOs;
h2LeafIOs = leafIOs;
+ h2MvccInnerIOs = mvccInnerIOs;
+ h2MvccLeafIOs = mvccLeafIOs;
}
/**
@@ -348,8 +384,10 @@ public abstract class PageIO {
*
* @param innerExtIOs Extra versions.
*/
- public static void registerH2ExtraInner(IOVersions<? extends BPlusInnerIO<?>> innerExtIOs) {
- h2ExtraInnerIOs.add(innerExtIOs);
+ public static void registerH2ExtraInner(IOVersions<? extends BPlusInnerIO<?>> innerExtIOs, boolean mvcc) {
+ List<IOVersions<? extends BPlusInnerIO<?>>> ios = mvcc ? h2ExtraMvccInnerIOs : h2ExtraInnerIOs;
+
+ ios.add(innerExtIOs);
}
/**
@@ -357,24 +395,30 @@ public abstract class PageIO {
*
* @param leafExtIOs Extra versions.
*/
- public static void registerH2ExtraLeaf(IOVersions<? extends BPlusLeafIO<?>> leafExtIOs) {
- h2ExtraLeafIOs.add(leafExtIOs);
+ public static void registerH2ExtraLeaf(IOVersions<? extends BPlusLeafIO<?>> leafExtIOs, boolean mvcc) {
+ List<IOVersions<? extends BPlusLeafIO<?>>> ios = mvcc ? h2ExtraMvccLeafIOs : h2ExtraLeafIOs;
+
+ ios.add(leafExtIOs);
}
/**
* @param idx Index.
* @return IOVersions for given idx.
*/
- public static IOVersions<? extends BPlusInnerIO<?>> getInnerVersions(int idx) {
- return h2ExtraInnerIOs.get(idx);
+ public static IOVersions<? extends BPlusInnerIO<?>> getInnerVersions(int idx, boolean mvcc) {
+ List<IOVersions<? extends BPlusInnerIO<?>>> ios = mvcc ? h2ExtraMvccInnerIOs : h2ExtraInnerIOs;
+
+ return ios.get(idx);
}
/**
* @param idx Index.
* @return IOVersions for given idx.
*/
- public static IOVersions<? extends BPlusLeafIO<?>> getLeafVersions(int idx) {
- return h2ExtraLeafIOs.get(idx);
+ public static IOVersions<? extends BPlusLeafIO<?>> getLeafVersions(int idx, boolean mvcc) {
+ List<IOVersions<? extends BPlusLeafIO<?>>> ios = mvcc ? h2ExtraMvccLeafIOs : h2ExtraLeafIOs;
+
+ return ios.get(idx);
}
/**
@@ -493,13 +537,18 @@ public abstract class PageIO {
*/
@SuppressWarnings("unchecked")
public static <Q extends BPlusIO<?>> Q getBPlusIO(int type, int ver) throws IgniteCheckedException {
-
if (type >= T_H2_EX_REF_LEAF_START && type <= T_H2_EX_REF_LEAF_END)
return (Q)h2ExtraLeafIOs.get(type - T_H2_EX_REF_LEAF_START).forVersion(ver);
if (type >= T_H2_EX_REF_INNER_START && type <= T_H2_EX_REF_INNER_END)
return (Q)h2ExtraInnerIOs.get(type - T_H2_EX_REF_INNER_START).forVersion(ver);
+ if (type >= T_H2_EX_REF_MVCC_LEAF_START && type <= T_H2_EX_REF_MVCC_LEAF_END)
+ return (Q)h2ExtraMvccLeafIOs.get(type - T_H2_EX_REF_MVCC_LEAF_START).forVersion(ver);
+
+ if (type >= T_H2_EX_REF_MVCC_INNER_START && type <= T_H2_EX_REF_MVCC_INNER_END)
+ return (Q)h2ExtraMvccInnerIOs.get(type - T_H2_EX_REF_MVCC_INNER_START).forVersion(ver);
+
switch (type) {
case T_H2_REF_INNER:
if (h2InnerIOs == null)
@@ -513,6 +562,18 @@ public abstract class PageIO {
return (Q)h2LeafIOs.forVersion(ver);
+ case T_H2_MVCC_REF_INNER:
+ if (h2MvccInnerIOs == null)
+ break;
+
+ return (Q)h2MvccInnerIOs.forVersion(ver);
+
+ case T_H2_MVCC_REF_LEAF:
+ if (h2MvccLeafIOs == null)
+ break;
+
+ return (Q)h2MvccLeafIOs.forVersion(ver);
+
case T_DATA_REF_INNER:
return (Q)DataInnerIO.VERSIONS.forVersion(ver);
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 59b7613..fb5728a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -382,10 +382,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
/**
* @param newRow New row.
+ * @param mvccVer Mvcc version for update.
* @param prevRow Previous row.
* @throws IgniteCheckedException In case of error.
*/
- public void store(CacheDataRow newRow, @Nullable CacheDataRow prevRow)
+ public void store(CacheDataRow newRow, @Nullable MvccCoordinatorVersion mvccVer, @Nullable CacheDataRow prevRow)
throws IgniteCheckedException {
assert enabled();
assert newRow != null && newRow.value() != null && newRow.link() != 0 : newRow;
@@ -405,7 +406,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
if (qryProcEnabled)
- qryProc.store(cctx, newRow, prevRow);
+ qryProc.store(cctx, newRow, mvccVer, prevRow);
}
finally {
invalidateResultCache();
@@ -417,9 +418,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
/**
* @param key Key.
* @param prevRow Previous row.
+ * @param newVer Mvcc version for remove operation.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public void remove(KeyCacheObject key, @Nullable CacheDataRow prevRow) throws IgniteCheckedException {
+ public void remove(KeyCacheObject key, @Nullable CacheDataRow prevRow, @Nullable MvccCoordinatorVersion newVer)
+ throws IgniteCheckedException {
if (!QueryUtils.isEnabled(cctx.config()))
return; // No-op.
@@ -435,7 +438,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
// val may be null if we have no previous value. We should not call processor in this case.
if (qryProcEnabled && prevRow != null)
- qryProc.remove(cctx, prevRow);
+ qryProc.remove(cctx, prevRow, newVer);
}
finally {
invalidateResultCache();
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
index 31aa2ca..c36d5cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
@@ -26,7 +26,7 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInne
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.lang.IgniteInClosure;
-import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.COUNTER_NA;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
/**
@@ -62,7 +62,7 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
if (storeMvccVersion()) {
assert unmaskCoordinatorVersion(row.mvccCoordinatorVersion()) > 0 : row;
- assert row.mvccCounter() != COUNTER_NA : row;
+ assert row.mvccCounter() != MVCC_COUNTER_NA : row;
PageUtils.putLong(pageAddr, off, row.mvccCoordinatorVersion());
off += 8;
@@ -82,7 +82,7 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
long mvccCntr = getMvccCounter(pageAddr, idx);
assert unmaskCoordinatorVersion(mvccTopVer) > 0 : mvccTopVer;
- assert mvccCntr != COUNTER_NA;
+ assert mvccCntr != MVCC_COUNTER_NA;
return ((CacheDataTree)tree).rowStore().mvccRow(cacheId,
hash,
@@ -128,7 +128,7 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
long mvccCntr = rowIo.getMvccCounter(srcPageAddr, srcIdx);
assert unmaskCoordinatorVersion(mvccTopVer) > 0 : mvccTopVer;
- assert mvccCntr != COUNTER_NA;
+ assert mvccCntr != MVCC_COUNTER_NA;
PageUtils.putLong(dstPageAddr, off, mvccTopVer);
off += 8;
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
index 47d8a6f..d60aef2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
@@ -26,7 +26,7 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeaf
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.lang.IgniteInClosure;
-import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.COUNTER_NA;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
/**
@@ -64,7 +64,7 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
long mvccUpdateCntr = row.mvccCounter();
assert unmaskCoordinatorVersion(mvccCrdVer) > 0 : mvccCrdVer;
- assert mvccUpdateCntr != COUNTER_NA;
+ assert mvccUpdateCntr != MVCC_COUNTER_NA;
PageUtils.putLong(pageAddr, off, mvccCrdVer);
off += 8;
@@ -100,7 +100,7 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
long mvccUpdateCntr = ((RowLinkIO)srcIo).getMvccCounter(srcPageAddr, srcIdx);
assert unmaskCoordinatorVersion(mvccUpdateTopVer) > 0 : mvccUpdateCntr;
- assert mvccUpdateCntr != COUNTER_NA;
+ assert mvccUpdateCntr != MVCC_COUNTER_NA;
PageUtils.putLong(dstPageAddr, off, mvccUpdateTopVer);
off += 8;
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
index 85624d5..5537794 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
@@ -73,9 +73,9 @@ public class CacheDataRowStore extends RowStore {
* @return Search row.
*/
MvccDataRow mvccRow(int cacheId, int hash, long link, CacheDataRowAdapter.RowData rowData, long crdVer, long mvccCntr) {
- if (rowData != CacheDataRowAdapter.RowData.KEY_ONLY && versionForRemovedValue(crdVer)) {
- if (rowData == CacheDataRowAdapter.RowData.NO_KEY)
- return MvccDataRow.removedRowNoKey(partId, cacheId, crdVer, mvccCntr);
+ if (versionForRemovedValue(crdVer)) {
+ if (rowData == CacheDataRowAdapter.RowData.NO_KEY || rowData == CacheDataRowAdapter.RowData.LINK_ONLY)
+ return MvccDataRow.removedRowNoKey(link, partId, cacheId, crdVer, mvccCntr);
else
rowData = CacheDataRowAdapter.RowData.KEY_ONLY;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index a699cd3..9f85640 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -169,7 +169,7 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
long mvccCntr = io.getMvccCounter(pageAddr, idx);
- assert row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA;
+ assert row.mvccCounter() != CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
cmp = Long.compare(row.mvccCounter(), mvccCntr);
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
index 3d02b27..36ffd49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
@@ -59,6 +59,6 @@ public final class CacheIdAwareDataInnerIO extends AbstractDataInnerIO {
/** {@inheritDoc} */
@Override public long getMvccCounter(long pageAddr, int idx) {
- return CacheCoordinatorsProcessor.COUNTER_NA;
+ return CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
index 58ae9ff..ae6fc0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
@@ -59,6 +59,6 @@ public final class CacheIdAwareDataLeafIO extends AbstractDataLeafIO {
/** {@inheritDoc} */
@Override public long getMvccCounter(long pageAddr, int idx) {
- return CacheCoordinatorsProcessor.COUNTER_NA;
+ return CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
index 19a5c47..98a5450 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
@@ -59,6 +59,6 @@ public final class DataInnerIO extends AbstractDataInnerIO {
/** {@inheritDoc} */
@Override public long getMvccCounter(long pageAddr, int idx) {
- return CacheCoordinatorsProcessor.COUNTER_NA;
+ return CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
index ab10b96..b644e6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
@@ -59,6 +59,6 @@ public final class DataLeafIO extends AbstractDataLeafIO {
/** {@inheritDoc} */
@Override public long getMvccCounter(long pageAddr, int idx) {
- return CacheCoordinatorsProcessor.COUNTER_NA;
+ return CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
index d1e90d4..8853d6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
@@ -50,15 +50,13 @@ public class DataRow extends CacheDataRowAdapter {
this.part = part;
try {
- // We can not init data row lazily because underlying buffer can be concurrently cleared.
- initFromLink(grp, rowData);
+ // We can not init data row lazily outside of entry lock because underlying buffer can be concurrently cleared.
+ if (rowData != RowData.LINK_ONLY)
+ initFromLink(grp, rowData);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
-
- if (key != null)
- key.partition(part);
}
/**
@@ -84,11 +82,18 @@ public class DataRow extends CacheDataRowAdapter {
/**
*
*/
- protected DataRow() {
+ DataRow() {
super(0);
}
/** {@inheritDoc} */
+ @Override public void key(KeyCacheObject key) {
+ super.key(key);
+
+ hash = key.hashCode();
+ }
+
+ /** {@inheritDoc} */
@Override public int partition() {
return part;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccCleanupRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccCleanupRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccCleanupRow.java
new file mode 100644
index 0000000..92caf70
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccCleanupRow.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.tree;
+
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+
+/**
+ * Mvcc search row (cache ID, key, coordinator version, counter) that additionally carries a non-zero row link.
+ */
+public class MvccCleanupRow extends MvccSearchRow {
+ /** Row link; the constructor asserts it is not {@code 0}. */
+ private final long link;
+
+ /**
+ * @param cacheId Cache ID.
+ * @param key Key.
+ * @param crdVer Mvcc coordinator version.
+ * @param mvccCntr Mvcc counter.
+ * @param link Row link, must not be {@code 0}.
+ */
+ MvccCleanupRow(int cacheId, KeyCacheObject key, long crdVer, long mvccCntr, long link) {
+ super(cacheId, key, crdVer, mvccCntr);
+
+ assert link != 0L;
+
+ this.link = link;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long link() {
+ return link;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
index 916ea93..a2cf079 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
@@ -18,10 +18,9 @@
package org.apache.ignite.internal.processors.cache.tree;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
import org.apache.ignite.internal.util.typedef.internal.S;
-import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.assertMvccVersionValid;
/**
*
@@ -34,6 +33,13 @@ public class MvccDataRow extends DataRow {
private long mvccCntr;
/**
+ * Creates an empty row; used by {@code removedRowNoKey}, which assigns the fields directly.
+ */
+ private MvccDataRow() {
+ // No-op.
+ }
+
+ /**
* @param grp Context.
* @param hash Key hash.
* @param link Link.
@@ -42,24 +48,17 @@ public class MvccDataRow extends DataRow {
* @param crdVer Mvcc coordinator version.
* @param mvccCntr Mvcc counter.
*/
- MvccDataRow(CacheGroupContext grp, int hash, long link, int part, RowData rowData, long crdVer, long mvccCntr) {
+ public MvccDataRow(CacheGroupContext grp, int hash, long link, int part, RowData rowData, long crdVer, long mvccCntr) {
super(grp, hash, link, part, rowData);
- assert unmaskCoordinatorVersion(crdVer) > 0 : crdVer;
- assert mvccCntr != CacheCoordinatorsProcessor.COUNTER_NA;
+ assertMvccVersionValid(crdVer, mvccCntr);
this.crdVer = crdVer;
this.mvccCntr = mvccCntr;
}
/**
- *
- */
- private MvccDataRow() {
- // No-op.
- }
-
- /**
+ * @param link Link.
* @param part Partition.
* @param cacheId Cache ID.
* @param crdVer Mvcc coordinator version.
@@ -67,12 +66,14 @@ public class MvccDataRow extends DataRow {
* @return Row.
*/
static MvccDataRow removedRowNoKey(
+ long link,
int part,
int cacheId,
long crdVer,
long mvccCntr) {
MvccDataRow row = new MvccDataRow();
+ row.link = link;
row.cacheId = cacheId;
row.part = part;
row.crdVer = crdVer;
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
index fb2a6cf..0b37a94 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
@@ -22,7 +22,6 @@ import java.util.List;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
@@ -34,6 +33,7 @@ import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.assertMvccVersionValid;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
@@ -51,7 +51,7 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
private GridLongList activeTxs;
/** */
- private List<CacheSearchRow> cleanupRows;
+ private List<MvccCleanupRow> cleanupRows;
/** */
private final MvccCoordinatorVersion mvccVer;
@@ -66,7 +66,9 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
* @param key Key.
* @param val Value.
* @param ver Version.
+ * @param expireTime Expire time.
* @param mvccVer Mvcc version.
+ * @param needOld {@code True} if need previous value.
* @param part Partition.
* @param cacheId Cache ID.
*/
@@ -109,7 +111,7 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
/**
* @return Rows which are safe to cleanup.
*/
- public List<CacheSearchRow> cleanupRows() {
+ public List<MvccCleanupRow> cleanupRows() {
return cleanupRows;
}
@@ -175,8 +177,6 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
if (needOld)
oldRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
}
- res = versionForRemovedValue(rowCrdVerMasked) ?
- UpdateResult.PREV_NULL : UpdateResult.PREV_NOT_NULL;
}
}
@@ -199,26 +199,25 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
int cmp;
+ long rowCntr = rowIo.getMvccCounter(pageAddr, idx);
+
if (crdVer == rowCrdVer)
- cmp = Long.compare(mvccVer.cleanupVersion(), rowIo.getMvccCounter(pageAddr, idx));
+ cmp = Long.compare(mvccVer.cleanupVersion(), rowCntr);
else
cmp = 1;
if (cmp >= 0) {
// Do not cleanup oldest version.
if (canCleanup) {
- CacheSearchRow row = io.getLookupRow(tree, pageAddr, idx);
-
- assert row.link() != 0 && row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA : row;
+ assert assertMvccVersionValid(rowCrdVer, rowCntr);
// Should not be possible to cleanup active tx.
- assert rowCrdVer != crdVer
- || !mvccVer.activeTransactions().contains(row.mvccCounter());
+ assert rowCrdVer != crdVer || !mvccVer.activeTransactions().contains(rowCntr);
if (cleanupRows == null)
cleanupRows = new ArrayList<>();
- cleanupRows.add(row);
+ cleanupRows.add(new MvccCleanupRow(cacheId, key, rowCrdVerMasked, rowCntr, rowIo.getLink(pageAddr, idx)));
}
else
canCleanup = true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java
index 5bdc495..5fd7e8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java
@@ -83,7 +83,7 @@ public class SearchRow implements CacheSearchRow {
/** {@inheritDoc} */
@Override public long mvccCounter() {
- return CacheCoordinatorsProcessor.COUNTER_NA;
+ return CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index e6300a9..dab2ec0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -134,7 +134,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/** Version which is less then any version generated on coordinator. */
private static final MvccCoordinatorVersion ISOLATED_STREAMER_MVCC_VER =
- new MvccCoordinatorVersionWithoutTxs(1L, CacheCoordinatorsProcessor.START_VER, 0L);
+ new MvccCoordinatorVersionWithoutTxs(1L, CacheCoordinatorsProcessor.MVCC_START_CNTR, 0L);
/** Cache receiver. */
private StreamReceiver<K, V> rcvr = ISOLATED_UPDATER;
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index b0a3831..5bd4bc8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -31,6 +31,7 @@ import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
import org.apache.ignite.internal.util.GridSpinBusyLock;
@@ -217,10 +218,13 @@ public interface GridQueryIndexing {
* @param cctx Cache context.
* @param type Type descriptor.
* @param row New row.
+ * @param newVer Version of new mvcc value inserted for the same key.
* @throws IgniteCheckedException If failed.
*/
- public void store(GridCacheContext cctx, GridQueryTypeDescriptor type, CacheDataRow row)
- throws IgniteCheckedException;
+ public void store(GridCacheContext cctx,
+ GridQueryTypeDescriptor type,
+ CacheDataRow row,
+ @Nullable MvccCoordinatorVersion newVer) throws IgniteCheckedException;
/**
* Removes index entry by key.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 4886b1b..3b3dec0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
@@ -1700,14 +1701,19 @@ public class GridQueryProcessor extends GridProcessorAdapter {
/**
* @param cctx Cache context.
* @param newRow New row.
+ * @param mvccVer Mvcc version for update.
* @param prevRow Previous row.
* @throws IgniteCheckedException In case of error.
*/
@SuppressWarnings({"unchecked", "ConstantConditions"})
- public void store(GridCacheContext cctx, CacheDataRow newRow, @Nullable CacheDataRow prevRow)
- throws IgniteCheckedException {
+ public void store(GridCacheContext cctx,
+ CacheDataRow newRow,
+ @Nullable MvccCoordinatorVersion mvccVer,
+ @Nullable CacheDataRow prevRow) throws IgniteCheckedException
+ {
assert cctx != null;
assert newRow != null;
+ assert !cctx.mvccEnabled() || mvccVer != null;
KeyCacheObject key = newRow.key();
@@ -1734,14 +1740,26 @@ public class GridQueryProcessor extends GridProcessorAdapter {
prevRow.value(),
false);
- if (prevValDesc != null && prevValDesc != desc)
+ if (prevValDesc != null && prevValDesc != desc) {
idx.remove(cctx, prevValDesc, prevRow);
+
+ prevRow = null;
+ }
}
if (desc == null)
return;
- idx.store(cctx, desc, newRow);
+ if (cctx.mvccEnabled()) {
+ // Add new mvcc value.
+ idx.store(cctx, desc, newRow, null);
+
+ // Set info about more recent version for previous record.
+ if (prevRow != null)
+ idx.store(cctx, desc, prevRow, mvccVer);
+ }
+ else
+ idx.store(cctx, desc, newRow, null);
}
finally {
busyLock.leaveBusy();
@@ -2304,12 +2322,14 @@ public class GridQueryProcessor extends GridProcessorAdapter {
/**
* @param cctx Cache context.
- * @param val Row.
+ * @param val Value removed from cache.
+ * @param newVer Mvcc version for remove operation.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public void remove(GridCacheContext cctx, CacheDataRow val)
+ public void remove(GridCacheContext cctx, CacheDataRow val, @Nullable MvccCoordinatorVersion newVer)
throws IgniteCheckedException {
assert val != null;
+ assert cctx.mvccEnabled() || newVer == null;
if (log.isDebugEnabled())
log.debug("Remove [cacheName=" + cctx.name() + ", key=" + val.key()+ ", val=" + val.value() + "]");
@@ -2330,7 +2350,16 @@ public class GridQueryProcessor extends GridProcessorAdapter {
if (desc == null)
return;
- idx.remove(cctx, desc, val);
+ if (cctx.mvccEnabled()) {
+ if (newVer != null) {
+ // Set info about more recent version for previous record.
+ idx.store(cctx, desc, val, newVer);
+ }
+ else
+ idx.remove(cctx, desc, val);
+ }
+ else
+ idx.remove(cctx, desc, val);
}
finally {
busyLock.leaveBusy();
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
index b0b758a..d77fb81 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
@@ -43,6 +43,7 @@ import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridQueryIndexing;
@@ -310,7 +311,8 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT
}
/** {@inheritDoc} */
- @Override public void store(GridCacheContext cctx, GridQueryTypeDescriptor type, CacheDataRow val) {
+ @Override public void store(GridCacheContext cctx, GridQueryTypeDescriptor type, CacheDataRow row,
+ @Nullable MvccCoordinatorVersion newVer) throws IgniteCheckedException {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
index 999144f..1949cd2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache.mvcc;
+import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -160,6 +161,71 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
}
/**
+ * @param cfgC Optional closure applied to cache configuration.
+ * @throws Exception If failed.
+ */
+ final void cacheRecreate(@Nullable IgniteInClosure<CacheConfiguration> cfgC) throws Exception {
+ Ignite srv0 = startGrid(0);
+
+ final int PARTS = 64;
+
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, PARTS);
+
+ if (cfgC != null)
+ cfgC.apply(ccfg);
+
+ IgniteCache<Integer, MvccTestAccount> cache = (IgniteCache)srv0.createCache(ccfg);
+
+ for (int k = 0; k < PARTS * 2; k++) {
+ assertNull(cache.get(k));
+
+ int vals = k % 3 + 1;
+
+ for (int v = 0; v < vals; v++)
+ cache.put(k, new MvccTestAccount(v, 1));
+
+ assertEquals(vals - 1, cache.get(k).val);
+ }
+
+ srv0.destroyCache(cache.getName());
+
+ ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, PARTS);
+
+ if (cfgC != null)
+ cfgC.apply(ccfg);
+
+ cache = (IgniteCache)srv0.createCache(ccfg);
+
+ for (int k = 0; k < PARTS * 2; k++) {
+ assertNull(cache.get(k));
+
+ int vals = k % 3 + 2;
+
+ for (int v = 0; v < vals; v++)
+ cache.put(k, new MvccTestAccount(v + 100, 1));
+
+ assertEquals(vals - 1 + 100, cache.get(k).val);
+ }
+
+ srv0.destroyCache(cache.getName());
+
+ ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, PARTS);
+
+ IgniteCache<Long, Long> cache0 = (IgniteCache)srv0.createCache(ccfg);
+
+ for (long k = 0; k < PARTS * 2; k++) {
+ assertNull(cache0.get(k));
+
+ int vals = (int)(k % 3 + 2);
+
+ for (long v = 0; v < vals; v++)
+ cache0.put(k, v);
+
+ assertEquals((long)(vals - 1), (Object)cache0.get(k));
+ }
+ }
+
+ /**
* @param srvs Number of server nodes.
* @param clients Number of client nodes.
* @param cacheBackups Number of cache backups.
@@ -332,13 +398,15 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
Map<Integer, Integer> lastUpdateCntrs = new HashMap<>();
+ SqlFieldsQuery sumQry = new SqlFieldsQuery("select sum(val) from MvccTestAccount");
+
while (!stop.get()) {
while (keys.size() < ACCOUNTS)
keys.add(rnd.nextInt(ACCOUNTS));
TestCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd);
- Map<Integer, MvccTestAccount> accounts;
+ Map<Integer, MvccTestAccount> accounts = null;
try {
switch (readMode) {
@@ -378,7 +446,7 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
for (List<?> row : cache.cache.query(qry)) {
Integer id = (Integer)row.get(0);
- Integer val = (Integer)row.get(0);
+ Integer val = (Integer)row.get(1);
MvccTestAccount old = accounts.put(id, new MvccTestAccount(val, 1));
@@ -389,6 +457,18 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
break;
}
+ case SQL_SUM: {
+ List<List<?>> res = cache.cache.query(sumQry).getAll();
+
+ assertEquals(1, res.size());
+
+ BigDecimal sum = (BigDecimal)res.get(0).get(0);
+
+ assertEquals(ACCOUNT_START_VAL * ACCOUNTS, sum.intValue());
+
+ break;
+ }
+
default: {
fail();
@@ -400,29 +480,31 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
cache.readUnlock();
}
- if (!withRmvs)
- assertEquals(ACCOUNTS, accounts.size());
+ if (accounts != null) {
+ if (!withRmvs)
+ assertEquals(ACCOUNTS, accounts.size());
- int sum = 0;
+ int sum = 0;
- for (int i = 0; i < ACCOUNTS; i++) {
- MvccTestAccount account = accounts.get(i);
+ for (int i = 0; i < ACCOUNTS; i++) {
+ MvccTestAccount account = accounts.get(i);
- if (account != null) {
- sum += account.val;
+ if (account != null) {
+ sum += account.val;
- Integer cntr = lastUpdateCntrs.get(i);
+ Integer cntr = lastUpdateCntrs.get(i);
- if (cntr != null)
- assertTrue(cntr <= account.updateCnt);
+ if (cntr != null)
+ assertTrue(cntr <= account.updateCnt);
- lastUpdateCntrs.put(i, cntr);
+ lastUpdateCntrs.put(i, cntr);
+ }
+ else
+ assertTrue(withRmvs);
}
- else
- assertTrue(withRmvs);
- }
- assertEquals(ACCOUNTS * ACCOUNT_START_VAL, sum);
+ assertEquals(ACCOUNTS * ACCOUNT_START_VAL, sum);
+ }
}
if (idx == 0) {
@@ -713,7 +795,7 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
* @param node Node.
* @throws Exception If failed.
*/
- final void checkActiveQueriesCleanup(Ignite node) throws Exception {
+ protected final void checkActiveQueriesCleanup(Ignite node) throws Exception {
final CacheCoordinatorsProcessor crd = ((IgniteKernal)node).context().cache().context().coordinators();
assertTrue("Active queries not cleared: " + node.name(), GridTestUtils.waitForCondition(
@@ -827,7 +909,10 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
SCAN,
/** */
- SQL_ALL
+ SQL_ALL,
+
+ /** */
+ SQL_SUM
}
/**
[2/4] ignite git commit: ignite-3478 Mvcc support for sql indexes
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2LeafIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2LeafIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2LeafIO.java
index 85dcf50..8954de0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2LeafIO.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2LeafIO.java
@@ -17,20 +17,12 @@
package org.apache.ignite.internal.processors.query.h2.database.io;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeafIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
-import org.apache.ignite.internal.processors.query.h2.database.H2Tree;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
-import org.h2.result.SearchRow;
/**
* Leaf page for H2 row references.
*/
-public class H2LeafIO extends BPlusLeafIO<SearchRow> implements H2RowLinkIO {
+public class H2LeafIO extends AbstractH2LeafIO {
/** */
public static final IOVersions<H2LeafIO> VERSIONS = new IOVersions<>(
new H2LeafIO(1)
@@ -39,36 +31,7 @@ public class H2LeafIO extends BPlusLeafIO<SearchRow> implements H2RowLinkIO {
/**
* @param ver Page format version.
*/
- protected H2LeafIO(int ver) {
+ private H2LeafIO(int ver) {
super(T_H2_REF_LEAF, ver, 8);
}
-
- /** {@inheritDoc} */
- @Override public void storeByOffset(long pageAddr, int off, SearchRow row) {
- GridH2Row row0 = (GridH2Row)row;
-
- assert row0.link() != 0;
-
- PageUtils.putLong(pageAddr, off, row0.link());
- }
-
- /** {@inheritDoc} */
- @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<SearchRow> srcIo, long srcPageAddr, int srcIdx) {
- assert srcIo == this;
-
- PageUtils.putLong(dstPageAddr, offset(dstIdx), getLink(srcPageAddr, srcIdx));
- }
-
- /** {@inheritDoc} */
- @Override public SearchRow getLookupRow(BPlusTree<SearchRow,?> tree, long pageAddr, int idx)
- throws IgniteCheckedException {
- long link = getLink(pageAddr, idx);
-
- return ((H2Tree)tree).getRowFactory().getRow(link);
- }
-
- /** {@inheritDoc} */
- @Override public long getLink(long pageAddr, int idx) {
- return PageUtils.getLong(pageAddr, offset(idx));
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasInnerIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasInnerIO.java
new file mode 100644
index 0000000..fa6978e
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasInnerIO.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.database.io;
+
+import org.apache.ignite.internal.pagemem.PageUtils;
+
+/**
+ * Inner page IO for H2 row references that additionally stores mvcc version info.
+ */
+class H2MvccExtrasInnerIO extends AbstractH2ExtrasInnerIO {
+ /** */
+ private final int crdVerOff;
+
+ /** */
+ private final int cntrOff;
+
+ /** */
+ private final int newCrdVerOff;
+
+ /** */
+ private final int newCntrOff;
+
+ /**
+ * @param type Page type.
+ * @param ver Page format version.
+ * @param payloadSize Payload size.
+ */
+ H2MvccExtrasInnerIO(short type, int ver, int payloadSize) {
+ super(type, ver, 40, payloadSize);
+
+ crdVerOff = payloadSize + 8;
+ cntrOff = payloadSize + 16;
+ newCrdVerOff = payloadSize + 24;
+ newCntrOff = payloadSize + 32;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
+ return PageUtils.getLong(pageAddr, offset(idx) + crdVerOff);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getMvccCounter(long pageAddr, int idx) {
+ return PageUtils.getLong(pageAddr, offset(idx) + cntrOff);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getNewMvccCoordinatorVersion(long pageAddr, int idx) {
+ return PageUtils.getLong(pageAddr, offset(idx) + newCrdVerOff);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getNewMvccCounter(long pageAddr, int idx) {
+ return PageUtils.getLong(pageAddr, offset(idx) + newCntrOff);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean storeMvccInfo() {
+ return true;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasLeafIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasLeafIO.java
new file mode 100644
index 0000000..2448e76
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasLeafIO.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.database.io;
+
+import org.apache.ignite.internal.pagemem.PageUtils;
+
+/**
+ * Leaf page IO for H2 row references that additionally stores mvcc version info.
+ */
+class H2MvccExtrasLeafIO extends AbstractH2ExtrasLeafIO {
+ /** */
+ private final int crdVerOff;
+
+ /** */
+ private final int cntrOff;
+
+ /** */
+ private final int newCrdVerOff;
+
+ /** */
+ private final int newCntrOff;
+
+ /**
+ * @param type Page type.
+ * @param ver Page format version.
+ * @param payloadSize Payload size.
+ */
+ H2MvccExtrasLeafIO(short type, int ver, int payloadSize) {
+ super(type, ver, 40, payloadSize);
+
+ crdVerOff = payloadSize + 8;
+ cntrOff = payloadSize + 16;
+ newCrdVerOff = payloadSize + 24;
+ newCntrOff = payloadSize + 32;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
+ return PageUtils.getLong(pageAddr, offset(idx) + crdVerOff);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getMvccCounter(long pageAddr, int idx) {
+ return PageUtils.getLong(pageAddr, offset(idx) + cntrOff);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getNewMvccCoordinatorVersion(long pageAddr, int idx) {
+ return PageUtils.getLong(pageAddr, offset(idx) + newCrdVerOff);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getNewMvccCounter(long pageAddr, int idx) {
+ return PageUtils.getLong(pageAddr, offset(idx) + newCntrOff);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean storeMvccInfo() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccInnerIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccInnerIO.java
new file mode 100644
index 0000000..e64ab43
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccInnerIO.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.database.io;
+
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
+
+/**
+ * Inner page for H2 row references.
+ */
+public class H2MvccInnerIO extends AbstractH2InnerIO {
+ /** */
+ public static final IOVersions<H2MvccInnerIO> VERSIONS = new IOVersions<>(
+ new H2MvccInnerIO(1)
+ );
+
+ /**
+ * @param ver Page format version.
+ */
+ private H2MvccInnerIO(int ver) {
+ super(T_H2_MVCC_REF_INNER, ver, 40);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean storeMvccInfo() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccLeafIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccLeafIO.java
new file mode 100644
index 0000000..a364432
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccLeafIO.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.database.io;
+
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
+
+/**
+ * Leaf page for H2 row references with mvcc version info.
+ */
+public class H2MvccLeafIO extends AbstractH2LeafIO {
+ /** */
+ public static final IOVersions<H2MvccLeafIO> VERSIONS = new IOVersions<>(
+ new H2MvccLeafIO(1)
+ );
+
+ /**
+ * @param ver Page format version.
+ */
+ private H2MvccLeafIO(int ver) {
+ super(T_H2_MVCC_REF_LEAF, ver, 40);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean storeMvccInfo() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2RowLinkIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2RowLinkIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2RowLinkIO.java
index ce69197..d828c44 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2RowLinkIO.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2RowLinkIO.java
@@ -27,4 +27,37 @@ public interface H2RowLinkIO {
* @return Row link.
*/
public long getLink(long pageAddr, int idx);
+
+ /**
+ * @param pageAddr Page address.
+ * @param idx Index.
+ * @return Mvcc coordinator version.
+ */
+ public long getMvccCoordinatorVersion(long pageAddr, int idx);
+
+ /**
+ * @param pageAddr Page address.
+ * @param idx Index.
+ * @return Mvcc counter.
+ */
+ public long getMvccCounter(long pageAddr, int idx);
+
+ /**
+ * @param pageAddr Page address.
+ * @param idx Index.
+ * @return New mvcc coordinator version.
+ */
+ public long getNewMvccCoordinatorVersion(long pageAddr, int idx);
+
+ /**
+ * @param pageAddr Page address.
+ * @param idx Index.
+ * @return New mvcc counter.
+ */
+ public long getNewMvccCounter(long pageAddr, int idx);
+
+ /**
+ * @return {@code True} if IO stores mvcc information.
+ */
+ public boolean storeMvccInfo();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index 92b7d10..96b331a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.query.h2.database.H2TreeMvccFilterClosure;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
@@ -200,6 +201,12 @@ public abstract class GridH2IndexBase extends BaseIndex {
public abstract GridH2Row put(GridH2Row row);
/**
+ * @param row Row.
+ * @return {@code True} if replaced existing row.
+ */
+ public abstract boolean putx(GridH2Row row);
+
+ /**
* Remove row from index.
*
* @param row Row.
@@ -426,7 +433,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
// This is the first request containing all the search rows.
assert !msg.bounds().isEmpty() : "empty bounds";
- src = new RangeSource(msg.bounds(), msg.segment(), qctx.filter());
+ src = new RangeSource(msg.bounds(), msg.segment(), qctx.filter(), qctx.mvccFilter());
}
else {
// This is request to fetch next portion of data.
@@ -1469,20 +1476,28 @@ public abstract class GridH2IndexBase extends BaseIndex {
/** */
final IndexingQueryFilter filter;
+ /** */
+ private final H2TreeMvccFilterClosure mvccFilter;
+
/** Iterator. */
Iterator<GridH2Row> iter = emptyIterator();
/**
* @param bounds Bounds.
+ * @param segment Segment.
* @param filter Filter.
+ * @param mvccFilter Mvcc filter.
*/
RangeSource(
Iterable<GridH2RowRangeBounds> bounds,
int segment,
- IndexingQueryFilter filter
+ IndexingQueryFilter filter,
+ H2TreeMvccFilterClosure mvccFilter
) {
this.segment = segment;
this.filter = filter;
+ this.mvccFilter = mvccFilter;
+
boundsIter = bounds.iterator();
}
@@ -1540,7 +1555,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
IgniteTree t = treeForRead(segment);
- iter = new CursorIteratorWrapper(doFind0(t, first, true, last, filter));
+ iter = new CursorIteratorWrapper(doFind0(t, first, last, filter, mvccFilter));
if (!iter.hasNext()) {
// We have to return empty range here.
@@ -1565,17 +1580,17 @@ public abstract class GridH2IndexBase extends BaseIndex {
/**
* @param t Tree.
* @param first Lower bound.
- * @param includeFirst Whether lower bound should be inclusive.
* @param last Upper bound always inclusive.
* @param filter Filter.
+ * @param mvccFilter Mvcc filter.
* @return Iterator over rows in given range.
*/
protected GridCursor<GridH2Row> doFind0(
IgniteTree t,
@Nullable SearchRow first,
- boolean includeFirst,
@Nullable SearchRow last,
- IndexingQueryFilter filter) {
+ IndexingQueryFilter filter,
+ H2TreeMvccFilterClosure mvccFilter) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java
index e855536..62b459a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.processors.query.h2.opt;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.util.typedef.internal.SB;
@@ -56,17 +58,24 @@ public class GridH2KeyValueRowOnheap extends GridH2Row {
/** */
private Value ver;
+ /** */
+ private final MvccCoordinatorVersion newVer;
+
/**
* Constructor.
*
* @param desc Row descriptor.
* @param row Row.
+ * @param newVer Version of new mvcc value inserted for the same key.
* @param keyType Key type.
* @param valType Value type.
* @throws IgniteCheckedException If failed.
*/
- public GridH2KeyValueRowOnheap(GridH2RowDescriptor desc, CacheDataRow row, int keyType, int valType)
- throws IgniteCheckedException {
+ public GridH2KeyValueRowOnheap(GridH2RowDescriptor desc,
+ CacheDataRow row,
+ MvccCoordinatorVersion newVer,
+ int keyType,
+ int valType) throws IgniteCheckedException {
super(row);
this.desc = desc;
@@ -78,6 +87,23 @@ public class GridH2KeyValueRowOnheap extends GridH2Row {
if (row.version() != null)
this.ver = desc.wrap(row.version(), Value.JAVA_OBJECT);
+
+ this.newVer = newVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long newMvccCoordinatorVersion() {
+ return newVer != null ? newVer.coordinatorVersion() : 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long newMvccCounter() {
+ return newVer != null ? newVer.counter(): CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean indexSearchRow() {
+ return false;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java
index 5e09a86..38ad9d0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java
@@ -284,6 +284,11 @@ public class GridH2MetaTable extends TableBase {
throw new IllegalStateException("Index: " + idx);
}
}
+
+ /** {@inheritDoc} */
+ @Override public boolean indexSearchRow() {
+ return false; // TODO IGNITE-3478, check meta table with mvcc.
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java
index fd8a613..d24dc08 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java
@@ -70,7 +70,7 @@ public class GridH2PlainRowFactory extends RowFactory {
/**
* @param key Key.
*/
- public RowKey(Value key) {
+ RowKey(Value key) {
this.key = key;
}
@@ -92,6 +92,11 @@ public class GridH2PlainRowFactory extends RowFactory {
}
/** {@inheritDoc} */
+ @Override public boolean indexSearchRow() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(RowKey.class, this);
}
@@ -138,6 +143,11 @@ public class GridH2PlainRowFactory extends RowFactory {
}
/** {@inheritDoc} */
+ @Override public boolean indexSearchRow() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(RowPair.class, this);
}
@@ -174,6 +184,11 @@ public class GridH2PlainRowFactory extends RowFactory {
}
/** {@inheritDoc} */
+ @Override public boolean indexSearchRow() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(RowSimple.class, this);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
index 91f0aef..b490179 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+import org.apache.ignite.internal.processors.query.h2.database.H2TreeMvccFilterClosure;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
@@ -83,6 +84,9 @@ public class GridH2QueryContext {
/** */
private GridH2CollocationModel qryCollocationMdl;
+ /** */
+ private H2TreeMvccFilterClosure mvccFilter;
+
/**
* @param locNodeId Local node ID.
* @param nodeId The node who initiated the query.
@@ -102,13 +106,34 @@ public class GridH2QueryContext {
* @param segmentId Index segment ID.
* @param type Query type.
*/
- public GridH2QueryContext(UUID locNodeId, UUID nodeId, long qryId, int segmentId, GridH2QueryType type) {
+ public GridH2QueryContext(UUID locNodeId,
+ UUID nodeId,
+ long qryId,
+ int segmentId,
+ GridH2QueryType type) {
assert segmentId == 0 || type == MAP;
key = new Key(locNodeId, nodeId, qryId, segmentId, type);
}
/**
+ * @return Mvcc filter.
+ */
+ @Nullable public H2TreeMvccFilterClosure mvccFilter() {
+ return mvccFilter;
+ }
+
+ /**
+ * @param mvccFilter Mvcc filter.
+ * @return {@code this}.
+ */
+ public GridH2QueryContext mvccFilter(H2TreeMvccFilterClosure mvccFilter) {
+ this.mvccFilter = mvccFilter;
+
+ return this;
+ }
+
+ /**
* @return Type.
*/
public GridH2QueryType type() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
index 54e0417..785b791 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.h2.opt;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -88,16 +89,35 @@ public abstract class GridH2Row extends GridH2SearchRowAdapter implements CacheD
/** {@inheritDoc} */
@Override public long mvccCoordinatorVersion() {
- throw new UnsupportedOperationException();
+ return row.mvccCoordinatorVersion();
}
/** {@inheritDoc} */
@Override public long mvccCounter() {
- throw new UnsupportedOperationException();
+ return row.mvccCounter();
}
/** {@inheritDoc} */
@Override public boolean removed() {
throw new UnsupportedOperationException();
}
+
+ /** {@inheritDoc} */
+ @Override public boolean indexSearchRow() {
+ return false;
+ }
+
+ /**
+ * @return Coordinator version part of new mvcc version.
+ */
+ public long newMvccCoordinatorVersion() {
+ return 0;
+ }
+
+ /**
+ * @return Counter part of new mvcc version.
+ */
+ public long newMvccCounter() {
+ return CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
index 1d915e5..ad91deb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
@@ -29,6 +29,7 @@ import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.query.GridQueryProperty;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
@@ -58,6 +59,7 @@ import org.h2.value.ValueString;
import org.h2.value.ValueTime;
import org.h2.value.ValueTimestamp;
import org.h2.value.ValueUuid;
+import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.DEFAULT_COLUMNS_COUNT;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.KEY_COL;
@@ -273,17 +275,21 @@ public class GridH2RowDescriptor {
* Creates new row.
*
* @param dataRow Data row.
+ * @param newVer Version of new mvcc value inserted for the same key.
* @return Row.
* @throws IgniteCheckedException If failed.
*/
- public GridH2Row createRow(CacheDataRow dataRow) throws IgniteCheckedException {
+ public GridH2Row createRow(CacheDataRow dataRow, @Nullable MvccCoordinatorVersion newVer) throws IgniteCheckedException {
GridH2Row row;
try {
- if (dataRow.value() == null) // Only can happen for remove operation, can create simple search row.
+ if (dataRow.value() == null) { // Only can happen for remove operation, can create simple search row.
+ assert newVer == null;
+
row = new GridH2KeyRowOnheap(dataRow, wrap(dataRow.key(), keyType));
+ }
else
- row = new GridH2KeyValueRowOnheap(this, dataRow, keyType, valType);
+ row = new GridH2KeyValueRowOnheap(this, dataRow, newVer, keyType, valType);
}
catch (ClassCastException e) {
throw new IgniteCheckedException("Failed to convert key to SQL type. " +
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRow.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRow.java
new file mode 100644
index 0000000..4b3940c
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRow.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt;
+
+import org.apache.ignite.internal.processors.query.h2.database.H2Tree;
+import org.h2.result.Row;
+
+/**
+ * H2 row that exposes its mvcc version and tells whether it is used for index search.
+ */
+public interface GridH2SearchRow extends Row {
+ /**
+ * @return Mvcc coordinator version.
+ */
+ public long mvccCoordinatorVersion();
+
+ /**
+ * @return Mvcc counter.
+ */
+ public long mvccCounter();
+
+ /**
+ * @return {@code True} for rows used for index search (as opposed to rows stored in {@link H2Tree}).
+ */
+ public boolean indexSearchRow();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRowAdapter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRowAdapter.java
index 24a90b3..4fc8ee5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRowAdapter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRowAdapter.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.h2.opt;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
import org.h2.result.Row;
import org.h2.result.SearchRow;
import org.h2.store.Data;
@@ -25,7 +26,7 @@ import org.h2.value.Value;
/**
* Dummy H2 search row adadpter.
*/
-public abstract class GridH2SearchRowAdapter implements Row {
+public abstract class GridH2SearchRowAdapter implements GridH2SearchRow {
/** {@inheritDoc} */
@Override public void setKeyAndVersion(SearchRow old) {
throw new UnsupportedOperationException();
@@ -100,4 +101,14 @@ public abstract class GridH2SearchRowAdapter implements Row {
@Override public Value[] getValueList() {
throw new UnsupportedOperationException();
}
+
+ /** {@inheritDoc} */
+ @Override public long mvccCoordinatorVersion() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long mvccCounter() {
+ return CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 6c353e9..ca9c1f5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -28,6 +28,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.query.QueryTable;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
@@ -399,15 +401,16 @@ public class GridH2Table extends TableBase {
* otherwise value and expiration time will be updated or new row will be added.
*
* @param row Row.
+ * @param newVer Version of new mvcc value inserted for the same key.
* @param rmv If {@code true} then remove, else update row.
* @return {@code true} If operation succeeded.
* @throws IgniteCheckedException If failed.
*/
- public boolean update(CacheDataRow row, boolean rmv)
+ public boolean update(CacheDataRow row, @Nullable MvccCoordinatorVersion newVer, boolean rmv)
throws IgniteCheckedException {
assert desc != null;
- GridH2Row h2Row = desc.createRow(row);
+ GridH2Row h2Row = desc.createRow(row, newVer);
if (rmv)
return doUpdate(h2Row, true);
@@ -454,6 +457,8 @@ public class GridH2Table extends TableBase {
*/
@SuppressWarnings("LockAcquiredButNotSafelyReleased")
private boolean doUpdate(final GridH2Row row, boolean del) throws IgniteCheckedException {
+ assert !cctx.mvccEnabled() || row.mvccCounter() != CacheCoordinatorsProcessor.MVCC_COUNTER_NA : row;
+
// Here we assume that each key can't be updated concurrently and case when different indexes
// getting updated from different threads with different rows with the same key is impossible.
lock(false);
@@ -466,10 +471,25 @@ public class GridH2Table extends TableBase {
if (!del) {
assert rowFactory == null || row.link() != 0 : row;
- GridH2Row old = pk.put(row); // Put to PK.
+ GridH2Row old;
+
+ // Put to PK.
+ if (cctx.mvccEnabled()) {
+ boolean replaced = pk.putx(row);
+
+ assert replaced == (row.newMvccCoordinatorVersion() != 0);
- if (old == null)
- size.increment();
+ old = null;
+
+ if (!replaced)
+ size.increment();
+ }
+ else {
+ old = pk.put(row);
+
+ if (old == null)
+ size.increment();
+ }
int len = idxs.size();
@@ -536,17 +556,24 @@ public class GridH2Table extends TableBase {
private void addToIndex(GridH2IndexBase idx, Index pk, GridH2Row row, GridH2Row old, boolean tmp) {
assert !idx.getIndexType().isUnique() : "Unique indexes are not supported: " + idx;
- GridH2Row old2 = idx.put(row);
+ if (idx.ctx.mvccEnabled()) {
+ boolean replaced = idx.putx(row);
- if (old2 != null) { // Row was replaced in index.
- if (!tmp) {
- if (!eq(pk, old2, old))
- throw new IllegalStateException("Row conflict should never happen, unique indexes are " +
- "not supported [idx=" + idx + ", old=" + old + ", old2=" + old2 + ']');
+ assert replaced == (row.newMvccCoordinatorVersion() != 0);
+ }
+ else {
+ GridH2Row old2 = idx.put(row);
+
+ if (old2 != null) { // Row was replaced in index.
+ if (!tmp) {
+ if (!eq(pk, old2, old))
+ throw new IllegalStateException("Row conflict should never happen, unique indexes are " +
+ "not supported [idx=" + idx + ", old=" + old + ", old2=" + old2 + ']');
+ }
}
+ else if (old != null) // Row was not replaced, need to remove manually.
+ idx.removex(old);
}
- else if (old != null) // Row was not replaced, need to remove manually.
- idx.removex(old);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 77b928f..fe21b1d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsReservation;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
@@ -61,6 +62,7 @@ import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.UpdateResult;
+import org.apache.ignite.internal.processors.query.h2.database.H2TreeMvccFilterClosure;
import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
@@ -482,7 +484,8 @@ public class GridMapQueryExecutor {
false, // Replicated is always false here (see condition above).
req.timeout(),
params,
- true); // Lazy = true.
+ true,
+ req.mvccVersion()); // Lazy = true.
}
else {
ctx.closure().callLocal(
@@ -504,7 +507,8 @@ public class GridMapQueryExecutor {
false,
req.timeout(),
params,
- false); // Lazy = false.
+ false,
+ req.mvccVersion()); // Lazy = false.
return null;
}
@@ -528,7 +532,8 @@ public class GridMapQueryExecutor {
replicated,
req.timeout(),
params,
- lazy);
+ lazy,
+ req.mvccVersion());
}
/**
@@ -544,6 +549,7 @@ public class GridMapQueryExecutor {
* @param pageSize Page size.
* @param distributedJoinMode Query distributed join mode.
* @param lazy Streaming flag.
+ * @param mvccVer Mvcc version.
*/
private void onQueryRequest0(
final ClusterNode node,
@@ -561,7 +567,8 @@ public class GridMapQueryExecutor {
final boolean replicated,
final int timeout,
final Object[] params,
- boolean lazy
+ boolean lazy,
+ @Nullable final MvccCoordinatorVersion mvccVer
) {
if (lazy && MapQueryLazyWorker.currentWorker() == null) {
// Lazy queries must be re-submitted to dedicated workers.
@@ -570,8 +577,24 @@ public class GridMapQueryExecutor {
worker.submit(new Runnable() {
@Override public void run() {
- onQueryRequest0(node, reqId, segmentId, schemaName, qrys, cacheIds, topVer, partsMap, parts,
- pageSize, distributedJoinMode, enforceJoinOrder, replicated, timeout, params, true);
+ onQueryRequest0(
+ node,
+ reqId,
+ segmentId,
+ schemaName,
+ qrys,
+ cacheIds,
+ topVer,
+ partsMap,
+ parts,
+ pageSize,
+ distributedJoinMode,
+ enforceJoinOrder,
+ replicated,
+ timeout,
+ params,
+ true,
+ mvccVer);
}
});
@@ -639,6 +662,9 @@ public class GridMapQueryExecutor {
.topologyVersion(topVer)
.reservations(reserved);
+ if (mvccVer != null)
+ qctx.mvccFilter(new H2TreeMvccFilterClosure(mvccVer));
+
Connection conn = h2.connectionForSchema(schemaName);
H2Utils.setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder);
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
index 1c0efb3..4518d14 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
@@ -25,8 +25,10 @@ import java.util.NoSuchElementException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
import org.h2.index.Cursor;
import org.h2.result.Row;
+import org.jetbrains.annotations.Nullable;
/**
* Iterator that transparently and sequentially traverses a bunch of {@link GridMergeIndex} objects.
@@ -59,6 +61,9 @@ class GridMergeIndexIterator implements Iterator<List<?>>, AutoCloseable {
/** Whether remote resources were released. */
private boolean released;
+ /** */
+ private MvccQueryTracker mvccTracker;
+
/**
* Constructor.
*
@@ -69,14 +74,19 @@ class GridMergeIndexIterator implements Iterator<List<?>>, AutoCloseable {
* @param distributedJoins Distributed joins.
* @throws IgniteCheckedException if failed.
*/
- GridMergeIndexIterator(GridReduceQueryExecutor rdcExec, Collection<ClusterNode> nodes, ReduceQueryRun run,
- long qryReqId, boolean distributedJoins)
+ GridMergeIndexIterator(GridReduceQueryExecutor rdcExec,
+ Collection<ClusterNode> nodes,
+ ReduceQueryRun run,
+ long qryReqId,
+ boolean distributedJoins,
+ @Nullable MvccQueryTracker mvccTracker)
throws IgniteCheckedException {
this.rdcExec = rdcExec;
this.nodes = nodes;
this.run = run;
this.qryReqId = qryReqId;
this.distributedJoins = distributedJoins;
+ this.mvccTracker = mvccTracker;
this.idxIter = run.indexes().iterator();
@@ -155,7 +165,7 @@ class GridMergeIndexIterator implements Iterator<List<?>>, AutoCloseable {
private void releaseIfNeeded() {
if (!released) {
try {
- rdcExec.releaseRemoteResources(nodes, run, qryReqId, distributedJoins);
+ rdcExec.releaseRemoteResources(nodes, run, qryReqId, distributedJoins, mvccTracker);
}
finally {
released = true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index f85cd94..80b1970 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -59,6 +59,8 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
@@ -83,11 +85,13 @@ import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryReq
import org.apache.ignite.internal.util.GridIntIterator;
import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.CIX2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiClosure;
+import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.h2.command.ddl.CreateTableData;
@@ -562,6 +566,31 @@ public class GridReduceQueryExecutor {
List<Integer> cacheIds = qry.cacheIds();
+ MvccQueryTracker mvccTracker = null;
+
+ // TODO IGNITE-3478.
+ if (qry.mvccEnabled()) {
+ assert !cacheIds.isEmpty();
+
+ final GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
+
+ mvccTracker = new MvccQueryTracker(cacheContext(cacheIds.get(0)), true,
+ new IgniteBiInClosure<AffinityTopologyVersion, IgniteCheckedException>() {
+ @Override public void apply(AffinityTopologyVersion topVer, IgniteCheckedException e) {
+ fut.onDone(null, e);
+ }
+ });
+
+ mvccTracker.requestVersion(topVer);
+
+ try {
+ fut.get();
+ }
+ catch (IgniteCheckedException e) {
+ throw new CacheException(e);
+ }
+ }
+
Collection<ClusterNode> nodes = null;
// Explicit partition mapping for unstable topology.
@@ -730,6 +759,9 @@ public class GridReduceQueryExecutor {
.timeout(timeoutMillis)
.schemaName(schemaName);
+ if (mvccTracker != null)
+ req.mvccVersion(mvccTracker.mvccVersion());
+
if (send(nodes, req, parts == null ? null : new ExplicitPartitionsSpecializer(qryMap), false)) {
awaitAllReplies(r, nodes, cancel);
@@ -763,7 +795,12 @@ public class GridReduceQueryExecutor {
if (!retry) {
if (skipMergeTbl) {
- resIter = new GridMergeIndexIterator(this, finalNodes, r, qryReqId, qry.distributedJoins());
+ resIter = new GridMergeIndexIterator(this,
+ finalNodes,
+ r,
+ qryReqId,
+ qry.distributedJoins(),
+ mvccTracker);
release = false;
}
@@ -833,7 +870,7 @@ public class GridReduceQueryExecutor {
}
finally {
if (release) {
- releaseRemoteResources(finalNodes, r, qryReqId, qry.distributedJoins());
+ releaseRemoteResources(finalNodes, r, qryReqId, qry.distributedJoins(), mvccTracker);
if (!skipMergeTbl) {
for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++)
@@ -1028,7 +1065,10 @@ public class GridReduceQueryExecutor {
* @param distributedJoins Distributed join flag.
*/
public void releaseRemoteResources(Collection<ClusterNode> nodes, ReduceQueryRun r, long qryReqId,
- boolean distributedJoins) {
+ boolean distributedJoins, MvccQueryTracker mvccTracker) {
+ if (mvccTracker != null)
+ mvccTracker.onQueryDone();
+
// For distributedJoins need always send cancel request to cleanup resources.
if (distributedJoins)
send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index 4e1fadb..347b88c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.cache.query.QueryTable;
@@ -42,6 +43,7 @@ import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS;
@@ -133,6 +135,9 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
/** Schema name. */
private String schemaName;
+ /** Mvcc version, may be {@code null}. */
+ private MvccCoordinatorVersion mvccVer;
+
/**
* Required by {@link Externalizable}
*/
@@ -157,6 +162,24 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
params = req.params;
paramsBytes = req.paramsBytes;
schemaName = req.schemaName;
+ mvccVer = req.mvccVer;
+ }
+
+ /**
+ * @return Mvcc version.
+ */
+ @Nullable public MvccCoordinatorVersion mvccVersion() {
+ return mvccVer;
+ }
+
+ /**
+ * @param mvccVer Mvcc version.
+ * @return {@code this}.
+ */
+ public GridH2QueryRequest mvccVersion(MvccCoordinatorVersion mvccVer) {
+ this.mvccVer = mvccVer;
+
+ return this;
}
/**
@@ -435,65 +458,71 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
writer.incrementState();
case 2:
- if (!writer.writeInt("pageSize", pageSize))
+ if (!writer.writeMessage("mvccVer", mvccVer))
return false;
writer.incrementState();
case 3:
- if (!writer.writeByteArray("paramsBytes", paramsBytes))
+ if (!writer.writeInt("pageSize", pageSize))
return false;
writer.incrementState();
case 4:
- if (!writer.writeMap("parts", parts, MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR))
+ if (!writer.writeByteArray("paramsBytes", paramsBytes))
return false;
writer.incrementState();
case 5:
- if (!writer.writeCollection("qrys", qrys, MessageCollectionItemType.MSG))
+ if (!writer.writeMap("parts", parts, MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR))
return false;
writer.incrementState();
case 6:
- if (!writer.writeLong("reqId", reqId))
+ if (!writer.writeIntArray("qryParts", qryParts))
return false;
writer.incrementState();
case 7:
- if (!writer.writeCollection("tbls", tbls, MessageCollectionItemType.MSG))
+ if (!writer.writeCollection("qrys", qrys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 8:
- if (!writer.writeInt("timeout", timeout))
+ if (!writer.writeLong("reqId", reqId))
return false;
writer.incrementState();
case 9:
- if (!writer.writeMessage("topVer", topVer))
+ if (!writer.writeString("schemaName", schemaName))
return false;
writer.incrementState();
-
case 10:
- if (!writer.writeIntArray("qryParts", qryParts))
+ if (!writer.writeCollection("tbls", tbls, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 11:
- if (!writer.writeString("schemaName", schemaName))
+ if (!writer.writeInt("timeout", timeout))
return false;
writer.incrementState();
+
+ case 12:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -524,7 +553,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
reader.incrementState();
case 2:
- pageSize = reader.readInt("pageSize");
+ mvccVer = reader.readMessage("mvccVer");
if (!reader.isLastRead())
return false;
@@ -532,7 +561,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
reader.incrementState();
case 3:
- paramsBytes = reader.readByteArray("paramsBytes");
+ pageSize = reader.readInt("pageSize");
if (!reader.isLastRead())
return false;
@@ -540,7 +569,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
reader.incrementState();
case 4:
- parts = reader.readMap("parts", MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR, false);
+ paramsBytes = reader.readByteArray("paramsBytes");
if (!reader.isLastRead())
return false;
@@ -548,7 +577,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
reader.incrementState();
case 5:
- qrys = reader.readCollection("qrys", MessageCollectionItemType.MSG);
+ parts = reader.readMap("parts", MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR, false);
if (!reader.isLastRead())
return false;
@@ -556,7 +585,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
reader.incrementState();
case 6:
- reqId = reader.readLong("reqId");
+ qryParts = reader.readIntArray("qryParts");
if (!reader.isLastRead())
return false;
@@ -564,7 +593,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
reader.incrementState();
case 7:
- tbls = reader.readCollection("tbls", MessageCollectionItemType.MSG);
+ qrys = reader.readCollection("qrys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -572,7 +601,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
reader.incrementState();
case 8:
- timeout = reader.readInt("timeout");
+ reqId = reader.readLong("reqId");
if (!reader.isLastRead())
return false;
@@ -580,16 +609,15 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
reader.incrementState();
case 9:
- topVer = reader.readMessage("topVer");
+ schemaName = reader.readString("schemaName");
if (!reader.isLastRead())
return false;
reader.incrementState();
-
case 10:
- qryParts = reader.readIntArray("qryParts");
+ tbls = reader.readCollection("tbls", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -597,12 +625,21 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
reader.incrementState();
case 11:
- schemaName = reader.readString("schemaName");
+ timeout = reader.readInt("timeout");
if (!reader.isLastRead())
return false;
reader.incrementState();
+
+ case 12:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
return reader.afterMessageRead(GridH2QueryRequest.class);
@@ -615,7 +652,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 12;
+ return 13;
}
/** {@inheritDoc} */
[3/4] ignite git commit: ignite-3478 Mvcc support for sql indexes
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index dbe4ce5..df9f21e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -36,26 +36,24 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.TouchedExpiryPolicy;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteTransactions;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.TestCacheNodeExcludingFilter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
@@ -275,6 +273,13 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testCacheRecreate() throws Exception {
+ cacheRecreate(null);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testActiveQueriesCleanup() throws Exception {
activeQueriesCleanup(false);
}
@@ -3626,6 +3631,67 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
checkRow(cctx, row, key0, vers.get(v + 1).get1());
}
}
+
+ KeyCacheObject key = cctx.toCacheKeyObject(KEYS);
+
+ cache.put(key, 0);
+
+ cache.remove(key);
+
+ cctx.offheap().mvccRemoveAll((GridCacheMapEntry)cctx.cache().entryEx(key));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testExpiration() throws Exception {
+ final IgniteEx node = startGrid(0);
+
+ IgniteCache cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, 64));
+
+ final IgniteCache expiryCache =
+ cache.withExpiryPolicy(new TouchedExpiryPolicy(new Duration(TimeUnit.SECONDS, 1)));
+
+ for (int i = 0; i < 10; i++)
+ expiryCache.put(1, i);
+
+ assertTrue("Failed to wait for expiration", GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return expiryCache.localPeek(1) == null;
+ }
+ }, 5000));
+
+ for (int i = 0; i < 11; i++) {
+ if (i % 2 == 0)
+ expiryCache.put(1, i);
+ else
+ expiryCache.remove(1);
+ }
+
+ assertTrue("Failed to wait for expiration", GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return expiryCache.localPeek(1) == null;
+ }
+ }, 5000));
+
+ expiryCache.put(1, 1);
+
+ assertTrue("Failed to wait for expiration", GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ try {
+ GridCacheContext cctx = node.context().cache().context().cacheContext(CU.cacheId(DEFAULT_CACHE_NAME));
+
+ KeyCacheObject key = cctx.toCacheKeyObject(1);
+
+ return cctx.offheap().read(cctx, key) == null;
+ }
+ catch (Exception e) {
+ fail();
+
+ return false;
+ }
+ }
+ }, 5000));
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
index 27804d9..335279f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
@@ -18,11 +18,14 @@
package org.apache.ignite.internal.processors.database;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
@@ -218,6 +221,53 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
/**
* @throws IgniteCheckedException If failed.
*/
+ public void testFindWithClosure() throws IgniteCheckedException {
+ TestTree tree = createTestTree(true);
+ TreeMap<Long, Long> map = new TreeMap<>();
+
+ long size = CNT * CNT;
+
+ for (long i = 1; i <= size; i++) {
+ tree.put(i);
+ map.put(i, i);
+ }
+
+ checkCursor(tree.find(null, null, new TestTreeFindFilteredClosure(Collections.<Long>emptySet()), null),
+ Collections.<Long>emptyList().iterator());
+
+ checkCursor(tree.find(null, null, new TestTreeFindFilteredClosure(map.keySet()), null),
+ map.values().iterator());
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ for (int i = 0; i < 100; i++) {
+ Long val = rnd.nextLong(size) + 1;
+
+ checkCursor(tree.find(null, null, new TestTreeFindFilteredClosure(Collections.singleton(val)), null),
+ Collections.singleton(val).iterator());
+ }
+
+ for (int i = 0; i < 200; i++) {
+ long vals = rnd.nextLong(size) + 1;
+
+ TreeSet<Long> exp = new TreeSet<>();
+
+ for (long k = 0; k < vals; k++)
+ exp.add(rnd.nextLong(size) + 1);
+
+ checkCursor(tree.find(null, null, new TestTreeFindFilteredClosure(exp), null), exp.iterator());
+
+ checkCursor(tree.find(0L, null, new TestTreeFindFilteredClosure(exp), null), exp.iterator());
+
+ checkCursor(tree.find(0L, size, new TestTreeFindFilteredClosure(exp), null), exp.iterator());
+
+ checkCursor(tree.find(null, size, new TestTreeFindFilteredClosure(exp), null), exp.iterator());
+ }
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
public void _testBenchInvoke() throws IgniteCheckedException {
MAX_PER_PAGE = 10;
@@ -625,12 +675,12 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
}
/**
- * @param tree
- * @param lower
- * @param upper
- * @param exp
- * @param expFound
- * @throws IgniteCheckedException
+ * @param tree Tree.
+ * @param lower Lower bound.
+ * @param upper Upper bound.
+ * @param exp Value to find.
+ * @param expFound {@code True} if value should be found.
+ * @throws IgniteCheckedException If failed.
*/
private void checkIterate(TestTree tree, long lower, long upper, Long exp, boolean expFound)
throws IgniteCheckedException {
@@ -641,6 +691,14 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
assertEquals(expFound, c.found);
}
+ /**
+ * @param tree Tree.
+ * @param lower Lower bound.
+ * @param upper Upper bound.
+ * @param c Closure.
+ * @param expFound {@code True} if value should be found.
+ * @throws IgniteCheckedException If failed.
+ */
private void checkIterateC(TestTree tree, long lower, long upper, TestTreeRowClosure c, boolean expFound)
throws IgniteCheckedException {
c.found = false;
@@ -1307,7 +1365,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testIterateConcurrentPutRemove() throws Exception {
- findOneBoundedConcurrentPutRemove();
+ iterateConcurrentPutRemove();
}
/**
@@ -1316,7 +1374,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
public void testIterateConcurrentPutRemove_1() throws Exception {
MAX_PER_PAGE = 1;
- findOneBoundedConcurrentPutRemove();
+ iterateConcurrentPutRemove();
}
/**
@@ -1325,7 +1383,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
public void testIterateConcurrentPutRemove_5() throws Exception {
MAX_PER_PAGE = 5;
- findOneBoundedConcurrentPutRemove();
+ iterateConcurrentPutRemove();
}
/**
@@ -1334,13 +1392,13 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
public void testIteratePutRemove_10() throws Exception {
MAX_PER_PAGE = 10;
- findOneBoundedConcurrentPutRemove();
+ iterateConcurrentPutRemove();
}
/**
* @throws Exception If failed.
*/
- private void findOneBoundedConcurrentPutRemove() throws Exception {
+ private void iterateConcurrentPutRemove() throws Exception {
final TestTree tree = createTestTree(true);
final int KEYS = 10_000;
@@ -1474,7 +1532,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
}
/**
- *
+ * @throws Exception If failed.
*/
public void testConcurrentGrowDegenerateTreeAndConcurrentRemove() throws Exception {
//calculate tree size when split happens
@@ -2132,6 +2190,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
/** */
private Long val;
+
/** {@inheritDoc} */
@Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx)
throws IgniteCheckedException {
@@ -2142,4 +2201,27 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
return false;
}
}
+
+ /**
+ * Row closure that accepts only rows whose lookup value is contained in the given set.
+ */
+ static class TestTreeFindFilteredClosure implements BPlusTree.TreeRowClosure<Long, Long> {
+ /** Values allowed by the filter. */
+ private final Set<Long> vals;
+
+ /**
+ * @param vals Values to allow in filter.
+ */
+ TestTreeFindFilteredClosure(Set<Long> vals) {
+ this.vals = vals;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx)
+ throws IgniteCheckedException {
+ Long val = io.getLookupRow(tree, pageAddr, idx);
+
+ return vals.contains(val);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
----------------------------------------------------------------------
diff --git a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
index 2cd36b3..fce18f1 100644
--- a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
+++ b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
@@ -200,6 +200,11 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
}
}
+ /** {@inheritDoc} */
+ @Override public boolean putx(GridH2Row row) {
+ return put(row) != null;
+ }
+
/**
* @param row Row.
* @param rowId Row id.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index 4a93aaf..f5c5e60 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -65,6 +65,9 @@ public class GridCacheTwoStepQuery {
/** */
private CacheQueryPartitionInfo[] derivedPartitions;
+ /** Mvcc flag. */
+ private boolean mvccEnabled;
+
/**
* @param originalSql Original query SQL.
* @param tbls Tables in query.
@@ -241,6 +244,7 @@ public class GridCacheTwoStepQuery {
cp.distributedJoins = distributedJoins;
cp.derivedPartitions = derivedPartitions;
cp.local = local;
+ cp.mvccEnabled = mvccEnabled;
for (int i = 0; i < mapQrys.size(); i++)
cp.mapQrys.add(mapQrys.get(i).copy());
@@ -262,6 +266,20 @@ public class GridCacheTwoStepQuery {
return tbls;
}
+ /**
+ * @return Mvcc flag.
+ */
+ public boolean mvccEnabled() {
+ return mvccEnabled;
+ }
+
+ /**
+ * @param mvccEnabled Mvcc flag.
+ */
+ public void mvccEnabled(boolean mvccEnabled) {
+ this.mvccEnabled = mvccEnabled;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridCacheTwoStepQuery.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index eed1f19..6dc93c4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -71,6 +71,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.query.CacheQueryPartitionInfo;
@@ -98,6 +99,8 @@ import org.apache.ignite.internal.processors.query.h2.database.io.H2ExtrasInnerI
import org.apache.ignite.internal.processors.query.h2.database.io.H2ExtrasLeafIO;
import org.apache.ignite.internal.processors.query.h2.database.io.H2InnerIO;
import org.apache.ignite.internal.processors.query.h2.database.io.H2LeafIO;
+import org.apache.ignite.internal.processors.query.h2.database.io.H2MvccInnerIO;
+import org.apache.ignite.internal.processors.query.h2.database.io.H2MvccLeafIO;
import org.apache.ignite.internal.processors.query.h2.ddl.DdlStatementsProcessor;
import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
@@ -183,7 +186,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* Register IO for indexes.
*/
static {
- PageIO.registerH2(H2InnerIO.VERSIONS, H2LeafIO.VERSIONS);
+ PageIO.registerH2(H2InnerIO.VERSIONS, H2LeafIO.VERSIONS, H2MvccInnerIO.VERSIONS, H2MvccLeafIO.VERSIONS);
H2ExtrasInnerIO.register();
H2ExtrasLeafIO.register();
@@ -537,8 +540,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/** {@inheritDoc} */
- @Override public void store(GridCacheContext cctx, GridQueryTypeDescriptor type, CacheDataRow row)
- throws IgniteCheckedException {
+ @Override public void store(GridCacheContext cctx,
+ GridQueryTypeDescriptor type,
+ CacheDataRow row,
+ @Nullable MvccCoordinatorVersion newVer) throws IgniteCheckedException
+ {
String cacheName = cctx.name();
H2TableDescriptor tbl = tableDescriptor(schema(cacheName), cacheName, type.name());
@@ -546,7 +552,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (tbl == null)
return; // Type was rejected.
- tbl.table().update(row, false);
+ tbl.table().update(row, newVer, false);
if (tbl.luceneIndex() != null) {
long expireTime = row.expireTime();
@@ -575,7 +581,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (tbl == null)
return;
- if (tbl.table().update(row, true)) {
+ if (tbl.table().update(row, null, true)) {
if (tbl.luceneIndex() != null)
tbl.luceneIndex().remove(row.key());
}
@@ -671,7 +677,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
SchemaIndexCacheVisitorClosure clo = new SchemaIndexCacheVisitorClosure() {
@Override public void apply(CacheDataRow row) throws IgniteCheckedException {
- GridH2Row h2Row = rowDesc.createRow(row);
+ GridH2Row h2Row = rowDesc.createRow(row, null);
h2Idx.put(h2Row);
}
@@ -1546,7 +1552,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (partitions == null && twoStepQry.derivedPartitions() != null) {
try {
partitions = calculateQueryPartitions(twoStepQry.derivedPartitions(), args);
- } catch (IgniteCheckedException e) {
+ }
+ catch (IgniteCheckedException e) {
throw new CacheException("Failed to calculate derived partitions: [qry=" + sqlQry + ", params=" +
Arrays.deepToString(args) + "]", e);
}
@@ -1585,9 +1592,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
+ * @param cacheIds Cache IDs.
+ * @param twoStepQry Query.
* @throws IllegalStateException if segmented indices used with non-segmented indices.
*/
- private void checkCacheIndexSegmentation(List<Integer> cacheIds) {
+ private void processCaches(List<Integer> cacheIds, GridCacheTwoStepQuery twoStepQry) {
if (cacheIds.isEmpty())
return; // Nothing to check
@@ -1595,11 +1604,21 @@ public class IgniteH2Indexing implements GridQueryIndexing {
int expectedParallelism = 0;
- for (Integer cacheId : cacheIds) {
+ boolean mvccEnabled = false;
+
+ for (int i = 0; i < cacheIds.size(); i++) {
+ Integer cacheId = cacheIds.get(i);
+
GridCacheContext cctx = sharedCtx.cacheContext(cacheId);
assert cctx != null;
+ if (i == 0)
+ mvccEnabled = cctx.mvccEnabled();
+ else if (cctx.mvccEnabled() != mvccEnabled)
+ throw new IllegalStateException("Using caches with different mvcc settings in same query is " +
+ "forbidden.");
+
if (!cctx.isPartitioned())
continue;
@@ -1610,6 +1629,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
"forbidden.");
}
}
+
+ twoStepQry.mvccEnabled(mvccEnabled);
}
/**
@@ -2522,7 +2543,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
//Prohibit usage indices with different numbers of segments in same query.
List<Integer> cacheIds = new ArrayList<>(caches0);
- checkCacheIndexSegmentation(cacheIds);
+ processCaches(cacheIds, twoStepQry);
return cacheIds;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
index 59bf153..9a99c62 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
@@ -124,13 +124,20 @@ public class H2PkHashIndex extends GridH2IndexBase {
@SuppressWarnings("StatementWithEmptyBody")
@Override public GridH2Row put(GridH2Row row) {
// Should not be called directly. Rows are inserted into underlying cache data stores.
-
assert false;
throw DbException.getUnsupportedException("put");
}
/** {@inheritDoc} */
+ @Override public boolean putx(GridH2Row row) {
+ // Should not be called directly. Rows are inserted into underlying cache data stores.
+ assert false;
+
+ throw DbException.getUnsupportedException("putx");
+ }
+
+ /** {@inheritDoc} */
@Override public GridH2Row remove(SearchRow row) {
// Should not be called directly. Rows are removed from underlying cache data stores.
@@ -197,7 +204,7 @@ public class H2PkHashIndex extends GridH2IndexBase {
try {
CacheDataRow dataRow = cursor.get();
- return tbl.rowDescriptor().createRow(dataRow);
+ return tbl.rowDescriptor().createRow(dataRow, null);
}
catch (IgniteCheckedException e) {
throw DbException.convert(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java
index 40b9b0a..e9ec9e6 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java
@@ -18,9 +18,9 @@
package org.apache.ignite.internal.processors.query.h2.database;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
+import org.apache.ignite.internal.processors.cache.tree.MvccDataRow;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
@@ -60,17 +60,29 @@ public class H2RowFactory {
rowBuilder.initFromLink(cctx.group(), CacheDataRowAdapter.RowData.FULL);
- GridH2Row row;
-
- try {
- row = rowDesc.createRow(rowBuilder);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
+ GridH2Row row = rowDesc.createRow(rowBuilder, null);
assert row.version() != null;
return row;
}
+
+ /**
+ * @param link Row link handed to the {@link MvccDataRow} constructor.
+ * @param mvccCrdVer Mvcc coordinator version.
+ * @param mvccCntr Mvcc counter.
+ * @return H2 row built by the row descriptor from the constructed {@link MvccDataRow}.
+ * @throws IgniteCheckedException If failed.
+ */
+ public GridH2Row getMvccRow(long link, long mvccCrdVer, long mvccCntr) throws IgniteCheckedException {
+ MvccDataRow row = new MvccDataRow(cctx.group(),
+ 0,
+ link,
+ -1, // TODO IGNITE-3478: get partition from link.
+ null,
+ mvccCrdVer,
+ mvccCntr);
+
+ return rowDesc.createRow(row, null);
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
index fcfeb16..df77f7a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
@@ -29,15 +29,22 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusMeta
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
import org.apache.ignite.internal.processors.query.h2.database.io.H2ExtrasInnerIO;
import org.apache.ignite.internal.processors.query.h2.database.io.H2ExtrasLeafIO;
+import org.apache.ignite.internal.processors.query.h2.database.io.H2RowLinkIO;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.h2.result.SearchRow;
import org.h2.table.IndexColumn;
import org.h2.value.Value;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.assertMvccVersionValid;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
/**
*/
-public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
+public abstract class H2Tree extends BPlusTree<GridH2SearchRow, GridH2Row> {
/** */
private final H2RowFactory rowStore;
@@ -54,6 +61,9 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
private final int[] columnIds;
/** */
+ private final boolean mvccEnabled;
+
+ /** */
private final Comparator<Value> comp = new Comparator<Value>() {
@Override public int compare(Value o1, Value o2) {
return compareValues(o1, o2);
@@ -69,9 +79,10 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
* @param rowStore Row data store.
* @param metaPageId Meta page ID.
* @param initNew Initialize new index.
+ * @param mvccEnabled Mvcc flag.
* @throws IgniteCheckedException If failed.
*/
- protected H2Tree(
+ H2Tree(
String name,
ReuseList reuseList,
int grpId,
@@ -83,7 +94,8 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
boolean initNew,
IndexColumn[] cols,
List<InlineIndexHelper> inlineIdxs,
- int inlineSize
+ int inlineSize,
+ boolean mvccEnabled
) throws IgniteCheckedException {
super(name, grpId, pageMem, wal, globalRmvId, metaPageId, reuseList);
@@ -93,6 +105,7 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
}
this.inlineSize = inlineSize;
+ this.mvccEnabled = mvccEnabled;
assert rowStore != null;
@@ -105,7 +118,7 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
for (int i = 0; i < cols.length; i++)
columnIds[i] = cols[i].column.getColumnId();
- setIos(H2ExtrasInnerIO.getVersions(inlineSize), H2ExtrasLeafIO.getVersions(inlineSize));
+ setIos(H2ExtrasInnerIO.getVersions(inlineSize, mvccEnabled), H2ExtrasLeafIO.getVersions(inlineSize, mvccEnabled));
initTree(initNew, inlineSize);
}
@@ -118,7 +131,7 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
}
/** {@inheritDoc} */
- @Override protected GridH2Row getRow(BPlusIO<SearchRow> io, long pageAddr, int idx, Object ignore)
+ @Override protected GridH2Row getRow(BPlusIO<GridH2SearchRow> io, long pageAddr, int idx, Object ignore)
throws IgniteCheckedException {
return (GridH2Row)io.getLookupRow(this, pageAddr, idx);
}
@@ -159,8 +172,8 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
/** {@inheritDoc} */
@SuppressWarnings("ForLoopReplaceableByForEach")
- @Override protected int compare(BPlusIO<SearchRow> io, long pageAddr, int idx,
- SearchRow row) throws IgniteCheckedException {
+ @Override protected int compare(BPlusIO<GridH2SearchRow> io, long pageAddr, int idx,
+ GridH2SearchRow row) throws IgniteCheckedException {
if (inlineSize() == 0)
return compareRows(getRow(io, pageAddr, idx), row);
else {
@@ -195,7 +208,7 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
}
if (lastIdxUsed == cols.length)
- return 0;
+ return mvccCompare((H2RowLinkIO)io, pageAddr, idx, row);
SearchRow rowData = getRow(io, pageAddr, idx);
@@ -207,7 +220,7 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
if (v2 == null) {
// Can't compare further.
- return 0;
+ return mvccCompare((H2RowLinkIO)io, pageAddr, idx, row);
}
Value v1 = rowData.getValue(idx0);
@@ -218,7 +231,7 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
return InlineIndexHelper.fixSort(c, col.sortType);
}
- return 0;
+ return mvccCompare((H2RowLinkIO)io, pageAddr, idx, row);
}
}
@@ -229,7 +242,9 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
* @param r2 Row 2.
* @return Compare result.
*/
- private int compareRows(GridH2Row r1, SearchRow r2) {
+ private int compareRows(GridH2Row r1, GridH2SearchRow r2) {
+ assert !mvccEnabled || r2.indexSearchRow() || assertMvccVersionValid(r2.mvccCoordinatorVersion(), r2.mvccCounter()) : r2;
+
if (r1 == r2)
return 0;
@@ -241,7 +256,7 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
if (v1 == null || v2 == null) {
// Can't compare further.
- return 0;
+ return mvccCompare(r1, r2);
}
int c = compareValues(v1, v2);
@@ -250,6 +265,64 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
return InlineIndexHelper.fixSort(c, cols[i].sortType);
}
+ return mvccCompare(r1, r2);
+ }
+
+ /**
+ * @param io IO used to read the MVCC coordinator version and counter of the page item.
+ * @param pageAddr Page address.
+ * @param idx Item index.
+ * @param r2 Search row.
+ * @return MVCC version comparison of the item with {@code r2}, or {@code 0} if MVCC is disabled or {@code r2} is an index search row.
+ */
+ private int mvccCompare(H2RowLinkIO io, long pageAddr, int idx, GridH2SearchRow r2) {
+ if (mvccEnabled && !r2.indexSearchRow()) {
+ long crdVer1 = io.getMvccCoordinatorVersion(pageAddr, idx);
+ long crdVer2 = r2.mvccCoordinatorVersion();
+
+ assert crdVer1 != 0;
+ assert crdVer2 != 0 : r2;
+
+ int c = Long.compare(unmaskCoordinatorVersion(crdVer1), unmaskCoordinatorVersion(crdVer2));
+
+ if (c != 0)
+ return c; // Unmasked coordinator versions differ, so they alone decide the order.
+
+ long cntr = io.getMvccCounter(pageAddr, idx);
+
+ assert cntr != MVCC_COUNTER_NA;
+ assert r2.mvccCounter() != MVCC_COUNTER_NA : r2;
+
+ return Long.compare(cntr, r2.mvccCounter()); // Same coordinator version: the counter decides.
+ }
+
+ return 0; // No MVCC ordering: MVCC is disabled or r2 is an index search row.
+ }
+
+ /**
+ * @param r1 First row.
+ * @param r2 Second row.
+ * @return MVCC version comparison of the rows, or {@code 0} if MVCC is disabled or {@code r2} is an index search row.
+ */
+ private int mvccCompare(GridH2Row r1, GridH2SearchRow r2) {
+ if (mvccEnabled && !r2.indexSearchRow()) {
+ long crdVer1 = r1.mvccCoordinatorVersion();
+ long crdVer2 = r2.mvccCoordinatorVersion();
+
+ assert crdVer1 != 0 : r1;
+ assert crdVer2 != 0 : r2;
+
+ int c = Long.compare(unmaskCoordinatorVersion(crdVer1), unmaskCoordinatorVersion(crdVer2));
+
+ if (c != 0)
+ return c; // Unmasked coordinator versions differ, so they alone decide the order.
+
+ assert r1.mvccCounter() != MVCC_COUNTER_NA : r1;
+ assert r2.mvccCounter() != MVCC_COUNTER_NA : r2;
+
+ return Long.compare(r1.mvccCounter(), r2.mvccCounter()); // Same coordinator version: the counter decides.
+ }
+
+ return 0; // No MVCC ordering: MVCC is disabled or r2 is an index search row.
+ }
@@ -259,4 +332,9 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
* @return Comparison result.
*/
public abstract int compareValues(Value v1, Value v2);
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(H2Tree.class, this, "super", super.toString()); // Parent's toString() is passed as an extra "super" entry.
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
index 3c0ab5e..87a6eca 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
@@ -29,7 +29,9 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.query.h2.H2Cursor;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.util.IgniteTree;
import org.apache.ignite.internal.util.lang.GridCursor;
@@ -118,7 +120,8 @@ public class H2TreeIndex extends GridH2IndexBase {
page.isAllocated(),
cols,
inlineIdxs,
- computeInlineSize(inlineIdxs, inlineSize)) {
+ computeInlineSize(inlineIdxs, inlineSize),
+ cctx.mvccEnabled()) {
@Override public int compareValues(Value v1, Value v2) {
return v1 == v2 ? 0 : table.compareTypeSafe(v1, v2);
}
@@ -165,20 +168,33 @@ public class H2TreeIndex extends GridH2IndexBase {
/** {@inheritDoc} */
@Override public Cursor find(Session ses, SearchRow lower, SearchRow upper) {
try {
- IndexingQueryFilter f = threadLocalFilter();
+ assert lower == null || lower instanceof GridH2SearchRow : lower;
+ assert upper == null || upper instanceof GridH2SearchRow : upper;
+
IndexingQueryCacheFilter p = null;
+ H2TreeMvccFilterClosure mvccFilter = null;
+
+ GridH2QueryContext qctx = GridH2QueryContext.get();
+
+ if (qctx != null) {
+ IndexingQueryFilter f = qctx.filter();
+
+ if (f != null) {
+ String cacheName = getTable().cacheName();
- if (f != null) {
- String cacheName = getTable().cacheName();
+ p = f.forCache(cacheName);
+ }
- p = f.forCache(cacheName);
+ mvccFilter = qctx.mvccFilter();
}
int seg = threadLocalSegment();
H2Tree tree = treeForRead(seg);
- return new H2Cursor(tree.find(lower, upper), p);
+ assert !cctx.mvccEnabled() || mvccFilter != null;
+
+ return new H2Cursor(tree.find((GridH2SearchRow)lower, (GridH2SearchRow)upper, mvccFilter, null), p);
}
catch (IgniteCheckedException e) {
throw DbException.convert(e);
@@ -205,7 +221,28 @@ public class H2TreeIndex extends GridH2IndexBase {
}
/** {@inheritDoc} */
+ @Override public boolean putx(GridH2Row row) {
+ try {
+ InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs); // Read back by the extras page IOs in storeByOffset().
+
+ int seg = segmentForRow(row);
+
+ H2Tree tree = treeForRead(seg);
+
+ return tree.putx(row);
+ }
+ catch (IgniteCheckedException e) {
+ throw DbException.convert(e);
+ }
+ finally {
+ InlineIndexHelper.clearCurrentInlineIndexes(); // Always cleared, whether the put succeeded or threw.
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public GridH2Row remove(SearchRow row) {
+ assert row instanceof GridH2SearchRow : row;
+
try {
InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs);
@@ -213,7 +250,7 @@ public class H2TreeIndex extends GridH2IndexBase {
H2Tree tree = treeForRead(seg);
- return tree.remove(row);
+ return tree.remove((GridH2SearchRow)row);
}
catch (IgniteCheckedException e) {
throw DbException.convert(e);
@@ -225,6 +262,8 @@ public class H2TreeIndex extends GridH2IndexBase {
/** {@inheritDoc} */
@Override public void removex(SearchRow row) {
+ assert row instanceof GridH2SearchRow : row;
+
try {
InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs);
@@ -232,7 +271,7 @@ public class H2TreeIndex extends GridH2IndexBase {
H2Tree tree = treeForRead(seg);
- tree.removex(row);
+ tree.removex((GridH2SearchRow)row);
}
catch (IgniteCheckedException e) {
throw DbException.convert(e);
@@ -282,6 +321,17 @@ public class H2TreeIndex extends GridH2IndexBase {
H2Tree tree = treeForRead(seg);
+ if (cctx.mvccEnabled()) {
+ GridH2QueryContext qctx = GridH2QueryContext.get();
+
+ assert qctx != null;
+
+ H2TreeMvccFilterClosure mvccFilter = qctx.mvccFilter();
+
+ assert mvccFilter != null;
+ // TODO IGNITE-3478 (support filter for first/last)
+ }
+
GridH2Row row = b ? tree.findFirst(): tree.findLast();
return new SingleRowCursor(row);
@@ -321,11 +371,13 @@ public class H2TreeIndex extends GridH2IndexBase {
@Override protected GridCursor<GridH2Row> doFind0(
IgniteTree t,
@Nullable SearchRow first,
- boolean includeFirst,
@Nullable SearchRow last,
- IndexingQueryFilter filter) {
+ IndexingQueryFilter filter,
+ H2TreeMvccFilterClosure mvccFilter) {
try {
- GridCursor<GridH2Row> range = t.find(first, last);
+ assert !cctx.mvccEnabled() || mvccFilter != null;
+
+ GridCursor<GridH2Row> range = ((BPlusTree)t).find(first, last, mvccFilter, null);
if (range == null)
return EMPTY_CURSOR;
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeMvccFilterClosure.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeMvccFilterClosure.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeMvccFilterClosure.java
new file mode 100644
index 0000000..6ae2312
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeMvccFilterClosure.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.database;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.query.h2.database.io.H2RowLinkIO;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.assertMvccVersionValid;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
+/**
+ * Tree row closure that accepts or rejects index page items by comparing their MVCC version with a given {@link MvccCoordinatorVersion}.
+ */
+public class H2TreeMvccFilterClosure implements H2Tree.TreeRowClosure<GridH2SearchRow, GridH2Row> {
+ /** Mvcc version every page item is checked against. */
+ private final MvccCoordinatorVersion mvccVer;
+
+ /**
+ * @param mvccVer Mvcc version.
+ */
+ public H2TreeMvccFilterClosure(MvccCoordinatorVersion mvccVer) {
+ assert mvccVer != null;
+
+ this.mvccVer = mvccVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(BPlusTree<GridH2SearchRow, GridH2Row> tree,
+ BPlusIO<GridH2SearchRow> io,
+ long pageAddr,
+ int idx) throws IgniteCheckedException {
+ H2RowLinkIO rowIo = (H2RowLinkIO)io;
+
+ assert rowIo.storeMvccInfo() : rowIo;
+
+ long rowCrdVer = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
+
+ assert unmaskCoordinatorVersion(rowCrdVer) == rowCrdVer : rowCrdVer;
+ assert rowCrdVer > 0 : rowCrdVer;
+
+ int cmp = Long.compare(mvccVer.coordinatorVersion(), rowCrdVer);
+
+ if (cmp == 0) {
+ long rowCntr = rowIo.getMvccCounter(pageAddr, idx);
+
+ cmp = Long.compare(mvccVer.counter(), rowCntr);
+
+ return cmp >= 0 && // Item counter must not exceed the counter of mvccVer.
+ !newVersionAvailable(rowIo, pageAddr, idx) &&
+ !mvccVer.activeTransactions().contains(rowCntr);
+ }
+ else
+ return cmp > 0; // Different coordinator versions: accept only items written under a smaller one.
+ }
+
+ /**
+ * @param rowIo Row IO.
+ * @param pageAddr Page address.
+ * @param idx Item index.
+ * @return {@code True} if the item's new coordinator version is non-zero and passes the checks against {@link #mvccVer} below.
+ */
+ private boolean newVersionAvailable(H2RowLinkIO rowIo, long pageAddr, int idx) {
+ long newCrdVer = rowIo.getNewMvccCoordinatorVersion(pageAddr, idx);
+
+ if (newCrdVer == 0)
+ return false;
+
+ int cmp = Long.compare(mvccVer.coordinatorVersion(), newCrdVer);
+
+ if (cmp == 0) {
+ long newCntr = rowIo.getNewMvccCounter(pageAddr, idx);
+
+ assert assertMvccVersionValid(newCrdVer, newCntr);
+
+ return newCntr <= mvccVer.counter() && !mvccVer.activeTransactions().contains(newCntr);
+ }
+ else
+ return cmp < 0; // NOTE(review): true when the new coordinator version is greater than mvccVer's, while apply() rejects greater ones - confirm the direction is intended.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(H2TreeMvccFilterClosure.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2ExtrasInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2ExtrasInnerIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2ExtrasInnerIO.java
new file mode 100644
index 0000000..550aade
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2ExtrasInnerIO.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.database.io;
+
+import java.util.List;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.query.h2.database.H2Tree;
+import org.apache.ignite.internal.processors.query.h2.database.InlineIndexHelper;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow;
+
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
+
+/**
+ * Inner page for H2 row references.
+ */
+public abstract class AbstractH2ExtrasInnerIO extends BPlusInnerIO<GridH2SearchRow> implements H2RowLinkIO {
+ /** Payload size. */
+ private final int payloadSize;
+
+ /** Registers IO versions for both the non-MVCC and the MVCC page type ranges. */
+ public static void register() {
+ register(false);
+
+ register(true);
+ }
+
+ /**
+ * @param mvcc Mvcc flag.
+ */
+ private static void register(boolean mvcc) {
+ short type = mvcc ? PageIO.T_H2_EX_REF_MVCC_INNER_START : PageIO.T_H2_EX_REF_INNER_START;
+
+ for (short payload = 1; payload <= PageIO.MAX_PAYLOAD_SIZE; payload++) {
+ IOVersions<? extends AbstractH2ExtrasInnerIO> io =
+ getVersions((short)(type + payload - 1), payload, mvcc);
+
+ PageIO.registerH2ExtraInner(io, mvcc);
+ }
+ }
+
+ /**
+ * @param payload Payload size.
+ * @param mvccEnabled Mvcc flag.
+ * @return IOVersions for given payload.
+ */
+ @SuppressWarnings("unchecked")
+ public static IOVersions<? extends BPlusInnerIO<GridH2SearchRow>> getVersions(int payload, boolean mvccEnabled) {
+ assert payload >= 0 && payload <= PageIO.MAX_PAYLOAD_SIZE;
+
+ if (payload == 0)
+ return mvccEnabled ? H2MvccInnerIO.VERSIONS : H2InnerIO.VERSIONS;
+ else
+ return (IOVersions<BPlusInnerIO<GridH2SearchRow>>)PageIO.getInnerVersions((short)(payload - 1), mvccEnabled);
+ }
+
+ /**
+ * @param type Type.
+ * @param payload Payload size.
+ * @param mvcc Mvcc flag.
+ * @return Instance of IO versions.
+ */
+ private static IOVersions<? extends AbstractH2ExtrasInnerIO> getVersions(short type, short payload, boolean mvcc) {
+ return new IOVersions<>(mvcc ? new H2MvccExtrasInnerIO(type, 1, payload) : new H2ExtrasInnerIO(type, 1, payload));
+ }
+
+ /**
+ * @param type Page type.
+ * @param ver Page format version.
+ * @param itemSize Item size.
+ * @param payloadSize Payload size.
+ */
+ AbstractH2ExtrasInnerIO(short type, int ver, int itemSize, int payloadSize) {
+ super(type, ver, true, itemSize + payloadSize);
+
+ this.payloadSize = payloadSize;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ @Override public final void storeByOffset(long pageAddr, int off, GridH2SearchRow row) {
+ GridH2Row row0 = (GridH2Row)row;
+
+ assert row0.link() != 0 : row0;
+
+ List<InlineIndexHelper> inlineIdxs = InlineIndexHelper.getCurrentInlineIndexes();
+
+ assert inlineIdxs != null : "no inline index helpers";
+
+
+ int fieldOff = 0;
+
+ for (int i = 0; i < inlineIdxs.size(); i++) {
+ InlineIndexHelper idx = inlineIdxs.get(i);
+
+ int size = idx.put(pageAddr, off + fieldOff, row.getValue(idx.columnIndex()), payloadSize - fieldOff);
+
+ if (size == 0)
+ break; // Helper wrote nothing; stop filling the payload.
+
+ fieldOff += size;
+ }
+
+ H2IOUtils.storeRow(row0, pageAddr, off + payloadSize, storeMvccInfo()); // Row reference goes right after the payload area.
+ }
+
+ /** {@inheritDoc} */
+ @Override public final GridH2SearchRow getLookupRow(BPlusTree<GridH2SearchRow, ?> tree, long pageAddr, int idx)
+ throws IgniteCheckedException {
+ long link = getLink(pageAddr, idx);
+
+ assert link != 0;
+
+ if (storeMvccInfo()) {
+ long mvccCrdVer = getMvccCoordinatorVersion(pageAddr, idx);
+ long mvccCntr = getMvccCounter(pageAddr, idx);
+
+ return ((H2Tree)tree).getRowFactory().getMvccRow(link, mvccCrdVer, mvccCntr);
+ }
+
+ return ((H2Tree)tree).getRowFactory().getRow(link);
+ }
+
+ /** {@inheritDoc} */
+ @Override public final void store(long dstPageAddr, int dstIdx, BPlusIO<GridH2SearchRow> srcIo, long srcPageAddr, int srcIdx) {
+ int srcOff = srcIo.offset(srcIdx);
+
+ byte[] payload = PageUtils.getBytes(srcPageAddr, srcOff, payloadSize);
+ long link = PageUtils.getLong(srcPageAddr, srcOff + payloadSize); // Read only for the sanity assert below.
+
+ assert link != 0;
+
+ int dstOff = offset(dstIdx);
+
+ PageUtils.putBytes(dstPageAddr, dstOff, payload);
+
+ H2IOUtils.store(dstPageAddr, dstOff + payloadSize, srcIo, srcPageAddr, srcIdx, storeMvccInfo());
+ }
+
+ /** {@inheritDoc} */
+ @Override public final long getLink(long pageAddr, int idx) {
+ return PageUtils.getLong(pageAddr, offset(idx) + payloadSize);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
+ return 0; // Default stub; this base class reports storeMvccInfo() == false.
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getMvccCounter(long pageAddr, int idx) {
+ return MVCC_COUNTER_NA;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getNewMvccCoordinatorVersion(long pageAddr, int idx) {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getNewMvccCounter(long pageAddr, int idx) {
+ return MVCC_COUNTER_NA;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean storeMvccInfo() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2ExtrasLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2ExtrasLeafIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2ExtrasLeafIO.java
new file mode 100644
index 0000000..7beecf2
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2ExtrasLeafIO.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.database.io;
+
+import java.util.List;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeafIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.query.h2.database.H2Tree;
+import org.apache.ignite.internal.processors.query.h2.database.InlineIndexHelper;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow;
+
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
+
+/**
+ * Leaf page for H2 row references.
+ */
+public abstract class AbstractH2ExtrasLeafIO extends BPlusLeafIO<GridH2SearchRow> implements H2RowLinkIO {
+ /** Payload size. */
+ private final int payloadSize;
+
+ /** Registers IO versions for both the non-MVCC and the MVCC page type ranges. */
+ public static void register() {
+ register(false);
+
+ register(true);
+ }
+
+ /**
+ * @param mvcc Mvcc flag.
+ */
+ private static void register(boolean mvcc) {
+ short type = mvcc ? PageIO.T_H2_EX_REF_MVCC_LEAF_START : PageIO.T_H2_EX_REF_LEAF_START;
+
+ for (short payload = 1; payload <= PageIO.MAX_PAYLOAD_SIZE; payload++) {
+ IOVersions<? extends AbstractH2ExtrasLeafIO> io =
+ getVersions((short)(type + payload - 1), payload, mvcc);
+
+ PageIO.registerH2ExtraLeaf(io, mvcc);
+ }
+ }
+
+ /**
+ * @param payload Payload size.
+ * @param mvccEnabled Mvcc flag.
+ * @return IOVersions for given payload.
+ */
+ @SuppressWarnings("unchecked")
+ public static IOVersions<? extends BPlusLeafIO<GridH2SearchRow>> getVersions(int payload, boolean mvccEnabled) {
+ assert payload >= 0 && payload <= PageIO.MAX_PAYLOAD_SIZE;
+
+ if (payload == 0)
+ return mvccEnabled ? H2MvccLeafIO.VERSIONS : H2LeafIO.VERSIONS;
+ else
+ return (IOVersions<BPlusLeafIO<GridH2SearchRow>>)PageIO.getLeafVersions((short)(payload - 1), mvccEnabled);
+ }
+
+ /**
+ * @param type Type.
+ * @param payload Payload size.
+ * @param mvcc Mvcc flag.
+ * @return Versions.
+ */
+ private static IOVersions<? extends AbstractH2ExtrasLeafIO> getVersions(short type, short payload, boolean mvcc) {
+ return new IOVersions<>(mvcc ? new H2MvccExtrasLeafIO(type, 1, payload) : new H2ExtrasLeafIO(type, 1, payload));
+ }
+
+ /**
+ * @param type Page type.
+ * @param ver Page format version.
+ * @param itemSize Item size.
+ * @param payloadSize Payload size.
+ */
+ AbstractH2ExtrasLeafIO(short type, int ver, int itemSize, int payloadSize) {
+ super(type, ver, itemSize + payloadSize);
+
+ this.payloadSize = payloadSize;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ @Override public final void storeByOffset(long pageAddr, int off, GridH2SearchRow row) {
+ GridH2Row row0 = (GridH2Row)row;
+
+ assert row0.link() != 0 : row0;
+
+ List<InlineIndexHelper> inlineIdxs = InlineIndexHelper.getCurrentInlineIndexes();
+
+ assert inlineIdxs != null : "no inline index helpers";
+
+ int fieldOff = 0;
+
+ for (int i = 0; i < inlineIdxs.size(); i++) {
+ InlineIndexHelper idx = inlineIdxs.get(i);
+
+ int size = idx.put(pageAddr, off + fieldOff, row.getValue(idx.columnIndex()), payloadSize - fieldOff);
+
+ if (size == 0)
+ break; // Helper wrote nothing; stop filling the payload.
+
+ fieldOff += size;
+ }
+
+ H2IOUtils.storeRow(row0, pageAddr, off + payloadSize, storeMvccInfo()); // Row reference goes right after the payload area.
+ }
+
+ /** {@inheritDoc} */
+ @Override public final void store(long dstPageAddr, int dstIdx, BPlusIO<GridH2SearchRow> srcIo, long srcPageAddr, int srcIdx) {
+ int srcOff = srcIo.offset(srcIdx);
+
+ byte[] payload = PageUtils.getBytes(srcPageAddr, srcOff, payloadSize);
+ long link = PageUtils.getLong(srcPageAddr, srcOff + payloadSize); // Read only for the sanity assert below.
+
+ assert link != 0;
+
+ int dstOff = offset(dstIdx);
+
+ PageUtils.putBytes(dstPageAddr, dstOff, payload);
+
+ H2IOUtils.store(dstPageAddr, dstOff + payloadSize, srcIo, srcPageAddr, srcIdx, storeMvccInfo());
+ }
+
+ /** {@inheritDoc} */
+ @Override public final GridH2SearchRow getLookupRow(BPlusTree<GridH2SearchRow, ?> tree, long pageAddr, int idx)
+ throws IgniteCheckedException {
+ long link = getLink(pageAddr, idx);
+
+ if (storeMvccInfo()) {
+ long mvccCrdVer = getMvccCoordinatorVersion(pageAddr, idx);
+ long mvccCntr = getMvccCounter(pageAddr, idx);
+
+ return ((H2Tree)tree).getRowFactory().getMvccRow(link, mvccCrdVer, mvccCntr);
+ }
+
+ return ((H2Tree)tree).getRowFactory().getRow(link);
+ }
+
+ /** {@inheritDoc} */
+ @Override public final long getLink(long pageAddr, int idx) {
+ return PageUtils.getLong(pageAddr, offset(idx) + payloadSize);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
+ return 0; // Default stub; this base class reports storeMvccInfo() == false.
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getMvccCounter(long pageAddr, int idx) {
+ return MVCC_COUNTER_NA;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getNewMvccCoordinatorVersion(long pageAddr, int idx) {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getNewMvccCounter(long pageAddr, int idx) {
+ return MVCC_COUNTER_NA;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean storeMvccInfo() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2InnerIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2InnerIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2InnerIO.java
new file mode 100644
index 0000000..2f12e75
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2InnerIO.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.database.io;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO;
+import org.apache.ignite.internal.processors.query.h2.database.H2Tree;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow;
+
+/**
+ * Base inner page IO for H2 row references: an item is a row link, followed by MVCC version data if stored.
+ */
+public abstract class AbstractH2InnerIO extends BPlusInnerIO<GridH2SearchRow> implements H2RowLinkIO {
+ /**
+ * @param type Page type.
+ * @param ver Page format version.
+ * @param itemSize Single item size on page.
+ */
+ AbstractH2InnerIO(int type, int ver, int itemSize) {
+ super(type, ver, true, itemSize);
+ }
+
+ /** {@inheritDoc} This base implementation returns {@code false}. */
+ @Override public boolean storeMvccInfo() {
+ return false;
+ }
+
+ /** {@inheritDoc} The row is cast to {@link GridH2Row} and written by {@code H2IOUtils.storeRow}. */
+ @Override public void storeByOffset(long pageAddr, int off, GridH2SearchRow row) {
+ GridH2Row row0 = (GridH2Row)row;
+
+ H2IOUtils.storeRow(row0, pageAddr, off, storeMvccInfo());
+ }
+
+ /** {@inheritDoc} Resolves the row by link via the tree's row factory, adding the MVCC version for MVCC IOs. */
+ @Override public GridH2SearchRow getLookupRow(BPlusTree<GridH2SearchRow, ?> tree, long pageAddr, int idx)
+ throws IgniteCheckedException {
+ long link = getLink(pageAddr, idx);
+
+ if (storeMvccInfo()) { // MVCC-aware IO: look the row up by link plus MVCC version.
+ long mvccCrdVer = getMvccCoordinatorVersion(pageAddr, idx);
+ long mvccCntr = getMvccCounter(pageAddr, idx);
+
+ return ((H2Tree)tree).getRowFactory().getMvccRow(link, mvccCrdVer, mvccCntr);
+ }
+
+ return ((H2Tree)tree).getRowFactory().getRow(link);
+ }
+
+ /** {@inheritDoc} Copies the item to {@code offset(dstIdx)} via {@code H2IOUtils.store}. */
+ @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<GridH2SearchRow> srcIo, long srcPageAddr, int srcIdx) {
+ H2IOUtils.store(dstPageAddr, offset(dstIdx), srcIo, srcPageAddr, srcIdx, storeMvccInfo());
+ }
+
+ /** {@inheritDoc} The link is the long at the start of the item. */
+ @Override public long getLink(long pageAddr, int idx) {
+ return PageUtils.getLong(pageAddr, offset(idx));
+ }
+
+ /** {@inheritDoc} Read at byte 8 of the item; asserts that this IO stores MVCC info. */
+ @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
+ assert storeMvccInfo();
+
+ return PageUtils.getLong(pageAddr, offset(idx) + 8);
+ }
+
+ /** {@inheritDoc} Read at byte 16 of the item; asserts that this IO stores MVCC info. */
+ @Override public long getMvccCounter(long pageAddr, int idx) {
+ assert storeMvccInfo();
+
+ return PageUtils.getLong(pageAddr, offset(idx) + 16);
+ }
+
+ /** {@inheritDoc} Read at byte 24 of the item; asserts that this IO stores MVCC info. */
+ @Override public long getNewMvccCoordinatorVersion(long pageAddr, int idx) {
+ assert storeMvccInfo();
+
+ return PageUtils.getLong(pageAddr, offset(idx) + 24);
+ }
+
+ /** {@inheritDoc} Read at byte 32 of the item; asserts that this IO stores MVCC info. */
+ @Override public long getNewMvccCounter(long pageAddr, int idx) {
+ assert storeMvccInfo();
+
+ return PageUtils.getLong(pageAddr, offset(idx) + 32);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2LeafIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2LeafIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2LeafIO.java
new file mode 100644
index 0000000..a5cf7c2
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2LeafIO.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.database.io;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeafIO;
+import org.apache.ignite.internal.processors.query.h2.database.H2Tree;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow;
+
+/**
+ * Base leaf page IO for H2 row references: an item is a row link, followed by MVCC version data if stored.
+ */
+public abstract class AbstractH2LeafIO extends BPlusLeafIO<GridH2SearchRow> implements H2RowLinkIO {
+ /**
+ * @param type Page type.
+ * @param ver Page format version.
+ * @param itemSize Single item size on page.
+ */
+ AbstractH2LeafIO(int type, int ver, int itemSize) {
+ super(type, ver, itemSize);
+ }
+
+ /** {@inheritDoc} This base implementation returns {@code false}. */
+ @Override public boolean storeMvccInfo() {
+ return false;
+ }
+
+ /** {@inheritDoc} The row is cast to {@link GridH2Row} and written by {@code H2IOUtils.storeRow}. */
+ @Override public final void storeByOffset(long pageAddr, int off, GridH2SearchRow row) {
+ GridH2Row row0 = (GridH2Row)row;
+
+ H2IOUtils.storeRow(row0, pageAddr, off, storeMvccInfo());
+ }
+
+ /** {@inheritDoc} Asserts the source IO is this instance, then copies the item via {@code H2IOUtils.store}. */
+ @Override public final void store(long dstPageAddr, int dstIdx, BPlusIO<GridH2SearchRow> srcIo, long srcPageAddr, int srcIdx) {
+ assert srcIo == this;
+
+ H2IOUtils.store(dstPageAddr, offset(dstIdx), srcIo, srcPageAddr, srcIdx, storeMvccInfo());
+ }
+
+ /** {@inheritDoc} Resolves the row by link via the tree's row factory, adding the MVCC version for MVCC IOs. */
+ @Override public final GridH2SearchRow getLookupRow(BPlusTree<GridH2SearchRow,?> tree, long pageAddr, int idx)
+ throws IgniteCheckedException {
+ long link = getLink(pageAddr, idx);
+
+ if (storeMvccInfo()) { // MVCC-aware IO: look the row up by link plus MVCC version.
+ long mvccCrdVer = getMvccCoordinatorVersion(pageAddr, idx);
+ long mvccCntr = getMvccCounter(pageAddr, idx);
+
+ return ((H2Tree)tree).getRowFactory().getMvccRow(link, mvccCrdVer, mvccCntr);
+ }
+
+ return ((H2Tree)tree).getRowFactory().getRow(link);
+ }
+
+ /** {@inheritDoc} The link is the long at the start of the item. */
+ @Override public long getLink(long pageAddr, int idx) {
+ return PageUtils.getLong(pageAddr, offset(idx));
+ }
+
+ /** {@inheritDoc} Read at byte 8 of the item; asserts that this IO stores MVCC info. */
+ @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
+ assert storeMvccInfo();
+
+ return PageUtils.getLong(pageAddr, offset(idx) + 8);
+ }
+
+ /** {@inheritDoc} Read at byte 16 of the item; asserts that this IO stores MVCC info. */
+ @Override public long getMvccCounter(long pageAddr, int idx) {
+ assert storeMvccInfo();
+
+ return PageUtils.getLong(pageAddr, offset(idx) + 16);
+ }
+
+ /** {@inheritDoc} Read at byte 24 of the item; asserts that this IO stores MVCC info. */
+ @Override public long getNewMvccCoordinatorVersion(long pageAddr, int idx) {
+ assert storeMvccInfo();
+
+ return PageUtils.getLong(pageAddr, offset(idx) + 24);
+ }
+
+ /** {@inheritDoc} Read at byte 32 of the item; asserts that this IO stores MVCC info. */
+ @Override public long getNewMvccCounter(long pageAddr, int idx) {
+ assert storeMvccInfo();
+
+ return PageUtils.getLong(pageAddr, offset(idx) + 32);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2ExtrasInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2ExtrasInnerIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2ExtrasInnerIO.java
index b8877e9..8dc8c96 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2ExtrasInnerIO.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2ExtrasInnerIO.java
@@ -17,124 +17,17 @@
package org.apache.ignite.internal.processors.query.h2.database.io;
-import java.util.List;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
-import org.apache.ignite.internal.processors.query.h2.database.H2Tree;
-import org.apache.ignite.internal.processors.query.h2.database.InlineIndexHelper;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
-import org.h2.result.SearchRow;
-
/**
* Inner page for H2 row references.
*/
-public class H2ExtrasInnerIO extends BPlusInnerIO<SearchRow> {
- /** Payload size. */
- private final int payloadSize;
-
- /** */
- public static void register() {
- for (short payload = 1; payload <= PageIO.MAX_PAYLOAD_SIZE; payload++)
- PageIO.registerH2ExtraInner(getVersions((short)(PageIO.T_H2_EX_REF_INNER_START + payload - 1), payload));
- }
-
- /**
- * @param payload Payload size.
- * @return IOVersions for given payload.
- */
- @SuppressWarnings("unchecked")
- public static IOVersions<? extends BPlusInnerIO<SearchRow>> getVersions(int payload) {
- assert payload >= 0 && payload <= PageIO.MAX_PAYLOAD_SIZE;
-
- if (payload == 0)
- return H2InnerIO.VERSIONS;
- else
- return (IOVersions<BPlusInnerIO<SearchRow>>)PageIO.getInnerVersions((short)(payload - 1));
- }
-
- /**
- * @param type Type.
- * @param payload Payload size.
- * @return Instance of IO versions.
- */
- private static IOVersions<H2ExtrasInnerIO> getVersions(short type, short payload) {
- return new IOVersions<>(new H2ExtrasInnerIO(type, 1, payload));
- }
-
+public class H2ExtrasInnerIO extends AbstractH2ExtrasInnerIO implements H2RowLinkIO {
/**
* @param type Page type.
* @param ver Page format version.
* @param payloadSize Payload size.
*/
- private H2ExtrasInnerIO(short type, int ver, int payloadSize) {
- super(type, ver, true, 8 + payloadSize);
- this.payloadSize = payloadSize;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("ForLoopReplaceableByForEach")
- @Override public void storeByOffset(long pageAddr, int off, SearchRow row) {
- GridH2Row row0 = (GridH2Row)row;
-
- assert row0.link() != 0 : row0;
-
- List<InlineIndexHelper> inlineIdxs = InlineIndexHelper.getCurrentInlineIndexes();
-
- assert inlineIdxs != null : "no inline index helpers";
-
-
- int fieldOff = 0;
-
- for (int i = 0; i < inlineIdxs.size(); i++) {
- InlineIndexHelper idx = inlineIdxs.get(i);
-
- int size = idx.put(pageAddr, off + fieldOff, row.getValue(idx.columnIndex()), payloadSize - fieldOff);
-
- if (size == 0)
- break;
-
- fieldOff += size;
- }
-
- PageUtils.putLong(pageAddr, off + payloadSize, row0.link());
- }
-
- /** {@inheritDoc} */
- @Override public SearchRow getLookupRow(BPlusTree<SearchRow, ?> tree, long pageAddr, int idx)
- throws IgniteCheckedException {
- long link = getLink(pageAddr, idx);
-
- assert link != 0;
-
- return ((H2Tree)tree).getRowFactory().getRow(link);
- }
-
- /** {@inheritDoc} */
- @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<SearchRow> srcIo, long srcPageAddr, int srcIdx) {
- int srcOff = srcIo.offset(srcIdx);
-
- byte[] payload = PageUtils.getBytes(srcPageAddr, srcOff, payloadSize);
- long link = PageUtils.getLong(srcPageAddr, srcOff + payloadSize);
-
- assert link != 0;
-
- int dstOff = offset(dstIdx);
-
- PageUtils.putBytes(dstPageAddr, dstOff, payload);
- PageUtils.putLong(dstPageAddr, dstOff + payloadSize, link);
- }
-
- /**
- * @param pageAddr Page address.
- * @param idx Index.
- * @return Link to row.
- */
- private long getLink(long pageAddr, int idx) {
- return PageUtils.getLong(pageAddr, offset(idx) + payloadSize);
+ H2ExtrasInnerIO(short type, int ver, int payloadSize) {
+ super(type, ver, 8, payloadSize);
}
}
+
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2ExtrasLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2ExtrasLeafIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2ExtrasLeafIO.java
index 6161f8d..085f98b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2ExtrasLeafIO.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2ExtrasLeafIO.java
@@ -17,121 +17,16 @@
package org.apache.ignite.internal.processors.query.h2.database.io;
-import java.util.List;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeafIO;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
-import org.apache.ignite.internal.processors.query.h2.database.H2Tree;
-import org.apache.ignite.internal.processors.query.h2.database.InlineIndexHelper;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
-import org.h2.result.SearchRow;
-
/**
* Leaf page for H2 row references.
*/
-public class H2ExtrasLeafIO extends BPlusLeafIO<SearchRow> {
- /** Payload size. */
- private final int payloadSize;
-
- /** */
- public static void register() {
- for (short payload = 1; payload <= PageIO.MAX_PAYLOAD_SIZE; payload++)
- PageIO.registerH2ExtraLeaf(getVersions((short)(PageIO.T_H2_EX_REF_LEAF_START + payload - 1), payload));
- }
-
- /**
- * @param payload Payload size.
- * @return IOVersions for given payload.
- */
- @SuppressWarnings("unchecked")
- public static IOVersions<? extends BPlusLeafIO<SearchRow>> getVersions(int payload) {
- assert payload >= 0 && payload <= PageIO.MAX_PAYLOAD_SIZE;
-
- if (payload == 0)
- return H2LeafIO.VERSIONS;
- else
- return (IOVersions<BPlusLeafIO<SearchRow>>)PageIO.getLeafVersions((short)(payload - 1));
- }
-
- /**
- * @param type Type.
- * @param payload Payload size.
- * @return Versions.
- */
- private static IOVersions<H2ExtrasLeafIO> getVersions(short type, short payload) {
- return new IOVersions<>(new H2ExtrasLeafIO(type, 1, payload));
- }
-
+public class H2ExtrasLeafIO extends AbstractH2ExtrasLeafIO {
/**
* @param type Page type.
* @param ver Page format version.
* @param payloadSize Payload size.
*/
- private H2ExtrasLeafIO(short type, int ver, int payloadSize) {
- super(type, ver, 8 + payloadSize);
- this.payloadSize = payloadSize;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("ForLoopReplaceableByForEach")
- @Override public void storeByOffset(long pageAddr, int off, SearchRow row) {
- GridH2Row row0 = (GridH2Row)row;
-
- assert row0.link() != 0;
-
- List<InlineIndexHelper> inlineIdxs = InlineIndexHelper.getCurrentInlineIndexes();
-
- assert inlineIdxs != null : "no inline index helpers";
-
- int fieldOff = 0;
-
- for (int i = 0; i < inlineIdxs.size(); i++) {
- InlineIndexHelper idx = inlineIdxs.get(i);
-
- int size = idx.put(pageAddr, off + fieldOff, row.getValue(idx.columnIndex()), payloadSize - fieldOff);
-
- if (size == 0)
- break;
-
- fieldOff += size;
- }
-
- PageUtils.putLong(pageAddr, off + payloadSize, row0.link());
- }
-
- /** {@inheritDoc} */
- @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<SearchRow> srcIo, long srcPageAddr, int srcIdx) {
- int srcOff = srcIo.offset(srcIdx);
-
- byte[] payload = PageUtils.getBytes(srcPageAddr, srcOff, payloadSize);
- long link = PageUtils.getLong(srcPageAddr, srcOff + payloadSize);
-
- assert link != 0;
-
- int dstOff = offset(dstIdx);
-
- PageUtils.putBytes(dstPageAddr, dstOff, payload);
- PageUtils.putLong(dstPageAddr, dstOff + payloadSize, link);
- }
-
- /** {@inheritDoc} */
- @Override public SearchRow getLookupRow(BPlusTree<SearchRow, ?> tree, long pageAddr, int idx)
- throws IgniteCheckedException {
- long link = getLink(pageAddr, idx);
-
- return ((H2Tree)tree).getRowFactory().getRow(link);
- }
-
- /**
- * @param pageAddr Page address.
- * @param idx Index.
- * @return Link to row.
- */
- private long getLink(long pageAddr, int idx) {
- return PageUtils.getLong(pageAddr, offset(idx) + payloadSize);
+ H2ExtrasLeafIO(short type, int ver, int payloadSize) {
+ super(type, ver, 8, payloadSize);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2IOUtils.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2IOUtils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2IOUtils.java
new file mode 100644
index 0000000..c0b2314
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2IOUtils.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.database.io;
+
+import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow;
+
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.assertMvccVersionValid;
+
+/**
+ * Static helpers that write an H2 index item (row link plus optional MVCC version data) into a page.
+ */
+class H2IOUtils {
+ /**
+ * Not instantiable: this class only holds static helpers.
+ */
+ private H2IOUtils() {}
+
+ /**
+ * @param row Row to store; its link must be non-zero (asserted).
+ * @param pageAddr Page address.
+ * @param off Item offset: the link is written here, MVCC data (if any) at {@code off + 8} .. {@code off + 32}.
+ * @param storeMvcc {@code True} to store mvcc data.
+ */
+ static void storeRow(GridH2Row row, long pageAddr, int off, boolean storeMvcc) {
+ assert row.link() != 0;
+
+ PageUtils.putLong(pageAddr, off, row.link());
+
+ if (storeMvcc) {
+ long mvccCrdVer = row.mvccCoordinatorVersion();
+ long mvccCntr = row.mvccCounter();
+
+ assert assertMvccVersionValid(mvccCrdVer, mvccCntr);
+
+ PageUtils.putLong(pageAddr, off + 8, mvccCrdVer);
+ PageUtils.putLong(pageAddr, off + 16, mvccCntr);
+
+ long newMvccCrdVer = row.newMvccCoordinatorVersion();
+
+ PageUtils.putLong(pageAddr, off + 24, newMvccCrdVer);
+
+ if (newMvccCrdVer != 0) { // The new counter is validated and written only when a new version is set.
+ long newMvccCntr = row.newMvccCounter();
+
+ assert assertMvccVersionValid(newMvccCrdVer, newMvccCntr);
+
+ PageUtils.putLong(pageAddr, off + 32, newMvccCntr);
+ }
+ }
+ }
+
+ /**
+ * @param dstPageAddr Destination page address.
+ * @param dstOff Destination offset: the link is written here, MVCC data (if any) at {@code dstOff + 8} .. {@code dstOff + 32}.
+ * @param srcIo Source IO; it is cast to {@link H2RowLinkIO} to read the link and MVCC fields.
+ * @param srcPageAddr Source page address.
+ * @param srcIdx Source index.
+ * @param storeMvcc {@code True} to store mvcc data.
+ */
+ static void store(long dstPageAddr,
+ int dstOff,
+ BPlusIO<GridH2SearchRow> srcIo,
+ long srcPageAddr,
+ int srcIdx,
+ boolean storeMvcc)
+ {
+ H2RowLinkIO rowIo = (H2RowLinkIO)srcIo;
+
+ long link = rowIo.getLink(srcPageAddr, srcIdx);
+
+ PageUtils.putLong(dstPageAddr, dstOff, link);
+
+ if (storeMvcc) {
+ long mvccCrdVer = rowIo.getMvccCoordinatorVersion(srcPageAddr, srcIdx);
+ long mvccCntr = rowIo.getMvccCounter(srcPageAddr, srcIdx);
+
+ assert assertMvccVersionValid(mvccCrdVer, mvccCntr);
+
+ PageUtils.putLong(dstPageAddr, dstOff + 8, mvccCrdVer);
+ PageUtils.putLong(dstPageAddr, dstOff + 16, mvccCntr);
+
+ long newMvccCrdVer = rowIo.getNewMvccCoordinatorVersion(srcPageAddr, srcIdx);
+
+ PageUtils.putLong(dstPageAddr, dstOff + 24, newMvccCrdVer);
+
+ if (newMvccCrdVer != 0) { // The new counter is validated and copied only when a new version is set.
+ long newMvccCntr = rowIo.getNewMvccCounter(srcPageAddr, srcIdx);
+
+ assert assertMvccVersionValid(newMvccCrdVer, newMvccCntr);
+
+ PageUtils.putLong(dstPageAddr, dstOff + 32, newMvccCntr);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2InnerIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2InnerIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2InnerIO.java
index a1f1ce9..9baff7a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2InnerIO.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2InnerIO.java
@@ -17,20 +17,12 @@
package org.apache.ignite.internal.processors.query.h2.database.io;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
-import org.apache.ignite.internal.processors.query.h2.database.H2Tree;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
-import org.h2.result.SearchRow;
/**
* Inner page for H2 row references.
*/
-public class H2InnerIO extends BPlusInnerIO<SearchRow> implements H2RowLinkIO {
+public class H2InnerIO extends AbstractH2InnerIO {
/** */
public static final IOVersions<H2InnerIO> VERSIONS = new IOVersions<>(
new H2InnerIO(1)
@@ -40,35 +32,6 @@ public class H2InnerIO extends BPlusInnerIO<SearchRow> implements H2RowLinkIO {
* @param ver Page format version.
*/
private H2InnerIO(int ver) {
- super(T_H2_REF_INNER, ver, true, 8);
- }
-
- /** {@inheritDoc} */
- @Override public void storeByOffset(long pageAddr, int off, SearchRow row) {
- GridH2Row row0 = (GridH2Row)row;
-
- assert row0.link() != 0;
-
- PageUtils.putLong(pageAddr, off, row0.link());
- }
-
- /** {@inheritDoc} */
- @Override public SearchRow getLookupRow(BPlusTree<SearchRow,?> tree, long pageAddr, int idx)
- throws IgniteCheckedException {
- long link = getLink(pageAddr, idx);
-
- return ((H2Tree)tree).getRowFactory().getRow(link);
- }
-
- /** {@inheritDoc} */
- @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<SearchRow> srcIo, long srcPageAddr, int srcIdx) {
- long link = ((H2RowLinkIO)srcIo).getLink(srcPageAddr, srcIdx);
-
- PageUtils.putLong(dstPageAddr, offset(dstIdx), link);
- }
-
- /** {@inheritDoc} */
- @Override public long getLink(long pageAddr, int idx) {
- return PageUtils.getLong(pageAddr, offset(idx));
+ super(T_H2_REF_INNER, ver, 8);
}
}