You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/11/25 13:59:01 UTC

[ignite] branch master updated: IGNITE-15968 Index de-fragmentation fails on DECIMAL and VARCHAR columns (#9580)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new c726cf1  IGNITE-15968 Index de-fragmentation fails on DECIMAL and VARCHAR columns (#9580)
c726cf1 is described below

commit c726cf1acf6f090525b65d442f49730acbcf5d04
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Thu Nov 25 16:58:31 2021 +0300

    IGNITE-15968 Index de-fragmentation fails on DECIMAL and VARCHAR columns (#9580)
---
 .../sorted/defragmentation/DefragIndexFactory.java | 101 +++++++++++++++++----
 .../defragmentation/IndexingDefragmentation.java   |  23 ++---
 .../query/index/sorted/inline/io/IORowHandler.java |   4 +-
 .../cache/persistence/tree/BPlusTree.java          |   6 +-
 .../IgnitePdsIndexingDefragmentationTest.java      |  94 +++++++++++++++++++
 5 files changed, 189 insertions(+), 39 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/defragmentation/DefragIndexFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/defragmentation/DefragIndexFactory.java
index 4f36444..e168c44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/defragmentation/DefragIndexFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/defragmentation/DefragIndexFactory.java
@@ -26,20 +26,21 @@ import org.apache.ignite.internal.cache.query.index.sorted.MetaPageInfo;
 import org.apache.ignite.internal.cache.query.index.sorted.SortedIndexDefinition;
 import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndex;
 import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexFactory;
-import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexKeyType;
 import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexTree;
 import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineRecommender;
 import org.apache.ignite.internal.cache.query.index.sorted.inline.io.AbstractInlineInnerIO;
 import org.apache.ignite.internal.cache.query.index.sorted.inline.io.AbstractInlineLeafIO;
+import org.apache.ignite.internal.cache.query.index.sorted.inline.io.IORowHandler;
 import org.apache.ignite.internal.cache.query.index.sorted.inline.io.InlineIO;
 import org.apache.ignite.internal.cache.query.index.sorted.inline.io.InnerIO;
 import org.apache.ignite.internal.cache.query.index.sorted.inline.io.LeafIO;
-import org.apache.ignite.internal.cache.query.index.sorted.keys.IndexKey;
 import org.apache.ignite.internal.metric.IoStatisticsHolder;
 import org.apache.ignite.internal.pagemem.PageIdUtils;
 import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.pagemem.PageUtils;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.RootPage;
 import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
@@ -47,6 +48,7 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeafIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusMetaIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIoResolver;
 import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataRow;
@@ -55,6 +57,9 @@ import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataRow;
  * Creates temporary index to defragment old index.
  */
 public class DefragIndexFactory extends InlineIndexFactory {
+    /** Empty array. */
+    private static final byte[] EMPTY_BYTES = new byte[0];
+
     /** Temporary offheap manager. */
     private final IgniteCacheOffheapManager offheap;
 
@@ -103,6 +108,11 @@ public class DefragIndexFactory extends InlineIndexFactory {
 
         final MetaPageInfo oldInfo = oldIdx.segment(segmentNum).metaInfo();
 
+        // Set IO wrappers for the new tree.
+        BPlusInnerIO<IndexRow> innerIO = (BPlusInnerIO<IndexRow>) wrap(tree.latestInnerIO(), tree.rowHandler());
+        BPlusLeafIO<IndexRow> leafIo = (BPlusLeafIO<IndexRow>) wrap(tree.latestLeafIO(), tree.rowHandler());
+        tree.setIos(new IOVersions<>(innerIO), new IOVersions<>(leafIo));
+
         tree.copyMetaInfo(oldInfo);
 
         tree.enableSequentialWriteMode();
@@ -147,6 +157,30 @@ public class DefragIndexFactory extends InlineIndexFactory {
         }
     }
 
+    /**
+     * Stores the row's raw inlined bytes followed by its link in the page. Used by the IO wrappers instead of {@link BPlusIO#storeByOffset(long, int, Object)}.
+     *
+     * @param io Page io.
+     * @param pageAddr Page address.
+     * @param off Data offset.
+     * @param row Defragmentation index row carrying the raw inlined bytes.
+     * @param <IO> Type of the Page io.
+     */
+    private static <IO extends BPlusIO<?> & InlineIO> void storeByOffset(
+        IO io,
+        long pageAddr,
+        int off,
+        DefragIndexRowImpl row
+    ) {
+        int payloadSize = io.inlineSize();
+
+        assert row.link() != 0;
+
+        PageUtils.putBytes(pageAddr, off, row.values);
+
+        IORowHandler.store(pageAddr, off + payloadSize, row, io.storeMvccInfo());
+    }
+
     /** */
     private static <T extends BPlusIO<IndexRow> & InlineIO> IndexRow lookupRow(
         InlineIndexRowHandler rowHnd,
@@ -158,19 +192,14 @@ public class DefragIndexFactory extends InlineIndexFactory {
 
         int off = io.offset(idx);
 
-        IndexKey[] keys = new IndexKey[rowHnd.indexKeyDefinitions().size()];
+        int inlineSize = io.inlineSize();
 
-        int fieldOff = 0;
+        byte[] values;
 
-        for (int i = 0; i < rowHnd.inlineIndexKeyTypes().size(); i++) {
-            InlineIndexKeyType keyType = rowHnd.inlineIndexKeyTypes().get(i);
-
-            IndexKey key = keyType.get(pageAddr, off + fieldOff, io.inlineSize() - fieldOff);
-
-            fieldOff += keyType.inlineSize(key);
-
-            keys[i] = key;
-        }
+        if (rowHnd.inlineIndexKeyTypes().isEmpty())
+            values = EMPTY_BYTES;
+        else
+            values = PageUtils.getBytes(pageAddr, off, inlineSize);
 
         if (io.storeMvccInfo()) {
             long mvccCrdVer = io.mvccCoordinatorVersion(pageAddr, idx);
@@ -189,10 +218,10 @@ public class DefragIndexFactory extends InlineIndexFactory {
                 true
             );
 
-            return new IndexRowImpl(rowHnd, row, keys);
+            return new DefragIndexRowImpl(rowHnd, row, values);
         }
 
-        return new IndexRowImpl(rowHnd, new CacheDataRowAdapter(link), keys);
+        return new DefragIndexRowImpl(rowHnd, new CacheDataRowAdapter(link), values);
     }
 
     /** */
@@ -213,7 +242,7 @@ public class DefragIndexFactory extends InlineIndexFactory {
 
         /** {@inheritDoc} */
         @Override public void storeByOffset(long pageAddr, int off, IndexRow row) throws IgniteCheckedException {
-            io.storeByOffset(pageAddr, off, row);
+            DefragIndexFactory.storeByOffset(io, pageAddr, off, (DefragIndexRowImpl) row);
         }
 
         /** {@inheritDoc} */
@@ -277,18 +306,18 @@ public class DefragIndexFactory extends InlineIndexFactory {
 
         /** {@inheritDoc} */
         @Override public void storeByOffset(long pageAddr, int off, IndexRow row) throws IgniteCheckedException {
-            io.storeByOffset(pageAddr, off, row);
+            DefragIndexFactory.storeByOffset(io, pageAddr, off, (DefragIndexRowImpl) row);
         }
 
         /** {@inheritDoc} */
         @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<IndexRow> srcIo, long srcPageAddr, int srcIdx)
-            throws IgniteCheckedException
-        {
+            throws IgniteCheckedException {
             io.store(dstPageAddr, dstIdx, srcIo, srcPageAddr, srcIdx);
         }
 
         /** {@inheritDoc} */
-        @Override public IndexRow getLookupRow(BPlusTree<IndexRow, ?> tree, long pageAddr, int idx) throws IgniteCheckedException {
+        @Override public IndexRow getLookupRow(BPlusTree<IndexRow, ?> tree, long pageAddr, int idx)
+            throws IgniteCheckedException {
             return lookupRow(rowHnd, pageAddr, idx, this);
         }
 
@@ -322,4 +351,36 @@ public class DefragIndexFactory extends InlineIndexFactory {
             return io.storeMvccInfo();
         }
     }
+
+    /**
+     * IndexRowImpl with index values stored in a byte array.
+     */
+    public static class DefragIndexRowImpl extends IndexRowImpl {
+        /** Byte array of index values. */
+        private final byte[] values;
+
+        /** */
+        public DefragIndexRowImpl(InlineIndexRowHandler rowHnd, CacheDataRow row, byte[] values) {
+            super(rowHnd, row);
+            this.values = values;
+        }
+
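+        /** Creates a row for {@code newLink} that reuses the inlined bytes of {@code oldValue}, copying its MVCC version if {@code storeMvcc} is set. */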
+        public static DefragIndexRowImpl create(
+            InlineIndexRowHandler rowHnd,
+            long newLink,
+            DefragIndexRowImpl oldValue,
+            boolean storeMvcc
+        ) {
+            CacheDataRow newDataRow;
+
+            if (storeMvcc) {
+                newDataRow = new MvccDataRow(newLink);
+                newDataRow.mvccVersion(oldValue);
+            } else
+                newDataRow = new CacheDataRowAdapter(newLink);
+
+            return new DefragIndexRowImpl(rowHnd, newDataRow, oldValue.values);
+        }
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/defragmentation/IndexingDefragmentation.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/defragmentation/IndexingDefragmentation.java
index 1d39073..8c30d19 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/defragmentation/IndexingDefragmentation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/defragmentation/IndexingDefragmentation.java
@@ -28,9 +28,9 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.internal.cache.query.index.IndexProcessor;
 import org.apache.ignite.internal.cache.query.index.sorted.IndexRow;
-import org.apache.ignite.internal.cache.query.index.sorted.IndexRowImpl;
 import org.apache.ignite.internal.cache.query.index.sorted.InlineIndexRowHandler;
 import org.apache.ignite.internal.cache.query.index.sorted.SortedIndexDefinition;
+import org.apache.ignite.internal.cache.query.index.sorted.defragmentation.DefragIndexFactory.DefragIndexRowImpl;
 import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndex;
 import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexImpl;
 import org.apache.ignite.internal.cache.query.index.sorted.inline.io.MvccIO;
@@ -38,13 +38,11 @@ import org.apache.ignite.internal.pagemem.PageMemory;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointTimeoutLock;
 import org.apache.ignite.internal.processors.cache.persistence.defragmentation.LinkMap;
 import org.apache.ignite.internal.processors.cache.persistence.defragmentation.TreeIterator;
 import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
-import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataRow;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.collection.IntMap;
 import org.apache.ignite.thread.IgniteThreadPoolExecutor;
@@ -176,8 +174,8 @@ public class IndexingDefragmentation {
 
                         IndexRow row = theTree.getRow(h2IO, pageAddr, idx);
 
-                        if (!row.indexSearchRow()) {
-                            IndexRowImpl r = (IndexRowImpl)row;
+                        if (row instanceof DefragIndexRowImpl) {
+                            DefragIndexRowImpl r = (DefragIndexRowImpl)row;
 
                             CacheDataRow cacheDataRow = r.cacheDataRow();
 
@@ -189,16 +187,13 @@ public class IndexingDefragmentation {
 
                             long newLink = map.get(link);
 
-                            CacheDataRow newDataRow;
-
-                             if (((MvccIO)io).storeMvccInfo()) {
-                                 newDataRow = new MvccDataRow(newLink);
-                                 newDataRow.mvccVersion(row);
-                             } else
-                                 newDataRow = new CacheDataRowAdapter(newLink);
-
                             // Use old row handler, as MetaInfo is copied from old tree.
-                            IndexRowImpl newRow = new IndexRowImpl(oldRowHnd, newDataRow, r.keys());
+                            DefragIndexRowImpl newRow = DefragIndexRowImpl.create(
+                                oldRowHnd,
+                                newLink,
+                                r,
+                                ((MvccIO) io).storeMvccInfo()
+                            );
 
                             newIdx.putIndexRow(newRow);
                         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/io/IORowHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/io/IORowHandler.java
index 98ef338..076718b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/io/IORowHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/io/IORowHandler.java
@@ -24,9 +24,9 @@ import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
 /**
  * Class provide a common logic for storing an index row.
  */
-class IORowHandler {
+public class IORowHandler {
     /** */
-    static void store(long pageAddr, int off, IndexRow row, boolean storeMvccInfo) {
+    public static void store(long pageAddr, int off, IndexRow row, boolean storeMvccInfo) {
         // Write link after all inlined idx keys.
         PageUtils.putLong(pageAddr, off, row.link());
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index 18f1ff8..6e508a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -864,7 +864,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
      * @param innerIos Inner IO versions.
      * @param leafIos Leaf IO versions.
      */
-    protected void setIos(IOVersions<? extends BPlusInnerIO<L>> innerIos,
+    public void setIos(IOVersions<? extends BPlusInnerIO<L>> innerIos,
         IOVersions<? extends BPlusLeafIO<L>> leafIos) {
         assert innerIos != null;
         assert leafIos != null;
@@ -5396,14 +5396,14 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
     /**
      * @return Latest version of inner page IO.
      */
-    protected final BPlusInnerIO<L> latestInnerIO() {
+    public final BPlusInnerIO<L> latestInnerIO() {
         return innerIos.latest();
     }
 
     /**
      * @return Latest version of leaf page IO.
      */
-    protected final BPlusLeafIO<L> latestLeafIO() {
+    public final BPlusLeafIO<L> latestLeafIO() {
         return leafIos.latest();
     }
 
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
index d7647ec..78da516 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
@@ -18,8 +18,10 @@
 package org.apache.ignite.internal.processors.cache.persistence;
 
 import java.io.File;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.function.Function;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
@@ -291,6 +293,98 @@ public class IgnitePdsIndexingDefragmentationTest extends IgnitePdsDefragmentati
         cache.query(new SqlFieldsQuery("SELECT * FROM TEST WHERE VAL_OBJ > 0")).getAll();
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testDecimalIndex() throws Exception {
+        startGrid(0).cluster().state(ClusterState.ACTIVE);
+
+        IgniteCache<?, ?> cache = grid(0).cache(DEFAULT_CACHE_NAME);
+
+        cache.query(new SqlFieldsQuery("CREATE TABLE TEST (ID INT PRIMARY KEY, VAL_INT INT, VAL_DEC DECIMAL)"));
+
+        final String cacheName = "SQL_default_TEST";
+
+        cache.query(new SqlFieldsQuery("CREATE INDEX TEST_VAL_DEC ON TEST(VAL_DEC)"));
+
+        for (int i = 0; i < ADDED_KEYS_COUNT; i++)
+            cache.query(new SqlFieldsQuery("INSERT INTO TEST VALUES (?, ?, ?)").setArgs(i, i, BigDecimal.valueOf(i)));
+
+        cache.query(new SqlFieldsQuery("DELETE FROM TEST WHERE MOD(ID, 2) = 0"));
+
+        createMaintenanceRecord(cacheName);
+
+        CacheGroupContext grp = grid(0).context().cache().cacheGroup(CU.cacheId(cacheName));
+
+        forceCheckpoint();
+
+        // Restart first time.
+        stopGrid(0);
+
+        defragmentAndValidateSizesDecreasedAfterDefragmentation(0, grp);
+
+        startGrid(0);
+
+        // Reinit cache object.
+        cache = grid(0).cache(DEFAULT_CACHE_NAME);
+
+        assertTrue(explainQuery(cache, "EXPLAIN SELECT * FROM TEST WHERE ID > 0").contains("_key_pk_proxy"));
+
+        cache.query(new SqlFieldsQuery("SELECT * FROM TEST WHERE ID > 0")).getAll();
+
+        assertTrue(explainQuery(cache, "EXPLAIN SELECT * FROM TEST WHERE VAL_DEC > 0").contains("test_val_dec"));
+
+        cache.query(new SqlFieldsQuery("SELECT * FROM TEST WHERE VAL_DEC > 0")).getAll();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testVarcharIndex() throws Exception {
+        final String strPrefix = "AAAAAAAAAA";
+
+        startGrid(0).cluster().state(ClusterState.ACTIVE);
+
+        IgniteCache<?, ?> cache = grid(0).cache(DEFAULT_CACHE_NAME);
+
+        cache.query(new SqlFieldsQuery("CREATE TABLE TEST (ID INT PRIMARY KEY, VAL_INT INT, VAL_STR VARCHAR)"));
+
+        final String cacheName = "SQL_default_TEST";
+
+        cache.query(new SqlFieldsQuery("CREATE INDEX TEST_VAL_STR ON TEST(VAL_STR)"));
+
+        for (int i = 0; i < ADDED_KEYS_COUNT; i++) {
+            cache.query(new SqlFieldsQuery("INSERT INTO TEST VALUES (?, ?, ?)")
+                .setArgs(i, i, strPrefix + i));
+        }
+
+        cache.query(new SqlFieldsQuery("DELETE FROM TEST WHERE MOD(ID, 2) = 0"));
+
+        createMaintenanceRecord(cacheName);
+
+        CacheGroupContext grp = grid(0).context().cache().cacheGroup(CU.cacheId(cacheName));
+
+        forceCheckpoint();
+
+        // Restart first time.
+        stopGrid(0);
+
+        defragmentAndValidateSizesDecreasedAfterDefragmentation(0, grp);
+
+        startGrid(0);
+
+        // Reinit cache object.
+        cache = grid(0).cache(DEFAULT_CACHE_NAME);
+
+        assertTrue(explainQuery(cache, "EXPLAIN SELECT * FROM TEST WHERE VAL_STR = 'a'").contains("test_val_str"));
+
+        List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT * FROM TEST WHERE VAL_STR = ?").setArgs(strPrefix + 1)).getAll();
+
+        assertEquals(1, res.size());
+    }
+
     /** */
     private static String explainQuery(IgniteCache<?, ?> cache, String qry) {
         return cache