You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/11/22 11:40:08 UTC

[19/50] [abbrv] ignite git commit: IGNITE-4145: Fixes "No query result found for request" exception when running multiple queries concurrently. This closes #1218.

IGNITE-4145: Fixes "No query result found for request" exception when running multiple queries concurrently. This closes #1218.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a70f0bac
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a70f0bac
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a70f0bac

Branch: refs/heads/master
Commit: a70f0bac3ac2487b8ab58598ad921daa952b485f
Parents: 53876d3
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Fri Nov 11 13:03:40 2016 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Fri Nov 11 13:03:40 2016 +0300

----------------------------------------------------------------------
 .../query/h2/twostep/GridMergeIndex.java        | 49 +++++++++++-----
 .../IgniteCacheQueryMultiThreadedSelfTest.java  | 59 ++++++++++++++++++++
 2 files changed, 93 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a70f0bac/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index 3914bd7..7ac2ee3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -177,36 +177,50 @@ public abstract class GridMergeIndex extends BaseIndex {
     public final void addPage(GridResultPage page) {
         int pageRowsCnt = page.rowsInPage();
 
-        if (pageRowsCnt != 0)
-            addPage0(page);
-
         Counter cnt = remainingRows.get(page.source());
 
+        // RemainingRowsCount should be updated before page adding to avoid race
+        // in GridMergeIndexUnsorted cursor iterator
+        int remainingRowsCount;
+
         int allRows = page.response().allRows();
 
         if (allRows != -1) { // Only the first page contains allRows count and is allowed to init counter.
-            assert !cnt.initialized : "Counter is already initialized.";
+            assert cnt.state == State.UNINITIALIZED : "Counter is already initialized.";
+
+            remainingRowsCount = cnt.addAndGet(allRows - pageRowsCnt);
 
-            cnt.addAndGet(allRows);
             expRowsCnt.addAndGet(allRows);
 
+            // Add page before setting initialized flag to avoid race condition with adding last page
+            if (pageRowsCnt > 0)
+                addPage0(page);
+
             // We need this separate flag to handle case when the first source contains only one page
             // and it will signal that all remaining counters are zero and fetch is finished.
-            cnt.initialized = true;
+            cnt.state = State.INITIALIZED;
         }
+        else {
+            remainingRowsCount = cnt.addAndGet(-pageRowsCnt);
 
-        if (cnt.addAndGet(-pageRowsCnt) == 0) { // Result can be negative in case of race between messages, it is ok.
-            boolean last = true;
+            if (pageRowsCnt > 0)
+                addPage0(page);
+        }
 
-            for (Counter c : remainingRows.values()) { // Check all the sources.
-                if (c.get() != 0 || !c.initialized) {
-                    last = false;
+        if (remainingRowsCount == 0) { // Result can be negative in case of race between messages, it is ok.
+            if (cnt.state == State.UNINITIALIZED)
+                return;
 
-                    break;
-                }
+            // Guarantee that finished state possible only if counter is zero and all pages was added
+            cnt.state = State.FINISHED;
+
+            for (Counter c : remainingRows.values()) { // Check all the sources.
+                if (c.state != State.FINISHED)
+                    return;
             }
 
-            if (last && lastSubmitted.compareAndSet(false, true)) {
+            if (lastSubmitted.compareAndSet(false, true)) {
+                // Add page-marker that last page was added
                 addPage0(new GridResultPage(null, page.source(), null) {
                     @Override public boolean isLast() {
                         return true;
@@ -426,11 +440,16 @@ public abstract class GridMergeIndex extends BaseIndex {
         }
     }
 
+    /** */
+    enum State {
+        UNINITIALIZED, INITIALIZED, FINISHED
+    }
+
     /**
      * Counter with initialization flag.
      */
     private static class Counter extends AtomicInteger {
         /** */
-        volatile boolean initialized;
+        volatile State state = State.UNINITIALIZED;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a70f0bac/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java
index be644e2..efa6bd6 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
@@ -34,6 +35,7 @@ import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy;
 import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.SqlQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -730,6 +732,63 @@ public class IgniteCacheQueryMultiThreadedSelfTest extends GridCommonAbstractTes
     }
 
     /**
+     * SqlFieldsQuery paging mechanics stress test
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"TooBroadScope"})
+    public void testMultiThreadedSqlFieldsQuery() throws Throwable {
+        int threadCnt = 16;
+        final int keyCnt = 1100; // set resultSet size bigger than page size
+        final int logMod = 5000;
+
+        final Ignite g = grid(0);
+
+        // Put test values into cache.
+        final IgniteCache<Integer, TestValue> c = g.cache(null);
+
+        for (int i = 0; i < keyCnt; i++)
+            c.put(i, new TestValue(i));
+
+        final AtomicInteger cnt = new AtomicInteger();
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(
+            new CAX() {
+                @Override public void applyx() throws IgniteCheckedException {
+                    int iter = 0;
+
+                    while (!done.get() && !Thread.currentThread().isInterrupted()) {
+                        iter++;
+
+                        List<List<?>> entries =
+                            c.query(new SqlFieldsQuery("SELECT * from TestValue").setPageSize(100)).getAll();
+
+                        assert entries != null;
+
+                        assertEquals("Entries count is not as expected on iteration: " + iter, keyCnt, entries.size());
+
+                        if (cnt.incrementAndGet() % logMod == 0) {
+                            GridCacheQueryManager<Object, Object> qryMgr =
+                                ((IgniteKernal)g).internalCache().context().queries();
+
+                            assert qryMgr != null;
+
+                            qryMgr.printMemoryStats();
+                        }
+                    }
+                }
+            }, threadCnt);
+
+        Thread.sleep(DURATION);
+
+        done.set(true);
+
+        fut.get();
+    }
+
+    /**
      * Test value.
      */
     private static class TestValue implements Serializable {