You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2017/02/22 10:18:06 UTC

[01/12] ignite git commit: Implemented.

Repository: ignite
Updated Branches:
  refs/heads/ignite-1.9 963a9d9fe -> 658b4ad5a


http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index 914e0da..0829df0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -17,12 +17,15 @@
 
 package org.apache.ignite.internal.processors.query.h2.opt;
 
+import java.lang.reflect.Field;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.util.GridEmptyIterator;
 import org.apache.ignite.internal.util.offheap.unsafe.GridOffHeapSnapTreeMap;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeGuard;
@@ -43,18 +46,36 @@ import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Base class for snapshotable tree indexes.
+ * Base class for snapshotable segmented tree indexes.
  */
 @SuppressWarnings("ComparatorNotSerializable")
 public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridSearchRowPointer> {
+
     /** */
-    private final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree;
+    private static Field KEY_FIELD;
+
+    /** */
+    static {
+        try {
+            KEY_FIELD = GridH2AbstractKeyValueRow.class.getDeclaredField("key");
+            KEY_FIELD.setAccessible(true);
+        }
+        catch (NoSuchFieldException e) {
+            KEY_FIELD = null;
+        }
+    }
+
+    /** Index segments. */
+    private final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row>[] segments;
 
     /** */
     private final boolean snapshotEnabled;
 
+    /** */
+    private final GridH2RowDescriptor desc;
+
     /**
-     * Constructor with index initialization.
+     * Constructor with index initialization. Creates index with single segment.
      *
      * @param name Index name.
      * @param tbl Table.
@@ -63,35 +84,59 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
      */
     @SuppressWarnings("unchecked")
     public GridH2TreeIndex(String name, GridH2Table tbl, boolean pk, List<IndexColumn> colsList) {
+        this(name, tbl, pk, colsList, 1);
+    }
+
+    /**
+     * Constructor with index initialization.
+     *
+     * @param name Index name.
+     * @param tbl Table.
+     * @param pk If this index is primary key.
+     * @param colsList Index columns list.
+     * @param segmentsCnt Number of segments.
+     */
+    @SuppressWarnings("unchecked")
+    public GridH2TreeIndex(String name, GridH2Table tbl, boolean pk, List<IndexColumn> colsList, int segmentsCnt) {
+        assert segmentsCnt > 0 : segmentsCnt;
+
         IndexColumn[] cols = colsList.toArray(new IndexColumn[colsList.size()]);
 
         IndexColumn.mapColumns(cols, tbl);
 
+        desc = tbl.rowDescriptor();
+
         initBaseIndex(tbl, 0, name, cols,
             pk ? IndexType.createPrimaryKey(false, false) : IndexType.createNonUnique(false, false, false));
 
+        segments = new ConcurrentNavigableMap[segmentsCnt];
+
         final GridH2RowDescriptor desc = tbl.rowDescriptor();
 
         if (desc == null || desc.memory() == null) {
             snapshotEnabled = desc == null || desc.snapshotableIndex();
 
             if (snapshotEnabled) {
-                tree = new SnapTreeMap<GridSearchRowPointer, GridH2Row>(this) {
-                    @Override protected void afterNodeUpdate_nl(Node<GridSearchRowPointer, GridH2Row> node, Object val) {
-                        if (val != null)
-                            node.key = (GridSearchRowPointer)val;
-                    }
+                for (int i = 0; i < segmentsCnt; i++) {
+                    segments[i] = new SnapTreeMap<GridSearchRowPointer, GridH2Row>(this) {
+                        @Override
+                        protected void afterNodeUpdate_nl(Node<GridSearchRowPointer, GridH2Row> node, Object val) {
+                            if (val != null)
+                                node.key = (GridSearchRowPointer)val;
+                        }
 
-                    @Override protected Comparable<? super GridSearchRowPointer> comparable(Object key) {
-                        if (key instanceof ComparableRow)
-                            return (Comparable<? super SearchRow>)key;
+                        @Override protected Comparable<? super GridSearchRowPointer> comparable(Object key) {
+                            if (key instanceof ComparableRow)
+                                return (Comparable<? super SearchRow>)key;
 
-                        return super.comparable(key);
-                    }
-                };
+                            return super.comparable(key);
+                        }
+                    };
+                }
             }
             else {
-                tree = new ConcurrentSkipListMap<>(
+                for (int i = 0; i < segmentsCnt; i++) {
+                    segments[i] = new ConcurrentSkipListMap<>(
                         new Comparator<GridSearchRowPointer>() {
                             @Override public int compare(GridSearchRowPointer o1, GridSearchRowPointer o2) {
                                 if (o1 instanceof ComparableRow)
@@ -103,7 +148,8 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
                                 return compareRows(o1, o2);
                             }
                         }
-                );
+                    );
+                }
             }
         }
         else {
@@ -111,28 +157,30 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
 
             snapshotEnabled = true;
 
-            tree = new GridOffHeapSnapTreeMap<GridSearchRowPointer, GridH2Row>(desc, desc, desc.memory(), desc.guard(), this) {
-                @Override protected void afterNodeUpdate_nl(long node, GridH2Row val) {
-                    final long oldKey = keyPtr(node);
+            for (int i = 0; i < segmentsCnt; i++) {
+                segments[i] = new GridOffHeapSnapTreeMap<GridSearchRowPointer, GridH2Row>(desc, desc, desc.memory(), desc.guard(), this) {
+                    @Override protected void afterNodeUpdate_nl(long node, GridH2Row val) {
+                        final long oldKey = keyPtr(node);
 
-                    if (val != null) {
-                        key(node, val);
+                        if (val != null) {
+                            key(node, val);
 
-                        guard.finalizeLater(new Runnable() {
-                            @Override public void run() {
-                                desc.createPointer(oldKey).decrementRefCount();
-                            }
-                        });
+                            guard.finalizeLater(new Runnable() {
+                                @Override public void run() {
+                                    desc.createPointer(oldKey).decrementRefCount();
+                                }
+                            });
+                        }
                     }
-                }
 
-                @Override protected Comparable<? super GridSearchRowPointer> comparable(Object key) {
-                    if (key instanceof ComparableRow)
-                        return (Comparable<? super SearchRow>)key;
+                    @Override protected Comparable<? super GridSearchRowPointer> comparable(Object key) {
+                        if (key instanceof ComparableRow)
+                            return (Comparable<? super SearchRow>)key;
 
-                    return super.comparable(key);
-                }
-            };
+                        return super.comparable(key);
+                    }
+                };
+            }
         }
 
         initDistributedJoinMessaging(tbl);
@@ -142,20 +190,24 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
     @Override protected Object doTakeSnapshot() {
         assert snapshotEnabled;
 
+        int seg = threadLocalSegment();
+
+        ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree = segments[seg];
+
         return tree instanceof SnapTreeMap ?
             ((SnapTreeMap)tree).clone() :
             ((GridOffHeapSnapTreeMap)tree).clone();
     }
 
     /** {@inheritDoc} */
-    protected final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> treeForRead() {
+    protected ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> treeForRead(int seg) {
         if (!snapshotEnabled)
-            return tree;
+            return segments[seg];
 
         ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> res = threadLocalSnapshot();
 
         if (res == null)
-            res = tree;
+            return segments[seg];
 
         return res;
     }
@@ -164,19 +216,37 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
     @Override public void destroy() {
         assert threadLocalSnapshot() == null;
 
-        if (tree instanceof AutoCloseable)
-            U.closeQuiet((AutoCloseable)tree);
+        for (int i = 0; i < segments.length; i++) {
+            if (segments[i] instanceof AutoCloseable)
+                U.closeQuiet((AutoCloseable)segments[i]);
+        }
 
         super.destroy();
     }
 
+
+    /** {@inheritDoc} */
+    protected int threadLocalSegment() {
+        GridH2QueryContext qctx = GridH2QueryContext.get();
+
+        if(segments.length == 1)
+            return 0;
+
+        if(qctx == null)
+            throw new IllegalStateException("GridH2QueryContext is not initialized.");
+
+        return qctx.segment();
+    }
+
     /** {@inheritDoc} */
     @Override public long getRowCount(@Nullable Session ses) {
         IndexingQueryFilter f = threadLocalFilter();
 
+        int seg = threadLocalSegment();
+
         // Fast path if we don't need to perform any filtering.
         if (f == null || f.forSpace((getTable()).spaceName()) == null)
-            return treeForRead().size();
+            return treeForRead(seg).size();
 
         Iterator<GridH2Row> iter = doFind(null, false, null);
 
@@ -255,7 +325,9 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
      * @return Row.
      */
     public GridH2Row findOne(GridSearchRowPointer row) {
-        return tree.get(row);
+        int seg = threadLocalSegment();
+
+        return segments[seg].get(row);
     }
 
     /**
@@ -268,7 +340,9 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
      */
     @SuppressWarnings("unchecked")
     private Iterator<GridH2Row> doFind(@Nullable SearchRow first, boolean includeFirst, @Nullable SearchRow last) {
-        ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> t = treeForRead();
+        int seg = threadLocalSegment();
+
+        ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> t = treeForRead(seg);
 
         return doFind0(t, first, includeFirst, last, threadLocalFilter());
     }
@@ -359,12 +433,68 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
 
     /** {@inheritDoc} */
     @Override public GridH2Row put(GridH2Row row) {
-        return tree.put(row, row);
+        int seg = segment(row);
+
+        return segments[seg].put(row, row);
     }
 
     /** {@inheritDoc} */
     @Override public GridH2Row remove(SearchRow row) {
-        return tree.remove(comparable(row, 0));
+        GridSearchRowPointer comparable = comparable(row, 0);
+
+        int seg = segment(row);
+
+        return segments[seg].remove(comparable);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int segmentsCount() {
+        return segments.length;
+    }
+
+    /**
+     * @param partition Partition index.
+     * @return Index segment ID for the given partition.
+     */
+    protected int segment(int partition) {
+        return partition % segments.length;
+    }
+
+    /**
+     * @param row Search row.
+     * @return Index segment ID for the given row.
+     */
+    private int segment(SearchRow row) {
+        assert row != null;
+
+        CacheObject key;
+
+        if (desc != null && desc.context() != null) {
+            GridCacheContext<?, ?> ctx = desc.context();
+
+            assert ctx != null;
+
+            if (row instanceof GridH2AbstractKeyValueRow && KEY_FIELD != null) {
+                try {
+                    Object o = KEY_FIELD.get(row);
+
+                    if (o instanceof CacheObject)
+                        key = (CacheObject)o;
+                    else
+                        key = ctx.toCacheKeyObject(o);
+
+                }
+                catch (IllegalAccessException e) {
+                    throw new IllegalStateException(e);
+                }
+            }
+            else
+                key = ctx.toCacheKeyObject(row.getValue(0));
+
+            return segment(ctx.affinity().partition(key));
+        }
+        else
+            return 0;
     }
 
     /**
@@ -463,18 +593,20 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
         IndexColumn[] cols = getIndexColumns();
 
         GridH2TreeIndex idx = new GridH2TreeIndex(getName(), getTable(),
-            getIndexType().isUnique(), F.asList(cols));
+            getIndexType().isUnique(), F.asList(cols), segments.length);
 
         Thread thread = Thread.currentThread();
 
-        long i = 0;
+        long j = 0;
 
-        for (GridH2Row row : tree.values()) {
-            // Check for interruptions every 1000 iterations.
-            if (++i % 1000 == 0 && thread.isInterrupted())
-                throw new InterruptedException();
+        for (int i = 0; i < segments.length; i++) {
+            for (GridH2Row row : segments[i].values()) {
+                // Check for interruptions every 1000 iterations.
+                if ((++j & 1023) == 0 && thread.isInterrupted())
+                    throw new InterruptedException();
 
-            idx.tree.put(row, row);
+                idx.put(row);
+            }
         }
 
         return idx;

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index ac1a6a6..5027c9a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReferenceArray;
 import javax.cache.CacheException;
@@ -56,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshalla
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
@@ -84,6 +86,8 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
 import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.distributedJoinMode;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED;
 import static org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.QUERY_POOL;
@@ -161,8 +165,7 @@ public class GridMapQueryExecutor {
                 if (nodeRess == null)
                     return;
 
-                for (QueryResults ress : nodeRess.results().values())
-                    ress.cancel(true);
+                nodeRess.cancelAll();
             }
         }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
 
@@ -235,12 +238,7 @@ public class GridMapQueryExecutor {
             GridH2QueryContext.clear(ctx.localNodeId(), node.id(), qryReqId, MAP);
         }
 
-        QueryResults results = nodeRess.results().remove(qryReqId);
-
-        if (results == null)
-            return;
-
-        results.cancel(true);
+        nodeRess.cancelRequest(qryReqId);
     }
 
     /**
@@ -427,6 +425,7 @@ public class GridMapQueryExecutor {
 
         onQueryRequest0(node,
             req.requestId(),
+            0,
             req.queries(),
             cacheIds,
             req.topologyVersion(),
@@ -434,7 +433,7 @@ public class GridMapQueryExecutor {
             req.partitions(),
             null,
             req.pageSize(),
-            false,
+            OFF,
             req.timeout());
     }
 
@@ -442,12 +441,49 @@ public class GridMapQueryExecutor {
      * @param node Node.
      * @param req Query request.
      */
-    private void onQueryRequest(ClusterNode node, GridH2QueryRequest req) {
-        Map<UUID,int[]> partsMap = req.partitions();
-        int[] parts = partsMap == null ? null : partsMap.get(ctx.localNodeId());
+    private void onQueryRequest(final ClusterNode node, final GridH2QueryRequest req) throws IgniteCheckedException {
+        final Map<UUID,int[]> partsMap = req.partitions();
+        final int[] parts = partsMap == null ? null : partsMap.get(ctx.localNodeId());
+
+        assert req.caches() != null && !req.caches().isEmpty();
+
+        GridCacheContext<?, ?> mainCctx = ctx.cache().context().cacheContext( req.caches().get(0));
+
+        if (mainCctx == null)
+            throw new CacheException("Failed to find cache.");
+
+        final DistributedJoinMode joinMode = distributedJoinMode(
+            req.isFlagSet(GridH2QueryRequest.FLAG_IS_LOCAL),
+            req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS));
+
+        for (int i = 1; i < mainCctx.config().getQueryParallelism(); i++) {
+            final int segment = i;
+
+            ctx.closure().callLocal(
+                new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        onQueryRequest0(node,
+                            req.requestId(),
+                            segment,
+                            req.queries(),
+                            req.caches(),
+                            req.topologyVersion(),
+                            partsMap,
+                            parts,
+                            req.tables(),
+                            req.pageSize(),
+                            joinMode,
+                            req.timeout());
+
+                        return null;
+                    }
+                }
+                , QUERY_POOL);
+        }
 
         onQueryRequest0(node,
             req.requestId(),
+            0,
             req.queries(),
             req.caches(),
             req.topologyVersion(),
@@ -455,13 +491,14 @@ public class GridMapQueryExecutor {
             parts,
             req.tables(),
             req.pageSize(),
-            req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS),
+            joinMode,
             req.timeout());
     }
 
     /**
      * @param node Node authored request.
      * @param reqId Request ID.
+     * @param segmentId Index segment ID.
      * @param qrys Queries to execute.
      * @param cacheIds Caches which will be affected by these queries.
      * @param topVer Topology version.
@@ -469,11 +506,12 @@ public class GridMapQueryExecutor {
      * @param parts Explicit partitions for current node.
      * @param tbls Tables.
      * @param pageSize Page size.
-     * @param distributedJoins Can we expect distributed joins to be ran.
+     * @param distributedJoinMode Query distributed join mode.
      */
     private void onQueryRequest0(
         ClusterNode node,
         long reqId,
+        int segmentId,
         Collection<GridCacheSqlQuery> qrys,
         List<Integer> cacheIds,
         AffinityTopologyVersion topVer,
@@ -481,7 +519,7 @@ public class GridMapQueryExecutor {
         int[] parts,
         Collection<String> tbls,
         int pageSize,
-        boolean distributedJoins,
+        DistributedJoinMode distributedJoinMode,
         int timeout
     ) {
         // Prepare to run queries.
@@ -500,7 +538,7 @@ public class GridMapQueryExecutor {
             if (topVer != null) {
                 // Reserve primary for topology version or explicit partitions.
                 if (!reservePartitions(cacheIds, topVer, parts, reserved)) {
-                    sendRetry(node, reqId);
+                    sendRetry(node, reqId, segmentId);
 
                     return;
                 }
@@ -508,17 +546,18 @@ public class GridMapQueryExecutor {
 
             qr = new QueryResults(reqId, qrys.size(), mainCctx);
 
-            if (nodeRess.results().put(reqId, qr) != null)
+            if (nodeRess.put(reqId, segmentId, qr) != null)
                 throw new IllegalStateException();
 
             // Prepare query context.
             GridH2QueryContext qctx = new GridH2QueryContext(ctx.localNodeId(),
                 node.id(),
                 reqId,
+                segmentId,
                 mainCctx.isReplicated() ? REPLICATED : MAP)
                 .filter(h2.backupFilter(topVer, parts))
                 .partitionsMap(partsMap)
-                .distributedJoins(distributedJoins)
+                .distributedJoinMode(distributedJoinMode)
                 .pageSize(pageSize)
                 .topologyVersion(topVer)
                 .reservations(reserved);
@@ -542,7 +581,7 @@ public class GridMapQueryExecutor {
             Connection conn = h2.connectionForSpace(mainCctx.name());
 
             // Here we enforce join order to have the same behavior on all the nodes.
-            h2.setupConnection(conn, distributedJoins, true);
+            h2.setupConnection(conn, distributedJoinMode != OFF, true);
 
             GridH2QueryContext.set(qctx);
 
@@ -553,13 +592,13 @@ public class GridMapQueryExecutor {
                 if (nodeRess.cancelled(reqId)) {
                     GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, qctx.type());
 
-                    nodeRess.results().remove(reqId);
+                    nodeRess.cancelRequest(reqId);
 
                     throw new QueryCancelledException();
                 }
 
                 // Run queries.
-                int i = 0;
+                int qryIdx = 0;
 
                 boolean evt = ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED);
 
@@ -567,7 +606,7 @@ public class GridMapQueryExecutor {
                     ResultSet rs = h2.executeSqlQueryWithTimer(mainCctx.name(), conn, qry.query(),
                         F.asList(qry.parameters()), true,
                         timeout,
-                        qr.cancels[i]);
+                        qr.cancels[qryIdx]);
 
                     if (evt) {
                         ctx.event().record(new CacheQueryExecutedEvent<>(
@@ -587,24 +626,24 @@ public class GridMapQueryExecutor {
 
                     assert rs instanceof JdbcResultSet : rs.getClass();
 
-                    qr.addResult(i, qry, node.id(), rs);
+                    qr.addResult(qryIdx, qry, node.id(), rs);
 
                     if (qr.canceled) {
-                        qr.result(i).close();
+                        qr.result(qryIdx).close();
 
                         throw new QueryCancelledException();
                     }
 
                     // Send the first page.
-                    sendNextPage(nodeRess, node, qr, i, pageSize);
+                    sendNextPage(nodeRess, node, qr, qryIdx, segmentId, pageSize);
 
-                    i++;
+                    qryIdx++;
                 }
             }
             finally {
                 GridH2QueryContext.clearThreadLocal();
 
-                if (!distributedJoins)
+                if (distributedJoinMode == OFF)
                     qctx.clearContext(false);
 
                 if (!F.isEmpty(snapshotedTbls)) {
@@ -615,13 +654,13 @@ public class GridMapQueryExecutor {
         }
         catch (Throwable e) {
             if (qr != null) {
-                nodeRess.results().remove(reqId, qr);
+                nodeRess.remove(reqId, segmentId, qr);
 
                 qr.cancel(false);
             }
 
             if (X.hasCause(e, GridH2RetryException.class))
-                sendRetry(node, reqId);
+                sendRetry(node, reqId, segmentId);
             else {
                 U.error(log, "Failed to execute local query.", e);
 
@@ -681,14 +720,14 @@ public class GridMapQueryExecutor {
             return;
         }
 
-        QueryResults qr = nodeRess.results().get(req.queryRequestId());
+        QueryResults qr = nodeRess.get(req.queryRequestId(), req.segmentId());
 
         if (qr == null)
             sendError(node, req.queryRequestId(), new CacheException("No query result found for request: " + req));
         else if (qr.canceled)
             sendError(node, req.queryRequestId(), new QueryCancelledException());
         else
-            sendNextPage(nodeRess, node, qr, req.query(), req.pageSize());
+            sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize());
     }
 
     /**
@@ -696,9 +735,10 @@ public class GridMapQueryExecutor {
      * @param node Node.
      * @param qr Query results.
      * @param qry Query.
+     * @param segmentId Index segment ID.
      * @param pageSize Page size.
      */
-    private void sendNextPage(NodeResults nodeRess, ClusterNode node, QueryResults qr, int qry,
+    private void sendNextPage(NodeResults nodeRess, ClusterNode node, QueryResults qr, int qry, int segmentId,
         int pageSize) {
         QueryResult res = qr.result(qry);
 
@@ -714,14 +754,14 @@ public class GridMapQueryExecutor {
             res.close();
 
             if (qr.isAllClosed())
-                nodeRess.results().remove(qr.qryReqId, qr);
+                nodeRess.remove(qr.qryReqId, segmentId, qr);
         }
 
         try {
             boolean loc = node.isLocal();
 
-            GridQueryNextPageResponse msg = new GridQueryNextPageResponse(qr.qryReqId, qry, page,
-                page == 0 ? res.rowCnt : -1 ,
+            GridQueryNextPageResponse msg = new GridQueryNextPageResponse(qr.qryReqId, segmentId, qry, page,
+                page == 0 ? res.rowCnt : -1,
                 res.cols,
                 loc ? null : toMessages(rows, new ArrayList<Message>(res.cols)),
                 loc ? rows : null);
@@ -741,12 +781,13 @@ public class GridMapQueryExecutor {
     /**
      * @param node Node.
      * @param reqId Request ID.
+     * @param segmentId Index segment ID.
      */
-    private void sendRetry(ClusterNode node, long reqId) {
+    private void sendRetry(ClusterNode node, long reqId, int segmentId) {
         try {
             boolean loc = node.isLocal();
 
-            GridQueryNextPageResponse msg = new GridQueryNextPageResponse(reqId,
+            GridQueryNextPageResponse msg = new GridQueryNextPageResponse(reqId, segmentId,
             /*qry*/0, /*page*/0, /*allRows*/0, /*cols*/1,
                 loc ? null : Collections.<Message>emptyList(),
                 loc ? Collections.<Value[]>emptyList() : null);
@@ -780,35 +821,118 @@ public class GridMapQueryExecutor {
      */
     private static class NodeResults {
         /** */
-        private final ConcurrentMap<Long, QueryResults> res = new ConcurrentHashMap8<>();
+        private final ConcurrentMap<RequestKey, QueryResults> res = new ConcurrentHashMap8<>();
 
         /** */
         private final GridBoundedConcurrentLinkedHashMap<Long, Boolean> qryHist =
             new GridBoundedConcurrentLinkedHashMap<>(1024, 1024, 0.75f, 64, PER_SEGMENT_Q);
 
         /**
-         * @return All results.
+         * @param reqId Query Request ID.
+         * @return {@code True} if query was already cancelled.
          */
-        ConcurrentMap<Long, QueryResults> results() {
-            return res;
+        boolean cancelled(long reqId) {
+            return qryHist.get(reqId) != null;
         }
 
         /**
-         * @param qryId Query ID.
-         * @return {@code False} if query was already cancelled.
+         * @param reqId Query Request ID.
+         * @return {@code True} if this call was the first to mark the request as cancelled.
          */
-        boolean cancelled(long qryId) {
-            return qryHist.get(qryId) != null;
+        boolean onCancel(long reqId) {
+            Boolean old = qryHist.putIfAbsent(reqId, Boolean.FALSE);
+
+            return old == null;
         }
 
         /**
-         * @param qryId Query ID.
-         * @return {@code True} if cancelled.
+         * @param reqId Query Request ID.
+         * @param segmentId Index segment ID.
+         * @return Partial query results for the given segment, or {@code null} if none are registered.
          */
-        boolean onCancel(long qryId) {
-            Boolean old = qryHist.putIfAbsent(qryId, Boolean.FALSE);
+        public QueryResults get(long reqId, int segmentId) {
+            return res.get(new RequestKey(reqId, segmentId));
+        }
 
-            return old == null;
+        /**
+         * Cancels the query results of all segments (threads) of the given request.
+         * @param reqID Request ID.
+         */
+        public void cancelRequest(long reqID) {
+            for (RequestKey key : res.keySet()) {
+                if (key.reqId == reqID) {
+                    QueryResults removed = res.remove(key);
+
+                    if (removed != null)
+                        removed.cancel(true);
+                }
+
+            }
+        }
+
+        /**
+         * @param reqId Query Request ID.
+         * @param segmentId Index segment ID.
+         * @param qr Query Results.
+         * @return {@code True} if removed.
+         */
+        public boolean remove(long reqId, int segmentId, QueryResults qr) {
+            return res.remove(new RequestKey(reqId, segmentId), qr);
+        }
+
+        /**
+         * @param reqId Query Request ID.
+         * @param segmentId Index segment ID.
+         * @param qr Query Results.
+         * @return previous value.
+         */
+        public QueryResults put(long reqId, int segmentId, QueryResults qr) {
+            return res.put(new RequestKey(reqId, segmentId), qr);
+        }
+
+        /**
+         * Cancel all node queries.
+         */
+        public void cancelAll() {
+            for (QueryResults ress : res.values())
+                ress.cancel(true);
+        }
+
+        /**
+         * Key of the results map: query request ID together with index segment ID.
+         */
+        private static class RequestKey {
+            /** */
+            private long reqId;
+
+            /** */
+            private int segmentId;
+
+            /** Constructor */
+            RequestKey(long reqId, int segmentId) {
+                this.reqId = reqId;
+                this.segmentId = segmentId;
+            }
+
+            /** {@inheritDoc} */
+            @Override public boolean equals(Object o) {
+                if (this == o)
+                    return true;
+                if (o == null || getClass() != o.getClass())
+                    return false;
+
+                RequestKey other = (RequestKey)o;
+
+                return reqId == other.reqId && segmentId == other.segmentId;
+
+            }
+
+            /** {@inheritDoc} */
+            @Override public int hashCode() {
+                int result = (int)(reqId ^ (reqId >>> 32));
+                result = 31 * result + segmentId;
+                return result;
+            }
         }
     }
 
@@ -836,7 +960,8 @@ public class GridMapQueryExecutor {
          * @param qrys Number of queries.
          * @param cctx Cache context.
          */
-        private QueryResults(long qryReqId, int qrys, GridCacheContext<?,?> cctx) {
+        @SuppressWarnings("unchecked")
+        private QueryResults(long qryReqId, int qrys, GridCacheContext<?, ?> cctx) {
             this.qryReqId = qryReqId;
             this.cctx = cctx;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index c267f4a..45d3c58 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -59,7 +59,7 @@ public abstract class GridMergeIndex extends BaseIndex {
     private final AtomicInteger expRowsCnt = new AtomicInteger(0);
 
     /** Remaining rows per source node ID. */
-    private Map<UUID, Counter> remainingRows;
+    private Map<UUID, Counter[]> remainingRows;
 
     /** */
     private final AtomicBoolean lastSubmitted = new AtomicBoolean();
@@ -141,15 +141,22 @@ public abstract class GridMergeIndex extends BaseIndex {
      * Set source nodes.
      *
      * @param nodes Nodes.
+     * @param segmentsCnt Index segments per table.
      */
-    public void setSources(Collection<ClusterNode> nodes) {
+    public void setSources(Collection<ClusterNode> nodes, int segmentsCnt) {
+        // Sources may be set only while no counters have been created yet.
         assert remainingRows == null;
 
         remainingRows = U.newHashMap(nodes.size());
 
         for (ClusterNode node : nodes) {
-            if (remainingRows.put(node.id(), new Counter()) != null)
+            // One counter per index segment for this node.
+            Counter[] counters = new Counter[segmentsCnt];
+
+            for (int i = 0; i < segmentsCnt; i++)
+                counters[i] = new Counter();
+
+            // A non-null previous mapping means the same node ID occurred twice in the collection.
+            if (remainingRows.put(node.id(), counters) != null)
                 throw new IllegalStateException("Duplicate node id: " + node.id());
+
         }
     }
 
@@ -194,7 +201,7 @@ public abstract class GridMergeIndex extends BaseIndex {
     public final void addPage(GridResultPage page) {
         int pageRowsCnt = page.rowsInPage();
 
-        Counter cnt = remainingRows.get(page.source());
+        Counter cnt = remainingRows.get(page.source())[page.res.segmentId()];
 
         // RemainingRowsCount should be updated before page adding to avoid race
         // in GridMergeIndexUnsorted cursor iterator
@@ -231,13 +238,14 @@ public abstract class GridMergeIndex extends BaseIndex {
             // Guarantee that finished state possible only if counter is zero and all pages was added
             cnt.state = State.FINISHED;
 
-            for (Counter c : remainingRows.values()) { // Check all the sources.
-                if (c.state != State.FINISHED)
-                    return;
+            for (Counter[] cntrs : remainingRows.values()) { // Check all the sources.
+                for(int i = 0; i < cntrs.length; i++) {
+                    if (cntrs[i].state != State.FINISHED)
+                        return;
+                }
             }
 
             if (lastSubmitted.compareAndSet(false, true)) {
-                // Add page-marker that last page was added
                 addPage0(new GridResultPage(null, page.source(), null) {
                     @Override public boolean isLast() {
                         return true;
@@ -256,7 +264,20 @@ public abstract class GridMergeIndex extends BaseIndex {
      * @param page Page.
      */
     protected void fetchNextPage(GridResultPage page) {
-        if (remainingRows.get(page.source()).get() != 0)
+        // A last page must not be passed here.
+        assert !page.isLast();
+
+        if(page.isFail())
+            page.fetchNextPage(); // Rethrows exceptions.
+
+        assert page.res != null;
+
+        // Pick the counter of the index segment this page came from.
+        Counter[] counters = remainingRows.get(page.source());
+
+        int segId = page.res.segmentId();
+
+        Counter counter = counters[segId];
+
+        // Ask for the next page only while that segment's counter is non-zero.
+        if (counter.get() != 0)
             page.fetchNextPage();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index f54fab6..8837046 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -62,9 +62,9 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
-import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter;
@@ -100,6 +100,7 @@ import org.jsr166.ConcurrentHashMap8;
 
 import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
 import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE;
 
 /**
@@ -294,6 +295,7 @@ public class GridReduceQueryExecutor {
     private void onNextPage(final ClusterNode node, GridQueryNextPageResponse msg) {
         final long qryReqId = msg.queryRequestId();
         final int qry = msg.query();
+        final int seg = msg.segmentId();
 
         final QueryRun r = runs.get(qryReqId);
 
@@ -326,7 +328,7 @@ public class GridReduceQueryExecutor {
                     }
 
                     try {
-                        GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, pageSize);
+                        GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, seg, pageSize);
 
                         if (node.isLocal())
                             h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg0);
@@ -514,29 +516,33 @@ public class GridReduceQueryExecutor {
             // Explicit partition mapping for unstable topology.
             Map<ClusterNode, IntArray> partsMap = null;
 
-            if (isPreloadingActive(cctx, extraSpaces)) {
-                if (cctx.isReplicated())
-                    nodes = replicatedUnstableDataNodes(cctx, extraSpaces);
-                else {
-                    partsMap = partitionedUnstableDataNodes(cctx, extraSpaces);
+            if (qry.isLocal())
+                nodes = Collections.singleton(ctx.discovery().localNode());
+            else {
+                if (isPreloadingActive(cctx, extraSpaces)) {
+                    if (cctx.isReplicated())
+                        nodes = replicatedUnstableDataNodes(cctx, extraSpaces);
+                    else {
+                        partsMap = partitionedUnstableDataNodes(cctx, extraSpaces);
 
-                    nodes = partsMap == null ? null : partsMap.keySet();
+                        nodes = partsMap == null ? null : partsMap.keySet();
+                    }
                 }
-            }
-            else
-                nodes = stableDataNodes(topVer, cctx, extraSpaces);
+                else
+                    nodes = stableDataNodes(topVer, cctx, extraSpaces);
 
-            if (nodes == null)
-                continue; // Retry.
+                if (nodes == null)
+                    continue; // Retry.
 
-            assert !nodes.isEmpty();
+                assert !nodes.isEmpty();
 
-            if (cctx.isReplicated() || qry.explain()) {
-                assert qry.explain() || !nodes.contains(ctx.discovery().localNode()) :
-                    "We must be on a client node.";
+                if (cctx.isReplicated() || qry.explain()) {
+                    assert qry.explain() || !nodes.contains(ctx.discovery().localNode()) :
+                        "We must be on a client node.";
 
-                // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
-                nodes = Collections.singleton(F.rand(nodes));
+                    // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
+                    nodes = Collections.singleton(F.rand(nodes));
+                }
             }
 
             final Collection<ClusterNode> finalNodes = nodes;
@@ -545,6 +551,8 @@ public class GridReduceQueryExecutor {
 
             final boolean skipMergeTbl = !qry.explain() && qry.skipMergeTable();
 
+            final int segmentsPerIndex = cctx.config().getQueryParallelism();
+
             for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
                 GridMergeIndex idx;
 
@@ -565,12 +573,12 @@ public class GridReduceQueryExecutor {
                 else
                     idx = GridMergeIndexUnsorted.createDummy(ctx);
 
-                idx.setSources(nodes);
+                idx.setSources(nodes, segmentsPerIndex);
 
                 r.idxs.add(idx);
             }
 
-            r.latch = new CountDownLatch(r.idxs.size() * nodes.size());
+            r.latch = new CountDownLatch(r.idxs.size() * nodes.size() * segmentsPerIndex);
 
             runs.put(qryReqId, r);
 
@@ -626,14 +634,13 @@ public class GridReduceQueryExecutor {
                             .tables(distributedJoins ? qry.tables() : null)
                             .partitions(convert(partsMap))
                             .queries(mapQrys)
-                            .flags(distributedJoins ? GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0)
+                            .flags((qry.isLocal() ? GridH2QueryRequest.FLAG_IS_LOCAL : 0) |
+                                (distributedJoins ? GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0))
                             .timeout(timeoutMillis),
                     oldStyle && partsMap != null ? new ExplicitPartitionsSpecializer(partsMap) : null,
-                    distributedJoins)
-                    ) {
-                    awaitAllReplies(r, nodes);
+                    false)) {
 
-                    cancel.checkCancelled();
+                    awaitAllReplies(r, nodes, cancel);
 
                     Object state = r.state.get();
 
@@ -696,7 +703,7 @@ public class GridReduceQueryExecutor {
                         h2.setupConnection(r.conn, false, enforceJoinOrder);
 
                         GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, qryReqId, REDUCE)
-                            .pageSize(r.pageSize).distributedJoins(false));
+                            .pageSize(r.pageSize).distributedJoinMode(OFF));
 
                         try {
                             if (qry.explain())
@@ -817,11 +824,15 @@ public class GridReduceQueryExecutor {
     /**
      * @param r Query run.
      * @param nodes Nodes to check periodically if they alive.
+     * @param cancel Query cancel.
      * @throws IgniteInterruptedCheckedException If interrupted.
      */
-    private void awaitAllReplies(QueryRun r, Collection<ClusterNode> nodes)
-        throws IgniteInterruptedCheckedException {
+    private void awaitAllReplies(QueryRun r, Collection<ClusterNode> nodes, GridQueryCancel cancel)
+        throws IgniteInterruptedCheckedException, QueryCancelledException {
         while (!U.await(r.latch, 500, TimeUnit.MILLISECONDS)) {
+
+            cancel.checkCancelled();
+
             for (ClusterNode node : nodes) {
                 if (!ctx.discovery().alive(node)) {
                     handleNodeLeft(r, node.id());

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java
index e49c48f..b2548cc 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java
@@ -38,6 +38,12 @@ public class GridH2IndexRangeRequest implements Message {
     private long qryId;
 
     /** */
+    private int originSegmentId;
+
+    /** Index segment ID. */
+    private int segmentId;
+
+    /** */
     private int batchLookupId;
 
     /** */
@@ -87,6 +93,34 @@ public class GridH2IndexRangeRequest implements Message {
     }
 
     /**
+     * Sets the index segment ID carried by this request.
+     *
+     * @param segmentId Index segment ID.
+     */
+    public void segment(int segmentId) {
+        this.segmentId = segmentId;
+    }
+
+    /**
+     * Gets the index segment ID carried by this request.
+     *
+     * @return Index segment ID.
+     */
+    public int segment() {
+        return segmentId;
+    }
+
+    /**
+     * Gets the origin index segment ID carried by this request.
+     *
+     * @return Origin index segment ID.
+     */
+    public int originSegmentId() {
+        return originSegmentId;
+    }
+
+    /**
+     * Sets the origin index segment ID carried by this request.
+     *
+     * @param segmentId Origin index segment ID.
+     */
+    public void originSegmentId(int segmentId) {
+        this.originSegmentId = segmentId;
+    }
+
+    /**
      * @param batchLookupId Batch lookup ID.
      */
     public void batchLookupId(int batchLookupId) {
@@ -136,6 +170,15 @@ public class GridH2IndexRangeRequest implements Message {
 
                 writer.incrementState();
 
+            case 4:
+                if (!writer.writeInt("segmentId", segmentId))
+                    return false;
+
+                // Every other case advances the writer state after a successful write; this one must too,
+                // otherwise the state still points at case 4 when case 5 returns early.
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeInt("originSegId", originSegmentId))
+                    return false;
+
+                writer.incrementState();
         }
 
         return true;
@@ -181,6 +224,21 @@ public class GridH2IndexRangeRequest implements Message {
 
                 reader.incrementState();
 
+            case 4:
+                segmentId = reader.readInt("segmentId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                originSegmentId = reader.readInt("originSegId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
         }
 
         return reader.afterMessageRead(GridH2IndexRangeRequest.class);
@@ -193,7 +251,7 @@ public class GridH2IndexRangeRequest implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 4;
+        return 6;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java
index c6414bd..4d3db12 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java
@@ -47,6 +47,12 @@ public class GridH2IndexRangeResponse implements Message {
     private long qryId;
 
     /** */
+    private int segmentId;
+
+    /** Origin index segment ID. */
+    private int originSegmentId;
+
+    /** */
     private int batchLookupId;
 
     /** */
@@ -130,6 +136,34 @@ public class GridH2IndexRangeResponse implements Message {
     }
 
     /**
+     * Sets the index segment ID carried by this response.
+     *
+     * @param segmentId Index segment ID.
+     */
+    public void segment(int segmentId) {
+        this.segmentId = segmentId;
+    }
+
+    /**
+     * Gets the index segment ID carried by this response.
+     *
+     * @return Index segment ID.
+     */
+    public int segment() {
+        return segmentId;
+    }
+
+    /**
+     * Gets the origin index segment ID carried by this response.
+     *
+     * @return Origin index segment ID.
+     */
+    public int originSegmentId() {
+        return originSegmentId;
+    }
+
+    /**
+     * Sets the origin index segment ID carried by this response.
+     *
+     * @param segmentId Origin index segment ID.
+     */
+    public void originSegmentId(int segmentId) {
+        this.originSegmentId = segmentId;
+    }
+
+    /**
      * @param batchLookupId Batch lookup ID.
      */
     public void batchLookupId(int batchLookupId) {
@@ -191,6 +225,17 @@ public class GridH2IndexRangeResponse implements Message {
 
                 writer.incrementState();
 
+            case 6:
+                if (!writer.writeInt("originSegId", originSegmentId))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeInt("segmentId", segmentId))
+                    return false;
+
+                writer.incrementState();
         }
 
         return true;
@@ -252,6 +297,21 @@ public class GridH2IndexRangeResponse implements Message {
 
                 reader.incrementState();
 
+            case 6:
+                originSegmentId = reader.readInt("originSegId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                segmentId = reader.readInt("segmentId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
         }
 
         return reader.afterMessageRead(GridH2IndexRangeResponse.class);
@@ -264,7 +324,7 @@ public class GridH2IndexRangeResponse implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 6;
+        return 8;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index 884173f..ec49aff 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -50,6 +50,11 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
      */
     public static int FLAG_DISTRIBUTED_JOINS = 1;
 
+    /**
+     * Restrict distributed joins range-requests to local index segments. Range requests to other nodes will not be sent.
+     * This is a bit flag and may be combined with other flags using bitwise OR.
+     */
+    public static final int FLAG_IS_LOCAL = 2;
+
     /** */
     private long reqId;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
new file mode 100644
index 0000000..f8c9dd5
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import javax.cache.Cache;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheKeyConfiguration;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests that distributed and local SQL queries return correct results when an index consists of many segments.
+ */
+public class IgniteSqlSegmentedIndexSelfTest extends GridCommonAbstractTest {
+    /** IP finder used by the discovery SPI. */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Query parallelism level configured for partitioned caches. */
+    private static final int QRY_PARALLELISM_LVL = 97;
+
+    /** Number of organizations; they are stored under keys {@code 0 .. ORG_CNT - 1}. */
+    private static final int ORG_CNT = 500;
+
+    /** Number of persons. */
+    private static final int PERS_CNT = 1000;
+
+    /** Join of persons with their organizations, shared by the distributed and the local check. */
+    private static final String JOIN_QRY =
+        "select o.name n1, p.name n2 from \"pers\".Person p, \"org\".Organization o where p.orgId = o._key";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheKeyConfiguration keyCfg = new CacheKeyConfiguration("MyCache", "affKey");
+
+        cfg.setCacheKeyConfiguration(keyCfg);
+
+        cfg.setPeerClassLoadingEnabled(false);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(disco);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+    }
+
+    /**
+     * @param name Cache name.
+     * @param partitioned {@code True} for a PARTITIONED cache, {@code false} for a REPLICATED one.
+     * @param idxTypes Indexed types.
+     * @return Cache configuration.
+     */
+    private static <K, V> CacheConfiguration<K, V> cacheConfig(String name, boolean partitioned, Class<?>... idxTypes) {
+        return new CacheConfiguration<K, V>()
+            .setName(name)
+            .setCacheMode(partitioned ? CacheMode.PARTITIONED : CacheMode.REPLICATED)
+            .setQueryParallelism(partitioned ? QRY_PARALLELISM_LVL : 1)
+            .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+            .setIndexedTypes(idxTypes);
+    }
+
+    /**
+     * Run tests on single-node grid.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSingleNodeIndexSegmentation() throws Exception {
+        checkIndexSegmentation(1, true);
+    }
+
+    /**
+     * Run tests on multi-node grid.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMultiNodeIndexSegmentation() throws Exception {
+        checkIndexSegmentation(4, true);
+    }
+
+    /**
+     * Run tests on multi-node grid with a partitioned "pers" cache and a replicated "org" cache.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMultiNodeSegmentedPartitionedWithReplicated() throws Exception {
+        checkIndexSegmentation(4, false);
+    }
+
+    /**
+     * Starts the grid, creates and fills both caches, then runs the distributed and the local check.
+     *
+     * @param gridCnt Number of nodes to start.
+     * @param partitionedOrg {@code True} to make the "org" cache partitioned, {@code false} for replicated.
+     * @throws Exception If failed.
+     */
+    private void checkIndexSegmentation(int gridCnt, boolean partitionedOrg) throws Exception {
+        startGridsMultiThreaded(gridCnt, true);
+
+        ignite(0).createCache(cacheConfig("pers", true, Integer.class, Person.class));
+        ignite(0).createCache(cacheConfig("org", partitionedOrg, Integer.class, Organization.class));
+
+        fillCache();
+
+        checkDistributedQueryWithSegmentedIndex();
+
+        checkLocalQueryWithSegmentedIndex();
+    }
+
+    /**
+     * Check distributed joins.
+     *
+     * @throws Exception If failed.
+     */
+    public void checkDistributedQueryWithSegmentedIndex() throws Exception {
+        IgniteCache<Integer, Person> c1 = ignite(0).cache("pers");
+
+        int expectedPersons = 0;
+
+        for (Cache.Entry<Integer, Person> e : c1) {
+            final Integer orgId = e.getValue().orgId;
+
+            // Only persons whose organization key exists can appear in the join result.
+            if (0 <= orgId && orgId < ORG_CNT)
+                expectedPersons++;
+        }
+
+        List<List<?>> result = c1.query(new SqlFieldsQuery(JOIN_QRY).setDistributedJoins(true)).getAll();
+
+        assertEquals(expectedPersons, result.size());
+    }
+
+    /**
+     * Test local query.
+     *
+     * @throws Exception If failed.
+     */
+    public void checkLocalQueryWithSegmentedIndex() throws Exception {
+        IgniteCache<Integer, Person> c1 = ignite(0).cache("pers");
+        IgniteCache<Integer, Organization> c2 = ignite(0).cache("org");
+
+        Set<Integer> localOrgIds = new HashSet<>();
+
+        for (Cache.Entry<Integer, Organization> e : c2.localEntries())
+            localOrgIds.add(e.getKey());
+
+        int expectedPersons = 0;
+
+        // A local join can match only local persons with locally stored organizations.
+        for (Cache.Entry<Integer, Person> e : c1.localEntries()) {
+            final Integer orgId = e.getValue().orgId;
+
+            if (localOrgIds.contains(orgId))
+                expectedPersons++;
+        }
+
+        List<List<?>> result = c1.query(new SqlFieldsQuery(JOIN_QRY).setLocal(true)).getAll();
+
+        assertEquals(expectedPersons, result.size());
+    }
+
+    /**
+     * Puts {@link #ORG_CNT} organizations and {@link #PERS_CNT} persons with random organization IDs.
+     */
+    private void fillCache() {
+        IgniteCache<Object, Object> c1 = ignite(0).cache("pers");
+
+        IgniteCache<Object, Object> c2 = ignite(0).cache("org");
+
+        for (int i = 0; i < ORG_CNT; i++)
+            c2.put(i, new Organization("org-" + i));
+
+        final Random random = new Random();
+
+        for (int i = 0; i < PERS_CNT; i++) {
+            // IDs fall into [10, ORG_CNT + 20), so some persons may reference a missing organization.
+            int orgId = 10 + random.nextInt(ORG_CNT + 10);
+
+            c1.put(i, new Person(orgId, "pers-" + i));
+        }
+    }
+
+    /**
+     * Person value type with an indexed organization ID.
+     */
+    private static class Person implements Serializable {
+        /** Organization ID. */
+        @QuerySqlField(index = true)
+        Integer orgId;
+
+        /** Name. */
+        @QuerySqlField
+        String name;
+
+        /**
+         * Default constructor.
+         */
+        public Person() {
+            // No-op.
+        }
+
+        /**
+         * @param orgId Organization ID.
+         * @param name Name.
+         */
+        public Person(int orgId, String name) {
+            this.orgId = orgId;
+            this.name = name;
+        }
+    }
+
+    /**
+     * Organization value type.
+     */
+    private static class Organization implements Serializable {
+        /** Organization name. */
+        @QuerySqlField
+        String name;
+
+        /**
+         * Default constructor.
+         */
+        public Organization() {
+            // No-op.
+        }
+
+        /**
+         * @param name Organization name.
+         */
+        public Organization(String name) {
+            this.name = name;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index 06afe7c..4ae2f91 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -33,9 +33,10 @@ import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheKeyConfiguration;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CachePeekMode;
-import org.apache.ignite.cache.affinity.AffinityKeyMapped;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.cluster.ClusterNode;
@@ -701,6 +702,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
                 ignite(0).destroyCache(cache.getName());
         }
     }
+
     /**
      * @throws Exception If failed.
      */
@@ -750,6 +752,127 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testIndexSegmentation() throws Exception {
+        CacheConfiguration ccfg1 = cacheConfig("pers", true,
+            Integer.class, Person2.class).setQueryParallelism(4);
+        CacheConfiguration ccfg2 = cacheConfig("org", true,
+            Integer.class, Organization.class).setQueryParallelism(4);
+
+        IgniteCache<Object, Object> c1 = ignite(0).getOrCreateCache(ccfg1);
+        IgniteCache<Object, Object> c2 = ignite(0).getOrCreateCache(ccfg2);
+
+        try {
+            c2.put(1, new Organization("o1"));
+            c2.put(2, new Organization("o2"));
+            c1.put(3, new Person2(1, "p1"));
+            c1.put(4, new Person2(2, "p2"));
+            c1.put(5, new Person2(3, "p3"));
+
+            String select0 = "select o.name n1, p.name n2 from \"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key and o._key=1";
+
+            checkQueryPlan(c1, true, 1, new SqlFieldsQuery(select0));
+
+            checkQueryPlan(c1, true, 1, new SqlFieldsQuery(select0).setLocal(true));
+        }
+        finally {
+            c1.destroy();
+            c2.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReplicationCacheIndexSegmentationFailure() throws Exception {
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                CacheConfiguration ccfg = cacheConfig("org", false,
+                    Integer.class, Organization.class).setQueryParallelism(4);
+
+                IgniteCache<Object, Object> c = ignite(0).createCache(ccfg);
+
+                return null;
+            }
+        }, CacheException.class, "Cache index segmentation is supported for PARTITIONED mode only.");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testIndexSegmentationPartitionedReplicated() throws Exception {
+        CacheConfiguration ccfg1 = cacheConfig("pers", true,
+            Integer.class, Person2.class).setQueryParallelism(4);
+        CacheConfiguration ccfg2 = cacheConfig("org", false,
+            Integer.class, Organization.class);
+
+        final IgniteCache<Object, Object> c1 = ignite(0).getOrCreateCache(ccfg1);
+        final IgniteCache<Object, Object> c2 = ignite(0).getOrCreateCache(ccfg2);
+
+        try {
+            c2.put(1, new Organization("o1"));
+            c2.put(2, new Organization("o2"));
+            c1.put(3, new Person2(1, "p1"));
+            c1.put(4, new Person2(2, "p2"));
+            c1.put(5, new Person2(3, "p3"));
+
+            String select0 = "select o.name n1, p.name n2 from \"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key";
+
+            final SqlFieldsQuery qry = new SqlFieldsQuery(select0);
+
+            qry.setDistributedJoins(true);
+
+            List<List<?>> results = c1.query(qry).getAll();
+
+            assertEquals(2, results.size());
+        }
+        finally {
+            c1.destroy();
+            c2.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testIndexWithDifferentSegmentationLevelsFailure() throws Exception {
+        CacheConfiguration ccfg1 = cacheConfig("pers", true,
+            Integer.class, Person2.class).setQueryParallelism(4);
+        CacheConfiguration ccfg2 = cacheConfig("org", true,
+            Integer.class, Organization.class).setQueryParallelism(3);
+
+        final IgniteCache<Object, Object> c1 = ignite(0).getOrCreateCache(ccfg1);
+        final IgniteCache<Object, Object> c2 = ignite(0).getOrCreateCache(ccfg2);
+
+        try {
+            c2.put(1, new Organization("o1"));
+            c2.put(2, new Organization("o2"));
+            c1.put(3, new Person2(1, "p1"));
+            c1.put(4, new Person2(2, "p2"));
+            c1.put(5, new Person2(3, "p3"));
+
+            String select0 = "select o.name n1, p.name n2 from \"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key and o._key=1";
+
+            final SqlFieldsQuery qry = new SqlFieldsQuery(select0);
+
+            qry.setDistributedJoins(true);
+
+            GridTestUtils.assertThrows(log, new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    c1.query(qry);
+
+                    return null;
+                }
+            }, CacheException.class, "Using indexes with different parallelism levels in same query is forbidden.");
+        }
+        finally {
+            c1.destroy();
+            c2.destroy();
+        }
+    }
+
+    /**
      * @param cache Cache.
      * @param sql SQL.
      * @param enforceJoinOrder Enforce join order flag.
@@ -789,26 +912,26 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
             false,
             0,
             select +
-                "from " + cache1 + ","  + cache2 + " "+ where);
+                "from " + cache1 + "," + cache2 + " " + where);
 
         checkQueryPlan(cache,
             false,
             0,
             select +
-                "from " + cache2 + ","  + cache1 + " "+ where);
+                "from " + cache2 + "," + cache1 + " " + where);
 
         if (testEnforceJoinOrder) {
             checkQueryPlan(cache,
                 true,
                 0,
                 select +
-                    "from " + cache1 + ","  + cache2 + " "+ where);
+                    "from " + cache1 + "," + cache2 + " " + where);
 
             checkQueryPlan(cache,
                 true,
                 0,
                 select +
-                    "from " + cache2 + ","  + cache1 + " "+ where);
+                    "from " + cache2 + "," + cache1 + " " + where);
         }
     }
 
@@ -823,7 +946,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
         boolean enforceJoinOrder,
         int expBatchedJoins,
         String sql,
-        String...expText) {
+        String... expText) {
         checkQueryPlan(cache,
             enforceJoinOrder,
             expBatchedJoins,
@@ -848,7 +971,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
         boolean enforceJoinOrder,
         int expBatchedJoins,
         SqlFieldsQuery qry,
-        String...expText) {
+        String... expText) {
         qry.setEnforceJoinOrder(enforceJoinOrder);
         qry.setDistributedJoins(true);
 
@@ -984,7 +1107,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
      * @param args Arguments.
      * @return Column as list.
      */
-    private static <X> List<X> columnQuery(IgniteCache<?,?> c, String qry, Object... args) {
+    private static <X> List<X> columnQuery(IgniteCache<?, ?> c, String qry, Object... args) {
         return column(0, c.query(new SqlFieldsQuery(qry).setArgs(args)).getAll());
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
index 52084c7..d6a5fb1 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
@@ -230,16 +230,16 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
         assertEquals(0, spi.size(typeAB.space(), typeAB));
         assertEquals(0, spi.size(typeBA.space(), typeBA));
 
-        assertFalse(spi.queryLocalSql(typeAA.space(), "select * from A.A", null, Collections.emptySet(), typeAA, null).hasNext());
-        assertFalse(spi.queryLocalSql(typeAB.space(), "select * from A.B", null, Collections.emptySet(), typeAB, null).hasNext());
-        assertFalse(spi.queryLocalSql(typeBA.space(), "select * from B.A", null, Collections.emptySet(), typeBA, null).hasNext());
+        assertFalse(spi.queryLocalSql(typeAA.space(), "select * from A.A", null, Collections.emptySet(), typeAA.name(), null, null).hasNext());
+        assertFalse(spi.queryLocalSql(typeAB.space(), "select * from A.B", null, Collections.emptySet(), typeAB.name(), null, null).hasNext());
+        assertFalse(spi.queryLocalSql(typeBA.space(), "select * from B.A", null, Collections.emptySet(), typeBA.name(), null, null).hasNext());
 
         assertFalse(spi.queryLocalSql(typeBA.space(), "select * from B.A, A.B, A.A", null,
-            Collections.emptySet(), typeBA, null).hasNext());
+            Collections.emptySet(), typeBA.name(), null, null).hasNext());
 
         try {
             spi.queryLocalSql(typeBA.space(), "select aa.*, ab.*, ba.* from A.A aa, A.B ab, B.A ba", null,
-                Collections.emptySet(), typeBA, null).hasNext();
+                Collections.emptySet(), typeBA.name(), null, null).hasNext();
 
             fail("Enumerations of aliases in select block must be prohibited");
         }
@@ -248,10 +248,10 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
         }
 
         assertFalse(spi.queryLocalSql(typeAB.space(), "select ab.* from A.B ab", null,
-            Collections.emptySet(), typeAB, null).hasNext());
+            Collections.emptySet(), typeAB.name(), null, null).hasNext());
 
         assertFalse(spi.queryLocalSql(typeBA.space(), "select   ba.*   from B.A  as ba", null,
-            Collections.emptySet(), typeBA, null).hasNext());
+            Collections.emptySet(), typeBA.name(), null, null).hasNext());
 
         // Nothing to remove.
         spi.remove("A", key(1), aa(1, "", 10));
@@ -305,7 +305,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
 
         // Query data.
         Iterator<IgniteBiTuple<Integer, Map<String, Object>>> res =
-            spi.queryLocalSql(typeAA.space(), "from a order by age", null, Collections.emptySet(), typeAA, null);
+            spi.queryLocalSql(typeAA.space(), "from a order by age", null, Collections.emptySet(), typeAA.name(), null, null);
 
         assertTrue(res.hasNext());
         assertEquals(aa(3, "Borya", 18).value(null, false), value(res.next()));
@@ -314,7 +314,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
         assertFalse(res.hasNext());
 
         res = spi.queryLocalSql(typeAA.space(), "select aa.* from a aa order by aa.age", null,
-            Collections.emptySet(), typeAA, null);
+            Collections.emptySet(), typeAA.name(), null, null);
 
         assertTrue(res.hasNext());
         assertEquals(aa(3, "Borya", 18).value(null, false), value(res.next()));
@@ -322,7 +322,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
         assertEquals(aa(2, "Valera", 19).value(null, false), value(res.next()));
         assertFalse(res.hasNext());
 
-        res = spi.queryLocalSql(typeAB.space(), "from b order by name", null, Collections.emptySet(), typeAB, null);
+        res = spi.queryLocalSql(typeAB.space(), "from b order by name", null, Collections.emptySet(), typeAB.name(), null, null);
 
         assertTrue(res.hasNext());
         assertEquals(ab(1, "Vasya", 20, "Some text about Vasya goes here.").value(null, false), value(res.next()));
@@ -331,7 +331,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
         assertFalse(res.hasNext());
 
         res = spi.queryLocalSql(typeAB.space(), "select bb.* from b as bb order by bb.name", null,
-            Collections.emptySet(), typeAB, null);
+            Collections.emptySet(), typeAB.name(), null, null);
 
         assertTrue(res.hasNext());
         assertEquals(ab(1, "Vasya", 20, "Some text about Vasya goes here.").value(null, false), value(res.next()));
@@ -340,7 +340,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
         assertFalse(res.hasNext());
 
 
-        res = spi.queryLocalSql(typeBA.space(), "from a", null, Collections.emptySet(), typeBA, null);
+        res = spi.queryLocalSql(typeBA.space(), "from a", null, Collections.emptySet(), typeBA.name(), null, null);
 
         assertTrue(res.hasNext());
         assertEquals(ba(2, "Kolya", 25, true).value(null, false), value(res.next()));
@@ -725,4 +725,4 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
             throw new UnsupportedOperationException();
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/test/java/org/apache/ignite/loadtests/h2indexing/FetchingQueryCursorStressTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/loadtests/h2indexing/FetchingQueryCursorStressTest.java b/modules/indexing/src/test/java/org/apache/ignite/loadtests/h2indexing/FetchingQueryCursorStressTest.java
new file mode 100644
index 0000000..d1d80f2
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/loadtests/h2indexing/FetchingQueryCursorStressTest.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.loadtests.h2indexing;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
+
+/**
+ * SQL query stress test.
+ */
+public class FetchingQueryCursorStressTest {
+    /** Node count. */
+    private static final int NODE_CNT = 4; // Switch to 4 to see better throughput.
+
+    /** Number of entries. */
+    private static final int ENTRIES_CNT = 10_000;
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache";
+
+    /** Thread count. */
+    private static final int THREAD_CNT = 16;
+
+    /** Execution counter. */
+    private static final AtomicLong CNT = new AtomicLong();
+
+    /** Verbose mode. */
+    private static final boolean VERBOSE = false;
+
+    /** Time the main thread sleeps before joining the worker threads, in milliseconds. */
+    private static final long TIMEOUT = TimeUnit.SECONDS.toMillis(30);
+
+    public static final AtomicReference<Exception> error = new AtomicReference<>();
+
+    /**
+     * Entry point.
+     */
+    public static void main(String[] args) throws Exception {
+        List<Thread> threads = new ArrayList<>(THREAD_CNT + 1);
+
+        try (Ignite ignite = start()) {
+
+            IgniteCache<Integer, Person> cache = ignite.cache(CACHE_NAME);
+
+            loadData(ignite, cache);
+
+            System.out.println("Loaded data: " + cache.size());
+
+            for (int i = 0; i < THREAD_CNT; i++)
+                threads.add(startDaemon("qry-exec-" + i, new QueryExecutor(cache, "Select * from Person")));
+
+            threads.add(startDaemon("printer", new ThroughputPrinter()));
+
+            Thread.sleep(TIMEOUT);
+
+            for (Thread t : threads)
+                t.join();
+
+            if(error.get()!=null)
+                throw error.get();
+        }
+        finally {
+            Ignition.stopAll(false);
+        }
+    }
+
+    /**
+     * Start daemon thread.
+     * @param name Name.
+     * @param r Runnable.
+     * @return Started daemon thread.
+     */
+    private static Thread startDaemon(String name, Runnable r) {
+        Thread t = new Thread(r);
+
+        t.setName(name);
+        t.setDaemon(true);
+
+        t.start();
+
+        return t;
+    }
+
+    /**
+     * Load data into Ignite.
+     *
+     * @param ignite Ignite.
+     * @param cache Cache.
+     */
+    private static void loadData(Ignite ignite, IgniteCache<Integer, Person> cache) throws Exception {
+        try (IgniteDataStreamer<Object, Object> str = ignite.dataStreamer(cache.getName())) {
+
+            for (int id = 0; id < ENTRIES_CNT; id++)
+                str.addData(id, new Person(id, "John" + id, "Doe"));
+        }
+    }
+
+    /**
+     * Start topology.
+     *
+     * @return Client node.
+     */
+    private static Ignite start() {
+        int i = 0;
+
+        for (; i < NODE_CNT; i++)
+            Ignition.start(config(i, false));
+
+        return Ignition.start(config(i, true));
+    }
+
+    /**
+     * Create configuration.
+     *
+     * @param idx Index.
+     * @param client Client flag.
+     * @return Configuration.
+     */
+    @SuppressWarnings("unchecked")
+    private static IgniteConfiguration config(int idx, boolean client) {
+        IgniteConfiguration cfg = new IgniteConfiguration();
+
+        cfg.setGridName("grid-" + idx);
+        cfg.setClientMode(client);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(CACHE_NAME);
+        ccfg.setIndexedTypes(Integer.class, Person.class);
+        cfg.setMarshaller(new OptimizedMarshaller());
+
+        cfg.setCacheConfiguration(ccfg);
+
+        cfg.setLocalHost("127.0.0.1");
+
+        return cfg;
+    }
+
+    /**
+     * Indexed cache value type.
+     */
+    private static class Person implements Serializable {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        @QuerySqlField
+        private int id;
+
+        /** */
+        @QuerySqlField
+        private String firstName;
+
+        /** */
+        @QuerySqlField
+        private String lastName;
+
+        public Person(int id, String firstName, String lastName) {
+            this.id = id;
+            this.firstName = firstName;
+            this.lastName = lastName;
+        }
+    }
+
+    /**
+     * Query runner.
+     */
+    private static class QueryExecutor implements Runnable {
+        /** Cache. */
+        private final IgniteCache<Integer, Person> cache;
+
+        /** SQL query text to execute. */
+        private final String query;
+
+        /**
+         * Constructor.
+         * @param cache Cache.
+         * @param query SQL query text to execute.
+         */
+        public QueryExecutor(IgniteCache<Integer, Person> cache, String query) {
+            this.cache = cache;
+            this.query = query;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("InfiniteLoopStatement")
+        @Override public void run() {
+            System.out.println("Executor started: " + Thread.currentThread().getName());
+
+            try {
+                while (error.get()==null && !Thread.currentThread().isInterrupted()) {
+                    long start = System.nanoTime();
+
+                    SqlFieldsQuery qry = new SqlFieldsQuery(query);
+
+//                qry.setArgs((Object[]) argumentForQuery());
+
+                    Set<Integer> extIds = new HashSet<>();
+
+                    for (List<?> next : cache.query(qry))
+                        extIds.add((Integer)next.get(0));
+
+                    long dur = (System.nanoTime() - start) / 1_000_000;
+
+                    CNT.incrementAndGet();
+
+                    if (VERBOSE)
+                        System.out.println("[extIds=" + extIds.size() + ", dur=" + dur + ']');
+                }
+            }
+            catch (CacheException ex){
+                error.compareAndSet(null, ex);
+            }
+        }
+    }
+
+    /**
+     * Throughput printer.
+     */
+    private static class ThroughputPrinter implements Runnable {
+        /** {@inheritDoc} */
+        @SuppressWarnings("InfiniteLoopStatement")
+        @Override public void run() {
+            while (error.get()==null) {
+                long before = CNT.get();
+                long beforeTime = System.currentTimeMillis();
+
+                try {
+                    Thread.sleep(2000L);
+                }
+                catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+
+                    return;
+                }
+
+                long after = CNT.get();
+                long afterTime = System.currentTimeMillis();
+
+                double res = 1000 * ((double)(after - before)) / (afterTime - beforeTime);
+
+                System.out.println((long)res + " ops/sec");
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 386b8fd..b417b0a 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -98,6 +98,7 @@ import org.apache.ignite.internal.processors.cache.query.IndexingSpiQuerySelfTes
 import org.apache.ignite.internal.processors.cache.query.IndexingSpiQueryTxSelfTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlEntryCacheModeAgnosticTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlSchemaIndexingTest;
+import org.apache.ignite.internal.processors.query.IgniteSqlSegmentedIndexSelfTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlSplitterSelfTest;
 import org.apache.ignite.internal.processors.query.h2.GridH2IndexRebuildTest;
 import org.apache.ignite.internal.processors.query.h2.GridH2IndexingInMemSelfTest;
@@ -134,6 +135,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
 
         // Queries tests.
         suite.addTestSuite(IgniteSqlSplitterSelfTest.class);
+        suite.addTestSuite(IgniteSqlSegmentedIndexSelfTest.class);
         suite.addTestSuite(IgniteSqlSchemaIndexingTest.class);
         suite.addTestSuite(GridCacheQueryIndexDisabledSelfTest.class);
         suite.addTestSuite(IgniteCacheQueryLoadSelfTest.class);


[02/12] ignite git commit: Implemented.

Posted by se...@apache.org.
Implemented.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/64ba13b0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/64ba13b0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/64ba13b0

Branch: refs/heads/ignite-1.9
Commit: 64ba13b0a3be6acbf7d629029b460a39c2e2b388
Parents: 4eac51c
Author: AMRepo <an...@gmail.com>
Authored: Mon Feb 20 21:24:29 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Feb 21 11:52:40 2017 +0300

----------------------------------------------------------------------
 .../configuration/CacheConfiguration.java       |  48 ++++
 .../processors/cache/GridCacheProcessor.java    |   3 +
 .../processors/cache/IgniteCacheProxy.java      |   6 +-
 .../closure/GridClosureProcessor.java           |   2 +-
 .../processors/query/GridQueryIndexing.java     |  27 +-
 .../processors/query/GridQueryProcessor.java    | 141 +++-------
 .../messages/GridQueryNextPageRequest.java      |  29 +-
 .../messages/GridQueryNextPageResponse.java     |  29 +-
 .../cache/query/GridCacheTwoStepQuery.java      |  17 ++
 .../processors/query/h2/IgniteH2Indexing.java   | 235 ++++++++++++++--
 .../query/h2/opt/DistributedJoinMode.java       |  51 ++++
 .../query/h2/opt/GridH2IndexBase.java           | 264 +++++++++++++-----
 .../query/h2/opt/GridH2QueryContext.java        |  84 ++++--
 .../query/h2/opt/GridH2TreeIndex.java           | 232 ++++++++++++----
 .../query/h2/twostep/GridMapQueryExecutor.java  | 227 +++++++++++----
 .../query/h2/twostep/GridMergeIndex.java        |  39 ++-
 .../h2/twostep/GridReduceQueryExecutor.java     |  69 +++--
 .../h2/twostep/msg/GridH2IndexRangeRequest.java |  60 +++-
 .../twostep/msg/GridH2IndexRangeResponse.java   |  62 ++++-
 .../h2/twostep/msg/GridH2QueryRequest.java      |   5 +
 .../query/IgniteSqlSegmentedIndexSelfTest.java  | 263 ++++++++++++++++++
 .../query/IgniteSqlSplitterSelfTest.java        | 139 +++++++++-
 .../h2/GridIndexingSpiAbstractSelfTest.java     |  26 +-
 .../FetchingQueryCursorStressTest.java          | 277 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 25 files changed, 1917 insertions(+), 420 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 0656dda..149f25a 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -223,6 +223,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     /** Default threshold for concurrent loading of keys from {@link CacheStore}. */
     public static final int DFLT_CONCURRENT_LOAD_ALL_THRESHOLD = 5;
 
+    /** Default SQL query parallelism level. */
+    public static final int DFLT_SQL_QUERY_PARALLELISM_LVL = 1;
+
     /** Cache name. */
     private String name;
 
@@ -410,6 +413,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     /** Query entities. */
     private Collection<QueryEntity> qryEntities;
 
+    /** Query parallelism level. */
+    private int qryParallelism = DFLT_SQL_QUERY_PARALLELISM_LVL;
+
     /** Empty constructor (all values are initialized to their defaults). */
     public CacheConfiguration() {
         /* No-op. */
@@ -462,6 +468,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
         interceptor = cc.getInterceptor();
         invalidate = cc.isInvalidate();
         isReadThrough = cc.isReadThrough();
+        qryParallelism = cc.getQueryParallelism();
         isWriteThrough = cc.isWriteThrough();
         storeKeepBinary = cc.isStoreKeepBinary() != null ? cc.isStoreKeepBinary() : DFLT_STORE_KEEP_BINARY;
         listenerConfigurations = cc.listenerConfigurations;
@@ -2108,6 +2115,47 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     }
 
     /**
+     * Defines a hint to query execution engine on desired degree of parallelism within a single node.
+     * Query executor may or may not use this hint depending on estimated query costs. Query executor may define
+     * certain restrictions on parallelism depending on query type and/or cache type.
+     * <p>
+     * As of {@code Apache Ignite 1.9} this hint is only supported for SQL queries with the following restrictions:
+     * <ul>
+     *     <li>Hint cannot be used for a {@code REPLICATED} cache; an exception is thrown if it is</li>
+     *     <li>All caches participating in query must have the same degree of parallelism, exception is thrown
+     *     otherwise</li>
+     * </ul>
+     * These restrictions will be removed in future versions of Apache Ignite.
+     * <p>
+     * Defaults to {@code 1}.
+     */
+    public int getQueryParallelism() {
+        return qryParallelism;
+    }
+
+    /**
+     * Defines a hint to query execution engine on desired degree of parallelism within a single node.
+     * Query executor may or may not use this hint depending on estimated query costs. Query executor may define
+     * certain restrictions on parallelism depending on query type and/or cache type.
+     * <p>
+     * As of {@code Apache Ignite 1.9} this hint is only supported for SQL queries with the following restrictions:
+     * <ul>
+     *     <li>Hint cannot be used for a {@code REPLICATED} cache; an exception is thrown if it is</li>
+     *     <li>All caches participating in query must have the same degree of parallelism, exception is thrown
+     *     otherwise</li>
+     * </ul>
+     * These restrictions will be removed in future versions of Apache Ignite.
+     *
+     * @param qryParallelism Query parallelism level.
+     * @return {@code this} for chaining.
+     */
+    public CacheConfiguration<K,V> setQueryParallelism(int qryParallelism) {
+        this.qryParallelism = qryParallelism;
+
+        return this;
+    }
+
+    /**
      * Gets topology validator.
      * <p>
      * See {@link TopologyValidator} for details.

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 7093403..c3e3f3b 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -269,6 +269,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (cfg.getCacheMode() == REPLICATED)
             cfg.setBackups(Integer.MAX_VALUE);
 
+        if( cfg.getQueryParallelism() > 1 && cfg.getCacheMode() != PARTITIONED)
+            throw new IgniteCheckedException("Cache index segmentation is supported for PARTITIONED mode only.");
+
         if (cfg.getAffinityMapper() == null)
             cfg.setAffinityMapper(cacheObjCtx.defaultAffMapper());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 1381670..f806d05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -729,12 +729,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
                 final SqlQuery p = (SqlQuery)qry;
 
                 if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal())
-                    return (QueryCursor<R>)new QueryCursorImpl<>(new Iterable<Cache.Entry<K, V>>() {
-                        @Override public Iterator<Cache.Entry<K, V>> iterator() {
-                            return ctx.kernalContext().query().queryLocal(ctx, p,
+                     return (QueryCursor<R>)ctx.kernalContext().query().queryLocal(ctx, p,
                                 opCtxCall != null && opCtxCall.isKeepBinary());
-                        }
-                    });
 
                 return (QueryCursor<R>)ctx.kernalContext().query().queryTwoStep(ctx, p);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 20fb6a0..61ed8a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -902,7 +902,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @return Future.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    private <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> c, byte plc)
+    public <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> c, byte plc)
         throws IgniteCheckedException {
         if (c == null)
             return new GridFinishedFuture<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index ca04724..37f0ade 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -84,35 +84,26 @@ public interface GridQueryIndexing {
     /**
      * Queries individual fields (generally used by JDBC drivers).
      *
-     * @param spaceName Space name.
+     * @param cctx Cache context.
      * @param qry Query.
-     * @param params Query parameters.
      * @param filter Space name and key filter.
-     * @param enforceJoinOrder Enforce join order of tables in the query.
-     * @param timeout Query timeout in milliseconds.
      * @param cancel Query cancel.
-     * @return Query result.
-     * @throws IgniteCheckedException If failed.
+     * @return Cursor.
      */
-    public GridQueryFieldsResult queryLocalSqlFields(@Nullable String spaceName, String qry,
-        Collection<Object> params, IndexingQueryFilter filter, boolean enforceJoinOrder, int timeout,
-        GridQueryCancel cancel) throws IgniteCheckedException;
+    public <K, V> QueryCursor<List<?>> queryLocalSqlFields(GridCacheContext<?, ?> cctx, SqlFieldsQuery qry,
+        IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException;
 
     /**
      * Executes regular query.
      *
-     * @param spaceName Space name.
+     * @param cctx Cache context.
      * @param qry Query.
-     * @param alias Table alias used in Query.
-     * @param params Query parameters.
-     * @param type Query return type.
      * @param filter Space name and key filter.
-     * @return Queried rows.
-     * @throws IgniteCheckedException If failed.
+     * @param keepBinary Keep binary flag.
+     * @return Cursor.
      */
-    public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalSql(@Nullable String spaceName, String qry,
-        String alias, Collection<Object> params, GridQueryTypeDescriptor type, IndexingQueryFilter filter)
-        throws IgniteCheckedException;
+    public <K, V> QueryCursor<Cache.Entry<K,V>> queryLocalSql(GridCacheContext<?, ?> cctx, SqlQuery qry,
+        IndexingQueryFilter filter, boolean keepBinary) throws IgniteCheckedException;
 
     /**
      * Executes text query.

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index ee9224b..85744d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -754,42 +754,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                 INDEXING.module() + " to classpath or moving it from 'optional' to 'libs' folder).");
     }
 
-    /**
-     * @param space Space.
-     * @param clause Clause.
-     * @param params Parameters collection.
-     * @param resType Result type.
-     * @param filters Filters.
-     * @return Key/value rows.
-     * @throws IgniteCheckedException If failed.
-     */
-    @SuppressWarnings("unchecked")
-    public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(final String space, final String clause,
-        final Collection<Object> params, final String resType, final IndexingQueryFilter filters)
-        throws IgniteCheckedException {
-        checkEnabled();
-
-        if (!busyLock.enterBusy())
-            throw new IllegalStateException("Failed to execute query (grid is stopping).");
-
-        try {
-            final GridCacheContext<?, ?> cctx = ctx.cache().internalCache(space).context();
-
-            return executeQuery(GridCacheQueryType.SQL_FIELDS, clause, cctx, new IgniteOutClosureX<GridCloseableIterator<IgniteBiTuple<K, V>>>() {
-                @Override public GridCloseableIterator<IgniteBiTuple<K, V>> applyx() throws IgniteCheckedException {
-                    TypeDescriptor type = typesByName.get(new TypeName(space, resType));
-
-                    if (type == null || !type.registered())
-                        throw new CacheException("Failed to find SQL table for type: " + resType);
-
-                    return idx.queryLocalSql(space, clause, null, params, type, filters);
-                }
-            }, false);
-        }
-        finally {
-            busyLock.leaveBusy();
-        }
-    }
 
     /**
      * @param cctx Cache context.
@@ -829,11 +793,12 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             throw new IllegalStateException("Failed to execute query (grid is stopping).");
 
         try {
-            return executeQuery(GridCacheQueryType.SQL, qry.getSql(), cctx, new IgniteOutClosureX<QueryCursor<Cache.Entry<K, V>>>() {
-                @Override public QueryCursor<Cache.Entry<K, V>> applyx() throws IgniteCheckedException {
-                    return idx.queryTwoStep(cctx, qry);
-                }
-            }, true);
+            return executeQuery(GridCacheQueryType.SQL, qry.getSql(), cctx,
+                new IgniteOutClosureX<QueryCursor<Cache.Entry<K, V>>>() {
+                    @Override public QueryCursor<Cache.Entry<K, V>> applyx() throws IgniteCheckedException {
+                        return idx.queryTwoStep(cctx, qry);
+                    }
+                }, true);
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException(e);
@@ -849,7 +814,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @param keepBinary Keep binary flag.
      * @return Cursor.
      */
-    public <K, V> Iterator<Cache.Entry<K, V>> queryLocal(
+    public <K, V> QueryCursor<Cache.Entry<K, V>> queryLocal(
         final GridCacheContext<?, ?> cctx,
         final SqlQuery qry,
         final boolean keepBinary
@@ -859,54 +824,25 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
         try {
             return executeQuery(GridCacheQueryType.SQL, qry.getSql(), cctx,
-                new IgniteOutClosureX<Iterator<Cache.Entry<K, V>>>() {
-                    @Override public Iterator<Cache.Entry<K, V>> applyx() throws IgniteCheckedException {
-                        String space = cctx.name();
+                new IgniteOutClosureX<QueryCursor<Cache.Entry<K, V>>>() {
+                    @Override public QueryCursor<Cache.Entry<K, V>> applyx() throws IgniteCheckedException {
                         String type = qry.getType();
-                        String sqlQry = qry.getSql();
-                        Object[] params = qry.getArgs();
 
-                        TypeDescriptor typeDesc = typesByName.get(
-                            new TypeName(
-                                space,
+                        GridQueryProcessor.TypeDescriptor typeDesc = typesByName.get(
+                            new GridQueryProcessor.TypeName(
+                                cctx.name(),
                                 type));
 
                         if (typeDesc == null || !typeDesc.registered())
                             throw new CacheException("Failed to find SQL table for type: " + type);
 
-                        final GridCloseableIterator<IgniteBiTuple<K, V>> i = idx.queryLocalSql(
-                            space,
-                            qry.getSql(),
-                            qry.getAlias(),
-                            F.asList(params),
-                            typeDesc,
-                            idx.backupFilter(requestTopVer.get(), null));
+                        qry.setType(typeDesc.name());
 
                         sendQueryExecutedEvent(
-                            sqlQry,
-                            params);
-
-                        return new ClIter<Cache.Entry<K, V>>() {
-                            @Override public void close() throws Exception {
-                                i.close();
-                            }
-
-                            @Override public boolean hasNext() {
-                                return i.hasNext();
-                            }
-
-                            @Override public Cache.Entry<K, V> next() {
-                                IgniteBiTuple<K, V> t = i.next();
-
-                                return new CacheEntryImpl<>(
-                                    (K)cctx.unwrapBinaryIfNeeded(t.getKey(), keepBinary, false),
-                                    (V)cctx.unwrapBinaryIfNeeded(t.getValue(), keepBinary, false));
-                            }
-
-                            @Override public void remove() {
-                                throw new UnsupportedOperationException();
-                            }
-                        };
+                            qry.getSql(),
+                            qry.getArgs());
+
+                        return idx.queryLocalSql(cctx, qry, idx.backupFilter(requestTopVer.get(), null), keepBinary);
                     }
                 }, true);
         }
@@ -994,13 +930,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Closeable iterator.
-     */
-    private interface ClIter<X> extends AutoCloseable, Iterator<X> {
-        // No-op.
-    }
-
-    /**
      * @param cctx Cache context.
      * @param qry Query.
      * @return Iterator.
@@ -1010,34 +939,26 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             throw new IllegalStateException("Failed to execute query (grid is stopping).");
 
         try {
-            final boolean keepBinary = cctx.keepBinary();
-
             return executeQuery(GridCacheQueryType.SQL_FIELDS, qry.getSql(), cctx, new IgniteOutClosureX<QueryCursor<List<?>>>() {
                 @Override public QueryCursor<List<?>> applyx() throws IgniteCheckedException {
-                    final String space = cctx.name();
-                    final String sql = qry.getSql();
-                    final Object[] args = qry.getArgs();
-                    final GridQueryCancel cancel = new GridQueryCancel();
+                    GridQueryCancel cancel = new GridQueryCancel();
 
-                    final GridQueryFieldsResult res = idx.queryLocalSqlFields(space, sql, F.asList(args),
-                        idx.backupFilter(requestTopVer.get(), null), qry.isEnforceJoinOrder(), qry.getTimeout(), cancel);
+                    final QueryCursor<List<?>> cursor = idx.queryLocalSqlFields(cctx, qry,
+                        idx.backupFilter(requestTopVer.get(), null), cancel);
 
-                    QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() {
+                    return new QueryCursorImpl<List<?>>(new Iterable<List<?>>() {
                         @Override public Iterator<List<?>> iterator() {
-                            try {
-                                sendQueryExecutedEvent(sql, args);
-
-                                return new GridQueryCacheObjectsIterator(res.iterator(), cctx, keepBinary);
-                            }
-                            catch (IgniteCheckedException e) {
-                                throw new IgniteException(e);
-                            }
-                        }
-                    }, cancel);
-
-                    cursor.fieldsMeta(res.metaData());
+                            sendQueryExecutedEvent(qry.getSql(), qry.getArgs());
 
-                    return cursor;
+                            return cursor.iterator();
+                        }
+                    }, cancel) {
+                        @Override public List<GridQueryFieldMetadata> fieldsMeta() {
+                            if (cursor instanceof QueryCursorImpl)
+                                return ((QueryCursorImpl)cursor).fieldsMeta();
+                            return super.fieldsMeta();
+                        }
+                    };
                 }
             }, true);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
index 1feff5a..acea084 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep.messages;
 
-
 import java.nio.ByteBuffer;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -35,6 +34,9 @@ public class GridQueryNextPageRequest implements Message {
     private long qryReqId;
 
     /** */
+    private int segmentId;
+
+    /** */
     private int qry;
 
     /** */
@@ -50,11 +52,13 @@ public class GridQueryNextPageRequest implements Message {
     /**
      * @param qryReqId Query request ID.
      * @param qry Query.
+     * @param segmentId Index segment ID.
      * @param pageSize Page size.
      */
-    public GridQueryNextPageRequest(long qryReqId, int qry, int pageSize) {
+    public GridQueryNextPageRequest(long qryReqId, int qry, int segmentId, int pageSize) {
         this.qryReqId = qryReqId;
         this.qry = qry;
+        this.segmentId = segmentId;
         this.pageSize = pageSize;
     }
 
@@ -72,6 +76,11 @@ public class GridQueryNextPageRequest implements Message {
         return qry;
     }
 
+    /** @return Index segment ID. */
+    public int segmentId() {
+        return segmentId;
+    }
+
     /**
      * @return Page size.
      */
@@ -119,6 +128,12 @@ public class GridQueryNextPageRequest implements Message {
 
                 writer.incrementState();
 
+            case 3:
+                if (!writer.writeInt("segmentId", segmentId))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -156,6 +171,14 @@ public class GridQueryNextPageRequest implements Message {
 
                 reader.incrementState();
 
+            case 3:
+                segmentId = reader.readInt("segmentId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridQueryNextPageRequest.class);
@@ -168,6 +191,6 @@ public class GridQueryNextPageRequest implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 3;
+        return 4;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
index 4889069..e85c00b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
@@ -42,6 +42,9 @@ public class GridQueryNextPageResponse implements Message {
     private long qryReqId;
 
     /** */
+    private int segmentId;
+
+    /** */
     private int qry;
 
     /** */
@@ -73,6 +76,7 @@ public class GridQueryNextPageResponse implements Message {
 
     /**
      * @param qryReqId Query request ID.
+     * @param segmentId Index segment ID.
      * @param qry Query.
      * @param page Page.
      * @param allRows All rows count.
@@ -80,12 +84,13 @@ public class GridQueryNextPageResponse implements Message {
      * @param vals Values for rows in this page added sequentially.
      * @param plainRows Not marshalled rows for local node.
      */
-    public GridQueryNextPageResponse(long qryReqId, int qry, int page, int allRows, int cols,
+    public GridQueryNextPageResponse(long qryReqId, int segmentId, int qry, int page, int allRows, int cols,
         Collection<Message> vals, Collection<?> plainRows) {
         assert vals != null ^ plainRows != null;
         assert cols > 0 : cols;
 
         this.qryReqId = qryReqId;
+        this.segmentId = segmentId;
         this.qry = qry;
         this.page = page;
         this.allRows = allRows;
@@ -102,6 +107,13 @@ public class GridQueryNextPageResponse implements Message {
     }
 
     /**
+     * @return Index segment ID.
+     */
+    public int segmentId() {
+        return segmentId;
+    }
+
+    /**
      * @return Query.
      */
     public int query() {
@@ -202,6 +214,12 @@ public class GridQueryNextPageResponse implements Message {
 
                 writer.incrementState();
 
+            case 7:
+                if (!writer.writeInt("segmentId", segmentId))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -271,6 +289,13 @@ public class GridQueryNextPageResponse implements Message {
 
                 reader.incrementState();
 
+            case 7:
+                segmentId = reader.readInt("segmentId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
         }
 
         return reader.afterMessageRead(GridQueryNextPageResponse.class);
@@ -283,7 +308,7 @@ public class GridQueryNextPageResponse implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 7;
+        return 8;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index f53936f..c127eeb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -69,6 +69,9 @@ public class GridCacheTwoStepQuery {
     /** */
     private List<Integer> extraCaches;
 
+    /** */
+    private boolean local;
+
     /**
      * @param originalSql Original query SQL.
      * @param schemas Schema names in query.
@@ -229,6 +232,20 @@ public class GridCacheTwoStepQuery {
     }
 
     /**
+     * @return {@code true} if the query is local.
+     */
+    public boolean isLocal() {
+        return local;
+    }
+
+    /**
+     * @param local Local query flag.
+     */
+    public void local(boolean local) {
+        this.local = local;
+    }
+
+    /**
      * @param args New arguments to copy with.
      * @return Copy.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index e4b0c1f..2f40d87 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -77,11 +77,13 @@ import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
 import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
@@ -92,6 +94,7 @@ import org.apache.ignite.internal.processors.query.GridQueryIndexing;
 import org.apache.ignite.internal.processors.query.GridQueryProperty;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOffheap;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap;
@@ -187,6 +190,8 @@ import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryTy
 import static org.apache.ignite.internal.processors.query.GridQueryIndexType.FULLTEXT;
 import static org.apache.ignite.internal.processors.query.GridQueryIndexType.GEO_SPATIAL;
 import static org.apache.ignite.internal.processors.query.GridQueryIndexType.SORTED;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.distributedJoinMode;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.LOCAL;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE;
@@ -810,10 +815,22 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             removeTable(tbl);
     }
 
-    /** {@inheritDoc} */
+    /**
+     * Queries individual fields (generally used by JDBC drivers).
+     *
+     * @param spaceName Space name.
+     * @param qry Query.
+     * @param params Query parameters.
+     * @param filter Space name and key filter.
+     * @param enforceJoinOrder Enforce join order of tables in the query.
+     * @param timeout Query timeout in milliseconds.
+     * @param cancel Query cancel.
+     * @return Query result.
+     * @throws IgniteCheckedException If failed.
+     */
     @SuppressWarnings("unchecked")
-    @Override public GridQueryFieldsResult queryLocalSqlFields(@Nullable final String spaceName, final String qry,
-        @Nullable final Collection<Object> params, final IndexingQueryFilter filters, boolean enforceJoinOrder,
+    public GridQueryFieldsResult queryLocalSqlFields(@Nullable final String spaceName, final String qry,
+        @Nullable final Collection<Object> params, final IndexingQueryFilter filter, boolean enforceJoinOrder,
         final int timeout, final GridQueryCancel cancel)
         throws IgniteCheckedException {
         final Connection conn = connectionForSpace(spaceName);
@@ -833,7 +850,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             fldsQry.setEnforceJoinOrder(enforceJoinOrder);
             fldsQry.setTimeout(timeout, TimeUnit.MILLISECONDS);
 
-            return dmlProc.updateLocalSqlFields(spaceName, stmt, fldsQry, filters, cancel);
+            return dmlProc.updateLocalSqlFields(spaceName, stmt, fldsQry, filter, cancel);
         }
 
         List<GridQueryFieldMetadata> meta;
@@ -846,7 +863,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
 
         final GridH2QueryContext ctx = new GridH2QueryContext(nodeId, nodeId, 0, LOCAL)
-            .filter(filters).distributedJoins(false);
+            .filter(filter).distributedJoinMode(OFF);
 
         return new GridQueryFieldsResultAdapter(meta, null) {
             @Override public GridCloseableIterator<List<?>> iterator() throws IgniteCheckedException {
@@ -1099,14 +1116,113 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalSql(@Nullable String spaceName,
-        final String qry, String alias, @Nullable final Collection<Object> params, GridQueryTypeDescriptor type,
-        final IndexingQueryFilter filter) throws IgniteCheckedException {
-        final TableDescriptor tbl = tableDescriptor(spaceName, type);
+    @Override public <K, V> QueryCursor<List<?>> queryLocalSqlFields(final GridCacheContext<?, ?> cctx,
+        final SqlFieldsQuery qry, final IndexingQueryFilter filter, final GridQueryCancel cancel)
+        throws IgniteCheckedException {
+
+        if (cctx.config().getQueryParallelism() > 1) {
+            qry.setDistributedJoins(true);
+
+            assert qry.isLocal();
+
+            return queryTwoStep(cctx, qry, cancel);
+        }
+        else {
+            final boolean keepBinary = cctx.keepBinary();
+
+            final String space = cctx.name();
+            final String sql = qry.getSql();
+            final Object[] args = qry.getArgs();
+
+            final GridQueryFieldsResult res = queryLocalSqlFields(space, sql, F.asList(args), filter,
+                qry.isEnforceJoinOrder(), qry.getTimeout(), cancel);
+
+            QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() {
+                @Override public Iterator<List<?>> iterator() {
+                    try {
+                        return new GridQueryCacheObjectsIterator(res.iterator(), cctx, keepBinary);
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw new IgniteException(e);
+                    }
+                }
+            }, cancel);
+
+            cursor.fieldsMeta(res.metaData());
+
+            return cursor;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K, V> QueryCursor<Cache.Entry<K,V>> queryLocalSql(final GridCacheContext<?, ?> cctx,
+        final SqlQuery qry, final IndexingQueryFilter filter, final boolean keepBinary) throws IgniteCheckedException {
+        if (cctx.config().getQueryParallelism() > 1) {
+            qry.setDistributedJoins(true);
+
+            assert qry.isLocal();
+
+            return queryTwoStep(cctx, qry);
+        }
+        else {
+            String space = cctx.name();
+            String type = qry.getType();
+            String sqlQry = qry.getSql();
+            String alias = qry.getAlias();
+            Object[] params = qry.getArgs();
+
+            GridQueryCancel cancel = new GridQueryCancel();
+
+            final GridCloseableIterator<IgniteBiTuple<K, V>> i = queryLocalSql(space, sqlQry, alias,
+                F.asList(params), type, filter, cancel);
+
+            return new QueryCursorImpl<Cache.Entry<K, V>>(new Iterable<Cache.Entry<K, V>>() {
+                @Override public Iterator<Cache.Entry<K, V>> iterator() {
+                    return new ClIter<Cache.Entry<K, V>>() {
+                        @Override public void close() throws Exception {
+                            i.close();
+                        }
+
+                        @Override public boolean hasNext() {
+                            return i.hasNext();
+                        }
+
+                        @Override public Cache.Entry<K, V> next() {
+                            IgniteBiTuple<K, V> t = i.next();
+
+                            return new CacheEntryImpl<>(
+                                (K)cctx.unwrapBinaryIfNeeded(t.get1(), keepBinary, false),
+                                (V)cctx.unwrapBinaryIfNeeded(t.get2(), keepBinary, false));
+                        }
+
+                        @Override public void remove() {
+                            throw new UnsupportedOperationException();
+                        }
+                    };
+                }
+            }, cancel);
+        }
+    }
+
+    /**
+     * Executes regular query.
+     *
+     * @param spaceName Space name.
+     * @param qry Query.
+     * @param alias Table alias.
+     * @param params Query parameters.
+     * @param type Query return type.
+     * @param filter Space name and key filter.
+     * @return Queried rows.
+     * @throws IgniteCheckedException If failed.
+     */
+    public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalSql(@Nullable String spaceName,
+        final String qry, String alias, @Nullable final Collection<Object> params, String type,
+        final IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException {
+        final TableDescriptor tbl = tableDescriptor(type, spaceName);
 
         if (tbl == null)
-            throw new IgniteSQLException("Failed to find SQL table for type: " + type.name(),
+            throw new IgniteSQLException("Failed to find SQL table for type: " + type,
                 IgniteQueryErrorCode.TABLE_NOT_FOUND);
 
         String sql = generateQuery(qry, alias, tbl);
@@ -1115,7 +1231,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         setupConnection(conn, false, false);
 
-        GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter).distributedJoins(false));
+        GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter)
+            .distributedJoinMode(OFF));
 
         GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL, spaceName,
             U.currentTimeMillis(), null, true);
@@ -1123,7 +1240,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         runs.put(run.id(), run);
 
         try {
-            ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, sql, params, true, 0, null);
+            ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, sql, params, true, 0, cancel);
 
             return new KeyValIterator(rs);
         }
@@ -1178,8 +1295,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         fqry.setArgs(qry.getArgs());
         fqry.setPageSize(qry.getPageSize());
         fqry.setDistributedJoins(qry.isDistributedJoins());
+        fqry.setLocal(qry.isLocal());
 
-        if(qry.getTimeout() > 0)
+        if (qry.getTimeout() > 0)
             fqry.setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS);
 
         final QueryCursor<List<?>> res = queryTwoStep(cctx, fqry, null);
@@ -1234,11 +1352,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         final boolean distributedJoins = qry.isDistributedJoins() && cctx.isPartitioned();
         final boolean grpByCollocated = qry.isCollocated();
 
+        final DistributedJoinMode distributedJoinMode = distributedJoinMode(qry.isLocal(), distributedJoins);
+
         GridCacheTwoStepQuery twoStepQry;
         List<GridQueryFieldMetadata> meta;
 
         final TwoStepCachedQueryKey cachedQryKey = new TwoStepCachedQueryKey(space, sqlQry, grpByCollocated,
-            distributedJoins, enforceJoinOrder);
+            distributedJoins, enforceJoinOrder, qry.isLocal());
         TwoStepCachedQuery cachedQry = twoStepCache.get(cachedQryKey);
 
         if (cachedQry != null) {
@@ -1251,7 +1371,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             setupConnection(c, distributedJoins, enforceJoinOrder);
 
             GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, 0, PREPARE)
-                .distributedJoins(distributedJoins));
+                .distributedJoinMode(distributedJoinMode));
 
             PreparedStatement stmt;
 
@@ -1286,9 +1406,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 GridH2QueryContext.clearThreadLocal();
             }
 
-            Prepared prepared = GridSqlQueryParser.prepared((JdbcPreparedStatement) stmt);
+            Prepared prepared = GridSqlQueryParser.prepared((JdbcPreparedStatement)stmt);
 
-            if (qry instanceof JdbcSqlFieldsQuery && ((JdbcSqlFieldsQuery) qry).isQuery() != prepared.isQuery())
+            if (qry instanceof JdbcSqlFieldsQuery && ((JdbcSqlFieldsQuery)qry).isQuery() != prepared.isQuery())
                 throw new IgniteSQLException("Given statement type does not match that declared by JDBC driver",
                     IgniteQueryErrorCode.STMT_TYPE_MISMATCH);
 
@@ -1341,8 +1461,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                     extraCaches = null;
                 }
 
+                // Prohibit using indexes with different numbers of segments in the same query.
+                checkCacheIndexSegmentation(caches);
+
                 twoStepQry.caches(caches);
                 twoStepQry.extraCaches(extraCaches);
+                twoStepQry.local(qry.isLocal());
 
                 meta = meta(stmt.getMetaData());
             }
@@ -1380,6 +1504,32 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
+     * Checks that all partitioned caches of a query are configured with the same query parallelism.
+     * @param caches IDs of the caches participating in the query.
+     * @throws IllegalStateException If partitioned caches with different query parallelism are mixed.
+     */
+    private void checkCacheIndexSegmentation(List<Integer> caches) {
+        if (caches.isEmpty())
+            return; // Nothing to check.
+
+        GridCacheSharedContext sharedCtx = ctx.cache().context();
+        int expParallelism = 0; // Zero until the first partitioned cache is met.
+
+        for (Integer cacheId : caches) {
+            GridCacheContext cctx = sharedCtx.cacheContext(cacheId);
+            assert cctx != null;
+
+            if (!cctx.isPartitioned())
+                continue; // Non-partitioned caches do not take part in the comparison.
+
+            if (expParallelism == 0)
+                expParallelism = cctx.config().getQueryParallelism();
+            else if (cctx.config().getQueryParallelism() != expParallelism)
+                throw new IllegalStateException("Using indexes with different parallelism levels in same query is forbidden.");
+        }
+    }
+
+    /**
      * Prepares statement for query.
      *
      * @param qry Query string.
@@ -1669,7 +1819,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     private void cleanupStatementCache() {
         long cur = U.currentTimeMillis();
 
-        for(Iterator<Map.Entry<Thread, StatementCache>> it = stmtCache.entrySet().iterator(); it.hasNext(); ) {
+        for (Iterator<Map.Entry<Thread, StatementCache>> it = stmtCache.entrySet().iterator(); it.hasNext(); ) {
             Map.Entry<Thread, StatementCache> entry = it.next();
 
             Thread t = entry.getKey();
@@ -1877,6 +2027,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         for (ClusterNode node : nodes) {
             if (node.isLocal()) {
+                if (locNode != null)
+                    throw new IllegalStateException();
+
                 locNode = node;
 
                 continue;
@@ -2163,23 +2316,29 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         /** */
         private final boolean enforceJoinOrder;
 
+        /** */
+        private final boolean isLocal;
+
         /**
          * @param space Space.
          * @param sql Sql.
          * @param grpByCollocated Collocated GROUP BY.
          * @param distributedJoins Distributed joins enabled.
          * @param enforceJoinOrder Enforce join order of tables.
+         * @param isLocal Query is local flag.
          */
         private TwoStepCachedQueryKey(String space,
             String sql,
             boolean grpByCollocated,
             boolean distributedJoins,
-            boolean enforceJoinOrder) {
+            boolean enforceJoinOrder,
+            boolean isLocal) {
             this.space = space;
             this.sql = sql;
             this.grpByCollocated = grpByCollocated;
             this.distributedJoins = distributedJoins;
             this.enforceJoinOrder = enforceJoinOrder;
+            this.isLocal = isLocal;
         }
 
         /** {@inheritDoc} */
@@ -2204,7 +2363,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             if (space != null ? !space.equals(that.space) : that.space != null)
                 return false;
 
-            return sql.equals(that.sql);
+            return isLocal == that.isLocal && sql.equals(that.sql);
         }
 
         /** {@inheritDoc} */
@@ -2212,8 +2371,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             int res = space != null ? space.hashCode() : 0;
             res = 31 * res + sql.hashCode();
             res = 31 * res + (grpByCollocated ? 1 : 0);
-            res = 31 * res + (distributedJoins ? 1 : 0);
-            res = 31 * res + (enforceJoinOrder ? 1 : 0);
+            res = res + (distributedJoins ? 2 : 0);
+            res = res + (enforceJoinOrder ? 4 : 0);
+            res = res + (isLocal ? 8 : 0);
 
             return res;
         }
@@ -2572,7 +2732,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 affCol = null;
 
             // Add primary key index.
-            idxs.add(new GridH2TreeIndex("_key_PK", tbl, true,
+            idxs.add(createTreeIndex("_key_PK", tbl, true,
                 treeIndexColumns(new ArrayList<IndexColumn>(2), keyCol, affCol)));
 
             if (type().valueClass() == String.class) {
@@ -2618,7 +2778,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
                         cols = treeIndexColumns(cols, keyCol, affCol);
 
-                        idxs.add(new GridH2TreeIndex(name, tbl, false, cols));
+                        idxs.add(createTreeIndex(name, tbl, false, cols));
                     }
                     else if (idx.type() == GEO_SPATIAL)
                         idxs.add(createH2SpatialIndex(tbl, name, cols.toArray(new IndexColumn[cols.size()])));
@@ -2629,7 +2789,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             // Add explicit affinity key index if nothing alike was found.
             if (affCol != null && !affIdxFound) {
-                idxs.add(new GridH2TreeIndex("AFFINITY_KEY", tbl, false,
+                idxs.add(createTreeIndex("AFFINITY_KEY", tbl, false,
                     treeIndexColumns(new ArrayList<IndexColumn>(2), affCol, keyCol)));
             }
 
@@ -2676,6 +2836,22 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 throw new IgniteException("Failed to instantiate: " + className, e);
             }
         }
+
+        /**
+         * Creates a tree index, passing it the cache query parallelism (1 if there is no cache context).
+         *
+         * @param idxName Index name.
+         * @param tbl Table.
+         * @param pk Primary key flag.
+         * @param columns Index column list.
+         * @return Created tree index.
+         */
+        private Index createTreeIndex(String idxName, GridH2Table tbl, boolean pk, List<IndexColumn> columns) {
+            GridCacheContext<?, ?> cctx = tbl.rowDescriptor().context();
+            int parallelism = cctx == null ? 1 : Math.max(1, cctx.config().getQueryParallelism());
+
+            return new GridH2TreeIndex(idxName, tbl, pk, columns, parallelism);
+        }
     }
 
     /**
@@ -2729,6 +2905,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
+     * Iterator that can also be closed: combines {@link Iterator} with {@link AutoCloseable}.
+     */
+    private interface ClIter<X> extends AutoCloseable, Iterator<X> {
+        // No-op.
+    }
+
+    /**
      * Field descriptor.
      */
     static class SqlFieldMetadata implements GridQueryFieldMetadata {

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java
new file mode 100644
index 0000000..cc06244
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt;
+
+/**
+ * Modes in which distributed joins can be executed for a query.
+ */
+public enum DistributedJoinMode {
+    /** Distributed joins are disabled. Local joins will be performed instead. */
+    OFF,
+
+    /**
+     * Distributed joins are enabled within the local node only.
+     * <p>
+     * NOTE: This mode is used with segmented indices for local SQL queries, because in this case
+     * the join has to be distributed across local index segments without range queries to other nodes.
+     */
+    LOCAL_ONLY,
+
+    /** Distributed joins are enabled. */
+    ON;
+
+    /**
+     * Resolves the join mode from the query flags.
+     *
+     * @param isLocal Query local flag.
+     * @param distributedJoins Query distributed joins flag.
+     * @return {@link #OFF} unless distributed joins are enabled, else {@link #LOCAL_ONLY} if local or {@link #ON}.
+     */
+    public static DistributedJoinMode distributedJoinMode(boolean isLocal, boolean distributedJoins) {
+        if (!distributedJoins)
+            return OFF;
+
+        return isLocal ? LOCAL_ONLY : ON;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index bab219c..131e03b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -81,6 +81,8 @@ import org.jetbrains.annotations.Nullable;
 
 import static java.util.Collections.emptyIterator;
 import static java.util.Collections.singletonList;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.LOCAL_ONLY;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.VAL_COL;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2CollocationModel.buildCollocationModel;
@@ -178,6 +180,13 @@ public abstract class GridH2IndexBase extends BaseIndex {
     }
 
     /**
+     * @return Index segment ID for current query context; this base implementation always returns {@code 0}.
+     */
+    protected int threadLocalSegment() {
+       return 0;
+    }
+
+    /**
      * If the index supports rebuilding it has to creates its own copy.
      *
      * @return Rebuilt copy.
@@ -252,7 +261,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
         // because on run stage reordering of joined tables by Optimizer is explicitly disabled
         // and thus multiplier will be always the same, so it will not affect choice of index.
         // Query expressions can not be distributed as well.
-        if (qctx == null || qctx.type() != PREPARE || !qctx.distributedJoins() || ses.isPreparingQueryExpression())
+        if (qctx == null || qctx.type() != PREPARE || qctx.distributedJoinMode() == OFF || ses.isPreparingQueryExpression())
             return GridH2CollocationModel.MULTIPLIER_COLLOCATED;
 
         // We have to clear this cache because normally sub-query plan cost does not depend on anything
@@ -363,7 +372,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
     @Override public IndexLookupBatch createLookupBatch(TableFilter filter) {
         GridH2QueryContext qctx = GridH2QueryContext.get();
 
-        if (qctx == null || !qctx.distributedJoins() || !getTable().isPartitioned())
+        if (qctx == null || qctx.distributedJoinMode() == OFF || !getTable().isPartitioned())
             return null;
 
         IndexColumn affCol = getTable().getAffinityKeyColumn();
@@ -381,9 +390,11 @@ public abstract class GridH2IndexBase extends BaseIndex {
             ucast = false;
         }
 
-        GridCacheContext<?,?> cctx = getTable().rowDescriptor().context();
+        GridCacheContext<?, ?> cctx = getTable().rowDescriptor().context();
 
-        return new DistributedLookupBatch(cctx, ucast, affColId);
+        boolean isLocal = qctx.distributedJoinMode() == LOCAL_ONLY;
+
+        return new DistributedLookupBatch(cctx, ucast, affColId, isLocal);
     }
 
     /**
@@ -437,18 +448,18 @@ public abstract class GridH2IndexBase extends BaseIndex {
      * @param node Requesting node.
      * @param msg Request message.
      */
-    private void onIndexRangeRequest(ClusterNode node, GridH2IndexRangeRequest msg) {
-        GridH2QueryContext qctx = GridH2QueryContext.get(kernalContext().localNodeId(),
-            msg.originNodeId(),
-            msg.queryId(),
-            MAP);
-
+    private void onIndexRangeRequest(final ClusterNode node, final GridH2IndexRangeRequest msg) {
         GridH2IndexRangeResponse res = new GridH2IndexRangeResponse();
 
         res.originNodeId(msg.originNodeId());
         res.queryId(msg.queryId());
+        res.originSegmentId(msg.originSegmentId());
+        res.segment(msg.segment());
         res.batchLookupId(msg.batchLookupId());
 
+        GridH2QueryContext qctx = GridH2QueryContext.get(kernalContext().localNodeId(), msg.originNodeId(),
+            msg.queryId(), msg.originSegmentId(), MAP);
+
         if (qctx == null)
             res.status(STATUS_NOT_FOUND);
         else {
@@ -461,11 +472,11 @@ public abstract class GridH2IndexBase extends BaseIndex {
 
                     assert !msg.bounds().isEmpty() : "empty bounds";
 
-                    src = new RangeSource(msg.bounds(), snapshot0, qctx.filter());
+                    src = new RangeSource(msg.bounds(), msg.segment(), snapshot0, qctx.filter());
                 }
                 else {
                     // This is request to fetch next portion of data.
-                    src = qctx.getSource(node.id(), msg.batchLookupId());
+                    src = qctx.getSource(node.id(), msg.segment(), msg.batchLookupId());
 
                     assert src != null;
                 }
@@ -491,11 +502,11 @@ public abstract class GridH2IndexBase extends BaseIndex {
                 if (src.hasMoreRows()) {
                     // Save source for future fetches.
                     if (msg.bounds() != null)
-                        qctx.putSource(node.id(), msg.batchLookupId(), src);
+                        qctx.putSource(node.id(), msg.segment(), msg.batchLookupId(), src);
                 }
                 else if (msg.bounds() == null) {
                     // Drop saved source.
-                    qctx.putSource(node.id(), msg.batchLookupId(), null);
+                    qctx.putSource(node.id(), msg.segment(), msg.batchLookupId(), null);
                 }
 
                 assert !ranges.isEmpty();
@@ -520,17 +531,17 @@ public abstract class GridH2IndexBase extends BaseIndex {
      */
     private void onIndexRangeResponse(ClusterNode node, GridH2IndexRangeResponse msg) {
         GridH2QueryContext qctx = GridH2QueryContext.get(kernalContext().localNodeId(),
-            msg.originNodeId(), msg.queryId(), MAP);
+            msg.originNodeId(), msg.queryId(), msg.originSegmentId(), MAP);
 
         if (qctx == null)
             return;
 
-        Map<ClusterNode, RangeStream> streams = qctx.getStreams(msg.batchLookupId());
+        Map<SegmentKey, RangeStream> streams = qctx.getStreams(msg.batchLookupId());
 
         if (streams == null)
             return;
 
-        RangeStream stream = streams.get(node);
+        RangeStream stream = streams.get(new SegmentKey(node, msg.segment()));
 
         assert stream != null;
 
@@ -549,47 +560,69 @@ public abstract class GridH2IndexBase extends BaseIndex {
     /**
      * @param qctx Query context.
      * @param batchLookupId Batch lookup ID.
+     * @param segmentId ID of the index segment the request is addressed to.
      * @return Index range request.
      */
-    private static GridH2IndexRangeRequest createRequest(GridH2QueryContext qctx, int batchLookupId) {
+    private static GridH2IndexRangeRequest createRequest(GridH2QueryContext qctx, int batchLookupId, int segmentId) {
         GridH2IndexRangeRequest req = new GridH2IndexRangeRequest();
 
         req.originNodeId(qctx.originNodeId());
         req.queryId(qctx.queryId());
+        req.originSegmentId(qctx.segment()); // Segment of the requesting query context.
+        req.segment(segmentId); // Segment the request is addressed to.
         req.batchLookupId(batchLookupId);
 
         return req;
     }
 
+
     /**
      * @param qctx Query context.
      * @param cctx Cache context.
+     * @param isLocalQry Local query flag; if set, only segments of the local node are returned.
      * @return Collection of nodes for broadcasting.
      */
-    private List<ClusterNode> broadcastNodes(GridH2QueryContext qctx, GridCacheContext<?,?> cctx) {
+    private List<SegmentKey> broadcastSegments(GridH2QueryContext qctx, GridCacheContext<?, ?> cctx, boolean isLocalQry) {
         Map<UUID, int[]> partMap = qctx.partitionsMap();
 
-        List<ClusterNode> res;
+        List<ClusterNode> nodes;
+
+        if (isLocalQry) {
+            if (partMap != null && !partMap.containsKey(cctx.localNodeId()))
+                return Collections.<SegmentKey>emptyList(); // Prevent remote index call for local queries.
 
-        if (partMap == null)
-            res = new ArrayList<>(CU.affinityNodes(cctx, qctx.topologyVersion()));
+            nodes = Collections.singletonList(cctx.localNode());
+        }
         else {
-            res = new ArrayList<>(partMap.size());
+            if (partMap == null)
+                nodes = new ArrayList<>(CU.affinityNodes(cctx, qctx.topologyVersion()));
+            else {
+                nodes = new ArrayList<>(partMap.size());
 
-            GridKernalContext ctx = kernalContext();
+                GridKernalContext ctx = kernalContext();
 
-            for (UUID nodeId : partMap.keySet()) {
-                ClusterNode node = ctx.discovery().node(nodeId);
+                for (UUID nodeId : partMap.keySet()) {
+                    ClusterNode node = ctx.discovery().node(nodeId);
 
-                if (node == null)
-                    throw new GridH2RetryException("Failed to find node.");
+                    if (node == null)
+                        throw new GridH2RetryException("Failed to find node.");
 
-                res.add(node);
+                    nodes.add(node);
+                }
             }
+
+            if (F.isEmpty(nodes))
+                throw new GridH2RetryException("Failed to collect affinity nodes.");
         }
 
-        if (F.isEmpty(res))
-            throw new GridH2RetryException("Failed to collect affinity nodes.");
+        int segmentsCount = segmentsCount();
+
+        List<SegmentKey> res = new ArrayList<>(nodes.size() * segmentsCount); // One key per node segment.
+
+        for (ClusterNode node : nodes) {
+            for (int seg = 0; seg < segmentsCount; seg++)
+                res.add(new SegmentKey(node, seg));
+        }
 
         return res;
     }
@@ -598,26 +631,81 @@ public abstract class GridH2IndexBase extends BaseIndex {
      * @param cctx Cache context.
      * @param qctx Query context.
      * @param affKeyObj Affinity key.
-     * @return Cluster nodes or {@code null} if affinity key is a null value.
+     * @param isLocalQry Local query flag; if set, only the local node is considered.
+     * @return Segment key for the affinity key or {@code null} if a local query key is not mapped to the local node.
      */
-    private ClusterNode rangeNode(GridCacheContext<?,?> cctx, GridH2QueryContext qctx, Object affKeyObj) {
+    private SegmentKey rangeSegment(GridCacheContext<?, ?> cctx, GridH2QueryContext qctx, Object affKeyObj, boolean isLocalQry) {
         assert affKeyObj != null && affKeyObj != EXPLICIT_NULL : affKeyObj;
 
         ClusterNode node;
 
-        if (qctx.partitionsMap() != null) {
-            // If we have explicit partitions map, we have to use it to calculate affinity node.
-            UUID nodeId = qctx.nodeForPartition(cctx.affinity().partition(affKeyObj), cctx);
+        int partition = cctx.affinity().partition(affKeyObj);
+
+        if (isLocalQry) {
+            if (qctx.partitionsMap() != null) {
+                // If we have explicit partitions map, we have to use it to calculate affinity node.
+                UUID nodeId = qctx.nodeForPartition(partition, cctx);
+
+                if(!cctx.localNodeId().equals(nodeId))
+                    return null; // Prevent remote index call for local queries.
+            }
+
+            if (!cctx.affinity().primaryByKey(cctx.localNode(), partition, qctx.topologyVersion()))
+                return null; // NOTE(review): primaryByKey() is given a partition number here, not a key - confirm.
+
+            node = cctx.localNode();
+        }
+        else{
+            if (qctx.partitionsMap() != null) {
+                // If we have explicit partitions map, we have to use it to calculate affinity node.
+                UUID nodeId = qctx.nodeForPartition(partition, cctx);
 
             node = cctx.discovery().node(nodeId);
         }
         else // Get primary node for current topology version.
             node = cctx.affinity().primaryByKey(affKeyObj, qctx.topologyVersion());
 
-        if (node == null) // Node was not found, probably topology changed and we need to retry the whole query.
-            throw new GridH2RetryException("Failed to find node.");
+            if (node == null) // Node was not found, probably topology changed and we need to retry the whole query.
+                throw new GridH2RetryException("Failed to find node.");
+        }
+
+        return new SegmentKey(node, segment(partition));
+    }
+
+    /**
+     * Key of an index segment on a particular node. Nodes are matched by their IDs.
+     */
+    protected class SegmentKey {
+        /** Node the segment belongs to. */
+        final ClusterNode node;
+
+        /** Segment ID. */
+        final int segmentId;
+
+        SegmentKey(ClusterNode node, int segmentId) {
+            assert node != null;
+
+            this.node = node;
+            this.segmentId = segmentId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            SegmentKey key = (SegmentKey)o;
+
+            return segmentId == key.segmentId && node.id().equals(key.node.id());
+        }
 
-        return node;
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            // Hash the node ID, not the node object: equals() compares node IDs only.
+            return 31 * node.id().hashCode() + segmentId;
+        }
     }
 
     /**
@@ -740,6 +828,20 @@ public abstract class GridH2IndexBase extends BaseIndex {
         return database.createRow(vals0, MEMORY_CALCULATE);
     }
 
+    /** @return Index segments count; this base implementation always reports a single segment. */
+    protected int segmentsCount() {
+        return 1;
+    }
+
+    /**
+     * @param partition Partition index.
+     * @return Segment ID for the given partition; this base implementation always returns {@code 0}.
+     */
+    protected int segment(int partition) {
+        return 0;
+    }
+
+
     /**
      * Simple cursor from a single node.
      */
@@ -752,14 +854,14 @@ public abstract class GridH2IndexBase extends BaseIndex {
 
         /**
          * @param rangeId Range ID.
-         * @param nodes Remote nodes.
+         * @param keys Remote index segment keys.
          * @param rangeStreams Range streams.
          */
-        private UnicastCursor(int rangeId, Collection<ClusterNode> nodes, Map<ClusterNode,RangeStream> rangeStreams) {
-            assert nodes.size() == 1;
+        UnicastCursor(int rangeId, List<SegmentKey> keys, Map<SegmentKey, RangeStream> rangeStreams) {
+            assert keys.size() == 1;
 
             this.rangeId = rangeId;
-            this.stream = rangeStreams.get(F.first(nodes));
+            this.stream = rangeStreams.get(F.first(keys));
 
             assert stream != null;
         }
@@ -803,20 +905,19 @@ public abstract class GridH2IndexBase extends BaseIndex {
 
         /**
          * @param rangeId Range ID.
-         * @param nodes Remote nodes.
+         * @param segmentKeys Remote index segment keys.
          * @param rangeStreams Range streams.
          */
-        private BroadcastCursor(int rangeId, Collection<ClusterNode> nodes, Map<ClusterNode,RangeStream> rangeStreams) {
-            assert nodes.size() > 1;
+        BroadcastCursor(int rangeId, Collection<SegmentKey> segmentKeys, Map<SegmentKey, RangeStream> rangeStreams) {
 
             this.rangeId = rangeId;
 
-            streams = new RangeStream[nodes.size()];
+            streams = new RangeStream[segmentKeys.size()];
 
             int i = 0;
 
-            for (ClusterNode node : nodes) {
-                RangeStream stream = rangeStreams.get(node);
+            for (SegmentKey segmentKey : segmentKeys) {
+                RangeStream stream = rangeStreams.get(segmentKey);
 
                 assert stream != null;
 
@@ -928,16 +1029,19 @@ public abstract class GridH2IndexBase extends BaseIndex {
         final int affColId;
 
         /** */
+        private final boolean localQuery;
+
+        /** */
         GridH2QueryContext qctx;
 
         /** */
         int batchLookupId;
 
         /** */
-        Map<ClusterNode, RangeStream> rangeStreams = Collections.emptyMap();
+        Map<SegmentKey, RangeStream> rangeStreams = Collections.emptyMap();
 
         /** */
-        List<ClusterNode> broadcastNodes;
+        List<SegmentKey> broadcastSegments;
 
         /** */
         List<Future<Cursor>> res = Collections.emptyList();
@@ -952,11 +1056,13 @@ public abstract class GridH2IndexBase extends BaseIndex {
          * @param cctx Cache Cache context.
          * @param ucast Unicast or broadcast query.
          * @param affColId Affinity column ID.
+         * @param localQuery Local query flag.
          */
-        private DistributedLookupBatch(GridCacheContext<?,?> cctx, boolean ucast, int affColId) {
+        DistributedLookupBatch(GridCacheContext<?, ?> cctx, boolean ucast, int affColId, boolean localQuery) {
             this.cctx = cctx;
             this.ucast = ucast;
             this.affColId = affColId;
+            this.localQuery = localQuery;
         }
 
         /**
@@ -1028,7 +1134,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
 
             Object affKey = affColId == -1 ? null : getAffinityKey(firstRow, lastRow);
 
-            List<ClusterNode> nodes;
+            List<SegmentKey> segmentKeys;
             Future<Cursor> fut;
 
             if (affKey != null) {
@@ -1036,17 +1142,20 @@ public abstract class GridH2IndexBase extends BaseIndex {
                 if (affKey == EXPLICIT_NULL) // Affinity key is explicit null, we will not find anything.
                     return false;
 
-                nodes = F.asList(rangeNode(cctx, qctx, affKey));
+                segmentKeys = F.asList(rangeSegment(cctx, qctx, affKey, localQuery));
             }
             else {
                 // Affinity key is not provided or is not the same in upper and lower bounds, we have to broadcast.
-                if (broadcastNodes == null)
-                    broadcastNodes = broadcastNodes(qctx, cctx);
+                if (broadcastSegments == null)
+                    broadcastSegments = broadcastSegments(qctx, cctx, localQuery);
 
-                nodes = broadcastNodes;
+                segmentKeys = broadcastSegments;
             }
 
-            assert !F.isEmpty(nodes) : nodes;
+            if (localQuery && segmentKeys.isEmpty())
+                return false; // Nothing to do
+
+            assert !F.isEmpty(segmentKeys) : segmentKeys;
 
             final int rangeId = res.size();
 
@@ -1058,21 +1167,21 @@ public abstract class GridH2IndexBase extends BaseIndex {
             GridH2RowRangeBounds rangeBounds = rangeBounds(rangeId, first, last);
 
             // Add range to every message of every participating node.
-            for (int i = 0; i < nodes.size(); i++) {
-                ClusterNode node = nodes.get(i);
-                assert node != null;
+            for (int i = 0; i < segmentKeys.size(); i++) {
+                SegmentKey segmentKey = segmentKeys.get(i);
+                assert segmentKey != null;
 
-                RangeStream stream = rangeStreams.get(node);
+                RangeStream stream = rangeStreams.get(segmentKey);
 
                 List<GridH2RowRangeBounds> bounds;
 
                 if (stream == null) {
-                    stream = new RangeStream(qctx, node);
+                    stream = new RangeStream(qctx, segmentKey.node);
 
-                    stream.req = createRequest(qctx, batchLookupId);
+                    stream.req = createRequest(qctx, batchLookupId, segmentKey.segmentId);
                     stream.req.bounds(bounds = new ArrayList<>());
 
-                    rangeStreams.put(node, stream);
+                    rangeStreams.put(segmentKey, stream);
                 }
                 else
                     bounds = stream.req.bounds();
@@ -1084,9 +1193,9 @@ public abstract class GridH2IndexBase extends BaseIndex {
                     batchFull = true;
             }
 
-            fut = new DoneFuture<>(nodes.size() == 1 ?
-                new UnicastCursor(rangeId, nodes, rangeStreams) :
-                new BroadcastCursor(rangeId, nodes, rangeStreams));
+            fut = new DoneFuture<>(segmentKeys.size() == 1 ?
+                new UnicastCursor(rangeId, segmentKeys, rangeStreams) :
+                new BroadcastCursor(rangeId, segmentKeys, rangeStreams));
 
             res.add(fut);
 
@@ -1138,7 +1247,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
             batchLookupId = 0;
 
             rangeStreams = Collections.emptyMap();
-            broadcastNodes = null;
+            broadcastSegments = null;
             batchFull = false;
             findCalled = false;
             res = Collections.emptyList();
@@ -1244,7 +1353,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
 
                             if (remainingRanges > 0) {
                                 if (req.bounds() != null)
-                                    req = createRequest(qctx, req.batchLookupId());
+                                    req = createRequest(qctx, req.batchLookupId(), req.segment());
 
                                 // Prefetch next page.
                                 send(singletonList(node), req);
@@ -1366,6 +1475,9 @@ public abstract class GridH2IndexBase extends BaseIndex {
         final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree;
 
         /** */
+        private final int segment;
+
+        /** */
         final IndexingQueryFilter filter;
 
         /**
@@ -1375,9 +1487,11 @@ public abstract class GridH2IndexBase extends BaseIndex {
          */
         RangeSource(
             Iterable<GridH2RowRangeBounds> bounds,
+            int segment,
             ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree,
             IndexingQueryFilter filter
         ) {
+            this.segment = segment;
             this.filter = filter;
             this.tree = tree;
             boundsIter = bounds.iterator();
@@ -1435,7 +1549,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
                 SearchRow first = toSearchRow(bounds.first());
                 SearchRow last = toSearchRow(bounds.last());
 
-                ConcurrentNavigableMap<GridSearchRowPointer,GridH2Row> t = tree != null ? tree : treeForRead();
+                ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> t = tree != null ? tree : treeForRead(segment);
 
                 curRange = doFind0(t, first, true, last, filter);
 
@@ -1452,9 +1566,10 @@ public abstract class GridH2IndexBase extends BaseIndex {
     }
 
     /**
-     * @return Snapshot for current thread if there is one.
+     * @param segment Segment ID.
+     * @return Snapshot for requested segment if there is one.
      */
-    protected ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> treeForRead() {
+    protected ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> treeForRead(int segment) {
         throw new UnsupportedOperationException();
     }
 
@@ -1505,7 +1620,8 @@ public abstract class GridH2IndexBase extends BaseIndex {
                 this.fltr = qryFilter.forSpace(spaceName);
 
                 this.isValRequired = qryFilter.isValueRequired();
-            } else {
+            }
+            else {
                 this.fltr = null;
 
                 this.isValRequired = false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
index 19ea2b2..a7ee0dc 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
@@ -32,6 +32,7 @@ import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
 
 /**
@@ -79,7 +80,7 @@ public class GridH2QueryContext {
     private UUID[] partsNodes;
 
     /** */
-    private boolean distributedJoins;
+    private DistributedJoinMode distributedJoinMode;
 
     /** */
     private int pageSize;
@@ -94,7 +95,22 @@ public class GridH2QueryContext {
      * @param type Query type.
      */
     public GridH2QueryContext(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) {
-        key = new Key(locNodeId, nodeId, qryId, type);
+        assert type != MAP;
+
+        key = new Key(locNodeId, nodeId, qryId, 0, type);
+    }
+
+    /**
+     * @param locNodeId Local node ID.
+     * @param nodeId The node who initiated the query.
+     * @param qryId The query ID.
+     * @param segmentId Index segment ID.
+     * @param type Query type.
+     */
+    public GridH2QueryContext(UUID locNodeId, UUID nodeId, long qryId, int segmentId, GridH2QueryType type) {
+        assert segmentId == 0 || type == MAP;
+
+        key = new Key(locNodeId, nodeId, qryId, segmentId, type);
     }
 
     /**
@@ -133,20 +149,20 @@ public class GridH2QueryContext {
     }
 
     /**
-     * @param distributedJoins Distributed joins can be run in this query.
+     * @param distributedJoinMode Distributed join mode.
      * @return {@code this}.
      */
-    public GridH2QueryContext distributedJoins(boolean distributedJoins) {
-        this.distributedJoins = distributedJoins;
+    public GridH2QueryContext distributedJoinMode(DistributedJoinMode distributedJoinMode) {
+        this.distributedJoinMode = distributedJoinMode;
 
         return this;
     }
 
     /**
-     * @return {@code true} If distributed joins can be run in this query.
+     * @return Distributed join mode.
      */
-    public boolean distributedJoins() {
-        return distributedJoins;
+    public DistributedJoinMode distributedJoinMode() {
+        return distributedJoinMode;
     }
 
     /**
@@ -226,6 +242,11 @@ public class GridH2QueryContext {
         return nodeIds[p];
     }
 
+    /** @return Index segment ID. */
+    public int segment() {
+        return key.segmentId;
+    }
+
     /**
      * @param idxId Index ID.
      * @param snapshot Index snapshot.
@@ -303,11 +324,12 @@ public class GridH2QueryContext {
 
     /**
      * @param ownerId Owner node ID.
+     * @param segmentId Index segment ID.
      * @param batchLookupId Batch lookup ID.
      * @param src Range source.
      */
-    public synchronized void putSource(UUID ownerId, int batchLookupId, Object src) {
-        SourceKey srcKey = new SourceKey(ownerId, batchLookupId);
+    public synchronized void putSource(UUID ownerId, int segmentId, int batchLookupId, Object src) {
+        SourceKey srcKey = new SourceKey(ownerId, segmentId, batchLookupId);
 
         if (src != null) {
             if (sources == null)
@@ -321,15 +343,16 @@ public class GridH2QueryContext {
 
     /**
      * @param ownerId Owner node ID.
+     * @param segmentId Index segment ID.
      * @param batchLookupId Batch lookup ID.
      * @return Range source.
      */
     @SuppressWarnings("unchecked")
-    public synchronized <T> T getSource(UUID ownerId, int batchLookupId) {
+    public synchronized <T> T getSource(UUID ownerId, int segmentId, int batchLookupId) {
         if (sources == null)
             return null;
 
-        return (T)sources.get(new SourceKey(ownerId, batchLookupId));
+        return (T)sources.get(new SourceKey(ownerId, segmentId, batchLookupId));
     }
 
     /**
@@ -356,7 +379,7 @@ public class GridH2QueryContext {
          assert qctx.get() == null;
 
          // We need MAP query context to be available to other threads to run distributed joins.
-         if (x.key.type == MAP && x.distributedJoins() && qctxs.putIfAbsent(x.key, x) != null)
+         if (x.key.type == MAP && x.distributedJoinMode() != OFF && qctxs.putIfAbsent(x.key, x) != null)
              throw new IllegalStateException("Query context is already set.");
 
          qctx.set(x);
@@ -381,7 +404,14 @@ public class GridH2QueryContext {
      * @return {@code True} if context was found.
      */
     public static boolean clear(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) {
-        return doClear(new Key(locNodeId, nodeId, qryId, type), false);
+        boolean res = false;
+
+        for (Key key : qctxs.keySet()) {
+            if (key.locNodeId.equals(locNodeId) && key.nodeId.equals(nodeId) && key.qryId == qryId && key.type == type)
+                res |= doClear(new Key(locNodeId, nodeId, qryId, key.segmentId, type), false);
+        }
+
+        return res;
     }
 
     /**
@@ -463,6 +493,7 @@ public class GridH2QueryContext {
      * @param locNodeId Local node ID.
      * @param nodeId The node who initiated the query.
      * @param qryId The query ID.
+     * @param segmentId Index segment ID.
      * @param type Query type.
      * @return Query context.
      */
@@ -470,9 +501,10 @@ public class GridH2QueryContext {
         UUID locNodeId,
         UUID nodeId,
         long qryId,
+        int segmentId,
         GridH2QueryType type
     ) {
-        return qctxs.get(new Key(locNodeId, nodeId, qryId, type));
+        return qctxs.get(new Key(locNodeId, nodeId, qryId, segmentId, type));
     }
 
     /**
@@ -528,15 +560,19 @@ public class GridH2QueryContext {
         private final long qryId;
 
         /** */
+        private final int segmentId;
+
+        /** */
         private final GridH2QueryType type;
 
         /**
          * @param locNodeId Local node ID.
          * @param nodeId The node who initiated the query.
          * @param qryId The query ID.
+         * @param segmentId Index segment ID.
          * @param type Query type.
          */
-        private Key(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) {
+        private Key(UUID locNodeId, UUID nodeId, long qryId, int segmentId, GridH2QueryType type) {
             assert locNodeId != null;
             assert nodeId != null;
             assert type != null;
@@ -544,6 +580,7 @@ public class GridH2QueryContext {
             this.locNodeId = locNodeId;
             this.nodeId = nodeId;
             this.qryId = qryId;
+            this.segmentId = segmentId;
             this.type = type;
         }
 
@@ -568,6 +605,7 @@ public class GridH2QueryContext {
             res = 31 * res + nodeId.hashCode();
             res = 31 * res + (int)(qryId ^ (qryId >>> 32));
             res = 31 * res + type.hashCode();
+            res = 31 * res + segmentId;
 
             return res;
         }
@@ -586,14 +624,19 @@ public class GridH2QueryContext {
         UUID ownerId;
 
         /** */
+        int segmentId;
+
+        /** */
         int batchLookupId;
 
         /**
          * @param ownerId Owner node ID.
+         * @param segmentId Index segment ID.
          * @param batchLookupId Batch lookup ID.
          */
-        SourceKey(UUID ownerId, int batchLookupId) {
+        SourceKey(UUID ownerId, int segmentId, int batchLookupId) {
             this.ownerId = ownerId;
+            this.segmentId = segmentId;
             this.batchLookupId = batchLookupId;
         }
 
@@ -601,12 +644,15 @@ public class GridH2QueryContext {
         @Override public boolean equals(Object o) {
             SourceKey srcKey = (SourceKey)o;
 
-            return batchLookupId == srcKey.batchLookupId && ownerId.equals(srcKey.ownerId);
+            return batchLookupId == srcKey.batchLookupId && segmentId == srcKey.segmentId &&
+                ownerId.equals(srcKey.ownerId);
         }
 
         /** {@inheritDoc} */
         @Override public int hashCode() {
-            return 31 * ownerId.hashCode() + batchLookupId;
+            int hash = ownerId.hashCode();
+            hash = 31 * hash + segmentId;
+            return 31 * hash + batchLookupId;
         }
     }
 }


[10/12] ignite git commit: Review minors.

Posted by se...@apache.org.
Review minors.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9bc776fe
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9bc776fe
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9bc776fe

Branch: refs/heads/ignite-1.9
Commit: 9bc776fef537d2fb749951cbbf186e81858c11b5
Parents: de6e52b
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Feb 21 19:25:14 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Feb 21 19:29:59 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheProcessor.java    |  2 +-
 .../query/h2/opt/GridH2SpatialIndex.java        | 23 +++-------
 .../h2/GridH2IndexingSegmentedGeoSelfTest.java  |  2 +
 .../query/h2/opt/GridH2IndexBase.java           | 38 ++++++++++++++++-
 .../query/h2/opt/GridH2TreeIndex.java           | 44 +-------------------
 .../query/IgniteSqlSplitterSelfTest.java        |  2 +-
 6 files changed, 46 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9bc776fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c3e3f3b..50e1379 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -270,7 +270,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             cfg.setBackups(Integer.MAX_VALUE);
 
         if( cfg.getQueryParallelism() > 1 && cfg.getCacheMode() != PARTITIONED)
-            throw new IgniteCheckedException("Cache index segmentation is supported for PARTITIONED mode only.");
+            throw new IgniteCheckedException("Segmented indices are supported for PARTITIONED mode only.");
 
         if (cfg.getAffinityMapper() == null)
             cfg.setAffinityMapper(cacheObjCtx.defaultAffMapper());

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bc776fe/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
----------------------------------------------------------------------
diff --git a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
index 096b82d..c3a1362 100644
--- a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
+++ b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
@@ -121,6 +121,8 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
 
         for (int i = 0; i < segmentsCnt; i++)
             segments[i] = store.openMap("spatialIndex-" + i, new MVRTreeMap.Builder<Long>());
+
+        ctx = tbl.rowDescriptor().context();
     }
 
     /**
@@ -156,13 +158,11 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
 
             assert key != null;
 
-            Long rowId = keyToId.get(key);
+            final int seg = segmentForRow(row);
 
-            int seg;
+            Long rowId = keyToId.get(key);
 
             if (rowId != null) {
-                seg = segmentForRowID(rowId);
-
                 Long oldRowId = segments[seg].remove(getEnvelope(idToRow.get(rowId), rowId));
 
                 assert rowId.equals(oldRowId);
@@ -170,8 +170,6 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
             else {
                 rowId = ++rowIds;
 
-                seg = segmentForRowID(rowId);
-
                 keyToId.put(key, rowId);
             }
 
@@ -190,17 +188,6 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
     }
 
     /**
-     * @param id Row ID.
-     *
-     * @return Segment ID for given row ID.
-     */
-    private int segmentForRowID(Long id) {
-        assert id != null;
-
-        return (int)(id % segmentsCount());
-    }
-
-    /**
      * @param row Row.
      * @param rowId Row id.
      * @return Envelope.
@@ -235,7 +222,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
 
             assert oldRow != null;
 
-            int seg = segmentForRowID(rowId);
+            final int seg = segmentForRow(row);
 
             if (!segments[seg].remove(getEnvelope(row, rowId), rowId))
                 throw DbException.throwInternalError("row not found");

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bc776fe/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java
index b806321..e404f38 100644
--- a/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java
+++ b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java
@@ -27,4 +27,6 @@ public class GridH2IndexingSegmentedGeoSelfTest extends GridH2IndexingGeoSelfTes
     @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
         return super.cacheConfiguration(gridName).setQueryParallelism(7);
     }
+
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bc776fe/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index 31057c7..fc5eb4b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
@@ -126,6 +127,8 @@ public abstract class GridH2IndexBase extends BaseIndex {
         }
     };
 
+    protected GridCacheContext<?, ?> ctx;
+
     /**
      * @param tbl Table.
      */
@@ -133,6 +136,8 @@ public abstract class GridH2IndexBase extends BaseIndex {
         final GridH2RowDescriptor desc = tbl.rowDescriptor();
 
         if (desc != null && desc.context() != null) {
+            ctx = desc.context();
+
             GridKernalContext ctx = desc.context().kernalContext();
 
             log = ctx.log(getClass());
@@ -183,11 +188,11 @@ public abstract class GridH2IndexBase extends BaseIndex {
      * @return Index segment ID for current query context.
      */
     protected int threadLocalSegment() {
-        GridH2QueryContext qctx = GridH2QueryContext.get();
-
         if(segmentsCount() == 1)
             return 0;
 
+        GridH2QueryContext qctx = GridH2QueryContext.get();
+
         if(qctx == null)
             throw new IllegalStateException("GridH2QueryContext is not initialized.");
 
@@ -864,6 +869,35 @@ public abstract class GridH2IndexBase extends BaseIndex {
     }
 
     /**
+     * @param row Table row.
+     * @return Segment ID for given row.
+     */
+    protected int segmentForRow(SearchRow row) {
+        assert row != null;
+
+        CacheObject key;
+
+        if (ctx != null) {
+            final Value keyColValue = row.getValue(KEY_COL);
+
+            assert keyColValue != null;
+
+            final Object o = keyColValue.getObject();
+
+            if (o instanceof CacheObject)
+                key = (CacheObject)o;
+            else
+                key = ctx.toCacheKeyObject(o);
+
+            return segmentForPartition(ctx.affinity().partition(key));
+        }
+
+        assert segmentsCount() == 1;
+
+        return 0;
+    }
+
+    /**
      * Simple cursor from a single node.
      */
     private static class UnicastCursor implements Cursor {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bc776fe/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index 80597f2..2873211 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -17,15 +17,12 @@
 
 package org.apache.ignite.internal.processors.query.h2.opt;
 
-import java.lang.reflect.Field;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.util.GridEmptyIterator;
 import org.apache.ignite.internal.util.offheap.unsafe.GridOffHeapSnapTreeMap;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeGuard;
@@ -45,8 +42,6 @@ import org.h2.table.TableFilter;
 import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL;
-
 /**
  * Base class for snapshotable segmented tree indexes.
  */
@@ -58,9 +53,6 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
     /** */
     private final boolean snapshotEnabled;
 
-    /** */
-    private final GridH2RowDescriptor desc;
-
     /**
      * Constructor with index initialization. Creates index with single segment.
      *
@@ -91,8 +83,6 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
 
         IndexColumn.mapColumns(cols, tbl);
 
-        desc = tbl.rowDescriptor();
-
         initBaseIndex(tbl, 0, name, cols,
             pk ? IndexType.createPrimaryKey(false, false) : IndexType.createNonUnique(false, false, false));
 
@@ -106,8 +96,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
             if (snapshotEnabled) {
                 for (int i = 0; i < segmentsCnt; i++) {
                     segments[i] = new SnapTreeMap<GridSearchRowPointer, GridH2Row>(this) {
-                        @Override
-                        protected void afterNodeUpdate_nl(Node<GridSearchRowPointer, GridH2Row> node, Object val) {
+                        @Override protected void afterNodeUpdate_nl(Node<GridSearchRowPointer, GridH2Row> node, Object val) {
                             if (val != null)
                                 node.key = (GridSearchRowPointer)val;
                         }
@@ -426,37 +415,6 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
     }
 
     /**
-     * @param row Table row.
-     * @return Segment ID for given row.
-     */
-    private int segmentForRow(SearchRow row) {
-        assert row != null;
-
-        CacheObject key;
-
-        if (desc != null && desc.context() != null) {
-            GridCacheContext<?, ?> ctx = desc.context();
-
-            assert ctx != null;
-
-            final Value keyColValue = row.getValue(KEY_COL);
-
-            assert keyColValue != null;
-
-            final Object o = keyColValue.getObject();
-
-            if (o instanceof CacheObject)
-                key = (CacheObject)o;
-            else
-                key = ctx.toCacheKeyObject(o);
-
-            return segmentForPartition(ctx.affinity().partition(key));
-        }
-        else
-            return 0;
-    }
-
-    /**
      * Comparable row with bias. Will be used for queries to have correct bounds (in case of multicolumn index
      * and query on few first columns we will multiple equal entries in tree).
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bc776fe/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index 37dea47..69f66a5 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -824,7 +824,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
 
                 return null;
             }
-        }, CacheException.class, "Cache index segmentation is supported for PARTITIONED mode only.");
+        }, CacheException.class, " Segmented indices are supported for PARTITIONED mode only.");
     }
 
     /**


[09/12] ignite git commit: Fix queries with binary marshaller.

Posted by se...@apache.org.
Fix queries with binary marshaller.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/de6e52ba
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/de6e52ba
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/de6e52ba

Branch: refs/heads/ignite-1.9
Commit: de6e52ba54ce58f0dadad16de8aae14b55ab8478
Parents: f4d36cb
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Feb 21 16:27:08 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Feb 21 16:27:08 2017 +0300

----------------------------------------------------------------------
 .../internal/processors/query/h2/opt/GridH2TreeIndex.java      | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/de6e52ba/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index 07c3e6d..80597f2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -439,7 +439,11 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
 
             assert ctx != null;
 
-            Object o = ctx.toCacheKeyObject(row.getValue(KEY_COL));
+            final Value keyColValue = row.getValue(KEY_COL);
+
+            assert keyColValue != null;
+
+            final Object o = keyColValue.getObject();
 
             if (o instanceof CacheObject)
                 key = (CacheObject)o;


[06/12] ignite git commit: Minor

Posted by se...@apache.org.
Minor


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/56ce2237
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/56ce2237
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/56ce2237

Branch: refs/heads/ignite-1.9
Commit: 56ce2237a87c2285d9e2f2e8ce433ba17d27c64c
Parents: 5b774f6
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Feb 21 12:23:28 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Feb 21 12:52:33 2017 +0300

----------------------------------------------------------------------
 .../processors/query/h2/twostep/GridMapQueryExecutor.java     | 7 +++++--
 .../processors/query/h2/twostep/GridMergeIndexSorted.java     | 4 ++--
 .../processors/query/h2/twostep/GridReduceQueryExecutor.java  | 2 +-
 3 files changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/56ce2237/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 72a34a6..f002a5e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -458,6 +458,8 @@ public class GridMapQueryExecutor {
             req.isFlagSet(GridH2QueryRequest.FLAG_IS_LOCAL),
             req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS));
 
+        final boolean enforceJoinOrder = req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER);
+
         for (int i = 1; i < mainCctx.config().getQueryParallelism(); i++) {
             final int segment = i;
 
@@ -475,6 +477,7 @@ public class GridMapQueryExecutor {
                             req.tables(),
                             req.pageSize(),
                             joinMode,
+                            enforceJoinOrder,
                             req.timeout());
 
                         return null;
@@ -494,7 +497,7 @@ public class GridMapQueryExecutor {
             req.tables(),
             req.pageSize(),
             joinMode,
-            req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER),
+            enforceJoinOrder,
             req.timeout());
     }
 
@@ -585,7 +588,7 @@ public class GridMapQueryExecutor {
             Connection conn = h2.connectionForSpace(mainCctx.name());
 
             // Here we enforce join order to have the same behavior on all the nodes.
-            h2.setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder);
+            setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder);
 
             GridH2QueryContext.set(qctx);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/56ce2237/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
index a1b6691..32c676d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
@@ -85,8 +85,8 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
     }
 
     /** {@inheritDoc} */
-    @Override public void setSources(Collection<ClusterNode> nodes) {
-        super.setSources(nodes);
+    @Override public void setSources(Collection<ClusterNode> nodes, int segmentsCnt) {
+        super.setSources(nodes, segmentsCnt);
 
         streamsMap = U.newHashMap(nodes.size());
         streams = new RowStream[nodes.size()];

http://git-wip-us.apache.org/repos/asf/ignite/blob/56ce2237/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 3cfaae9..4cae6ac 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -624,7 +624,7 @@ public class GridReduceQueryExecutor {
                 if (distributedJoins)
                     flags |= GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS;
 
-                if(qry.isLocal())
+                if (qry.isLocal())
                     flags |= GridH2QueryRequest.FLAG_IS_LOCAL;
 
                 if (send(nodes,


[07/12] ignite git commit: Minors

Posted by se...@apache.org.
Minors


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2e90a072
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2e90a072
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2e90a072

Branch: refs/heads/ignite-1.9
Commit: 2e90a07218272c5cb2c09111e193a3613d3d51ac
Parents: 56ce223
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Feb 21 15:05:51 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Feb 21 15:05:51 2017 +0300

----------------------------------------------------------------------
 .../query/h2/opt/GridH2TreeIndex.java           | 34 ++++----------------
 1 file changed, 6 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2e90a072/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index 64ca9ea..07c3e6d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -45,25 +45,13 @@ import org.h2.table.TableFilter;
 import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL;
+
 /**
  * Base class for snapshotable segmented tree indexes.
  */
 @SuppressWarnings("ComparatorNotSerializable")
 public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridSearchRowPointer> {
-    /** */
-    private static Field KEY_FIELD;
-
-    /** */
-    static {
-        try {
-            KEY_FIELD = GridH2AbstractKeyValueRow.class.getDeclaredField("key");
-            KEY_FIELD.setAccessible(true);
-        }
-        catch (NoSuchFieldException e) {
-            KEY_FIELD = null;
-        }
-    }
-
     /** Index segments. */
     private final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row>[] segments;
 
@@ -451,22 +439,12 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
 
             assert ctx != null;
 
-            if (row instanceof GridH2AbstractKeyValueRow && KEY_FIELD != null) {
-                try {
-                    Object o = KEY_FIELD.get(row);
-
-                    if (o instanceof CacheObject)
-                        key = (CacheObject)o;
-                    else
-                        key = ctx.toCacheKeyObject(o);
+            Object o = ctx.toCacheKeyObject(row.getValue(KEY_COL));
 
-                }
-                catch (IllegalAccessException e) {
-                    throw new IllegalStateException(e);
-                }
-            }
+            if (o instanceof CacheObject)
+                key = (CacheObject)o;
             else
-                key = ctx.toCacheKeyObject(row.getValue(0));
+                key = ctx.toCacheKeyObject(o);
 
             return segmentForPartition(ctx.affinity().partition(key));
         }


[11/12] ignite git commit: Distributed joins with segmented spatial index tests added.

Posted by se...@apache.org.
Distributed joins with segmented spatial index tests added.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4dc36948
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4dc36948
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4dc36948

Branch: refs/heads/ignite-1.9
Commit: 4dc369487ef6f5583348f6a0232952c441623723
Parents: 9bc776f
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Feb 21 21:13:13 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Feb 21 21:13:13 2017 +0300

----------------------------------------------------------------------
 .../query/h2/GridH2IndexingGeoSelfTest.java     | 405 ++++++++++++++-----
 .../h2/GridH2IndexingSegmentedGeoSelfTest.java  |  13 +-
 2 files changed, 302 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4dc36948/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingGeoSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingGeoSelfTest.java b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingGeoSelfTest.java
index 2843076..839514b 100644
--- a/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingGeoSelfTest.java
+++ b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingGeoSelfTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.h2;
 
 import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.io.ParseException;
 import com.vividsolutions.jts.io.WKTReader;
 import java.io.Serializable;
 import java.util.Arrays;
@@ -31,14 +32,18 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.Cache;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.SqlQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Geo-indexing test.
@@ -50,6 +55,12 @@ public class GridH2IndexingGeoSelfTest extends GridCacheAbstractSelfTest {
     /** */
     private static final long DUR = 60000L;
 
+    /** Number of generated enemy camp samples. */
+    public static final int ENEMYCAMP_SAMPLES_COUNT = 500;
+
+    /** Number of generated enemy samples. */
+    public static final int ENEMY_SAMPLES_COUNT = 1000;
+
     /** {@inheritDoc} */
     @Override protected int gridCount() {
         return 3;
@@ -60,29 +71,41 @@ public class GridH2IndexingGeoSelfTest extends GridCacheAbstractSelfTest {
         return DUR * 3;
     }
 
-    /** {@inheritDoc} */
-    @Override protected Class<?>[] indexedTypes() {
-        return new Class<?>[]{
-            Integer.class, EnemyCamp.class,
-            Long.class, Geometry.class // Geometry must be indexed here.
-        };
+    /**
+     * @param name Cache name.
+     * @param partitioned {@code true} for a partitioned cache, {@code false} for a replicated one.
+     * @param idxTypes Indexed types.
+     * @return Cache configuration.
+     */
+    protected <K, V> CacheConfiguration<K, V> cacheConfig(String name, boolean partitioned,
+        Class<?>... idxTypes) throws Exception {
+        return new CacheConfiguration<K, V>(name)
+            .setName(name)
+            .setCacheMode(partitioned ? CacheMode.PARTITIONED : CacheMode.REPLICATED)
+            .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+            .setIndexedTypes(idxTypes);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testPrimitiveGeometry() throws Exception {
-        IgniteCache<Long, Geometry> cache = grid(0).cache(null);
+        IgniteCache<Long, Geometry> cache = grid(0).getOrCreateCache(cacheConfig("geom", true, Long.class, Geometry.class));
 
-        WKTReader r = new WKTReader();
+        try {
+            WKTReader r = new WKTReader();
 
-        for (long i = 0; i < 100; i++)
-            cache.put(i, r.read("POINT(" + i + " " + i + ")"));
+            for (long i = 0; i < 100; i++)
+                cache.put(i, r.read("POINT(" + i + " " + i + ")"));
 
-        List<List<?>> res = cache.query(new SqlFieldsQuery("explain select _key from Geometry where _val && ?")
-            .setArgs(r.read("POLYGON((5 70, 5 80, 30 80, 30 70, 5 70))")).setLocal(true)).getAll();
+            List<List<?>> res = cache.query(new SqlFieldsQuery("explain select _key from Geometry where _val && ?")
+                .setArgs(r.read("POLYGON((5 70, 5 80, 30 80, 30 70, 5 70))")).setLocal(true)).getAll();
 
-        assertTrue("__ explain: " + res, res.get(0).get(0).toString().contains("_val_idx"));
+            assertTrue("__ explain: " + res, res.get(0).get(0).toString().contains("_val_idx"));
+        }
+        finally {
+            cache.destroy();
+        }
     }
 
     /**
@@ -90,59 +113,65 @@ public class GridH2IndexingGeoSelfTest extends GridCacheAbstractSelfTest {
      */
     @SuppressWarnings("unchecked")
     public void testGeo() throws Exception {
-        IgniteCache<Integer, EnemyCamp> cache = grid(0).cache(null);
+        IgniteCache<Integer, EnemyCamp> cache = grid(0).getOrCreateCache(
+            cacheConfig("camp", true, Integer.class, EnemyCamp.class));
 
-        WKTReader r = new WKTReader();
+        try {
+            WKTReader r = new WKTReader();
 
-        cache.getAndPut(0, new EnemyCamp(r.read("POINT(25 75)"), "A"));
-        cache.getAndPut(1, new EnemyCamp(r.read("POINT(70 70)"), "B"));
-        cache.getAndPut(2, new EnemyCamp(r.read("POINT(70 30)"), "C"));
-        cache.getAndPut(3, new EnemyCamp(r.read("POINT(75 25)"), "D"));
+            cache.getAndPut(0, new EnemyCamp(r.read("POINT(25 75)"), "A"));
+            cache.getAndPut(1, new EnemyCamp(r.read("POINT(70 70)"), "B"));
+            cache.getAndPut(2, new EnemyCamp(r.read("POINT(70 30)"), "C"));
+            cache.getAndPut(3, new EnemyCamp(r.read("POINT(75 25)"), "D"));
 
-        SqlQuery<Integer, EnemyCamp> qry = new SqlQuery(EnemyCamp.class, "coords && ?");
+            SqlQuery<Integer, EnemyCamp> qry = new SqlQuery(EnemyCamp.class, "coords && ?");
 
-        Collection<Cache.Entry<Integer, EnemyCamp>> res = cache.query(
-            qry.setArgs(r.read("POLYGON((5 70, 5 80, 30 80, 30 70, 5 70))"))).getAll();
+            Collection<Cache.Entry<Integer, EnemyCamp>> res = cache.query(
+                qry.setArgs(r.read("POLYGON((5 70, 5 80, 30 80, 30 70, 5 70))"))).getAll();
 
-        checkPoints(res, "A");
+            checkPoints(res, "A");
 
-        res = cache.query(
-            qry.setArgs(r.read("POLYGON((10 5, 10 35, 70 30, 75 25, 10 5))"))).getAll();
+            res = cache.query(
+                qry.setArgs(r.read("POLYGON((10 5, 10 35, 70 30, 75 25, 10 5))"))).getAll();
 
-        checkPoints(res, "C", "D");
+            checkPoints(res, "C", "D");
 
-        // Move B to the first polygon.
-        cache.getAndPut(1, new EnemyCamp(r.read("POINT(20 75)"), "B"));
+            // Move B to the first polygon.
+            cache.getAndPut(1, new EnemyCamp(r.read("POINT(20 75)"), "B"));
 
-        res = cache.query(
-            qry.setArgs(r.read("POLYGON((5 70, 5 80, 30 80, 30 70, 5 70))"))).getAll();
+            res = cache.query(
+                qry.setArgs(r.read("POLYGON((5 70, 5 80, 30 80, 30 70, 5 70))"))).getAll();
 
-        checkPoints(res, "A", "B");
+            checkPoints(res, "A", "B");
 
-        // Move B to the second polygon.
-        cache.getAndPut(1, new EnemyCamp(r.read("POINT(30 30)"), "B"));
+            // Move B to the second polygon.
+            cache.getAndPut(1, new EnemyCamp(r.read("POINT(30 30)"), "B"));
 
-        res = cache.query(
-            qry.setArgs(r.read("POLYGON((10 5, 10 35, 70 30, 75 25, 10 5))"))).getAll();
+            res = cache.query(
+                qry.setArgs(r.read("POLYGON((10 5, 10 35, 70 30, 75 25, 10 5))"))).getAll();
 
-        checkPoints(res, "B", "C", "D");
+            checkPoints(res, "B", "C", "D");
 
-        // Remove B.
-        cache.getAndRemove(1);
+            // Remove B.
+            cache.getAndRemove(1);
 
-        res = cache.query(
-            qry.setArgs(r.read("POLYGON((5 70, 5 80, 30 80, 30 70, 5 70))"))).getAll();
+            res = cache.query(
+                qry.setArgs(r.read("POLYGON((5 70, 5 80, 30 80, 30 70, 5 70))"))).getAll();
 
-        checkPoints(res, "A");
+            checkPoints(res, "A");
 
-        res = cache.query(
-            qry.setArgs(r.read("POLYGON((10 5, 10 35, 70 30, 75 25, 10 5))"))).getAll();
+            res = cache.query(
+                qry.setArgs(r.read("POLYGON((10 5, 10 35, 70 30, 75 25, 10 5))"))).getAll();
 
-        checkPoints(res, "C", "D");
+            checkPoints(res, "C", "D");
 
-        // Check explaint request.
-        assertTrue(F.first(cache.query(new SqlFieldsQuery("explain select * from EnemyCamp " +
-            "where coords && 'POINT(25 75)'")).getAll()).get(0).toString().contains("coords_idx"));
+            // Check explain request.
+            assertTrue(F.first(cache.query(new SqlFieldsQuery("explain select * from EnemyCamp " +
+                "where coords && 'POINT(25 75)'")).getAll()).get(0).toString().contains("coords_idx"));
+        }
+        finally {
+            cache.destroy();
+        }
     }
 
     /**
@@ -150,100 +179,107 @@ public class GridH2IndexingGeoSelfTest extends GridCacheAbstractSelfTest {
      */
     @SuppressWarnings("unchecked")
     public void testGeoMultithreaded() throws Exception {
-        final IgniteCache<Integer, EnemyCamp> cache1 = grid(0).cache(null);
-        final IgniteCache<Integer, EnemyCamp> cache2 = grid(1).cache(null);
-        final IgniteCache<Integer, EnemyCamp> cache3 = grid(2).cache(null);
+        final CacheConfiguration<Integer, EnemyCamp> ccfg = cacheConfig("camp", true, Integer.class, EnemyCamp.class);
 
-        final String[] points = new String[CNT];
+        final IgniteCache<Integer, EnemyCamp> cache1 = grid(0).getOrCreateCache(ccfg);
+        final IgniteCache<Integer, EnemyCamp> cache2 = grid(1).cache("camp");
+        final IgniteCache<Integer, EnemyCamp> cache3 = grid(2).cache("camp");
 
-        WKTReader r = new WKTReader();
+        try {
+            final String[] points = new String[CNT];
 
-        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+            WKTReader r = new WKTReader();
 
-        for (int idx = 0; idx < CNT; idx++) {
-            int x = rnd.nextInt(1, 100);
-            int y = rnd.nextInt(1, 100);
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
-            cache1.getAndPut(idx, new EnemyCamp(r.read("POINT(" + x + " " + y + ")"), Integer.toString(idx)));
+            for (int idx = 0; idx < CNT; idx++) {
+                int x = rnd.nextInt(1, 100);
+                int y = rnd.nextInt(1, 100);
 
-            points[idx] = Integer.toString(idx);
-        }
+                cache1.getAndPut(idx, new EnemyCamp(r.read("POINT(" + x + " " + y + ")"), Integer.toString(idx)));
 
-        Thread.sleep(200);
+                points[idx] = Integer.toString(idx);
+            }
 
-        final AtomicBoolean stop = new AtomicBoolean();
-        final AtomicReference<Exception> err = new AtomicReference<>();
+            Thread.sleep(200);
 
-        IgniteInternalFuture<?> putFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
-            @Override public Void call() throws Exception {
-                WKTReader r = new WKTReader();
+            final AtomicBoolean stop = new AtomicBoolean();
+            final AtomicReference<Exception> err = new AtomicReference<>();
 
-                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+            IgniteInternalFuture<?> putFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    WKTReader r = new WKTReader();
 
-                while (!stop.get()) {
-                    int cacheIdx = rnd.nextInt(0, 3);
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
-                    IgniteCache<Integer, EnemyCamp> cache = cacheIdx == 0 ? cache1 : cacheIdx == 1 ? cache2 : cache3;
+                    while (!stop.get()) {
+                        int cacheIdx = rnd.nextInt(0, 3);
 
-                    int idx = rnd.nextInt(CNT);
-                    int x = rnd.nextInt(1, 100);
-                    int y = rnd.nextInt(1, 100);
+                        IgniteCache<Integer, EnemyCamp> cache = cacheIdx == 0 ? cache1 : cacheIdx == 1 ? cache2 : cache3;
 
-                    cache.getAndPut(idx, new EnemyCamp(r.read("POINT(" + x + " " + y + ")"), Integer.toString(idx)));
+                        int idx = rnd.nextInt(CNT);
+                        int x = rnd.nextInt(1, 100);
+                        int y = rnd.nextInt(1, 100);
 
-                    U.sleep(50);
-                }
+                        cache.getAndPut(idx, new EnemyCamp(r.read("POINT(" + x + " " + y + ")"), Integer.toString(idx)));
 
-                return null;
-            }
-        }, Runtime.getRuntime().availableProcessors(), "put-thread");
+                        U.sleep(50);
+                    }
 
-        IgniteInternalFuture<?> qryFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
-            @Override public Void call() throws Exception {
-                WKTReader r = new WKTReader();
+                    return null;
+                }
+            }, Runtime.getRuntime().availableProcessors(), "put-thread");
 
-                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+            IgniteInternalFuture<?> qryFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    WKTReader r = new WKTReader();
 
-                while (!stop.get()) {
-                    try {
-                        int cacheIdx = rnd.nextInt(0, 3);
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
-                        IgniteCache<Integer, EnemyCamp> cache = cacheIdx == 0 ? cache1 : cacheIdx == 1 ? cache2 : cache3;
+                    while (!stop.get()) {
+                        try {
+                            int cacheIdx = rnd.nextInt(0, 3);
 
-                        SqlQuery<Integer, EnemyCamp> qry = new SqlQuery<>(
-                            EnemyCamp.class, "coords && ?");
+                            IgniteCache<Integer, EnemyCamp> cache = cacheIdx == 0 ? cache1 : cacheIdx == 1 ? cache2 : cache3;
 
-                        Collection<Cache.Entry<Integer, EnemyCamp>> res = cache.query(qry.setArgs(
-                            r.read("POLYGON((0 0, 0 100, 100 100, 100 0, 0 0))"))).getAll();
+                            SqlQuery<Integer, EnemyCamp> qry = new SqlQuery<>(
+                                EnemyCamp.class, "coords && ?");
 
-                        checkPoints(res, points);
+                            Collection<Cache.Entry<Integer, EnemyCamp>> res = cache.query(qry.setArgs(
+                                r.read("POLYGON((0 0, 0 100, 100 100, 100 0, 0 0))"))).getAll();
 
-                        U.sleep(5);
-                    }
-                    catch (Exception e) {
-                        err.set(e);
+                            checkPoints(res, points);
 
-                        stop.set(true);
+                            U.sleep(5);
+                        }
+                        catch (Exception e) {
+                            err.set(e);
 
-                        break;
+                            stop.set(true);
+
+                            break;
+                        }
                     }
-                }
 
-                return null;
-            }
-        }, 4, "qry-thread");
+                    return null;
+                }
+            }, 4, "qry-thread");
 
-        U.sleep(60000L);
+            U.sleep(6000L);
 
-        stop.set(true);
+            stop.set(true);
 
-        putFut.get();
-        qryFut.get();
+            putFut.get();
+            qryFut.get();
 
-        Exception err0 = err.get();
+            Exception err0 = err.get();
 
-        if (err0 != null)
-            throw err0;
+            if (err0 != null)
+                throw err0;
+        }
+        finally {
+            cache1.destroy();
+        }
     }
 
     /**
@@ -252,7 +288,7 @@ public class GridH2IndexingGeoSelfTest extends GridCacheAbstractSelfTest {
      * @param res Result.
      * @param points Expected points.
      */
-    private void checkPoints( Collection<Cache.Entry<Integer, EnemyCamp>> res, String... points) {
+    private void checkPoints(Collection<Cache.Entry<Integer, EnemyCamp>> res, String... points) {
         Set<String> set = new HashSet<>(Arrays.asList(points));
 
         assertEquals(set.size(), res.size());
@@ -262,12 +298,157 @@ public class GridH2IndexingGeoSelfTest extends GridCacheAbstractSelfTest {
     }
 
     /**
+     * @throws Exception if fails.
+     */
+    public void testSegmentedGeoIndexJoin() throws Exception {
+        IgniteCache<Integer, Enemy> c1 = ignite(0).getOrCreateCache(cacheConfig("enemy", true, Integer.class, Enemy.class));
+        IgniteCache<Integer, EnemyCamp> c2 = ignite(0).getOrCreateCache(cacheConfig("camp", true, Integer.class, EnemyCamp.class));
+
+        try {
+            fillCache();
+
+            checkDistributedQuery();
+
+            checkLocalQuery();
+        }
+        finally {
+            c1.destroy();
+            c2.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception if fails.
+     */
+    public void testSegmentedGeoIndexJoin2() throws Exception {
+        IgniteCache<Integer, Enemy> c1 = ignite(0).getOrCreateCache(cacheConfig("enemy", true, Integer.class, Enemy.class));
+        IgniteCache<Integer, EnemyCamp> c2 = ignite(0).getOrCreateCache(cacheConfig("camp", false, Integer.class, EnemyCamp.class));
+
+        try {
+            fillCache();
+
+            checkDistributedQuery();
+
+            checkLocalQuery();
+        }
+        finally {
+            c1.destroy();
+            c2.destroy();
+        }
+    }
+
+    /** */
+    private void checkDistributedQuery() throws ParseException {
+        IgniteCache<Integer, Enemy> c1 = ignite(0).cache("enemy");
+        IgniteCache<Integer, EnemyCamp> c2 = ignite(0).cache("camp");
+
+        final Geometry lethalArea = new WKTReader().read("POLYGON((30 30, 30 70, 70 70, 70 30, 30 30))");
+
+        int expectedEnemies = 0;
+
+        for (Cache.Entry<Integer, Enemy> e : c1) {
+            final Integer campID = e.getValue().campId;
+
+            if (30 <= campID && campID < ENEMYCAMP_SAMPLES_COUNT) {
+                final EnemyCamp camp = c2.get(campID);
+
+                if (lethalArea.covers(camp.coords))
+                    expectedEnemies++;
+            }
+        }
+
+        final SqlFieldsQuery query = new SqlFieldsQuery("select e._val, c._val from \"enemy\".Enemy e, \"camp\".EnemyCamp c " +
+            "where e.campId = c._key and c.coords && ?").setArgs(lethalArea);
+
+        List<List<?>> result = c1.query(query.setDistributedJoins(true)).getAll();
+
+        assertEquals(expectedEnemies, result.size());
+    }
+
+    /** */
+    private void checkLocalQuery() throws ParseException {
+        IgniteCache<Integer, Enemy> c1 = ignite(0).cache("enemy");
+        IgniteCache<Integer, EnemyCamp> c2 = ignite(0).cache("camp");
+
+        final Geometry lethalArea = new WKTReader().read("POLYGON((30 30, 30 70, 70 70, 70 30, 30 30))");
+
+        Set<Integer> localCampsIDs = new HashSet<>();
+
+        for(Cache.Entry<Integer, EnemyCamp> e : c2.localEntries())
+            localCampsIDs.add(e.getKey());
+
+        int expectedEnemies = 0;
+
+        for (Cache.Entry<Integer, Enemy> e : c1.localEntries()) {
+            final Integer campID = e.getValue().campId;
+
+            if (localCampsIDs.contains(campID)) {
+                final EnemyCamp camp = c2.get(campID);
+
+                if (lethalArea.covers(camp.coords))
+                    expectedEnemies++;
+            }
+        }
+
+        final SqlFieldsQuery query = new SqlFieldsQuery("select e._val, c._val from \"enemy\".Enemy e, \"camp\".EnemyCamp c " +
+            "where e.campId = c._key and c.coords && ?").setArgs(lethalArea);
+
+        List<List<?>> result = c1.query(query.setLocal(true)).getAll();
+
+        assertEquals(expectedEnemies, result.size());
+    }
+
+    /** */
+    private void fillCache() throws ParseException {
+        IgniteCache<Integer, Enemy> c1 = ignite(0).cache("enemy");
+        IgniteCache<Integer, EnemyCamp> c2 = ignite(0).cache("camp");
+
+        final ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        WKTReader r = new WKTReader();
+
+        for (int i = 0; i < ENEMYCAMP_SAMPLES_COUNT; i++) {
+            final String point = String.format("POINT(%d %d)", rnd.nextInt(100), rnd.nextInt(100));
+
+            c2.put(i, new EnemyCamp(r.read(point), "camp-" + i));
+        }
+
+        for (int i = 0; i < ENEMY_SAMPLES_COUNT; i++) {
+            int campID = 30 + rnd.nextInt(ENEMYCAMP_SAMPLES_COUNT + 10);
+
+            c1.put(i, new Enemy(campID, "enemy-" + i));
+        }
+    }
+
+    /**
+     *
+     */
+    private static class Enemy {
+        /** */
+        @QuerySqlField
+        int campId;
+
+        /** */
+        @QuerySqlField
+        String name;
+
+        /**
+         * @param campId Camp ID.
+         * @param name Name.
+         */
+        public Enemy(int campId, String name) {
+            this.campId = campId;
+            this.name = name;
+        }
+    }
+
+    /**
      *
      */
-    private static class EnemyCamp implements Serializable {
+    protected static class EnemyCamp implements Serializable {
         /** */
         @QuerySqlField(index = true)
-        private Geometry coords;
+        Geometry coords;
 
         /** */
         @QuerySqlField
@@ -277,7 +458,7 @@ public class GridH2IndexingGeoSelfTest extends GridCacheAbstractSelfTest {
          * @param coords Coordinates.
          * @param name Name.
          */
-        private EnemyCamp(Geometry coords, String name) {
+        EnemyCamp(Geometry coords, String name) {
             this.coords = coords;
             this.name = name;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4dc36948/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java
index e404f38..eb0fd0f 100644
--- a/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java
+++ b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java
@@ -23,10 +23,15 @@ import org.apache.ignite.configuration.CacheConfiguration;
  * Test for segmented geo index.
  */
 public class GridH2IndexingSegmentedGeoSelfTest extends GridH2IndexingGeoSelfTest {
-    /** {@inheritDoc} */
-    @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
-        return super.cacheConfiguration(gridName).setQueryParallelism(7);
-    }
+    /** */
+    private static int QRY_PARALLELISM_LVL = 7;
 
+    /** {@inheritDoc} Uses query parallelism {@link #QRY_PARALLELISM_LVL} for partitioned caches and 1 otherwise. */
+    @Override
+    protected <K, V> CacheConfiguration<K, V> cacheConfig(String name, boolean partitioned,
+        Class<?>... idxTypes) throws Exception {
+        final CacheConfiguration<K, V> ccfg = super.cacheConfig(name, partitioned, idxTypes);
 
+        return ccfg.setQueryParallelism(partitioned ? QRY_PARALLELISM_LVL : 1);
+    }
 }
\ No newline at end of file


[03/12] ignite git commit: Added Spatial Index segmentation.

Posted by se...@apache.org.
Added Spatial Index segmentation.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e69d3c34
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e69d3c34
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e69d3c34

Branch: refs/heads/ignite-1.9
Commit: e69d3c34915f2f7de1b66f22e0584b7c09d8fe71
Parents: 64ba13b
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Feb 21 11:35:34 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Feb 21 11:52:58 2017 +0300

----------------------------------------------------------------------
 .../query/h2/opt/GridH2SpatialIndex.java        | 69 +++++++++++++++++---
 .../h2/GridH2IndexingSegmentedGeoSelfTest.java  | 30 +++++++++
 .../testsuites/GeoSpatialIndexingTestSuite.java |  2 +
 .../processors/query/h2/IgniteH2Indexing.java   | 16 ++---
 .../query/h2/opt/GridH2IndexBase.java           | 21 +++---
 .../query/h2/opt/GridH2TreeIndex.java           | 35 ++--------
 6 files changed, 118 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e69d3c34/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
----------------------------------------------------------------------
diff --git a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
index 3062d13..096b82d 100644
--- a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
+++ b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
@@ -41,7 +41,6 @@ import org.h2.mvstore.rtree.SpatialKey;
 import org.h2.result.SearchRow;
 import org.h2.result.SortOrder;
 import org.h2.table.IndexColumn;
-import org.h2.table.Table;
 import org.h2.table.TableFilter;
 import org.h2.value.Value;
 import org.h2.value.ValueGeometry;
@@ -67,7 +66,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
     private boolean closed;
 
     /** */
-    private final MVRTreeMap<Long> treeMap;
+    private final MVRTreeMap<Long>[] segments;
 
     /** */
     private final Map<Long, GridH2Row> idToRow = new HashMap<>();
@@ -83,7 +82,17 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
      * @param idxName Index name.
      * @param cols Columns.
      */
-    public GridH2SpatialIndex(Table tbl, String idxName, IndexColumn... cols) {
+    public GridH2SpatialIndex(GridH2Table tbl, String idxName, IndexColumn... cols) {
+        this(tbl, idxName, 1, cols);
+    }
+
+    /**
+     * @param tbl Table.
+     * @param idxName Index name.
+     * @param segmentsCnt Index segments count.
+     * @param cols Columns.
+     */
+    public GridH2SpatialIndex(GridH2Table tbl, String idxName, int segmentsCnt, IndexColumn... cols) {
         if (cols.length > 1)
             throw DbException.getUnsupportedException("can only do one column");
 
@@ -107,7 +116,11 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
 
         // Index in memory
         store = MVStore.open(null);
-        treeMap = store.openMap("spatialIndex", new MVRTreeMap.Builder<Long>());
+
+        segments = new MVRTreeMap[segmentsCnt];
+
+        for (int i = 0; i < segmentsCnt; i++)
+            segments[i] = store.openMap("spatialIndex-" + i, new MVRTreeMap.Builder<Long>());
     }
 
     /**
@@ -119,6 +132,11 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
     }
 
     /** {@inheritDoc} */
+    @Override protected int segmentsCount() {
+        return segments.length;
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override protected Object doTakeSnapshot() {
         return null; // TODO We do not support snapshots, but probably this is possible.
     }
@@ -140,20 +158,26 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
 
             Long rowId = keyToId.get(key);
 
+            int seg;
+
             if (rowId != null) {
-                Long oldRowId = treeMap.remove(getEnvelope(idToRow.get(rowId), rowId));
+                seg = segmentForRowID(rowId);
+
+                Long oldRowId = segments[seg].remove(getEnvelope(idToRow.get(rowId), rowId));
 
                 assert rowId.equals(oldRowId);
             }
             else {
                 rowId = ++rowIds;
 
+                seg = segmentForRowID(rowId);
+
                 keyToId.put(key, rowId);
             }
 
             GridH2Row old = idToRow.put(rowId, row);
 
-            treeMap.put(getEnvelope(row, rowId), rowId);
+            segments[seg].put(getEnvelope(row, rowId), rowId);
 
             if (old == null)
                 rowCnt++; // No replace.
@@ -166,6 +190,17 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
     }
 
     /**
+     * @param id Row ID.
+     *
+     * @return Segment ID for given row ID.
+     */
+    private int segmentForRowID(Long id) {
+        assert id != null;
+
+        return (int)(id % segmentsCount());
+    }
+
+    /**
      * @param row Row.
      * @param rowId Row id.
      * @return Envelope.
@@ -200,7 +235,9 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
 
             assert oldRow != null;
 
-            if (!treeMap.remove(getEnvelope(row, rowId), rowId))
+            int seg = segmentForRowID(rowId);
+
+            if (!segments[seg].remove(getEnvelope(row, rowId), rowId))
                 throw DbException.throwInternalError("row not found");
 
             rowCnt--;
@@ -258,7 +295,11 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
         try {
             checkClosed();
 
-            return new GridH2Cursor(rowIterator(treeMap.keySet().iterator(), filter));
+            final int seg = threadLocalSegment();
+
+            final MVRTreeMap<Long> segment = segments[seg];
+
+            return new GridH2Cursor(rowIterator(segment.keySet().iterator(), filter));
         }
         finally {
             l.unlock();
@@ -305,7 +346,11 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
             if (!first)
                 throw DbException.throwInternalError("Spatial Index can only be fetch by ascending order");
 
-            Iterator<GridH2Row> iter = rowIterator(treeMap.keySet().iterator(), null);
+            final int seg = threadLocalSegment();
+
+            final MVRTreeMap<Long> segment = segments[seg];
+
+            Iterator<GridH2Row> iter = rowIterator(segment.keySet().iterator(), null);
 
             return new SingleRowCursor(iter.hasNext() ? iter.next() : null);
         }
@@ -334,7 +379,11 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
             if (intersection == null)
                 return find(filter.getSession(), null, null);
 
-            return new GridH2Cursor(rowIterator(treeMap.findIntersectingKeys(getEnvelope(intersection, 0)), filter));
+            final int seg = threadLocalSegment();
+
+            final MVRTreeMap<Long> segment = segments[seg];
+
+            return new GridH2Cursor(rowIterator(segment.findIntersectingKeys(getEnvelope(intersection, 0)), filter));
         }
         finally {
             l.unlock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/e69d3c34/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java
new file mode 100644
index 0000000..b806321
--- /dev/null
+++ b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import org.apache.ignite.configuration.CacheConfiguration;
+
+/**
+ * Test for segmented geo index: reuses {@link GridH2IndexingGeoSelfTest} with query parallelism set to 7.
+ */
+public class GridH2IndexingSegmentedGeoSelfTest extends GridH2IndexingGeoSelfTest {
+    /** {@inheritDoc} Sets query parallelism to 7 on the inherited configuration. */
+    @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+        return super.cacheConfiguration(gridName).setQueryParallelism(7);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/e69d3c34/modules/geospatial/src/test/java/org/apache/ignite/testsuites/GeoSpatialIndexingTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/geospatial/src/test/java/org/apache/ignite/testsuites/GeoSpatialIndexingTestSuite.java b/modules/geospatial/src/test/java/org/apache/ignite/testsuites/GeoSpatialIndexingTestSuite.java
index 1773894..3907b9e 100644
--- a/modules/geospatial/src/test/java/org/apache/ignite/testsuites/GeoSpatialIndexingTestSuite.java
+++ b/modules/geospatial/src/test/java/org/apache/ignite/testsuites/GeoSpatialIndexingTestSuite.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
 import junit.framework.TestSuite;
 import org.apache.ignite.internal.processors.query.h2.GridBinaryH2IndexingGeoSelfTest;
 import org.apache.ignite.internal.processors.query.h2.GridH2IndexingGeoSelfTest;
+import org.apache.ignite.internal.processors.query.h2.GridH2IndexingSegmentedGeoSelfTest;
 
 /**
  * Geospatial indexing tests.
@@ -35,6 +36,7 @@ public class GeoSpatialIndexingTestSuite extends TestSuite {
         // Geo.
         suite.addTestSuite(GridH2IndexingGeoSelfTest.class);
         suite.addTestSuite(GridBinaryH2IndexingGeoSelfTest.class);
+        suite.addTestSuite(GridH2IndexingSegmentedGeoSelfTest.class);
 
         return suite;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e69d3c34/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 2f40d87..652d1f3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -2813,7 +2813,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
          * @param cols Columns.
          */
         private SpatialIndex createH2SpatialIndex(
-            Table tbl,
+            GridH2Table tbl,
             String idxName,
             IndexColumn[] cols
         ) {
@@ -2823,14 +2823,17 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 Class<?> cls = Class.forName(className);
 
                 Constructor<?> ctor = cls.getConstructor(
-                    Table.class,
+                    GridH2Table.class,
                     String.class,
+                    Integer.TYPE,
                     IndexColumn[].class);
 
                 if (!ctor.isAccessible())
                     ctor.setAccessible(true);
 
-                return (SpatialIndex)ctor.newInstance(tbl, idxName, cols);
+                final int segments = tbl.rowDescriptor().context().config().getQueryParallelism();
+
+                return (SpatialIndex)ctor.newInstance(tbl, idxName, segments, cols);
             }
             catch (Exception e) {
                 throw new IgniteException("Failed to instantiate: " + className, e);
@@ -2845,12 +2848,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
          * @return
          */
         private Index createTreeIndex(String idxName, GridH2Table tbl, boolean pk, List<IndexColumn> columns) {
-            GridCacheContext<?, ?> cctx = tbl.rowDescriptor().context();
-
-            if (cctx != null && cctx.config().getQueryParallelism() > 1)
-                return new GridH2TreeIndex(idxName, tbl, pk, columns, cctx.config().getQueryParallelism());
+            final int segments = tbl.rowDescriptor().context().config().getQueryParallelism();
 
-            return new GridH2TreeIndex(idxName, tbl, pk, columns, 1);
+            return new GridH2TreeIndex(idxName, tbl, pk, columns, segments);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e69d3c34/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index 131e03b..89d661d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -183,7 +183,15 @@ public abstract class GridH2IndexBase extends BaseIndex {
      * @return Index segment ID for current query context.
      */
     protected int threadLocalSegment() {
-       return 0;
+        GridH2QueryContext qctx = GridH2QueryContext.get();
+
+        if(segmentsCount() == 1)
+            return 0;
+
+        if(qctx == null)
+            throw new IllegalStateException("GridH2QueryContext is not initialized.");
+
+        return qctx.segment();
     }
 
     /**
@@ -669,7 +677,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
                 throw new GridH2RetryException("Failed to find node.");
         }
 
-        return new SegmentKey(node, segment(partition));
+        return new SegmentKey(node, segmentForPartition(partition));
     }
 
     /** */
@@ -829,19 +837,16 @@ public abstract class GridH2IndexBase extends BaseIndex {
     }
 
     /** @return Index segments count. */
-    protected int segmentsCount() {
-        return 1;
-    }
+    protected abstract int segmentsCount();
 
     /**
      * @param partition Partition idx.
      * @return Segment ID for given key
      */
-    protected int segment(int partition) {
-        return 0;
+    protected int segmentForPartition(int partition){
+        return segmentsCount() == 1 ? 0 : (partition % segmentsCount());
     }
 
-
     /**
      * Simple cursor from a single node.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e69d3c34/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index 0829df0..64ca9ea 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -50,7 +50,6 @@ import org.jetbrains.annotations.Nullable;
  */
 @SuppressWarnings("ComparatorNotSerializable")
 public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridSearchRowPointer> {
-
     /** */
     private static Field KEY_FIELD;
 
@@ -224,20 +223,6 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
         super.destroy();
     }
 
-
-    /** {@inheritDoc} */
-    protected int threadLocalSegment() {
-        GridH2QueryContext qctx = GridH2QueryContext.get();
-
-        if(segments.length == 1)
-            return 0;
-
-        if(qctx == null)
-            throw new IllegalStateException("GridH2QueryContext is not initialized.");
-
-        return qctx.segment();
-    }
-
     /** {@inheritDoc} */
     @Override public long getRowCount(@Nullable Session ses) {
         IndexingQueryFilter f = threadLocalFilter();
@@ -433,7 +418,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
 
     /** {@inheritDoc} */
     @Override public GridH2Row put(GridH2Row row) {
-        int seg = segment(row);
+        int seg = segmentForRow(row);
 
         return segments[seg].put(row, row);
     }
@@ -442,7 +427,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
     @Override public GridH2Row remove(SearchRow row) {
         GridSearchRowPointer comparable = comparable(row, 0);
 
-        int seg = segment(row);
+        int seg = segmentForRow(row);
 
         return segments[seg].remove(comparable);
     }
@@ -453,18 +438,10 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
     }
 
     /**
-     * @param partition Parttition idx.
-     * @return index currentSegment Id for given key
-     */
-    protected int segment(int partition) {
-        return partition % segments.length;
-    }
-
-    /**
-     * @param row
-     * @return index currentSegment Id for given row
+     * @param row Table row.
+     * @return Segment ID for given row.
      */
-    private int segment(SearchRow row) {
+    private int segmentForRow(SearchRow row) {
         assert row != null;
 
         CacheObject key;
@@ -491,7 +468,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
             else
                 key = ctx.toCacheKeyObject(row.getValue(0));
 
-            return segment(ctx.affinity().partition(key));
+            return segmentForPartition(ctx.affinity().partition(key));
         }
         else
             return 0;


[08/12] ignite git commit: Minor. .

Posted by se...@apache.org.
Minor. .


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f4d36cb6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f4d36cb6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f4d36cb6

Branch: refs/heads/ignite-1.9
Commit: f4d36cb683b82bc0d059f183a586be4662818bbc
Parents: 2e90a07
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Feb 21 15:29:52 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Feb 21 15:29:52 2017 +0300

----------------------------------------------------------------------
 .../internal/processors/query/h2/sql/GridSqlQuerySplitter.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f4d36cb6/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 165b6b8..779e565 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -1628,7 +1628,7 @@ public class GridSqlQuerySplitter {
             findParams(((GridSqlSubquery)el).subquery(), params, target, paramIdxs);
         else {
             for (int i = 0; i < el.size(); i++)
-                findParams(el.child(i), params, target, paramIdxs);
+                findParams((GridSqlAst)el.child(i), params, target, paramIdxs);
         }
     }
 


[04/12] ignite git commit: Minor

Posted by se...@apache.org.
Minor


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f684c00a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f684c00a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f684c00a

Branch: refs/heads/ignite-1.9
Commit: f684c00affa25d67557b382e6ffaf2d1de45c964
Parents: e69d3c3
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Feb 21 12:00:41 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Feb 21 12:00:41 2017 +0300

----------------------------------------------------------------------
 .../ignite/internal/processors/query/h2/IgniteH2Indexing.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f684c00a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 652d1f3..de48eb9 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -2831,7 +2831,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 if (!ctor.isAccessible())
                     ctor.setAccessible(true);
 
-                final int segments = tbl.rowDescriptor().context().config().getQueryParallelism();
+                final int segments = tbl.rowDescriptor().configuration().getQueryParallelism();
 
                 return (SpatialIndex)ctor.newInstance(tbl, idxName, segments, cols);
             }
@@ -2848,7 +2848,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
          * @return
          */
         private Index createTreeIndex(String idxName, GridH2Table tbl, boolean pk, List<IndexColumn> columns) {
-            final int segments = tbl.rowDescriptor().context().config().getQueryParallelism();
+            final int segments = tbl.rowDescriptor().configuration().getQueryParallelism();
 
             return new GridH2TreeIndex(idxName, tbl, pk, columns, segments);
         }


[05/12] ignite git commit: Merge remote-tracking branch 'apache/ignite-1.9' into ignite-4106-1.9

Posted by se...@apache.org.
Merge remote-tracking branch 'apache/ignite-1.9' into ignite-4106-1.9

# Conflicts:
#	modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
#	modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
#	modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
#	modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
#	modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
#	modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5b774f6d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5b774f6d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5b774f6d

Branch: refs/heads/ignite-1.9
Commit: 5b774f6d9dc7faa60a95b1206d30115b0ef67d16
Parents: f684c00 166e65c
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Feb 21 12:10:38 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Feb 21 12:10:38 2017 +0300

----------------------------------------------------------------------
 .../examples/java8/spark/SharedRDDExample.java  |    4 +-
 .../jdbc2/JdbcAbstractDmlStatementSelfTest.java |   49 +-
 .../jdbc2/JdbcInsertStatementSelfTest.java      |   51 +
 .../jdbc2/JdbcMergeStatementSelfTest.java       |   51 +
 .../internal/jdbc2/JdbcStreamingSelfTest.java   |  189 ++
 .../jdbc2/JdbcUpdateStatementSelfTest.java      |   50 +
 .../jdbc/suite/IgniteJdbcDriverTestSuite.java   |    1 +
 .../org/apache/ignite/IgniteJdbcDriver.java     |   30 +
 .../apache/ignite/IgniteSystemProperties.java   |    6 +
 .../ignite/internal/jdbc2/JdbcConnection.java   |   72 +-
 .../internal/jdbc2/JdbcPreparedStatement.java   |   34 +-
 .../ignite/internal/jdbc2/JdbcStatement.java    |   20 +-
 .../jdbc2/JdbcStreamedPreparedStatement.java    |   59 +
 .../cache/query/GridCacheSqlQuery.java          |   45 +-
 .../processors/query/GridQueryIndexing.java     |   35 +
 .../processors/query/GridQueryProcessor.java    |   63 +-
 .../visor/cache/VisorCacheClearTask.java        |   88 +-
 .../visor/compute/VisorGatewayTask.java         |   30 +-
 .../junits/multijvm/IgniteProcessProxy.java     |    5 +-
 .../query/h2/DmlStatementsProcessor.java        |  267 ++-
 .../processors/query/h2/IgniteH2Indexing.java   |  160 +-
 .../query/h2/dml/UpdatePlanBuilder.java         |    2 +-
 .../query/h2/opt/GridH2CollocationModel.java    |   78 +-
 .../query/h2/opt/GridH2IndexBase.java           |   47 +-
 .../processors/query/h2/sql/DmlAstUtils.java    |   46 +-
 .../processors/query/h2/sql/GridSqlAlias.java   |   13 +-
 .../processors/query/h2/sql/GridSqlArray.java   |    8 +-
 .../processors/query/h2/sql/GridSqlAst.java     |   61 +
 .../processors/query/h2/sql/GridSqlColumn.java  |   85 +-
 .../processors/query/h2/sql/GridSqlConst.java   |    6 +-
 .../processors/query/h2/sql/GridSqlElement.java |   43 +-
 .../query/h2/sql/GridSqlFunction.java           |   16 +-
 .../processors/query/h2/sql/GridSqlJoin.java    |   35 +-
 .../processors/query/h2/sql/GridSqlKeyword.java |    3 +-
 .../query/h2/sql/GridSqlOperation.java          |    6 +-
 .../query/h2/sql/GridSqlOperationType.java      |    4 +-
 .../query/h2/sql/GridSqlParameter.java          |    4 +-
 .../query/h2/sql/GridSqlPlaceholder.java        |    2 +-
 .../processors/query/h2/sql/GridSqlQuery.java   |   80 +-
 .../query/h2/sql/GridSqlQueryParser.java        |  228 ++-
 .../query/h2/sql/GridSqlQuerySplitter.java      | 1634 +++++++++++++++---
 .../processors/query/h2/sql/GridSqlSelect.java  |  121 +-
 .../query/h2/sql/GridSqlStatement.java          |    6 +-
 .../query/h2/sql/GridSqlSubquery.java           |   31 +-
 .../processors/query/h2/sql/GridSqlTable.java   |   19 +-
 .../processors/query/h2/sql/GridSqlType.java    |    6 +-
 .../processors/query/h2/sql/GridSqlUnion.java   |   66 +-
 .../processors/query/h2/sql/GridSqlValue.java   |   25 -
 .../query/h2/twostep/GridMapQueryExecutor.java  |    6 +-
 .../query/h2/twostep/GridMergeIndex.java        |  418 ++++-
 .../query/h2/twostep/GridMergeIndexSorted.java  |  284 +++
 .../h2/twostep/GridMergeIndexUnsorted.java      |   40 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |   56 +-
 .../h2/twostep/msg/GridH2QueryRequest.java      |    9 +-
 .../IgniteCacheAbstractFieldsQuerySelfTest.java |    2 +-
 ...niteCacheAbstractInsertSqlQuerySelfTest.java |    6 +-
 .../IgniteCacheInsertSqlQuerySelfTest.java      |   14 +
 .../query/IgniteSqlSplitterSelfTest.java        |  100 +-
 .../query/h2/sql/GridQueryParsingTest.java      |   41 +-
 modules/spark-2.10/pom.xml                      |   54 +
 modules/spark/pom.xml                           |  183 +-
 .../org/apache/ignite/spark/IgniteContext.scala |   22 +-
 .../spark/JavaEmbeddedIgniteRDDSelfTest.java    |   10 +-
 .../spark/JavaStandaloneIgniteRDDSelfTest.java  |   22 +-
 .../web-console/backend/services/activities.js  |    7 +-
 .../list-of-registered-users.controller.js      |   14 +-
 .../configuration/configuration.module.js       |    6 +-
 .../configuration/summary/summary.worker.js     |   28 +-
 .../console/agent/handlers/RestListener.java    |    9 +-
 modules/yardstick/DEVNOTES-standalone.txt       |    5 +-
 modules/yardstick/README.txt                    |   85 +-
 .../config/benchmark-atomic-win.properties      |    2 +-
 .../config/benchmark-atomic.properties          |   35 +-
 .../config/benchmark-bin-identity.properties    |   16 +-
 .../config/benchmark-cache-load-win.properties  |    2 +-
 .../config/benchmark-cache-load.properties      |    4 +-
 .../config/benchmark-client-mode.properties     |   68 +-
 .../config/benchmark-compute-win.properties     |    2 +-
 .../config/benchmark-compute.properties         |   30 +-
 .../config/benchmark-failover.properties        |    2 +-
 .../yardstick/config/benchmark-full.properties  |   62 +-
 .../config/benchmark-multicast.properties       |  107 +-
 .../config/benchmark-put-indexed-val.properties |   23 +-
 .../benchmark-query-put-separated.properties    |    3 +-
 .../config/benchmark-query-win.properties       |    2 +-
 .../yardstick/config/benchmark-query.properties |   33 +-
 .../config/benchmark-remote-sample.properties   |   80 +
 .../config/benchmark-remote.properties          |  119 ++
 .../config/benchmark-sample.properties          |   22 +-
 .../config/benchmark-sql-dml.properties         |   36 +-
 .../yardstick/config/benchmark-store.properties |    3 +-
 .../config/benchmark-tx-win.properties          |    2 +-
 .../yardstick/config/benchmark-tx.properties    |   33 +-
 .../yardstick/config/benchmark-win.properties   |    2 +-
 modules/yardstick/config/benchmark.properties   |   76 +-
 .../yardstick/config/ignite-remote-config.xml   |   47 +
 .../test-max-int-values-offheap.properties      |    3 +-
 .../test-max-int-values-onheap.properties       |    3 +-
 .../config/test-max-int-values-swap.properties  |    3 +-
 modules/yardstick/pom-standalone.xml            |    2 +-
 modules/yardstick/pom.xml                       |    2 +-
 parent/pom.xml                                  |    3 +-
 pom.xml                                         |    5 +-
 103 files changed, 4874 insertions(+), 1363 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5b774f6d/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 37f0ade,2abb3a9..5d2b728
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@@ -84,26 -85,48 +85,39 @@@ public interface GridQueryIndexing 
      /**
       * Queries individual fields (generally used by JDBC drivers).
       *
 -     * @param spaceName Space name.
 +     * @param cctx Cache context.
       * @param qry Query.
 -     * @param params Query parameters.
       * @param filter Space name and key filter.
 -     * @param enforceJoinOrder Enforce join order of tables in the query.
 -     * @param timeout Query timeout in milliseconds.
       * @param cancel Query cancel.
 -     * @return Query result.
 -     * @throws IgniteCheckedException If failed.
 +     * @return Cursor.
       */
 -    public GridQueryFieldsResult queryLocalSqlFields(@Nullable String spaceName, String qry,
 -        Collection<Object> params, IndexingQueryFilter filter, boolean enforceJoinOrder, int timeout,
 -        GridQueryCancel cancel) throws IgniteCheckedException;
 +    public <K, V> QueryCursor<List<?>> queryLocalSqlFields(GridCacheContext<?, ?> cctx, SqlFieldsQuery qry,
 +        IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException;
  
      /**
+      * Perform a MERGE statement using data streamer as receiver.
+      *
+      * @param spaceName Space name.
+      * @param qry Query.
+      * @param params Query parameters.
+      * @param streamer Data streamer to feed data to.
+      * @return Query result.
+      * @throws IgniteCheckedException If failed.
+      */
+     public long streamUpdateQuery(@Nullable final String spaceName, final String qry,
+          @Nullable final Object[] params, IgniteDataStreamer<?, ?> streamer) throws IgniteCheckedException;
+ 
+     /**
       * Executes regular query.
       *
 -     * @param spaceName Space name.
 +     * @param cctx Cache context.
       * @param qry Query.
 -     * @param alias Table alias used in Query.
 -     * @param params Query parameters.
 -     * @param type Query return type.
       * @param filter Space name and key filter.
 -     * @return Queried rows.
 -     * @throws IgniteCheckedException If failed.
 +     * @param keepBinary Keep binary flag.
 +     * @return Cursor.
       */
 -    public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalSql(@Nullable String spaceName, String qry,
 -        String alias, Collection<Object> params, GridQueryTypeDescriptor type, IndexingQueryFilter filter)
 -        throws IgniteCheckedException;
 +    public <K, V> QueryCursor<Cache.Entry<K,V>> queryLocalSql(GridCacheContext<?, ?> cctx, SqlQuery qry,
 +        IndexingQueryFilter filter, boolean keepBinary) throws IgniteCheckedException;
  
      /**
       * Executes text query.

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b774f6d/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b774f6d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index de48eb9,15e7fc6..057fd10
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@@ -93,8 -91,8 +94,9 @@@ import org.apache.ignite.internal.proce
  import org.apache.ignite.internal.processors.query.GridQueryIndexing;
  import org.apache.ignite.internal.processors.query.GridQueryProperty;
  import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
+ import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
  import org.apache.ignite.internal.processors.query.IgniteSQLException;
 +import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
  import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
  import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOffheap;
  import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap;
@@@ -1352,9 -1281,7 +1400,9 @@@ public class IgniteH2Indexing implement
          final boolean distributedJoins = qry.isDistributedJoins() && cctx.isPartitioned();
          final boolean grpByCollocated = qry.isCollocated();
  
 +        final DistributedJoinMode distributedJoinMode = distributedJoinMode(qry.isLocal(), distributedJoins);
 +
-         GridCacheTwoStepQuery twoStepQry;
+         GridCacheTwoStepQuery twoStepQry = null;
          List<GridQueryFieldMetadata> meta;
  
          final TwoStepCachedQueryKey cachedQryKey = new TwoStepCachedQueryKey(space, sqlQry, grpByCollocated,
@@@ -1368,12 -1295,13 +1416,13 @@@
          else {
              final UUID locNodeId = ctx.localNodeId();
  
-             setupConnection(c, distributedJoins, enforceJoinOrder);
+             // Here we will just parse the statement, no need to optimize it at all.
+             setupConnection(c, /*distributedJoins*/false, /*enforceJoinOrder*/true);
  
              GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, 0, PREPARE)
 -                .distributedJoins(distributedJoins));
 +                .distributedJoinMode(distributedJoinMode));
  
-             PreparedStatement stmt;
+             PreparedStatement stmt = null;
  
              boolean cachesCreated = false;
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b774f6d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index 89d661d,bdfddd5..31057c7
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@@ -262,14 -245,15 +262,15 @@@ public abstract class GridH2IndexBase e
       * @param filter Current filter.
       * @return Multiplier.
       */
-     public int getDistributedMultiplier(Session ses, TableFilter[] filters, int filter) {
+     public final int getDistributedMultiplier(Session ses, TableFilter[] filters, int filter) {
          GridH2QueryContext qctx = GridH2QueryContext.get();
  
-         // We do complex optimizations with respect to distributed joins only on prepare stage
-         // because on run stage reordering of joined tables by Optimizer is explicitly disabled
-         // and thus multiplier will be always the same, so it will not affect choice of index.
+         // We do optimizations with respect to distributed joins on PREPARE stage only.
+         // Notice that we check for isJoinBatchEnabled, because we can do multiple different
+         // optimization passes on PREPARE stage.
          // Query expressions can not be distributed as well.
-         if (qctx == null || qctx.type() != PREPARE || qctx.distributedJoinMode() == OFF || ses.isPreparingQueryExpression())
 -        if (qctx == null || qctx.type() != PREPARE || !qctx.distributedJoins() ||
++        if (qctx == null || qctx.type() != PREPARE || qctx.distributedJoinMode() == OFF ||
+             !ses.isJoinBatchEnabled() || ses.isPreparingQueryExpression())
              return GridH2CollocationModel.MULTIPLIER_COLLOCATED;
  
          // We have to clear this cache because normally sub-query plan cost does not depend on anything
@@@ -391,18 -375,16 +392,18 @@@
          if (affCol != null) {
              affColId = affCol.column.getColumnId();
              int[] masks = filter.getMasks();
-             ucast = masks != null && masks[affColId] == IndexCondition.EQUALITY;
-         }
-         else {
-             affColId = -1;
-             ucast = false;
+ 
+             if (masks != null) {
+                 ucast = (masks[affColId] & IndexCondition.EQUALITY) != 0 ||
+                     (masks[KEY_COL] & IndexCondition.EQUALITY) != 0;
+             }
          }
  
 -        GridCacheContext<?,?> cctx = getTable().rowDescriptor().context();
 +        GridCacheContext<?, ?> cctx = getTable().rowDescriptor().context();
  
 -        return new DistributedLookupBatch(cctx, ucast, affColId);
 +        boolean isLocal = qctx.distributedJoinMode() == LOCAL_ONLY;
 +
 +        return new DistributedLookupBatch(cctx, ucast, affColId, isLocal);
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b774f6d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 5027c9a,b0fa639..72a34a6
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@@ -86,8 -84,7 +86,9 @@@ import static org.apache.ignite.events.
  import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
  import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
  import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+ import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.setupConnection;
 +import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
 +import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.distributedJoinMode;
  import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
  import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED;
  import static org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.QUERY_POOL;
@@@ -433,7 -435,8 +434,8 @@@ public class GridMapQueryExecutor 
              req.partitions(),
              null,
              req.pageSize(),
 -            false,
 +            OFF,
+             true,
              req.timeout());
      }
  
@@@ -491,7 -457,8 +493,8 @@@
              parts,
              req.tables(),
              req.pageSize(),
 -            req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS),
 +            joinMode,
+             req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER),
              req.timeout());
      }
  
@@@ -519,7 -484,8 +522,8 @@@
          int[] parts,
          Collection<String> tbls,
          int pageSize,
 -        boolean distributedJoins,
 +        DistributedJoinMode distributedJoinMode,
+         boolean enforceJoinOrder,
          int timeout
      ) {
          // Prepare to run queries.
@@@ -580,8 -545,7 +584,8 @@@
  
              Connection conn = h2.connectionForSpace(mainCctx.name());
  
 -            setupConnection(conn, distributedJoins, enforceJoinOrder);
 +            // Here we enforce join order to have the same behavior on all the nodes.
-             h2.setupConnection(conn, distributedJoinMode != OFF, true);
++            h2.setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder);
  
              GridH2QueryContext.set(qctx);
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b774f6d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b774f6d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 8837046,f813cae..3cfaae9
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@@ -62,12 -62,11 +62,11 @@@ import org.apache.ignite.internal.proce
  import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
  import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
  import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 -import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
  import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
  import org.apache.ignite.internal.processors.query.GridQueryCancel;
 +import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
  import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
  import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
- import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter;
  import org.apache.ignite.internal.processors.query.h2.sql.GridSqlType;
  import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
  import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
@@@ -100,8 -99,9 +99,10 @@@ import org.jsr166.ConcurrentHashMap8
  
  import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
  import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
+ import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.setupConnection;
 +import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
  import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE;
+ import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter.mergeTableIdentifier;
  
  /**
   * Reduce query executor.
@@@ -616,6 -610,12 +618,15 @@@ public class GridReduceQueryExecutor 
                  if (oldStyle && distributedJoins)
                      throw new CacheException("Failed to enable distributed joins. Topology contains older data nodes.");
  
+                 // Always enforce join order on map side to have consistent behavior.
+                 int flags = GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER;
+ 
+                 if (distributedJoins)
+                     flags |= GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS;
+ 
++                if(qry.isLocal())
++                    flags |= GridH2QueryRequest.FLAG_IS_LOCAL;
++
                  if (send(nodes,
                      oldStyle ?
                          new GridQueryRequest(qryReqId,
@@@ -634,13 -634,14 +645,12 @@@
                              .tables(distributedJoins ? qry.tables() : null)
                              .partitions(convert(partsMap))
                              .queries(mapQrys)
-                             .flags((qry.isLocal() ? GridH2QueryRequest.FLAG_IS_LOCAL : 0) |
-                                 (distributedJoins ? GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0))
+                             .flags(flags)
                              .timeout(timeoutMillis),
                      oldStyle && partsMap != null ? new ExplicitPartitionsSpecializer(partsMap) : null,
 -                    distributedJoins)
 -                    ) {
 -                    awaitAllReplies(r, nodes);
 +                    false)) {
  
 -                    cancel.checkCancelled();
 +                    awaitAllReplies(r, nodes, cancel);
  
                      Object state = r.state.get();
  
@@@ -700,10 -700,10 +709,10 @@@
  
                          UUID locNodeId = ctx.localNodeId();
  
-                         h2.setupConnection(r.conn, false, enforceJoinOrder);
+                         setupConnection(r.conn, false, enforceJoinOrder);
  
                          GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, qryReqId, REDUCE)
 -                            .pageSize(r.pageSize).distributedJoins(false));
 +                            .pageSize(r.pageSize).distributedJoinMode(OFF));
  
                          try {
                              if (qry.explain())

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b774f6d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index ec49aff,e5dbf33..0ad534c
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@@ -51,10 -51,10 +51,15 @@@ public class GridH2QueryRequest impleme
      public static int FLAG_DISTRIBUTED_JOINS = 1;
  
      /**
+      * Remote map query executor will enforce join order for the received map queries.
+      */
+     public static int FLAG_ENFORCE_JOIN_ORDER = 1 << 1;
+ 
++    /**
 +     * Restrict distributed joins range-requests to local index segments. Range requests to other nodes will not be sent.
 +     */
-     public static int FLAG_IS_LOCAL = 2;
++    public static int FLAG_IS_LOCAL = 1 << 2;
 +
      /** */
      private long reqId;
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b774f6d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index 4ae2f91,432ed34..37dea47
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@@ -34,9 -34,8 +34,10 @@@ import org.apache.ignite.cache.CacheKey
  import org.apache.ignite.cache.CacheMode;
  import org.apache.ignite.cache.CachePeekMode;
  import org.apache.ignite.cache.affinity.Affinity;
+ import org.apache.ignite.cache.affinity.AffinityKeyMapped;
  import org.apache.ignite.cache.query.QueryCursor;
 +import org.apache.ignite.cache.affinity.AffinityKeyMapped;
 +import org.apache.ignite.cache.query.QueryCursor;
  import org.apache.ignite.cache.query.SqlFieldsQuery;
  import org.apache.ignite.cache.query.annotations.QuerySqlField;
  import org.apache.ignite.cluster.ClusterNode;


[12/12] ignite git commit: Merge branch 'ignite-4106-1.9' of https://github.com/gridgain/apache-ignite into ignite-1.9

Posted by se...@apache.org.
Merge branch 'ignite-4106-1.9' of https://github.com/gridgain/apache-ignite into ignite-1.9


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/658b4ad5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/658b4ad5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/658b4ad5

Branch: refs/heads/ignite-1.9
Commit: 658b4ad5a3bf17fcb3fb9f364e47b4e28f5eea64
Parents: 963a9d9 4dc3694
Author: Sergi Vladykin <se...@gmail.com>
Authored: Wed Feb 22 13:17:43 2017 +0300
Committer: Sergi Vladykin <se...@gmail.com>
Committed: Wed Feb 22 13:17:43 2017 +0300

----------------------------------------------------------------------
 .../configuration/CacheConfiguration.java       |  48 +++
 .../processors/cache/GridCacheProcessor.java    |   3 +
 .../processors/cache/IgniteCacheProxy.java      |   6 +-
 .../closure/GridClosureProcessor.java           |   2 +-
 .../processors/query/GridQueryIndexing.java     |  27 +-
 .../processors/query/GridQueryProcessor.java    | 141 ++-----
 .../messages/GridQueryNextPageRequest.java      |  29 +-
 .../messages/GridQueryNextPageResponse.java     |  29 +-
 .../query/h2/opt/GridH2SpatialIndex.java        |  56 ++-
 .../query/h2/GridH2IndexingGeoSelfTest.java     | 405 ++++++++++++++-----
 .../h2/GridH2IndexingSegmentedGeoSelfTest.java  |  37 ++
 .../testsuites/GeoSpatialIndexingTestSuite.java |   2 +
 .../cache/query/GridCacheTwoStepQuery.java      |  17 +
 .../processors/query/h2/IgniteH2Indexing.java   | 238 +++++++++--
 .../query/h2/opt/DistributedJoinMode.java       |  51 +++
 .../query/h2/opt/GridH2IndexBase.java           | 303 ++++++++++----
 .../query/h2/opt/GridH2QueryContext.java        |  84 +++-
 .../query/h2/opt/GridH2TreeIndex.java           | 151 ++++---
 .../query/h2/sql/GridSqlQuerySplitter.java      |   2 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  | 233 ++++++++---
 .../query/h2/twostep/GridMergeIndex.java        |  39 +-
 .../query/h2/twostep/GridMergeIndexSorted.java  |   4 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |  69 ++--
 .../h2/twostep/msg/GridH2IndexRangeRequest.java |  60 ++-
 .../twostep/msg/GridH2IndexRangeResponse.java   |  62 ++-
 .../h2/twostep/msg/GridH2QueryRequest.java      |   5 +
 .../query/IgniteSqlSegmentedIndexSelfTest.java  | 263 ++++++++++++
 .../query/IgniteSqlSplitterSelfTest.java        | 136 ++++++-
 .../h2/GridIndexingSpiAbstractSelfTest.java     |  26 +-
 .../FetchingQueryCursorStressTest.java          | 277 +++++++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 31 files changed, 2262 insertions(+), 545 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/658b4ad5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/658b4ad5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------