You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/11/11 14:18:59 UTC
[10/17] ignite git commit: IGNITE-4145: Fixes "No query result found
for request" exception when running multiple queries concurrently. This
closes #1218.
IGNITE-4145: Fixes "No query result found for request" exception when running multiple queries concurrently. This closes #1218.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a70f0bac
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a70f0bac
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a70f0bac
Branch: refs/heads/ignite-4154-opt2
Commit: a70f0bac3ac2487b8ab58598ad921daa952b485f
Parents: 53876d3
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Fri Nov 11 13:03:40 2016 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Fri Nov 11 13:03:40 2016 +0300
----------------------------------------------------------------------
.../query/h2/twostep/GridMergeIndex.java | 49 +++++++++++-----
.../IgniteCacheQueryMultiThreadedSelfTest.java | 59 ++++++++++++++++++++
2 files changed, 93 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a70f0bac/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index 3914bd7..7ac2ee3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -177,36 +177,50 @@ public abstract class GridMergeIndex extends BaseIndex {
public final void addPage(GridResultPage page) {
int pageRowsCnt = page.rowsInPage();
- if (pageRowsCnt != 0)
- addPage0(page);
-
Counter cnt = remainingRows.get(page.source());
+ // RemainingRowsCount should be updated before page adding to avoid race
+ // in GridMergeIndexUnsorted cursor iterator
+ int remainingRowsCount;
+
int allRows = page.response().allRows();
if (allRows != -1) { // Only the first page contains allRows count and is allowed to init counter.
- assert !cnt.initialized : "Counter is already initialized.";
+ assert cnt.state == State.UNINITIALIZED : "Counter is already initialized.";
+
+ remainingRowsCount = cnt.addAndGet(allRows - pageRowsCnt);
- cnt.addAndGet(allRows);
expRowsCnt.addAndGet(allRows);
+ // Add page before setting initialized flag to avoid race condition with adding last page
+ if (pageRowsCnt > 0)
+ addPage0(page);
+
// We need this separate flag to handle case when the first source contains only one page
// and it will signal that all remaining counters are zero and fetch is finished.
- cnt.initialized = true;
+ cnt.state = State.INITIALIZED;
}
+ else {
+ remainingRowsCount = cnt.addAndGet(-pageRowsCnt);
- if (cnt.addAndGet(-pageRowsCnt) == 0) { // Result can be negative in case of race between messages, it is ok.
- boolean last = true;
+ if (pageRowsCnt > 0)
+ addPage0(page);
+ }
- for (Counter c : remainingRows.values()) { // Check all the sources.
- if (c.get() != 0 || !c.initialized) {
- last = false;
+ if (remainingRowsCount == 0) { // Result can be negative in case of race between messages, it is ok.
+ if (cnt.state == State.UNINITIALIZED)
+ return;
- break;
- }
+ // Guarantee that finished state possible only if counter is zero and all pages was added
+ cnt.state = State.FINISHED;
+
+ for (Counter c : remainingRows.values()) { // Check all the sources.
+ if (c.state != State.FINISHED)
+ return;
}
- if (last && lastSubmitted.compareAndSet(false, true)) {
+ if (lastSubmitted.compareAndSet(false, true)) {
+ // Add page-marker that last page was added
addPage0(new GridResultPage(null, page.source(), null) {
@Override public boolean isLast() {
return true;
@@ -426,11 +440,16 @@ public abstract class GridMergeIndex extends BaseIndex {
}
}
+ /** */
+ enum State {
+ UNINITIALIZED, INITIALIZED, FINISHED
+ }
+
/**
* Counter with initialization flag.
*/
private static class Counter extends AtomicInteger {
/** */
- volatile boolean initialized;
+ volatile State state = State.UNINITIALIZED;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/a70f0bac/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java
index be644e2..efa6bd6 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
+import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
@@ -34,6 +35,7 @@ import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy;
import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -730,6 +732,63 @@ public class IgniteCacheQueryMultiThreadedSelfTest extends GridCommonAbstractTes
}
/**
+ * SqlFieldsQuery paging mechanics stress test
+ *
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings({"TooBroadScope"})
+ public void testMultiThreadedSqlFieldsQuery() throws Throwable {
+ int threadCnt = 16;
+ final int keyCnt = 1100; // set resultSet size bigger than page size
+ final int logMod = 5000;
+
+ final Ignite g = grid(0);
+
+ // Put test values into cache.
+ final IgniteCache<Integer, TestValue> c = g.cache(null);
+
+ for (int i = 0; i < keyCnt; i++)
+ c.put(i, new TestValue(i));
+
+ final AtomicInteger cnt = new AtomicInteger();
+
+ final AtomicBoolean done = new AtomicBoolean();
+
+ IgniteInternalFuture<?> fut = multithreadedAsync(
+ new CAX() {
+ @Override public void applyx() throws IgniteCheckedException {
+ int iter = 0;
+
+ while (!done.get() && !Thread.currentThread().isInterrupted()) {
+ iter++;
+
+ List<List<?>> entries =
+ c.query(new SqlFieldsQuery("SELECT * from TestValue").setPageSize(100)).getAll();
+
+ assert entries != null;
+
+ assertEquals("Entries count is not as expected on iteration: " + iter, keyCnt, entries.size());
+
+ if (cnt.incrementAndGet() % logMod == 0) {
+ GridCacheQueryManager<Object, Object> qryMgr =
+ ((IgniteKernal)g).internalCache().context().queries();
+
+ assert qryMgr != null;
+
+ qryMgr.printMemoryStats();
+ }
+ }
+ }
+ }, threadCnt);
+
+ Thread.sleep(DURATION);
+
+ done.set(true);
+
+ fut.get();
+ }
+
+ /**
* Test value.
*/
private static class TestValue implements Serializable {