You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/24 18:05:33 UTC

[10/32] ignite git commit: Fixed segmented indices snapshots. - Fixes #1936.

Fixed segmented indices snapshots. - Fixes #1936.

Signed-off-by: Sergi Vladykin <se...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1554a160
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1554a160
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1554a160

Branch: refs/heads/ignite-5075-pds
Commit: 1554a1606244b46f042ecbf6aeb7eb09c3a2abb8
Parents: 647fd19
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue May 23 15:26:00 2017 +0300
Committer: Sergi Vladykin <se...@gmail.com>
Committed: Tue May 23 15:26:00 2017 +0300

----------------------------------------------------------------------
 .../processors/query/h2/opt/GridH2Table.java    | 107 ++++++++++++-------
 .../query/h2/twostep/GridMapQueryExecutor.java  |   2 +-
 .../query/IgniteSqlSegmentedIndexSelfTest.java  |  25 +++++
 3 files changed, 93 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1554a160/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 41cf68b..ec728de 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -23,7 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceArray;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -74,6 +74,9 @@ public class GridH2Table extends TableBase {
     private volatile ArrayList<Index> idxs;
 
     /** */
+    private final int pkIndexPos;
+
+    /** */
     private final Map<String, GridH2IndexBase> tmpIdxs = new HashMap<>();
 
     /** */
@@ -86,7 +89,7 @@ public class GridH2Table extends TableBase {
     private final ConcurrentMap<Session, Boolean> sessions = new ConcurrentHashMap8<>();
 
     /** */
-    private final AtomicReference<Object[]> actualSnapshot = new AtomicReference<>();
+    private final AtomicReferenceArray<Object[]> actualSnapshot;
 
     /** */
     private IndexColumn affKeyCol;
@@ -164,21 +167,31 @@ public class GridH2Table extends TableBase {
         assert idxs != null;
 
         List<Index> clones = new ArrayList<>(idxs.size());
-        for (Index index: idxs) {
+        for (Index index : idxs) {
             Index clone = createDuplicateIndexIfNeeded(index);
             if (clone != null)
-               clones.add(clone);
+                clones.add(clone);
         }
         idxs.addAll(clones);
 
+        boolean hasHashIndex = idxs.size() >= 2 && index(0).getIndexType().isHash();
+
         // Add scan index at 0 which is required by H2.
-        if (idxs.size() >= 2 && index(0).getIndexType().isHash())
+        if (hasHashIndex)
             idxs.add(0, new GridH2PrimaryScanIndex(this, index(1), index(0)));
         else
             idxs.add(0, new GridH2PrimaryScanIndex(this, index(0), null));
 
         snapshotEnabled = desc == null || desc.snapshotableIndex();
 
+        pkIndexPos = hasHashIndex ? 2 : 1;
+
+        final int segments = desc != null ? desc.configuration().getQueryParallelism() :
+            // Get index segments count from PK index. Null desc can be passed from tests.
+            index(pkIndexPos).segmentsCount();
+
+        actualSnapshot = snapshotEnabled ? new AtomicReferenceArray<Object[]>(Math.max(segments, 1)) : null;
+
         lock = new ReentrantReadWriteLock();
     }
 
@@ -233,8 +246,13 @@ public class GridH2Table extends TableBase {
             throw new IllegalStateException("Table " + identifierString() + " already destroyed.");
         }
 
-        if (snapshotInLock())
-            snapshotIndexes(null);
+        if (snapshotInLock()) {
+            final GridH2QueryContext qctx = GridH2QueryContext.get();
+
+            assert qctx != null;
+
+            snapshotIndexes(null, qctx.segment());
+        }
 
         return false;
     }
@@ -255,21 +273,22 @@ public class GridH2Table extends TableBase {
 
     /**
      * @param qctx Query context.
+     * @param segment Id of the index segment to be snapshotted.
      */
-    public void snapshotIndexes(GridH2QueryContext qctx) {
+    public void snapshotIndexes(GridH2QueryContext qctx, int segment) {
         if (!snapshotEnabled)
             return;
 
-        Object[] snapshots;
+        Object[] segmentSnapshot;
 
         // Try to reuse existing snapshots outside of the lock.
-        for (long waitTime = 200;; waitTime *= 2) { // Increase wait time to avoid starvation.
-            snapshots = actualSnapshot.get();
+        for (long waitTime = 200; ; waitTime *= 2) { // Increase wait time to avoid starvation.
+            segmentSnapshot = actualSnapshot.get(segment);
 
-            if (snapshots != null) { // Reuse existing snapshot without locking.
-                snapshots = doSnapshotIndexes(snapshots, qctx);
+            if (segmentSnapshot != null) { // Reuse existing snapshot without locking.
+                segmentSnapshot = doSnapshotIndexes(segment, segmentSnapshot, qctx);
 
-                if (snapshots != null)
+                if (segmentSnapshot != null)
                     return; // Reused successfully.
             }
 
@@ -281,17 +300,17 @@ public class GridH2Table extends TableBase {
             ensureNotDestroyed();
 
             // Try again inside of the lock.
-            snapshots = actualSnapshot.get();
+            segmentSnapshot = actualSnapshot.get(segment);
 
-            if (snapshots != null) // Try reusing.
-                snapshots = doSnapshotIndexes(snapshots, qctx);
+            if (segmentSnapshot != null) // Try reusing.
+                segmentSnapshot = doSnapshotIndexes(segment, segmentSnapshot, qctx);
 
-            if (snapshots == null) { // Reuse failed, produce new snapshots.
-                snapshots = doSnapshotIndexes(null, qctx);
+            if (segmentSnapshot == null) { // Reuse failed, produce new snapshots.
+                segmentSnapshot = doSnapshotIndexes(segment,null, qctx);
 
-                assert snapshots != null;
+                assert segmentSnapshot != null;
 
-                actualSnapshot.set(snapshots);
+                actualSnapshot.set(segment, segmentSnapshot);
             }
         }
         finally {
@@ -375,19 +394,22 @@ public class GridH2Table extends TableBase {
      * Must be called inside of write lock because when using multiple indexes we have to ensure that all of them have
      * the same contents at snapshot taking time.
      *
+     * @param segment id of index segment snapshot.
+     * @param segmentSnapshot snapshot to be reused.
      * @param qctx Query context.
      * @return New indexes data snapshot.
      */
     @SuppressWarnings("unchecked")
-    private Object[] doSnapshotIndexes(Object[] snapshots, GridH2QueryContext qctx) {
+    private Object[] doSnapshotIndexes(int segment, Object[] segmentSnapshot, GridH2QueryContext qctx) {
         assert snapshotEnabled;
 
-        if (snapshots == null) // Nothing to reuse, create new snapshots.
-            snapshots = new Object[idxs.size() - 2];
+        // TODO: make HashIndex snapshotable or remove it altogether?
+        if (segmentSnapshot == null) // Nothing to reuse, create new snapshots.
+            segmentSnapshot = new Object[idxs.size() - pkIndexPos];
 
-        // Take snapshots on all except first which is scan and second which is hash.
-        for (int i = 2, len = idxs.size(); i < len; i++) {
-            Object s = snapshots[i - 2];
+        // Take snapshots on all except first which is scan.
+        for (int i = pkIndexPos, len = idxs.size(); i < len; i++) {
+            Object s = segmentSnapshot[i - pkIndexPos];
 
             boolean reuseExisting = s != null;
 
@@ -401,20 +423,20 @@ public class GridH2Table extends TableBase {
                 if (qctx != null)
                     qctx.clearSnapshots();
 
-                for (int j = 2; j < i; j++)
+                for (int j = pkIndexPos; j < i; j++)
                     if ((idxs.get(j) instanceof GridH2IndexBase))
                         index(j).releaseSnapshot();
 
                 // Drop invalidated snapshot.
-                actualSnapshot.compareAndSet(snapshots, null);
+                actualSnapshot.compareAndSet(segment, segmentSnapshot, null);
 
                 return null;
             }
 
-            snapshots[i - 2] = s;
+            segmentSnapshot[i - pkIndexPos] = s;
         }
 
-        return snapshots;
+        return segmentSnapshot;
     }
 
     /** {@inheritDoc} */
@@ -587,7 +609,7 @@ public class GridH2Table extends TableBase {
 
                 int len = idxs.size();
 
-                int i = 2;
+                int i = pkIndexPos;
 
                 // Put row if absent to all indexes sequentially.
                 // Start from 3 because 0 - Scan (don't need to update), 1 - PK hash (already updated), 2 - PK (already updated).
@@ -609,7 +631,7 @@ public class GridH2Table extends TableBase {
                 if (old != null) {
                     // Remove row from all indexes.
                     // Start from 3 because 0 - Scan (don't need to update), 1 - PK hash (already updated), 2 - PK (already updated).
-                    for (int i = 3, len = idxs.size(); i < len; i++) {
+                    for (int i = pkIndexPos + 1, len = idxs.size(); i < len; i++) {
                         if (!(idxs.get(i) instanceof GridH2IndexBase))
                             continue;
                         Row res = index(i).remove(old);
@@ -627,7 +649,8 @@ public class GridH2Table extends TableBase {
             }
 
             // The snapshot is not actual after update.
-            actualSnapshot.set(null);
+            if (actualSnapshot != null)
+                actualSnapshot.set(pk.segmentForRow(row), null);
 
             return true;
         }
@@ -684,9 +707,10 @@ public class GridH2Table extends TableBase {
     ArrayList<GridH2IndexBase> indexes() {
         ArrayList<GridH2IndexBase> res = new ArrayList<>(idxs.size() - 2);
 
-        for (int i = 2, len = idxs.size(); i < len; i++)
+        for (int i = pkIndexPos, len = idxs.size(); i < len; i++) {
             if (idxs.get(i) instanceof GridH2IndexBase)
                 res.add(index(i));
+        }
 
         return res;
     }
@@ -695,6 +719,8 @@ public class GridH2Table extends TableBase {
      *
      */
     public void markRebuildFromHashInProgress(boolean value) {
+        assert !value || (idxs.size() >= 2 && index(1).getIndexType().isHash()) : "Table has no hash index.";
+
         rebuildFromHashInProgress = value;
     }
 
@@ -759,7 +785,8 @@ public class GridH2Table extends TableBase {
 
             Index cloneIdx = createDuplicateIndexIfNeeded(idx);
 
-            ArrayList<Index> newIdxs = new ArrayList<>(idxs.size() + ((cloneIdx == null) ? 1 : 2));
+            ArrayList<Index> newIdxs = new ArrayList<>(
+                    idxs.size() + ((cloneIdx == null) ? 1 : 2));
 
             newIdxs.addAll(idxs);
 
@@ -837,14 +864,14 @@ public class GridH2Table extends TableBase {
         try {
             ArrayList<Index> idxs = new ArrayList<>(this.idxs);
 
-            Index targetIdx = (h2Idx instanceof GridH2ProxyIndex)?
-                    ((GridH2ProxyIndex)h2Idx).underlyingIndex(): h2Idx;
+            Index targetIdx = (h2Idx instanceof GridH2ProxyIndex) ?
+                ((GridH2ProxyIndex)h2Idx).underlyingIndex() : h2Idx;
 
-            for (int i = 2; i < idxs.size();) {
+            for (int i = pkIndexPos; i < idxs.size();) {
                 Index idx = idxs.get(i);
 
                 if (idx == targetIdx || (idx instanceof GridH2ProxyIndex &&
-                   ((GridH2ProxyIndex)idx).underlyingIndex() == targetIdx)) {
+                    ((GridH2ProxyIndex)idx).underlyingIndex() == targetIdx)) {
 
                     idxs.remove(i);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1554a160/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 43cc230..6d76eea 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -573,7 +573,7 @@ public class GridMapQueryExecutor {
 
                     Objects.requireNonNull(h2Tbl, tbl.toString());
 
-                    h2Tbl.snapshotIndexes(qctx);
+                    h2Tbl.snapshotIndexes(qctx, segmentId);
 
                     snapshotedTbls.add(h2Tbl);
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1554a160/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
index 586b81e..03c3f1e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
@@ -135,6 +135,31 @@ public class IgniteSqlSegmentedIndexSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Checks that repeated queries over segmented indices return the same, complete result size.
+     * @throws Exception If failed.
+     */
+    public void testSegmentedIndexReproducableResults() throws Exception {
+        ignite(0).createCache(cacheConfig(ORG_CACHE_NAME, true, Integer.class, Organization.class));
+
+        IgniteCache<Object, Object> cache = ignite(0).cache(ORG_CACHE_NAME);
+
+        // Unequal entries distribution among partitions.
+        int expectedSize = nodesCount() * QRY_PARALLELISM_LVL *  3 / 2;
+
+        for (int i = 0; i < expectedSize; i++)
+            cache.put(i, new Organization("org-" + i));
+
+        String select0 = "select * from \"org\".Organization o";
+
+        // Check for stable results.
+        for(int i = 0; i < 10; i++) {
+            List<List<?>> result = cache.query(new SqlFieldsQuery(select0)).getAll();
+
+            assertEquals(expectedSize, result.size());
+        }
+    }
+
+    /**
      * Run tests on single-node grid
      *
      * @throws Exception If failed.