You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2019/01/29 15:26:54 UTC

[ignite] 04/05: Data store.

This is an automated email from the ASF dual-hosted git repository.

vozerov pushed a commit to branch ignite-10985
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit e334d35a115b4a08e463628ac96259ed86fd2c3a
Author: devozerov <vo...@gridgain.com>
AuthorDate: Tue Jan 29 18:20:26 2019 +0300

    Data store.
---
 .../cache/IgniteCacheOffheapManager.java           | 13 -------
 .../cache/IgniteCacheOffheapManagerImpl.java       |  8 +----
 .../dht/GridDhtTxAbstractEnlistFuture.java         |  4 ++-
 .../cache/persistence/GridCacheOffheapManager.java | 13 +------
 .../processors/cache/tree/CacheDataRowStore.java   | 41 +++++++++++++++++++---
 .../internal/processors/cache/tree/DataRow.java    |  5 +--
 .../cache/tree/mvcc/data/MvccDataRow.java          |  9 +++--
 .../query/h2/database/H2PkHashIndex.java           | 13 ++++++-
 .../processors/query/h2/database/H2RowFactory.java |  5 +--
 9 files changed, 66 insertions(+), 45 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index b02bc9d..c0c81c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -1010,19 +1010,6 @@ public interface IgniteCacheOffheapManager {
             KeyCacheObject upper, Object x, MvccSnapshot snapshot) throws IgniteCheckedException;
 
         /**
-         * @param cacheId Cache ID.
-         * @param lower Lower bound.
-         * @param upper Upper bound.
-         * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
-         * @param snapshot Mvcc snapshot.
-         * @param skipVer Whether version read should be skipped.
-         * @return Data cursor.
-         * @throws IgniteCheckedException If failed.
-         */
-        public GridCursor<? extends CacheDataRow> cursor(int cacheId, KeyCacheObject lower,
-            KeyCacheObject upper, Object x, MvccSnapshot snapshot, boolean skipVer) throws IgniteCheckedException;
-
-        /**
          * Destroys the tree associated with the store.
          *
          * @throws IgniteCheckedException If failed.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index e60a002..82c65ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -2759,15 +2759,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         /** {@inheritDoc} */
         @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId, KeyCacheObject lower,
             KeyCacheObject upper, Object x, MvccSnapshot snapshot) throws IgniteCheckedException {
-            return cursor(cacheId, lower, upper, x, snapshot, false);
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId, KeyCacheObject lower,
-            KeyCacheObject upper, Object x, MvccSnapshot snapshot, boolean skipVer) throws IgniteCheckedException {
             SearchRow lowerRow;
             SearchRow upperRow;
-            // TODO: Use skiPver.
+
             if (grp.sharedGroup()) {
                 assert cacheId != CU.UNDEFINED_CACHE_ID;
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
index 3f53b48..138e24d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
@@ -732,7 +732,9 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
                 CacheDataRowAdapter.RowData.NO_KEY,
                 row0.mvccCoordinatorVersion(),
                 row0.mvccCounter(),
-                row0.mvccOperationCounter());
+                row0.mvccOperationCounter(),
+                false
+            );
 
             GridCacheMvccEntryInfo entry = new GridCacheMvccEntryInfo();
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index b93f1ff..ac7be6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -2231,21 +2231,10 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
             Object x,
             MvccSnapshot mvccSnapshot
         ) throws IgniteCheckedException {
-            return cursor(cacheId, lower, upper, x, mvccSnapshot, false);
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId,
-            KeyCacheObject lower,
-            KeyCacheObject upper,
-            Object x,
-            MvccSnapshot mvccSnapshot,
-            boolean skipVer
-        ) throws IgniteCheckedException {
             CacheDataStore delegate = init0(true);
 
             if (delegate != null)
-                return delegate.cursor(cacheId, lower, upper, x, mvccSnapshot, skipVer);
+                return delegate.cursor(cacheId, lower, upper, x, mvccSnapshot);
 
             return EMPTY_CURSOR;
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
index 7aa7068..42f6c07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
@@ -30,6 +30,18 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
  *
  */
 public class CacheDataRowStore extends RowStore {
+    /** Whether version should be skipped. */
+    private static ThreadLocal<Boolean> SKIP_VER = ThreadLocal.withInitial(() -> false);
+
+    /**
+     * Set skip version flag.
+     *
+     * @param skipVer Flag value.
+     */
+    public static void setSkipVersion(boolean skipVer) {
+        SKIP_VER.set(skipVer);
+    }
+
     /** */
     private final int partId;
 
@@ -62,7 +74,16 @@ public class CacheDataRowStore extends RowStore {
      * @return Search row.
      */
     CacheSearchRow keySearchRow(int cacheId, int hash, long link) {
-        return initDataRow(new DataRow(grp, hash, link, partId, CacheDataRowAdapter.RowData.KEY_ONLY), cacheId);
+        DataRow dataRow = new DataRow(
+            grp,
+            hash,
+            link,
+            partId,
+            CacheDataRowAdapter.RowData.KEY_ONLY,
+            SKIP_VER.get()
+        );
+
+        return initDataRow(dataRow, cacheId);
     }
 
     /**
@@ -76,14 +97,17 @@ public class CacheDataRowStore extends RowStore {
      * @return Search row.
      */
     MvccDataRow mvccRow(int cacheId, int hash, long link, CacheDataRowAdapter.RowData rowData, long crdVer, long mvccCntr, int opCntr) {
-        MvccDataRow dataRow = new MvccDataRow(grp,
+        MvccDataRow dataRow = new MvccDataRow(
+            grp,
             hash,
             link,
             partId,
             rowData,
             crdVer,
             mvccCntr,
-            opCntr);
+            opCntr,
+            SKIP_VER.get()
+        );
 
         return initDataRow(dataRow, cacheId);
     }
@@ -96,7 +120,16 @@ public class CacheDataRowStore extends RowStore {
      * @return Data row.
      */
     CacheDataRow dataRow(int cacheId, int hash, long link, CacheDataRowAdapter.RowData rowData) {
-        return initDataRow(new DataRow(grp, hash, link, partId, rowData), cacheId);
+        DataRow dataRow = new DataRow(
+            grp,
+            hash,
+            link,
+            partId,
+            rowData,
+            SKIP_VER.get()
+        );
+
+        return initDataRow(dataRow, cacheId);
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
index 552a39c..add2abe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
@@ -41,8 +41,9 @@ public class DataRow extends CacheDataRowAdapter {
      * @param link Link.
      * @param part Partition.
      * @param rowData Required row data.
+     * @param skipVer Whether version read should be skipped.
      */
-    protected DataRow(CacheGroupContext grp, int hash, long link, int part, RowData rowData) {
+    protected DataRow(CacheGroupContext grp, int hash, long link, int part, RowData rowData, boolean skipVer) {
         super(link);
 
         this.hash = hash;
@@ -51,7 +52,7 @@ public class DataRow extends CacheDataRowAdapter {
         try {
             // We can not init data row lazily outside of entry lock because underlying buffer can be concurrently cleared.
             if (rowData != RowData.LINK_ONLY)
-                initFromLink(grp, rowData);
+                initFromLink(grp, rowData, skipVer);
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException(e);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccDataRow.java
index cb4bc87..a3ee923 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccDataRow.java
@@ -92,17 +92,20 @@ public class MvccDataRow extends DataRow {
      * @param crdVer Mvcc coordinator version.
      * @param mvccCntr Mvcc counter.
      * @param mvccOpCntr Mvcc operation counter.
+     * @param skipVer Skip version flag.
      */
-    public MvccDataRow(CacheGroupContext grp,
+    public MvccDataRow(
+        CacheGroupContext grp,
         int hash,
         long link,
         int part,
         RowData rowData,
         long crdVer,
         long mvccCntr,
-        int mvccOpCntr
+        int mvccOpCntr,
+        boolean skipVer
     ) {
-        super(grp, hash, link, part, rowData);
+        super(grp, hash, link, part, rowData, skipVer);
 
         assert MvccUtils.mvccVersionIsValid(crdVer, mvccCntr, mvccOpCntr);
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
index fee4b19..6aa9ebf 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
@@ -117,6 +118,8 @@ public class H2PkHashIndex extends GridH2IndexBase {
         KeyCacheObject upperObj = upper != null ? cctx.toCacheKeyObject(upper.getValue(0).getObject()) : null;
 
         try {
+            CacheDataRowStore.setSkipVersion(true);
+
             Collection<GridCursor<? extends CacheDataRow>> cursors = new ArrayList<>();
 
             for (IgniteCacheOffheapManager.CacheDataStore store : cctx.offheap().cacheDataStores()) {
@@ -126,7 +129,7 @@ public class H2PkHashIndex extends GridH2IndexBase {
                     continue;
 
                 if (filter == null || filter.applyPartition(part))
-                    cursors.add(store.cursor(cctx.cacheId(), lowerObj, upperObj, null, mvccSnapshot, true));
+                    cursors.add(store.cursor(cctx.cacheId(), lowerObj, upperObj, null, mvccSnapshot));
             }
 
             return new H2PkHashIndexCursor(cursors.iterator());
@@ -134,6 +137,9 @@ public class H2PkHashIndex extends GridH2IndexBase {
         catch (IgniteCheckedException e) {
             throw DbException.convert(e);
         }
+        finally {
+            CacheDataRowStore.setSkipVersion(false);
+        }
     }
 
     /** {@inheritDoc} */
@@ -248,6 +254,8 @@ public class H2PkHashIndex extends GridH2IndexBase {
         /** {@inheritDoc} */
         @Override public boolean next() {
             try {
+                CacheDataRowStore.setSkipVersion(true);
+
                 GridQueryTypeDescriptor type = desc.type();
 
                 for (;;) {
@@ -269,6 +277,9 @@ public class H2PkHashIndex extends GridH2IndexBase {
             catch (IgniteCheckedException e) {
                 throw DbException.convert(e);
             }
+            finally {
+                CacheDataRowStore.setSkipVersion(false);
+            }
         }
 
         /** {@inheritDoc} */
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java
index bb17706..fb7ff0b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java
@@ -82,10 +82,11 @@ public class H2RowFactory {
             0,
             link,
             partId,
-            null, // TODO: Appropriate data!
+            null,
             mvccCrdVer,
             mvccCntr,
-            mvccOpCntr
+            mvccOpCntr,
+            true
         );
 
         return rowDesc.createRow(row);