You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2017/03/01 14:32:57 UTC
[14/50] [abbrv] ignite git commit: Implemented.
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index 914e0da..0829df0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -17,12 +17,15 @@
package org.apache.ignite.internal.processors.query.h2.opt;
+import java.lang.reflect.Field;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.util.GridEmptyIterator;
import org.apache.ignite.internal.util.offheap.unsafe.GridOffHeapSnapTreeMap;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeGuard;
@@ -43,18 +46,36 @@ import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
/**
- * Base class for snapshotable tree indexes.
+ * Base class for snapshotable segmented tree indexes.
*/
@SuppressWarnings("ComparatorNotSerializable")
public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridSearchRowPointer> {
+
/** */
- private final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree;
+ private static Field KEY_FIELD;
+
+ /** */
+ static {
+ try {
+ KEY_FIELD = GridH2AbstractKeyValueRow.class.getDeclaredField("key");
+ KEY_FIELD.setAccessible(true);
+ }
+ catch (NoSuchFieldException e) {
+ KEY_FIELD = null;
+ }
+ }
+
+ /** Index segments. */
+ private final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row>[] segments;
/** */
private final boolean snapshotEnabled;
+ /** */
+ private final GridH2RowDescriptor desc;
+
/**
- * Constructor with index initialization.
+ * Constructor with index initialization. Creates index with single segment.
*
* @param name Index name.
* @param tbl Table.
@@ -63,35 +84,59 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
*/
@SuppressWarnings("unchecked")
public GridH2TreeIndex(String name, GridH2Table tbl, boolean pk, List<IndexColumn> colsList) {
+ this(name, tbl, pk, colsList, 1);
+ }
+
+ /**
+ * Constructor with index initialization.
+ *
+ * @param name Index name.
+ * @param tbl Table.
+ * @param pk If this index is primary key.
+ * @param colsList Index columns list.
+ * @param segmentsCnt Number of segments.
+ */
+ @SuppressWarnings("unchecked")
+ public GridH2TreeIndex(String name, GridH2Table tbl, boolean pk, List<IndexColumn> colsList, int segmentsCnt) {
+ assert segmentsCnt > 0 : segmentsCnt;
+
IndexColumn[] cols = colsList.toArray(new IndexColumn[colsList.size()]);
IndexColumn.mapColumns(cols, tbl);
+ desc = tbl.rowDescriptor();
+
initBaseIndex(tbl, 0, name, cols,
pk ? IndexType.createPrimaryKey(false, false) : IndexType.createNonUnique(false, false, false));
+ segments = new ConcurrentNavigableMap[segmentsCnt];
+
final GridH2RowDescriptor desc = tbl.rowDescriptor();
if (desc == null || desc.memory() == null) {
snapshotEnabled = desc == null || desc.snapshotableIndex();
if (snapshotEnabled) {
- tree = new SnapTreeMap<GridSearchRowPointer, GridH2Row>(this) {
- @Override protected void afterNodeUpdate_nl(Node<GridSearchRowPointer, GridH2Row> node, Object val) {
- if (val != null)
- node.key = (GridSearchRowPointer)val;
- }
+ for (int i = 0; i < segmentsCnt; i++) {
+ segments[i] = new SnapTreeMap<GridSearchRowPointer, GridH2Row>(this) {
+ @Override
+ protected void afterNodeUpdate_nl(Node<GridSearchRowPointer, GridH2Row> node, Object val) {
+ if (val != null)
+ node.key = (GridSearchRowPointer)val;
+ }
- @Override protected Comparable<? super GridSearchRowPointer> comparable(Object key) {
- if (key instanceof ComparableRow)
- return (Comparable<? super SearchRow>)key;
+ @Override protected Comparable<? super GridSearchRowPointer> comparable(Object key) {
+ if (key instanceof ComparableRow)
+ return (Comparable<? super SearchRow>)key;
- return super.comparable(key);
- }
- };
+ return super.comparable(key);
+ }
+ };
+ }
}
else {
- tree = new ConcurrentSkipListMap<>(
+ for (int i = 0; i < segmentsCnt; i++) {
+ segments[i] = new ConcurrentSkipListMap<>(
new Comparator<GridSearchRowPointer>() {
@Override public int compare(GridSearchRowPointer o1, GridSearchRowPointer o2) {
if (o1 instanceof ComparableRow)
@@ -103,7 +148,8 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
return compareRows(o1, o2);
}
}
- );
+ );
+ }
}
}
else {
@@ -111,28 +157,30 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
snapshotEnabled = true;
- tree = new GridOffHeapSnapTreeMap<GridSearchRowPointer, GridH2Row>(desc, desc, desc.memory(), desc.guard(), this) {
- @Override protected void afterNodeUpdate_nl(long node, GridH2Row val) {
- final long oldKey = keyPtr(node);
+ for (int i = 0; i < segmentsCnt; i++) {
+ segments[i] = new GridOffHeapSnapTreeMap<GridSearchRowPointer, GridH2Row>(desc, desc, desc.memory(), desc.guard(), this) {
+ @Override protected void afterNodeUpdate_nl(long node, GridH2Row val) {
+ final long oldKey = keyPtr(node);
- if (val != null) {
- key(node, val);
+ if (val != null) {
+ key(node, val);
- guard.finalizeLater(new Runnable() {
- @Override public void run() {
- desc.createPointer(oldKey).decrementRefCount();
- }
- });
+ guard.finalizeLater(new Runnable() {
+ @Override public void run() {
+ desc.createPointer(oldKey).decrementRefCount();
+ }
+ });
+ }
}
- }
- @Override protected Comparable<? super GridSearchRowPointer> comparable(Object key) {
- if (key instanceof ComparableRow)
- return (Comparable<? super SearchRow>)key;
+ @Override protected Comparable<? super GridSearchRowPointer> comparable(Object key) {
+ if (key instanceof ComparableRow)
+ return (Comparable<? super SearchRow>)key;
- return super.comparable(key);
- }
- };
+ return super.comparable(key);
+ }
+ };
+ }
}
initDistributedJoinMessaging(tbl);
@@ -142,20 +190,24 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
@Override protected Object doTakeSnapshot() {
assert snapshotEnabled;
+ int seg = threadLocalSegment();
+
+ ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree = segments[seg];
+
return tree instanceof SnapTreeMap ?
((SnapTreeMap)tree).clone() :
((GridOffHeapSnapTreeMap)tree).clone();
}
/** {@inheritDoc} */
- protected final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> treeForRead() {
+ protected ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> treeForRead(int seg) {
if (!snapshotEnabled)
- return tree;
+ return segments[seg];
ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> res = threadLocalSnapshot();
if (res == null)
- res = tree;
+ return segments[seg];
return res;
}
@@ -164,19 +216,37 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
@Override public void destroy() {
assert threadLocalSnapshot() == null;
- if (tree instanceof AutoCloseable)
- U.closeQuiet((AutoCloseable)tree);
+ for (int i = 0; i < segments.length; i++) {
+ if (segments[i] instanceof AutoCloseable)
+ U.closeQuiet((AutoCloseable)segments[i]);
+ }
super.destroy();
}
+
+ /** {@inheritDoc} */
+ protected int threadLocalSegment() {
+ GridH2QueryContext qctx = GridH2QueryContext.get();
+
+ if(segments.length == 1)
+ return 0;
+
+ if(qctx == null)
+ throw new IllegalStateException("GridH2QueryContext is not initialized.");
+
+ return qctx.segment();
+ }
+
/** {@inheritDoc} */
@Override public long getRowCount(@Nullable Session ses) {
IndexingQueryFilter f = threadLocalFilter();
+ int seg = threadLocalSegment();
+
// Fast path if we don't need to perform any filtering.
if (f == null || f.forSpace((getTable()).spaceName()) == null)
- return treeForRead().size();
+ return treeForRead(seg).size();
Iterator<GridH2Row> iter = doFind(null, false, null);
@@ -255,7 +325,9 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
* @return Row.
*/
public GridH2Row findOne(GridSearchRowPointer row) {
- return tree.get(row);
+ int seg = threadLocalSegment();
+
+ return segments[seg].get(row);
}
/**
@@ -268,7 +340,9 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
*/
@SuppressWarnings("unchecked")
private Iterator<GridH2Row> doFind(@Nullable SearchRow first, boolean includeFirst, @Nullable SearchRow last) {
- ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> t = treeForRead();
+ int seg = threadLocalSegment();
+
+ ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> t = treeForRead(seg);
return doFind0(t, first, includeFirst, last, threadLocalFilter());
}
@@ -359,12 +433,68 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
/** {@inheritDoc} */
@Override public GridH2Row put(GridH2Row row) {
- return tree.put(row, row);
+ int seg = segment(row);
+
+ return segments[seg].put(row, row);
}
/** {@inheritDoc} */
@Override public GridH2Row remove(SearchRow row) {
- return tree.remove(comparable(row, 0));
+ GridSearchRowPointer comparable = comparable(row, 0);
+
+ int seg = segment(row);
+
+ return segments[seg].remove(comparable);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int segmentsCount() {
+ return segments.length;
+ }
+
+ /**
+ * @param partition Parttition idx.
+ * @return index currentSegment Id for given key
+ */
+ protected int segment(int partition) {
+ return partition % segments.length;
+ }
+
+ /**
+ * @param row
+ * @return index currentSegment Id for given row
+ */
+ private int segment(SearchRow row) {
+ assert row != null;
+
+ CacheObject key;
+
+ if (desc != null && desc.context() != null) {
+ GridCacheContext<?, ?> ctx = desc.context();
+
+ assert ctx != null;
+
+ if (row instanceof GridH2AbstractKeyValueRow && KEY_FIELD != null) {
+ try {
+ Object o = KEY_FIELD.get(row);
+
+ if (o instanceof CacheObject)
+ key = (CacheObject)o;
+ else
+ key = ctx.toCacheKeyObject(o);
+
+ }
+ catch (IllegalAccessException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ else
+ key = ctx.toCacheKeyObject(row.getValue(0));
+
+ return segment(ctx.affinity().partition(key));
+ }
+ else
+ return 0;
}
/**
@@ -463,18 +593,20 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
IndexColumn[] cols = getIndexColumns();
GridH2TreeIndex idx = new GridH2TreeIndex(getName(), getTable(),
- getIndexType().isUnique(), F.asList(cols));
+ getIndexType().isUnique(), F.asList(cols), segments.length);
Thread thread = Thread.currentThread();
- long i = 0;
+ long j = 0;
- for (GridH2Row row : tree.values()) {
- // Check for interruptions every 1000 iterations.
- if (++i % 1000 == 0 && thread.isInterrupted())
- throw new InterruptedException();
+ for (int i = 0; i < segments.length; i++) {
+ for (GridH2Row row : segments[i].values()) {
+ // Check for interruptions every 1000 iterations.
+ if ((++j & 1023) == 0 && thread.isInterrupted())
+ throw new InterruptedException();
- idx.tree.put(row, row);
+ idx.put(row);
+ }
}
return idx;
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index ac1a6a6..5027c9a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReferenceArray;
import javax.cache.CacheException;
@@ -56,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshalla
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
@@ -84,6 +86,8 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.distributedJoinMode;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED;
import static org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.QUERY_POOL;
@@ -161,8 +165,7 @@ public class GridMapQueryExecutor {
if (nodeRess == null)
return;
- for (QueryResults ress : nodeRess.results().values())
- ress.cancel(true);
+ nodeRess.cancelAll();
}
}, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
@@ -235,12 +238,7 @@ public class GridMapQueryExecutor {
GridH2QueryContext.clear(ctx.localNodeId(), node.id(), qryReqId, MAP);
}
- QueryResults results = nodeRess.results().remove(qryReqId);
-
- if (results == null)
- return;
-
- results.cancel(true);
+ nodeRess.cancelRequest(qryReqId);
}
/**
@@ -427,6 +425,7 @@ public class GridMapQueryExecutor {
onQueryRequest0(node,
req.requestId(),
+ 0,
req.queries(),
cacheIds,
req.topologyVersion(),
@@ -434,7 +433,7 @@ public class GridMapQueryExecutor {
req.partitions(),
null,
req.pageSize(),
- false,
+ OFF,
req.timeout());
}
@@ -442,12 +441,49 @@ public class GridMapQueryExecutor {
* @param node Node.
* @param req Query request.
*/
- private void onQueryRequest(ClusterNode node, GridH2QueryRequest req) {
- Map<UUID,int[]> partsMap = req.partitions();
- int[] parts = partsMap == null ? null : partsMap.get(ctx.localNodeId());
+ private void onQueryRequest(final ClusterNode node, final GridH2QueryRequest req) throws IgniteCheckedException {
+ final Map<UUID,int[]> partsMap = req.partitions();
+ final int[] parts = partsMap == null ? null : partsMap.get(ctx.localNodeId());
+
+ assert req.caches() != null && !req.caches().isEmpty();
+
+ GridCacheContext<?, ?> mainCctx = ctx.cache().context().cacheContext( req.caches().get(0));
+
+ if (mainCctx == null)
+ throw new CacheException("Failed to find cache.");
+
+ final DistributedJoinMode joinMode = distributedJoinMode(
+ req.isFlagSet(GridH2QueryRequest.FLAG_IS_LOCAL),
+ req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS));
+
+ for (int i = 1; i < mainCctx.config().getQueryParallelism(); i++) {
+ final int segment = i;
+
+ ctx.closure().callLocal(
+ new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ onQueryRequest0(node,
+ req.requestId(),
+ segment,
+ req.queries(),
+ req.caches(),
+ req.topologyVersion(),
+ partsMap,
+ parts,
+ req.tables(),
+ req.pageSize(),
+ joinMode,
+ req.timeout());
+
+ return null;
+ }
+ }
+ , QUERY_POOL);
+ }
onQueryRequest0(node,
req.requestId(),
+ 0,
req.queries(),
req.caches(),
req.topologyVersion(),
@@ -455,13 +491,14 @@ public class GridMapQueryExecutor {
parts,
req.tables(),
req.pageSize(),
- req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS),
+ joinMode,
req.timeout());
}
/**
* @param node Node authored request.
* @param reqId Request ID.
+ * @param segmentId index segment ID.
* @param qrys Queries to execute.
* @param cacheIds Caches which will be affected by these queries.
* @param topVer Topology version.
@@ -469,11 +506,12 @@ public class GridMapQueryExecutor {
* @param parts Explicit partitions for current node.
* @param tbls Tables.
* @param pageSize Page size.
- * @param distributedJoins Can we expect distributed joins to be ran.
+ * @param distributedJoinMode Query distributed join mode.
*/
private void onQueryRequest0(
ClusterNode node,
long reqId,
+ int segmentId,
Collection<GridCacheSqlQuery> qrys,
List<Integer> cacheIds,
AffinityTopologyVersion topVer,
@@ -481,7 +519,7 @@ public class GridMapQueryExecutor {
int[] parts,
Collection<String> tbls,
int pageSize,
- boolean distributedJoins,
+ DistributedJoinMode distributedJoinMode,
int timeout
) {
// Prepare to run queries.
@@ -500,7 +538,7 @@ public class GridMapQueryExecutor {
if (topVer != null) {
// Reserve primary for topology version or explicit partitions.
if (!reservePartitions(cacheIds, topVer, parts, reserved)) {
- sendRetry(node, reqId);
+ sendRetry(node, reqId, segmentId);
return;
}
@@ -508,17 +546,18 @@ public class GridMapQueryExecutor {
qr = new QueryResults(reqId, qrys.size(), mainCctx);
- if (nodeRess.results().put(reqId, qr) != null)
+ if (nodeRess.put(reqId, segmentId, qr) != null)
throw new IllegalStateException();
// Prepare query context.
GridH2QueryContext qctx = new GridH2QueryContext(ctx.localNodeId(),
node.id(),
reqId,
+ segmentId,
mainCctx.isReplicated() ? REPLICATED : MAP)
.filter(h2.backupFilter(topVer, parts))
.partitionsMap(partsMap)
- .distributedJoins(distributedJoins)
+ .distributedJoinMode(distributedJoinMode)
.pageSize(pageSize)
.topologyVersion(topVer)
.reservations(reserved);
@@ -542,7 +581,7 @@ public class GridMapQueryExecutor {
Connection conn = h2.connectionForSpace(mainCctx.name());
// Here we enforce join order to have the same behavior on all the nodes.
- h2.setupConnection(conn, distributedJoins, true);
+ h2.setupConnection(conn, distributedJoinMode != OFF, true);
GridH2QueryContext.set(qctx);
@@ -553,13 +592,13 @@ public class GridMapQueryExecutor {
if (nodeRess.cancelled(reqId)) {
GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, qctx.type());
- nodeRess.results().remove(reqId);
+ nodeRess.cancelRequest(reqId);
throw new QueryCancelledException();
}
// Run queries.
- int i = 0;
+ int qryIdx = 0;
boolean evt = ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED);
@@ -567,7 +606,7 @@ public class GridMapQueryExecutor {
ResultSet rs = h2.executeSqlQueryWithTimer(mainCctx.name(), conn, qry.query(),
F.asList(qry.parameters()), true,
timeout,
- qr.cancels[i]);
+ qr.cancels[qryIdx]);
if (evt) {
ctx.event().record(new CacheQueryExecutedEvent<>(
@@ -587,24 +626,24 @@ public class GridMapQueryExecutor {
assert rs instanceof JdbcResultSet : rs.getClass();
- qr.addResult(i, qry, node.id(), rs);
+ qr.addResult(qryIdx, qry, node.id(), rs);
if (qr.canceled) {
- qr.result(i).close();
+ qr.result(qryIdx).close();
throw new QueryCancelledException();
}
// Send the first page.
- sendNextPage(nodeRess, node, qr, i, pageSize);
+ sendNextPage(nodeRess, node, qr, qryIdx, segmentId, pageSize);
- i++;
+ qryIdx++;
}
}
finally {
GridH2QueryContext.clearThreadLocal();
- if (!distributedJoins)
+ if (distributedJoinMode == OFF)
qctx.clearContext(false);
if (!F.isEmpty(snapshotedTbls)) {
@@ -615,13 +654,13 @@ public class GridMapQueryExecutor {
}
catch (Throwable e) {
if (qr != null) {
- nodeRess.results().remove(reqId, qr);
+ nodeRess.remove(reqId, segmentId, qr);
qr.cancel(false);
}
if (X.hasCause(e, GridH2RetryException.class))
- sendRetry(node, reqId);
+ sendRetry(node, reqId, segmentId);
else {
U.error(log, "Failed to execute local query.", e);
@@ -681,14 +720,14 @@ public class GridMapQueryExecutor {
return;
}
- QueryResults qr = nodeRess.results().get(req.queryRequestId());
+ QueryResults qr = nodeRess.get(req.queryRequestId(), req.segmentId());
if (qr == null)
sendError(node, req.queryRequestId(), new CacheException("No query result found for request: " + req));
else if (qr.canceled)
sendError(node, req.queryRequestId(), new QueryCancelledException());
else
- sendNextPage(nodeRess, node, qr, req.query(), req.pageSize());
+ sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize());
}
/**
@@ -696,9 +735,10 @@ public class GridMapQueryExecutor {
* @param node Node.
* @param qr Query results.
* @param qry Query.
+ * @param segmentId Index segment ID.
* @param pageSize Page size.
*/
- private void sendNextPage(NodeResults nodeRess, ClusterNode node, QueryResults qr, int qry,
+ private void sendNextPage(NodeResults nodeRess, ClusterNode node, QueryResults qr, int qry, int segmentId,
int pageSize) {
QueryResult res = qr.result(qry);
@@ -714,14 +754,14 @@ public class GridMapQueryExecutor {
res.close();
if (qr.isAllClosed())
- nodeRess.results().remove(qr.qryReqId, qr);
+ nodeRess.remove(qr.qryReqId, segmentId, qr);
}
try {
boolean loc = node.isLocal();
- GridQueryNextPageResponse msg = new GridQueryNextPageResponse(qr.qryReqId, qry, page,
- page == 0 ? res.rowCnt : -1 ,
+ GridQueryNextPageResponse msg = new GridQueryNextPageResponse(qr.qryReqId, segmentId, qry, page,
+ page == 0 ? res.rowCnt : -1,
res.cols,
loc ? null : toMessages(rows, new ArrayList<Message>(res.cols)),
loc ? rows : null);
@@ -741,12 +781,13 @@ public class GridMapQueryExecutor {
/**
* @param node Node.
* @param reqId Request ID.
+ * @param segmentId Index segment ID.
*/
- private void sendRetry(ClusterNode node, long reqId) {
+ private void sendRetry(ClusterNode node, long reqId, int segmentId) {
try {
boolean loc = node.isLocal();
- GridQueryNextPageResponse msg = new GridQueryNextPageResponse(reqId,
+ GridQueryNextPageResponse msg = new GridQueryNextPageResponse(reqId, segmentId,
/*qry*/0, /*page*/0, /*allRows*/0, /*cols*/1,
loc ? null : Collections.<Message>emptyList(),
loc ? Collections.<Value[]>emptyList() : null);
@@ -780,35 +821,118 @@ public class GridMapQueryExecutor {
*/
private static class NodeResults {
/** */
- private final ConcurrentMap<Long, QueryResults> res = new ConcurrentHashMap8<>();
+ private final ConcurrentMap<RequestKey, QueryResults> res = new ConcurrentHashMap8<>();
/** */
private final GridBoundedConcurrentLinkedHashMap<Long, Boolean> qryHist =
new GridBoundedConcurrentLinkedHashMap<>(1024, 1024, 0.75f, 64, PER_SEGMENT_Q);
/**
- * @return All results.
+ * @param reqId Query Request ID.
+ * @return {@code False} if query was already cancelled.
*/
- ConcurrentMap<Long, QueryResults> results() {
- return res;
+ boolean cancelled(long reqId) {
+ return qryHist.get(reqId) != null;
}
/**
- * @param qryId Query ID.
- * @return {@code False} if query was already cancelled.
+ * @param reqId Query Request ID.
+ * @return {@code True} if cancelled.
*/
- boolean cancelled(long qryId) {
- return qryHist.get(qryId) != null;
+ boolean onCancel(long reqId) {
+ Boolean old = qryHist.putIfAbsent(reqId, Boolean.FALSE);
+
+ return old == null;
}
/**
- * @param qryId Query ID.
- * @return {@code True} if cancelled.
+ * @param reqId Query Request ID.
+ * @param segmentId Index segment ID.
+ * @return query partial results.
*/
- boolean onCancel(long qryId) {
- Boolean old = qryHist.putIfAbsent(qryId, Boolean.FALSE);
+ public QueryResults get(long reqId, int segmentId) {
+ return res.get(new RequestKey(reqId, segmentId));
+ }
- return old == null;
+ /**
+ * Cancel all thread of given request.
+ * @param reqID Request ID.
+ */
+ public void cancelRequest(long reqID) {
+ for (RequestKey key : res.keySet()) {
+ if (key.reqId == reqID) {
+ QueryResults removed = res.remove(key);
+
+ if (removed != null)
+ removed.cancel(true);
+ }
+
+ }
+ }
+
+ /**
+ * @param reqId Query Request ID.
+ * @param segmentId Index segment ID.
+ * @param qr Query Results.
+ * @return {@code True} if removed.
+ */
+ public boolean remove(long reqId, int segmentId, QueryResults qr) {
+ return res.remove(new RequestKey(reqId, segmentId), qr);
+ }
+
+ /**
+ * @param reqId Query Request ID.
+ * @param segmentId Index segment ID.
+ * @param qr Query Results.
+ * @return previous value.
+ */
+ public QueryResults put(long reqId, int segmentId, QueryResults qr) {
+ return res.put(new RequestKey(reqId, segmentId), qr);
+ }
+
+ /**
+ * Cancel all node queries.
+ */
+ public void cancelAll() {
+ for (QueryResults ress : res.values())
+ ress.cancel(true);
+ }
+
+ /**
+ *
+ */
+ private static class RequestKey {
+ /** */
+ private long reqId;
+
+ /** */
+ private int segmentId;
+
+ /** Constructor */
+ RequestKey(long reqId, int segmentId) {
+ this.reqId = reqId;
+ this.segmentId = segmentId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ RequestKey other = (RequestKey)o;
+
+ return reqId == other.reqId && segmentId == other.segmentId;
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int result = (int)(reqId ^ (reqId >>> 32));
+ result = 31 * result + segmentId;
+ return result;
+ }
}
}
@@ -836,7 +960,8 @@ public class GridMapQueryExecutor {
* @param qrys Number of queries.
* @param cctx Cache context.
*/
- private QueryResults(long qryReqId, int qrys, GridCacheContext<?,?> cctx) {
+ @SuppressWarnings("unchecked")
+ private QueryResults(long qryReqId, int qrys, GridCacheContext<?, ?> cctx) {
this.qryReqId = qryReqId;
this.cctx = cctx;
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index c267f4a..45d3c58 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -59,7 +59,7 @@ public abstract class GridMergeIndex extends BaseIndex {
private final AtomicInteger expRowsCnt = new AtomicInteger(0);
/** Remaining rows per source node ID. */
- private Map<UUID, Counter> remainingRows;
+ private Map<UUID, Counter[]> remainingRows;
/** */
private final AtomicBoolean lastSubmitted = new AtomicBoolean();
@@ -141,15 +141,22 @@ public abstract class GridMergeIndex extends BaseIndex {
* Set source nodes.
*
* @param nodes Nodes.
+ * @param segmentsCnt Index segments per table.
*/
- public void setSources(Collection<ClusterNode> nodes) {
+ public void setSources(Collection<ClusterNode> nodes, int segmentsCnt) {
assert remainingRows == null;
remainingRows = U.newHashMap(nodes.size());
for (ClusterNode node : nodes) {
- if (remainingRows.put(node.id(), new Counter()) != null)
+ Counter[] counters = new Counter[segmentsCnt];
+
+ for (int i = 0; i < segmentsCnt; i++)
+ counters[i] = new Counter();
+
+ if (remainingRows.put(node.id(), counters) != null)
throw new IllegalStateException("Duplicate node id: " + node.id());
+
}
}
@@ -194,7 +201,7 @@ public abstract class GridMergeIndex extends BaseIndex {
public final void addPage(GridResultPage page) {
int pageRowsCnt = page.rowsInPage();
- Counter cnt = remainingRows.get(page.source());
+ Counter cnt = remainingRows.get(page.source())[page.res.segmentId()];
// RemainingRowsCount should be updated before page adding to avoid race
// in GridMergeIndexUnsorted cursor iterator
@@ -231,13 +238,14 @@ public abstract class GridMergeIndex extends BaseIndex {
// Guarantee that finished state possible only if counter is zero and all pages was added
cnt.state = State.FINISHED;
- for (Counter c : remainingRows.values()) { // Check all the sources.
- if (c.state != State.FINISHED)
- return;
+ for (Counter[] cntrs : remainingRows.values()) { // Check all the sources.
+ for(int i = 0; i < cntrs.length; i++) {
+ if (cntrs[i].state != State.FINISHED)
+ return;
+ }
}
if (lastSubmitted.compareAndSet(false, true)) {
- // Add page-marker that last page was added
addPage0(new GridResultPage(null, page.source(), null) {
@Override public boolean isLast() {
return true;
@@ -256,7 +264,20 @@ public abstract class GridMergeIndex extends BaseIndex {
* @param page Page.
*/
protected void fetchNextPage(GridResultPage page) {
- if (remainingRows.get(page.source()).get() != 0)
+ assert !page.isLast();
+
+ if(page.isFail())
+ page.fetchNextPage(); //rethrow exceptions
+
+ assert page.res != null;
+
+ Counter[] counters = remainingRows.get(page.source());
+
+ int segId = page.res.segmentId();
+
+ Counter counter = counters[segId];
+
+ if (counter.get() != 0)
page.fetchNextPage();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index f54fab6..8837046 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -62,9 +62,9 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
-import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter;
@@ -100,6 +100,7 @@ import org.jsr166.ConcurrentHashMap8;
import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE;
/**
@@ -294,6 +295,7 @@ public class GridReduceQueryExecutor {
private void onNextPage(final ClusterNode node, GridQueryNextPageResponse msg) {
final long qryReqId = msg.queryRequestId();
final int qry = msg.query();
+ final int seg = msg.segmentId();
final QueryRun r = runs.get(qryReqId);
@@ -326,7 +328,7 @@ public class GridReduceQueryExecutor {
}
try {
- GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, pageSize);
+ GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, seg, pageSize);
if (node.isLocal())
h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg0);
@@ -514,29 +516,33 @@ public class GridReduceQueryExecutor {
// Explicit partition mapping for unstable topology.
Map<ClusterNode, IntArray> partsMap = null;
- if (isPreloadingActive(cctx, extraSpaces)) {
- if (cctx.isReplicated())
- nodes = replicatedUnstableDataNodes(cctx, extraSpaces);
- else {
- partsMap = partitionedUnstableDataNodes(cctx, extraSpaces);
+ if (qry.isLocal())
+ nodes = Collections.singleton(ctx.discovery().localNode());
+ else {
+ if (isPreloadingActive(cctx, extraSpaces)) {
+ if (cctx.isReplicated())
+ nodes = replicatedUnstableDataNodes(cctx, extraSpaces);
+ else {
+ partsMap = partitionedUnstableDataNodes(cctx, extraSpaces);
- nodes = partsMap == null ? null : partsMap.keySet();
+ nodes = partsMap == null ? null : partsMap.keySet();
+ }
}
- }
- else
- nodes = stableDataNodes(topVer, cctx, extraSpaces);
+ else
+ nodes = stableDataNodes(topVer, cctx, extraSpaces);
- if (nodes == null)
- continue; // Retry.
+ if (nodes == null)
+ continue; // Retry.
- assert !nodes.isEmpty();
+ assert !nodes.isEmpty();
- if (cctx.isReplicated() || qry.explain()) {
- assert qry.explain() || !nodes.contains(ctx.discovery().localNode()) :
- "We must be on a client node.";
+ if (cctx.isReplicated() || qry.explain()) {
+ assert qry.explain() || !nodes.contains(ctx.discovery().localNode()) :
+ "We must be on a client node.";
- // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
- nodes = Collections.singleton(F.rand(nodes));
+ // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
+ nodes = Collections.singleton(F.rand(nodes));
+ }
}
final Collection<ClusterNode> finalNodes = nodes;
@@ -545,6 +551,8 @@ public class GridReduceQueryExecutor {
final boolean skipMergeTbl = !qry.explain() && qry.skipMergeTable();
+ final int segmentsPerIndex = cctx.config().getQueryParallelism();
+
for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
GridMergeIndex idx;
@@ -565,12 +573,12 @@ public class GridReduceQueryExecutor {
else
idx = GridMergeIndexUnsorted.createDummy(ctx);
- idx.setSources(nodes);
+ idx.setSources(nodes, segmentsPerIndex);
r.idxs.add(idx);
}
- r.latch = new CountDownLatch(r.idxs.size() * nodes.size());
+ r.latch = new CountDownLatch(r.idxs.size() * nodes.size() * segmentsPerIndex);
runs.put(qryReqId, r);
@@ -626,14 +634,13 @@ public class GridReduceQueryExecutor {
.tables(distributedJoins ? qry.tables() : null)
.partitions(convert(partsMap))
.queries(mapQrys)
- .flags(distributedJoins ? GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0)
+ .flags((qry.isLocal() ? GridH2QueryRequest.FLAG_IS_LOCAL : 0) |
+ (distributedJoins ? GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0))
.timeout(timeoutMillis),
oldStyle && partsMap != null ? new ExplicitPartitionsSpecializer(partsMap) : null,
- distributedJoins)
- ) {
- awaitAllReplies(r, nodes);
+ false)) {
- cancel.checkCancelled();
+ awaitAllReplies(r, nodes, cancel);
Object state = r.state.get();
@@ -696,7 +703,7 @@ public class GridReduceQueryExecutor {
h2.setupConnection(r.conn, false, enforceJoinOrder);
GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, qryReqId, REDUCE)
- .pageSize(r.pageSize).distributedJoins(false));
+ .pageSize(r.pageSize).distributedJoinMode(OFF));
try {
if (qry.explain())
@@ -817,11 +824,15 @@ public class GridReduceQueryExecutor {
/**
* @param r Query run.
* @param nodes Nodes to check periodically if they alive.
+ * @param cancel Query cancel.
* @throws IgniteInterruptedCheckedException If interrupted.
*/
- private void awaitAllReplies(QueryRun r, Collection<ClusterNode> nodes)
- throws IgniteInterruptedCheckedException {
+ private void awaitAllReplies(QueryRun r, Collection<ClusterNode> nodes, GridQueryCancel cancel)
+ throws IgniteInterruptedCheckedException, QueryCancelledException {
while (!U.await(r.latch, 500, TimeUnit.MILLISECONDS)) {
+
+ cancel.checkCancelled();
+
for (ClusterNode node : nodes) {
if (!ctx.discovery().alive(node)) {
handleNodeLeft(r, node.id());
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java
index e49c48f..b2548cc 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java
@@ -38,6 +38,12 @@ public class GridH2IndexRangeRequest implements Message {
private long qryId;
/** */
+ private int originSegmentId;
+
+ /** */
+ private int segmentId;
+
+ /** */
private int batchLookupId;
/** */
@@ -87,6 +93,34 @@ public class GridH2IndexRangeRequest implements Message {
}
/**
+ * @param segmentId Index segment ID.
+ */
+ public void segment(int segmentId) {
+ this.segmentId = segmentId;
+ }
+
+ /**
+ * @return Index segment ID.
+ */
+ public int segment() {
+ return segmentId;
+ }
+
+ /**
+ * @return Origin index segment ID.
+ */
+ public int originSegmentId() {
+ return originSegmentId;
+ }
+
+ /**
+ * @param segmentId Origin index segment ID.
+ */
+ public void originSegmentId(int segmentId) {
+ this.originSegmentId = segmentId;
+ }
+
+ /**
* @param batchLookupId Batch lookup ID.
*/
public void batchLookupId(int batchLookupId) {
@@ -136,6 +170,15 @@ public class GridH2IndexRangeRequest implements Message {
writer.incrementState();
+ case 4:
+ if (!writer.writeInt("segmentId", segmentId))
+ return false;
+
+ case 5:
+ if (!writer.writeInt("originSegId", originSegmentId))
+ return false;
+
+ writer.incrementState();
}
return true;
@@ -181,6 +224,21 @@ public class GridH2IndexRangeRequest implements Message {
reader.incrementState();
+ case 4:
+ segmentId = reader.readInt("segmentId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ originSegmentId = reader.readInt("originSegId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
}
return reader.afterMessageRead(GridH2IndexRangeRequest.class);
@@ -193,7 +251,7 @@ public class GridH2IndexRangeRequest implements Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 4;
+ return 6;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java
index c6414bd..4d3db12 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java
@@ -47,6 +47,12 @@ public class GridH2IndexRangeResponse implements Message {
private long qryId;
/** */
+ private int segmentId;
+
+ /** */
+ private int originSegmentId;
+
+ /** */
private int batchLookupId;
/** */
@@ -130,6 +136,34 @@ public class GridH2IndexRangeResponse implements Message {
}
/**
+ * @param segmentId Index segment ID.
+ */
+ public void segment(int segmentId) {
+ this.segmentId = segmentId;
+ }
+
+ /**
+ * @return Index segment ID.
+ */
+ public int segment() {
+ return segmentId;
+ }
+
+ /**
+ * @return Origin index segment ID.
+ */
+ public int originSegmentId() {
+ return originSegmentId;
+ }
+
+ /**
+ * @param segmentId Origin index segment ID.
+ */
+ public void originSegmentId(int segmentId) {
+ this.originSegmentId = segmentId;
+ }
+
+ /**
* @param batchLookupId Batch lookup ID.
*/
public void batchLookupId(int batchLookupId) {
@@ -191,6 +225,17 @@ public class GridH2IndexRangeResponse implements Message {
writer.incrementState();
+ case 6:
+ if (!writer.writeInt("originSegId", originSegmentId))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
+ if (!writer.writeInt("segmentId", segmentId))
+ return false;
+
+ writer.incrementState();
}
return true;
@@ -252,6 +297,21 @@ public class GridH2IndexRangeResponse implements Message {
reader.incrementState();
+ case 6:
+ originSegmentId = reader.readInt("originSegId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
+ segmentId = reader.readInt("segmentId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
}
return reader.afterMessageRead(GridH2IndexRangeResponse.class);
@@ -264,7 +324,7 @@ public class GridH2IndexRangeResponse implements Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 6;
+ return 8;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index 884173f..ec49aff 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -50,6 +50,11 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
*/
public static int FLAG_DISTRIBUTED_JOINS = 1;
+ /**
+ * Restrict distributed joins range-requests to local index segments. Range requests to other nodes will not be sent.
+ */
+ public static int FLAG_IS_LOCAL = 2;
+
/** */
private long reqId;
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
new file mode 100644
index 0000000..f8c9dd5
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import javax.cache.Cache;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheKeyConfiguration;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests for correct distributed queries with index consisted of many segments.
+ */
+public class IgniteSqlSegmentedIndexSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static int QRY_PARALLELISM_LVL = 97;
+
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ CacheKeyConfiguration keyCfg = new CacheKeyConfiguration("MyCache", "affKey");
+
+ cfg.setCacheKeyConfiguration(keyCfg);
+
+ cfg.setPeerClassLoadingEnabled(false);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(ipFinder);
+
+ cfg.setDiscoverySpi(disco);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids(true);
+ }
+
+ /**
+ * @param name Cache name.
+ * @param partitioned Partition or replicated cache.
+ * @param idxTypes Indexed types.
+ * @return Cache configuration.
+ */
+ private static <K, V> CacheConfiguration<K, V> cacheConfig(String name, boolean partitioned, Class<?>... idxTypes) {
+ return new CacheConfiguration<K, V>()
+ .setName(name)
+ .setCacheMode(partitioned ? CacheMode.PARTITIONED : CacheMode.REPLICATED)
+ .setQueryParallelism(partitioned ? QRY_PARALLELISM_LVL : 1)
+ .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+ .setIndexedTypes(idxTypes);
+ }
+
+ /**
+ * Run tests on single-node grid
+ * @throws Exception If failed.
+ */
+ public void testSingleNodeIndexSegmentation() throws Exception {
+ startGridsMultiThreaded(1, true);
+
+ ignite(0).createCache(cacheConfig("pers", true, Integer.class, Person.class));
+ ignite(0).createCache(cacheConfig("org", true, Integer.class, Organization.class));
+
+ fillCache();
+
+ checkDistributedQueryWithSegmentedIndex();
+
+ checkLocalQueryWithSegmentedIndex();
+ }
+
+ /**
+ * Run tests on multi-node grid
+ * @throws Exception If failed.
+ */
+ public void testMultiNodeIndexSegmentation() throws Exception {
+ startGridsMultiThreaded(4, true);
+
+ ignite(0).createCache(cacheConfig("pers", true, Integer.class, Person.class));
+ ignite(0).createCache(cacheConfig("org", true, Integer.class, Organization.class));
+
+ fillCache();
+
+ checkDistributedQueryWithSegmentedIndex();
+
+ checkLocalQueryWithSegmentedIndex();
+ }
+
+ /**
+ * Run tests on multi-node grid
+ * @throws Exception If failed.
+ */
+ public void testMultiNodeSegmentedPartitionedWithReplicated() throws Exception {
+ startGridsMultiThreaded(4, true);
+
+ ignite(0).createCache(cacheConfig("pers", true, Integer.class, Person.class));
+ ignite(0).createCache(cacheConfig("org", false, Integer.class, Organization.class));
+
+ fillCache();
+
+ checkDistributedQueryWithSegmentedIndex();
+
+ checkLocalQueryWithSegmentedIndex();
+ }
+
+ /**
+ * Check distributed joins.
+ * @throws Exception If failed.
+ */
+ public void checkDistributedQueryWithSegmentedIndex() throws Exception {
+ IgniteCache<Integer, Person> c1 = ignite(0).cache("pers");
+
+ int expectedPersons = 0;
+
+ for (Cache.Entry<Integer, Person> e : c1) {
+ final Integer orgId = e.getValue().orgId;
+
+ if (10 <= orgId && orgId < 500)
+ expectedPersons++;
+ }
+
+ String select0 = "select o.name n1, p.name n2 from \"pers\".Person p, \"org\".Organization o where p.orgId = o._key";
+
+ List<List<?>> result = c1.query(new SqlFieldsQuery(select0).setDistributedJoins(true)).getAll();
+
+ assertEquals(expectedPersons, result.size());
+ }
+
+ /**
+ * Test local query.
+ * @throws Exception If failed.
+ */
+ public void checkLocalQueryWithSegmentedIndex() throws Exception {
+ IgniteCache<Integer, Person> c1 = ignite(0).cache("pers");
+ IgniteCache<Integer, Organization> c2 = ignite(0).cache("org");
+
+ Set<Integer> localOrgIds = new HashSet<>();
+
+ for(Cache.Entry<Integer, Organization> e : c2.localEntries())
+ localOrgIds.add(e.getKey());
+
+ int expectedPersons = 0;
+
+ for (Cache.Entry<Integer, Person> e : c1.localEntries()) {
+ final Integer orgId = e.getValue().orgId;
+
+ if (localOrgIds.contains(orgId))
+ expectedPersons++;
+ }
+
+ String select0 = "select o.name n1, p.name n2 from \"pers\".Person p, \"org\".Organization o where p.orgId = o._key";
+
+ List<List<?>> result = c1.query(new SqlFieldsQuery(select0).setLocal(true)).getAll();
+
+ assertEquals(expectedPersons, result.size());
+ }
+
+ /** */
+ private void fillCache() {
+ IgniteCache<Object, Object> c1 = ignite(0).cache("pers");
+
+ IgniteCache<Object, Object> c2 = ignite(0).cache("org");
+
+ final int orgCount = 500;
+
+ for (int i = 0; i < orgCount; i++)
+ c2.put(i, new Organization("org-" + i));
+
+ final Random random = new Random();
+
+ for (int i = 0; i < 1000; i++) {
+ int orgID = 10 + random.nextInt(orgCount + 10);
+
+ c1.put(i, new Person(orgID, "pers-" + i));
+ }
+ }
+
+ /**
+ *
+ */
+ private static class Person implements Serializable {
+ /** */
+ @QuerySqlField(index = true)
+ Integer orgId;
+
+ /** */
+ @QuerySqlField
+ String name;
+
+ /**
+ *
+ */
+ public Person() {
+ // No-op.
+ }
+
+ /**
+ * @param orgId Organization ID.
+ * @param name Name.
+ */
+ public Person(int orgId, String name) {
+ this.orgId = orgId;
+ this.name = name;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class Organization implements Serializable {
+ /** */
+ @QuerySqlField
+ String name;
+
+ /**
+ *
+ */
+ public Organization() {
+ // No-op.
+ }
+
+ /**
+ * @param name Organization name.
+ */
+ public Organization(String name) {
+ this.name = name;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index 06afe7c..4ae2f91 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -33,9 +33,10 @@ import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheKeyConfiguration;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
-import org.apache.ignite.cache.affinity.AffinityKeyMapped;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cluster.ClusterNode;
@@ -701,6 +702,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
ignite(0).destroyCache(cache.getName());
}
}
+
/**
* @throws Exception If failed.
*/
@@ -750,6 +752,127 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testIndexSegmentation() throws Exception {
+ CacheConfiguration ccfg1 = cacheConfig("pers", true,
+ Integer.class, Person2.class).setQueryParallelism(4);
+ CacheConfiguration ccfg2 = cacheConfig("org", true,
+ Integer.class, Organization.class).setQueryParallelism(4);
+
+ IgniteCache<Object, Object> c1 = ignite(0).getOrCreateCache(ccfg1);
+ IgniteCache<Object, Object> c2 = ignite(0).getOrCreateCache(ccfg2);
+
+ try {
+ c2.put(1, new Organization("o1"));
+ c2.put(2, new Organization("o2"));
+ c1.put(3, new Person2(1, "p1"));
+ c1.put(4, new Person2(2, "p2"));
+ c1.put(5, new Person2(3, "p3"));
+
+ String select0 = "select o.name n1, p.name n2 from \"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key and o._key=1";
+
+ checkQueryPlan(c1, true, 1, new SqlFieldsQuery(select0));
+
+ checkQueryPlan(c1, true, 1, new SqlFieldsQuery(select0).setLocal(true));
+ }
+ finally {
+ c1.destroy();
+ c2.destroy();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicationCacheIndexSegmentationFailure() throws Exception {
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ CacheConfiguration ccfg = cacheConfig("org", false,
+ Integer.class, Organization.class).setQueryParallelism(4);
+
+ IgniteCache<Object, Object> c = ignite(0).createCache(ccfg);
+
+ return null;
+ }
+ }, CacheException.class, "Cache index segmentation is supported for PARTITIONED mode only.");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testIndexSegmentationPartitionedReplicated() throws Exception {
+ CacheConfiguration ccfg1 = cacheConfig("pers", true,
+ Integer.class, Person2.class).setQueryParallelism(4);
+ CacheConfiguration ccfg2 = cacheConfig("org", false,
+ Integer.class, Organization.class);
+
+ final IgniteCache<Object, Object> c1 = ignite(0).getOrCreateCache(ccfg1);
+ final IgniteCache<Object, Object> c2 = ignite(0).getOrCreateCache(ccfg2);
+
+ try {
+ c2.put(1, new Organization("o1"));
+ c2.put(2, new Organization("o2"));
+ c1.put(3, new Person2(1, "p1"));
+ c1.put(4, new Person2(2, "p2"));
+ c1.put(5, new Person2(3, "p3"));
+
+ String select0 = "select o.name n1, p.name n2 from \"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key";
+
+ final SqlFieldsQuery qry = new SqlFieldsQuery(select0);
+
+ qry.setDistributedJoins(true);
+
+ List<List<?>> results = c1.query(qry).getAll();
+
+ assertEquals(2, results.size());
+ }
+ finally {
+ c1.destroy();
+ c2.destroy();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testIndexWithDifferentSegmentationLevelsFailure() throws Exception {
+ CacheConfiguration ccfg1 = cacheConfig("pers", true,
+ Integer.class, Person2.class).setQueryParallelism(4);
+ CacheConfiguration ccfg2 = cacheConfig("org", true,
+ Integer.class, Organization.class).setQueryParallelism(3);
+
+ final IgniteCache<Object, Object> c1 = ignite(0).getOrCreateCache(ccfg1);
+ final IgniteCache<Object, Object> c2 = ignite(0).getOrCreateCache(ccfg2);
+
+ try {
+ c2.put(1, new Organization("o1"));
+ c2.put(2, new Organization("o2"));
+ c1.put(3, new Person2(1, "p1"));
+ c1.put(4, new Person2(2, "p2"));
+ c1.put(5, new Person2(3, "p3"));
+
+ String select0 = "select o.name n1, p.name n2 from \"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key and o._key=1";
+
+ final SqlFieldsQuery qry = new SqlFieldsQuery(select0);
+
+ qry.setDistributedJoins(true);
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ c1.query(qry);
+
+ return null;
+ }
+ }, CacheException.class, "Using indexes with different parallelism levels in same query is forbidden.");
+ }
+ finally {
+ c1.destroy();
+ c2.destroy();
+ }
+ }
+
+ /**
* @param cache Cache.
* @param sql SQL.
* @param enforceJoinOrder Enforce join order flag.
@@ -789,26 +912,26 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
false,
0,
select +
- "from " + cache1 + "," + cache2 + " "+ where);
+ "from " + cache1 + "," + cache2 + " " + where);
checkQueryPlan(cache,
false,
0,
select +
- "from " + cache2 + "," + cache1 + " "+ where);
+ "from " + cache2 + "," + cache1 + " " + where);
if (testEnforceJoinOrder) {
checkQueryPlan(cache,
true,
0,
select +
- "from " + cache1 + "," + cache2 + " "+ where);
+ "from " + cache1 + "," + cache2 + " " + where);
checkQueryPlan(cache,
true,
0,
select +
- "from " + cache2 + "," + cache1 + " "+ where);
+ "from " + cache2 + "," + cache1 + " " + where);
}
}
@@ -823,7 +946,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
boolean enforceJoinOrder,
int expBatchedJoins,
String sql,
- String...expText) {
+ String... expText) {
checkQueryPlan(cache,
enforceJoinOrder,
expBatchedJoins,
@@ -848,7 +971,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
boolean enforceJoinOrder,
int expBatchedJoins,
SqlFieldsQuery qry,
- String...expText) {
+ String... expText) {
qry.setEnforceJoinOrder(enforceJoinOrder);
qry.setDistributedJoins(true);
@@ -984,7 +1107,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
* @param args Arguments.
* @return Column as list.
*/
- private static <X> List<X> columnQuery(IgniteCache<?,?> c, String qry, Object... args) {
+ private static <X> List<X> columnQuery(IgniteCache<?, ?> c, String qry, Object... args) {
return column(0, c.query(new SqlFieldsQuery(qry).setArgs(args)).getAll());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
index 52084c7..d6a5fb1 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
@@ -230,16 +230,16 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
assertEquals(0, spi.size(typeAB.space(), typeAB));
assertEquals(0, spi.size(typeBA.space(), typeBA));
- assertFalse(spi.queryLocalSql(typeAA.space(), "select * from A.A", null, Collections.emptySet(), typeAA, null).hasNext());
- assertFalse(spi.queryLocalSql(typeAB.space(), "select * from A.B", null, Collections.emptySet(), typeAB, null).hasNext());
- assertFalse(spi.queryLocalSql(typeBA.space(), "select * from B.A", null, Collections.emptySet(), typeBA, null).hasNext());
+ assertFalse(spi.queryLocalSql(typeAA.space(), "select * from A.A", null, Collections.emptySet(), typeAA.name(), null, null).hasNext());
+ assertFalse(spi.queryLocalSql(typeAB.space(), "select * from A.B", null, Collections.emptySet(), typeAB.name(), null, null).hasNext());
+ assertFalse(spi.queryLocalSql(typeBA.space(), "select * from B.A", null, Collections.emptySet(), typeBA.name(), null, null).hasNext());
assertFalse(spi.queryLocalSql(typeBA.space(), "select * from B.A, A.B, A.A", null,
- Collections.emptySet(), typeBA, null).hasNext());
+ Collections.emptySet(), typeBA.name(), null, null).hasNext());
try {
spi.queryLocalSql(typeBA.space(), "select aa.*, ab.*, ba.* from A.A aa, A.B ab, B.A ba", null,
- Collections.emptySet(), typeBA, null).hasNext();
+ Collections.emptySet(), typeBA.name(), null, null).hasNext();
fail("Enumerations of aliases in select block must be prohibited");
}
@@ -248,10 +248,10 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
}
assertFalse(spi.queryLocalSql(typeAB.space(), "select ab.* from A.B ab", null,
- Collections.emptySet(), typeAB, null).hasNext());
+ Collections.emptySet(), typeAB.name(), null, null).hasNext());
assertFalse(spi.queryLocalSql(typeBA.space(), "select ba.* from B.A as ba", null,
- Collections.emptySet(), typeBA, null).hasNext());
+ Collections.emptySet(), typeBA.name(), null, null).hasNext());
// Nothing to remove.
spi.remove("A", key(1), aa(1, "", 10));
@@ -305,7 +305,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
// Query data.
Iterator<IgniteBiTuple<Integer, Map<String, Object>>> res =
- spi.queryLocalSql(typeAA.space(), "from a order by age", null, Collections.emptySet(), typeAA, null);
+ spi.queryLocalSql(typeAA.space(), "from a order by age", null, Collections.emptySet(), typeAA.name(), null, null);
assertTrue(res.hasNext());
assertEquals(aa(3, "Borya", 18).value(null, false), value(res.next()));
@@ -314,7 +314,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
assertFalse(res.hasNext());
res = spi.queryLocalSql(typeAA.space(), "select aa.* from a aa order by aa.age", null,
- Collections.emptySet(), typeAA, null);
+ Collections.emptySet(), typeAA.name(), null, null);
assertTrue(res.hasNext());
assertEquals(aa(3, "Borya", 18).value(null, false), value(res.next()));
@@ -322,7 +322,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
assertEquals(aa(2, "Valera", 19).value(null, false), value(res.next()));
assertFalse(res.hasNext());
- res = spi.queryLocalSql(typeAB.space(), "from b order by name", null, Collections.emptySet(), typeAB, null);
+ res = spi.queryLocalSql(typeAB.space(), "from b order by name", null, Collections.emptySet(), typeAB.name(), null, null);
assertTrue(res.hasNext());
assertEquals(ab(1, "Vasya", 20, "Some text about Vasya goes here.").value(null, false), value(res.next()));
@@ -331,7 +331,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
assertFalse(res.hasNext());
res = spi.queryLocalSql(typeAB.space(), "select bb.* from b as bb order by bb.name", null,
- Collections.emptySet(), typeAB, null);
+ Collections.emptySet(), typeAB.name(), null, null);
assertTrue(res.hasNext());
assertEquals(ab(1, "Vasya", 20, "Some text about Vasya goes here.").value(null, false), value(res.next()));
@@ -340,7 +340,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
assertFalse(res.hasNext());
- res = spi.queryLocalSql(typeBA.space(), "from a", null, Collections.emptySet(), typeBA, null);
+ res = spi.queryLocalSql(typeBA.space(), "from a", null, Collections.emptySet(), typeBA.name(), null, null);
assertTrue(res.hasNext());
assertEquals(ba(2, "Kolya", 25, true).value(null, false), value(res.next()));
@@ -725,4 +725,4 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
throw new UnsupportedOperationException();
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/test/java/org/apache/ignite/loadtests/h2indexing/FetchingQueryCursorStressTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/loadtests/h2indexing/FetchingQueryCursorStressTest.java b/modules/indexing/src/test/java/org/apache/ignite/loadtests/h2indexing/FetchingQueryCursorStressTest.java
new file mode 100644
index 0000000..d1d80f2
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/loadtests/h2indexing/FetchingQueryCursorStressTest.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.loadtests.h2indexing;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
+
+/**
+ * SQL query stress test.
+ */
+public class FetchingQueryCursorStressTest {
+ /** Node count. */
+ private static final int NODE_CNT = 4; // Switch to 4 to see better throughput.
+
+ /** Number of entries. */
+ private static final int ENTRIES_CNT = 10_000;
+
+ /** Cache name. */
+ private static final String CACHE_NAME = "cache";
+
+ /** Thread count. */
+ private static final int THREAD_CNT = 16;
+
+ /** Execution counter. */
+ private static final AtomicLong CNT = new AtomicLong();
+
+ /** Verbose mode. */
+ private static final boolean VERBOSE = false;
+
+ /** */
+ private static final long TIMEOUT = TimeUnit.SECONDS.toMillis(30);
+
+ public static final AtomicReference<Exception> error = new AtomicReference<>();
+
+ /**
+ * Entry point.
+ */
+ public static void main(String[] args) throws Exception {
+ List<Thread> threads = new ArrayList<>(THREAD_CNT + 1);
+
+ try (Ignite ignite = start()) {
+
+ IgniteCache<Integer, Person> cache = ignite.cache(CACHE_NAME);
+
+ loadData(ignite, cache);
+
+ System.out.println("Loaded data: " + cache.size());
+
+ for (int i = 0; i < THREAD_CNT; i++)
+ threads.add(startDaemon("qry-exec-" + i, new QueryExecutor(cache, "Select * from Person")));
+
+ threads.add(startDaemon("printer", new ThroughputPrinter()));
+
+ Thread.sleep(TIMEOUT);
+
+ for (Thread t : threads)
+ t.join();
+
+ if(error.get()!=null)
+ throw error.get();
+ }
+ finally {
+ Ignition.stopAll(false);
+ }
+ }
+
+ /**
+ * Start daemon thread.
+ *
+ * @param name Name.
+ * @param r Runnable.
+ */
+ private static Thread startDaemon(String name, Runnable r) {
+ Thread t = new Thread(r);
+
+ t.setName(name);
+ t.setDaemon(true);
+
+ t.start();
+
+ return t;
+ }
+
+ /**
+ * Load data into Ignite.
+ *
+ * @param ignite Ignite.
+ * @param cache Cache.
+ */
+ private static void loadData(Ignite ignite, IgniteCache<Integer, Person> cache) throws Exception {
+ try (IgniteDataStreamer<Object, Object> str = ignite.dataStreamer(cache.getName())) {
+
+ for (int id = 0; id < ENTRIES_CNT; id++)
+ str.addData(id, new Person(id, "John" + id, "Doe"));
+ }
+ }
+
+ /**
+ * Start topology.
+ *
+ * @return Client node.
+ */
+ private static Ignite start() {
+ int i = 0;
+
+ for (; i < NODE_CNT; i++)
+ Ignition.start(config(i, false));
+
+ return Ignition.start(config(i, true));
+ }
+
+ /**
+ * Create configuration.
+ *
+ * @param idx Index.
+ * @param client Client flag.
+ * @return Configuration.
+ */
+ @SuppressWarnings("unchecked")
+ private static IgniteConfiguration config(int idx, boolean client) {
+ IgniteConfiguration cfg = new IgniteConfiguration();
+
+ cfg.setGridName("grid-" + idx);
+ cfg.setClientMode(client);
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setName(CACHE_NAME);
+ ccfg.setIndexedTypes(Integer.class, Person.class);
+ cfg.setMarshaller(new OptimizedMarshaller());
+
+ cfg.setCacheConfiguration(ccfg);
+
+ cfg.setLocalHost("127.0.0.1");
+
+ return cfg;
+ }
+
+ /**
+ *
+ */
+ private static class Person implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ @QuerySqlField
+ private int id;
+
+ /** */
+ @QuerySqlField
+ private String firstName;
+
+ /** */
+ @QuerySqlField
+ private String lastName;
+
+ public Person(int id, String firstName, String lastName) {
+ this.id = id;
+ this.firstName = firstName;
+ this.lastName = lastName;
+ }
+ }
+
+ /**
+ * Query runner.
+ */
+ private static class QueryExecutor implements Runnable {
+ /** Cache. */
+ private final IgniteCache<Integer, Person> cache;
+
+ /** */
+ private final String query;
+
+ /**
+ * Constructor.
+ *
+ * @param cache Cache.
+ */
+ public QueryExecutor(IgniteCache<Integer, Person> cache, String query) {
+ this.cache = cache;
+ this.query = query;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("InfiniteLoopStatement")
+ @Override public void run() {
+ System.out.println("Executor started: " + Thread.currentThread().getName());
+
+ try {
+ while (error.get()==null && !Thread.currentThread().isInterrupted()) {
+ long start = System.nanoTime();
+
+ SqlFieldsQuery qry = new SqlFieldsQuery(query);
+
+// qry.setArgs((Object[]) argumentForQuery());
+
+ Set<Integer> extIds = new HashSet<>();
+
+ for (List<?> next : cache.query(qry))
+ extIds.add((Integer)next.get(0));
+
+ long dur = (System.nanoTime() - start) / 1_000_000;
+
+ CNT.incrementAndGet();
+
+ if (VERBOSE)
+ System.out.println("[extIds=" + extIds.size() + ", dur=" + dur + ']');
+ }
+ }
+ catch (CacheException ex){
+ error.compareAndSet(null, ex);
+ }
+ }
+ }
+
+ /**
+ * Throughput printer.
+ */
+ private static class ThroughputPrinter implements Runnable {
+ /** {@inheritDoc} */
+ @SuppressWarnings("InfiniteLoopStatement")
+ @Override public void run() {
+ while (error.get()==null) {
+ long before = CNT.get();
+ long beforeTime = System.currentTimeMillis();
+
+ try {
+ Thread.sleep(2000L);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ return;
+ }
+
+ long after = CNT.get();
+ long afterTime = System.currentTimeMillis();
+
+ double res = 1000 * ((double)(after - before)) / (afterTime - beforeTime);
+
+ System.out.println((long)res + " ops/sec");
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 386b8fd..b417b0a 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -98,6 +98,7 @@ import org.apache.ignite.internal.processors.cache.query.IndexingSpiQuerySelfTes
import org.apache.ignite.internal.processors.cache.query.IndexingSpiQueryTxSelfTest;
import org.apache.ignite.internal.processors.query.IgniteSqlEntryCacheModeAgnosticTest;
import org.apache.ignite.internal.processors.query.IgniteSqlSchemaIndexingTest;
+import org.apache.ignite.internal.processors.query.IgniteSqlSegmentedIndexSelfTest;
import org.apache.ignite.internal.processors.query.IgniteSqlSplitterSelfTest;
import org.apache.ignite.internal.processors.query.h2.GridH2IndexRebuildTest;
import org.apache.ignite.internal.processors.query.h2.GridH2IndexingInMemSelfTest;
@@ -134,6 +135,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
// Queries tests.
suite.addTestSuite(IgniteSqlSplitterSelfTest.class);
+ suite.addTestSuite(IgniteSqlSegmentedIndexSelfTest.class);
suite.addTestSuite(IgniteSqlSchemaIndexingTest.class);
suite.addTestSuite(GridCacheQueryIndexDisabledSelfTest.class);
suite.addTestSuite(IgniteCacheQueryLoadSelfTest.class);