You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/24 18:05:33 UTC
[10/32] ignite git commit: Fixed segmented indices snapshots. - Fixes #1936.
Fixed segmented indices snapshots. - Fixes #1936.
Signed-off-by: Sergi Vladykin <se...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1554a160
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1554a160
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1554a160
Branch: refs/heads/ignite-5075-pds
Commit: 1554a1606244b46f042ecbf6aeb7eb09c3a2abb8
Parents: 647fd19
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue May 23 15:26:00 2017 +0300
Committer: Sergi Vladykin <se...@gmail.com>
Committed: Tue May 23 15:26:00 2017 +0300
----------------------------------------------------------------------
.../processors/query/h2/opt/GridH2Table.java | 107 ++++++++++++-------
.../query/h2/twostep/GridMapQueryExecutor.java | 2 +-
.../query/IgniteSqlSegmentedIndexSelfTest.java | 25 +++++
3 files changed, 93 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1554a160/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 41cf68b..ec728de 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -23,7 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -74,6 +74,9 @@ public class GridH2Table extends TableBase {
private volatile ArrayList<Index> idxs;
/** */
+ private final int pkIndexPos;
+
+ /** */
private final Map<String, GridH2IndexBase> tmpIdxs = new HashMap<>();
/** */
@@ -86,7 +89,7 @@ public class GridH2Table extends TableBase {
private final ConcurrentMap<Session, Boolean> sessions = new ConcurrentHashMap8<>();
/** */
- private final AtomicReference<Object[]> actualSnapshot = new AtomicReference<>();
+ private final AtomicReferenceArray<Object[]> actualSnapshot;
/** */
private IndexColumn affKeyCol;
@@ -164,21 +167,31 @@ public class GridH2Table extends TableBase {
assert idxs != null;
List<Index> clones = new ArrayList<>(idxs.size());
- for (Index index: idxs) {
+ for (Index index : idxs) {
Index clone = createDuplicateIndexIfNeeded(index);
if (clone != null)
- clones.add(clone);
+ clones.add(clone);
}
idxs.addAll(clones);
+ boolean hasHashIndex = idxs.size() >= 2 && index(0).getIndexType().isHash();
+
// Add scan index at 0 which is required by H2.
- if (idxs.size() >= 2 && index(0).getIndexType().isHash())
+ if (hasHashIndex)
idxs.add(0, new GridH2PrimaryScanIndex(this, index(1), index(0)));
else
idxs.add(0, new GridH2PrimaryScanIndex(this, index(0), null));
snapshotEnabled = desc == null || desc.snapshotableIndex();
+ pkIndexPos = hasHashIndex ? 2 : 1;
+
+ final int segments = desc != null ? desc.configuration().getQueryParallelism() :
+ // Get index segments count from PK index. Null desc can be passed from tests.
+ index(pkIndexPos).segmentsCount();
+
+ actualSnapshot = snapshotEnabled ? new AtomicReferenceArray<Object[]>(Math.max(segments, 1)) : null;
+
lock = new ReentrantReadWriteLock();
}
@@ -233,8 +246,13 @@ public class GridH2Table extends TableBase {
throw new IllegalStateException("Table " + identifierString() + " already destroyed.");
}
- if (snapshotInLock())
- snapshotIndexes(null);
+ if (snapshotInLock()) {
+ final GridH2QueryContext qctx = GridH2QueryContext.get();
+
+ assert qctx != null;
+
+ snapshotIndexes(null, qctx.segment());
+ }
return false;
}
@@ -255,21 +273,22 @@ public class GridH2Table extends TableBase {
/**
* @param qctx Query context.
+ * @param segment id of index segment to be snapshoted.
*/
- public void snapshotIndexes(GridH2QueryContext qctx) {
+ public void snapshotIndexes(GridH2QueryContext qctx, int segment) {
if (!snapshotEnabled)
return;
- Object[] snapshots;
+ Object[] segmentSnapshot;
// Try to reuse existing snapshots outside of the lock.
- for (long waitTime = 200;; waitTime *= 2) { // Increase wait time to avoid starvation.
- snapshots = actualSnapshot.get();
+ for (long waitTime = 200; ; waitTime *= 2) { // Increase wait time to avoid starvation.
+ segmentSnapshot = actualSnapshot.get(segment);
- if (snapshots != null) { // Reuse existing snapshot without locking.
- snapshots = doSnapshotIndexes(snapshots, qctx);
+ if (segmentSnapshot != null) { // Reuse existing snapshot without locking.
+ segmentSnapshot = doSnapshotIndexes(segment, segmentSnapshot, qctx);
- if (snapshots != null)
+ if (segmentSnapshot != null)
return; // Reused successfully.
}
@@ -281,17 +300,17 @@ public class GridH2Table extends TableBase {
ensureNotDestroyed();
// Try again inside of the lock.
- snapshots = actualSnapshot.get();
+ segmentSnapshot = actualSnapshot.get(segment);
- if (snapshots != null) // Try reusing.
- snapshots = doSnapshotIndexes(snapshots, qctx);
+ if (segmentSnapshot != null) // Try reusing.
+ segmentSnapshot = doSnapshotIndexes(segment, segmentSnapshot, qctx);
- if (snapshots == null) { // Reuse failed, produce new snapshots.
- snapshots = doSnapshotIndexes(null, qctx);
+ if (segmentSnapshot == null) { // Reuse failed, produce new snapshots.
+ segmentSnapshot = doSnapshotIndexes(segment,null, qctx);
- assert snapshots != null;
+ assert segmentSnapshot != null;
- actualSnapshot.set(snapshots);
+ actualSnapshot.set(segment, segmentSnapshot);
}
}
finally {
@@ -375,19 +394,22 @@ public class GridH2Table extends TableBase {
* Must be called inside of write lock because when using multiple indexes we have to ensure that all of them have
* the same contents at snapshot taking time.
*
+ * @param segment id of index segment snapshot.
+ * @param segmentSnapshot snapshot to be reused.
* @param qctx Query context.
* @return New indexes data snapshot.
*/
@SuppressWarnings("unchecked")
- private Object[] doSnapshotIndexes(Object[] snapshots, GridH2QueryContext qctx) {
+ private Object[] doSnapshotIndexes(int segment, Object[] segmentSnapshot, GridH2QueryContext qctx) {
assert snapshotEnabled;
- if (snapshots == null) // Nothing to reuse, create new snapshots.
- snapshots = new Object[idxs.size() - 2];
+ //TODO: make HashIndex snapshotable or remove it at all?
+ if (segmentSnapshot == null) // Nothing to reuse, create new snapshots.
+ segmentSnapshot = new Object[idxs.size() - pkIndexPos];
- // Take snapshots on all except first which is scan and second which is hash.
- for (int i = 2, len = idxs.size(); i < len; i++) {
- Object s = snapshots[i - 2];
+ // Take snapshots on all except first which is scan.
+ for (int i = pkIndexPos, len = idxs.size(); i < len; i++) {
+ Object s = segmentSnapshot[i - pkIndexPos];
boolean reuseExisting = s != null;
@@ -401,20 +423,20 @@ public class GridH2Table extends TableBase {
if (qctx != null)
qctx.clearSnapshots();
- for (int j = 2; j < i; j++)
+ for (int j = pkIndexPos; j < i; j++)
if ((idxs.get(j) instanceof GridH2IndexBase))
index(j).releaseSnapshot();
// Drop invalidated snapshot.
- actualSnapshot.compareAndSet(snapshots, null);
+ actualSnapshot.compareAndSet(segment, segmentSnapshot, null);
return null;
}
- snapshots[i - 2] = s;
+ segmentSnapshot[i - pkIndexPos] = s;
}
- return snapshots;
+ return segmentSnapshot;
}
/** {@inheritDoc} */
@@ -587,7 +609,7 @@ public class GridH2Table extends TableBase {
int len = idxs.size();
- int i = 2;
+ int i = pkIndexPos;
// Put row if absent to all indexes sequentially.
// Start from 3 because 0 - Scan (don't need to update), 1 - PK hash (already updated), 2 - PK (already updated).
@@ -609,7 +631,7 @@ public class GridH2Table extends TableBase {
if (old != null) {
// Remove row from all indexes.
// Start from 3 because 0 - Scan (don't need to update), 1 - PK hash (already updated), 2 - PK (already updated).
- for (int i = 3, len = idxs.size(); i < len; i++) {
+ for (int i = pkIndexPos + 1, len = idxs.size(); i < len; i++) {
if (!(idxs.get(i) instanceof GridH2IndexBase))
continue;
Row res = index(i).remove(old);
@@ -627,7 +649,8 @@ public class GridH2Table extends TableBase {
}
// The snapshot is not actual after update.
- actualSnapshot.set(null);
+ if (actualSnapshot != null)
+ actualSnapshot.set(pk.segmentForRow(row), null);
return true;
}
@@ -684,9 +707,10 @@ public class GridH2Table extends TableBase {
ArrayList<GridH2IndexBase> indexes() {
ArrayList<GridH2IndexBase> res = new ArrayList<>(idxs.size() - 2);
- for (int i = 2, len = idxs.size(); i < len; i++)
+ for (int i = pkIndexPos, len = idxs.size(); i < len; i++) {
if (idxs.get(i) instanceof GridH2IndexBase)
res.add(index(i));
+ }
return res;
}
@@ -695,6 +719,8 @@ public class GridH2Table extends TableBase {
*
*/
public void markRebuildFromHashInProgress(boolean value) {
+ assert !value || (idxs.size() >= 2 && index(1).getIndexType().isHash()) : "Table has no hash index.";
+
rebuildFromHashInProgress = value;
}
@@ -759,7 +785,8 @@ public class GridH2Table extends TableBase {
Index cloneIdx = createDuplicateIndexIfNeeded(idx);
- ArrayList<Index> newIdxs = new ArrayList<>(idxs.size() + ((cloneIdx == null) ? 1 : 2));
+ ArrayList<Index> newIdxs = new ArrayList<>(
+ idxs.size() + ((cloneIdx == null) ? 1 : 2));
newIdxs.addAll(idxs);
@@ -837,14 +864,14 @@ public class GridH2Table extends TableBase {
try {
ArrayList<Index> idxs = new ArrayList<>(this.idxs);
- Index targetIdx = (h2Idx instanceof GridH2ProxyIndex)?
- ((GridH2ProxyIndex)h2Idx).underlyingIndex(): h2Idx;
+ Index targetIdx = (h2Idx instanceof GridH2ProxyIndex) ?
+ ((GridH2ProxyIndex)h2Idx).underlyingIndex() : h2Idx;
- for (int i = 2; i < idxs.size();) {
+ for (int i = pkIndexPos; i < idxs.size();) {
Index idx = idxs.get(i);
if (idx == targetIdx || (idx instanceof GridH2ProxyIndex &&
- ((GridH2ProxyIndex)idx).underlyingIndex() == targetIdx)) {
+ ((GridH2ProxyIndex)idx).underlyingIndex() == targetIdx)) {
idxs.remove(i);
http://git-wip-us.apache.org/repos/asf/ignite/blob/1554a160/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 43cc230..6d76eea 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -573,7 +573,7 @@ public class GridMapQueryExecutor {
Objects.requireNonNull(h2Tbl, tbl.toString());
- h2Tbl.snapshotIndexes(qctx);
+ h2Tbl.snapshotIndexes(qctx, segmentId);
snapshotedTbls.add(h2Tbl);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1554a160/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
index 586b81e..03c3f1e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
@@ -135,6 +135,31 @@ public class IgniteSqlSegmentedIndexSelfTest extends GridCommonAbstractTest {
}
/**
+ * Check correct index snapshots with segmented indices.
+ * @throws Exception If failed.
+ */
+ public void testSegmentedIndexReproducableResults() throws Exception {
+ ignite(0).createCache(cacheConfig(ORG_CACHE_NAME, true, Integer.class, Organization.class));
+
+ IgniteCache<Object, Object> cache = ignite(0).cache(ORG_CACHE_NAME);
+
+ // Unequal entries distribution among partitions.
+ int expectedSize = nodesCount() * QRY_PARALLELISM_LVL * 3 / 2;
+
+ for (int i = 0; i < expectedSize; i++)
+ cache.put(i, new Organization("org-" + i));
+
+ String select0 = "select * from \"org\".Organization o";
+
+ // Check for stable results.
+ for(int i = 0; i < 10; i++) {
+ List<List<?>> result = cache.query(new SqlFieldsQuery(select0)).getAll();
+
+ assertEquals(expectedSize, result.size());
+ }
+ }
+
+ /**
* Run tests on single-node grid
*
* @throws Exception If failed.