You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2017/02/22 10:18:06 UTC
[01/12] ignite git commit: Implemented.
Repository: ignite
Updated Branches:
refs/heads/ignite-1.9 963a9d9fe -> 658b4ad5a
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index 914e0da..0829df0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -17,12 +17,15 @@
package org.apache.ignite.internal.processors.query.h2.opt;
+import java.lang.reflect.Field;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.util.GridEmptyIterator;
import org.apache.ignite.internal.util.offheap.unsafe.GridOffHeapSnapTreeMap;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeGuard;
@@ -43,18 +46,36 @@ import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
/**
- * Base class for snapshotable tree indexes.
+ * Base class for snapshotable segmented tree indexes.
*/
@SuppressWarnings("ComparatorNotSerializable")
public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridSearchRowPointer> {
+
/** */
- private final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree;
+ private static Field KEY_FIELD;
+
+ /** */
+ static {
+ try {
+ KEY_FIELD = GridH2AbstractKeyValueRow.class.getDeclaredField("key");
+ KEY_FIELD.setAccessible(true);
+ }
+ catch (NoSuchFieldException e) {
+ KEY_FIELD = null;
+ }
+ }
+
+ /** Index segments. */
+ private final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row>[] segments;
/** */
private final boolean snapshotEnabled;
+ /** */
+ private final GridH2RowDescriptor desc;
+
/**
- * Constructor with index initialization.
+ * Constructor with index initialization. Creates index with single segment.
*
* @param name Index name.
* @param tbl Table.
@@ -63,35 +84,59 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
*/
@SuppressWarnings("unchecked")
public GridH2TreeIndex(String name, GridH2Table tbl, boolean pk, List<IndexColumn> colsList) {
+ this(name, tbl, pk, colsList, 1);
+ }
+
+ /**
+ * Constructor with index initialization.
+ *
+ * @param name Index name.
+ * @param tbl Table.
+ * @param pk If this index is primary key.
+ * @param colsList Index columns list.
+ * @param segmentsCnt Number of segments.
+ */
+ @SuppressWarnings("unchecked")
+ public GridH2TreeIndex(String name, GridH2Table tbl, boolean pk, List<IndexColumn> colsList, int segmentsCnt) {
+ assert segmentsCnt > 0 : segmentsCnt;
+
IndexColumn[] cols = colsList.toArray(new IndexColumn[colsList.size()]);
IndexColumn.mapColumns(cols, tbl);
+ desc = tbl.rowDescriptor();
+
initBaseIndex(tbl, 0, name, cols,
pk ? IndexType.createPrimaryKey(false, false) : IndexType.createNonUnique(false, false, false));
+ segments = new ConcurrentNavigableMap[segmentsCnt];
+
final GridH2RowDescriptor desc = tbl.rowDescriptor();
if (desc == null || desc.memory() == null) {
snapshotEnabled = desc == null || desc.snapshotableIndex();
if (snapshotEnabled) {
- tree = new SnapTreeMap<GridSearchRowPointer, GridH2Row>(this) {
- @Override protected void afterNodeUpdate_nl(Node<GridSearchRowPointer, GridH2Row> node, Object val) {
- if (val != null)
- node.key = (GridSearchRowPointer)val;
- }
+ for (int i = 0; i < segmentsCnt; i++) {
+ segments[i] = new SnapTreeMap<GridSearchRowPointer, GridH2Row>(this) {
+ @Override
+ protected void afterNodeUpdate_nl(Node<GridSearchRowPointer, GridH2Row> node, Object val) {
+ if (val != null)
+ node.key = (GridSearchRowPointer)val;
+ }
- @Override protected Comparable<? super GridSearchRowPointer> comparable(Object key) {
- if (key instanceof ComparableRow)
- return (Comparable<? super SearchRow>)key;
+ @Override protected Comparable<? super GridSearchRowPointer> comparable(Object key) {
+ if (key instanceof ComparableRow)
+ return (Comparable<? super SearchRow>)key;
- return super.comparable(key);
- }
- };
+ return super.comparable(key);
+ }
+ };
+ }
}
else {
- tree = new ConcurrentSkipListMap<>(
+ for (int i = 0; i < segmentsCnt; i++) {
+ segments[i] = new ConcurrentSkipListMap<>(
new Comparator<GridSearchRowPointer>() {
@Override public int compare(GridSearchRowPointer o1, GridSearchRowPointer o2) {
if (o1 instanceof ComparableRow)
@@ -103,7 +148,8 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
return compareRows(o1, o2);
}
}
- );
+ );
+ }
}
}
else {
@@ -111,28 +157,30 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
snapshotEnabled = true;
- tree = new GridOffHeapSnapTreeMap<GridSearchRowPointer, GridH2Row>(desc, desc, desc.memory(), desc.guard(), this) {
- @Override protected void afterNodeUpdate_nl(long node, GridH2Row val) {
- final long oldKey = keyPtr(node);
+ for (int i = 0; i < segmentsCnt; i++) {
+ segments[i] = new GridOffHeapSnapTreeMap<GridSearchRowPointer, GridH2Row>(desc, desc, desc.memory(), desc.guard(), this) {
+ @Override protected void afterNodeUpdate_nl(long node, GridH2Row val) {
+ final long oldKey = keyPtr(node);
- if (val != null) {
- key(node, val);
+ if (val != null) {
+ key(node, val);
- guard.finalizeLater(new Runnable() {
- @Override public void run() {
- desc.createPointer(oldKey).decrementRefCount();
- }
- });
+ guard.finalizeLater(new Runnable() {
+ @Override public void run() {
+ desc.createPointer(oldKey).decrementRefCount();
+ }
+ });
+ }
}
- }
- @Override protected Comparable<? super GridSearchRowPointer> comparable(Object key) {
- if (key instanceof ComparableRow)
- return (Comparable<? super SearchRow>)key;
+ @Override protected Comparable<? super GridSearchRowPointer> comparable(Object key) {
+ if (key instanceof ComparableRow)
+ return (Comparable<? super SearchRow>)key;
- return super.comparable(key);
- }
- };
+ return super.comparable(key);
+ }
+ };
+ }
}
initDistributedJoinMessaging(tbl);
@@ -142,20 +190,24 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
@Override protected Object doTakeSnapshot() {
assert snapshotEnabled;
+ int seg = threadLocalSegment();
+
+ ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree = segments[seg];
+
return tree instanceof SnapTreeMap ?
((SnapTreeMap)tree).clone() :
((GridOffHeapSnapTreeMap)tree).clone();
}
/** {@inheritDoc} */
- protected final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> treeForRead() {
+ protected ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> treeForRead(int seg) {
if (!snapshotEnabled)
- return tree;
+ return segments[seg];
ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> res = threadLocalSnapshot();
if (res == null)
- res = tree;
+ return segments[seg];
return res;
}
@@ -164,19 +216,37 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
@Override public void destroy() {
assert threadLocalSnapshot() == null;
- if (tree instanceof AutoCloseable)
- U.closeQuiet((AutoCloseable)tree);
+ for (int i = 0; i < segments.length; i++) {
+ if (segments[i] instanceof AutoCloseable)
+ U.closeQuiet((AutoCloseable)segments[i]);
+ }
super.destroy();
}
+
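+ /** @return Index segment ID for the current thread: {@code 0} for a single-segment index, otherwise the segment of the current query context. */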
+ protected int threadLocalSegment() {
+ GridH2QueryContext qctx = GridH2QueryContext.get();
+
+ if(segments.length == 1)
+ return 0;
+
+ if(qctx == null)
+ throw new IllegalStateException("GridH2QueryContext is not initialized.");
+
+ return qctx.segment();
+ }
+
/** {@inheritDoc} */
@Override public long getRowCount(@Nullable Session ses) {
IndexingQueryFilter f = threadLocalFilter();
+ int seg = threadLocalSegment();
+
// Fast path if we don't need to perform any filtering.
if (f == null || f.forSpace((getTable()).spaceName()) == null)
- return treeForRead().size();
+ return treeForRead(seg).size();
Iterator<GridH2Row> iter = doFind(null, false, null);
@@ -255,7 +325,9 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
* @return Row.
*/
public GridH2Row findOne(GridSearchRowPointer row) {
- return tree.get(row);
+ int seg = threadLocalSegment();
+
+ return segments[seg].get(row);
}
/**
@@ -268,7 +340,9 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
*/
@SuppressWarnings("unchecked")
private Iterator<GridH2Row> doFind(@Nullable SearchRow first, boolean includeFirst, @Nullable SearchRow last) {
- ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> t = treeForRead();
+ int seg = threadLocalSegment();
+
+ ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> t = treeForRead(seg);
return doFind0(t, first, includeFirst, last, threadLocalFilter());
}
@@ -359,12 +433,68 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
/** {@inheritDoc} */
@Override public GridH2Row put(GridH2Row row) {
- return tree.put(row, row);
+ int seg = segment(row);
+
+ return segments[seg].put(row, row);
}
/** {@inheritDoc} */
@Override public GridH2Row remove(SearchRow row) {
- return tree.remove(comparable(row, 0));
+ GridSearchRowPointer comparable = comparable(row, 0);
+
+ int seg = segment(row);
+
+ return segments[seg].remove(comparable);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int segmentsCount() {
+ return segments.length;
+ }
+
+ /**
+ * @param partition Partition index.
+ * @return Index segment ID for the given partition.
+ */
+ protected int segment(int partition) {
+ return partition % segments.length;
+ }
+
+ /**
+ * @param row Search row.
+ * @return Index segment ID for the given row.
+ */
+ private int segment(SearchRow row) {
+ assert row != null;
+
+ CacheObject key;
+
+ if (desc != null && desc.context() != null) {
+ GridCacheContext<?, ?> ctx = desc.context();
+
+ assert ctx != null;
+
+ if (row instanceof GridH2AbstractKeyValueRow && KEY_FIELD != null) {
+ try {
+ Object o = KEY_FIELD.get(row);
+
+ if (o instanceof CacheObject)
+ key = (CacheObject)o;
+ else
+ key = ctx.toCacheKeyObject(o);
+
+ }
+ catch (IllegalAccessException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ else
+ key = ctx.toCacheKeyObject(row.getValue(0));
+
+ return segment(ctx.affinity().partition(key));
+ }
+ else
+ return 0;
}
/**
@@ -463,18 +593,20 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
IndexColumn[] cols = getIndexColumns();
GridH2TreeIndex idx = new GridH2TreeIndex(getName(), getTable(),
- getIndexType().isUnique(), F.asList(cols));
+ getIndexType().isUnique(), F.asList(cols), segments.length);
Thread thread = Thread.currentThread();
- long i = 0;
+ long j = 0;
- for (GridH2Row row : tree.values()) {
- // Check for interruptions every 1000 iterations.
- if (++i % 1000 == 0 && thread.isInterrupted())
- throw new InterruptedException();
+ for (int i = 0; i < segments.length; i++) {
+ for (GridH2Row row : segments[i].values()) {
+ // Check for interruptions every 1000 iterations.
+ if ((++j & 1023) == 0 && thread.isInterrupted())
+ throw new InterruptedException();
- idx.tree.put(row, row);
+ idx.put(row);
+ }
}
return idx;
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index ac1a6a6..5027c9a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReferenceArray;
import javax.cache.CacheException;
@@ -56,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshalla
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
@@ -84,6 +86,8 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.distributedJoinMode;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED;
import static org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.QUERY_POOL;
@@ -161,8 +165,7 @@ public class GridMapQueryExecutor {
if (nodeRess == null)
return;
- for (QueryResults ress : nodeRess.results().values())
- ress.cancel(true);
+ nodeRess.cancelAll();
}
}, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
@@ -235,12 +238,7 @@ public class GridMapQueryExecutor {
GridH2QueryContext.clear(ctx.localNodeId(), node.id(), qryReqId, MAP);
}
- QueryResults results = nodeRess.results().remove(qryReqId);
-
- if (results == null)
- return;
-
- results.cancel(true);
+ nodeRess.cancelRequest(qryReqId);
}
/**
@@ -427,6 +425,7 @@ public class GridMapQueryExecutor {
onQueryRequest0(node,
req.requestId(),
+ 0,
req.queries(),
cacheIds,
req.topologyVersion(),
@@ -434,7 +433,7 @@ public class GridMapQueryExecutor {
req.partitions(),
null,
req.pageSize(),
- false,
+ OFF,
req.timeout());
}
@@ -442,12 +441,49 @@ public class GridMapQueryExecutor {
* @param node Node.
* @param req Query request.
*/
- private void onQueryRequest(ClusterNode node, GridH2QueryRequest req) {
- Map<UUID,int[]> partsMap = req.partitions();
- int[] parts = partsMap == null ? null : partsMap.get(ctx.localNodeId());
+ private void onQueryRequest(final ClusterNode node, final GridH2QueryRequest req) throws IgniteCheckedException {
+ final Map<UUID,int[]> partsMap = req.partitions();
+ final int[] parts = partsMap == null ? null : partsMap.get(ctx.localNodeId());
+
+ assert req.caches() != null && !req.caches().isEmpty();
+
+ GridCacheContext<?, ?> mainCctx = ctx.cache().context().cacheContext( req.caches().get(0));
+
+ if (mainCctx == null)
+ throw new CacheException("Failed to find cache.");
+
+ final DistributedJoinMode joinMode = distributedJoinMode(
+ req.isFlagSet(GridH2QueryRequest.FLAG_IS_LOCAL),
+ req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS));
+
+ for (int i = 1; i < mainCctx.config().getQueryParallelism(); i++) {
+ final int segment = i;
+
+ ctx.closure().callLocal(
+ new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ onQueryRequest0(node,
+ req.requestId(),
+ segment,
+ req.queries(),
+ req.caches(),
+ req.topologyVersion(),
+ partsMap,
+ parts,
+ req.tables(),
+ req.pageSize(),
+ joinMode,
+ req.timeout());
+
+ return null;
+ }
+ }
+ , QUERY_POOL);
+ }
onQueryRequest0(node,
req.requestId(),
+ 0,
req.queries(),
req.caches(),
req.topologyVersion(),
@@ -455,13 +491,14 @@ public class GridMapQueryExecutor {
parts,
req.tables(),
req.pageSize(),
- req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS),
+ joinMode,
req.timeout());
}
/**
* @param node Node authored request.
* @param reqId Request ID.
+ * @param segmentId Index segment ID.
* @param qrys Queries to execute.
* @param cacheIds Caches which will be affected by these queries.
* @param topVer Topology version.
@@ -469,11 +506,12 @@ public class GridMapQueryExecutor {
* @param parts Explicit partitions for current node.
* @param tbls Tables.
* @param pageSize Page size.
- * @param distributedJoins Can we expect distributed joins to be ran.
+ * @param distributedJoinMode Query distributed join mode.
*/
private void onQueryRequest0(
ClusterNode node,
long reqId,
+ int segmentId,
Collection<GridCacheSqlQuery> qrys,
List<Integer> cacheIds,
AffinityTopologyVersion topVer,
@@ -481,7 +519,7 @@ public class GridMapQueryExecutor {
int[] parts,
Collection<String> tbls,
int pageSize,
- boolean distributedJoins,
+ DistributedJoinMode distributedJoinMode,
int timeout
) {
// Prepare to run queries.
@@ -500,7 +538,7 @@ public class GridMapQueryExecutor {
if (topVer != null) {
// Reserve primary for topology version or explicit partitions.
if (!reservePartitions(cacheIds, topVer, parts, reserved)) {
- sendRetry(node, reqId);
+ sendRetry(node, reqId, segmentId);
return;
}
@@ -508,17 +546,18 @@ public class GridMapQueryExecutor {
qr = new QueryResults(reqId, qrys.size(), mainCctx);
- if (nodeRess.results().put(reqId, qr) != null)
+ if (nodeRess.put(reqId, segmentId, qr) != null)
throw new IllegalStateException();
// Prepare query context.
GridH2QueryContext qctx = new GridH2QueryContext(ctx.localNodeId(),
node.id(),
reqId,
+ segmentId,
mainCctx.isReplicated() ? REPLICATED : MAP)
.filter(h2.backupFilter(topVer, parts))
.partitionsMap(partsMap)
- .distributedJoins(distributedJoins)
+ .distributedJoinMode(distributedJoinMode)
.pageSize(pageSize)
.topologyVersion(topVer)
.reservations(reserved);
@@ -542,7 +581,7 @@ public class GridMapQueryExecutor {
Connection conn = h2.connectionForSpace(mainCctx.name());
// Here we enforce join order to have the same behavior on all the nodes.
- h2.setupConnection(conn, distributedJoins, true);
+ h2.setupConnection(conn, distributedJoinMode != OFF, true);
GridH2QueryContext.set(qctx);
@@ -553,13 +592,13 @@ public class GridMapQueryExecutor {
if (nodeRess.cancelled(reqId)) {
GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, qctx.type());
- nodeRess.results().remove(reqId);
+ nodeRess.cancelRequest(reqId);
throw new QueryCancelledException();
}
// Run queries.
- int i = 0;
+ int qryIdx = 0;
boolean evt = ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED);
@@ -567,7 +606,7 @@ public class GridMapQueryExecutor {
ResultSet rs = h2.executeSqlQueryWithTimer(mainCctx.name(), conn, qry.query(),
F.asList(qry.parameters()), true,
timeout,
- qr.cancels[i]);
+ qr.cancels[qryIdx]);
if (evt) {
ctx.event().record(new CacheQueryExecutedEvent<>(
@@ -587,24 +626,24 @@ public class GridMapQueryExecutor {
assert rs instanceof JdbcResultSet : rs.getClass();
- qr.addResult(i, qry, node.id(), rs);
+ qr.addResult(qryIdx, qry, node.id(), rs);
if (qr.canceled) {
- qr.result(i).close();
+ qr.result(qryIdx).close();
throw new QueryCancelledException();
}
// Send the first page.
- sendNextPage(nodeRess, node, qr, i, pageSize);
+ sendNextPage(nodeRess, node, qr, qryIdx, segmentId, pageSize);
- i++;
+ qryIdx++;
}
}
finally {
GridH2QueryContext.clearThreadLocal();
- if (!distributedJoins)
+ if (distributedJoinMode == OFF)
qctx.clearContext(false);
if (!F.isEmpty(snapshotedTbls)) {
@@ -615,13 +654,13 @@ public class GridMapQueryExecutor {
}
catch (Throwable e) {
if (qr != null) {
- nodeRess.results().remove(reqId, qr);
+ nodeRess.remove(reqId, segmentId, qr);
qr.cancel(false);
}
if (X.hasCause(e, GridH2RetryException.class))
- sendRetry(node, reqId);
+ sendRetry(node, reqId, segmentId);
else {
U.error(log, "Failed to execute local query.", e);
@@ -681,14 +720,14 @@ public class GridMapQueryExecutor {
return;
}
- QueryResults qr = nodeRess.results().get(req.queryRequestId());
+ QueryResults qr = nodeRess.get(req.queryRequestId(), req.segmentId());
if (qr == null)
sendError(node, req.queryRequestId(), new CacheException("No query result found for request: " + req));
else if (qr.canceled)
sendError(node, req.queryRequestId(), new QueryCancelledException());
else
- sendNextPage(nodeRess, node, qr, req.query(), req.pageSize());
+ sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize());
}
/**
@@ -696,9 +735,10 @@ public class GridMapQueryExecutor {
* @param node Node.
* @param qr Query results.
* @param qry Query.
+ * @param segmentId Index segment ID.
* @param pageSize Page size.
*/
- private void sendNextPage(NodeResults nodeRess, ClusterNode node, QueryResults qr, int qry,
+ private void sendNextPage(NodeResults nodeRess, ClusterNode node, QueryResults qr, int qry, int segmentId,
int pageSize) {
QueryResult res = qr.result(qry);
@@ -714,14 +754,14 @@ public class GridMapQueryExecutor {
res.close();
if (qr.isAllClosed())
- nodeRess.results().remove(qr.qryReqId, qr);
+ nodeRess.remove(qr.qryReqId, segmentId, qr);
}
try {
boolean loc = node.isLocal();
- GridQueryNextPageResponse msg = new GridQueryNextPageResponse(qr.qryReqId, qry, page,
- page == 0 ? res.rowCnt : -1 ,
+ GridQueryNextPageResponse msg = new GridQueryNextPageResponse(qr.qryReqId, segmentId, qry, page,
+ page == 0 ? res.rowCnt : -1,
res.cols,
loc ? null : toMessages(rows, new ArrayList<Message>(res.cols)),
loc ? rows : null);
@@ -741,12 +781,13 @@ public class GridMapQueryExecutor {
/**
* @param node Node.
* @param reqId Request ID.
+ * @param segmentId Index segment ID.
*/
- private void sendRetry(ClusterNode node, long reqId) {
+ private void sendRetry(ClusterNode node, long reqId, int segmentId) {
try {
boolean loc = node.isLocal();
- GridQueryNextPageResponse msg = new GridQueryNextPageResponse(reqId,
+ GridQueryNextPageResponse msg = new GridQueryNextPageResponse(reqId, segmentId,
/*qry*/0, /*page*/0, /*allRows*/0, /*cols*/1,
loc ? null : Collections.<Message>emptyList(),
loc ? Collections.<Value[]>emptyList() : null);
@@ -780,35 +821,118 @@ public class GridMapQueryExecutor {
*/
private static class NodeResults {
/** */
- private final ConcurrentMap<Long, QueryResults> res = new ConcurrentHashMap8<>();
+ private final ConcurrentMap<RequestKey, QueryResults> res = new ConcurrentHashMap8<>();
/** */
private final GridBoundedConcurrentLinkedHashMap<Long, Boolean> qryHist =
new GridBoundedConcurrentLinkedHashMap<>(1024, 1024, 0.75f, 64, PER_SEGMENT_Q);
/**
- * @return All results.
+ * @param reqId Query Request ID.
+ * @return {@code True} if query was already cancelled.
*/
- ConcurrentMap<Long, QueryResults> results() {
- return res;
+ boolean cancelled(long reqId) {
+ return qryHist.get(reqId) != null;
}
/**
- * @param qryId Query ID.
- * @return {@code False} if query was already cancelled.
+ * @param reqId Query Request ID.
+ * @return {@code True} if cancelled.
*/
- boolean cancelled(long qryId) {
- return qryHist.get(qryId) != null;
+ boolean onCancel(long reqId) {
+ Boolean old = qryHist.putIfAbsent(reqId, Boolean.FALSE);
+
+ return old == null;
}
/**
- * @param qryId Query ID.
- * @return {@code True} if cancelled.
+ * @param reqId Query Request ID.
+ * @param segmentId Index segment ID.
+ * @return query partial results.
*/
- boolean onCancel(long qryId) {
- Boolean old = qryHist.putIfAbsent(qryId, Boolean.FALSE);
+ public QueryResults get(long reqId, int segmentId) {
+ return res.get(new RequestKey(reqId, segmentId));
+ }
- return old == null;
+ /**
+ * Cancels the results of all segments of the given request.
+ * @param reqID Request ID.
+ */
+ public void cancelRequest(long reqID) {
+ for (RequestKey key : res.keySet()) {
+ if (key.reqId == reqID) {
+ QueryResults removed = res.remove(key);
+
+ if (removed != null)
+ removed.cancel(true);
+ }
+
+ }
+ }
+
+ /**
+ * @param reqId Query Request ID.
+ * @param segmentId Index segment ID.
+ * @param qr Query Results.
+ * @return {@code True} if removed.
+ */
+ public boolean remove(long reqId, int segmentId, QueryResults qr) {
+ return res.remove(new RequestKey(reqId, segmentId), qr);
+ }
+
+ /**
+ * @param reqId Query Request ID.
+ * @param segmentId Index segment ID.
+ * @param qr Query Results.
+ * @return previous value.
+ */
+ public QueryResults put(long reqId, int segmentId, QueryResults qr) {
+ return res.put(new RequestKey(reqId, segmentId), qr);
+ }
+
+ /**
+ * Cancel all node queries.
+ */
+ public void cancelAll() {
+ for (QueryResults ress : res.values())
+ ress.cancel(true);
+ }
+
+ /**
+ *
+ */
+ private static class RequestKey {
+ /** */
+ private long reqId;
+
+ /** */
+ private int segmentId;
+
+ /** Constructor */
+ RequestKey(long reqId, int segmentId) {
+ this.reqId = reqId;
+ this.segmentId = segmentId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ RequestKey other = (RequestKey)o;
+
+ return reqId == other.reqId && segmentId == other.segmentId;
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int result = (int)(reqId ^ (reqId >>> 32));
+ result = 31 * result + segmentId;
+ return result;
+ }
}
}
@@ -836,7 +960,8 @@ public class GridMapQueryExecutor {
* @param qrys Number of queries.
* @param cctx Cache context.
*/
- private QueryResults(long qryReqId, int qrys, GridCacheContext<?,?> cctx) {
+ @SuppressWarnings("unchecked")
+ private QueryResults(long qryReqId, int qrys, GridCacheContext<?, ?> cctx) {
this.qryReqId = qryReqId;
this.cctx = cctx;
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index c267f4a..45d3c58 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -59,7 +59,7 @@ public abstract class GridMergeIndex extends BaseIndex {
private final AtomicInteger expRowsCnt = new AtomicInteger(0);
/** Remaining rows per source node ID. */
- private Map<UUID, Counter> remainingRows;
+ private Map<UUID, Counter[]> remainingRows;
/** */
private final AtomicBoolean lastSubmitted = new AtomicBoolean();
@@ -141,15 +141,22 @@ public abstract class GridMergeIndex extends BaseIndex {
* Set source nodes.
*
* @param nodes Nodes.
+ * @param segmentsCnt Index segments per table.
*/
- public void setSources(Collection<ClusterNode> nodes) {
+ public void setSources(Collection<ClusterNode> nodes, int segmentsCnt) {
assert remainingRows == null;
remainingRows = U.newHashMap(nodes.size());
for (ClusterNode node : nodes) {
- if (remainingRows.put(node.id(), new Counter()) != null)
+ Counter[] counters = new Counter[segmentsCnt];
+
+ for (int i = 0; i < segmentsCnt; i++)
+ counters[i] = new Counter();
+
+ if (remainingRows.put(node.id(), counters) != null)
throw new IllegalStateException("Duplicate node id: " + node.id());
+
}
}
@@ -194,7 +201,7 @@ public abstract class GridMergeIndex extends BaseIndex {
public final void addPage(GridResultPage page) {
int pageRowsCnt = page.rowsInPage();
- Counter cnt = remainingRows.get(page.source());
+ Counter cnt = remainingRows.get(page.source())[page.res.segmentId()];
// RemainingRowsCount should be updated before page adding to avoid race
// in GridMergeIndexUnsorted cursor iterator
@@ -231,13 +238,14 @@ public abstract class GridMergeIndex extends BaseIndex {
// Guarantee that finished state possible only if counter is zero and all pages was added
cnt.state = State.FINISHED;
- for (Counter c : remainingRows.values()) { // Check all the sources.
- if (c.state != State.FINISHED)
- return;
+ for (Counter[] cntrs : remainingRows.values()) { // Check all the sources.
+ for(int i = 0; i < cntrs.length; i++) {
+ if (cntrs[i].state != State.FINISHED)
+ return;
+ }
}
if (lastSubmitted.compareAndSet(false, true)) {
- // Add page-marker that last page was added
addPage0(new GridResultPage(null, page.source(), null) {
@Override public boolean isLast() {
return true;
@@ -256,7 +264,20 @@ public abstract class GridMergeIndex extends BaseIndex {
* @param page Page.
*/
protected void fetchNextPage(GridResultPage page) {
- if (remainingRows.get(page.source()).get() != 0)
+ assert !page.isLast();
+
+ if(page.isFail())
+ page.fetchNextPage(); //rethrow exceptions
+
+ assert page.res != null;
+
+ Counter[] counters = remainingRows.get(page.source());
+
+ int segId = page.res.segmentId();
+
+ Counter counter = counters[segId];
+
+ if (counter.get() != 0)
page.fetchNextPage();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index f54fab6..8837046 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -62,9 +62,9 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
-import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter;
@@ -100,6 +100,7 @@ import org.jsr166.ConcurrentHashMap8;
import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE;
/**
@@ -294,6 +295,7 @@ public class GridReduceQueryExecutor {
private void onNextPage(final ClusterNode node, GridQueryNextPageResponse msg) {
final long qryReqId = msg.queryRequestId();
final int qry = msg.query();
+ final int seg = msg.segmentId();
final QueryRun r = runs.get(qryReqId);
@@ -326,7 +328,7 @@ public class GridReduceQueryExecutor {
}
try {
- GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, pageSize);
+ GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, seg, pageSize);
if (node.isLocal())
h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg0);
@@ -514,29 +516,33 @@ public class GridReduceQueryExecutor {
// Explicit partition mapping for unstable topology.
Map<ClusterNode, IntArray> partsMap = null;
- if (isPreloadingActive(cctx, extraSpaces)) {
- if (cctx.isReplicated())
- nodes = replicatedUnstableDataNodes(cctx, extraSpaces);
- else {
- partsMap = partitionedUnstableDataNodes(cctx, extraSpaces);
+ if (qry.isLocal())
+ nodes = Collections.singleton(ctx.discovery().localNode());
+ else {
+ if (isPreloadingActive(cctx, extraSpaces)) {
+ if (cctx.isReplicated())
+ nodes = replicatedUnstableDataNodes(cctx, extraSpaces);
+ else {
+ partsMap = partitionedUnstableDataNodes(cctx, extraSpaces);
- nodes = partsMap == null ? null : partsMap.keySet();
+ nodes = partsMap == null ? null : partsMap.keySet();
+ }
}
- }
- else
- nodes = stableDataNodes(topVer, cctx, extraSpaces);
+ else
+ nodes = stableDataNodes(topVer, cctx, extraSpaces);
- if (nodes == null)
- continue; // Retry.
+ if (nodes == null)
+ continue; // Retry.
- assert !nodes.isEmpty();
+ assert !nodes.isEmpty();
- if (cctx.isReplicated() || qry.explain()) {
- assert qry.explain() || !nodes.contains(ctx.discovery().localNode()) :
- "We must be on a client node.";
+ if (cctx.isReplicated() || qry.explain()) {
+ assert qry.explain() || !nodes.contains(ctx.discovery().localNode()) :
+ "We must be on a client node.";
- // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
- nodes = Collections.singleton(F.rand(nodes));
+ // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
+ nodes = Collections.singleton(F.rand(nodes));
+ }
}
final Collection<ClusterNode> finalNodes = nodes;
@@ -545,6 +551,8 @@ public class GridReduceQueryExecutor {
final boolean skipMergeTbl = !qry.explain() && qry.skipMergeTable();
+ final int segmentsPerIndex = cctx.config().getQueryParallelism();
+
for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
GridMergeIndex idx;
@@ -565,12 +573,12 @@ public class GridReduceQueryExecutor {
else
idx = GridMergeIndexUnsorted.createDummy(ctx);
- idx.setSources(nodes);
+ idx.setSources(nodes, segmentsPerIndex);
r.idxs.add(idx);
}
- r.latch = new CountDownLatch(r.idxs.size() * nodes.size());
+ r.latch = new CountDownLatch(r.idxs.size() * nodes.size() * segmentsPerIndex);
runs.put(qryReqId, r);
@@ -626,14 +634,13 @@ public class GridReduceQueryExecutor {
.tables(distributedJoins ? qry.tables() : null)
.partitions(convert(partsMap))
.queries(mapQrys)
- .flags(distributedJoins ? GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0)
+ .flags((qry.isLocal() ? GridH2QueryRequest.FLAG_IS_LOCAL : 0) |
+ (distributedJoins ? GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0))
.timeout(timeoutMillis),
oldStyle && partsMap != null ? new ExplicitPartitionsSpecializer(partsMap) : null,
- distributedJoins)
- ) {
- awaitAllReplies(r, nodes);
+ false)) {
- cancel.checkCancelled();
+ awaitAllReplies(r, nodes, cancel);
Object state = r.state.get();
@@ -696,7 +703,7 @@ public class GridReduceQueryExecutor {
h2.setupConnection(r.conn, false, enforceJoinOrder);
GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, qryReqId, REDUCE)
- .pageSize(r.pageSize).distributedJoins(false));
+ .pageSize(r.pageSize).distributedJoinMode(OFF));
try {
if (qry.explain())
@@ -817,11 +824,15 @@ public class GridReduceQueryExecutor {
/**
* @param r Query run.
* @param nodes Nodes to check periodically if they alive.
+ * @param cancel Query cancel.
* @throws IgniteInterruptedCheckedException If interrupted.
*/
- private void awaitAllReplies(QueryRun r, Collection<ClusterNode> nodes)
- throws IgniteInterruptedCheckedException {
+ private void awaitAllReplies(QueryRun r, Collection<ClusterNode> nodes, GridQueryCancel cancel)
+ throws IgniteInterruptedCheckedException, QueryCancelledException {
while (!U.await(r.latch, 500, TimeUnit.MILLISECONDS)) {
+
+ cancel.checkCancelled();
+
for (ClusterNode node : nodes) {
if (!ctx.discovery().alive(node)) {
handleNodeLeft(r, node.id());
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java
index e49c48f..b2548cc 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java
@@ -38,6 +38,12 @@ public class GridH2IndexRangeRequest implements Message {
private long qryId;
/** */
+ private int originSegmentId;
+
+ /** */
+ private int segmentId;
+
+ /** */
private int batchLookupId;
/** */
@@ -87,6 +93,34 @@ public class GridH2IndexRangeRequest implements Message {
}
/**
+ * @param segmentId Index segment ID.
+ */
+ public void segment(int segmentId) {
+ this.segmentId = segmentId;
+ }
+
+ /**
+ * @return Index segment ID.
+ */
+ public int segment() {
+ return segmentId;
+ }
+
+ /**
+ * @return Origin index segment ID.
+ */
+ public int originSegmentId() {
+ return originSegmentId;
+ }
+
+ /**
+ * @param segmentId Origin index segment ID.
+ */
+ public void originSegmentId(int segmentId) {
+ this.originSegmentId = segmentId;
+ }
+
+ /**
* @param batchLookupId Batch lookup ID.
*/
public void batchLookupId(int batchLookupId) {
@@ -136,6 +170,17 @@ public class GridH2IndexRangeRequest implements Message {
writer.incrementState();
+ case 4:
+ if (!writer.writeInt("segmentId", segmentId))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeInt("originSegId", originSegmentId))
+ return false;
+
+ writer.incrementState();
}
return true;
@@ -181,6 +226,21 @@ public class GridH2IndexRangeRequest implements Message {
reader.incrementState();
+ case 4:
+ segmentId = reader.readInt("segmentId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ originSegmentId = reader.readInt("originSegId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
}
return reader.afterMessageRead(GridH2IndexRangeRequest.class);
@@ -193,7 +253,7 @@ public class GridH2IndexRangeRequest implements Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 4;
+ return 6;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java
index c6414bd..4d3db12 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java
@@ -47,6 +47,12 @@ public class GridH2IndexRangeResponse implements Message {
private long qryId;
/** */
+ private int segmentId;
+
+ /** */
+ private int originSegmentId;
+
+ /** */
private int batchLookupId;
/** */
@@ -130,6 +136,34 @@ public class GridH2IndexRangeResponse implements Message {
}
/**
+ * @param segmentId Index segment ID.
+ */
+ public void segment(int segmentId) {
+ this.segmentId = segmentId;
+ }
+
+ /**
+ * @return Index segment ID.
+ */
+ public int segment() {
+ return segmentId;
+ }
+
+ /**
+ * @return Origin index segment ID.
+ */
+ public int originSegmentId() {
+ return originSegmentId;
+ }
+
+ /**
+ * @param segmentId Origin index segment ID.
+ */
+ public void originSegmentId(int segmentId) {
+ this.originSegmentId = segmentId;
+ }
+
+ /**
* @param batchLookupId Batch lookup ID.
*/
public void batchLookupId(int batchLookupId) {
@@ -191,6 +225,17 @@ public class GridH2IndexRangeResponse implements Message {
writer.incrementState();
+ case 6:
+ if (!writer.writeInt("originSegId", originSegmentId))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
+ if (!writer.writeInt("segmentId", segmentId))
+ return false;
+
+ writer.incrementState();
}
return true;
@@ -252,6 +297,21 @@ public class GridH2IndexRangeResponse implements Message {
reader.incrementState();
+ case 6:
+ originSegmentId = reader.readInt("originSegId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
+ segmentId = reader.readInt("segmentId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
}
return reader.afterMessageRead(GridH2IndexRangeResponse.class);
@@ -264,7 +324,7 @@ public class GridH2IndexRangeResponse implements Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 6;
+ return 8;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index 884173f..ec49aff 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -50,6 +50,11 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
*/
public static int FLAG_DISTRIBUTED_JOINS = 1;
+ /**
+ * Restrict distributed joins range-requests to local index segments. Range requests to other nodes will not be sent.
+ */
+ public static final int FLAG_IS_LOCAL = 2;
+
/** */
private long reqId;
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
new file mode 100644
index 0000000..f8c9dd5
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import javax.cache.Cache;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheKeyConfiguration;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests that distributed and local queries return correct results when an index consists of many segments.
+ */
+public class IgniteSqlSegmentedIndexSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static int QRY_PARALLELISM_LVL = 97;
+
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ CacheKeyConfiguration keyCfg = new CacheKeyConfiguration("MyCache", "affKey");
+
+ cfg.setCacheKeyConfiguration(keyCfg);
+
+ cfg.setPeerClassLoadingEnabled(false);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(ipFinder);
+
+ cfg.setDiscoverySpi(disco);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids(true);
+ }
+
+ /**
+ * @param name Cache name.
+ * @param partitioned {@code true} for a partitioned cache, {@code false} for a replicated one.
+ * @param idxTypes Indexed types.
+ * @return Cache configuration.
+ */
+ private static <K, V> CacheConfiguration<K, V> cacheConfig(String name, boolean partitioned, Class<?>... idxTypes) {
+ return new CacheConfiguration<K, V>()
+ .setName(name)
+ .setCacheMode(partitioned ? CacheMode.PARTITIONED : CacheMode.REPLICATED)
+ .setQueryParallelism(partitioned ? QRY_PARALLELISM_LVL : 1)
+ .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+ .setIndexedTypes(idxTypes);
+ }
+
+ /**
+ * Run tests on single-node grid
+ * @throws Exception If failed.
+ */
+ public void testSingleNodeIndexSegmentation() throws Exception {
+ startGridsMultiThreaded(1, true);
+
+ ignite(0).createCache(cacheConfig("pers", true, Integer.class, Person.class));
+ ignite(0).createCache(cacheConfig("org", true, Integer.class, Organization.class));
+
+ fillCache();
+
+ checkDistributedQueryWithSegmentedIndex();
+
+ checkLocalQueryWithSegmentedIndex();
+ }
+
+ /**
+ * Run tests on multi-node grid
+ * @throws Exception If failed.
+ */
+ public void testMultiNodeIndexSegmentation() throws Exception {
+ startGridsMultiThreaded(4, true);
+
+ ignite(0).createCache(cacheConfig("pers", true, Integer.class, Person.class));
+ ignite(0).createCache(cacheConfig("org", true, Integer.class, Organization.class));
+
+ fillCache();
+
+ checkDistributedQueryWithSegmentedIndex();
+
+ checkLocalQueryWithSegmentedIndex();
+ }
+
+ /**
+ * Run tests on multi-node grid with a segmented partitioned cache joined to a replicated cache.
+ * @throws Exception If failed.
+ */
+ public void testMultiNodeSegmentedPartitionedWithReplicated() throws Exception {
+ startGridsMultiThreaded(4, true);
+
+ ignite(0).createCache(cacheConfig("pers", true, Integer.class, Person.class));
+ ignite(0).createCache(cacheConfig("org", false, Integer.class, Organization.class));
+
+ fillCache();
+
+ checkDistributedQueryWithSegmentedIndex();
+
+ checkLocalQueryWithSegmentedIndex();
+ }
+
+ /**
+ * Check distributed joins.
+ * @throws Exception If failed.
+ */
+ public void checkDistributedQueryWithSegmentedIndex() throws Exception {
+ IgniteCache<Integer, Person> c1 = ignite(0).cache("pers");
+
+ int expectedPersons = 0;
+
+ for (Cache.Entry<Integer, Person> e : c1) {
+ final Integer orgId = e.getValue().orgId;
+
+ if (10 <= orgId && orgId < 500)
+ expectedPersons++;
+ }
+
+ String select0 = "select o.name n1, p.name n2 from \"pers\".Person p, \"org\".Organization o where p.orgId = o._key";
+
+ List<List<?>> result = c1.query(new SqlFieldsQuery(select0).setDistributedJoins(true)).getAll();
+
+ assertEquals(expectedPersons, result.size());
+ }
+
+ /**
+ * Test local query.
+ * @throws Exception If failed.
+ */
+ public void checkLocalQueryWithSegmentedIndex() throws Exception {
+ IgniteCache<Integer, Person> c1 = ignite(0).cache("pers");
+ IgniteCache<Integer, Organization> c2 = ignite(0).cache("org");
+
+ Set<Integer> localOrgIds = new HashSet<>();
+
+ for(Cache.Entry<Integer, Organization> e : c2.localEntries())
+ localOrgIds.add(e.getKey());
+
+ int expectedPersons = 0;
+
+ for (Cache.Entry<Integer, Person> e : c1.localEntries()) {
+ final Integer orgId = e.getValue().orgId;
+
+ if (localOrgIds.contains(orgId))
+ expectedPersons++;
+ }
+
+ String select0 = "select o.name n1, p.name n2 from \"pers\".Person p, \"org\".Organization o where p.orgId = o._key";
+
+ List<List<?>> result = c1.query(new SqlFieldsQuery(select0).setLocal(true)).getAll();
+
+ assertEquals(expectedPersons, result.size());
+ }
+
+ /** */
+ private void fillCache() {
+ IgniteCache<Object, Object> c1 = ignite(0).cache("pers");
+
+ IgniteCache<Object, Object> c2 = ignite(0).cache("org");
+
+ final int orgCount = 500;
+
+ for (int i = 0; i < orgCount; i++)
+ c2.put(i, new Organization("org-" + i));
+
+ final Random random = new Random();
+
+ for (int i = 0; i < 1000; i++) {
+ int orgID = 10 + random.nextInt(orgCount + 10);
+
+ c1.put(i, new Person(orgID, "pers-" + i));
+ }
+ }
+
+ /**
+ *
+ */
+ private static class Person implements Serializable {
+ /** */
+ @QuerySqlField(index = true)
+ Integer orgId;
+
+ /** */
+ @QuerySqlField
+ String name;
+
+ /**
+ *
+ */
+ public Person() {
+ // No-op.
+ }
+
+ /**
+ * @param orgId Organization ID.
+ * @param name Name.
+ */
+ public Person(int orgId, String name) {
+ this.orgId = orgId;
+ this.name = name;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class Organization implements Serializable {
+ /** */
+ @QuerySqlField
+ String name;
+
+ /**
+ *
+ */
+ public Organization() {
+ // No-op.
+ }
+
+ /**
+ * @param name Organization name.
+ */
+ public Organization(String name) {
+ this.name = name;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index 06afe7c..4ae2f91 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -33,9 +33,10 @@ import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheKeyConfiguration;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
-import org.apache.ignite.cache.affinity.AffinityKeyMapped;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cluster.ClusterNode;
@@ -701,6 +702,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
ignite(0).destroyCache(cache.getName());
}
}
+
/**
* @throws Exception If failed.
*/
@@ -750,6 +752,127 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testIndexSegmentation() throws Exception {
+ CacheConfiguration ccfg1 = cacheConfig("pers", true,
+ Integer.class, Person2.class).setQueryParallelism(4);
+ CacheConfiguration ccfg2 = cacheConfig("org", true,
+ Integer.class, Organization.class).setQueryParallelism(4);
+
+ IgniteCache<Object, Object> c1 = ignite(0).getOrCreateCache(ccfg1);
+ IgniteCache<Object, Object> c2 = ignite(0).getOrCreateCache(ccfg2);
+
+ try {
+ c2.put(1, new Organization("o1"));
+ c2.put(2, new Organization("o2"));
+ c1.put(3, new Person2(1, "p1"));
+ c1.put(4, new Person2(2, "p2"));
+ c1.put(5, new Person2(3, "p3"));
+
+ String select0 = "select o.name n1, p.name n2 from \"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key and o._key=1";
+
+ checkQueryPlan(c1, true, 1, new SqlFieldsQuery(select0));
+
+ checkQueryPlan(c1, true, 1, new SqlFieldsQuery(select0).setLocal(true));
+ }
+ finally {
+ c1.destroy();
+ c2.destroy();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicationCacheIndexSegmentationFailure() throws Exception {
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ CacheConfiguration ccfg = cacheConfig("org", false,
+ Integer.class, Organization.class).setQueryParallelism(4);
+
+ IgniteCache<Object, Object> c = ignite(0).createCache(ccfg);
+
+ return null;
+ }
+ }, CacheException.class, "Cache index segmentation is supported for PARTITIONED mode only.");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testIndexSegmentationPartitionedReplicated() throws Exception {
+ CacheConfiguration ccfg1 = cacheConfig("pers", true,
+ Integer.class, Person2.class).setQueryParallelism(4);
+ CacheConfiguration ccfg2 = cacheConfig("org", false,
+ Integer.class, Organization.class);
+
+ final IgniteCache<Object, Object> c1 = ignite(0).getOrCreateCache(ccfg1);
+ final IgniteCache<Object, Object> c2 = ignite(0).getOrCreateCache(ccfg2);
+
+ try {
+ c2.put(1, new Organization("o1"));
+ c2.put(2, new Organization("o2"));
+ c1.put(3, new Person2(1, "p1"));
+ c1.put(4, new Person2(2, "p2"));
+ c1.put(5, new Person2(3, "p3"));
+
+ String select0 = "select o.name n1, p.name n2 from \"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key";
+
+ final SqlFieldsQuery qry = new SqlFieldsQuery(select0);
+
+ qry.setDistributedJoins(true);
+
+ List<List<?>> results = c1.query(qry).getAll();
+
+ assertEquals(2, results.size());
+ }
+ finally {
+ c1.destroy();
+ c2.destroy();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testIndexWithDifferentSegmentationLevelsFailure() throws Exception {
+ CacheConfiguration ccfg1 = cacheConfig("pers", true,
+ Integer.class, Person2.class).setQueryParallelism(4);
+ CacheConfiguration ccfg2 = cacheConfig("org", true,
+ Integer.class, Organization.class).setQueryParallelism(3);
+
+ final IgniteCache<Object, Object> c1 = ignite(0).getOrCreateCache(ccfg1);
+ final IgniteCache<Object, Object> c2 = ignite(0).getOrCreateCache(ccfg2);
+
+ try {
+ c2.put(1, new Organization("o1"));
+ c2.put(2, new Organization("o2"));
+ c1.put(3, new Person2(1, "p1"));
+ c1.put(4, new Person2(2, "p2"));
+ c1.put(5, new Person2(3, "p3"));
+
+ String select0 = "select o.name n1, p.name n2 from \"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key and o._key=1";
+
+ final SqlFieldsQuery qry = new SqlFieldsQuery(select0);
+
+ qry.setDistributedJoins(true);
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ c1.query(qry);
+
+ return null;
+ }
+ }, CacheException.class, "Using indexes with different parallelism levels in same query is forbidden.");
+ }
+ finally {
+ c1.destroy();
+ c2.destroy();
+ }
+ }
+
+ /**
* @param cache Cache.
* @param sql SQL.
* @param enforceJoinOrder Enforce join order flag.
@@ -789,26 +912,26 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
false,
0,
select +
- "from " + cache1 + "," + cache2 + " "+ where);
+ "from " + cache1 + "," + cache2 + " " + where);
checkQueryPlan(cache,
false,
0,
select +
- "from " + cache2 + "," + cache1 + " "+ where);
+ "from " + cache2 + "," + cache1 + " " + where);
if (testEnforceJoinOrder) {
checkQueryPlan(cache,
true,
0,
select +
- "from " + cache1 + "," + cache2 + " "+ where);
+ "from " + cache1 + "," + cache2 + " " + where);
checkQueryPlan(cache,
true,
0,
select +
- "from " + cache2 + "," + cache1 + " "+ where);
+ "from " + cache2 + "," + cache1 + " " + where);
}
}
@@ -823,7 +946,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
boolean enforceJoinOrder,
int expBatchedJoins,
String sql,
- String...expText) {
+ String... expText) {
checkQueryPlan(cache,
enforceJoinOrder,
expBatchedJoins,
@@ -848,7 +971,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
boolean enforceJoinOrder,
int expBatchedJoins,
SqlFieldsQuery qry,
- String...expText) {
+ String... expText) {
qry.setEnforceJoinOrder(enforceJoinOrder);
qry.setDistributedJoins(true);
@@ -984,7 +1107,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
* @param args Arguments.
* @return Column as list.
*/
- private static <X> List<X> columnQuery(IgniteCache<?,?> c, String qry, Object... args) {
+ private static <X> List<X> columnQuery(IgniteCache<?, ?> c, String qry, Object... args) {
return column(0, c.query(new SqlFieldsQuery(qry).setArgs(args)).getAll());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
index 52084c7..d6a5fb1 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
@@ -230,16 +230,16 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
assertEquals(0, spi.size(typeAB.space(), typeAB));
assertEquals(0, spi.size(typeBA.space(), typeBA));
- assertFalse(spi.queryLocalSql(typeAA.space(), "select * from A.A", null, Collections.emptySet(), typeAA, null).hasNext());
- assertFalse(spi.queryLocalSql(typeAB.space(), "select * from A.B", null, Collections.emptySet(), typeAB, null).hasNext());
- assertFalse(spi.queryLocalSql(typeBA.space(), "select * from B.A", null, Collections.emptySet(), typeBA, null).hasNext());
+ assertFalse(spi.queryLocalSql(typeAA.space(), "select * from A.A", null, Collections.emptySet(), typeAA.name(), null, null).hasNext());
+ assertFalse(spi.queryLocalSql(typeAB.space(), "select * from A.B", null, Collections.emptySet(), typeAB.name(), null, null).hasNext());
+ assertFalse(spi.queryLocalSql(typeBA.space(), "select * from B.A", null, Collections.emptySet(), typeBA.name(), null, null).hasNext());
assertFalse(spi.queryLocalSql(typeBA.space(), "select * from B.A, A.B, A.A", null,
- Collections.emptySet(), typeBA, null).hasNext());
+ Collections.emptySet(), typeBA.name(), null, null).hasNext());
try {
spi.queryLocalSql(typeBA.space(), "select aa.*, ab.*, ba.* from A.A aa, A.B ab, B.A ba", null,
- Collections.emptySet(), typeBA, null).hasNext();
+ Collections.emptySet(), typeBA.name(), null, null).hasNext();
fail("Enumerations of aliases in select block must be prohibited");
}
@@ -248,10 +248,10 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
}
assertFalse(spi.queryLocalSql(typeAB.space(), "select ab.* from A.B ab", null,
- Collections.emptySet(), typeAB, null).hasNext());
+ Collections.emptySet(), typeAB.name(), null, null).hasNext());
assertFalse(spi.queryLocalSql(typeBA.space(), "select ba.* from B.A as ba", null,
- Collections.emptySet(), typeBA, null).hasNext());
+ Collections.emptySet(), typeBA.name(), null, null).hasNext());
// Nothing to remove.
spi.remove("A", key(1), aa(1, "", 10));
@@ -305,7 +305,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
// Query data.
Iterator<IgniteBiTuple<Integer, Map<String, Object>>> res =
- spi.queryLocalSql(typeAA.space(), "from a order by age", null, Collections.emptySet(), typeAA, null);
+ spi.queryLocalSql(typeAA.space(), "from a order by age", null, Collections.emptySet(), typeAA.name(), null, null);
assertTrue(res.hasNext());
assertEquals(aa(3, "Borya", 18).value(null, false), value(res.next()));
@@ -314,7 +314,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
assertFalse(res.hasNext());
res = spi.queryLocalSql(typeAA.space(), "select aa.* from a aa order by aa.age", null,
- Collections.emptySet(), typeAA, null);
+ Collections.emptySet(), typeAA.name(), null, null);
assertTrue(res.hasNext());
assertEquals(aa(3, "Borya", 18).value(null, false), value(res.next()));
@@ -322,7 +322,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
assertEquals(aa(2, "Valera", 19).value(null, false), value(res.next()));
assertFalse(res.hasNext());
- res = spi.queryLocalSql(typeAB.space(), "from b order by name", null, Collections.emptySet(), typeAB, null);
+ res = spi.queryLocalSql(typeAB.space(), "from b order by name", null, Collections.emptySet(), typeAB.name(), null, null);
assertTrue(res.hasNext());
assertEquals(ab(1, "Vasya", 20, "Some text about Vasya goes here.").value(null, false), value(res.next()));
@@ -331,7 +331,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
assertFalse(res.hasNext());
res = spi.queryLocalSql(typeAB.space(), "select bb.* from b as bb order by bb.name", null,
- Collections.emptySet(), typeAB, null);
+ Collections.emptySet(), typeAB.name(), null, null);
assertTrue(res.hasNext());
assertEquals(ab(1, "Vasya", 20, "Some text about Vasya goes here.").value(null, false), value(res.next()));
@@ -340,7 +340,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
assertFalse(res.hasNext());
- res = spi.queryLocalSql(typeBA.space(), "from a", null, Collections.emptySet(), typeBA, null);
+ res = spi.queryLocalSql(typeBA.space(), "from a", null, Collections.emptySet(), typeBA.name(), null, null);
assertTrue(res.hasNext());
assertEquals(ba(2, "Kolya", 25, true).value(null, false), value(res.next()));
@@ -725,4 +725,4 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
throw new UnsupportedOperationException();
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/test/java/org/apache/ignite/loadtests/h2indexing/FetchingQueryCursorStressTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/loadtests/h2indexing/FetchingQueryCursorStressTest.java b/modules/indexing/src/test/java/org/apache/ignite/loadtests/h2indexing/FetchingQueryCursorStressTest.java
new file mode 100644
index 0000000..d1d80f2
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/loadtests/h2indexing/FetchingQueryCursorStressTest.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.loadtests.h2indexing;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
+
+/**
+ * SQL query stress test.
+ */
+public class FetchingQueryCursorStressTest {
+ /** Node count. */
+    private static final int NODE_CNT = 4; // Use 4 nodes to see better throughput.
+
+ /** Number of entries. */
+ private static final int ENTRIES_CNT = 10_000;
+
+ /** Cache name. */
+ private static final String CACHE_NAME = "cache";
+
+ /** Thread count. */
+ private static final int THREAD_CNT = 16;
+
+ /** Execution counter. */
+ private static final AtomicLong CNT = new AtomicLong();
+
+ /** Verbose mode. */
+ private static final boolean VERBOSE = false;
+
+    /** Test duration in milliseconds. */
+ private static final long TIMEOUT = TimeUnit.SECONDS.toMillis(30);
+
+ public static final AtomicReference<Exception> error = new AtomicReference<>();
+
+ /**
+ * Entry point.
+ */
+ public static void main(String[] args) throws Exception {
+ List<Thread> threads = new ArrayList<>(THREAD_CNT + 1);
+
+ try (Ignite ignite = start()) {
+
+ IgniteCache<Integer, Person> cache = ignite.cache(CACHE_NAME);
+
+ loadData(ignite, cache);
+
+ System.out.println("Loaded data: " + cache.size());
+
+ for (int i = 0; i < THREAD_CNT; i++)
+ threads.add(startDaemon("qry-exec-" + i, new QueryExecutor(cache, "Select * from Person")));
+
+ threads.add(startDaemon("printer", new ThroughputPrinter()));
+
+ Thread.sleep(TIMEOUT);
+
+ for (Thread t : threads)
+ t.join();
+
+ if(error.get()!=null)
+ throw error.get();
+ }
+ finally {
+ Ignition.stopAll(false);
+ }
+ }
+
+ /**
+ * Start daemon thread.
+ *
+ * @param name Name.
+ * @param r Runnable.
+ */
+ private static Thread startDaemon(String name, Runnable r) {
+ Thread t = new Thread(r);
+
+ t.setName(name);
+ t.setDaemon(true);
+
+ t.start();
+
+ return t;
+ }
+
+ /**
+ * Load data into Ignite.
+ *
+ * @param ignite Ignite.
+ * @param cache Cache.
+ */
+ private static void loadData(Ignite ignite, IgniteCache<Integer, Person> cache) throws Exception {
+ try (IgniteDataStreamer<Object, Object> str = ignite.dataStreamer(cache.getName())) {
+
+ for (int id = 0; id < ENTRIES_CNT; id++)
+ str.addData(id, new Person(id, "John" + id, "Doe"));
+ }
+ }
+
+ /**
+ * Start topology.
+ *
+ * @return Client node.
+ */
+ private static Ignite start() {
+ int i = 0;
+
+ for (; i < NODE_CNT; i++)
+ Ignition.start(config(i, false));
+
+ return Ignition.start(config(i, true));
+ }
+
+ /**
+ * Create configuration.
+ *
+ * @param idx Index.
+ * @param client Client flag.
+ * @return Configuration.
+ */
+ @SuppressWarnings("unchecked")
+ private static IgniteConfiguration config(int idx, boolean client) {
+ IgniteConfiguration cfg = new IgniteConfiguration();
+
+ cfg.setGridName("grid-" + idx);
+ cfg.setClientMode(client);
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setName(CACHE_NAME);
+ ccfg.setIndexedTypes(Integer.class, Person.class);
+ cfg.setMarshaller(new OptimizedMarshaller());
+
+ cfg.setCacheConfiguration(ccfg);
+
+ cfg.setLocalHost("127.0.0.1");
+
+ return cfg;
+ }
+
+ /**
+ *
+ */
+ private static class Person implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ @QuerySqlField
+ private int id;
+
+ /** */
+ @QuerySqlField
+ private String firstName;
+
+ /** */
+ @QuerySqlField
+ private String lastName;
+
+ public Person(int id, String firstName, String lastName) {
+ this.id = id;
+ this.firstName = firstName;
+ this.lastName = lastName;
+ }
+ }
+
+ /**
+ * Query runner.
+ */
+ private static class QueryExecutor implements Runnable {
+ /** Cache. */
+ private final IgniteCache<Integer, Person> cache;
+
+ /** */
+ private final String query;
+
+ /**
+ * Constructor.
+ *
+ * @param cache Cache.
+ */
+ public QueryExecutor(IgniteCache<Integer, Person> cache, String query) {
+ this.cache = cache;
+ this.query = query;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("InfiniteLoopStatement")
+ @Override public void run() {
+ System.out.println("Executor started: " + Thread.currentThread().getName());
+
+ try {
+ while (error.get()==null && !Thread.currentThread().isInterrupted()) {
+ long start = System.nanoTime();
+
+ SqlFieldsQuery qry = new SqlFieldsQuery(query);
+
+// qry.setArgs((Object[]) argumentForQuery());
+
+ Set<Integer> extIds = new HashSet<>();
+
+ for (List<?> next : cache.query(qry))
+ extIds.add((Integer)next.get(0));
+
+ long dur = (System.nanoTime() - start) / 1_000_000;
+
+ CNT.incrementAndGet();
+
+ if (VERBOSE)
+ System.out.println("[extIds=" + extIds.size() + ", dur=" + dur + ']');
+ }
+ }
+ catch (CacheException ex){
+ error.compareAndSet(null, ex);
+ }
+ }
+ }
+
+ /**
+ * Throughput printer.
+ */
+ private static class ThroughputPrinter implements Runnable {
+ /** {@inheritDoc} */
+ @SuppressWarnings("InfiniteLoopStatement")
+ @Override public void run() {
+ while (error.get()==null) {
+ long before = CNT.get();
+ long beforeTime = System.currentTimeMillis();
+
+ try {
+ Thread.sleep(2000L);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ return;
+ }
+
+ long after = CNT.get();
+ long afterTime = System.currentTimeMillis();
+
+ double res = 1000 * ((double)(after - before)) / (afterTime - beforeTime);
+
+ System.out.println((long)res + " ops/sec");
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 386b8fd..b417b0a 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -98,6 +98,7 @@ import org.apache.ignite.internal.processors.cache.query.IndexingSpiQuerySelfTes
import org.apache.ignite.internal.processors.cache.query.IndexingSpiQueryTxSelfTest;
import org.apache.ignite.internal.processors.query.IgniteSqlEntryCacheModeAgnosticTest;
import org.apache.ignite.internal.processors.query.IgniteSqlSchemaIndexingTest;
+import org.apache.ignite.internal.processors.query.IgniteSqlSegmentedIndexSelfTest;
import org.apache.ignite.internal.processors.query.IgniteSqlSplitterSelfTest;
import org.apache.ignite.internal.processors.query.h2.GridH2IndexRebuildTest;
import org.apache.ignite.internal.processors.query.h2.GridH2IndexingInMemSelfTest;
@@ -134,6 +135,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
// Queries tests.
suite.addTestSuite(IgniteSqlSplitterSelfTest.class);
+ suite.addTestSuite(IgniteSqlSegmentedIndexSelfTest.class);
suite.addTestSuite(IgniteSqlSchemaIndexingTest.class);
suite.addTestSuite(GridCacheQueryIndexDisabledSelfTest.class);
suite.addTestSuite(IgniteCacheQueryLoadSelfTest.class);
[02/12] ignite git commit: Implemented.
Posted by se...@apache.org.
Implemented.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/64ba13b0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/64ba13b0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/64ba13b0
Branch: refs/heads/ignite-1.9
Commit: 64ba13b0a3be6acbf7d629029b460a39c2e2b388
Parents: 4eac51c
Author: AMRepo <an...@gmail.com>
Authored: Mon Feb 20 21:24:29 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Feb 21 11:52:40 2017 +0300
----------------------------------------------------------------------
.../configuration/CacheConfiguration.java | 48 ++++
.../processors/cache/GridCacheProcessor.java | 3 +
.../processors/cache/IgniteCacheProxy.java | 6 +-
.../closure/GridClosureProcessor.java | 2 +-
.../processors/query/GridQueryIndexing.java | 27 +-
.../processors/query/GridQueryProcessor.java | 141 +++-------
.../messages/GridQueryNextPageRequest.java | 29 +-
.../messages/GridQueryNextPageResponse.java | 29 +-
.../cache/query/GridCacheTwoStepQuery.java | 17 ++
.../processors/query/h2/IgniteH2Indexing.java | 235 ++++++++++++++--
.../query/h2/opt/DistributedJoinMode.java | 51 ++++
.../query/h2/opt/GridH2IndexBase.java | 264 +++++++++++++-----
.../query/h2/opt/GridH2QueryContext.java | 84 ++++--
.../query/h2/opt/GridH2TreeIndex.java | 232 ++++++++++++----
.../query/h2/twostep/GridMapQueryExecutor.java | 227 +++++++++++----
.../query/h2/twostep/GridMergeIndex.java | 39 ++-
.../h2/twostep/GridReduceQueryExecutor.java | 69 +++--
.../h2/twostep/msg/GridH2IndexRangeRequest.java | 60 +++-
.../twostep/msg/GridH2IndexRangeResponse.java | 62 ++++-
.../h2/twostep/msg/GridH2QueryRequest.java | 5 +
.../query/IgniteSqlSegmentedIndexSelfTest.java | 263 ++++++++++++++++++
.../query/IgniteSqlSplitterSelfTest.java | 139 +++++++++-
.../h2/GridIndexingSpiAbstractSelfTest.java | 26 +-
.../FetchingQueryCursorStressTest.java | 277 +++++++++++++++++++
.../IgniteCacheQuerySelfTestSuite.java | 2 +
25 files changed, 1917 insertions(+), 420 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 0656dda..149f25a 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -223,6 +223,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** Default threshold for concurrent loading of keys from {@link CacheStore}. */
public static final int DFLT_CONCURRENT_LOAD_ALL_THRESHOLD = 5;
+    /** Default SQL query parallelism level. */
+ public static final int DFLT_SQL_QUERY_PARALLELISM_LVL = 1;
+
/** Cache name. */
private String name;
@@ -410,6 +413,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** Query entities. */
private Collection<QueryEntity> qryEntities;
+    /** Query parallelism level. */
+ private int qryParallelism = DFLT_SQL_QUERY_PARALLELISM_LVL;
+
/** Empty constructor (all values are initialized to their defaults). */
public CacheConfiguration() {
/* No-op. */
@@ -462,6 +468,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
interceptor = cc.getInterceptor();
invalidate = cc.isInvalidate();
isReadThrough = cc.isReadThrough();
+ qryParallelism = cc.getQueryParallelism();
isWriteThrough = cc.isWriteThrough();
storeKeepBinary = cc.isStoreKeepBinary() != null ? cc.isStoreKeepBinary() : DFLT_STORE_KEEP_BINARY;
listenerConfigurations = cc.listenerConfigurations;
@@ -2108,6 +2115,47 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
}
/**
+ * Defines a hint to query execution engine on desired degree of parallelism within a single node.
+ * Query executor may or may not use this hint depending on estimated query costs. Query executor may define
+ * certain restrictions on parallelism depending on query type and/or cache type.
+ * <p>
+ * As of {@code Apache Ignite 1.9} this hint is only supported for SQL queries with the following restrictions:
+ * <ul>
+     * <li>Hint cannot be used for {@code REPLICATED} cache; an exception is thrown if it is</li>
+ * <li>All caches participating in query must have the same degree of parallelism, exception is thrown
+ * otherwise</li>
+ * </ul>
+ * These restrictions will be removed in future versions of Apache Ignite.
+ * <p>
+ * Defaults to {@code 1}.
+ */
+ public int getQueryParallelism() {
+ return qryParallelism;
+ }
+
+ /**
+ * Defines a hint to query execution engine on desired degree of parallelism within a single node.
+ * Query executor may or may not use this hint depending on estimated query costs. Query executor may define
+ * certain restrictions on parallelism depending on query type and/or cache type.
+ * <p>
+ * As of {@code Apache Ignite 1.9} this hint is only supported for SQL queries with the following restrictions:
+ * <ul>
+     * <li>Hint cannot be used for {@code REPLICATED} cache; an exception is thrown if it is</li>
+ * <li>All caches participating in query must have the same degree of parallelism, exception is thrown
+ * otherwise</li>
+ * </ul>
+ * These restrictions will be removed in future versions of Apache Ignite.
+ *
+     * @param qryParallelism Query parallelism level.
+ * @return {@code this} for chaining.
+ */
+ public CacheConfiguration<K,V> setQueryParallelism(int qryParallelism) {
+ this.qryParallelism = qryParallelism;
+
+ return this;
+ }
+
+ /**
* Gets topology validator.
* <p>
* See {@link TopologyValidator} for details.
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 7093403..c3e3f3b 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -269,6 +269,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (cfg.getCacheMode() == REPLICATED)
cfg.setBackups(Integer.MAX_VALUE);
+ if( cfg.getQueryParallelism() > 1 && cfg.getCacheMode() != PARTITIONED)
+ throw new IgniteCheckedException("Cache index segmentation is supported for PARTITIONED mode only.");
+
if (cfg.getAffinityMapper() == null)
cfg.setAffinityMapper(cacheObjCtx.defaultAffMapper());
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 1381670..f806d05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -729,12 +729,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
final SqlQuery p = (SqlQuery)qry;
if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal())
- return (QueryCursor<R>)new QueryCursorImpl<>(new Iterable<Cache.Entry<K, V>>() {
- @Override public Iterator<Cache.Entry<K, V>> iterator() {
- return ctx.kernalContext().query().queryLocal(ctx, p,
+ return (QueryCursor<R>)ctx.kernalContext().query().queryLocal(ctx, p,
opCtxCall != null && opCtxCall.isKeepBinary());
- }
- });
return (QueryCursor<R>)ctx.kernalContext().query().queryTwoStep(ctx, p);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 20fb6a0..61ed8a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -902,7 +902,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @return Future.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- private <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> c, byte plc)
+ public <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> c, byte plc)
throws IgniteCheckedException {
if (c == null)
return new GridFinishedFuture<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index ca04724..37f0ade 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -84,35 +84,26 @@ public interface GridQueryIndexing {
/**
* Queries individual fields (generally used by JDBC drivers).
*
- * @param spaceName Space name.
+ * @param cctx Cache context.
* @param qry Query.
- * @param params Query parameters.
* @param filter Space name and key filter.
- * @param enforceJoinOrder Enforce join order of tables in the query.
- * @param timeout Query timeout in milliseconds.
* @param cancel Query cancel.
- * @return Query result.
- * @throws IgniteCheckedException If failed.
+ * @return Cursor.
*/
- public GridQueryFieldsResult queryLocalSqlFields(@Nullable String spaceName, String qry,
- Collection<Object> params, IndexingQueryFilter filter, boolean enforceJoinOrder, int timeout,
- GridQueryCancel cancel) throws IgniteCheckedException;
+ public <K, V> QueryCursor<List<?>> queryLocalSqlFields(GridCacheContext<?, ?> cctx, SqlFieldsQuery qry,
+ IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException;
/**
* Executes regular query.
*
- * @param spaceName Space name.
+ * @param cctx Cache context.
* @param qry Query.
- * @param alias Table alias used in Query.
- * @param params Query parameters.
- * @param type Query return type.
* @param filter Space name and key filter.
- * @return Queried rows.
- * @throws IgniteCheckedException If failed.
+ * @param keepBinary Keep binary flag.
+ * @return Cursor.
*/
- public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalSql(@Nullable String spaceName, String qry,
- String alias, Collection<Object> params, GridQueryTypeDescriptor type, IndexingQueryFilter filter)
- throws IgniteCheckedException;
+ public <K, V> QueryCursor<Cache.Entry<K,V>> queryLocalSql(GridCacheContext<?, ?> cctx, SqlQuery qry,
+ IndexingQueryFilter filter, boolean keepBinary) throws IgniteCheckedException;
/**
* Executes text query.
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index ee9224b..85744d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -754,42 +754,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
INDEXING.module() + " to classpath or moving it from 'optional' to 'libs' folder).");
}
- /**
- * @param space Space.
- * @param clause Clause.
- * @param params Parameters collection.
- * @param resType Result type.
- * @param filters Filters.
- * @return Key/value rows.
- * @throws IgniteCheckedException If failed.
- */
- @SuppressWarnings("unchecked")
- public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(final String space, final String clause,
- final Collection<Object> params, final String resType, final IndexingQueryFilter filters)
- throws IgniteCheckedException {
- checkEnabled();
-
- if (!busyLock.enterBusy())
- throw new IllegalStateException("Failed to execute query (grid is stopping).");
-
- try {
- final GridCacheContext<?, ?> cctx = ctx.cache().internalCache(space).context();
-
- return executeQuery(GridCacheQueryType.SQL_FIELDS, clause, cctx, new IgniteOutClosureX<GridCloseableIterator<IgniteBiTuple<K, V>>>() {
- @Override public GridCloseableIterator<IgniteBiTuple<K, V>> applyx() throws IgniteCheckedException {
- TypeDescriptor type = typesByName.get(new TypeName(space, resType));
-
- if (type == null || !type.registered())
- throw new CacheException("Failed to find SQL table for type: " + resType);
-
- return idx.queryLocalSql(space, clause, null, params, type, filters);
- }
- }, false);
- }
- finally {
- busyLock.leaveBusy();
- }
- }
/**
* @param cctx Cache context.
@@ -829,11 +793,12 @@ public class GridQueryProcessor extends GridProcessorAdapter {
throw new IllegalStateException("Failed to execute query (grid is stopping).");
try {
- return executeQuery(GridCacheQueryType.SQL, qry.getSql(), cctx, new IgniteOutClosureX<QueryCursor<Cache.Entry<K, V>>>() {
- @Override public QueryCursor<Cache.Entry<K, V>> applyx() throws IgniteCheckedException {
- return idx.queryTwoStep(cctx, qry);
- }
- }, true);
+ return executeQuery(GridCacheQueryType.SQL, qry.getSql(), cctx,
+ new IgniteOutClosureX<QueryCursor<Cache.Entry<K, V>>>() {
+ @Override public QueryCursor<Cache.Entry<K, V>> applyx() throws IgniteCheckedException {
+ return idx.queryTwoStep(cctx, qry);
+ }
+ }, true);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -849,7 +814,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @param keepBinary Keep binary flag.
* @return Cursor.
*/
- public <K, V> Iterator<Cache.Entry<K, V>> queryLocal(
+ public <K, V> QueryCursor<Cache.Entry<K, V>> queryLocal(
final GridCacheContext<?, ?> cctx,
final SqlQuery qry,
final boolean keepBinary
@@ -859,54 +824,25 @@ public class GridQueryProcessor extends GridProcessorAdapter {
try {
return executeQuery(GridCacheQueryType.SQL, qry.getSql(), cctx,
- new IgniteOutClosureX<Iterator<Cache.Entry<K, V>>>() {
- @Override public Iterator<Cache.Entry<K, V>> applyx() throws IgniteCheckedException {
- String space = cctx.name();
+ new IgniteOutClosureX<QueryCursor<Cache.Entry<K, V>>>() {
+ @Override public QueryCursor<Cache.Entry<K, V>> applyx() throws IgniteCheckedException {
String type = qry.getType();
- String sqlQry = qry.getSql();
- Object[] params = qry.getArgs();
- TypeDescriptor typeDesc = typesByName.get(
- new TypeName(
- space,
+ GridQueryProcessor.TypeDescriptor typeDesc = typesByName.get(
+ new GridQueryProcessor.TypeName(
+ cctx.name(),
type));
if (typeDesc == null || !typeDesc.registered())
throw new CacheException("Failed to find SQL table for type: " + type);
- final GridCloseableIterator<IgniteBiTuple<K, V>> i = idx.queryLocalSql(
- space,
- qry.getSql(),
- qry.getAlias(),
- F.asList(params),
- typeDesc,
- idx.backupFilter(requestTopVer.get(), null));
+ qry.setType(typeDesc.name());
sendQueryExecutedEvent(
- sqlQry,
- params);
-
- return new ClIter<Cache.Entry<K, V>>() {
- @Override public void close() throws Exception {
- i.close();
- }
-
- @Override public boolean hasNext() {
- return i.hasNext();
- }
-
- @Override public Cache.Entry<K, V> next() {
- IgniteBiTuple<K, V> t = i.next();
-
- return new CacheEntryImpl<>(
- (K)cctx.unwrapBinaryIfNeeded(t.getKey(), keepBinary, false),
- (V)cctx.unwrapBinaryIfNeeded(t.getValue(), keepBinary, false));
- }
-
- @Override public void remove() {
- throw new UnsupportedOperationException();
- }
- };
+ qry.getSql(),
+ qry.getArgs());
+
+ return idx.queryLocalSql(cctx, qry, idx.backupFilter(requestTopVer.get(), null), keepBinary);
}
}, true);
}
@@ -994,13 +930,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
- * Closeable iterator.
- */
- private interface ClIter<X> extends AutoCloseable, Iterator<X> {
- // No-op.
- }
-
- /**
* @param cctx Cache context.
* @param qry Query.
* @return Iterator.
@@ -1010,34 +939,26 @@ public class GridQueryProcessor extends GridProcessorAdapter {
throw new IllegalStateException("Failed to execute query (grid is stopping).");
try {
- final boolean keepBinary = cctx.keepBinary();
-
return executeQuery(GridCacheQueryType.SQL_FIELDS, qry.getSql(), cctx, new IgniteOutClosureX<QueryCursor<List<?>>>() {
@Override public QueryCursor<List<?>> applyx() throws IgniteCheckedException {
- final String space = cctx.name();
- final String sql = qry.getSql();
- final Object[] args = qry.getArgs();
- final GridQueryCancel cancel = new GridQueryCancel();
+ GridQueryCancel cancel = new GridQueryCancel();
- final GridQueryFieldsResult res = idx.queryLocalSqlFields(space, sql, F.asList(args),
- idx.backupFilter(requestTopVer.get(), null), qry.isEnforceJoinOrder(), qry.getTimeout(), cancel);
+ final QueryCursor<List<?>> cursor = idx.queryLocalSqlFields(cctx, qry,
+ idx.backupFilter(requestTopVer.get(), null), cancel);
- QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() {
+ return new QueryCursorImpl<List<?>>(new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
- try {
- sendQueryExecutedEvent(sql, args);
-
- return new GridQueryCacheObjectsIterator(res.iterator(), cctx, keepBinary);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
- }, cancel);
-
- cursor.fieldsMeta(res.metaData());
+ sendQueryExecutedEvent(qry.getSql(), qry.getArgs());
- return cursor;
+ return cursor.iterator();
+ }
+ }, cancel) {
+ @Override public List<GridQueryFieldMetadata> fieldsMeta() {
+ if (cursor instanceof QueryCursorImpl)
+ return ((QueryCursorImpl)cursor).fieldsMeta();
+ return super.fieldsMeta();
+ }
+ };
}
}, true);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
index 1feff5a..acea084 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.query.h2.twostep.messages;
-
import java.nio.ByteBuffer;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -35,6 +34,9 @@ public class GridQueryNextPageRequest implements Message {
private long qryReqId;
/** */
+ private int segmentId;
+
+ /** */
private int qry;
/** */
@@ -50,11 +52,13 @@ public class GridQueryNextPageRequest implements Message {
/**
* @param qryReqId Query request ID.
* @param qry Query.
+ * @param segmentId Index segment ID.
* @param pageSize Page size.
*/
- public GridQueryNextPageRequest(long qryReqId, int qry, int pageSize) {
+ public GridQueryNextPageRequest(long qryReqId, int qry, int segmentId, int pageSize) {
this.qryReqId = qryReqId;
this.qry = qry;
+ this.segmentId = segmentId;
this.pageSize = pageSize;
}
@@ -72,6 +76,11 @@ public class GridQueryNextPageRequest implements Message {
return qry;
}
+    /** @return Index segment ID. */
+ public int segmentId() {
+ return segmentId;
+ }
+
/**
* @return Page size.
*/
@@ -119,6 +128,12 @@ public class GridQueryNextPageRequest implements Message {
writer.incrementState();
+ case 3:
+ if (!writer.writeInt("segmentId", segmentId))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -156,6 +171,14 @@ public class GridQueryNextPageRequest implements Message {
reader.incrementState();
+ case 3:
+ segmentId = reader.readInt("segmentId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
return reader.afterMessageRead(GridQueryNextPageRequest.class);
@@ -168,6 +191,6 @@ public class GridQueryNextPageRequest implements Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 3;
+ return 4;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
index 4889069..e85c00b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
@@ -42,6 +42,9 @@ public class GridQueryNextPageResponse implements Message {
private long qryReqId;
/** */
+ private int segmentId;
+
+ /** */
private int qry;
/** */
@@ -73,6 +76,7 @@ public class GridQueryNextPageResponse implements Message {
/**
* @param qryReqId Query request ID.
+ * @param segmentId Index segment ID.
* @param qry Query.
* @param page Page.
* @param allRows All rows count.
@@ -80,12 +84,13 @@ public class GridQueryNextPageResponse implements Message {
* @param vals Values for rows in this page added sequentially.
* @param plainRows Not marshalled rows for local node.
*/
- public GridQueryNextPageResponse(long qryReqId, int qry, int page, int allRows, int cols,
+ public GridQueryNextPageResponse(long qryReqId, int segmentId, int qry, int page, int allRows, int cols,
Collection<Message> vals, Collection<?> plainRows) {
assert vals != null ^ plainRows != null;
assert cols > 0 : cols;
this.qryReqId = qryReqId;
+ this.segmentId = segmentId;
this.qry = qry;
this.page = page;
this.allRows = allRows;
@@ -102,6 +107,13 @@ public class GridQueryNextPageResponse implements Message {
}
/**
+ * @return Index segment ID.
+ */
+ public int segmentId() {
+ return segmentId;
+ }
+
+ /**
* @return Query.
*/
public int query() {
@@ -202,6 +214,12 @@ public class GridQueryNextPageResponse implements Message {
writer.incrementState();
+ case 7:
+ if (!writer.writeInt("segmentId", segmentId))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -271,6 +289,13 @@ public class GridQueryNextPageResponse implements Message {
reader.incrementState();
+ case 7:
+ segmentId = reader.readInt("segmentId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
}
return reader.afterMessageRead(GridQueryNextPageResponse.class);
@@ -283,7 +308,7 @@ public class GridQueryNextPageResponse implements Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 7;
+ return 8;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index f53936f..c127eeb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -69,6 +69,9 @@ public class GridCacheTwoStepQuery {
/** */
private List<Integer> extraCaches;
+ /** */
+ private boolean local;
+
/**
* @param originalSql Original query SQL.
* @param schemas Schema names in query.
@@ -229,6 +232,20 @@ public class GridCacheTwoStepQuery {
}
/**
+ * @return {@code True} if query is local.
+ */
+ public boolean isLocal() {
+ return local;
+ }
+
+ /**
+ * @param local Local query flag.
+ */
+ public void local(boolean local) {
+ this.local = local;
+ }
+
+ /**
* @param args New arguments to copy with.
* @return Copy.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index e4b0c1f..2f40d87 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -77,11 +77,13 @@ import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
@@ -92,6 +94,7 @@ import org.apache.ignite.internal.processors.query.GridQueryIndexing;
import org.apache.ignite.internal.processors.query.GridQueryProperty;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOffheap;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap;
@@ -187,6 +190,8 @@ import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryTy
import static org.apache.ignite.internal.processors.query.GridQueryIndexType.FULLTEXT;
import static org.apache.ignite.internal.processors.query.GridQueryIndexType.GEO_SPATIAL;
import static org.apache.ignite.internal.processors.query.GridQueryIndexType.SORTED;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.distributedJoinMode;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.LOCAL;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE;
@@ -810,10 +815,22 @@ public class IgniteH2Indexing implements GridQueryIndexing {
removeTable(tbl);
}
- /** {@inheritDoc} */
+ /**
+ * Queries individual fields (generally used by JDBC drivers).
+ *
+ * @param spaceName Space name.
+ * @param qry Query.
+ * @param params Query parameters.
+ * @param filter Space name and key filter.
+ * @param enforceJoinOrder Enforce join order of tables in the query.
+ * @param timeout Query timeout in milliseconds.
+ * @param cancel Query cancel.
+ * @return Query result.
+ * @throws IgniteCheckedException If failed.
+ */
@SuppressWarnings("unchecked")
- @Override public GridQueryFieldsResult queryLocalSqlFields(@Nullable final String spaceName, final String qry,
- @Nullable final Collection<Object> params, final IndexingQueryFilter filters, boolean enforceJoinOrder,
+ public GridQueryFieldsResult queryLocalSqlFields(@Nullable final String spaceName, final String qry,
+ @Nullable final Collection<Object> params, final IndexingQueryFilter filter, boolean enforceJoinOrder,
final int timeout, final GridQueryCancel cancel)
throws IgniteCheckedException {
final Connection conn = connectionForSpace(spaceName);
@@ -833,7 +850,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
fldsQry.setEnforceJoinOrder(enforceJoinOrder);
fldsQry.setTimeout(timeout, TimeUnit.MILLISECONDS);
- return dmlProc.updateLocalSqlFields(spaceName, stmt, fldsQry, filters, cancel);
+ return dmlProc.updateLocalSqlFields(spaceName, stmt, fldsQry, filter, cancel);
}
List<GridQueryFieldMetadata> meta;
@@ -846,7 +863,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
final GridH2QueryContext ctx = new GridH2QueryContext(nodeId, nodeId, 0, LOCAL)
- .filter(filters).distributedJoins(false);
+ .filter(filter).distributedJoinMode(OFF);
return new GridQueryFieldsResultAdapter(meta, null) {
@Override public GridCloseableIterator<List<?>> iterator() throws IgniteCheckedException {
@@ -1099,14 +1116,113 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalSql(@Nullable String spaceName,
- final String qry, String alias, @Nullable final Collection<Object> params, GridQueryTypeDescriptor type,
- final IndexingQueryFilter filter) throws IgniteCheckedException {
- final TableDescriptor tbl = tableDescriptor(spaceName, type);
+ @Override public <K, V> QueryCursor<List<?>> queryLocalSqlFields(final GridCacheContext<?, ?> cctx,
+ final SqlFieldsQuery qry, final IndexingQueryFilter filter, final GridQueryCancel cancel)
+ throws IgniteCheckedException {
+
+ if (cctx.config().getQueryParallelism() > 1) {
+ qry.setDistributedJoins(true);
+
+ assert qry.isLocal();
+
+ return queryTwoStep(cctx, qry, cancel);
+ }
+ else {
+ final boolean keepBinary = cctx.keepBinary();
+
+ final String space = cctx.name();
+ final String sql = qry.getSql();
+ final Object[] args = qry.getArgs();
+
+ final GridQueryFieldsResult res = queryLocalSqlFields(space, sql, F.asList(args), filter,
+ qry.isEnforceJoinOrder(), qry.getTimeout(), cancel);
+
+ QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() {
+ @Override public Iterator<List<?>> iterator() {
+ try {
+ return new GridQueryCacheObjectsIterator(res.iterator(), cctx, keepBinary);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ }, cancel);
+
+ cursor.fieldsMeta(res.metaData());
+
+ return cursor;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K, V> QueryCursor<Cache.Entry<K,V>> queryLocalSql(final GridCacheContext<?, ?> cctx,
+ final SqlQuery qry, final IndexingQueryFilter filter, final boolean keepBinary) throws IgniteCheckedException {
+ if (cctx.config().getQueryParallelism() > 1) {
+ qry.setDistributedJoins(true);
+
+ assert qry.isLocal();
+
+ return queryTwoStep(cctx, qry);
+ }
+ else {
+ String space = cctx.name();
+ String type = qry.getType();
+ String sqlQry = qry.getSql();
+ String alias = qry.getAlias();
+ Object[] params = qry.getArgs();
+
+ GridQueryCancel cancel = new GridQueryCancel();
+
+ final GridCloseableIterator<IgniteBiTuple<K, V>> i = queryLocalSql(space, sqlQry, alias,
+ F.asList(params), type, filter, cancel);
+
+ return new QueryCursorImpl<Cache.Entry<K, V>>(new Iterable<Cache.Entry<K, V>>() {
+ @Override public Iterator<Cache.Entry<K, V>> iterator() {
+ return new ClIter<Cache.Entry<K, V>>() {
+ @Override public void close() throws Exception {
+ i.close();
+ }
+
+ @Override public boolean hasNext() {
+ return i.hasNext();
+ }
+
+ @Override public Cache.Entry<K, V> next() {
+ IgniteBiTuple<K, V> t = i.next();
+
+ return new CacheEntryImpl<>(
+ (K)cctx.unwrapBinaryIfNeeded(t.get1(), keepBinary, false),
+ (V)cctx.unwrapBinaryIfNeeded(t.get2(), keepBinary, false));
+ }
+
+ @Override public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ }, cancel);
+ }
+ }
+
+ /**
+ * Executes regular query.
+ *
+ * @param spaceName Space name.
+ * @param qry Query.
+ * @param alias Table alias.
+ * @param params Query parameters.
+ * @param type Query return type.
+ * @param filter Space name and key filter.
+ * @return Queried rows.
+ * @throws IgniteCheckedException If failed.
+ */
+ public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalSql(@Nullable String spaceName,
+ final String qry, String alias, @Nullable final Collection<Object> params, String type,
+ final IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException {
+ final TableDescriptor tbl = tableDescriptor(type, spaceName);
if (tbl == null)
- throw new IgniteSQLException("Failed to find SQL table for type: " + type.name(),
+ throw new IgniteSQLException("Failed to find SQL table for type: " + type,
IgniteQueryErrorCode.TABLE_NOT_FOUND);
String sql = generateQuery(qry, alias, tbl);
@@ -1115,7 +1231,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
setupConnection(conn, false, false);
- GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter).distributedJoins(false));
+ GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter)
+ .distributedJoinMode(OFF));
GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL, spaceName,
U.currentTimeMillis(), null, true);
@@ -1123,7 +1240,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
runs.put(run.id(), run);
try {
- ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, sql, params, true, 0, null);
+ ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, sql, params, true, 0, cancel);
return new KeyValIterator(rs);
}
@@ -1178,8 +1295,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
fqry.setArgs(qry.getArgs());
fqry.setPageSize(qry.getPageSize());
fqry.setDistributedJoins(qry.isDistributedJoins());
+ fqry.setLocal(qry.isLocal());
- if(qry.getTimeout() > 0)
+ if (qry.getTimeout() > 0)
fqry.setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS);
final QueryCursor<List<?>> res = queryTwoStep(cctx, fqry, null);
@@ -1234,11 +1352,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
final boolean distributedJoins = qry.isDistributedJoins() && cctx.isPartitioned();
final boolean grpByCollocated = qry.isCollocated();
+ final DistributedJoinMode distributedJoinMode = distributedJoinMode(qry.isLocal(), distributedJoins);
+
GridCacheTwoStepQuery twoStepQry;
List<GridQueryFieldMetadata> meta;
final TwoStepCachedQueryKey cachedQryKey = new TwoStepCachedQueryKey(space, sqlQry, grpByCollocated,
- distributedJoins, enforceJoinOrder);
+ distributedJoins, enforceJoinOrder, qry.isLocal());
TwoStepCachedQuery cachedQry = twoStepCache.get(cachedQryKey);
if (cachedQry != null) {
@@ -1251,7 +1371,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
setupConnection(c, distributedJoins, enforceJoinOrder);
GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, 0, PREPARE)
- .distributedJoins(distributedJoins));
+ .distributedJoinMode(distributedJoinMode));
PreparedStatement stmt;
@@ -1286,9 +1406,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
GridH2QueryContext.clearThreadLocal();
}
- Prepared prepared = GridSqlQueryParser.prepared((JdbcPreparedStatement) stmt);
+ Prepared prepared = GridSqlQueryParser.prepared((JdbcPreparedStatement)stmt);
- if (qry instanceof JdbcSqlFieldsQuery && ((JdbcSqlFieldsQuery) qry).isQuery() != prepared.isQuery())
+ if (qry instanceof JdbcSqlFieldsQuery && ((JdbcSqlFieldsQuery)qry).isQuery() != prepared.isQuery())
throw new IgniteSQLException("Given statement type does not match that declared by JDBC driver",
IgniteQueryErrorCode.STMT_TYPE_MISMATCH);
@@ -1341,8 +1461,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
extraCaches = null;
}
+ // Prohibit using indexes with different numbers of segments in the same query.
+ checkCacheIndexSegmentation(caches);
+
twoStepQry.caches(caches);
twoStepQry.extraCaches(extraCaches);
+ twoStepQry.local(qry.isLocal());
meta = meta(stmt.getMetaData());
}
@@ -1380,6 +1504,32 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
+ * @throws IllegalStateException If partitioned caches of the query have different query parallelism levels.
+ */
+ private void checkCacheIndexSegmentation(List<Integer> caches) {
+ if (caches.isEmpty())
+ return; // Nothing to check.
+
+ GridCacheSharedContext sharedContext = ctx.cache().context();
+
+ int expectedParallelism = 0;
+
+ for (int i = 0; i < caches.size(); i++) {
+ GridCacheContext cctx = sharedContext.cacheContext(caches.get(i));
+
+ assert cctx != null;
+
+ if(!cctx.isPartitioned())
+ continue;
+
+ if(expectedParallelism == 0)
+ expectedParallelism = cctx.config().getQueryParallelism();
+ else if (expectedParallelism != 0 && cctx.config().getQueryParallelism() != expectedParallelism)
+ throw new IllegalStateException("Using indexes with different parallelism levels in same query is forbidden.");
+ }
+ }
+
+ /**
* Prepares statement for query.
*
* @param qry Query string.
@@ -1669,7 +1819,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
private void cleanupStatementCache() {
long cur = U.currentTimeMillis();
- for(Iterator<Map.Entry<Thread, StatementCache>> it = stmtCache.entrySet().iterator(); it.hasNext(); ) {
+ for (Iterator<Map.Entry<Thread, StatementCache>> it = stmtCache.entrySet().iterator(); it.hasNext(); ) {
Map.Entry<Thread, StatementCache> entry = it.next();
Thread t = entry.getKey();
@@ -1877,6 +2027,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
for (ClusterNode node : nodes) {
if (node.isLocal()) {
+ if (locNode != null)
+ throw new IllegalStateException();
+
locNode = node;
continue;
@@ -2163,23 +2316,29 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** */
private final boolean enforceJoinOrder;
+ /** */
+ private final boolean isLocal;
+
/**
* @param space Space.
* @param sql Sql.
* @param grpByCollocated Collocated GROUP BY.
* @param distributedJoins Distributed joins enabled.
* @param enforceJoinOrder Enforce join order of tables.
+ * @param isLocal Query is local flag.
*/
private TwoStepCachedQueryKey(String space,
String sql,
boolean grpByCollocated,
boolean distributedJoins,
- boolean enforceJoinOrder) {
+ boolean enforceJoinOrder,
+ boolean isLocal) {
this.space = space;
this.sql = sql;
this.grpByCollocated = grpByCollocated;
this.distributedJoins = distributedJoins;
this.enforceJoinOrder = enforceJoinOrder;
+ this.isLocal = isLocal;
}
/** {@inheritDoc} */
@@ -2204,7 +2363,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (space != null ? !space.equals(that.space) : that.space != null)
return false;
- return sql.equals(that.sql);
+ return isLocal == that.isLocal && sql.equals(that.sql);
}
/** {@inheritDoc} */
@@ -2212,8 +2371,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
int res = space != null ? space.hashCode() : 0;
res = 31 * res + sql.hashCode();
res = 31 * res + (grpByCollocated ? 1 : 0);
- res = 31 * res + (distributedJoins ? 1 : 0);
- res = 31 * res + (enforceJoinOrder ? 1 : 0);
+ res = res + (distributedJoins ? 2 : 0);
+ res = res + (enforceJoinOrder ? 4 : 0);
+ res = res + (isLocal ? 8 : 0);
return res;
}
@@ -2572,7 +2732,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
affCol = null;
// Add primary key index.
- idxs.add(new GridH2TreeIndex("_key_PK", tbl, true,
+ idxs.add(createTreeIndex("_key_PK", tbl, true,
treeIndexColumns(new ArrayList<IndexColumn>(2), keyCol, affCol)));
if (type().valueClass() == String.class) {
@@ -2618,7 +2778,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
cols = treeIndexColumns(cols, keyCol, affCol);
- idxs.add(new GridH2TreeIndex(name, tbl, false, cols));
+ idxs.add(createTreeIndex(name, tbl, false, cols));
}
else if (idx.type() == GEO_SPATIAL)
idxs.add(createH2SpatialIndex(tbl, name, cols.toArray(new IndexColumn[cols.size()])));
@@ -2629,7 +2789,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
// Add explicit affinity key index if nothing alike was found.
if (affCol != null && !affIdxFound) {
- idxs.add(new GridH2TreeIndex("AFFINITY_KEY", tbl, false,
+ idxs.add(createTreeIndex("AFFINITY_KEY", tbl, false,
treeIndexColumns(new ArrayList<IndexColumn>(2), affCol, keyCol)));
}
@@ -2676,6 +2836,22 @@ public class IgniteH2Indexing implements GridQueryIndexing {
throw new IgniteException("Failed to instantiate: " + className, e);
}
}
+
+ /**
+ * @param idxName Index name.
+ * @param tbl Table.
+ * @param pk Primary key flag.
+ * @param columns Index column list.
+ * @return Created tree index.
+ */
+ private Index createTreeIndex(String idxName, GridH2Table tbl, boolean pk, List<IndexColumn> columns) {
+ GridCacheContext<?, ?> cctx = tbl.rowDescriptor().context();
+
+ if (cctx != null && cctx.config().getQueryParallelism() > 1)
+ return new GridH2TreeIndex(idxName, tbl, pk, columns, cctx.config().getQueryParallelism());
+
+ return new GridH2TreeIndex(idxName, tbl, pk, columns, 1);
+ }
}
/**
@@ -2729,6 +2905,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
+ * Closeable iterator.
+ */
+ private interface ClIter<X> extends AutoCloseable, Iterator<X> {
+ // No-op.
+ }
+
+ /**
* Field descriptor.
*/
static class SqlFieldMetadata implements GridQueryFieldMetadata {
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java
new file mode 100644
index 0000000..cc06244
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt;
+
+/**
+ * Defines set of distributed join modes.
+ */
+public enum DistributedJoinMode {
+ /**
+ * Distributed joins are disabled. Local joins will be performed instead.
+ */
+ OFF,
+
+ /**
+ * Distributed joins are enabled within the local node only.
+ *
+ * NOTE: This mode is used with segmented indexes for local SQL queries.
+ * In this case we need to perform a distributed join across local index segments
+ * and prevent range queries to other nodes.
+ */
+ LOCAL_ONLY,
+
+ /**
+ * Distributed joins are enabled.
+ */
+ ON;
+
+ /**
+ * @param isLocal Query local flag.
+ * @param distributedJoins Query distributed joins flag.
+ * @return DistributedJoinMode for the query.
+ */
+ public static DistributedJoinMode distributedJoinMode(boolean isLocal, boolean distributedJoins) {
+ return distributedJoins ? (isLocal ? LOCAL_ONLY : ON) : OFF;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index bab219c..131e03b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -81,6 +81,8 @@ import org.jetbrains.annotations.Nullable;
import static java.util.Collections.emptyIterator;
import static java.util.Collections.singletonList;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.LOCAL_ONLY;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.VAL_COL;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2CollocationModel.buildCollocationModel;
@@ -178,6 +180,13 @@ public abstract class GridH2IndexBase extends BaseIndex {
}
/**
+ * @return Index segment ID for current query context.
+ */
+ protected int threadLocalSegment() {
+ return 0;
+ }
+
+ /**
* If the index supports rebuilding it has to creates its own copy.
*
* @return Rebuilt copy.
@@ -252,7 +261,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
// because on run stage reordering of joined tables by Optimizer is explicitly disabled
// and thus multiplier will be always the same, so it will not affect choice of index.
// Query expressions can not be distributed as well.
- if (qctx == null || qctx.type() != PREPARE || !qctx.distributedJoins() || ses.isPreparingQueryExpression())
+ if (qctx == null || qctx.type() != PREPARE || qctx.distributedJoinMode() == OFF || ses.isPreparingQueryExpression())
return GridH2CollocationModel.MULTIPLIER_COLLOCATED;
// We have to clear this cache because normally sub-query plan cost does not depend on anything
@@ -363,7 +372,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
@Override public IndexLookupBatch createLookupBatch(TableFilter filter) {
GridH2QueryContext qctx = GridH2QueryContext.get();
- if (qctx == null || !qctx.distributedJoins() || !getTable().isPartitioned())
+ if (qctx == null || qctx.distributedJoinMode() == OFF || !getTable().isPartitioned())
return null;
IndexColumn affCol = getTable().getAffinityKeyColumn();
@@ -381,9 +390,11 @@ public abstract class GridH2IndexBase extends BaseIndex {
ucast = false;
}
- GridCacheContext<?,?> cctx = getTable().rowDescriptor().context();
+ GridCacheContext<?, ?> cctx = getTable().rowDescriptor().context();
- return new DistributedLookupBatch(cctx, ucast, affColId);
+ boolean isLocal = qctx.distributedJoinMode() == LOCAL_ONLY;
+
+ return new DistributedLookupBatch(cctx, ucast, affColId, isLocal);
}
/**
@@ -437,18 +448,18 @@ public abstract class GridH2IndexBase extends BaseIndex {
* @param node Requesting node.
* @param msg Request message.
*/
- private void onIndexRangeRequest(ClusterNode node, GridH2IndexRangeRequest msg) {
- GridH2QueryContext qctx = GridH2QueryContext.get(kernalContext().localNodeId(),
- msg.originNodeId(),
- msg.queryId(),
- MAP);
-
+ private void onIndexRangeRequest(final ClusterNode node, final GridH2IndexRangeRequest msg) {
GridH2IndexRangeResponse res = new GridH2IndexRangeResponse();
res.originNodeId(msg.originNodeId());
res.queryId(msg.queryId());
+ res.originSegmentId(msg.originSegmentId());
+ res.segment(msg.segment());
res.batchLookupId(msg.batchLookupId());
+ GridH2QueryContext qctx = GridH2QueryContext.get(kernalContext().localNodeId(), msg.originNodeId(),
+ msg.queryId(), msg.originSegmentId(), MAP);
+
if (qctx == null)
res.status(STATUS_NOT_FOUND);
else {
@@ -461,11 +472,11 @@ public abstract class GridH2IndexBase extends BaseIndex {
assert !msg.bounds().isEmpty() : "empty bounds";
- src = new RangeSource(msg.bounds(), snapshot0, qctx.filter());
+ src = new RangeSource(msg.bounds(), msg.segment(), snapshot0, qctx.filter());
}
else {
// This is request to fetch next portion of data.
- src = qctx.getSource(node.id(), msg.batchLookupId());
+ src = qctx.getSource(node.id(), msg.segment(), msg.batchLookupId());
assert src != null;
}
@@ -491,11 +502,11 @@ public abstract class GridH2IndexBase extends BaseIndex {
if (src.hasMoreRows()) {
// Save source for future fetches.
if (msg.bounds() != null)
- qctx.putSource(node.id(), msg.batchLookupId(), src);
+ qctx.putSource(node.id(), msg.segment(), msg.batchLookupId(), src);
}
else if (msg.bounds() == null) {
// Drop saved source.
- qctx.putSource(node.id(), msg.batchLookupId(), null);
+ qctx.putSource(node.id(), msg.segment(), msg.batchLookupId(), null);
}
assert !ranges.isEmpty();
@@ -520,17 +531,17 @@ public abstract class GridH2IndexBase extends BaseIndex {
*/
private void onIndexRangeResponse(ClusterNode node, GridH2IndexRangeResponse msg) {
GridH2QueryContext qctx = GridH2QueryContext.get(kernalContext().localNodeId(),
- msg.originNodeId(), msg.queryId(), MAP);
+ msg.originNodeId(), msg.queryId(), msg.originSegmentId(), MAP);
if (qctx == null)
return;
- Map<ClusterNode, RangeStream> streams = qctx.getStreams(msg.batchLookupId());
+ Map<SegmentKey, RangeStream> streams = qctx.getStreams(msg.batchLookupId());
if (streams == null)
return;
- RangeStream stream = streams.get(node);
+ RangeStream stream = streams.get(new SegmentKey(node, msg.segment()));
assert stream != null;
@@ -549,47 +560,69 @@ public abstract class GridH2IndexBase extends BaseIndex {
/**
* @param qctx Query context.
* @param batchLookupId Batch lookup ID.
+ * @param segmentId Segment ID.
* @return Index range request.
*/
- private static GridH2IndexRangeRequest createRequest(GridH2QueryContext qctx, int batchLookupId) {
+ private static GridH2IndexRangeRequest createRequest(GridH2QueryContext qctx, int batchLookupId, int segmentId) {
GridH2IndexRangeRequest req = new GridH2IndexRangeRequest();
req.originNodeId(qctx.originNodeId());
req.queryId(qctx.queryId());
+ req.originSegmentId(qctx.segment());
+ req.segment(segmentId);
req.batchLookupId(batchLookupId);
return req;
}
+
/**
* @param qctx Query context.
* @param cctx Cache context.
+ * @param isLocalQry Local query flag.
* @return Collection of nodes for broadcasting.
*/
- private List<ClusterNode> broadcastNodes(GridH2QueryContext qctx, GridCacheContext<?,?> cctx) {
+ private List<SegmentKey> broadcastSegments(GridH2QueryContext qctx, GridCacheContext<?, ?> cctx, boolean isLocalQry) {
Map<UUID, int[]> partMap = qctx.partitionsMap();
- List<ClusterNode> res;
+ List<ClusterNode> nodes;
+
+ if (isLocalQry) {
+ if (partMap != null && !partMap.containsKey(cctx.localNodeId()))
+ return Collections.<SegmentKey>emptyList(); // Prevent remote index call for local queries.
- if (partMap == null)
- res = new ArrayList<>(CU.affinityNodes(cctx, qctx.topologyVersion()));
+ nodes = Collections.singletonList(cctx.localNode());
+ }
else {
- res = new ArrayList<>(partMap.size());
+ if (partMap == null)
+ nodes = new ArrayList<>(CU.affinityNodes(cctx, qctx.topologyVersion()));
+ else {
+ nodes = new ArrayList<>(partMap.size());
- GridKernalContext ctx = kernalContext();
+ GridKernalContext ctx = kernalContext();
- for (UUID nodeId : partMap.keySet()) {
- ClusterNode node = ctx.discovery().node(nodeId);
+ for (UUID nodeId : partMap.keySet()) {
+ ClusterNode node = ctx.discovery().node(nodeId);
- if (node == null)
- throw new GridH2RetryException("Failed to find node.");
+ if (node == null)
+ throw new GridH2RetryException("Failed to find node.");
- res.add(node);
+ nodes.add(node);
+ }
}
+
+ if (F.isEmpty(nodes))
+ throw new GridH2RetryException("Failed to collect affinity nodes.");
}
- if (F.isEmpty(res))
- throw new GridH2RetryException("Failed to collect affinity nodes.");
+ int segmentsCount = segmentsCount();
+
+ List<SegmentKey> res = new ArrayList<>(nodes.size() * segmentsCount);
+
+ for (ClusterNode node : nodes) {
+ for (int seg = 0; seg < segmentsCount; seg++)
+ res.add(new SegmentKey(node, seg));
+ }
return res;
}
@@ -598,26 +631,81 @@ public abstract class GridH2IndexBase extends BaseIndex {
* @param cctx Cache context.
* @param qctx Query context.
* @param affKeyObj Affinity key.
- * @return Cluster nodes or {@code null} if affinity key is a null value.
+ * @param isLocalQry Local query flag.
+ * @return Segment key for the affinity key, or {@code null} if the query is local and the key is not mapped to the local node.
*/
- private ClusterNode rangeNode(GridCacheContext<?,?> cctx, GridH2QueryContext qctx, Object affKeyObj) {
+ private SegmentKey rangeSegment(GridCacheContext<?, ?> cctx, GridH2QueryContext qctx, Object affKeyObj, boolean isLocalQry) {
assert affKeyObj != null && affKeyObj != EXPLICIT_NULL : affKeyObj;
ClusterNode node;
- if (qctx.partitionsMap() != null) {
- // If we have explicit partitions map, we have to use it to calculate affinity node.
- UUID nodeId = qctx.nodeForPartition(cctx.affinity().partition(affKeyObj), cctx);
+ int partition = cctx.affinity().partition(affKeyObj);
+
+ if (isLocalQry) {
+ if (qctx.partitionsMap() != null) {
+ // If we have explicit partitions map, we have to use it to calculate affinity node.
+ UUID nodeId = qctx.nodeForPartition(partition, cctx);
+
+ if(!cctx.localNodeId().equals(nodeId))
+ return null; // Prevent remote index call for local queries.
+ }
+
+ if (!cctx.affinity().primaryByKey(cctx.localNode(), partition, qctx.topologyVersion()))
+ return null;
+
+ node = cctx.localNode();
+ }
+ else{
+ if (qctx.partitionsMap() != null) {
+ // If we have explicit partitions map, we have to use it to calculate affinity node.
+ UUID nodeId = qctx.nodeForPartition(partition, cctx);
node = cctx.discovery().node(nodeId);
}
else // Get primary node for current topology version.
node = cctx.affinity().primaryByKey(affKeyObj, qctx.topologyVersion());
- if (node == null) // Node was not found, probably topology changed and we need to retry the whole query.
- throw new GridH2RetryException("Failed to find node.");
+ if (node == null) // Node was not found, probably topology changed and we need to retry the whole query.
+ throw new GridH2RetryException("Failed to find node.");
+ }
+
+ return new SegmentKey(node, segment(partition));
+ }
+
+ /** */
+ protected class SegmentKey {
+ /** */
+ final ClusterNode node;
+
+ /** */
+ final int segmentId;
+
+ SegmentKey(ClusterNode node, int segmentId) {
+ assert node != null;
+
+ this.node = node;
+ this.segmentId = segmentId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ SegmentKey key = (SegmentKey)o;
+
+ return segmentId == key.segmentId && node.id().equals(key.node.id());
- return node;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int result = node.hashCode();
+ result = 31 * result + segmentId;
+ return result;
+ }
}
/**
@@ -740,6 +828,20 @@ public abstract class GridH2IndexBase extends BaseIndex {
return database.createRow(vals0, MEMORY_CALCULATE);
}
+ /** @return Index segments count. */
+ protected int segmentsCount() {
+ return 1;
+ }
+
+ /**
+ * @param partition Partition idx.
+ * @return Segment ID for the given partition.
+ */
+ protected int segment(int partition) {
+ return 0;
+ }
+
+
/**
* Simple cursor from a single node.
*/
@@ -752,14 +854,14 @@ public abstract class GridH2IndexBase extends BaseIndex {
/**
* @param rangeId Range ID.
- * @param nodes Remote nodes.
+ * @param keys Remote index segment keys.
* @param rangeStreams Range streams.
*/
- private UnicastCursor(int rangeId, Collection<ClusterNode> nodes, Map<ClusterNode,RangeStream> rangeStreams) {
- assert nodes.size() == 1;
+ UnicastCursor(int rangeId, List<SegmentKey> keys, Map<SegmentKey, RangeStream> rangeStreams) {
+ assert keys.size() == 1;
this.rangeId = rangeId;
- this.stream = rangeStreams.get(F.first(nodes));
+ this.stream = rangeStreams.get(F.first(keys));
assert stream != null;
}
@@ -803,20 +905,19 @@ public abstract class GridH2IndexBase extends BaseIndex {
/**
* @param rangeId Range ID.
- * @param nodes Remote nodes.
+ * @param segmentKeys Remote index segment keys.
* @param rangeStreams Range streams.
*/
- private BroadcastCursor(int rangeId, Collection<ClusterNode> nodes, Map<ClusterNode,RangeStream> rangeStreams) {
- assert nodes.size() > 1;
+ BroadcastCursor(int rangeId, Collection<SegmentKey> segmentKeys, Map<SegmentKey, RangeStream> rangeStreams) {
this.rangeId = rangeId;
- streams = new RangeStream[nodes.size()];
+ streams = new RangeStream[segmentKeys.size()];
int i = 0;
- for (ClusterNode node : nodes) {
- RangeStream stream = rangeStreams.get(node);
+ for (SegmentKey segmentKey : segmentKeys) {
+ RangeStream stream = rangeStreams.get(segmentKey);
assert stream != null;
@@ -928,16 +1029,19 @@ public abstract class GridH2IndexBase extends BaseIndex {
final int affColId;
/** */
+ private final boolean localQuery;
+
+ /** */
GridH2QueryContext qctx;
/** */
int batchLookupId;
/** */
- Map<ClusterNode, RangeStream> rangeStreams = Collections.emptyMap();
+ Map<SegmentKey, RangeStream> rangeStreams = Collections.emptyMap();
/** */
- List<ClusterNode> broadcastNodes;
+ List<SegmentKey> broadcastSegments;
/** */
List<Future<Cursor>> res = Collections.emptyList();
@@ -952,11 +1056,13 @@ public abstract class GridH2IndexBase extends BaseIndex {
* @param cctx Cache Cache context.
* @param ucast Unicast or broadcast query.
* @param affColId Affinity column ID.
+ * @param localQuery Local query flag.
*/
- private DistributedLookupBatch(GridCacheContext<?,?> cctx, boolean ucast, int affColId) {
+ DistributedLookupBatch(GridCacheContext<?, ?> cctx, boolean ucast, int affColId, boolean localQuery) {
this.cctx = cctx;
this.ucast = ucast;
this.affColId = affColId;
+ this.localQuery = localQuery;
}
/**
@@ -1028,7 +1134,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
Object affKey = affColId == -1 ? null : getAffinityKey(firstRow, lastRow);
- List<ClusterNode> nodes;
+ List<SegmentKey> segmentKeys;
Future<Cursor> fut;
if (affKey != null) {
@@ -1036,17 +1142,20 @@ public abstract class GridH2IndexBase extends BaseIndex {
if (affKey == EXPLICIT_NULL) // Affinity key is explicit null, we will not find anything.
return false;
- nodes = F.asList(rangeNode(cctx, qctx, affKey));
+ segmentKeys = F.asList(rangeSegment(cctx, qctx, affKey, localQuery));
}
else {
// Affinity key is not provided or is not the same in upper and lower bounds, we have to broadcast.
- if (broadcastNodes == null)
- broadcastNodes = broadcastNodes(qctx, cctx);
+ if (broadcastSegments == null)
+ broadcastSegments = broadcastSegments(qctx, cctx, localQuery);
- nodes = broadcastNodes;
+ segmentKeys = broadcastSegments;
}
- assert !F.isEmpty(nodes) : nodes;
+ if (localQuery && segmentKeys.isEmpty())
+ return false; // Nothing to do
+
+ assert !F.isEmpty(segmentKeys) : segmentKeys;
final int rangeId = res.size();
@@ -1058,21 +1167,21 @@ public abstract class GridH2IndexBase extends BaseIndex {
GridH2RowRangeBounds rangeBounds = rangeBounds(rangeId, first, last);
// Add range to every message of every participating node.
- for (int i = 0; i < nodes.size(); i++) {
- ClusterNode node = nodes.get(i);
- assert node != null;
+ for (int i = 0; i < segmentKeys.size(); i++) {
+ SegmentKey segmentKey = segmentKeys.get(i);
+ assert segmentKey != null;
- RangeStream stream = rangeStreams.get(node);
+ RangeStream stream = rangeStreams.get(segmentKey);
List<GridH2RowRangeBounds> bounds;
if (stream == null) {
- stream = new RangeStream(qctx, node);
+ stream = new RangeStream(qctx, segmentKey.node);
- stream.req = createRequest(qctx, batchLookupId);
+ stream.req = createRequest(qctx, batchLookupId, segmentKey.segmentId);
stream.req.bounds(bounds = new ArrayList<>());
- rangeStreams.put(node, stream);
+ rangeStreams.put(segmentKey, stream);
}
else
bounds = stream.req.bounds();
@@ -1084,9 +1193,9 @@ public abstract class GridH2IndexBase extends BaseIndex {
batchFull = true;
}
- fut = new DoneFuture<>(nodes.size() == 1 ?
- new UnicastCursor(rangeId, nodes, rangeStreams) :
- new BroadcastCursor(rangeId, nodes, rangeStreams));
+ fut = new DoneFuture<>(segmentKeys.size() == 1 ?
+ new UnicastCursor(rangeId, segmentKeys, rangeStreams) :
+ new BroadcastCursor(rangeId, segmentKeys, rangeStreams));
res.add(fut);
@@ -1138,7 +1247,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
batchLookupId = 0;
rangeStreams = Collections.emptyMap();
- broadcastNodes = null;
+ broadcastSegments = null;
batchFull = false;
findCalled = false;
res = Collections.emptyList();
@@ -1244,7 +1353,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
if (remainingRanges > 0) {
if (req.bounds() != null)
- req = createRequest(qctx, req.batchLookupId());
+ req = createRequest(qctx, req.batchLookupId(), req.segment());
// Prefetch next page.
send(singletonList(node), req);
@@ -1366,6 +1475,9 @@ public abstract class GridH2IndexBase extends BaseIndex {
final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree;
/** */
+ private final int segment;
+
+ /** */
final IndexingQueryFilter filter;
/**
@@ -1375,9 +1487,11 @@ public abstract class GridH2IndexBase extends BaseIndex {
*/
RangeSource(
Iterable<GridH2RowRangeBounds> bounds,
+ int segment,
ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree,
IndexingQueryFilter filter
) {
+ this.segment = segment;
this.filter = filter;
this.tree = tree;
boundsIter = bounds.iterator();
@@ -1435,7 +1549,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
SearchRow first = toSearchRow(bounds.first());
SearchRow last = toSearchRow(bounds.last());
- ConcurrentNavigableMap<GridSearchRowPointer,GridH2Row> t = tree != null ? tree : treeForRead();
+ ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> t = tree != null ? tree : treeForRead(segment);
curRange = doFind0(t, first, true, last, filter);
@@ -1452,9 +1566,10 @@ public abstract class GridH2IndexBase extends BaseIndex {
}
/**
- * @return Snapshot for current thread if there is one.
+ * @param segment Segment Id.
+ * @return Snapshot for requested segment if there is one.
*/
- protected ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> treeForRead() {
+ protected ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> treeForRead(int segment) {
throw new UnsupportedOperationException();
}
@@ -1505,7 +1620,8 @@ public abstract class GridH2IndexBase extends BaseIndex {
this.fltr = qryFilter.forSpace(spaceName);
this.isValRequired = qryFilter.isValueRequired();
- } else {
+ }
+ else {
this.fltr = null;
this.isValRequired = false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
index 19ea2b2..a7ee0dc 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
@@ -32,6 +32,7 @@ import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
/**
@@ -79,7 +80,7 @@ public class GridH2QueryContext {
private UUID[] partsNodes;
/** */
- private boolean distributedJoins;
+ private DistributedJoinMode distributedJoinMode;
/** */
private int pageSize;
@@ -94,7 +95,22 @@ public class GridH2QueryContext {
* @param type Query type.
*/
public GridH2QueryContext(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) {
- key = new Key(locNodeId, nodeId, qryId, type);
+ assert type != MAP;
+
+ key = new Key(locNodeId, nodeId, qryId, 0, type);
+ }
+
+ /**
+ * @param locNodeId Local node ID.
+ * @param nodeId The node who initiated the query.
+ * @param qryId The query ID.
+ * @param segmentId Index segment ID.
+ * @param type Query type.
+ */
+ public GridH2QueryContext(UUID locNodeId, UUID nodeId, long qryId, int segmentId, GridH2QueryType type) {
+ assert segmentId == 0 || type == MAP;
+
+ key = new Key(locNodeId, nodeId, qryId, segmentId, type);
}
/**
@@ -133,20 +149,20 @@ public class GridH2QueryContext {
}
/**
- * @param distributedJoins Distributed joins can be run in this query.
+ * @param distributedJoinMode Distributed join mode.
* @return {@code this}.
*/
- public GridH2QueryContext distributedJoins(boolean distributedJoins) {
- this.distributedJoins = distributedJoins;
+ public GridH2QueryContext distributedJoinMode(DistributedJoinMode distributedJoinMode) {
+ this.distributedJoinMode = distributedJoinMode;
return this;
}
/**
- * @return {@code true} If distributed joins can be run in this query.
+ * @return Distributed join mode.
*/
- public boolean distributedJoins() {
- return distributedJoins;
+ public DistributedJoinMode distributedJoinMode() {
+ return distributedJoinMode;
}
/**
@@ -226,6 +242,11 @@ public class GridH2QueryContext {
return nodeIds[p];
}
+ /** @return Index segment ID. */
+ public int segment() {
+ return key.segmentId;
+ }
+
/**
* @param idxId Index ID.
* @param snapshot Index snapshot.
@@ -303,11 +324,12 @@ public class GridH2QueryContext {
/**
* @param ownerId Owner node ID.
+ * @param segmentId Index segment ID.
* @param batchLookupId Batch lookup ID.
* @param src Range source.
*/
- public synchronized void putSource(UUID ownerId, int batchLookupId, Object src) {
- SourceKey srcKey = new SourceKey(ownerId, batchLookupId);
+ public synchronized void putSource(UUID ownerId, int segmentId, int batchLookupId, Object src) {
+ SourceKey srcKey = new SourceKey(ownerId, segmentId, batchLookupId);
if (src != null) {
if (sources == null)
@@ -321,15 +343,16 @@ public class GridH2QueryContext {
/**
* @param ownerId Owner node ID.
+ * @param segmentId Index segment ID.
* @param batchLookupId Batch lookup ID.
* @return Range source.
*/
@SuppressWarnings("unchecked")
- public synchronized <T> T getSource(UUID ownerId, int batchLookupId) {
+ public synchronized <T> T getSource(UUID ownerId, int segmentId, int batchLookupId) {
if (sources == null)
return null;
- return (T)sources.get(new SourceKey(ownerId, batchLookupId));
+ return (T)sources.get(new SourceKey(ownerId, segmentId, batchLookupId));
}
/**
@@ -356,7 +379,7 @@ public class GridH2QueryContext {
assert qctx.get() == null;
// We need MAP query context to be available to other threads to run distributed joins.
- if (x.key.type == MAP && x.distributedJoins() && qctxs.putIfAbsent(x.key, x) != null)
+ if (x.key.type == MAP && x.distributedJoinMode() != OFF && qctxs.putIfAbsent(x.key, x) != null)
throw new IllegalStateException("Query context is already set.");
qctx.set(x);
@@ -381,7 +404,14 @@ public class GridH2QueryContext {
* @return {@code True} if context was found.
*/
public static boolean clear(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) {
- return doClear(new Key(locNodeId, nodeId, qryId, type), false);
+ boolean res = false;
+
+ for (Key key : qctxs.keySet()) {
+ if (key.locNodeId.equals(locNodeId) && key.nodeId.equals(nodeId) && key.qryId == qryId && key.type == type)
+ res |= doClear(new Key(locNodeId, nodeId, qryId, key.segmentId, type), false);
+ }
+
+ return res;
}
/**
@@ -463,6 +493,7 @@ public class GridH2QueryContext {
* @param locNodeId Local node ID.
* @param nodeId The node who initiated the query.
* @param qryId The query ID.
+ * @param segmentId Index segment ID.
* @param type Query type.
* @return Query context.
*/
@@ -470,9 +501,10 @@ public class GridH2QueryContext {
UUID locNodeId,
UUID nodeId,
long qryId,
+ int segmentId,
GridH2QueryType type
) {
- return qctxs.get(new Key(locNodeId, nodeId, qryId, type));
+ return qctxs.get(new Key(locNodeId, nodeId, qryId, segmentId, type));
}
/**
@@ -528,15 +560,19 @@ public class GridH2QueryContext {
private final long qryId;
/** */
+ private final int segmentId;
+
+ /** */
private final GridH2QueryType type;
/**
* @param locNodeId Local node ID.
* @param nodeId The node who initiated the query.
* @param qryId The query ID.
+ * @param segmentId Index segment ID.
* @param type Query type.
*/
- private Key(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) {
+ private Key(UUID locNodeId, UUID nodeId, long qryId, int segmentId, GridH2QueryType type) {
assert locNodeId != null;
assert nodeId != null;
assert type != null;
@@ -544,6 +580,7 @@ public class GridH2QueryContext {
this.locNodeId = locNodeId;
this.nodeId = nodeId;
this.qryId = qryId;
+ this.segmentId = segmentId;
this.type = type;
}
@@ -568,6 +605,7 @@ public class GridH2QueryContext {
res = 31 * res + nodeId.hashCode();
res = 31 * res + (int)(qryId ^ (qryId >>> 32));
res = 31 * res + type.hashCode();
+ res = 31 * res + segmentId;
return res;
}
@@ -586,14 +624,19 @@ public class GridH2QueryContext {
UUID ownerId;
/** */
+ int segmentId;
+
+ /** */
int batchLookupId;
/**
* @param ownerId Owner node ID.
+ * @param segmentId Index segment ID.
* @param batchLookupId Batch lookup ID.
*/
- SourceKey(UUID ownerId, int batchLookupId) {
+ SourceKey(UUID ownerId, int segmentId, int batchLookupId) {
this.ownerId = ownerId;
+ this.segmentId = segmentId;
this.batchLookupId = batchLookupId;
}
@@ -601,12 +644,15 @@ public class GridH2QueryContext {
@Override public boolean equals(Object o) {
SourceKey srcKey = (SourceKey)o;
- return batchLookupId == srcKey.batchLookupId && ownerId.equals(srcKey.ownerId);
+ return batchLookupId == srcKey.batchLookupId && segmentId == srcKey.segmentId &&
+ ownerId.equals(srcKey.ownerId);
}
/** {@inheritDoc} */
@Override public int hashCode() {
- return 31 * ownerId.hashCode() + batchLookupId;
+ int hash = ownerId.hashCode();
+ hash = 31 * hash + segmentId;
+ return 31 * hash + batchLookupId;
}
}
}
[10/12] ignite git commit: Review minors.
Posted by se...@apache.org.
Review minors.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9bc776fe
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9bc776fe
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9bc776fe
Branch: refs/heads/ignite-1.9
Commit: 9bc776fef537d2fb749951cbbf186e81858c11b5
Parents: de6e52b
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Feb 21 19:25:14 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Feb 21 19:29:59 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheProcessor.java | 2 +-
.../query/h2/opt/GridH2SpatialIndex.java | 23 +++-------
.../h2/GridH2IndexingSegmentedGeoSelfTest.java | 2 +
.../query/h2/opt/GridH2IndexBase.java | 38 ++++++++++++++++-
.../query/h2/opt/GridH2TreeIndex.java | 44 +-------------------
.../query/IgniteSqlSplitterSelfTest.java | 2 +-
6 files changed, 46 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/9bc776fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c3e3f3b..50e1379 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -270,7 +270,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
cfg.setBackups(Integer.MAX_VALUE);
if( cfg.getQueryParallelism() > 1 && cfg.getCacheMode() != PARTITIONED)
- throw new IgniteCheckedException("Cache index segmentation is supported for PARTITIONED mode only.");
+ throw new IgniteCheckedException("Segmented indices are supported for PARTITIONED mode only.");
if (cfg.getAffinityMapper() == null)
cfg.setAffinityMapper(cacheObjCtx.defaultAffMapper());
http://git-wip-us.apache.org/repos/asf/ignite/blob/9bc776fe/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
----------------------------------------------------------------------
diff --git a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
index 096b82d..c3a1362 100644
--- a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
+++ b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
@@ -121,6 +121,8 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
for (int i = 0; i < segmentsCnt; i++)
segments[i] = store.openMap("spatialIndex-" + i, new MVRTreeMap.Builder<Long>());
+
+ ctx = tbl.rowDescriptor().context();
}
/**
@@ -156,13 +158,11 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
assert key != null;
- Long rowId = keyToId.get(key);
+ final int seg = segmentForRow(row);
- int seg;
+ Long rowId = keyToId.get(key);
if (rowId != null) {
- seg = segmentForRowID(rowId);
-
Long oldRowId = segments[seg].remove(getEnvelope(idToRow.get(rowId), rowId));
assert rowId.equals(oldRowId);
@@ -170,8 +170,6 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
else {
rowId = ++rowIds;
- seg = segmentForRowID(rowId);
-
keyToId.put(key, rowId);
}
@@ -190,17 +188,6 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
}
/**
- * @param id Row ID.
- *
- * @return Segment ID for given row ID.
- */
- private int segmentForRowID(Long id) {
- assert id != null;
-
- return (int)(id % segmentsCount());
- }
-
- /**
* @param row Row.
* @param rowId Row id.
* @return Envelope.
@@ -235,7 +222,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
assert oldRow != null;
- int seg = segmentForRowID(rowId);
+ final int seg = segmentForRow(row);
if (!segments[seg].remove(getEnvelope(row, rowId), rowId))
throw DbException.throwInternalError("row not found");
http://git-wip-us.apache.org/repos/asf/ignite/blob/9bc776fe/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java
index b806321..e404f38 100644
--- a/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java
+++ b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java
@@ -27,4 +27,6 @@ public class GridH2IndexingSegmentedGeoSelfTest extends GridH2IndexingGeoSelfTes
@Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
return super.cacheConfiguration(gridName).setQueryParallelism(7);
}
+
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/9bc776fe/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index 31057c7..fc5eb4b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
@@ -126,6 +127,8 @@ public abstract class GridH2IndexBase extends BaseIndex {
}
};
+ protected GridCacheContext<?, ?> ctx;
+
/**
* @param tbl Table.
*/
@@ -133,6 +136,8 @@ public abstract class GridH2IndexBase extends BaseIndex {
final GridH2RowDescriptor desc = tbl.rowDescriptor();
if (desc != null && desc.context() != null) {
+ ctx = desc.context();
+
GridKernalContext ctx = desc.context().kernalContext();
log = ctx.log(getClass());
@@ -183,11 +188,11 @@ public abstract class GridH2IndexBase extends BaseIndex {
* @return Index segment ID for current query context.
*/
protected int threadLocalSegment() {
- GridH2QueryContext qctx = GridH2QueryContext.get();
-
if(segmentsCount() == 1)
return 0;
+ GridH2QueryContext qctx = GridH2QueryContext.get();
+
if(qctx == null)
throw new IllegalStateException("GridH2QueryContext is not initialized.");
@@ -864,6 +869,35 @@ public abstract class GridH2IndexBase extends BaseIndex {
}
/**
+ * @param row Table row.
+ * @return Segment ID for given row.
+ */
+ protected int segmentForRow(SearchRow row) {
+ assert row != null;
+
+ CacheObject key;
+
+ if (ctx != null) {
+ final Value keyColValue = row.getValue(KEY_COL);
+
+ assert keyColValue != null;
+
+ final Object o = keyColValue.getObject();
+
+ if (o instanceof CacheObject)
+ key = (CacheObject)o;
+ else
+ key = ctx.toCacheKeyObject(o);
+
+ return segmentForPartition(ctx.affinity().partition(key));
+ }
+
+ assert segmentsCount() == 1;
+
+ return 0;
+ }
+
+ /**
* Simple cursor from a single node.
*/
private static class UnicastCursor implements Cursor {
http://git-wip-us.apache.org/repos/asf/ignite/blob/9bc776fe/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index 80597f2..2873211 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -17,15 +17,12 @@
package org.apache.ignite.internal.processors.query.h2.opt;
-import java.lang.reflect.Field;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.util.GridEmptyIterator;
import org.apache.ignite.internal.util.offheap.unsafe.GridOffHeapSnapTreeMap;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeGuard;
@@ -45,8 +42,6 @@ import org.h2.table.TableFilter;
import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL;
-
/**
* Base class for snapshotable segmented tree indexes.
*/
@@ -58,9 +53,6 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
/** */
private final boolean snapshotEnabled;
- /** */
- private final GridH2RowDescriptor desc;
-
/**
* Constructor with index initialization. Creates index with single segment.
*
@@ -91,8 +83,6 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
IndexColumn.mapColumns(cols, tbl);
- desc = tbl.rowDescriptor();
-
initBaseIndex(tbl, 0, name, cols,
pk ? IndexType.createPrimaryKey(false, false) : IndexType.createNonUnique(false, false, false));
@@ -106,8 +96,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
if (snapshotEnabled) {
for (int i = 0; i < segmentsCnt; i++) {
segments[i] = new SnapTreeMap<GridSearchRowPointer, GridH2Row>(this) {
- @Override
- protected void afterNodeUpdate_nl(Node<GridSearchRowPointer, GridH2Row> node, Object val) {
+ @Override protected void afterNodeUpdate_nl(Node<GridSearchRowPointer, GridH2Row> node, Object val) {
if (val != null)
node.key = (GridSearchRowPointer)val;
}
@@ -426,37 +415,6 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
}
/**
- * @param row Table row.
- * @return Segment ID for given row.
- */
- private int segmentForRow(SearchRow row) {
- assert row != null;
-
- CacheObject key;
-
- if (desc != null && desc.context() != null) {
- GridCacheContext<?, ?> ctx = desc.context();
-
- assert ctx != null;
-
- final Value keyColValue = row.getValue(KEY_COL);
-
- assert keyColValue != null;
-
- final Object o = keyColValue.getObject();
-
- if (o instanceof CacheObject)
- key = (CacheObject)o;
- else
- key = ctx.toCacheKeyObject(o);
-
- return segmentForPartition(ctx.affinity().partition(key));
- }
- else
- return 0;
- }
-
- /**
* Comparable row with bias. Will be used for queries to have correct bounds (in case of multicolumn index
* and query on few first columns we will multiple equal entries in tree).
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/9bc776fe/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index 37dea47..69f66a5 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -824,7 +824,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
return null;
}
- }, CacheException.class, "Cache index segmentation is supported for PARTITIONED mode only.");
+ }, CacheException.class, " Segmented indices are supported for PARTITIONED mode only.");
}
/**
[09/12] ignite git commit: Fix queries with binary marshaller.
Posted by se...@apache.org.
Fix queries with binary marshaller.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/de6e52ba
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/de6e52ba
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/de6e52ba
Branch: refs/heads/ignite-1.9
Commit: de6e52ba54ce58f0dadad16de8aae14b55ab8478
Parents: f4d36cb
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Feb 21 16:27:08 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Feb 21 16:27:08 2017 +0300
----------------------------------------------------------------------
.../internal/processors/query/h2/opt/GridH2TreeIndex.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/de6e52ba/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index 07c3e6d..80597f2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -439,7 +439,11 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
assert ctx != null;
- Object o = ctx.toCacheKeyObject(row.getValue(KEY_COL));
+ final Value keyColValue = row.getValue(KEY_COL);
+
+ assert keyColValue != null;
+
+ final Object o = keyColValue.getObject();
if (o instanceof CacheObject)
key = (CacheObject)o;
[06/12] ignite git commit: Minor
Posted by se...@apache.org.
Minor
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/56ce2237
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/56ce2237
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/56ce2237
Branch: refs/heads/ignite-1.9
Commit: 56ce2237a87c2285d9e2f2e8ce433ba17d27c64c
Parents: 5b774f6
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Feb 21 12:23:28 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Feb 21 12:52:33 2017 +0300
----------------------------------------------------------------------
.../processors/query/h2/twostep/GridMapQueryExecutor.java | 7 +++++--
.../processors/query/h2/twostep/GridMergeIndexSorted.java | 4 ++--
.../processors/query/h2/twostep/GridReduceQueryExecutor.java | 2 +-
3 files changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/56ce2237/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 72a34a6..f002a5e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -458,6 +458,8 @@ public class GridMapQueryExecutor {
req.isFlagSet(GridH2QueryRequest.FLAG_IS_LOCAL),
req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS));
+ final boolean enforceJoinOrder = req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER);
+
for (int i = 1; i < mainCctx.config().getQueryParallelism(); i++) {
final int segment = i;
@@ -475,6 +477,7 @@ public class GridMapQueryExecutor {
req.tables(),
req.pageSize(),
joinMode,
+ enforceJoinOrder,
req.timeout());
return null;
@@ -494,7 +497,7 @@ public class GridMapQueryExecutor {
req.tables(),
req.pageSize(),
joinMode,
- req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER),
+ enforceJoinOrder,
req.timeout());
}
@@ -585,7 +588,7 @@ public class GridMapQueryExecutor {
Connection conn = h2.connectionForSpace(mainCctx.name());
// Here we enforce join order to have the same behavior on all the nodes.
- h2.setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder);
+ setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder);
GridH2QueryContext.set(qctx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/56ce2237/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
index a1b6691..32c676d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
@@ -85,8 +85,8 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
}
/** {@inheritDoc} */
- @Override public void setSources(Collection<ClusterNode> nodes) {
- super.setSources(nodes);
+ @Override public void setSources(Collection<ClusterNode> nodes, int segmentsCnt) {
+ super.setSources(nodes, segmentsCnt);
streamsMap = U.newHashMap(nodes.size());
streams = new RowStream[nodes.size()];
http://git-wip-us.apache.org/repos/asf/ignite/blob/56ce2237/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 3cfaae9..4cae6ac 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -624,7 +624,7 @@ public class GridReduceQueryExecutor {
if (distributedJoins)
flags |= GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS;
- if(qry.isLocal())
+ if (qry.isLocal())
flags |= GridH2QueryRequest.FLAG_IS_LOCAL;
if (send(nodes,
[07/12] ignite git commit: Minors
Posted by se...@apache.org.
Minors
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2e90a072
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2e90a072
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2e90a072
Branch: refs/heads/ignite-1.9
Commit: 2e90a07218272c5cb2c09111e193a3613d3d51ac
Parents: 56ce223
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Feb 21 15:05:51 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Feb 21 15:05:51 2017 +0300
----------------------------------------------------------------------
.../query/h2/opt/GridH2TreeIndex.java | 34 ++++----------------
1 file changed, 6 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2e90a072/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index 64ca9ea..07c3e6d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -45,25 +45,13 @@ import org.h2.table.TableFilter;
import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL;
+
/**
* Base class for snapshotable segmented tree indexes.
*/
@SuppressWarnings("ComparatorNotSerializable")
public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridSearchRowPointer> {
- /** */
- private static Field KEY_FIELD;
-
- /** */
- static {
- try {
- KEY_FIELD = GridH2AbstractKeyValueRow.class.getDeclaredField("key");
- KEY_FIELD.setAccessible(true);
- }
- catch (NoSuchFieldException e) {
- KEY_FIELD = null;
- }
- }
-
/** Index segments. */
private final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row>[] segments;
@@ -451,22 +439,12 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
assert ctx != null;
- if (row instanceof GridH2AbstractKeyValueRow && KEY_FIELD != null) {
- try {
- Object o = KEY_FIELD.get(row);
-
- if (o instanceof CacheObject)
- key = (CacheObject)o;
- else
- key = ctx.toCacheKeyObject(o);
+ Object o = ctx.toCacheKeyObject(row.getValue(KEY_COL));
- }
- catch (IllegalAccessException e) {
- throw new IllegalStateException(e);
- }
- }
+ if (o instanceof CacheObject)
+ key = (CacheObject)o;
else
- key = ctx.toCacheKeyObject(row.getValue(0));
+ key = ctx.toCacheKeyObject(o);
return segmentForPartition(ctx.affinity().partition(key));
}
[11/12] ignite git commit: Distributed joins with segmented spatial index tests added.
Posted by se...@apache.org.
Distributed joins with segmented spatial index tests added.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4dc36948
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4dc36948
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4dc36948
Branch: refs/heads/ignite-1.9
Commit: 4dc369487ef6f5583348f6a0232952c441623723
Parents: 9bc776f
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Feb 21 21:13:13 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Feb 21 21:13:13 2017 +0300
----------------------------------------------------------------------
.../query/h2/GridH2IndexingGeoSelfTest.java | 405 ++++++++++++++-----
.../h2/GridH2IndexingSegmentedGeoSelfTest.java | 13 +-
2 files changed, 302 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4dc36948/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingGeoSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingGeoSelfTest.java b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingGeoSelfTest.java
index 2843076..839514b 100644
--- a/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingGeoSelfTest.java
+++ b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingGeoSelfTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.h2;
import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.io.ParseException;
import com.vividsolutions.jts.io.WKTReader;
import java.io.Serializable;
import java.util.Arrays;
@@ -31,14 +32,18 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.Cache;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.NotNull;
/**
* Geo-indexing test.
@@ -50,6 +55,12 @@ public class GridH2IndexingGeoSelfTest extends GridCacheAbstractSelfTest {
/** */
private static final long DUR = 60000L;
+ /** Number of generated samples. */
+ public static final int ENEMYCAMP_SAMPLES_COUNT = 500;
+
+ /** Number of generated samples. */
+ public static final int ENEMY_SAMPLES_COUNT = 1000;
+
/** {@inheritDoc} */
@Override protected int gridCount() {
return 3;
@@ -60,29 +71,41 @@ public class GridH2IndexingGeoSelfTest extends GridCacheAbstractSelfTest {
return DUR * 3;
}
- /** {@inheritDoc} */
- @Override protected Class<?>[] indexedTypes() {
- return new Class<?>[]{
- Integer.class, EnemyCamp.class,
- Long.class, Geometry.class // Geometry must be indexed here.
- };
+ /**
+ * @param name Cache name.
+ * @param partitioned Partition or replicated cache.
+ * @param idxTypes Indexed types.
+ * @return Cache configuration.
+ */
+ protected <K, V> CacheConfiguration<K, V> cacheConfig(String name, boolean partitioned,
+ Class<?>... idxTypes) throws Exception {
+ return new CacheConfiguration<K, V>(name)
+ .setName(name)
+ .setCacheMode(partitioned ? CacheMode.PARTITIONED : CacheMode.REPLICATED)
+ .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+ .setIndexedTypes(idxTypes);
}
/**
* @throws Exception If failed.
*/
public void testPrimitiveGeometry() throws Exception {
- IgniteCache<Long, Geometry> cache = grid(0).cache(null);
+ IgniteCache<Long, Geometry> cache = grid(0).getOrCreateCache(cacheConfig("geom", true, Long.class, Geometry.class));
- WKTReader r = new WKTReader();
+ try {
+ WKTReader r = new WKTReader();
- for (long i = 0; i < 100; i++)
- cache.put(i, r.read("POINT(" + i + " " + i + ")"));
+ for (long i = 0; i < 100; i++)
+ cache.put(i, r.read("POINT(" + i + " " + i + ")"));
- List<List<?>> res = cache.query(new SqlFieldsQuery("explain select _key from Geometry where _val && ?")
- .setArgs(r.read("POLYGON((5 70, 5 80, 30 80, 30 70, 5 70))")).setLocal(true)).getAll();
+ List<List<?>> res = cache.query(new SqlFieldsQuery("explain select _key from Geometry where _val && ?")
+ .setArgs(r.read("POLYGON((5 70, 5 80, 30 80, 30 70, 5 70))")).setLocal(true)).getAll();
- assertTrue("__ explain: " + res, res.get(0).get(0).toString().contains("_val_idx"));
+ assertTrue("__ explain: " + res, res.get(0).get(0).toString().contains("_val_idx"));
+ }
+ finally {
+ cache.destroy();
+ }
}
/**
@@ -90,59 +113,65 @@ public class GridH2IndexingGeoSelfTest extends GridCacheAbstractSelfTest {
*/
@SuppressWarnings("unchecked")
public void testGeo() throws Exception {
- IgniteCache<Integer, EnemyCamp> cache = grid(0).cache(null);
+ IgniteCache<Integer, EnemyCamp> cache = grid(0).getOrCreateCache(
+ cacheConfig("camp", true, Integer.class, EnemyCamp.class));
- WKTReader r = new WKTReader();
+ try {
+ WKTReader r = new WKTReader();
- cache.getAndPut(0, new EnemyCamp(r.read("POINT(25 75)"), "A"));
- cache.getAndPut(1, new EnemyCamp(r.read("POINT(70 70)"), "B"));
- cache.getAndPut(2, new EnemyCamp(r.read("POINT(70 30)"), "C"));
- cache.getAndPut(3, new EnemyCamp(r.read("POINT(75 25)"), "D"));
+ cache.getAndPut(0, new EnemyCamp(r.read("POINT(25 75)"), "A"));
+ cache.getAndPut(1, new EnemyCamp(r.read("POINT(70 70)"), "B"));
+ cache.getAndPut(2, new EnemyCamp(r.read("POINT(70 30)"), "C"));
+ cache.getAndPut(3, new EnemyCamp(r.read("POINT(75 25)"), "D"));
- SqlQuery<Integer, EnemyCamp> qry = new SqlQuery(EnemyCamp.class, "coords && ?");
+ SqlQuery<Integer, EnemyCamp> qry = new SqlQuery(EnemyCamp.class, "coords && ?");
- Collection<Cache.Entry<Integer, EnemyCamp>> res = cache.query(
- qry.setArgs(r.read("POLYGON((5 70, 5 80, 30 80, 30 70, 5 70))"))).getAll();
+ Collection<Cache.Entry<Integer, EnemyCamp>> res = cache.query(
+ qry.setArgs(r.read("POLYGON((5 70, 5 80, 30 80, 30 70, 5 70))"))).getAll();
- checkPoints(res, "A");
+ checkPoints(res, "A");
- res = cache.query(
- qry.setArgs(r.read("POLYGON((10 5, 10 35, 70 30, 75 25, 10 5))"))).getAll();
+ res = cache.query(
+ qry.setArgs(r.read("POLYGON((10 5, 10 35, 70 30, 75 25, 10 5))"))).getAll();
- checkPoints(res, "C", "D");
+ checkPoints(res, "C", "D");
- // Move B to the first polygon.
- cache.getAndPut(1, new EnemyCamp(r.read("POINT(20 75)"), "B"));
+ // Move B to the first polygon.
+ cache.getAndPut(1, new EnemyCamp(r.read("POINT(20 75)"), "B"));
- res = cache.query(
- qry.setArgs(r.read("POLYGON((5 70, 5 80, 30 80, 30 70, 5 70))"))).getAll();
+ res = cache.query(
+ qry.setArgs(r.read("POLYGON((5 70, 5 80, 30 80, 30 70, 5 70))"))).getAll();
- checkPoints(res, "A", "B");
+ checkPoints(res, "A", "B");
- // Move B to the second polygon.
- cache.getAndPut(1, new EnemyCamp(r.read("POINT(30 30)"), "B"));
+ // Move B to the second polygon.
+ cache.getAndPut(1, new EnemyCamp(r.read("POINT(30 30)"), "B"));
- res = cache.query(
- qry.setArgs(r.read("POLYGON((10 5, 10 35, 70 30, 75 25, 10 5))"))).getAll();
+ res = cache.query(
+ qry.setArgs(r.read("POLYGON((10 5, 10 35, 70 30, 75 25, 10 5))"))).getAll();
- checkPoints(res, "B", "C", "D");
+ checkPoints(res, "B", "C", "D");
- // Remove B.
- cache.getAndRemove(1);
+ // Remove B.
+ cache.getAndRemove(1);
- res = cache.query(
- qry.setArgs(r.read("POLYGON((5 70, 5 80, 30 80, 30 70, 5 70))"))).getAll();
+ res = cache.query(
+ qry.setArgs(r.read("POLYGON((5 70, 5 80, 30 80, 30 70, 5 70))"))).getAll();
- checkPoints(res, "A");
+ checkPoints(res, "A");
- res = cache.query(
- qry.setArgs(r.read("POLYGON((10 5, 10 35, 70 30, 75 25, 10 5))"))).getAll();
+ res = cache.query(
+ qry.setArgs(r.read("POLYGON((10 5, 10 35, 70 30, 75 25, 10 5))"))).getAll();
- checkPoints(res, "C", "D");
+ checkPoints(res, "C", "D");
- // Check explaint request.
- assertTrue(F.first(cache.query(new SqlFieldsQuery("explain select * from EnemyCamp " +
- "where coords && 'POINT(25 75)'")).getAll()).get(0).toString().contains("coords_idx"));
+ // Check explaint request.
+ assertTrue(F.first(cache.query(new SqlFieldsQuery("explain select * from EnemyCamp " +
+ "where coords && 'POINT(25 75)'")).getAll()).get(0).toString().contains("coords_idx"));
+ }
+ finally {
+ cache.destroy();
+ }
}
/**
@@ -150,100 +179,107 @@ public class GridH2IndexingGeoSelfTest extends GridCacheAbstractSelfTest {
*/
@SuppressWarnings("unchecked")
public void testGeoMultithreaded() throws Exception {
- final IgniteCache<Integer, EnemyCamp> cache1 = grid(0).cache(null);
- final IgniteCache<Integer, EnemyCamp> cache2 = grid(1).cache(null);
- final IgniteCache<Integer, EnemyCamp> cache3 = grid(2).cache(null);
+ final CacheConfiguration<Integer, EnemyCamp> ccfg = cacheConfig("camp", true, Integer.class, EnemyCamp.class);
- final String[] points = new String[CNT];
+ final IgniteCache<Integer, EnemyCamp> cache1 = grid(0).getOrCreateCache(ccfg);
+ final IgniteCache<Integer, EnemyCamp> cache2 = grid(1).cache("camp");
+ final IgniteCache<Integer, EnemyCamp> cache3 = grid(2).cache("camp");
- WKTReader r = new WKTReader();
+ try {
+ final String[] points = new String[CNT];
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
+ WKTReader r = new WKTReader();
- for (int idx = 0; idx < CNT; idx++) {
- int x = rnd.nextInt(1, 100);
- int y = rnd.nextInt(1, 100);
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
- cache1.getAndPut(idx, new EnemyCamp(r.read("POINT(" + x + " " + y + ")"), Integer.toString(idx)));
+ for (int idx = 0; idx < CNT; idx++) {
+ int x = rnd.nextInt(1, 100);
+ int y = rnd.nextInt(1, 100);
- points[idx] = Integer.toString(idx);
- }
+ cache1.getAndPut(idx, new EnemyCamp(r.read("POINT(" + x + " " + y + ")"), Integer.toString(idx)));
- Thread.sleep(200);
+ points[idx] = Integer.toString(idx);
+ }
- final AtomicBoolean stop = new AtomicBoolean();
- final AtomicReference<Exception> err = new AtomicReference<>();
+ Thread.sleep(200);
- IgniteInternalFuture<?> putFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
- @Override public Void call() throws Exception {
- WKTReader r = new WKTReader();
+ final AtomicBoolean stop = new AtomicBoolean();
+ final AtomicReference<Exception> err = new AtomicReference<>();
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
+ IgniteInternalFuture<?> putFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ WKTReader r = new WKTReader();
- while (!stop.get()) {
- int cacheIdx = rnd.nextInt(0, 3);
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
- IgniteCache<Integer, EnemyCamp> cache = cacheIdx == 0 ? cache1 : cacheIdx == 1 ? cache2 : cache3;
+ while (!stop.get()) {
+ int cacheIdx = rnd.nextInt(0, 3);
- int idx = rnd.nextInt(CNT);
- int x = rnd.nextInt(1, 100);
- int y = rnd.nextInt(1, 100);
+ IgniteCache<Integer, EnemyCamp> cache = cacheIdx == 0 ? cache1 : cacheIdx == 1 ? cache2 : cache3;
- cache.getAndPut(idx, new EnemyCamp(r.read("POINT(" + x + " " + y + ")"), Integer.toString(idx)));
+ int idx = rnd.nextInt(CNT);
+ int x = rnd.nextInt(1, 100);
+ int y = rnd.nextInt(1, 100);
- U.sleep(50);
- }
+ cache.getAndPut(idx, new EnemyCamp(r.read("POINT(" + x + " " + y + ")"), Integer.toString(idx)));
- return null;
- }
- }, Runtime.getRuntime().availableProcessors(), "put-thread");
+ U.sleep(50);
+ }
- IgniteInternalFuture<?> qryFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
- @Override public Void call() throws Exception {
- WKTReader r = new WKTReader();
+ return null;
+ }
+ }, Runtime.getRuntime().availableProcessors(), "put-thread");
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
+ IgniteInternalFuture<?> qryFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ WKTReader r = new WKTReader();
- while (!stop.get()) {
- try {
- int cacheIdx = rnd.nextInt(0, 3);
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
- IgniteCache<Integer, EnemyCamp> cache = cacheIdx == 0 ? cache1 : cacheIdx == 1 ? cache2 : cache3;
+ while (!stop.get()) {
+ try {
+ int cacheIdx = rnd.nextInt(0, 3);
- SqlQuery<Integer, EnemyCamp> qry = new SqlQuery<>(
- EnemyCamp.class, "coords && ?");
+ IgniteCache<Integer, EnemyCamp> cache = cacheIdx == 0 ? cache1 : cacheIdx == 1 ? cache2 : cache3;
- Collection<Cache.Entry<Integer, EnemyCamp>> res = cache.query(qry.setArgs(
- r.read("POLYGON((0 0, 0 100, 100 100, 100 0, 0 0))"))).getAll();
+ SqlQuery<Integer, EnemyCamp> qry = new SqlQuery<>(
+ EnemyCamp.class, "coords && ?");
- checkPoints(res, points);
+ Collection<Cache.Entry<Integer, EnemyCamp>> res = cache.query(qry.setArgs(
+ r.read("POLYGON((0 0, 0 100, 100 100, 100 0, 0 0))"))).getAll();
- U.sleep(5);
- }
- catch (Exception e) {
- err.set(e);
+ checkPoints(res, points);
- stop.set(true);
+ U.sleep(5);
+ }
+ catch (Exception e) {
+ err.set(e);
- break;
+ stop.set(true);
+
+ break;
+ }
}
- }
- return null;
- }
- }, 4, "qry-thread");
+ return null;
+ }
+ }, 4, "qry-thread");
- U.sleep(60000L);
+ U.sleep(6000L);
- stop.set(true);
+ stop.set(true);
- putFut.get();
- qryFut.get();
+ putFut.get();
+ qryFut.get();
- Exception err0 = err.get();
+ Exception err0 = err.get();
- if (err0 != null)
- throw err0;
+ if (err0 != null)
+ throw err0;
+ }
+ finally {
+ cache1.destroy();
+ }
}
/**
@@ -252,7 +288,7 @@ public class GridH2IndexingGeoSelfTest extends GridCacheAbstractSelfTest {
* @param res Result.
* @param points Expected points.
*/
- private void checkPoints( Collection<Cache.Entry<Integer, EnemyCamp>> res, String... points) {
+ private void checkPoints(Collection<Cache.Entry<Integer, EnemyCamp>> res, String... points) {
Set<String> set = new HashSet<>(Arrays.asList(points));
assertEquals(set.size(), res.size());
@@ -262,12 +298,157 @@ public class GridH2IndexingGeoSelfTest extends GridCacheAbstractSelfTest {
}
/**
+ * @throws Exception if fails.
+ */
+ public void testSegmentedGeoIndexJoin() throws Exception {
+ IgniteCache<Integer, Enemy> c1 = ignite(0).getOrCreateCache(cacheConfig("enemy", true, Integer.class, Enemy.class));
+ IgniteCache<Integer, EnemyCamp> c2 = ignite(0).getOrCreateCache(cacheConfig("camp", true, Integer.class, EnemyCamp.class));
+
+ try {
+ fillCache();
+
+ checkDistributedQuery();
+
+ checkLocalQuery();
+ }
+ finally {
+ c1.destroy();
+ c2.destroy();
+ }
+ }
+
+ /**
+ * @throws Exception if fails.
+ */
+ public void testSegmentedGeoIndexJoin2() throws Exception {
+ IgniteCache<Integer, Enemy> c1 = ignite(0).getOrCreateCache(cacheConfig("enemy", true, Integer.class, Enemy.class));
+ IgniteCache<Integer, EnemyCamp> c2 = ignite(0).getOrCreateCache(cacheConfig("camp", false, Integer.class, EnemyCamp.class));
+
+ try {
+ fillCache();
+
+ checkDistributedQuery();
+
+ checkLocalQuery();
+ }
+ finally {
+ c1.destroy();
+ c2.destroy();
+ }
+ }
+
+ /** */
+ private void checkDistributedQuery() throws ParseException {
+ IgniteCache<Integer, Enemy> c1 = ignite(0).cache("enemy");
+ IgniteCache<Integer, EnemyCamp> c2 = ignite(0).cache("camp");
+
+ final Geometry lethalArea = new WKTReader().read("POLYGON((30 30, 30 70, 70 70, 70 30, 30 30))");
+
+ int expectedEnemies = 0;
+
+ for (Cache.Entry<Integer, Enemy> e : c1) {
+ final Integer campID = e.getValue().campId;
+
+ if (30 <= campID && campID < ENEMYCAMP_SAMPLES_COUNT) {
+ final EnemyCamp camp = c2.get(campID);
+
+ if (lethalArea.covers(camp.coords))
+ expectedEnemies++;
+ }
+ }
+
+ final SqlFieldsQuery query = new SqlFieldsQuery("select e._val, c._val from \"enemy\".Enemy e, \"camp\".EnemyCamp c " +
+ "where e.campId = c._key and c.coords && ?").setArgs(lethalArea);
+
+ List<List<?>> result = c1.query(query.setDistributedJoins(true)).getAll();
+
+ assertEquals(expectedEnemies, result.size());
+ }
+
+ /** */
+ private void checkLocalQuery() throws ParseException {
+ IgniteCache<Integer, Enemy> c1 = ignite(0).cache("enemy");
+ IgniteCache<Integer, EnemyCamp> c2 = ignite(0).cache("camp");
+
+ final Geometry lethalArea = new WKTReader().read("POLYGON((30 30, 30 70, 70 70, 70 30, 30 30))");
+
+ Set<Integer> localCampsIDs = new HashSet<>();
+
+ for(Cache.Entry<Integer, EnemyCamp> e : c2.localEntries())
+ localCampsIDs.add(e.getKey());
+
+ int expectedEnemies = 0;
+
+ for (Cache.Entry<Integer, Enemy> e : c1.localEntries()) {
+ final Integer campID = e.getValue().campId;
+
+ if (localCampsIDs.contains(campID)) {
+ final EnemyCamp camp = c2.get(campID);
+
+ if (lethalArea.covers(camp.coords))
+ expectedEnemies++;
+ }
+ }
+
+ final SqlFieldsQuery query = new SqlFieldsQuery("select e._val, c._val from \"enemy\".Enemy e, \"camp\".EnemyCamp c " +
+ "where e.campId = c._key and c.coords && ?").setArgs(lethalArea);
+
+ List<List<?>> result = c1.query(query.setLocal(true)).getAll();
+
+ assertEquals(expectedEnemies, result.size());
+ }
+
+ /** */
+ private void fillCache() throws ParseException {
+ IgniteCache<Integer, Enemy> c1 = ignite(0).cache("enemy");
+ IgniteCache<Integer, EnemyCamp> c2 = ignite(0).cache("camp");
+
+ final ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ WKTReader r = new WKTReader();
+
+ for (int i = 0; i < ENEMYCAMP_SAMPLES_COUNT; i++) {
+ final String point = String.format("POINT(%d %d)", rnd.nextInt(100), rnd.nextInt(100));
+
+ c2.put(i, new EnemyCamp(r.read(point), "camp-" + i));
+ }
+
+ for (int i = 0; i < ENEMY_SAMPLES_COUNT; i++) {
+ int campID = 30 + rnd.nextInt(ENEMYCAMP_SAMPLES_COUNT + 10);
+
+ c1.put(i, new Enemy(campID, "enemy-" + i));
+ }
+ }
+
+ /**
+ *
+ */
+ private static class Enemy {
+ /** */
+ @QuerySqlField
+ int campId;
+
+ /** */
+ @QuerySqlField
+ String name;
+
+ /**
+ * @param campId Camp ID.
+ * @param name Name.
+ */
+ public Enemy(int campId, String name) {
+ this.campId = campId;
+ this.name = name;
+ }
+ }
+
+ /**
*
*/
- private static class EnemyCamp implements Serializable {
+ protected static class EnemyCamp implements Serializable {
/** */
@QuerySqlField(index = true)
- private Geometry coords;
+ Geometry coords;
/** */
@QuerySqlField
@@ -277,7 +458,7 @@ public class GridH2IndexingGeoSelfTest extends GridCacheAbstractSelfTest {
* @param coords Coordinates.
* @param name Name.
*/
- private EnemyCamp(Geometry coords, String name) {
+ EnemyCamp(Geometry coords, String name) {
this.coords = coords;
this.name = name;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4dc36948/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java
index e404f38..eb0fd0f 100644
--- a/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java
+++ b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java
@@ -23,10 +23,15 @@ import org.apache.ignite.configuration.CacheConfiguration;
* Test for segmented geo index.
*/
public class GridH2IndexingSegmentedGeoSelfTest extends GridH2IndexingGeoSelfTest {
- /** {@inheritDoc} */
- @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
- return super.cacheConfiguration(gridName).setQueryParallelism(7);
- }
+ /** */
+ private static int QRY_PARALLELISM_LVL = 7;
+ /** {@inheritDoc} */
+ @Override
+ protected <K, V> CacheConfiguration<K, V> cacheConfig(String name, boolean partitioned,
+ Class<?>... idxTypes) throws Exception {
+ final CacheConfiguration<K, V> ccfg = super.cacheConfig(name, partitioned, idxTypes);
+ return ccfg.setQueryParallelism(partitioned ? QRY_PARALLELISM_LVL : 1);
+ }
}
\ No newline at end of file
[03/12] ignite git commit: Added Spatial Index segmentation.
Posted by se...@apache.org.
Added Spatial Index segmentation.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e69d3c34
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e69d3c34
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e69d3c34
Branch: refs/heads/ignite-1.9
Commit: e69d3c34915f2f7de1b66f22e0584b7c09d8fe71
Parents: 64ba13b
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Feb 21 11:35:34 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Feb 21 11:52:58 2017 +0300
----------------------------------------------------------------------
.../query/h2/opt/GridH2SpatialIndex.java | 69 +++++++++++++++++---
.../h2/GridH2IndexingSegmentedGeoSelfTest.java | 30 +++++++++
.../testsuites/GeoSpatialIndexingTestSuite.java | 2 +
.../processors/query/h2/IgniteH2Indexing.java | 16 ++---
.../query/h2/opt/GridH2IndexBase.java | 21 +++---
.../query/h2/opt/GridH2TreeIndex.java | 35 ++--------
6 files changed, 118 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e69d3c34/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
----------------------------------------------------------------------
diff --git a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
index 3062d13..096b82d 100644
--- a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
+++ b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
@@ -41,7 +41,6 @@ import org.h2.mvstore.rtree.SpatialKey;
import org.h2.result.SearchRow;
import org.h2.result.SortOrder;
import org.h2.table.IndexColumn;
-import org.h2.table.Table;
import org.h2.table.TableFilter;
import org.h2.value.Value;
import org.h2.value.ValueGeometry;
@@ -67,7 +66,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
private boolean closed;
/** */
- private final MVRTreeMap<Long> treeMap;
+ private final MVRTreeMap<Long>[] segments;
/** */
private final Map<Long, GridH2Row> idToRow = new HashMap<>();
@@ -83,7 +82,17 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
* @param idxName Index name.
* @param cols Columns.
*/
- public GridH2SpatialIndex(Table tbl, String idxName, IndexColumn... cols) {
+ public GridH2SpatialIndex(GridH2Table tbl, String idxName, IndexColumn... cols) {
+ this(tbl, idxName, 1, cols);
+ }
+
+ /**
+ * @param tbl Table.
+ * @param idxName Index name.
+ * @param segmentsCnt Index segments count.
+ * @param cols Columns.
+ */
+ public GridH2SpatialIndex(GridH2Table tbl, String idxName, int segmentsCnt, IndexColumn... cols) {
if (cols.length > 1)
throw DbException.getUnsupportedException("can only do one column");
@@ -107,7 +116,11 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
// Index in memory
store = MVStore.open(null);
- treeMap = store.openMap("spatialIndex", new MVRTreeMap.Builder<Long>());
+
+ segments = new MVRTreeMap[segmentsCnt];
+
+ for (int i = 0; i < segmentsCnt; i++)
+ segments[i] = store.openMap("spatialIndex-" + i, new MVRTreeMap.Builder<Long>());
}
/**
@@ -119,6 +132,11 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
}
/** {@inheritDoc} */
+ @Override protected int segmentsCount() {
+ return segments.length;
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override protected Object doTakeSnapshot() {
return null; // TODO We do not support snapshots, but probably this is possible.
}
@@ -140,20 +158,26 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
Long rowId = keyToId.get(key);
+ int seg;
+
if (rowId != null) {
- Long oldRowId = treeMap.remove(getEnvelope(idToRow.get(rowId), rowId));
+ seg = segmentForRowID(rowId);
+
+ Long oldRowId = segments[seg].remove(getEnvelope(idToRow.get(rowId), rowId));
assert rowId.equals(oldRowId);
}
else {
rowId = ++rowIds;
+ seg = segmentForRowID(rowId);
+
keyToId.put(key, rowId);
}
GridH2Row old = idToRow.put(rowId, row);
- treeMap.put(getEnvelope(row, rowId), rowId);
+ segments[seg].put(getEnvelope(row, rowId), rowId);
if (old == null)
rowCnt++; // No replace.
@@ -166,6 +190,17 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
}
/**
+ * @param id Row ID.
+ *
+ * @return Segment ID for given row ID.
+ */
+ private int segmentForRowID(Long id) {
+ assert id != null;
+
+ return (int)(id % segmentsCount());
+ }
+
+ /**
* @param row Row.
* @param rowId Row id.
* @return Envelope.
@@ -200,7 +235,9 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
assert oldRow != null;
- if (!treeMap.remove(getEnvelope(row, rowId), rowId))
+ int seg = segmentForRowID(rowId);
+
+ if (!segments[seg].remove(getEnvelope(row, rowId), rowId))
throw DbException.throwInternalError("row not found");
rowCnt--;
@@ -258,7 +295,11 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
try {
checkClosed();
- return new GridH2Cursor(rowIterator(treeMap.keySet().iterator(), filter));
+ final int seg = threadLocalSegment();
+
+ final MVRTreeMap<Long> segment = segments[seg];
+
+ return new GridH2Cursor(rowIterator(segment.keySet().iterator(), filter));
}
finally {
l.unlock();
@@ -305,7 +346,11 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
if (!first)
throw DbException.throwInternalError("Spatial Index can only be fetch by ascending order");
- Iterator<GridH2Row> iter = rowIterator(treeMap.keySet().iterator(), null);
+ final int seg = threadLocalSegment();
+
+ final MVRTreeMap<Long> segment = segments[seg];
+
+ Iterator<GridH2Row> iter = rowIterator(segment.keySet().iterator(), null);
return new SingleRowCursor(iter.hasNext() ? iter.next() : null);
}
@@ -334,7 +379,11 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
if (intersection == null)
return find(filter.getSession(), null, null);
- return new GridH2Cursor(rowIterator(treeMap.findIntersectingKeys(getEnvelope(intersection, 0)), filter));
+ final int seg = threadLocalSegment();
+
+ final MVRTreeMap<Long> segment = segments[seg];
+
+ return new GridH2Cursor(rowIterator(segment.findIntersectingKeys(getEnvelope(intersection, 0)), filter));
}
finally {
l.unlock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e69d3c34/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java
new file mode 100644
index 0000000..b806321
--- /dev/null
+++ b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import org.apache.ignite.configuration.CacheConfiguration;
+
+/**
+ * Test for segmented geo index.
+ */
+public class GridH2IndexingSegmentedGeoSelfTest extends GridH2IndexingGeoSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+ return super.cacheConfiguration(gridName).setQueryParallelism(7);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/e69d3c34/modules/geospatial/src/test/java/org/apache/ignite/testsuites/GeoSpatialIndexingTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/geospatial/src/test/java/org/apache/ignite/testsuites/GeoSpatialIndexingTestSuite.java b/modules/geospatial/src/test/java/org/apache/ignite/testsuites/GeoSpatialIndexingTestSuite.java
index 1773894..3907b9e 100644
--- a/modules/geospatial/src/test/java/org/apache/ignite/testsuites/GeoSpatialIndexingTestSuite.java
+++ b/modules/geospatial/src/test/java/org/apache/ignite/testsuites/GeoSpatialIndexingTestSuite.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
import org.apache.ignite.internal.processors.query.h2.GridBinaryH2IndexingGeoSelfTest;
import org.apache.ignite.internal.processors.query.h2.GridH2IndexingGeoSelfTest;
+import org.apache.ignite.internal.processors.query.h2.GridH2IndexingSegmentedGeoSelfTest;
/**
* Geospatial indexing tests.
@@ -35,6 +36,7 @@ public class GeoSpatialIndexingTestSuite extends TestSuite {
// Geo.
suite.addTestSuite(GridH2IndexingGeoSelfTest.class);
suite.addTestSuite(GridBinaryH2IndexingGeoSelfTest.class);
+ suite.addTestSuite(GridH2IndexingSegmentedGeoSelfTest.class);
return suite;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e69d3c34/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 2f40d87..652d1f3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -2813,7 +2813,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param cols Columns.
*/
private SpatialIndex createH2SpatialIndex(
- Table tbl,
+ GridH2Table tbl,
String idxName,
IndexColumn[] cols
) {
@@ -2823,14 +2823,17 @@ public class IgniteH2Indexing implements GridQueryIndexing {
Class<?> cls = Class.forName(className);
Constructor<?> ctor = cls.getConstructor(
- Table.class,
+ GridH2Table.class,
String.class,
+ Integer.TYPE,
IndexColumn[].class);
if (!ctor.isAccessible())
ctor.setAccessible(true);
- return (SpatialIndex)ctor.newInstance(tbl, idxName, cols);
+ final int segments = tbl.rowDescriptor().context().config().getQueryParallelism();
+
+ return (SpatialIndex)ctor.newInstance(tbl, idxName, segments, cols);
}
catch (Exception e) {
throw new IgniteException("Failed to instantiate: " + className, e);
@@ -2845,12 +2848,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @return
*/
private Index createTreeIndex(String idxName, GridH2Table tbl, boolean pk, List<IndexColumn> columns) {
- GridCacheContext<?, ?> cctx = tbl.rowDescriptor().context();
-
- if (cctx != null && cctx.config().getQueryParallelism() > 1)
- return new GridH2TreeIndex(idxName, tbl, pk, columns, cctx.config().getQueryParallelism());
+ final int segments = tbl.rowDescriptor().context().config().getQueryParallelism();
- return new GridH2TreeIndex(idxName, tbl, pk, columns, 1);
+ return new GridH2TreeIndex(idxName, tbl, pk, columns, segments);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e69d3c34/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index 131e03b..89d661d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -183,7 +183,15 @@ public abstract class GridH2IndexBase extends BaseIndex {
* @return Index segment ID for current query context.
*/
protected int threadLocalSegment() {
- return 0;
+ GridH2QueryContext qctx = GridH2QueryContext.get();
+
+ if(segmentsCount() == 1)
+ return 0;
+
+ if(qctx == null)
+ throw new IllegalStateException("GridH2QueryContext is not initialized.");
+
+ return qctx.segment();
}
/**
@@ -669,7 +677,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
throw new GridH2RetryException("Failed to find node.");
}
- return new SegmentKey(node, segment(partition));
+ return new SegmentKey(node, segmentForPartition(partition));
}
/** */
@@ -829,19 +837,16 @@ public abstract class GridH2IndexBase extends BaseIndex {
}
/** @return Index segments count. */
- protected int segmentsCount() {
- return 1;
- }
+ protected abstract int segmentsCount();
/**
* @param partition Partition idx.
* @return Segment ID for given key
*/
- protected int segment(int partition) {
- return 0;
+ protected int segmentForPartition(int partition){
+ return segmentsCount() == 1 ? 0 : (partition % segmentsCount());
}
-
/**
* Simple cursor from a single node.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/e69d3c34/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index 0829df0..64ca9ea 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -50,7 +50,6 @@ import org.jetbrains.annotations.Nullable;
*/
@SuppressWarnings("ComparatorNotSerializable")
public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridSearchRowPointer> {
-
/** */
private static Field KEY_FIELD;
@@ -224,20 +223,6 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
super.destroy();
}
-
- /** {@inheritDoc} */
- protected int threadLocalSegment() {
- GridH2QueryContext qctx = GridH2QueryContext.get();
-
- if(segments.length == 1)
- return 0;
-
- if(qctx == null)
- throw new IllegalStateException("GridH2QueryContext is not initialized.");
-
- return qctx.segment();
- }
-
/** {@inheritDoc} */
@Override public long getRowCount(@Nullable Session ses) {
IndexingQueryFilter f = threadLocalFilter();
@@ -433,7 +418,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
/** {@inheritDoc} */
@Override public GridH2Row put(GridH2Row row) {
- int seg = segment(row);
+ int seg = segmentForRow(row);
return segments[seg].put(row, row);
}
@@ -442,7 +427,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
@Override public GridH2Row remove(SearchRow row) {
GridSearchRowPointer comparable = comparable(row, 0);
- int seg = segment(row);
+ int seg = segmentForRow(row);
return segments[seg].remove(comparable);
}
@@ -453,18 +438,10 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
}
/**
- * @param partition Parttition idx.
- * @return index currentSegment Id for given key
- */
- protected int segment(int partition) {
- return partition % segments.length;
- }
-
- /**
- * @param row
- * @return index currentSegment Id for given row
+ * @param row Table row.
+ * @return Segment ID for given row.
*/
- private int segment(SearchRow row) {
+ private int segmentForRow(SearchRow row) {
assert row != null;
CacheObject key;
@@ -491,7 +468,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
else
key = ctx.toCacheKeyObject(row.getValue(0));
- return segment(ctx.affinity().partition(key));
+ return segmentForPartition(ctx.affinity().partition(key));
}
else
return 0;
[08/12] ignite git commit: Minor. .
Posted by se...@apache.org.
Minor. .
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f4d36cb6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f4d36cb6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f4d36cb6
Branch: refs/heads/ignite-1.9
Commit: f4d36cb683b82bc0d059f183a586be4662818bbc
Parents: 2e90a07
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Feb 21 15:29:52 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Feb 21 15:29:52 2017 +0300
----------------------------------------------------------------------
.../internal/processors/query/h2/sql/GridSqlQuerySplitter.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f4d36cb6/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 165b6b8..779e565 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -1628,7 +1628,7 @@ public class GridSqlQuerySplitter {
findParams(((GridSqlSubquery)el).subquery(), params, target, paramIdxs);
else {
for (int i = 0; i < el.size(); i++)
- findParams(el.child(i), params, target, paramIdxs);
+ findParams((GridSqlAst)el.child(i), params, target, paramIdxs);
}
}
[04/12] ignite git commit: Minor
Posted by se...@apache.org.
Minor
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f684c00a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f684c00a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f684c00a
Branch: refs/heads/ignite-1.9
Commit: f684c00affa25d67557b382e6ffaf2d1de45c964
Parents: e69d3c3
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Feb 21 12:00:41 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Feb 21 12:00:41 2017 +0300
----------------------------------------------------------------------
.../ignite/internal/processors/query/h2/IgniteH2Indexing.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f684c00a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 652d1f3..de48eb9 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -2831,7 +2831,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (!ctor.isAccessible())
ctor.setAccessible(true);
- final int segments = tbl.rowDescriptor().context().config().getQueryParallelism();
+ final int segments = tbl.rowDescriptor().configuration().getQueryParallelism();
return (SpatialIndex)ctor.newInstance(tbl, idxName, segments, cols);
}
@@ -2848,7 +2848,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @return
*/
private Index createTreeIndex(String idxName, GridH2Table tbl, boolean pk, List<IndexColumn> columns) {
- final int segments = tbl.rowDescriptor().context().config().getQueryParallelism();
+ final int segments = tbl.rowDescriptor().configuration().getQueryParallelism();
return new GridH2TreeIndex(idxName, tbl, pk, columns, segments);
}
[05/12] ignite git commit: Merge remote-tracking branch
'apache/ignite-1.9' into ignite-4106-1.9
Posted by se...@apache.org.
Merge remote-tracking branch 'apache/ignite-1.9' into ignite-4106-1.9
# Conflicts:
# modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
# modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
# modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
# modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
# modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
# modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5b774f6d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5b774f6d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5b774f6d
Branch: refs/heads/ignite-1.9
Commit: 5b774f6d9dc7faa60a95b1206d30115b0ef67d16
Parents: f684c00 166e65c
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Feb 21 12:10:38 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Feb 21 12:10:38 2017 +0300
----------------------------------------------------------------------
.../examples/java8/spark/SharedRDDExample.java | 4 +-
.../jdbc2/JdbcAbstractDmlStatementSelfTest.java | 49 +-
.../jdbc2/JdbcInsertStatementSelfTest.java | 51 +
.../jdbc2/JdbcMergeStatementSelfTest.java | 51 +
.../internal/jdbc2/JdbcStreamingSelfTest.java | 189 ++
.../jdbc2/JdbcUpdateStatementSelfTest.java | 50 +
.../jdbc/suite/IgniteJdbcDriverTestSuite.java | 1 +
.../org/apache/ignite/IgniteJdbcDriver.java | 30 +
.../apache/ignite/IgniteSystemProperties.java | 6 +
.../ignite/internal/jdbc2/JdbcConnection.java | 72 +-
.../internal/jdbc2/JdbcPreparedStatement.java | 34 +-
.../ignite/internal/jdbc2/JdbcStatement.java | 20 +-
.../jdbc2/JdbcStreamedPreparedStatement.java | 59 +
.../cache/query/GridCacheSqlQuery.java | 45 +-
.../processors/query/GridQueryIndexing.java | 35 +
.../processors/query/GridQueryProcessor.java | 63 +-
.../visor/cache/VisorCacheClearTask.java | 88 +-
.../visor/compute/VisorGatewayTask.java | 30 +-
.../junits/multijvm/IgniteProcessProxy.java | 5 +-
.../query/h2/DmlStatementsProcessor.java | 267 ++-
.../processors/query/h2/IgniteH2Indexing.java | 160 +-
.../query/h2/dml/UpdatePlanBuilder.java | 2 +-
.../query/h2/opt/GridH2CollocationModel.java | 78 +-
.../query/h2/opt/GridH2IndexBase.java | 47 +-
.../processors/query/h2/sql/DmlAstUtils.java | 46 +-
.../processors/query/h2/sql/GridSqlAlias.java | 13 +-
.../processors/query/h2/sql/GridSqlArray.java | 8 +-
.../processors/query/h2/sql/GridSqlAst.java | 61 +
.../processors/query/h2/sql/GridSqlColumn.java | 85 +-
.../processors/query/h2/sql/GridSqlConst.java | 6 +-
.../processors/query/h2/sql/GridSqlElement.java | 43 +-
.../query/h2/sql/GridSqlFunction.java | 16 +-
.../processors/query/h2/sql/GridSqlJoin.java | 35 +-
.../processors/query/h2/sql/GridSqlKeyword.java | 3 +-
.../query/h2/sql/GridSqlOperation.java | 6 +-
.../query/h2/sql/GridSqlOperationType.java | 4 +-
.../query/h2/sql/GridSqlParameter.java | 4 +-
.../query/h2/sql/GridSqlPlaceholder.java | 2 +-
.../processors/query/h2/sql/GridSqlQuery.java | 80 +-
.../query/h2/sql/GridSqlQueryParser.java | 228 ++-
.../query/h2/sql/GridSqlQuerySplitter.java | 1634 +++++++++++++++---
.../processors/query/h2/sql/GridSqlSelect.java | 121 +-
.../query/h2/sql/GridSqlStatement.java | 6 +-
.../query/h2/sql/GridSqlSubquery.java | 31 +-
.../processors/query/h2/sql/GridSqlTable.java | 19 +-
.../processors/query/h2/sql/GridSqlType.java | 6 +-
.../processors/query/h2/sql/GridSqlUnion.java | 66 +-
.../processors/query/h2/sql/GridSqlValue.java | 25 -
.../query/h2/twostep/GridMapQueryExecutor.java | 6 +-
.../query/h2/twostep/GridMergeIndex.java | 418 ++++-
.../query/h2/twostep/GridMergeIndexSorted.java | 284 +++
.../h2/twostep/GridMergeIndexUnsorted.java | 40 +-
.../h2/twostep/GridReduceQueryExecutor.java | 56 +-
.../h2/twostep/msg/GridH2QueryRequest.java | 9 +-
.../IgniteCacheAbstractFieldsQuerySelfTest.java | 2 +-
...niteCacheAbstractInsertSqlQuerySelfTest.java | 6 +-
.../IgniteCacheInsertSqlQuerySelfTest.java | 14 +
.../query/IgniteSqlSplitterSelfTest.java | 100 +-
.../query/h2/sql/GridQueryParsingTest.java | 41 +-
modules/spark-2.10/pom.xml | 54 +
modules/spark/pom.xml | 183 +-
.../org/apache/ignite/spark/IgniteContext.scala | 22 +-
.../spark/JavaEmbeddedIgniteRDDSelfTest.java | 10 +-
.../spark/JavaStandaloneIgniteRDDSelfTest.java | 22 +-
.../web-console/backend/services/activities.js | 7 +-
.../list-of-registered-users.controller.js | 14 +-
.../configuration/configuration.module.js | 6 +-
.../configuration/summary/summary.worker.js | 28 +-
.../console/agent/handlers/RestListener.java | 9 +-
modules/yardstick/DEVNOTES-standalone.txt | 5 +-
modules/yardstick/README.txt | 85 +-
.../config/benchmark-atomic-win.properties | 2 +-
.../config/benchmark-atomic.properties | 35 +-
.../config/benchmark-bin-identity.properties | 16 +-
.../config/benchmark-cache-load-win.properties | 2 +-
.../config/benchmark-cache-load.properties | 4 +-
.../config/benchmark-client-mode.properties | 68 +-
.../config/benchmark-compute-win.properties | 2 +-
.../config/benchmark-compute.properties | 30 +-
.../config/benchmark-failover.properties | 2 +-
.../yardstick/config/benchmark-full.properties | 62 +-
.../config/benchmark-multicast.properties | 107 +-
.../config/benchmark-put-indexed-val.properties | 23 +-
.../benchmark-query-put-separated.properties | 3 +-
.../config/benchmark-query-win.properties | 2 +-
.../yardstick/config/benchmark-query.properties | 33 +-
.../config/benchmark-remote-sample.properties | 80 +
.../config/benchmark-remote.properties | 119 ++
.../config/benchmark-sample.properties | 22 +-
.../config/benchmark-sql-dml.properties | 36 +-
.../yardstick/config/benchmark-store.properties | 3 +-
.../config/benchmark-tx-win.properties | 2 +-
.../yardstick/config/benchmark-tx.properties | 33 +-
.../yardstick/config/benchmark-win.properties | 2 +-
modules/yardstick/config/benchmark.properties | 76 +-
.../yardstick/config/ignite-remote-config.xml | 47 +
.../test-max-int-values-offheap.properties | 3 +-
.../test-max-int-values-onheap.properties | 3 +-
.../config/test-max-int-values-swap.properties | 3 +-
modules/yardstick/pom-standalone.xml | 2 +-
modules/yardstick/pom.xml | 2 +-
parent/pom.xml | 3 +-
pom.xml | 5 +-
103 files changed, 4874 insertions(+), 1363 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5b774f6d/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 37f0ade,2abb3a9..5d2b728
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@@ -84,26 -85,48 +85,39 @@@ public interface GridQueryIndexing
/**
* Queries individual fields (generally used by JDBC drivers).
*
- * @param spaceName Space name.
+ * @param cctx Cache context.
* @param qry Query.
- * @param params Query parameters.
* @param filter Space name and key filter.
- * @param enforceJoinOrder Enforce join order of tables in the query.
- * @param timeout Query timeout in milliseconds.
* @param cancel Query cancel.
- * @return Query result.
- * @throws IgniteCheckedException If failed.
+ * @return Cursor.
*/
- public GridQueryFieldsResult queryLocalSqlFields(@Nullable String spaceName, String qry,
- Collection<Object> params, IndexingQueryFilter filter, boolean enforceJoinOrder, int timeout,
- GridQueryCancel cancel) throws IgniteCheckedException;
+ public <K, V> QueryCursor<List<?>> queryLocalSqlFields(GridCacheContext<?, ?> cctx, SqlFieldsQuery qry,
+ IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException;
/**
+ * Perform a MERGE statement using data streamer as receiver.
+ *
+ * @param spaceName Space name.
+ * @param qry Query.
+ * @param params Query parameters.
+ * @param streamer Data streamer to feed data to.
+ * @return Query result.
+ * @throws IgniteCheckedException If failed.
+ */
+ public long streamUpdateQuery(@Nullable final String spaceName, final String qry,
+ @Nullable final Object[] params, IgniteDataStreamer<?, ?> streamer) throws IgniteCheckedException;
+
+ /**
* Executes regular query.
*
- * @param spaceName Space name.
+ * @param cctx Cache context.
* @param qry Query.
- * @param alias Table alias used in Query.
- * @param params Query parameters.
- * @param type Query return type.
* @param filter Space name and key filter.
- * @return Queried rows.
- * @throws IgniteCheckedException If failed.
+ * @param keepBinary Keep binary flag.
+ * @return Cursor.
*/
- public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalSql(@Nullable String spaceName, String qry,
- String alias, Collection<Object> params, GridQueryTypeDescriptor type, IndexingQueryFilter filter)
- throws IgniteCheckedException;
+ public <K, V> QueryCursor<Cache.Entry<K,V>> queryLocalSql(GridCacheContext<?, ?> cctx, SqlQuery qry,
+ IndexingQueryFilter filter, boolean keepBinary) throws IgniteCheckedException;
/**
* Executes text query.
http://git-wip-us.apache.org/repos/asf/ignite/blob/5b774f6d/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5b774f6d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index de48eb9,15e7fc6..057fd10
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@@ -93,8 -91,8 +94,9 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.query.GridQueryIndexing;
import org.apache.ignite.internal.processors.query.GridQueryProperty;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
+ import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOffheap;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap;
@@@ -1352,9 -1281,7 +1400,9 @@@ public class IgniteH2Indexing implement
final boolean distributedJoins = qry.isDistributedJoins() && cctx.isPartitioned();
final boolean grpByCollocated = qry.isCollocated();
+ final DistributedJoinMode distributedJoinMode = distributedJoinMode(qry.isLocal(), distributedJoins);
+
- GridCacheTwoStepQuery twoStepQry;
+ GridCacheTwoStepQuery twoStepQry = null;
List<GridQueryFieldMetadata> meta;
final TwoStepCachedQueryKey cachedQryKey = new TwoStepCachedQueryKey(space, sqlQry, grpByCollocated,
@@@ -1368,12 -1295,13 +1416,13 @@@
else {
final UUID locNodeId = ctx.localNodeId();
- setupConnection(c, distributedJoins, enforceJoinOrder);
+ // Here we will just parse the statement, no need to optimize it at all.
+ setupConnection(c, /*distributedJoins*/false, /*enforceJoinOrder*/true);
GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, 0, PREPARE)
- .distributedJoins(distributedJoins));
+ .distributedJoinMode(distributedJoinMode));
- PreparedStatement stmt;
+ PreparedStatement stmt = null;
boolean cachesCreated = false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/5b774f6d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index 89d661d,bdfddd5..31057c7
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@@ -262,14 -245,15 +262,15 @@@ public abstract class GridH2IndexBase e
* @param filter Current filter.
* @return Multiplier.
*/
- public int getDistributedMultiplier(Session ses, TableFilter[] filters, int filter) {
+ public final int getDistributedMultiplier(Session ses, TableFilter[] filters, int filter) {
GridH2QueryContext qctx = GridH2QueryContext.get();
- // We do complex optimizations with respect to distributed joins only on prepare stage
- // because on run stage reordering of joined tables by Optimizer is explicitly disabled
- // and thus multiplier will be always the same, so it will not affect choice of index.
+ // We do optimizations with respect to distributed joins on PREPARE stage only.
+ // Notice that we check for isJoinBatchEnabled, because we can do multiple different
+ // optimization passes on PREPARE stage.
// Query expressions cannot be distributed either.
- if (qctx == null || qctx.type() != PREPARE || qctx.distributedJoinMode() == OFF || ses.isPreparingQueryExpression())
- if (qctx == null || qctx.type() != PREPARE || !qctx.distributedJoins() ||
++ if (qctx == null || qctx.type() != PREPARE || qctx.distributedJoinMode() == OFF ||
+ !ses.isJoinBatchEnabled() || ses.isPreparingQueryExpression())
return GridH2CollocationModel.MULTIPLIER_COLLOCATED;
// We have to clear this cache because normally sub-query plan cost does not depend on anything
@@@ -391,18 -375,16 +392,18 @@@
if (affCol != null) {
affColId = affCol.column.getColumnId();
int[] masks = filter.getMasks();
- ucast = masks != null && masks[affColId] == IndexCondition.EQUALITY;
- }
- else {
- affColId = -1;
- ucast = false;
+
+ if (masks != null) {
+ ucast = (masks[affColId] & IndexCondition.EQUALITY) != 0 ||
+ (masks[KEY_COL] & IndexCondition.EQUALITY) != 0;
+ }
}
- GridCacheContext<?,?> cctx = getTable().rowDescriptor().context();
+ GridCacheContext<?, ?> cctx = getTable().rowDescriptor().context();
- return new DistributedLookupBatch(cctx, ucast, affColId);
+ boolean isLocal = qctx.distributedJoinMode() == LOCAL_ONLY;
+
+ return new DistributedLookupBatch(cctx, ucast, affColId, isLocal);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/5b774f6d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 5027c9a,b0fa639..72a34a6
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@@ -86,8 -84,7 +86,9 @@@ import static org.apache.ignite.events.
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+ import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.setupConnection;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.distributedJoinMode;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED;
import static org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.QUERY_POOL;
@@@ -433,7 -435,8 +434,8 @@@ public class GridMapQueryExecutor
req.partitions(),
null,
req.pageSize(),
- false,
+ OFF,
+ true,
req.timeout());
}
@@@ -491,7 -457,8 +493,8 @@@
parts,
req.tables(),
req.pageSize(),
- req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS),
+ joinMode,
+ req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER),
req.timeout());
}
@@@ -519,7 -484,8 +522,8 @@@
int[] parts,
Collection<String> tbls,
int pageSize,
- boolean distributedJoins,
+ DistributedJoinMode distributedJoinMode,
+ boolean enforceJoinOrder,
int timeout
) {
// Prepare to run queries.
@@@ -580,8 -545,7 +584,8 @@@
Connection conn = h2.connectionForSpace(mainCctx.name());
- setupConnection(conn, distributedJoins, enforceJoinOrder);
+ // Here we enforce join order to have the same behavior on all the nodes.
- h2.setupConnection(conn, distributedJoinMode != OFF, true);
++ h2.setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder);
GridH2QueryContext.set(qctx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/5b774f6d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5b774f6d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 8837046,f813cae..3cfaae9
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@@ -62,12 -62,11 +62,11 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
-import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
- import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlType;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
@@@ -100,8 -99,9 +99,10 @@@ import org.jsr166.ConcurrentHashMap8
import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
+ import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.setupConnection;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE;
+ import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter.mergeTableIdentifier;
/**
* Reduce query executor.
@@@ -616,6 -610,12 +618,15 @@@ public class GridReduceQueryExecutor
if (oldStyle && distributedJoins)
throw new CacheException("Failed to enable distributed joins. Topology contains older data nodes.");
+ // Always enforce join order on map side to have consistent behavior.
+ int flags = GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER;
+
+ if (distributedJoins)
+ flags |= GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS;
+
++ if(qry.isLocal())
++ flags |= GridH2QueryRequest.FLAG_IS_LOCAL;
++
if (send(nodes,
oldStyle ?
new GridQueryRequest(qryReqId,
@@@ -634,13 -634,14 +645,12 @@@
.tables(distributedJoins ? qry.tables() : null)
.partitions(convert(partsMap))
.queries(mapQrys)
- .flags((qry.isLocal() ? GridH2QueryRequest.FLAG_IS_LOCAL : 0) |
- (distributedJoins ? GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0))
+ .flags(flags)
.timeout(timeoutMillis),
oldStyle && partsMap != null ? new ExplicitPartitionsSpecializer(partsMap) : null,
- distributedJoins)
- ) {
- awaitAllReplies(r, nodes);
+ false)) {
- cancel.checkCancelled();
+ awaitAllReplies(r, nodes, cancel);
Object state = r.state.get();
@@@ -700,10 -700,10 +709,10 @@@
UUID locNodeId = ctx.localNodeId();
- h2.setupConnection(r.conn, false, enforceJoinOrder);
+ setupConnection(r.conn, false, enforceJoinOrder);
GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, qryReqId, REDUCE)
- .pageSize(r.pageSize).distributedJoins(false));
+ .pageSize(r.pageSize).distributedJoinMode(OFF));
try {
if (qry.explain())
http://git-wip-us.apache.org/repos/asf/ignite/blob/5b774f6d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index ec49aff,e5dbf33..0ad534c
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@@ -51,10 -51,10 +51,15 @@@ public class GridH2QueryRequest impleme
public static int FLAG_DISTRIBUTED_JOINS = 1;
/**
+ * Remote map query executor will enforce join order for the received map queries.
+ */
+ public static int FLAG_ENFORCE_JOIN_ORDER = 1 << 1;
+
++ /**
+ * Restrict distributed joins range-requests to local index segments. Range requests to other nodes will not be sent.
+ */
- public static int FLAG_IS_LOCAL = 2;
++ public static int FLAG_IS_LOCAL = 1 << 2;
+
/** */
private long reqId;
http://git-wip-us.apache.org/repos/asf/ignite/blob/5b774f6d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index 4ae2f91,432ed34..37dea47
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@@ -34,9 -34,8 +34,10 @@@ import org.apache.ignite.cache.CacheKey
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.Affinity;
+ import org.apache.ignite.cache.affinity.AffinityKeyMapped;
import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cluster.ClusterNode;
[12/12] ignite git commit: Merge branch 'ignite-4106-1.9' of
https://github.com/gridgain/apache-ignite into ignite-1.9
Posted by se...@apache.org.
Merge branch 'ignite-4106-1.9' of https://github.com/gridgain/apache-ignite into ignite-1.9
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/658b4ad5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/658b4ad5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/658b4ad5
Branch: refs/heads/ignite-1.9
Commit: 658b4ad5a3bf17fcb3fb9f364e47b4e28f5eea64
Parents: 963a9d9 4dc3694
Author: Sergi Vladykin <se...@gmail.com>
Authored: Wed Feb 22 13:17:43 2017 +0300
Committer: Sergi Vladykin <se...@gmail.com>
Committed: Wed Feb 22 13:17:43 2017 +0300
----------------------------------------------------------------------
.../configuration/CacheConfiguration.java | 48 +++
.../processors/cache/GridCacheProcessor.java | 3 +
.../processors/cache/IgniteCacheProxy.java | 6 +-
.../closure/GridClosureProcessor.java | 2 +-
.../processors/query/GridQueryIndexing.java | 27 +-
.../processors/query/GridQueryProcessor.java | 141 ++-----
.../messages/GridQueryNextPageRequest.java | 29 +-
.../messages/GridQueryNextPageResponse.java | 29 +-
.../query/h2/opt/GridH2SpatialIndex.java | 56 ++-
.../query/h2/GridH2IndexingGeoSelfTest.java | 405 ++++++++++++++-----
.../h2/GridH2IndexingSegmentedGeoSelfTest.java | 37 ++
.../testsuites/GeoSpatialIndexingTestSuite.java | 2 +
.../cache/query/GridCacheTwoStepQuery.java | 17 +
.../processors/query/h2/IgniteH2Indexing.java | 238 +++++++++--
.../query/h2/opt/DistributedJoinMode.java | 51 +++
.../query/h2/opt/GridH2IndexBase.java | 303 ++++++++++----
.../query/h2/opt/GridH2QueryContext.java | 84 +++-
.../query/h2/opt/GridH2TreeIndex.java | 151 ++++---
.../query/h2/sql/GridSqlQuerySplitter.java | 2 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 233 ++++++++---
.../query/h2/twostep/GridMergeIndex.java | 39 +-
.../query/h2/twostep/GridMergeIndexSorted.java | 4 +-
.../h2/twostep/GridReduceQueryExecutor.java | 69 ++--
.../h2/twostep/msg/GridH2IndexRangeRequest.java | 60 ++-
.../twostep/msg/GridH2IndexRangeResponse.java | 62 ++-
.../h2/twostep/msg/GridH2QueryRequest.java | 5 +
.../query/IgniteSqlSegmentedIndexSelfTest.java | 263 ++++++++++++
.../query/IgniteSqlSplitterSelfTest.java | 136 ++++++-
.../h2/GridIndexingSpiAbstractSelfTest.java | 26 +-
.../FetchingQueryCursorStressTest.java | 277 +++++++++++++
.../IgniteCacheQuerySelfTestSuite.java | 2 +
31 files changed, 2262 insertions(+), 545 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/658b4ad5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/658b4ad5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------